2

http-outbound-gateway에 http-output-channel이있는 spring-xd http-processor 모듈이 있습니다. HTTP 200이 포함 된 메시지는 outputChannel에 제공되고 나머지는 failureChannel에 저장됩니다.kafka-sink에서 여러 주제로 메시지를 라우팅하는 방법

현재 http 프로세서 모듈은 TopicX가있는 kafka- 아웃 바운드 어댑터가있는 Kafka-Sink에 연결됩니다. TopicX는 추가 처리를 위해 HTTP 200 메시지 만 수신합니다. 이제 TopicY로 라우트 될 failureChannel의 메시지가 필요합니다.

kafka-sink에서 여러 카파 주제를 어떻게 보낼 수 있습니까? 메시지 헤더에 httpStatusCode가 있습니다. 내 프로젝트에 사용 카프카의 버전은 0.8.2이며, 자바 버전은 1.7 카프카 싱크에

<!-- http-processor-config --> 
<int-http:outbound-gateway 
     request-channel="input" 
     url-expression="'myUrlLink'" 
     http-method="POST" 
     expected-response-type="java.lang.String" 
     charset="UTF-8" 
     reply-timeout="10" 
     reply-channel="output"> 

     <int-http:request-handler-advice-chain> 
        <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"> 
         <property name="recoveryCallback"> 
          <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer"> 
           <constructor-arg ref="errorChannel" /> 
          </bean> 
         </property> 
         <property name="retryTemplate" ref="retryTemplate" /> 
        </bean> 
     </int-http:request-handler-advice-chain> 

</int-http:outbound-gateway> 


<!-- Handle failed messages and route to failureChannel for specific http codes--> 
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/> 

, 나는 다음과 같은 한 생산자 상황입니다 : 사실

<int-kafka:producer-context id="kafkaProducerContext"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration broker-list="localhost:9092" 
              topic="${topicX}" 
              key-class-type="java.lang.String" 
              key-serializer="serializer" 
              value-class-type="[B" 
              value-serializer="valueSerializer"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

답변

1

마침내 작동했습니다. 지금은 HTTP 프로세서 모듈에 스플리터를 추가하여 0.8.x 버전의 해결 방법을 발견하고 kafka_topic 변수를 메시지 헤더에 추가했습니다. HTTP 상태 코드에 따라 다른 주제를 설정했습니다.

Kafka-sink에서 XD params를 통해 설정된 새로운 주제 이름 변수를 가진 다른 제작자 구성을 추가했습니다. 내가 여러 스트림에서 kafka-source 및 kafka-sink 모듈을 재사용하기 때문에 다른 해결책을 생각할 수 없다.

이 특정 kafka-sink는 입력을 다른 XD 스트림으로 보냅니다. 따라서 다음 스트림이 시작될 때 kafka-source 모듈에서 kafka_topic을 제거하기위한 헤더 필터를 추가했습니다.

더 읽으려면 : 대상 카프카 항목을 설정하는 라인 http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.RELEASE/reference/html/_spring_integration.html

봐. 그게 핵심이야.

1

가 지원되지 않습니다 그러지 않을 것이다. Spring XD는 이미 올해 EOL입니다. 누구나 Spring Cloud Data Flow으로 마이그레이션하는 것이 좋습니다.

귀하의 경우에는 Kafka Sink 모듈 구성을 편집 할 수 있습니다. 다른 주제에 <int-kafka:outbound-channel-adapter>을 하나 더 추가하십시오. 들어오는 메시지를 보낼 항목을 결정하려면 <router>을이 구성에 추가하면됩니다.

Router Sink을 사용해보십시오. 그리고 각 메시지 유형에 대해 두 개의 별도 스트림이 있으므로 각 주제가 있습니다.

+0

마침내 작동했습니다. – Vidhya