2017-10-05 9 views
2

스프링 부트와 함께 사용할 때 포함 된 ActiveMQ에 대한 재 전달 정책을 변경하는 방법은 무엇입니까? DefaultJmsListenerContainerFactory에서 FixedBackOff를 지정하려고 시도했지만 도움이되지 않았습니다. 아래는 jms factory bean을 초기화하기 위해 사용하는 코드입니다. 큐에 들어오는 메시지를 처리하는 메시지 소비자가 있습니다. 사용할 수없는 리소스 때문에 처리하는 동안 확인 된 예외가 throw됩니다. 고정 된 간격 후에 처리를 위해 메시지를 다시 전달하도록하고 싶습니다.스프린트 부팅의 임베디드 ActiveMQ에 대한 Active MQ RedeliveryPolicy 변경

봄 부팅 : 1.5.7.Release

자바 : 1.7

@Bean 
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory, 
               DefaultJmsListenerContainerFactoryConfigurer configurer) { 
    DefaultJmsListenerContainerFactory factory = 
     new DefaultJmsListenerContainerFactory(); 

    factory.setBackOff(new FixedBackOff(5000, 5)); 

    // This provides all boot's default to this factory, including the message converter 
    configurer.configure(factory, connectionFactory); 

    // You could still override some of Boot's default if necessary. 
    factory.setErrorHandler(new ErrorHandler() { 

     @Override 
     public void handleError(Throwable t) { 
      LOG.error("Error occured in JMS transaction.", t); 
     } 

    }); 
    return factory; 
} 

소비자 코드 :

@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory") 
@Transactional 
public void receiveMessage(PublishData publishData) { 
    LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId()); 

    PublishUser user = new PublishUser(); 
    user.setPriority(1); 
    user.setUserId(publishData.getUserId()); 

    LOG.trace("Trying to enroll in the publish lock queue for user: " + user); 
    PublishLockQueue lockQueue = publishLockQueueService.createLock(user); 
    if (lockQueue == null) 
     throw new RuntimeException("Unable to create lock for publish"); 
    LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId()); 

    try { 
     publishLockQueueService.acquireLock(lockQueue.getId()); 
     LOG.trace("Acquired publish lock."); 
    } 
    catch (PublishLockQueueServiceException pex) { 
     throw new RuntimeException(pex); 
    } 

    try { 
     publishService.publish(publishData, lockQueue.getId()); 
     LOG.trace("Completed publish of changes."); 

     sendPublishSuccessNotification(publishData); 
    } 
    finally { 
     LOG.trace("Trying to release lock to publish."); 
     publishLockQueueService.releaseLock(lockQueue.getId()); 
    } 

    LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId()); 
} 
+0

해당 소비자는 트랜잭션 확인 모드를 사용해야 예외가 발생하면 소비자 롤백 기능을 제공하고 여러 소비자가 실행중인 경우 ActiveMQ가 동일한 소비자 또는 다른 소비자에게 메시지를 다시 전달할 수있게합니다. 그러나 ActiveMQ에서 backoff 등의 재 전달 옵션을 구성 할 수 있습니다. 위의 오류 처리기는 로깅 이외의 다른 기능을 수행 할 수없는 청취자입니다. –

+1

코드를 공유 할 수 있습니까? – Makoton

+1

@Makoton 소비자 코드도 포함 시켰습니다. – user320587

답변

0

@claus의 answerd :

: 난 작동하도록 테스트

그 소비자는 거래 확인 모드를 사용해야합니다. 예외가 발생하면 소비자가 롤백 할 수있게하고 여러 소비자가 실행중인 경우 ActiveMQ가 동일한 소비자 또는 다른 소비자에게 메시지를 다시 전달할 수있게합니다. 그러나 BackOff와 같은 ActiveMQ에서 재 전달 옵션을 구성 할 수 있습니다. 위의 오류 처리기는 로깅 이외의 기능을 수행 할 수없는 청취자입니다.

+0

의견에서 언급했듯이, 제 문제는 재 전달과는 다르지만 재 전달 간격과 관련이 있습니다. ActiveMQ 연결 팩토리 개체를 직접 사용하여 백 오프를 지정할 수 있음을 이해합니다. 이것은 ActiveMQ에 고유 한 코드를 포함해야한다는 것을 의미합니다. 그 일을하지 않고 Spring DefaultJmsListenerContainerFactory # setBackOff 메소드를 사용하여 백 오프 정책을 지정하기를 바라고 있습니다. – user320587