을 intercetors.classes하지, 그래서 나는 당신이 그 동안 해결책을 찾을 가정합니다. 그러나 다른 사람을 돕는 경우에 대비하여 내 스트림에 이미 지정된 출력이 없으면 메시지의 내용을 기반으로 다른 항목에 메시지를 전달하는 내 ProducerInterceptor
클래스가 호출되지 않은 것으로 나타났습니다.
첫 번째 시도는 출력 항목을 지정할 필요가 없다고 생각했기 때문에 이와 비슷한 것으로 보입니다. 이 작동하지 않습니다
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
하지만이 작업을 수행합니다
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
그것은 아무것도 두 번째 예제에서 dummy-output-topic
에 게시하지됩니다 것을주의 대신 through
의 to
도 작동 것으로 보인다 사용하는 것이 가치 같은 길. 내 코드는 실제로 보이는 그래서 내 경우
, 나는, 다른 주제로 파견 인터셉터를 사용하기 전에 레코드를 변경
map
를 호출 한보기 다음과 같습니다 :
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
.map(new CustomKeyValueMapper)
.through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
내가 그 사례가 작동 사람을 도와 희망 ProducerInterceptor
s와 같은 실수를 저 지르게되었습니다.
답장 Chris에게 감사드립니다. 예, producer.properties의 interceptor.classes로 올바르게 언급되었습니다. 오타를 유감스럽게 생각합니다. –