StateStore와 상호 작용하여 메시지를 필터링하고 복잡한 논리를 처리하는 프로세서가 있습니다. process(key,value)
메서드에서는 context.forward(key,value)
을 사용하여 필요한 키와 값을 보냅니다. 디버깅 목적으로도 인쇄합니다.Kafka 스트림을 사용하는 프로세서로 키와 값을 필터링하는 방법 DSL
다른 두 스트림을 결합한 결과 KStream mergedStream
이 있습니다. 그 스트림의 레코드에 프로세서를 적용하고 싶습니다. 나는 이것을 달성한다 : mergedStream.process(myprocessor,"stateStoreName")
이 프로그램을 시작할 때, 나는 나의 콘솔에 인쇄 될 적당한 값을 볼 수있다. 그러나 mergedStream.to("topic")
을 사용하여 주제에 mergedStream을 보내면 주제 값은 프로세서에서 전달했지만 원래 값은 아닙니다.
나는 kafka-streams 0.10.1.0을 사용합니다.
내가 처리기에서 다른 스트림으로 전달한 값을 가져 오는 가장 좋은 방법은 무엇입니까?
Processor API을 KStream DSL으로 만든 스트림과 함께 사용할 수 있습니까?