2017-12-21 24 views
2

카프카 주제를 만들고 메시지를 푸시했습니다.KStream을 콘솔로 인쇄하는 방법은 무엇입니까?

그래서

bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-" 

인쇄 명령 줄에서

key1-customer1 

.

이 주제에서 Kafka 스트림을 만들고이 key1-customer1을 콘솔에 인쇄하려고합니다. 이 실패하지 않습니다

final Properties streamsConfiguration = new Properties(); 

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); 
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092"); 

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

// Records should be flushed every 10 seconds. This is less than the default 
// in order to keep this example interactive. 
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
// For illustrative purposes we disable record caches 
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 

final StreamsBuilder builder = new StreamsBuilder(); 
final KStream<String, String> customerStream = builder.stream("myTopic"); 
customerStream.foreach(new ForeachAction<String, String>() { 
    public void apply(String key, String value) { 
     System.out.println(key + ": " + value); 
    } 
}); 

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); 

streams.start(); 

Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 

:

는 나는 그것을 위해 다음과 같이 썼다. 그러나 이것은 this answer 제안과 같이 콘솔에 아무 것도 인쇄하지 않습니다.

카프카를 처음 사용했습니다. 그래서이 일을하기위한 제안은 제게 많은 도움이 될 것입니다.

답변

0

CLIENT_ID_CONFIG을 설정 해제하고 단지 APPLICATION_ID_CONFIG으로 남겨 둡니다. 카프카 스트림 uses the application ID to set the client ID.

또한 Kafka Streams 응용 프로그램이 사용하는 소비자 그룹 ID (이 소비자 그룹 ID는 응용 프로그램 ID를 기반으로 함)에 대한 오프셋을 확인합니다. kafka-consumer-groups.sh 도구를 사용하십시오. Streams 애플리케이션이 자동 토큰 재설정이 최신으로 설정되었거나 사용자의 질문에서 쉽게 식별 할 수없는 다른 이유로 인해 해당 주제에 대해 생성 한 모든 레코드보다 앞서있을 수 있습니다.

0

TL : DRPrinted을 사용하십시오.

import org.apache.kafka.streams.kstream.Printed 
val sysout = Printed 
    .toSysOut[String, String] 
    .withLabel("customerStream") 
customerStream.print(sysout)