1
다른 카프카 싱크에 데이터를 쓰는 데 Flink의 출력 기능을 사용합니다. 측면 출력은 IDE에서 실행될 때 데이터를 기록하지만 Flink 클러스터에서는 기록되지 않습니다. 왜 그런가?데이터가 IDE에서는 출력되지만 클러스터에는 출력되지 않습니다.
final OutputTag<SideOutputObject> sideOutputTag = new OutputTag<SideOutputObject>("side-output-tag"){};
SingleOutputStreamOperator<String> processedDataStream = outputStream
.process(new ProcessAndSortBinaryData(sideOutputTag))
.startNewChain()
.name("processedDataStream")
.uid("processedDataStream");
DataStream<String> sideOutputObjectStream = processedDataStream.getSideOutput(sideOutputTag)
.flatMap(new FlatMapFunction<SideOutputObject, String>() {
@Override
public void flatMap(SideOutputObject sideOutputObject, Collector<String> collector) throws Exception {
System.out.println("sideOutputObject in side output flat map!");
collector.collect(sideOutputObject.toString());
}
})
.startNewChain()
.name("sideOutputStream")
.uid("sideOutputStream");
sideOutputObjectStream.addSink(new FlinkKafkaProducer010<>(
"sideOutputKafkaTopic",
new SimpleStringSchema(),
kafkaSinkProperties)
).name("sideOutput-KafkaSink")
.uid("sideOutput-KafkaSink");
flatmap는 클러스터 GUI에서 레코드를받은과에서 System.out.println 메시지 중 하나를 표준 출력에 기록되지 않습니다 것을 알 수 없습니다 : 여기
코드의 샘플입니다.도움을 주시면 감사하겠습니다. 미리 감사드립니다!