http-outbound-gateway에 http-output-channel이있는 spring-xd http-processor 모듈이 있습니다. HTTP 200이 포함 된 메시지는 outputChannel에 제공되고 나머지는 failureChannel에 저장됩니다.kafka-sink에서 여러 주제로 메시지를 라우팅하는 방법
현재 http 프로세서 모듈은 TopicX가있는 kafka- 아웃 바운드 어댑터가있는 Kafka-Sink에 연결됩니다. TopicX는 추가 처리를 위해 HTTP 200 메시지 만 수신합니다. 이제 TopicY로 라우트 될 failureChannel의 메시지가 필요합니다.
kafka-sink에서 여러 카파 주제를 어떻게 보낼 수 있습니까? 메시지 헤더에 httpStatusCode가 있습니다. 내 프로젝트에 사용 카프카의 버전은 0.8.2이며, 자바 버전은 1.7 카프카 싱크에
<!-- http-processor-config -->
<int-http:outbound-gateway
request-channel="input"
url-expression="'myUrlLink'"
http-method="POST"
expected-response-type="java.lang.String"
charset="UTF-8"
reply-timeout="10"
reply-channel="output">
<int-http:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="errorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>
, 나는 다음과 같은 한 생산자 상황입니다 : 사실
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="${topicX}"
key-class-type="java.lang.String"
key-serializer="serializer"
value-class-type="[B"
value-serializer="valueSerializer"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
마침내 작동했습니다. – Vidhya