2017-12-07 17 views
0

지도 함수 (SCALA) 내에서 kafka 토픽에 쓰기? 나는 목록을 반복하고있는지도 기능 내 -지도 기능 지도 메서드 내에서 kafka 토픽에 게시

  • 문제 설명 내에서 데이터를 처리하는 FLINK 응용 프로그램
  • 의 카프카 항목에서 읽기

    1. . 목록의 각 요소에 대해 카프카 (kafka) 주제에 게시하고 싶습니다.
    2. 나는지도의 출력을 얻을 작동,하지만 난지도 방법 내에서 주제에 밀어 넣기를 시도하는 경우는
    3. 는지도 방법 내에서 주제에 게시하는 것이 가능하지 않으며 가라 앉을 때

      // Main Function 
      def main(args: Array[String]) { 
      
      ... 
      // some list 
      val list_ = ("a", "b", "c", "d") 
      // Setup Properties 
      val props = new Properties() 
      props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port) 
      props.setProperty("bootstrap.servers", broker_url + ":" + broker_port) 
      props.setProperty("auto.offset.reset", "earliest") 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
      
      ... 
      
      // Connect to Source 
      val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties)) 
      
      // Process each Record 
      val stream = input_stream.map(x=> {   
      
          // loop through list "list_" -> variable in in Main 
          // and publish to topic_out 
          // -- THIS IS MY CURRENT ISSUE !!!) 
          // -- Does not work (No compile issue) 
          // 
          var producer2 = new KafkaProducer[String, String](props) 
          var record = new ProducerRecord(topic_out, "KEY", list(i)) 
          producer2.send(record) 
          producer2.flush() 
      
      // ... Other process and return processed string 
      
      }) 
      
      // publish to different topic of proccessed input string (Works) 
      stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema())) 
      
  • +0

    "작동하지 않는"것은 무엇입니까? 왜 당신은 루프에서 프로듀서를 만들고 있니? 모든'vars'와의 거래는 무엇입니까? '나는 무엇입니까? 왜 다른 제작자 클래스를 '맵'내부 및 외부에서 사용하고 있습니까? – Dima

    +0

    "작동하지 않습니다."- 내가 작업을 제출할 때. 그것은 작업을 실행하지만 그것은 생산자 코드가 포함되어 있으면 맵 기능에 들어 가지 않습니다 솔기 " –

    +0

    입력 스트림에서 일부 조건을 기반으로 목록을 통해 루프 및 항목 항목을 게시 할 관심 –

    답변

    3

    는지도 기능의 내부 카프카 생산을하지 말고지도의 내부 카프카의 주제를 작성하려고하지 않습니다. 솔직히, 나는 그것이 나쁜 생각이라고 말하는 것을 인용 할 수는 없다. 그러나 그것은 나쁜 생각이다.

    대신. 지도 함수를 flatMap으로 변경하십시오 (첫 번째 예제는 여기 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html 참조).

    그래서 루프마다 kafka 제작자를 모든 루프로 만드는 대신 collector.collect(recordToPublishToKafka)을 작성하십시오.

    싱크대는 수집 될 때마다 게시합니다.

    +0

    고맙습니다, 도움이됩니다. (나는 여전히 Flink/Kafka 도구 세트를 배우고 있으므로 나쁜 생각입니다 ...) 당신의 조언. 많은 감사와 안부 –