2017-10-19 6 views
1

나는 Flink에 처음 온 사람입니다. 이 코드는지도, 그룹 및 JSON 입력 합계가 있습니다.Flink keyBy groping issue

단어 수 계산식과 매우 유사합니다.

내가 어떤 이유로 난 당신의 코드를 실행 (occupied,1) (vacant,1) (occupied,2)

public static void main(String[] args) throws Exception { 
     String s = "{\n" + 
       " \"Port_128\": \"occupied\",\n" + 
       " \"Port_129\": \"occupied\",\n" + 
       " \"Port_120\": \"vacant\"\n" + 
       "\n" + 
       "}"; 
     StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.getExecutionEnvironment(); 
     DataStream<String> in = env.fromElements(s); 
     SingleOutputStreamOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
      @Override 
      public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
       ObjectMapper mapper = new ObjectMapper(); 
       JsonNode node = mapper.readTree(s); 
       node.elements().forEachRemaining(v -> { 
        collector.collect(new Tuple2<>(v.textValue(), 1)); 
       }); 

      } 
     }).keyBy(0).sum(1); 

     t.print(); 
     env.execute(); 

답변

1

받고 있어요, (vacant,1) (occupied,2)

을 얻을하지만 것으로 예상, 내가 얻을 : 약간 다릅니다

10/19/2017 11:27:38 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 
(occupied,1) 
(occupied,2) 
(vacant,1) 
10/19/2017 11:28:03 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED 

당신의 출력하지만 중요합니다. 그 이유는 코드가 데이터를받을 때 각 키의 합계를 출력하기 때문입니다. 그래서 처음에는 점유 된 첫 번째 (출력 1)를 얻은 다음 두 번째 (이 키 처리 된 프로세스에 대한 합계 출력은 이제 2)가됩니다. 빈 키를 다른 키순 프로세스로 보내고 1을 출력합니다. 따라서 이것은 나에게 적절한 결과물처럼 보입니다. 내가 KeydAggregation에서 때마다이를 차단하는 방법에서 differant 그것은`어떻게

public static void main(String[] args) throws Exception { 
    String s = "{\n" + 
     " \"Port_128\": \"occupied\",\n" + 
     " \"Port_129\": \"occupied\",\n" + 
     " \"Port_120\": \"vacant\"\n" + 
     "\n" + 
     "}"; 
    ExecutionEnvironment env = 
     ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet<String> in = env.fromElements(s); 
    AggregateOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
      ObjectMapper mapper = new ObjectMapper(); 
      JsonNode node = mapper.readTree(s); 
      node.elements().forEachRemaining(v -> { 
      collector.collect(new Tuple2<>(v.textValue(), 1)); 
      }); 

     } 
     }).groupBy(0).sum(1); 

    t.print(); 
    env.execute(); 
} 
+0

:

아래에 의견을 당 EDIT

, 여기 당신이 원하는 출력을 줄 것입니다 코드입니다 단어 수를 예를 들어? – MIkCode

+1

데이터 스트림 프로그램을 설정했습니다. Flink 단어 개수 예제는 DataSet 프로그램입니다. 둘은 다르게 행동합니다. 스트림의 데이터는 파이프 라인을 통해 수신 될 때 처리되므로 왜 통과하는지 각 요소에서 처리됩니다. wordcount 예제와 같은 DataSet 코드를 사용하는 코드가 변경되면서 답변을 업데이트하겠습니다. 실행하면 예상 한 결과를 얻을 수 있습니다. – Jicaar

+0

지금 내가 가지고있어, 나의 실수는 내가 스트림을 사용하고 데이터 세트가 아니었다. – MIkCode