에서 응답을 결합하는 봄 통합 애그리 게이터 (aggregator) 구성 :나는 다음과 같은 사용하여 봄 통합을 구성하려고 오전 RabbitMq 팬 아웃 교환
- 는 채널에 메시지를 보냅니다.
- n 명의 소비자와 토끼 팬 아웃 (pub/sub) 교환에이 메시지를 게시 해주십시오.
- 각 소비자가 응답 메시지를 제공합니다.
- Spring Integration이 이러한 응답을 원래의 클라이언트로 되돌리기 전에 집계하도록하십시오.
이걸로 몇 가지 문제를 가지고 지금까지 ... 나는 apply-sequence="true"
속성을 설정하기 위해 발행 - 구독 채널을 사용하고
되도록 correlationId가, sequenceSize & sequenceNumber 속성 설정됩니다. 이러한 속성은
DefaultAmqpHeaderMapper
에 의해 버려지고 있습니다.DEBUG headerName=[correlationId] WILL NOT be mapped
fanout 교환 내에 2 개의 대기열이 등록되어 있어도 sequenceSize 속성은 1로만 설정됩니다. 아마도 이것은 메시지가 너무 일찍 어 그리 게이터에서 해제된다는 것을 의미합니다. 나는 이것이
apply-sequence="true"
을 사용하기 위해 publish-subscribe-channel을 오용하고 있기 때문에 이것이 단지 하나의 가입자 인int-amqp:outbound-gateway
이라는 것을 확실히 말하고 있기 때문에 이것이 기대됩니다. 다음과 같이
내 아웃 바운드 봄 설정은 다음과 같습니다
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
하나를 :
이<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
소비자는 다음과 같습니다 : 다음과 같이
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
내 rabbitMQ의 설정은 제안은 좋을거야. 나는 의심해.
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
감사합니다. Gary가 나에게 조금 더있어, 이제 문제는 내 outbound-gateway가 메시지에 대한 응답을 기다리지 않는 것입니다. 소비자는 팬 아웃 교환기에서 메시지를 잘 수신하고 있으며 둘 다 동일한 rabbitmq 대기열 (DEBUG)에 응답하고 있지만 발신자에게 답장을받지 못한다는 것을 알 수 있습니다. amqp *를 mapped-request-headers 속성에 추가해야했습니다. 그렇지 않으면 표준 amqp 헤더가 손실되었습니다. –
원래 구성을 새 구성으로 업데이트했습니다. –
게이트웨이를 사용하고 있음을 알지 못했습니다. 게이트웨이는 요청에 대한 단일 응답 만 처리합니다. 요청을 보내려면 아웃 바운드 어댑터를 사용하고 응답을 수신하려면 인바운드 어댑터를 사용해야합니다. 2 명의 소비자의 수신 게이트웨이가 응답하는 방법을 알 수 있도록 헤더를 수동으로 채워야합니다. –