2015-01-19 2 views
3

나는 RabbitMQ와 함께 Spring을 사용하고 있으며 런타임 예외가 발생할 경우 메시지 재 전달을 피하려고합니다. requeue-rejectfalse으로 설정하고 listener-container으로 설정하고 을 던지는 사용자 지정 오류 처리기를 구성하려고했습니다. 두 가지 전략 모두 실패했고 메시지는 영원히 계속 재 배달됩니다. 이유에 대한 아이디어가 있습니까?RabbitMQ with Spring reelivering message forever

감사합니다. 모든 requeue-rejected="false"AmqpRejectAndDontRequeueException

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd"> 

    <rabbit:connection-factory id="rabbitMQConnectionFactory" host="localhost" port="5672" username="guest" password="guest" /> 

    <rabbit:admin connection-factory="rabbitMQConnectionFactory" /> 

    <rabbit:template id="amqpTemplate" connection-factory="rabbitMQConnectionFactory" /> 

    <rabbit:queue name="q1" /> 
    <rabbit:queue name="q2" /> 

    <rabbit:listener-container error-handler="errorHandler" connection-factory="rabbitMQConnectionFactory" concurrency="10" transaction-manager="transactionManager" requeue-rejected="false"> 
     <rabbit:listener ref="q1Listener" method="consumeMessage" queue-names="q1" /> 
     <rabbit:listener ref="q2Listener" method="consumeMessage" queue-names="q2" /> 
    </rabbit:listener-container> 

    <bean id="errorHandler" class="ErrorHandler" /> 

    <bean id="q1Listener" class="Q1MessageConsumerBean" /> 
    <bean id="q2Listener" class="Q2MessageConsumerBean" /> 
</beans> 
+0

메시지를 대기열에서 제거하고 실패로 표시하려면 메시지를 "Nack"해야합니다. – jhilden

+0

안녕하세요, 당신은 errorHandler 빈의 예제 코드를 줄 수 있습니까? – Adelin

답변

3

먼저 타겟 청취자로부터 RuntimeException 경우 동일한다.

다음은 requeue-rejected="false"이 잘 작동하는 것을 보여주는 테스트 케이스입니다.


@Autowired 
private RabbitTemplate rabbitTemplate; 

@Autowired 
private SimpleMessageListenerContainer container; 

@Autowired 
private ThrowListener throwListener; 

@Test 
public void test() throws Exception { 
    rabbitTemplate.convertAndSend("foo"); 
    container.start(); 
    Thread.sleep(2000); 
    Mockito.verify(throwListener).onMessage(Mockito.any(Message.class)); 
} 

public static class ThrowListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     throw new RuntimeException("intentional reject"); 
    } 

} 

Mockito.verify(throwListener).onMessage(Mockito.any(Message.class));

<rabbit:connection-factory id="connectionFactory" host="localhost" /> 

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" 
    exchange="foo" routing-key="foo" /> 

<rabbit:admin connection-factory="connectionFactory" /> 

<rabbit:queue name="foo" /> 

<rabbit:direct-exchange name="foo"> 
    <rabbit:bindings> 
     <rabbit:binding queue="foo" key="foo" /> 
    </rabbit:bindings> 
</rabbit:direct-exchange> 

<rabbit:listener-container connection-factory="connectionFactory" auto-startup="false" requeue-rejected="false"> 
    <rabbit:listener ref="listener" queue-names="foo" /> 
</rabbit:listener-container> 

<bean id="listener" class="org.mockito.Mockito" factory-method="spy"> 
    <constructor-arg> 
     <bean class="org.springframework.amqp.rabbit.listener.RejectedTests$ThrowListener" /> 
    </constructor-arg> 
</bean> 

onMessage 한 번만 처음 배달이라는 것을 확인합니다. 두 번째 배달은 일어나지 않습니다. 왜냐하면 우리의 메시지가 거부되어 RabbitMQ Broker에서 삭제되기 때문입니다.

for (Long deliveryTag : deliveryTags.get(channel)) { 
    try { 
     channel.basicReject(deliveryTag, true); 
    } catch (IOException ex) { 
    throw new AmqpIOException(ex); 
    } 
} 

그러나이 블록에 도달 할 때, 텍사스 커밋하고 커밋에 Expception에서만 : RabbitResourceHolder#rollbackAll() -

나는 그것이 독립적으로 requeue-rejected="false"의 일이 않는 경우에만 signle 장소를 참조하십시오. TX 롤백은 다음과 같은 경우에 발생합니다.

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { 

    Channel channel = consumer.getChannel(); 

    for (int i = 0; i < txSize; i++) { 

     logger.trace("Waiting for message from consumer."); 
     Message message = consumer.nextMessage(receiveTimeout); 
     if (message == null) { 
      break; 
     } 
     try { 
      executeListener(channel, message); 
     } 
     catch (ImmediateAcknowledgeAmqpException e) { 
      break; 
     } 
     catch (Throwable ex) { 
      consumer.rollbackOnExceptionIfNecessary(ex); 
      throw ex; 
     } 

    } 

    return consumer.commitIfNecessary(isChannelLocallyTransacted(channel)); 

마지막 줄에주의하십시오. consumer.rollbackOnExceptionIfNecessary(ex);은 리스너에서 예외가 발생하는 경우입니다. 그렇지 않으면 우리는 마지막 라인에 도달하여 외부 TX 커밋을 기다린다.

귀하의 사례인지 알려주십시오.

2

답변 해 주셔서 감사합니다. 나는 다른 고환을 만들었고 이제 같은 구성 (오류 제거됨)으로 예상되는 동작을 보입니다. 재전송이 발생

FYK 내 시험 중 하나에 나는 시나리오를 발견했는데 그것은 당신이 한 말과 관련된 것일 수 있습니다

흐름 : Listener ->Facade ->Service. 모든 트랜잭션.

RuntimeException을 던지고 Facade에 걸렸고 (예외를 삼켜 서) 다시 잡히지 않으면 메시지가 다시 전달됩니다. tx가 롤백 된 것처럼 보입니다. 예외를 삼킨다 고해도 메시지는 다시 전달됩니다 - requeue-rejected 속성을 무시하십시오.

다시 한 번 감사드립니다.

+0

그래서 우리는 재발신없이 메시지를 거부 할 수 있도록 트랜잭션 롤백을 사용할 수 있습니까? –