ActiveMQ와의 스프링 통합으로 작업하고 있습니다. 내 대기열이 비어있을 때 (보류중인 메시지가 없을 때) 몇 가지 프로세스를 수행해야한다는 요구 사항이 있습니다. 나는ActiveMQ와의 Spring 통합 : 큐에 보류중인 메시지가 더 이상 없을 때 알림 가져 오기
<bean id="my.jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616?jms.prefetchPolicy.queuePrefetch=250&jms.useAsyncSend=true" />
<property name="optimizeAcknowledge" value="true" />
</bean>
<bean id="my.jms.cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="my.jmsConnectionFactory"
p:sessionCacheSize="10" p:reconnectOnException="true" />
<bean id="myListenerContainer "
class="my.com.spring.integration.MyListenerContainer ">
<property name="connectionFactory"
ref="my.jms.cachedConnectionFactory" />
<property name="destination" ref="myQ" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="1" />
</bean>
<bean id="myMessageListener"
class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener" />
<bean id="myJmsEndpoint"
class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
<constructor-arg ref="myListenerContainer" />
<constructor-arg ref="myMessageListener" />
<property name="outputChannel" ref="myConsumerChannel" />
</bean>
모든 것은 아직 현재 잘 작동이 같은,
package my.com.spring.integration;
public class MyListenerContainer extends DefaultMessageListenerContainer {
@Override protected void messageReceived(Object invoker, Session session) {
// I mark lastMessageReceived time here
super.messageReceived(invoker, session);
}
@Override protected void noMessageReceived(Object invoker, Session session) {
// I wait for 1 minute from last message messageReceived and after
// that I consider that queue has no more messages now and
// I start my stuff
super.noMessageReceived(invoker, session);
}
}
그리고 내 스프링 XML은 다음과 같습니다 있음을 달성했다. 대기열이 비어있을 때 알림이 표시되고 원하는 항목을 수행 할 수 있습니다.
[질문 :이 이 비어있는 큐에서 더 이상 떨어져 메시지를 수신하지 않는 다른 이유로 있습니까? 스프링 코드 실행 중에 예외가 발생하는 경우와 마찬가지로, 어떤 일이 발생합니까? 내 목적을 수행하기위한 적절한 방법인가? 그렇지 않다면 어떻게 개선 할 수 있습니까?