2017-01-23 5 views
1

실제로이 실행을 얻지 못합니다. 어쩌면 나는 오해를하고 어쨌든 이것은 불가능하다. 하나의 동일한 대기열, 동일한 교환기에서 2 개의 수신기를 구성하려고하지만 라우팅 키만 달라야합니다. 제 문제는 어떻게 든 상황이 엉망이된다는 것입니다. 결과적으로 청취자 A는 청취자 B를위한 메시지를 얻습니다. 그러나 때로는 때로는 모든 것이 제대로 작동합니다. 어떤 제안?spring ampq 주석은 하나의 큐를 구동합니다. 두 명의 리스너가 라우팅 키를 구별합니다.

MyConfig를

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname()); 
    connectionFactory.setUsername(getUsername()); 
    connectionFactory.setPassword(getPassword()); 
    return connectionFactory; 
} 

@Bean 
public RabbitAdmin rabbitAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setMessageConverter(new CustomMessageConverter()); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO); 
    factory.setConcurrentConsumers(10); 
    factory.setMaxConcurrentConsumers(10); 
    return factory; 
} 

@Override 
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { 
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); 
} 

@Bean 
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { 
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
    factory.setMessageConverter(new MappingJackson2MessageConverter()); 
    return factory; 
} 

MyListeners

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyA")) 
public String myListenerA(@Payload PayloadA payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) { 

    return SUCCESS_RESPONSE; 
} 

에 myListener B

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyB")) 
public String myListenerB(@Payload PayloadB payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) { 

    return SUCCESS_RESPONSE; 
} 

추가 정보 :이 큐에 (20 개) 소비자 있어요. Thx 사전에!

답변

1

RabbitMQ는 그런 식으로 작동하지 않습니다. JMS와 달리 (예 : 라우팅 키를 기반으로) 대기열에서 메시지를 선택할 수있는 방법이 없습니다.

당신이 한 모든 작업은 두 개의 다른 라우팅 키가있는 교환기와 동일한 대기열에 묶여 있습니다. 따라서 리스너는 대기열에 어떤 방식 으로든 관계없이 메시지를 받게됩니다.

RabbitMQ를 사용하면 각 수신기마다 별도의 대기열이 필요합니다. 제작자가 교환기에 게시하면 브로커는 자신이 사용한 라우팅 키에 따라 메시지를 올바른 대기열로 라우팅 처리합니다.

각 수신기의 인스턴스가 여러 개인 경우 메시지가 그에 따라 배포됩니다 (대기열 당 하나의 전달 만).

+0

대단히 감사합니다. 내 질문에 감사드립니다. 귀하의 anwser 매우 도움이되었다. – stritzi

+0

문제 없음. 나는 네가 여기 새로 왔음을 안다. 응답을 "수락"으로 표시하는 것이 일반적입니다 (투표 버튼 아래의 체크 표시/체크 표시를 클릭하십시오). 이것은 비슷한 질문에 대한 답을 찾을 때 다른 사람들을 도울 것입니다. –

+0

그 조언을 주셔서 대단히 감사합니다. 실제로 나는 여기에서보다 적극적으로 노력하고 있습니다! 또한 정말 나를 도왔 stackoverflow에 대한 다른 질문 anwsering 주셔서 감사합니다. – stritzi