2017-01-24 4 views
0

Python에서 ActiveMQ의 토픽(topic)과 큐(queue)로 메시지를 보내고 있습니다. ActiveMQ의 큐/토픽에서 메시지를 삭제하려면 어떻게 해야 하나요?

나는 두 가지 질문이 있습니다

다음을 어떻게 하는지 알고 싶습니다.
  1. 큐 또는 토픽에 보낸 메시지를 삭제(delete, 전송 취소)하는 방법.
  2. 모든 큐/토픽을 완전히 제거(remove)하거나 비우는(purge) 방법.

AMQ와의 통신은 STOMP 프로토콜을 통해 stompy 라이브러리로 구성했지만, 이 라이브러리에는 적절한 함수(functions)가 없습니다.

어떤 라이브러리를 사용해야 하는지, 또는 해결 방법 자체를 알려 주세요.

대단히 감사합니다.

+0

REST를 사용하여 메시지를 삭제하려면 어떻게 해야 하나요?

아래는 이 작업을 수행하기 위한 간단한 클라이언트의 제 코드입니다. – Arseny

답변

0

WireShark로 패킷 트래픽을 분석한 결과, ActiveMQ 관리 페이지(localhost:8161/admin)에서 브라우저로 작업하면 메시지를 삭제할 수 있지만, 프로그래밍 방식(Python)으로는 삭제하지 못했습니다.

이론상으로는 메시지를 삭제할 때 AMQ의 /admin으로 보내는 패킷(package)에서 매개변수 [id, secret]과 함께 deleteMessage()를 호출할 수 있습니다.

  • id - 큐/토픽에 있는 MSG의 고유한 이름

    • secret - 고유 번호("토큰"일 수 있음)로, /admin 페이지를 새로 고칠 때마다(예: F5) 변경됩니다. 이것이 무엇인지는 잘 모르겠습니다....

    거기에 이미지를보십시오 : https://ru.stackoverflow.com/q/618697/228254 내 대답을 아래로 게시하십시오.

    테스트 큐(queue)의 예:

    • ID : ID : #######의 NAME_SERVER의 ###### - 44458-1485427798954-6 : 1 : 1 : 1 : 1
    • secret : 1dbd2916-337a-48cc-bce7-63b00d38ba3

    현재 제 해결 방법(decision)은 다음과 같습니다: 큐의 모든 MSG를 받아서, 큐에서 삭제(drop)해야 하는 MSG에만 ACK를 보냅니다.

    from stompy import stomp 
    import json 
    
    
    # Consume a queue over STOMP and acknowledge only the messages that match
    # a filter. The subscription uses 'ack': 'client', so nothing is
    # acknowledged unless s.ack(frame) is called explicitly below.
    # NOTE(review): amq_ip, amq_port, amq_user, amq_pass and amq_queue are not
    # defined in this snippet - presumably configured elsewhere; confirm.
    s = stomp.Stomp(amq_ip, amq_port)

    try:
        s.connect(username=amq_user, password=amq_pass)
        s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
    except Exception as e:
        print("ActiveMQ error\n %s" % e)
    else:
        # Enter the receive loop only when connect/subscribe succeeded;
        # otherwise every receive_frame() call would just fail and the loop
        # would print errors forever.
        while True:
            try:
                frame = s.receive_frame()
                body = json.loads(frame.body)

                # Is this message meant for me?
                if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
                    print("Its for me. I receive it")
                    # This message is for me: accept it by acknowledging it.
                    s.ack(frame)
                else:
                    # This message is intended for someone else and does not
                    # suit me, so it is deliberately left unacknowledged.
                    print("Its not for me")
            except Exception as e:
                # Best effort: report the problem (bad JSON, missing key,
                # transport error) and keep consuming.
                print(e)
    

    또한 큐에서 MSG를 삭제하는 실험용 테스트 코드를 추가합니다 (메시지는 삭제되지 않음).

    # -*- coding: utf-8 -*- 
    import activemq_api 
    import urllib3 
    import json 
    
    
    # Connection to ActiveMQ
    BROKER_NAME = "localhost"
    # Host that serves the HTTP API. The script previously passed AMQ_IP to
    # activemq_api.Connection without ever defining it, so the connection
    # step always failed with a NameError. "localhost" is used here because
    # the test targets a local broker - adjust for a remote one.
    AMQ_IP = "localhost"
    AMQ_API_PORT = 8161
    AMQ_API_USER = "admin"
    AMQ_API_PASS = "admin"
    AMQ_API_POSTFIX = "/api/jolokia"
    AMQ_TASK_QUEUE_NAME = "test"
    BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
    AMQ_STATUS_QUEUE = "/queue/test"

    # NOTE(review): the three settings below are not read anywhere in this
    # script - presumably leftovers from another project's settings; confirm
    # before removing.
    LOGIN_EXEMPT_URLS = [
        r'admin/'
    ]

    LOGIN_URL = 'url_login'

    LOGOUT_REDIRECT_URL = 'url_login'

    if __name__ == '__main__':
        # Experimental test: try to remove one message from the task queue
        # through the activemq_api wrapper and print the broker's reply.
        user_agent = "curl/7.49.1"
        headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
        headers.update({
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*"
        })
        try:
            connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
            # AMQManager's constructor already queries the broker, so
            # connection problems surface here.
            manager = activemq_api.AMQManager(connect)
        except Exception as e:
            print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
        else:
            print(u'Соединение успешно установлено')

            # Attempt the removal only when the manager exists; running it
            # after a failed connection would just raise on the unbound name.
            msg_id = "ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
            try:
                print(manager.removeMsgQueue(AMQ_TASK_QUEUE_NAME, msg_id))
            except Exception as inst:
                print(inst)
    
    
    #!/usr/bin/python2 
    # -*- coding: utf-8 -*- 
    import urllib3 
    import json 
    
    class Connection:
        """Plain holder for the settings needed to reach the broker's HTTP API.

        The stored values are read by the manager/queue classes when they
        build request URLs (AMQ_IP, AMQ_PORT, POSTFIX), request headers
        (HEADERS) and MBean names (BROKER_NAME).
        """

        def __init__(self, amq_ip, amq_port, broker, header, postfix):
            # Where the API lives: host, port and URL path suffix.
            self.AMQ_IP, self.AMQ_PORT = amq_ip, amq_port
            self.POSTFIX = postfix
            # HTTP headers sent with every request.
            self.HEADERS = header
            # Broker name interpolated into MBean object names.
            self.BROKER_NAME = broker
    
    class AMQManager():
        """Broker-level client that talks to ActiveMQ through JSON over HTTP POST.

        Each public builder method (rmQueue, queueList, ...) returns the JSON
        request body as a string; request() POSTs such a body to the endpoint
        described by the connection object and returns the raw response.
        The EXPORTABLE methods are the ones intended for callers.

        Attributes:
            QUEUES: dict mapping queue name -> Queue object.
            QUEUES_COUNT: number of queues found by the last updateQueues().
            HEAP_MEMORY_USED: "value" of the last heap-memory read.
            MEMORY_PERSENT_USED: "value" of the last memory-percent read.
            CONNECTION: object providing BROKER_NAME, AMQ_IP, AMQ_PORT,
                HEADERS and POSTFIX.
        """

        def __init__(self, conn):
            """Store *conn* and immediately refresh all state from the broker."""
            self.QUEUES = {}
            self.QUEUES_COUNT = None
            self.HEAP_MEMORY_USED = None
            self.MEMORY_PERSENT_USED = None
            self.CONNECTION = conn
            # Performs HTTP requests; raises if the broker cannot be reached.
            self.update()

        def _broker_mbean(self):
            """Return the object name of the broker MBean for this connection."""
            return "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME

        def _read_broker_attribute(self, attribute):
            """Return the JSON body of a "read" request for a broker attribute."""
            return json.dumps({
                "type": "read",
                "mbean": self._broker_mbean(),
                "attribute": attribute
            })

        def rmQueue(self, queue_names):
            """Return the JSON body that asks the broker to remove a queue."""
            return json.dumps({
                "type": "exec",
                "mbean": self._broker_mbean(),
                "operation": "removeQueue(java.lang.String)",
                "arguments": [queue_names]
            })

        def queueList(self):
            """Return the JSON body that reads the broker's "Queues" attribute."""
            return self._read_broker_attribute("Queues")

        def browseQueueSubscribers(self):
            """Return the JSON body that reads the "QueueSubscribers" attribute."""
            return self._read_broker_attribute("QueueSubscribers")

        def memoryPersentUsed(self):
            """Return the JSON body that reads the "MemoryPercentUsage" attribute."""
            return self._read_broker_attribute("MemoryPercentUsage")

        def heapMemoryUsed(self):
            """Return the JSON body that reads the JVM's used heap memory."""
            return json.dumps({
                "type": "read",
                "mbean": "java.lang:type=Memory",
                "attribute": "HeapMemoryUsage",
                "path": "used"
            })

        def request(self, name, param):
            """POST the request called *name* and return the raw response body.

            Args:
                name: one of "removeQueue", "queueList",
                    "browseQueueSubscribers", "memoryPersentUsed",
                    "heapMemoryUsed".
                param: dict of extra parameters; only "removeQueue" reads it
                    (key "QUEUE_NAME").

            Raises:
                ValueError: if *name* is not a known request. Previously an
                    empty body was POSTed silently in that case.
            """
            builders = {
                "removeQueue": lambda: self.rmQueue(param["QUEUE_NAME"]),
                "queueList": self.queueList,
                "browseQueueSubscribers": self.browseQueueSubscribers,
                "memoryPersentUsed": self.memoryPersentUsed,
                "heapMemoryUsed": self.heapMemoryUsed,
            }
            if name not in builders:
                raise ValueError("unknown request name: %r" % (name,))
            body = builders[name]()

            http = urllib3.PoolManager()
            url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
            r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
            return r.data

        def updateQueues(self):
            """Rebuild QUEUES and QUEUES_COUNT from the broker's queue list."""
            res = json.loads(self.request("queueList", {}))
            queues = {}
            for queue in res["value"]:
                # objectName has the form "<domain>:key=value,key=value,...";
                # split only on the first ":" / "=" so values containing
                # those characters are kept whole.
                properties = queue["objectName"].split(":", 1)[1]
                fields = dict(pair.split("=", 1) for pair in properties.split(","))
                queue_name = fields["destinationName"]
                queues[queue_name] = Queue(queue_name, self.CONNECTION)
            # Swap in the new state only after the whole list was parsed.
            self.QUEUES = queues
            self.QUEUES_COUNT = len(queues)

        def updateHeapMem(self):
            """Refresh HEAP_MEMORY_USED from the broker."""
            self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

        def updatePersMem(self):
            """Refresh MEMORY_PERSENT_USED from the broker."""
            self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

        ## EXPORTABLE
        def update(self):
            """Refresh the queue list and both memory figures."""
            self.updateQueues()
            self.updateHeapMem()
            self.updatePersMem()

        ## EXPORTABLE
        def getQueues(self):
            """Refresh the queue list and return its size plus per-queue info."""
            self.updateQueues()
            data = [self.QUEUES[queue].getInfo() for queue in self.QUEUES]
            return {
                "queues_count": self.QUEUES_COUNT,
                "queues": data
            }

        ## EXPORTABLE
        def getQueueInfo(self, name):
            """Return the info dict of queue *name* (KeyError if unknown)."""
            return self.QUEUES[name].getInfo()

        ## EXPORTABLE
        def browseQueue(self, name):
            """Return the message summaries of queue *name*."""
            return self.QUEUES[name].browse()

        ## EXPORTABLE
        def getMessage(self, name, msg_id):
            """Return the payload of message *msg_id* in queue *name*."""
            return self.QUEUES[name].message(msg_id)

        def getAllQueueMessages(self, name):
            """Return all messages currently held in queue *name*."""
            return self.QUEUES[name].messages()

        ## EXPORTABLE
        def removeQueue(self, name):
            """Ask the broker to remove queue *name*; return the decoded reply."""
            param = {
                "QUEUE_NAME": name
            }
            return json.loads(self.request("removeQueue", param))

        ## EXPORTABLE
        def clearQueue(self, name):
            """Purge every message from queue *name*; return the decoded reply."""
            return self.QUEUES[name].clear()

        # ARS
        def removeMsgQueue(self,nameQueue, id):
            """Delete message *id* from queue *nameQueue*; return the decoded reply."""
            return self.QUEUES[nameQueue].delete_msg(id)
    
    
    
    class Queue():
        """Client for a single ActiveMQ queue, driven by JSON over HTTP POST.

        Builder methods (queueSize, browseMessages, purge, deleteMsg, ...)
        return the JSON request body as a string; request() POSTs it to the
        endpoint described by the connection object.

        Attributes:
            MESSAGES: list of dicts (id, data, timestamp, priority) filled by
                updateMessages().
            QUEUE_NAME: name of the queue this object represents.
            ENQUEUE_COUNT, DEQUEUE_COUNT, CONSUMER_COUNT, QUEUE_SIZE: last
                values read from the broker, or -1 when the read failed.
            CONNECTION: object providing BROKER_NAME, AMQ_IP, AMQ_PORT,
                HEADERS and POSTFIX.
        """

        def __init__(self, q_name, conn):
            """Remember the queue name/connection and read the four counters."""
            # (original author's note) teach me how to access the
            # superclass's attributes!
            self.MESSAGES = []
            self.QUEUE_NAME = q_name
            self.ENQUEUE_COUNT = None
            self.DEQUEUE_COUNT = None
            self.CONSUMER_COUNT = None
            self.QUEUE_SIZE = None
            self.CONNECTION = conn
            self.updateEnCount()
            self.updateDeCount()
            self.updateCoCount()
            self.updateQuSize()

        def _mbean(self):
            """Return the object name of this queue's MBean."""
            return "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME)

        def _read_attribute(self, attribute):
            """Return the JSON body of a "read" request for a queue attribute."""
            return json.dumps({
                "type": "read",
                "mbean": self._mbean(),
                "attribute": attribute
            })

        def _exec_operation(self, operation, arguments=None):
            """Return the JSON body of an "exec" request for a queue operation."""
            payload = {
                "type": "exec",
                "mbean": self._mbean(),
                "operation": operation
            }
            if arguments is not None:
                payload["arguments"] = arguments
            return json.dumps(payload)

        def _fetch_value(self, name):
            """Return the "value" of request *name*, or -1 if it cannot be read.

            Any failure (transport error, invalid JSON, missing key) is
            deliberately mapped to the -1 sentinel so that one unreadable
            counter does not break the whole refresh.
            """
            try:
                return json.loads(self.request(name, {}))["value"]
            except Exception:
                return -1

        def queueEnqueueCount(self):
            """Return the JSON body that reads "EnqueueCount"."""
            return self._read_attribute("EnqueueCount")

        def queueDequeueCount(self):
            """Return the JSON body that reads "DequeueCount"."""
            return self._read_attribute("DequeueCount")

        def queueConsumerCount(self):
            """Return the JSON body that reads "ConsumerCount"."""
            return self._read_attribute("ConsumerCount")

        def queueSize(self):
            """Return the JSON body that reads "QueueSize"."""
            return self._read_attribute("QueueSize")

        def browseMessages(self):
            """Return the JSON body that executes browse() on the queue."""
            return self._exec_operation("browse()")

        def purge(self):
            """Return the JSON body that executes purge() on the queue."""
            return self._exec_operation("purge()")

        #ARS
        def deleteMsg(self, ID):
            """Return the JSON body that removes the message with id *ID*.

            Uses the queue MBean operation removeMessage(java.lang.String).
            The previous "deleteMessage()" with an extra dummy secret argument
            mirrored the web console's form action, which is not an MBean
            operation, so the message was never removed.
            """
            return self._exec_operation("removeMessage(java.lang.String)", [ID])

        def request(self, name, param):
            """POST the request called *name* and return the raw response body.

            Args:
                name: one of "queueEnqueueCount", "queueDequeueCount",
                    "queueConsumerCount", "queueSize", "browseMessages",
                    "purge", "delete_msg".
                param: the message id for "delete_msg"; ignored otherwise.

            Raises:
                ValueError: if *name* is not a known request. Previously this
                    failed later on an unbound local variable.
            """
            builders = {
                "queueEnqueueCount": self.queueEnqueueCount,
                "queueDequeueCount": self.queueDequeueCount,
                "queueConsumerCount": self.queueConsumerCount,
                "queueSize": self.queueSize,
                "browseMessages": self.browseMessages,
                "purge": self.purge,
                "delete_msg": lambda: self.deleteMsg(param),
            }
            if name not in builders:
                raise ValueError("unknown request name: %r" % (name,))
            body = builders[name]()

            http = urllib3.PoolManager()
            url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
            r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
            return r.data

        # Each updater writes the -1 failure sentinel to its OWN attribute;
        # previously three of them reset ENQUEUE_COUNT by mistake.
        def updateEnCount(self):
            """Refresh ENQUEUE_COUNT (-1 on failure)."""
            self.ENQUEUE_COUNT = self._fetch_value("queueEnqueueCount")

        def updateDeCount(self):
            """Refresh DEQUEUE_COUNT (-1 on failure)."""
            self.DEQUEUE_COUNT = self._fetch_value("queueDequeueCount")

        def updateCoCount(self):
            """Refresh CONSUMER_COUNT (-1 on failure)."""
            self.CONSUMER_COUNT = self._fetch_value("queueConsumerCount")

        def updateQuSize(self):
            """Refresh QUEUE_SIZE (-1 on failure)."""
            self.QUEUE_SIZE = self._fetch_value("queueSize")

        def updateMessages(self):
            """Reload MESSAGES from the result of a browse() request."""
            res = json.loads(self.request("browseMessages", {}))["value"]
            self.MESSAGES = [
                {
                    "id": msg["JMSMessageID"],
                    # .get(): presumably only text messages carry "Text";
                    # other kinds get None instead of aborting the refresh.
                    "data": msg.get("Text"),
                    "timestamp": msg["JMSTimestamp"],
                    "priority": msg["JMSPriority"]
                }
                for msg in res
            ]

        def update(self):
            """Refresh the four counters and the message list."""
            self.updateEnCount()
            self.updateDeCount()
            self.updateCoCount()
            self.updateQuSize()
            self.updateMessages()

        def getInfo(self):
            """Refresh the four counters and return them with the queue name."""
            self.updateEnCount()
            self.updateDeCount()
            self.updateCoCount()
            self.updateQuSize()
            return {
                "queue_name": self.QUEUE_NAME,
                "enqueue_count": self.ENQUEUE_COUNT,
                "dequeue_count": self.DEQUEUE_COUNT,
                "consumer_count": self.CONSUMER_COUNT,
                "queue_size": self.QUEUE_SIZE
            }

        def browse(self):
            """Reload the messages and return id/timestamp/priority of each."""
            self.updateMessages()
            return [
                {
                    "id": msg["id"],
                    "timestamp": msg["timestamp"],
                    "priority": msg["priority"]
                }
                for msg in self.MESSAGES
            ]

        def message(self, msg_id):
            """Reload the messages and return the payload of *msg_id*.

            Returns None when no message with that id is in the queue.
            """
            self.updateMessages()
            for msg in self.MESSAGES:
                if msg["id"] == msg_id:
                    return msg["data"]
            return None

        # ARS
        def messages(self):
            """Reload and return the full message list."""
            self.updateMessages()
            return self.MESSAGES

        # ARS
        def delete_msg(self, id):
            """Remove message *id* from the queue; return the decoded reply."""
            return json.loads(self.request("delete_msg", id))

        def clear(self):
            """Purge the queue; return the decoded reply."""
            return json.loads(self.request("purge", {}))