스프링 부트와 함께 사용할 때 포함 된 ActiveMQ에 대한 재 전달 정책을 변경하는 방법은 무엇입니까? DefaultJmsListenerContainerFactory에서 FixedBackOff를 지정하려고 시도했지만 도움이되지 않았습니다. 아래는 jms factory bean을 초기화하기 위해 사용하는 코드입니다. 큐에 들어오는 메시지를 처리하는 메시지 소비자가 있습니다. 사용할 수없는 리소스 때문에 처리하는 동안 확인 된 예외가 throw됩니다. 고정 된 간격 후에 처리를 위해 메시지를 다시 전달하도록하고 싶습니다.스프린트 부팅의 임베디드 ActiveMQ에 대한 Active MQ RedeliveryPolicy 변경
봄 부팅 : 1.5.7.Release
자바 : 1.7
@Bean
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setBackOff(new FixedBackOff(5000, 5));
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
LOG.error("Error occured in JMS transaction.", t);
}
});
return factory;
}
소비자 코드 :
@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory")
@Transactional
public void receiveMessage(PublishData publishData) {
LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId());
PublishUser user = new PublishUser();
user.setPriority(1);
user.setUserId(publishData.getUserId());
LOG.trace("Trying to enroll in the publish lock queue for user: " + user);
PublishLockQueue lockQueue = publishLockQueueService.createLock(user);
if (lockQueue == null)
throw new RuntimeException("Unable to create lock for publish");
LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId());
try {
publishLockQueueService.acquireLock(lockQueue.getId());
LOG.trace("Acquired publish lock.");
}
catch (PublishLockQueueServiceException pex) {
throw new RuntimeException(pex);
}
try {
publishService.publish(publishData, lockQueue.getId());
LOG.trace("Completed publish of changes.");
sendPublishSuccessNotification(publishData);
}
finally {
LOG.trace("Trying to release lock to publish.");
publishLockQueueService.releaseLock(lockQueue.getId());
}
LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId());
}
해당 소비자는 트랜잭션 확인 모드를 사용해야 예외가 발생하면 소비자 롤백 기능을 제공하고 여러 소비자가 실행중인 경우 ActiveMQ가 동일한 소비자 또는 다른 소비자에게 메시지를 다시 전달할 수있게합니다. 그러나 ActiveMQ에서 backoff 등의 재 전달 옵션을 구성 할 수 있습니다. 위의 오류 처리기는 로깅 이외의 다른 기능을 수행 할 수없는 청취자입니다. –
코드를 공유 할 수 있습니까? – Makoton
@Makoton 소비자 코드도 포함 시켰습니다. – user320587