2016-07-15 2 views
0

나는 카프카에서 데이터를 읽고 플린크로 인쇄하는 간단한 프로그램을 작성했습니다. 아래는 코드입니다. FlinkKafkaConsumer09는 몇 가지 메시지를 반복해서 읽습니다.

public static void main(String[] args) throws Exception { 

    Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection"); 
    env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class); 
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); 

    flinkPipelineOptions.setJobName("MyFlinkTest"); 
    flinkPipelineOptions.setStreaming(true); 
    flinkPipelineOptions.setCheckpointingInterval(1000L); 
    flinkPipelineOptions.setNumberOfExecutionRetries(5); 
    flinkPipelineOptions.setExecutionRetryDelay(3000L); 

    Properties p = new Properties(); 
    p.setProperty("zookeeper.connect", "localhost:2181"); 
    p.setProperty("bootstrap.servers", "localhost:9092"); 
    p.setProperty("group.id", "test"); 

    FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p); 

    DataStream<Notification> input = env.addSource(kafkaConsumer); 

    input.rebalance().map(new MapFunction<Notification, String>() { 
     @Override 
     public String map(Notification value) throws Exception { 
      return "Kafka and Flink says: " + value.toString(); 
     } 

    }).print(); 

    env.execute(); 
} 

나는 정확히 한 번만 카프카 내 데이터를 처리하는 FLINK이 필요하고 나는 그것을 할 수있는 방법에 대한 몇 가지 질문이 있습니다.

  • FlinkKafkaConsumer09는 처리 된 오프셋을 kafka로 언제 커밋합니까?
  • 내 주제에 10 개의 메시지가 있다고 가정하면 소비자는 10 개의 메시지를 처리합니다. 작업을 중지하고 다시 시작하면 이전에 읽은 메시지 세트에서 임의의 메시지를 처리하기 시작합니다. 내 메시지가 두 번 처리되지 않도록해야합니다.

모든 도움을 감사하십시오. 감사.

답변

0

이 페이지는 fault tolerance guarantees of the Flink Kafka connector에 대해 설명합니다.

Flink's savepoints to re-start a job in an exactly-once (state preserving) manner을 사용할 수 있습니다.

Flink가 Kafka 브로커/사육사에게 위임 한 오프셋이 Flink의 등록 된 상태와 일치하지 않기 때문에 메시지를 다시 볼 수 있습니다. 의미론이 활성화 된 경우에도 Flink에서 복원/실패 후 여러 번 처리 된 메시지가 항상 표시됩니다. exactly-once guarantees in Flink은 등록 된 상태와 관련이 있으며 운영자에게 보내는 레코드가 아닙니다.


약간의 오프 주제 :이 라인은 무엇입니까? 그들은 어디서나 Flink로 넘어 가지 않습니다.

Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class); 
flinkPipelineOptions.setJobName("MyFlinkTest"); 
flinkPipelineOptions.setStreaming(true); 
flinkPipelineOptions.setCheckpointingInterval(1000L); 
flinkPipelineOptions.setNumberOfExecutionRetries(5); 
flinkPipelineOptions.setExecutionRetryDelay(3000L); 
+0

답장을 보내 주셔서 감사합니다. rmetzger. 죄송합니다 flinkPipelineOptions가 데이터 흐름 러너에 제공되었으며이를 제거하는 것을 잊었습니다. 당신이 지적한 튜토리얼을 읽었지 만 상태를 저장하고 다음 번에 재개 할 수있는 방법에 대해서는 여전히 명확하지 않습니다. 가능한 모든 예제가 정말 도움이 될 것입니다. 감사와 도움을 주셔서 감사합니다. – Neoster