2017-04-25 6 views
1

파이썬에서 콜백 함수와 핸들러를 사용하여 이상한 현상이 발생했습니다. ZMQ를 사용하여 통신을 처리하고 소켓 용 스트림을 사용합니다. 나는 기본 클래스가 있습니다콜백 함수가 인스턴스에서 올바른 값을 보지 않습니다.

import multiprocessing  
import zmq 
from concurrent.futures import ThreadPoolExecutor 
from zmq.eventloop import ioloop, zmqstream 
from zmq.utils import jsonapi as json 

# Types of messages 
TYPE_A = 'type_a' 
TYPE_B = 'type_b' 


class ZmqProcess(multiprocessing.Process): 
    def __init__(self): 
     super(ZmqProcess, self).__init__() 
     self.context = None 
     self.loop = None 
     self.handle_stream = None 

    def setup(self): 
     self.context = zmq.Context() 
     self.loop = ioloop.IOLoop.instance() 

    def send(self, msg_type, msg, host, port): 
     sock = zmq.Context().socket(zmq.PAIR) 
     sock.connect('tcp://%s:%s' % (host, port)) 
     sock.send_json([msg_type, msg]) 

    def stream(self, sock_type, addr): 
     sock = self.context.socket(sock_type) 
      if isinstance(addr, str): 
      addr = addr.split(':') 
     host, port = addr if len(addr) == 2 else (addr[0], None) 
      if port: 
      sock.bind('tcp://%s:%s' % (host, port)) 
     else: 
      port = sock.bind_to_random_port('tcp://%s' % host) 
     stream = zmqstream.ZMQStream(sock, self.loop)  
     return stream, int(port) 

class MessageHandler(object): 
    def __init__(self, json_load=-1): 
     self._json_load = json_load 
     self.pool = ThreadPoolExecutor(max_workers=10) 

    def __call__(self, msg): 
     i = self._json_load 
     msg_type, data = json.loads(msg[i]) 
     msg[i] = data 
     if msg_type.startswith('_'): 
      raise AttributeError('%s starts with an "_"' % msg_type) 
     getattr(self, msg_type)(*msg) 

을 그리고 나는 그것에서 상속하는 클래스가 있습니다

import zmq  
import zmq_base  

class ZmqServerMeta(zmq_base.ZmqProcess): 
    def __init__(self, bind_addr, handlers): 
     super(ZmqServerMeta, self).__init__() 
     self.bind_addr = bind_addr 
     self.handlers = handlers 

    def setup(self): 
     super(ZmqServerMeta, self).setup() 
     self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr) 
     self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop, 
               self.handlers)) 

    def run(self): 
     self.setup() 
     self.loop.start() 

    def stop(self): 
     self.loop.stop() 

class StreamHandler(zmq_base.MessageHandler): 
    def __init__(self, handle_stream, stop, handlers): 
     super(StreamHandler, self).__init__() 
     self._handle_stream = handle_stream 
     self._stop = stop 
     self._handlers = handlers 

    def type_a(self, data): 
     if zmq_base.TYPE_A in self._handlers: 
      if self._handlers[zmq_base.TYPE_A]: 
       for handle in self._handlers[zmq_base.TYPE_A]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def type_b(self, data): 
     if zmq_base.TYPE_B in self._handlers: 
      if self._handlers[zmq_base.TYPE_B]: 
       for handle in self._handlers[zmq_base.TYPE_B]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def endit(self): 
     self._stop() 

이 또한 내가 스토리지로 사용하려는 클래스가 있습니다. 문제의 시작 위치는 다음과 같습니다.

import threading 
import zmq_server_meta as server 
import zmq_base as base 


class Storage: 
    def __init__(self): 
     self.list = [] 

     self.list_lock = threading.RLock() 

     self.zmq_server = None 
     self.host = '127.0.0.1' 
     self.port = 5432 
     self.bind_addr = (self.host, self.port) 

    def setup(self): 
     handlers = {base.TYPE_A: [self. remove]} 
     self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr) 
     self.zmq_server.start() 

    def add(self, data): 
     with self.list_lock: 
      try: 
       self.list.append(data) 
      except: 
       print "Didn't work" 

    def remove(self, msg): 
     with self.list_lock: 
      try: 
       self.list.remove(msg) 
      except: 
       print "Didn't work" 

아이디어는 클래스가 수신하는 일부 글로벌 정보를 저장한다는 것입니다. 그것은 모든 테스트 파일에 시작 :

import sys 
import time 
import storage 
import zmq_base as base 
import zmq_server_meta as server 



def printMsg(msg): 
    print msg 

store = storage.Storage() 

store.setup() 
handlers = {base.TYPE_B: [printMsg]} 
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431)) 
client.start() 

message = "Test" 

store.add(message) 
client.send(base.TYPE_A, message, '127.0.0.1', 5432) 

내가 혼란을 줄이기 위해 그것을 단순화. 그냥 추가하는 대신 대개 전송하고 응답이 다시옵니다. 클라이언트가 보내는 응답은 올바른 콜백 인 remove()에 의해 처리되어야하며 목록에서 제거해야합니다. 목록에서 요소가 있어야하지만 remove() 함수는 빈 목록을 보는 문제가 발생합니다. 테스트 파일을 검사하면 추가 된 요소를 볼 수 있습니다. 거기에서 remove()를 호출하면 비어 있지 않은 목록을보고 제거 할 수 있습니다. 내 질문은, 왜 콜백 빈 목록을보고 어떻게 목록에서 올바른 요소를 볼 수 있는지 확인할 수 있습니까?

종류는 패트릭

답변

1

내가 문제가 ZmqProcess 클래스 multiprocessing.Process에서 상속 있다는 사실에 낳는 생각 간주한다. 다중 처리는 값 또는 배열을 사용하는 공유 메모리 맵을 사용하는 것을 제외하고는 다른 프로세스간에 개체를 공유 할 수 없습니다 (https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes)

사용자 지정 개체를 사용하려면 서버 프로세스/프록시 객체. 문서의 동일한 페이지에서 찾을 수 있습니다.

예를 들어, self.manager = Manager()과 같이 Storage 클래스의 init 함수에서 관리자를 정의 할 수 있습니다. 나중에 self.list = self.manager.list()을 입력합니다. 이것은 트릭을해야합니다.