2012-10-29 2 views
4

에서 응답을 결합하는 봄 통합 애그리 게이터 (aggregator) 구성 :나는 다음과 같은 사용하여 봄 통합을 구성하려고 오전 RabbitMq 팬 아웃 교환

  1. 는 채널에 메시지를 보냅니다.
  2. n 명의 소비자와 토끼 팬 아웃 (pub/sub) 교환에이 메시지를 게시 해주십시오.
  3. 각 소비자가 응답 메시지를 제공합니다.
  4. Spring Integration이 이러한 응답을 원래의 클라이언트로 되돌리기 전에 집계하도록하십시오.

이걸로 몇 가지 문제를 가지고 지금까지 ... 나는 apply-sequence="true" 속성을 설정하기 위해 발행 - 구독 채널을 사용하고

  1. 되도록 correlationId가, sequenceSize & sequenceNumber 속성 설정됩니다. 이러한 속성은 DefaultAmqpHeaderMapper에 의해 버려지고 있습니다. DEBUG headerName=[correlationId] WILL NOT be mapped

  2. fanout 교환 내에 2 개의 대기열이 등록되어 있어도 sequenceSize 속성은 1로만 설정됩니다. 아마도 이것은 메시지가 너무 일찍 어 그리 게이터에서 해제된다는 것을 의미합니다. 나는 이것이 apply-sequence="true"을 사용하기 위해 publish-subscribe-channel을 오용하고 있기 때문에 이것이 단지 하나의 가입자 인 int-amqp:outbound-gateway이라는 것을 확실히 말하고 있기 때문에 이것이 기대됩니다. 다음과 같이

내 아웃 바운드 봄 설정은 다음과 같습니다

<int:channel id="input"/> 

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/> 

<bean id="listenerService" class="example.ListenerService"/> 

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/> 

하나를 :

<rabbit:connection-factory id="connectionFactory" /> 

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" /> 

<rabbit:admin connection-factory="connectionFactory" /> 

<rabbit:queue name="a-queue"/> 
<rabbit:queue name="b-queue"/> 

<rabbit:fanout-exchange name="fanout-exchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="a-queue" /> 
     <rabbit:binding queue="b-queue" /> 
    </rabbit:bindings> 
</rabbit:fanout-exchange> 

소비자는 다음과 같습니다 : 다음과 같이

<int:publish-subscribe-channel id="output" apply-sequence="true"/> 

<int:channel id="reply"> 
    <int:interceptors> 
     <int:wire-tap channel="logger"/> 
    </int:interceptors> 
</int:channel> 

<int:aggregator input-channel="reply" method="combine"> 
    <bean class="example.SimpleAggregator"/> 
</int:aggregator> 

<int:logging-channel-adapter id="logger" level="INFO"/> 

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/> 

<int-amqp:outbound-gateway request-channel="output" 
            amqp-template="amqpTemplate" exchange-name="fanout-exchange" 
            reply-channel="reply"/> 

내 rabbitMQ의 설정은 제안은 좋을거야. 나는 의심해.

<int:channel id="output"/> 

<int:header-enricher input-channel="output" output-channel="output"> 
    <int:correlation-id expression="headers['id']" /> 
</int:header-enricher> 

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" /> 

<int-amqp:outbound-gateway request-channel="output" 
            amqp-template="amqpTemplate" exchange-name="fanout-exchange" 
            reply-channel="reply" 
            mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/> 

<int:channel id="reply"/> 

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2"> 
    <bean class="example.SimpleAggregator"/> 
</int:aggregator> 

답변

3

문제는 S.I.는 팬 아웃 교환의 토폴로지에 대해 알고하지 않는다는 것입니다 : 어디 선가 스틱의 잘못된 말 ... 게리의 의견에 따라

새로운 아웃 바운드 스프링 설정을 가지고있다.

이 주위에 가장 간단한 방법은 (2의 팬 아웃을 가정) 사용자 정의 릴리스 전략에게 애그리 게이터 (aggregator)에

release-strategy-expression="size() == 2" 

을 사용하는 것입니다. 따라서 시퀀스 크기가 필요 없습니다. 당신은 헤더 enricher에 "학대"술집/서브 채널 ...

<int:header-enricher input-channel="foo" output-channel="bar"> 
     <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" /> 
    </int:header-enricher> 

이미 고유 메시지 ID를 사용하여 새 UUID를 만드는 피할 수를 방지 할 수 있습니다 ...

<int:correlation-id expression="headers['id']" /> 

마지막으로, 당신은 당신의 AMQP 엔드 포인트에

mapped-request-headers="correlationId" 

을 추가하여 AMQP에 correlationId가 헤더를 전달할 수 있습니다.

+0

감사합니다. Gary가 나에게 조금 더있어, 이제 문제는 내 outbound-gateway가 메시지에 대한 응답을 기다리지 않는 것입니다. 소비자는 팬 아웃 교환기에서 메시지를 잘 수신하고 있으며 둘 다 동일한 rabbitmq 대기열 (DEBUG)에 응답하고 있지만 발신자에게 답장을받지 못한다는 것을 알 수 있습니다. amqp *를 mapped-request-headers 속성에 추가해야했습니다. 그렇지 않으면 표준 amqp 헤더가 손실되었습니다. –

+0

원래 구성을 새 구성으로 업데이트했습니다. –

+0

게이트웨이를 사용하고 있음을 알지 못했습니다. 게이트웨이는 요청에 대한 단일 응답 만 처리합니다. 요청을 보내려면 아웃 바운드 어댑터를 사용하고 응답을 수신하려면 인바운드 어댑터를 사용해야합니다. 2 명의 소비자의 수신 게이트웨이가 응답하는 방법을 알 수 있도록 헤더를 수동으로 채워야합니다. –

0

비록이 질문이 3 년이지만, 나는 동일한 질문을했기 때문에 그것에 답변 할 것입니다.

스프링 통합은 원래 질문과 매우 비슷하게 들리는 Scatter-Gather 구현을 가지고 있습니다.

여기에서 관련 섹션 인 Spring Documentation

이 인 목표는 수신자에게 메시지를 보내고 결과를 집계하는 복합 엔드 포인트, ... 이전

, 이 패턴은 개별 구성 요소를 사용하여 구성 할 수 있으므로이 향상된 기능을 통해보다 편리하게 구성 할 수 있습니다.

ScatterGatherHandler는 PublishSubscribeChannel (또는 RecipientListRouter) 및 AggregatingMessageHandler 결합 요청 - 응답 엔드 포인트이다. 요청 메시지는 scatter 채널로 보내지고 ScatterGatherHandler는 어 그리 게이터가 보낸 응답을 outputChannel에 보낼 때까지 기다립니다.