2017-11-28 3 views
1

다른 카프카 싱크에 데이터를 쓰는 데 Flink의 출력 기능을 사용합니다. 측면 출력은 IDE에서 실행될 때 데이터를 기록하지만 Flink 클러스터에서는 기록되지 않습니다. 왜 그런가?데이터가 IDE에서는 출력되지만 클러스터에는 출력되지 않습니다.

final OutputTag<SideOutputObject> sideOutputTag = new OutputTag<SideOutputObject>("side-output-tag"){}; 

SingleOutputStreamOperator<String> processedDataStream = outputStream 
       .process(new ProcessAndSortBinaryData(sideOutputTag)) 
       .startNewChain() 
       .name("processedDataStream") 
       .uid("processedDataStream"); 

DataStream<String> sideOutputObjectStream = processedDataStream.getSideOutput(sideOutputTag) 
       .flatMap(new FlatMapFunction<SideOutputObject, String>() { 
        @Override 
        public void flatMap(SideOutputObject sideOutputObject, Collector<String> collector) throws Exception { 
         System.out.println("sideOutputObject in side output flat map!"); 
         collector.collect(sideOutputObject.toString()); 
        } 
       }) 
       .startNewChain() 
       .name("sideOutputStream") 
       .uid("sideOutputStream"); 


sideOutputObjectStream.addSink(new FlinkKafkaProducer010<>(
        "sideOutputKafkaTopic", 
        new SimpleStringSchema(), 
        kafkaSinkProperties) 
      ).name("sideOutput-KafkaSink") 
        .uid("sideOutput-KafkaSink"); 

flatmap는 클러스터 GUI에서 레코드를받은과에서 System.out.println 메시지 중 하나를 표준 출력에 기록되지 않습니다 것을 알 수 없습니다 : 여기

코드의 샘플입니다.

도움을 주시면 감사하겠습니다. 미리 감사드립니다!

답변

0

프로세스 기능 ProcessAndSortBinaryData() 로직을 익명의 내부 함수 (문서를 다시 읽은 후 ...)로 옮겨 보았습니다. 이제는 클러스터에서 작동하고 있습니다.