1
나는 Flink에 처음 온 사람입니다. 이 코드는지도, 그룹 및 JSON 입력 합계가 있습니다.Flink keyBy groping issue
단어 수 계산식과 매우 유사합니다.
내가 어떤 이유로 난 당신의 코드를 실행(occupied,1) (vacant,1) (occupied,2)
public static void main(String[] args) throws Exception {
String s = "{\n" +
" \"Port_128\": \"occupied\",\n" +
" \"Port_129\": \"occupied\",\n" +
" \"Port_120\": \"vacant\"\n" +
"\n" +
"}";
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> in = env.fromElements(s);
SingleOutputStreamOperator<Tuple2<String, Integer>> t =
in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>>
collector) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(s);
node.elements().forEachRemaining(v -> {
collector.collect(new Tuple2<>(v.textValue(), 1));
});
}
}).keyBy(0).sum(1);
t.print();
env.execute();
:
아래에 의견을 당 EDIT
, 여기 당신이 원하는 출력을 줄 것입니다 코드입니다 단어 수를 예를 들어? – MIkCode
데이터 스트림 프로그램을 설정했습니다. Flink 단어 개수 예제는 DataSet 프로그램입니다. 둘은 다르게 행동합니다. 스트림의 데이터는 파이프 라인을 통해 수신 될 때 처리되므로 왜 통과하는지 각 요소에서 처리됩니다. wordcount 예제와 같은 DataSet 코드를 사용하는 코드가 변경되면서 답변을 업데이트하겠습니다. 실행하면 예상 한 결과를 얻을 수 있습니다. – Jicaar
지금 내가 가지고있어, 나의 실수는 내가 스트림을 사용하고 데이터 세트가 아니었다. – MIkCode