지도 함수 (SCALA) 내에서 kafka 토픽에 쓰기? 나는 목록을 반복하고있는지도 기능 내 -지도 기능 지도 메서드 내에서 kafka 토픽에 게시
- . 목록의 각 요소에 대해 카프카 (kafka) 주제에 게시하고 싶습니다.
- 나는지도의 출력을 얻을 작동,하지만 난지도 방법 내에서 주제에 밀어 넣기를 시도하는 경우는
는지도 방법 내에서 주제에 게시하는 것이 가능하지 않으며 가라 앉을 때
// Main Function def main(args: Array[String]) { ... // some list val list_ = ("a", "b", "c", "d") // Setup Properties val props = new Properties() props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port) props.setProperty("bootstrap.servers", broker_url + ":" + broker_port) props.setProperty("auto.offset.reset", "earliest") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") ... // Connect to Source val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties)) // Process each Record val stream = input_stream.map(x=> { // loop through list "list_" -> variable in in Main // and publish to topic_out // -- THIS IS MY CURRENT ISSUE !!!) // -- Does not work (No compile issue) // var producer2 = new KafkaProducer[String, String](props) var record = new ProducerRecord(topic_out, "KEY", list(i)) producer2.send(record) producer2.flush() // ... Other process and return processed string }) // publish to different topic of proccessed input string (Works) stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema()))
"작동하지 않는"것은 무엇입니까? 왜 당신은 루프에서 프로듀서를 만들고 있니? 모든'vars'와의 거래는 무엇입니까? '나는 무엇입니까? 왜 다른 제작자 클래스를 '맵'내부 및 외부에서 사용하고 있습니까? – Dima
"작동하지 않습니다."- 내가 작업을 제출할 때. 그것은 작업을 실행하지만 그것은 생산자 코드가 포함되어 있으면 맵 기능에 들어 가지 않습니다 솔기 " –
입력 스트림에서 일부 조건을 기반으로 목록을 통해 루프 및 항목 항목을 게시 할 관심 –