이론적으로는 (WireShark를 통한 패킷 트래픽 분석 결과) ActiveMQ 관리 페이지(localhost:8161\admin)에서 브라우저로 작업해 보았지만, 결과적으로 메시지를 삭제할 수 없고 프로그래밍 방식(Python)으로도 삭제할 수 없습니다.
`secret` 과 `id`, 즉 `[id, secret]` 은 고유 번호("토큰"으로 사용될 수 있습니다)이며, 제가 \admin\ 페이지를 새로 고칠 때마다(예: F5) 값이 바뀝니다. 그것이 무엇인지는 저도 알 수 없습니다....
거기에 있는 이미지를 보십시오: https://ru.stackoverflow.com/q/618697/228254 답변은 아래에 게시해 주십시오.
테스트 큐 예시:
- ID: ID:#######NAME_SERVER######-44458-1485427798954-6:1:1:1:1
- secret: 1dbd2916-337a-48cc-bce7-63b00d38ba3
현재 제 해결 방안은 다음과 같습니다: 큐의 모든 메시지(MSG)를 받아서, 큐에서 삭제(drop)해야 하는 메시지에만 ACK를 보냅니다.
from stompy import stomp
import json

# Consume a queue over STOMP in client-acknowledge mode and acknowledge only
# the messages whose JSON body carries the attribute value this consumer is
# interested in; every other message is left unacknowledged.
#
# NOTE(review): amq_ip, amq_port, amq_user, amq_pass and amq_queue are not
# defined in this snippet -- they are assumed to be set by the surrounding
# code before this runs.
s = stomp.Stomp(amq_ip, amq_port)
try:
    s.connect(username=amq_user, password=amq_pass)
    s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
    print("ActiveMQ error\n %s" % e)
    # Without a connection and subscription the receive loop below can only
    # fail on every iteration, so stop here instead of spinning forever.
    raise SystemExit(1)

while True:
    try:
        frame = s.receive_frame()
        body = json.loads(frame.body)
        # Is this message for me?
        if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
            print("Its for me. I receive it")
            # The message is for me: accept it (acknowledge) and process it.
            s.ack(frame)
        else:
            # The message is meant for someone else and does not suit me,
            # so it is deliberately not acknowledged.
            print("Its not for me")
    except Exception as e:
        # Best-effort consumer: report the problem (malformed JSON, missing
        # key, transport error) and keep polling.
        # NOTE(review): a permanently broken connection will make this loop
        # print errors continuously -- consider a reconnect/back-off policy.
        print(e)
또한 큐에서 메시지(MSG)를 삭제해 보는 실험용 테스트 코드도 추가합니다 (실제로는 삭제되지 않습니다).
# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json
# Connection to ActiveMQ
BROKER_NAME = "localhost"
# Host of the broker's HTTP API. The original script passed AMQ_IP to
# Connection() without ever defining it (NameError); "localhost" matches the
# localhost:8161 admin address described with this script.
AMQ_IP = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH = '%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"
# NOTE(review): the three settings below are not used anywhere in this
# script; they are kept because other code may import them.
LOGIN_EXEMPT_URLS = [
    r'admin/'
]
LOGIN_URL = 'url_login'
LOGOUT_REDIRECT_URL = 'url_login'


def main():
    """Connect to the broker API and try to remove one message from the queue.

    Prints the outcome of each step. Returns None in every case; a failed
    connection is reported and ends the run early instead of continuing with
    an unbound ``manager``.
    """
    user_agent = "curl/7.49.1"
    headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
    headers.update({
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
    })

    try:
        connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
        # AMQManager.__init__ already queries the broker, so connection
        # problems surface here.
        manager = activemq_api.AMQManager(connect)
    except Exception as e:
        print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
        return
    print(u'Соединение успешно установлено')

    # Placeholder message id; replace with a real JMSMessageID from the queue.
    # The web console's "secret" value is intentionally not kept here:
    # removeMsgQueue() only takes the queue name and the message id.
    msg_id = "ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
    try:
        print(manager.removeMsgQueue(AMQ_TASK_QUEUE_NAME, msg_id))
    except Exception as inst:
        print(inst)


if __name__ == '__main__':
    main()
#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json
class Connection(object):
    """Plain holder for the settings used to reach one broker's HTTP API.

    The other classes in this file read these attributes to build the
    request URL as ``"http://%s:%d%s" % (AMQ_IP, AMQ_PORT, POSTFIX)`` and to
    fill in the broker name of each MBean, so ``amq_port`` must be an
    integer and ``postfix`` is appended verbatim after the port.

    Args:
        amq_ip: host name or IP address of the broker's HTTP endpoint.
        amq_port: integer TCP port of that endpoint.
        broker: broker name inserted into MBean names.
        header: HTTP headers sent with every request.
        postfix: URL path appended after ``host:port``.
    """

    def __init__(self, amq_ip, amq_port, broker, header, postfix):
        self.BROKER_NAME = broker
        self.AMQ_IP = amq_ip
        self.AMQ_PORT = amq_port
        self.HEADERS = header
        self.POSTFIX = postfix
class AMQManager(object):
    """Broker-level client that POSTs JSON requests to the broker's HTTP API.

    Every request goes to ``http://<AMQ_IP>:<AMQ_PORT><POSTFIX>`` of the
    supplied connection object. Payloads use the ``type`` / ``mbean`` /
    ``attribute`` / ``operation`` / ``arguments`` layout.
    NOTE(review): this looks like the Jolokia protocol -- confirm against the
    broker configuration.

    Constructing an instance immediately calls update(), i.e. it performs
    HTTP requests.

    Attributes:
        QUEUES: dict mapping queue name to its Queue wrapper; rebuilt by
            updateQueues().
        QUEUES_COUNT: number of queue entries in the last queue listing.
        HEAP_MEMORY_USED: "value" of the last heap-usage reply.
        MEMORY_PERSENT_USED: "value" of the last MemoryPercentUsage reply.
        CONNECTION: the connection settings object passed to __init__.
    """

    def __init__(self, conn):
        self.QUEUES = {}
        self.QUEUES_COUNT = None
        self.HEAP_MEMORY_USED = None
        self.MEMORY_PERSENT_USED = None
        self.CONNECTION = conn
        self.update()

    # ---- request payload builders -------------------------------------

    def _broker_mbean(self):
        """Return the MBean name of the broker named by the connection."""
        return "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME

    def _read_broker_attribute(self, attribute):
        """Return the JSON payload that reads *attribute* of the broker MBean."""
        return json.dumps({
            "type": "read",
            "mbean": self._broker_mbean(),
            "attribute": attribute,
        })

    def rmQueue(self, queue_names):
        """Return the JSON payload that executes removeQueue for *queue_names*."""
        return json.dumps({
            "type": "exec",
            "mbean": self._broker_mbean(),
            "operation": "removeQueue(java.lang.String)",
            "arguments": [queue_names],
        })

    def queueList(self):
        """Return the JSON payload that reads the broker's ``Queues`` attribute."""
        return self._read_broker_attribute("Queues")

    def browseQueueSubscribers(self):
        """Return the JSON payload that reads ``QueueSubscribers``."""
        return self._read_broker_attribute("QueueSubscribers")

    def memoryPersentUsed(self):
        """Return the JSON payload that reads ``MemoryPercentUsage``."""
        return self._read_broker_attribute("MemoryPercentUsage")

    def heapMemoryUsed(self):
        """Return the JSON payload that reads ``HeapMemoryUsage`` / ``used``."""
        return json.dumps({
            "type": "read",
            "mbean": "java.lang:type=Memory",
            "attribute": "HeapMemoryUsage",
            "path": "used",
        })

    # ---- transport ------------------------------------------------------

    def _pool(self):
        """Return this instance's urllib3 PoolManager, creating it on first use.

        The original code built a new PoolManager for every request; keeping
        one per instance lets urllib3 reuse connections.
        """
        pool = getattr(self, "_http", None)
        if pool is None:
            pool = self._http = urllib3.PoolManager()
        return pool

    def request(self, name, param):
        """POST the payload selected by *name* and return the raw reply body.

        Args:
            name: one of "removeQueue", "queueList", "browseQueueSubscribers",
                "memoryPersentUsed", "heapMemoryUsed".
            param: dict of arguments; only "removeQueue" reads it (key
                "QUEUE_NAME").

        Returns:
            The ``data`` attribute of the urllib3 response.

        Raises:
            ValueError: if *name* is not a known request name (previously an
                empty body was POSTed silently).
        """
        if name == "removeQueue":
            body = self.rmQueue(param["QUEUE_NAME"])
        else:
            builders = {
                "queueList": self.queueList,
                "browseQueueSubscribers": self.browseQueueSubscribers,
                "memoryPersentUsed": self.memoryPersentUsed,
                "heapMemoryUsed": self.heapMemoryUsed,
            }
            if name not in builders:
                raise ValueError("unknown request name: %r" % (name,))
            body = builders[name]()
        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        r = self._pool().request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    # ---- state refresh --------------------------------------------------

    @staticmethod
    def _parse_object_name(object_name):
        """Split ``"domain:k1=v1,k2=v2"`` into ``{"k1": "v1", "k2": "v2"}``.

        Only the first ":" and the first "=" of each pair act as separators,
        so values containing those characters are kept whole.
        NOTE(review): quoted values that contain "," are not handled.
        """
        properties = object_name.split(":", 1)[1]
        parsed = {}
        for pair in properties.split(","):
            key, _, value = pair.partition("=")
            parsed[key] = value
        return parsed

    def updateQueues(self):
        """Re-read the queue listing and rebuild QUEUES and QUEUES_COUNT."""
        res = json.loads(self.request("queueList", {}))
        entries = res["value"]
        queues = {}
        for entry in entries:
            name = self._parse_object_name(entry["objectName"])["destinationName"]
            queues[name] = Queue(name, self.CONNECTION)
        self.QUEUES = queues
        self.QUEUES_COUNT = len(entries)

    def updateHeapMem(self):
        """Refresh HEAP_MEMORY_USED from the broker."""
        self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

    def updatePersMem(self):
        """Refresh MEMORY_PERSENT_USED from the broker."""
        self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

    ## EXPORTABLE
    def update(self):
        """Refresh the queue map and both memory figures."""
        self.updateQueues()
        self.updateHeapMem()
        self.updatePersMem()

    ## EXPORTABLE
    def getQueues(self):
        """Refresh the queue map and return a summary of every queue.

        Returns:
            dict with "queues_count" and "queues" (list of Queue.getInfo()).
        """
        self.updateQueues()
        return {
            "queues_count": self.QUEUES_COUNT,
            "queues": [self.QUEUES[queue].getInfo() for queue in self.QUEUES],
        }

    # The lookups below use the queue map as of the last updateQueues() call
    # and raise KeyError for a queue name that is not in it.

    ## EXPORTABLE
    def getQueueInfo(self, name):
        """Return Queue.getInfo() for queue *name*."""
        return self.QUEUES[name].getInfo()

    ## EXPORTABLE
    def browseQueue(self, name):
        """Return Queue.browse() for queue *name*."""
        return self.QUEUES[name].browse()

    ## EXPORTABLE
    def getMessage(self, name, msg_id):
        """Return Queue.message(msg_id) for queue *name*."""
        return self.QUEUES[name].message(msg_id)

    def getAllQueueMessages(self, name):
        """Return Queue.messages() for queue *name*."""
        return self.QUEUES[name].messages()

    ## EXPORTABLE
    def removeQueue(self, name):
        """Ask the broker to remove queue *name*; return the decoded reply."""
        param = {
            "QUEUE_NAME": name
        }
        return json.loads(self.request("removeQueue", param))

    ## EXPORTABLE
    def clearQueue(self, name):
        """Return Queue.clear() for queue *name*."""
        return self.QUEUES[name].clear()

    # ARS
    def removeMsgQueue(self, nameQueue, id):
        """Return Queue.delete_msg(id) for queue *nameQueue*."""
        return self.QUEUES[nameQueue].delete_msg(id)
class Queue(object):
    """Client for one queue's MBean, reached through the broker's HTTP API.

    Requests are POSTed as JSON to ``http://<AMQ_IP>:<AMQ_PORT><POSTFIX>``
    of the supplied connection object. Constructing an instance immediately
    refreshes the four counters, i.e. it performs HTTP requests.

    Attributes:
        MESSAGES: list of message dicts filled by updateMessages().
        QUEUE_NAME: name of the queue this object addresses.
        ENQUEUE_COUNT, DEQUEUE_COUNT, CONSUMER_COUNT, QUEUE_SIZE: last values
            read from the broker, or -1 when the read failed.
        CONNECTION: the connection settings object passed to __init__.
    """

    def __init__(self, q_name, conn):
        # Original author's note (translated): "teach me how to access the
        # attributes of the superclass!"
        self.MESSAGES = []
        self.QUEUE_NAME = q_name
        self.ENQUEUE_COUNT = None
        self.DEQUEUE_COUNT = None
        self.CONSUMER_COUNT = None
        self.QUEUE_SIZE = None
        self.CONNECTION = conn
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()

    # ---- request payload builders -------------------------------------

    def _mbean(self):
        """Return the MBean name of this queue on the connection's broker."""
        return "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
            % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME)

    def _read_attribute(self, attribute):
        """Return the JSON payload that reads *attribute* of this queue."""
        return json.dumps({
            "type": "read",
            "mbean": self._mbean(),
            "attribute": attribute,
        })

    def queueEnqueueCount(self):
        """Return the JSON payload that reads ``EnqueueCount``."""
        return self._read_attribute("EnqueueCount")

    def queueDequeueCount(self):
        """Return the JSON payload that reads ``DequeueCount``."""
        return self._read_attribute("DequeueCount")

    def queueConsumerCount(self):
        """Return the JSON payload that reads ``ConsumerCount``."""
        return self._read_attribute("ConsumerCount")

    def queueSize(self):
        """Return the JSON payload that reads ``QueueSize``."""
        return self._read_attribute("QueueSize")

    def browseMessages(self):
        """Return the JSON payload that executes ``browse()`` on this queue."""
        return json.dumps({
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "browse()",
        })

    def purge(self):
        """Return the JSON payload that executes ``purge()`` on this queue."""
        return json.dumps({
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "purge()",
        })

    #ARS
    def deleteMsg(self, ID):
        """Return the JSON payload that removes the message with id *ID*.

        The previous payload named the operation ``deleteMessage()`` and sent
        two arguments (the id plus a hard-coded dummy "secret"), so nothing
        was ever deleted. The queue MBean operation is
        ``removeMessage(java.lang.String)`` and takes only the message id.
        NOTE(review): the "secret" token seen in the web-console traffic
        appears to belong to the console's own delete links, not to this
        API -- confirm.
        """
        return json.dumps({
            "type": "exec",
            "mbean": self._mbean(),
            "operation": "removeMessage(java.lang.String)",
            "arguments": [ID],
        })

    # ---- transport ------------------------------------------------------

    def _pool(self):
        """Return this instance's urllib3 PoolManager, creating it on first use.

        The original code built a new PoolManager for every request; keeping
        one per instance lets urllib3 reuse connections.
        """
        pool = getattr(self, "_http", None)
        if pool is None:
            pool = self._http = urllib3.PoolManager()
        return pool

    def request(self, name, param):
        """POST the payload selected by *name* and return the raw reply body.

        Args:
            name: one of "queueEnqueueCount", "queueDequeueCount",
                "queueConsumerCount", "queueSize", "browseMessages", "purge",
                "delete_msg".
            param: for "delete_msg" the message id; ignored otherwise.

        Returns:
            The ``data`` attribute of the urllib3 response.

        Raises:
            ValueError: if *name* is not a known request name (previously
                this surfaced as an UnboundLocalError).
        """
        if name == "delete_msg":
            body = self.deleteMsg(param)
        else:
            builders = {
                "queueEnqueueCount": self.queueEnqueueCount,
                "queueDequeueCount": self.queueDequeueCount,
                "queueConsumerCount": self.queueConsumerCount,
                "queueSize": self.queueSize,
                "browseMessages": self.browseMessages,
                "purge": self.purge,
            }
            if name not in builders:
                raise ValueError("unknown request name: %r" % (name,))
            body = builders[name]()
        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        r = self._pool().request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    # ---- state refresh --------------------------------------------------

    def _fetch_value(self, request_name):
        """Return the "value" of the reply to *request_name*, or -1 on failure.

        Counter refreshes are best-effort: any error (transport, malformed
        reply, missing "value") yields the -1 sentinel instead of raising.
        """
        try:
            return json.loads(self.request(request_name, {}))["value"]
        except Exception:
            return -1

    def updateEnCount(self):
        """Refresh ENQUEUE_COUNT (-1 if the read fails)."""
        self.ENQUEUE_COUNT = self._fetch_value("queueEnqueueCount")

    def updateDeCount(self):
        """Refresh DEQUEUE_COUNT (-1 if the read fails)."""
        # The failure branch used to overwrite ENQUEUE_COUNT by mistake.
        self.DEQUEUE_COUNT = self._fetch_value("queueDequeueCount")

    def updateCoCount(self):
        """Refresh CONSUMER_COUNT (-1 if the read fails)."""
        # The failure branch used to overwrite ENQUEUE_COUNT by mistake.
        self.CONSUMER_COUNT = self._fetch_value("queueConsumerCount")

    def updateQuSize(self):
        """Refresh QUEUE_SIZE (-1 if the read fails)."""
        # The failure branch used to overwrite ENQUEUE_COUNT by mistake.
        self.QUEUE_SIZE = self._fetch_value("queueSize")

    def updateMessages(self):
        """Re-read the queue contents into MESSAGES.

        MESSAGES is emptied first, so it stays empty if the request raises.
        """
        self.MESSAGES = []
        res = json.loads(self.request("browseMessages", {}))["value"]
        for msg in res or []:
            self.MESSAGES.append({
                "id": msg["JMSMessageID"],
                # presumably only text messages carry "Text"; use None for
                # the others instead of failing the whole listing -- verify.
                "data": msg.get("Text"),
                "timestamp": msg["JMSTimestamp"],
                "priority": msg["JMSPriority"],
            })

    def update(self):
        """Refresh all four counters and the message list."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        self.updateMessages()

    def getInfo(self):
        """Refresh the four counters and return them with the queue name."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        return {
            "queue_name": self.QUEUE_NAME,
            "enqueue_count": self.ENQUEUE_COUNT,
            "dequeue_count": self.DEQUEUE_COUNT,
            "consumer_count": self.CONSUMER_COUNT,
            "queue_size": self.QUEUE_SIZE
        }

    def browse(self):
        """Refresh MESSAGES and return id/timestamp/priority of each message."""
        self.updateMessages()
        return [
            {
                "id": msg["id"],
                "timestamp": msg["timestamp"],
                "priority": msg["priority"],
            }
            for msg in self.MESSAGES
        ]

    def message(self, msg_id):
        """Refresh MESSAGES and return the "data" of the message *msg_id*.

        Returns None when no message with that id is in the queue.
        """
        self.updateMessages()
        for msg in self.MESSAGES:
            if msg["id"] == msg_id:
                return msg["data"]
        return None

    # ARS
    def messages(self):
        """Refresh MESSAGES and return the list."""
        self.updateMessages()
        return self.MESSAGES

    # ARS
    def delete_msg(self, id):
        """Ask the broker to remove message *id*; return the decoded reply."""
        return json.loads(self.request("delete_msg", id))

    def clear(self):
        """Ask the broker to purge this queue; return the decoded reply."""
        return json.loads(self.request("purge", {}))
이것은 이 작업을 수행하기 위해 제가 작성한 간단한 클라이언트 코드입니다.
REST를 사용하여 메시지를 삭제하려면 어떻게 해야 합니까? – Arseny