카프카 주제를 만들고 메시지를 푸시했습니다.KStream을 콘솔로 인쇄하는 방법은 무엇입니까?
그래서
bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-"
인쇄 명령 줄에서
key1-customer1
.
이 주제에서 Kafka 스트림을 만들고이 key1-customer1
을 콘솔에 인쇄하려고합니다. 이 실패하지 않습니다
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// For illustrative purposes we disable record caches
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> customerStream = builder.stream("myTopic");
customerStream.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
:
는 나는 그것을 위해 다음과 같이 썼다. 그러나 이것은 this answer 제안과 같이 콘솔에 아무 것도 인쇄하지 않습니다.카프카를 처음 사용했습니다. 그래서이 일을하기위한 제안은 제게 많은 도움이 될 것입니다.