파이썬에서 콜백 함수와 핸들러를 사용하여 이상한 현상이 발생했습니다. ZMQ를 사용하여 통신을 처리하고 소켓 용 스트림을 사용합니다. 나는 기본 클래스가 있습니다콜백 함수가 인스턴스에서 올바른 값을 보지 않습니다.
import multiprocessing
import zmq
from concurrent.futures import ThreadPoolExecutor
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json
# Types of messages
TYPE_A = 'type_a'
TYPE_B = 'type_b'
class ZmqProcess(multiprocessing.Process):
def __init__(self):
super(ZmqProcess, self).__init__()
self.context = None
self.loop = None
self.handle_stream = None
def setup(self):
self.context = zmq.Context()
self.loop = ioloop.IOLoop.instance()
def send(self, msg_type, msg, host, port):
sock = zmq.Context().socket(zmq.PAIR)
sock.connect('tcp://%s:%s' % (host, port))
sock.send_json([msg_type, msg])
def stream(self, sock_type, addr):
sock = self.context.socket(sock_type)
if isinstance(addr, str):
addr = addr.split(':')
host, port = addr if len(addr) == 2 else (addr[0], None)
if port:
sock.bind('tcp://%s:%s' % (host, port))
else:
port = sock.bind_to_random_port('tcp://%s' % host)
stream = zmqstream.ZMQStream(sock, self.loop)
return stream, int(port)
class MessageHandler(object):
def __init__(self, json_load=-1):
self._json_load = json_load
self.pool = ThreadPoolExecutor(max_workers=10)
def __call__(self, msg):
i = self._json_load
msg_type, data = json.loads(msg[i])
msg[i] = data
if msg_type.startswith('_'):
raise AttributeError('%s starts with an "_"' % msg_type)
getattr(self, msg_type)(*msg)
을 그리고 나는 그것에서 상속하는 클래스가 있습니다
import zmq
import zmq_base
class ZmqServerMeta(zmq_base.ZmqProcess):
def __init__(self, bind_addr, handlers):
super(ZmqServerMeta, self).__init__()
self.bind_addr = bind_addr
self.handlers = handlers
def setup(self):
super(ZmqServerMeta, self).setup()
self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr)
self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop,
self.handlers))
def run(self):
self.setup()
self.loop.start()
def stop(self):
self.loop.stop()
class StreamHandler(zmq_base.MessageHandler):
def __init__(self, handle_stream, stop, handlers):
super(StreamHandler, self).__init__()
self._handle_stream = handle_stream
self._stop = stop
self._handlers = handlers
def type_a(self, data):
if zmq_base.TYPE_A in self._handlers:
if self._handlers[zmq_base.TYPE_A]:
for handle in self._handlers[zmq_base.TYPE_A]:
self.pool.submit(handle, data)
else:
pass
else:
pass
def type_b(self, data):
if zmq_base.TYPE_B in self._handlers:
if self._handlers[zmq_base.TYPE_B]:
for handle in self._handlers[zmq_base.TYPE_B]:
self.pool.submit(handle, data)
else:
pass
else:
pass
def endit(self):
self._stop()
이 또한 내가 스토리지로 사용하려는 클래스가 있습니다. 문제의 시작 위치는 다음과 같습니다.
import threading
import zmq_server_meta as server
import zmq_base as base
class Storage:
def __init__(self):
self.list = []
self.list_lock = threading.RLock()
self.zmq_server = None
self.host = '127.0.0.1'
self.port = 5432
self.bind_addr = (self.host, self.port)
def setup(self):
handlers = {base.TYPE_A: [self. remove]}
self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr)
self.zmq_server.start()
def add(self, data):
with self.list_lock:
try:
self.list.append(data)
except:
print "Didn't work"
def remove(self, msg):
with self.list_lock:
try:
self.list.remove(msg)
except:
print "Didn't work"
아이디어는 클래스가 수신하는 일부 글로벌 정보를 저장한다는 것입니다. 그것은 모든 테스트 파일에 시작 :
import sys
import time
import storage
import zmq_base as base
import zmq_server_meta as server
def printMsg(msg):
print msg
store = storage.Storage()
store.setup()
handlers = {base.TYPE_B: [printMsg]}
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431))
client.start()
message = "Test"
store.add(message)
client.send(base.TYPE_A, message, '127.0.0.1', 5432)
내가 혼란을 줄이기 위해 그것을 단순화. 그냥 추가하는 대신 대개 전송하고 응답이 다시옵니다. 클라이언트가 보내는 응답은 올바른 콜백 인 remove()에 의해 처리되어야하며 목록에서 제거해야합니다. 목록에서 요소가 있어야하지만 remove() 함수는 빈 목록을 보는 문제가 발생합니다. 테스트 파일을 검사하면 추가 된 요소를 볼 수 있습니다. 거기에서 remove()를 호출하면 비어 있지 않은 목록을보고 제거 할 수 있습니다. 내 질문은, 왜 콜백 빈 목록을보고 어떻게 목록에서 올바른 요소를 볼 수 있는지 확인할 수 있습니까?
종류는 패트릭