2017-02-21 1 views
1

Windows에서 멀티 프로세싱 ServerApp을 작동 시키려고합니다. 나는 os.fork() 기능이 없어서 socket을 피할 수없는 방법으로 전달해야 할 것입니다 (?!).Python3 Windows multiprocessing이 소켓을 처리하도록 처리합니다.

내가 here 표시하지만, 그 방법은 파이썬 3에서 사용할 수없는 것처럼이 multiprocessing.reduction에서 reduce_handlerebuild_handle를 사용 할 수있을 것으로 보았다 (?!). 사용할 수있는 duplicatesteal_handle을 사용할 수는 있지만 사용할 방법이나 예제가 필요하지는 않습니다.

또한 새 프로세스를 만들 때 logging이 문제가 될지 알고 싶습니다.

가 여기 내 ServerApp 샘플입니다 :

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import Queue 
from multiprocessing import Process 
from sys import stdout 
from time import sleep 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 


    def conn_handler(self, connection, address, buffer): 

     self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 


       if command == 'Something': 
        connection.sendall(command_response) 
       else: 
        print(':(') 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      self.client_buffers.remove(buffer) 
      self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1]) 


    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 


    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and add it o list of all client buffers 
      buffer = Queue() 
      self.client_buffers.append(buffer) 

      process = Process(target=self.conn_handler, args=(conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process has its own reference. 
      conn.close() 


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 


    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection 
     self.listener.daemon = True 
     self.listener.start() 
+0

일부 코드를 작성했지만 어떤 '프로토콜'정의도 볼 수 없습니다. 이미 수락 된 경우 수용 규칙을 정의 할 수 없습니다 (필터 란 무엇입니까?). – dsgdfg

+0

@dsgdfg 내가 맞았는지 확실하지 않지만 각 연결을 수락하고 별도의 프로세스에서 처리해야합니다. – sstevan

답변

3

연결 산 세척 (소켓 포함) 허용하기 python3를 들어, 당신은 mulitprocessing.allow_connection_pickling를 사용해야합니다. 소켓 용 감속기를 ForkingPickler에 등록합니다. 예를 들어 :

import socket 
import multiprocessing as mp 
mp.allow_connection_pickling() 


def _test_connection(conn): 
    msg = conn.recv(2) 
    conn.send(msg) 
    conn.close() 
    print("ok") 

if __name__ == '__main__': 
    server, client = socket.socketpair() 

    p = mp.Process(target=_test_connection, args=(server,)) 
    p.start() 

    client.settimeout(5) 

    msg = b'42' 
    client.send(msg) 
    assert client.recv(2) == msg 

    p.join() 
    assert p.exitcode == 0 

    client.close() 
    server.close() 

나는 또한 당신이 socket의 산세에 unrealted 다른 문제가 나타났습니다.

  • 대상으로 사용 self.conn_handler는 멀티 프로세싱은 전체 개체 self 피클을 시도합니다

    . 물체가 피클 할 수없는 일부 Thread을 포함하고 있기 때문에 문제가됩니다. 따라서 대상 함수가 닫히면 self을 제거해야합니다. @staticmethod 데코레이터를 사용하고 기능에서 self의 모든 언급을 제거하여 수행 할 수 있습니다.

  • 또한 logging 모듈은 여러 프로세스를 처리하지 않습니다. 기본적으로 시작된 Process의 모든 로그는 현재 코드로 손실됩니다. 이 문제를 해결하려면 두 번째 Process (시작 부분이 conn_handler)을 시작하거나 multiprocessing 로깅 유틸리티를 사용하면 새로운 logging을 시작할 수 있습니다.

이 캔은 다음과 같이 제공합니다

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import util, get_context 
from sys import stdout 
from time import sleep 

util.log_to_stderr(20) 
ctx = get_context("spawn") 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', 
       buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 

    @staticmethod 
    def conn_handler(id, connection, address, buffer): 

     print("test") 
     util.info("[%d] - Connection from %s:%d", id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       # Check for client commands 
       readable, writable, exceptional = select([connection], [], [], 
                 0) 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 

       if command == 'Something': 
        connection.sendall(b"Coucouc") 
        break 
       else: 
        print(':(') 
       sleep(.1) 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      util.info("[%d] - Connection from %s:%d has been closed.", id, 
        address[0], address[1]) 
      print("Close") 

    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 

    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, 
          self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process 
      # to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and 
      # add it o list of all client buffers 
      buffer = ctx.Queue() 
      self.client_buffers.append(buffer) 

      process = ctx.Process(target=self.conn_handler, 
           args=(self.id, conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process 
      # has its own reference. 
      conn.close() 

    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     # Run acceptor thread to handle new connection 
     self.listener = Thread(target=self.acceptor) 
     self.listener.daemon = True 
     self.listener.start() 

     self.listener.join() 


def main(): 
    app = ServerApp(0) 
    app.run() 


if __name__ == '__main__': 
    main() 

난 단지 유닉스와 python3.6에 테스트를하지만 난 창에서 스폰 컨텍스트 , which should behave like the Process`를 사용할 때 너무 다른 행동을하지 말았어야 .

+0

지연에 대해 죄송합니다. 나는'allow_connection_pickling'을 테스트했지만 효과가 없습니다. 그러나, 나는 두 개의 다른 오류 (동일한 코드를 두 번 실행)가 나타납니다. 여기에 [첫 번째] (http://pastebin.com/zyS2Sbtm)와 [두 번째 기사] (http://pastebin.com/7FQ1nvxN)가 있습니다. 리스너를 ServerApp 속성 ('self.listener' 대신'listener' 만)으로 지정하지 않으면 에러가 발생하지 않지만 핸들러 프로세스도 실행되지 않습니다. – sstevan

+0

이것은'socket' 피클 링과 관련된 문제가 아닙니다. 에러를 읽으면,'_thread.Lock'과 몇몇'io' 오브젝트를 pickle 할 수 없습니다.나는 이것이 새로운'Process'를 시작하기 위해 인스턴스 메소드를 사용할 때 필요한 전체'ServerApp' 객체의 절편과 관련이 있다고 말할 수 있습니다. conn_handler에'@ staticmethod' 데코레이터를 사용하거나 클래스에서 제거해야합니다. 이것은 전체 개체를 피클하는 것이 안전하지 않기 때문에 (예를 들어 암호를 처리하는 경우) 좋은 방법이기도합니다. 또한 테스트를 위해 오류를 재생산하는 기본 스크립트를 제공하십시오. –

+0

또한 실용적인 방법으로,'process'에서 일관된 로깅을 얻기 위해서는'multiprocessing.utils.log_to_stderr'와'multiprocessing.utils.debug/info'를 사용해야합니다. 커스텀 로깅을 사용할 필요가 있다면,'logging'은 오직 하나의'Process'만으로 작동하도록 설계되었으므로 타겟 함수의 시작 부분에서 시작해야합니다. –