2016-09-09 5 views
0

제작자가 카프카 주제에 게시 한 메시지의 유효성 검사를 위해 인터셉터를 추가하려고합니다. 카프카 (Kafka) 주제에서 수행되는 스키마 유효성 검사 외에 몇 가지 검증 작업을 수행해야합니다. 따라야 할 단계는 다음과 같습니다.카프카 제작자 인터셉터

  1. ProducerInterceptor Interface를 확장 한 Java 클래스를 작성했습니다.
  2. 클래스를 컴파일하고 클래스 경로에 포함 된 폴더에 배치 된 jar 파일을 만들었습니다.
  3. Kafka 설치 내에서 intercetors.classes = classname을 producer.properties에 추가했습니다.

그러나 주제에 메시지를 게시 할 때 작성한 사용자 정의 인터셉터 클래스가 호출되지 않습니다. (나는 어떤 오류도 발생하지 않고 있습니다. 메시지는 주제에 완벽하게 게시됩니다).

I의 객담이에 https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

하시기 바랍니다 조언을 언급했다.

답변

0

속성 이름은 interceptor.classes이,이 질문은 꽤 오래

+0

답장 Chris에게 감사드립니다. 예, producer.properties의 interceptor.classes로 올바르게 언급되었습니다. 오타를 유감스럽게 생각합니다. –

0

을 intercetors.classes하지, 그래서 나는 당신이 그 동안 해결책을 찾을 가정합니다. 그러나 다른 사람을 돕는 경우에 대비하여 내 스트림에 이미 지정된 출력이 없으면 메시지의 내용을 기반으로 다른 항목에 메시지를 전달하는 내 ProducerInterceptor 클래스가 호출되지 않은 것으로 나타났습니다.

첫 번째 시도는 출력 항목을 지정할 필요가 없다고 생각했기 때문에 이와 비슷한 것으로 보입니다. 이 작동하지 않습니다

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

하지만이 작업을 수행합니다

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic").through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

그것은 아무것도 두 번째 예제에서 dummy-output-topic에 게시하지됩니다 것을주의 대신 throughto도 작동 것으로 보인다 사용하는 것이 가치 같은 길. 내 코드는 실제로 보이는 그래서 내 경우

, 나는, 다른 주제로 파견 인터셉터를 사용하기 전에 레코드를 변경 map를 호출 한보기 다음과 같습니다 :

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 
    .map(new CustomKeyValueMapper) 
    .through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

내가 그 사례가 작동 사람을 도와 희망 ProducerInterceptor s와 같은 실수를 저 지르게되었습니다.