2017-05-09 10 views
0

ActiveMQ와의 스프링 통합으로 작업하고 있습니다. 내 대기열이 비어있을 때 (보류중인 메시지가 없을 때) 몇 가지 프로세스를 수행해야한다는 요구 사항이 있습니다. 나는ActiveMQ와의 Spring 통합 : 큐에 보류중인 메시지가 더 이상 없을 때 알림 가져 오기

<bean id="my.jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL" value="tcp://127.0.0.1:61616?jms.prefetchPolicy.queuePrefetch=250&jms.useAsyncSend=true" /> 
    <property name="optimizeAcknowledge" value="true" /> 
</bean> 

<bean id="my.jms.cachedConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory" 
    p:targetConnectionFactory-ref="my.jmsConnectionFactory" 
    p:sessionCacheSize="10" p:reconnectOnException="true" /> 

<bean id="myListenerContainer " 
     class="my.com.spring.integration.MyListenerContainer "> 
     <property name="connectionFactory" 
      ref="my.jms.cachedConnectionFactory" /> 
     <property name="destination" ref="myQ" /> 
     <property name="concurrentConsumers" value="1" /> 
     <property name="maxConcurrentConsumers" value="1" /> 
    </bean> 

    <bean id="myMessageListener" 
     class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener" /> 

    <bean id="myJmsEndpoint" 
     class="org.springframework.integration.jms.JmsMessageDrivenEndpoint"> 
     <constructor-arg ref="myListenerContainer" /> 
     <constructor-arg ref="myMessageListener" /> 
     <property name="outputChannel" ref="myConsumerChannel" /> 
    </bean> 

모든 것은 아직 현재 잘 작동이 같은,

package my.com.spring.integration; 

public class MyListenerContainer extends DefaultMessageListenerContainer { 

     @Override protected void messageReceived(Object invoker, Session session) { 
      // I mark lastMessageReceived time here 
      super.messageReceived(invoker, session); 
     } 

     @Override protected void noMessageReceived(Object invoker, Session session) { 
      // I wait for 1 minute from last message messageReceived and after 
      // that I consider that queue has no more messages now and 
      // I start my stuff 
      super.noMessageReceived(invoker, session); 
     } 
    } 

그리고 내 스프링 XML은 다음과 같습니다 있음을 달성했다. 대기열이 비어있을 때 알림이 표시되고 원하는 항목을 수행 할 수 있습니다.

[질문 :이 비어있는 큐에서 더 이상 떨어져 메시지를 수신하지 않는 다른 이유로 있습니까? 스프링 코드 실행 중에 예외가 발생하는 경우와 마찬가지로, 어떤 일이 발생합니까? 내 목적을 수행하기위한 적절한 방법인가? 그렇지 않다면 어떻게 개선 할 수 있습니까?

답변

0

noMessageReceived은 메시지가 수신되지 않고 예외가 발생하지 않는 경우에만 호출됩니다.

이것은 DMLC에 메시지가 수신되지 않았 음을 의미합니다. 단, DMLC가 하나 뿐인 경우에만 용도가 유용하다는 것을 명심하십시오.

AMQ는 각 소비자에게 많은 메시지 (귀하의 경우 jms.prefetchPolicy.queuePrefetch=250)를 발송하고 발송 된 메시지는 각 소비자별로 처리되므로 승인하지 않고 연결을 끊으면 메시지가 다른 소비자에게 발송 될 수 있습니다. 당신이 크기 또는 발송 메시지의 수를 얻기 위해 JMX를 사용하여 큐에 대한 브로커 정보를 가지고해야 할 경우

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

를보십시오.

import java.util.HashMap; 
import java.util.Map; 

import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 

public class JMXGetDestinationInfos { 

    public static void main(String[] args) throws Exception { 
     JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"); 
     Map<String, String[]> env = new HashMap<>(); 
     String[] creds = { "admin", "admin" }; 
     env.put(JMXConnector.CREDENTIALS, creds); 
     JMXConnector jmxc = JMXConnectorFactory.connect(url, env); 
     MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

     ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 

     BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
       true); 
     for (ObjectName name : mbean.getQueues()) { 
      if (("myQ".equals(name.getKeyProperty("destinationName")))) { 
       QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, 
         QueueViewMBean.class, true); 
       // ="Number of messages in the destination which are yet to be 
       // consumed. Potentially dispatched but unacknowledged.") 
       System.out.println(queueMbean.getQueueSize()); 
       // Returns the number of messages that have been delivered 
       // (potentially not acknowledged) to consumers. 
       System.out.println(queueMbean.getDispatchCount()); 
      } 
     } 
    } 
}