나는 카프카에서 데이터를 읽고 플린크로 인쇄하는 간단한 프로그램을 작성했습니다. 아래는 코드입니다. FlinkKafkaConsumer09는 몇 가지 메시지를 반복해서 읽습니다.
public static void main(String[] args) throws Exception {
Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);
Properties p = new Properties();
p.setProperty("zookeeper.connect", "localhost:2181");
p.setProperty("bootstrap.servers", "localhost:9092");
p.setProperty("group.id", "test");
FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p);
DataStream<Notification> input = env.addSource(kafkaConsumer);
input.rebalance().map(new MapFunction<Notification, String>() {
@Override
public String map(Notification value) throws Exception {
return "Kafka and Flink says: " + value.toString();
}
}).print();
env.execute();
}
나는 정확히 한 번만 카프카 내 데이터를 처리하는 FLINK이 필요하고 나는 그것을 할 수있는 방법에 대한 몇 가지 질문이 있습니다.
- FlinkKafkaConsumer09는 처리 된 오프셋을 kafka로 언제 커밋합니까?
- 내 주제에 10 개의 메시지가 있다고 가정하면 소비자는 10 개의 메시지를 처리합니다. 작업을 중지하고 다시 시작하면 이전에 읽은 메시지 세트에서 임의의 메시지를 처리하기 시작합니다. 내 메시지가 두 번 처리되지 않도록해야합니다.
모든 도움을 감사하십시오. 감사.
답장을 보내 주셔서 감사합니다. rmetzger. 죄송합니다 flinkPipelineOptions가 데이터 흐름 러너에 제공되었으며이를 제거하는 것을 잊었습니다. 당신이 지적한 튜토리얼을 읽었지 만 상태를 저장하고 다음 번에 재개 할 수있는 방법에 대해서는 여전히 명확하지 않습니다. 가능한 모든 예제가 정말 도움이 될 것입니다. 감사와 도움을 주셔서 감사합니다. – Neoster