2016-11-26 4 views
3

StateStore와 상호 작용하여 메시지를 필터링하고 복잡한 논리를 처리하는 프로세서가 있습니다. process(key,value) 메서드에서는 context.forward(key,value)을 사용하여 필요한 키와 값을 보냅니다. 디버깅 목적으로도 인쇄합니다.Kafka 스트림을 사용하는 프로세서로 키와 값을 필터링하는 방법 DSL

다른 두 스트림을 결합한 결과 KStream mergedStream이 있습니다. 그 스트림의 레코드에 프로세서를 적용하고 싶습니다. 나는 이것을 달성한다 : mergedStream.process(myprocessor,"stateStoreName")

이 프로그램을 시작할 때, 나는 나의 콘솔에 인쇄 될 적당한 값을 볼 수있다. 그러나 mergedStream.to("topic")을 사용하여 주제에 mergedStream을 보내면 주제 값은 프로세서에서 전달했지만 원래 값은 아닙니다.

나는 kafka-streams 0.10.1.0을 사용합니다.

내가 처리기에서 다른 스트림으로 전달한 값을 가져 오는 가장 좋은 방법은 무엇입니까?

Processor APIKStream DSL으로 만든 스트림과 함께 사용할 수 있습니까?

답변

9

짧은 :

당신도 transform(...) 대신 당신이 DSL 내에서 프로세서 API에 대한 액세스를 제공하는 process(...)의를 사용하여 문제를 해결합니다.

:

당신은 스트림 프로세서를 적용 process(...) 사용하는 경우 - 그러나, 이것은 즉, 그렇습니다, (그것의 반환 형식이 void입니다) 작업은 "종료"(또는 싱크) 어떤 결과도 반환하지 않습니다. (여기에 "싱크"는 연산자가 후계자가 없다는 의미입니다.)

mergedStream.process(...)mergedStream.to(...)으로 전화하면 기본적으로 분기 - 스트림을 복제하고 각 다운 스트림 운영자에게 사본 한 부를 보냅니다 (즉, process a 한 사본을 to으로 보내십시오.

DSL과 프로세서 API를 함께 사용하는 것은 절대 가능합니다. 그러나 process(...)을 사용하면 DSL 내에서 forward(...) 데이터를 소비 할 수 없습니다. 프로세서 API 결과를 사용하려면 process(...) 대신 transform(...)을 사용할 수 있습니다.