2012-01-07 2 views
0

일련의 프로세스가 있으며 서로 통신해야하는 A, B 및 C라고 부르 자고합니다. A는 B와 C와 통신해야합니다. B는 A와 C와 통신해야합니다. C는 A와 B와 통신해야합니다. A, B, C는 서로 다른 기계 또는 동일한 시스템에 위치 할 수 있습니다.SocketServer 인스턴스와의 파이썬 다중 처리 통신

제 생각에 소켓을 통해 통신하고 모든 컴퓨터가 동일한 경우 (예 : A 포트 11111, B 포트 22222 등) "localhost"를 사용하는 것이 었습니다. 이렇게하면 비 로컬 프로세스가 로컬 프로세스처럼 처리됩니다. 이를 위해 A, B 및 C 각각에 대해 SocketServer 인스턴스를 설정하고 각각 다른 인스턴스의 주소를 알고 있다고 생각했습니다. 통신이 필요한 경우 (예 : A에서 B), A는 B 소켓을 열고 데이터를 씁니다. 그러면 B가 끊임없이 운영되는 서버가 데이터를 읽고 나중에 필요할 때 사용할 수 있도록 목록에 저장합니다.

내가 겪고있는 문제는 저장된 정보가 finish_request 메서드 (듣기 처리 중)와 __call__ (처리 중임) 메서드간에 공유되지 않는다는 것입니다. (서버 클래스는 다른 것을 필요로하기 때문에 호출 할 수 있습니다. 문제의 원인과 관련이 있다고 생각하지 않습니다.)

내가 상상 한 것처럼 내 질문에 답변 할 수 있을까요? multiprocessing, threadingsocketserver이 모두 같은 기기에서 잘 작동합니까? (Queue 또는 Pipe 같은) 프로세스간에 통신하는 다른 메커니즘을 사용하는 데 관심이 없습니다. 나는 그것들과 함께 작동하는 해결책을 가지고있다. 효율성이 떨어지더라도이 방법이 가능한지 알고 싶습니다. 그리고 만약 그것이 그렇다면, 내가 뭘 잘못해서 일하는 것을 막고 있습니까?

문제를 나타내는 최소한의 예는 아래와 같다 :

 run_it:: Port: 11111 MID: 3071227860 Size: 0 
     __call__:: From: 11111 To: 22222 MID: 3071227860 Size: 0 -- e00e0891e0714f99b86e9ad743731a00 
finish_request:: From: 60782 To: 22222 MID: 3071227972 Size: 10 -- e00e0891e0714f99b86e9ad743731a00 

"MID"는 id이다 :이 예에서 출력 찾고

import uuid 
import sys 
import socket 
import time 
import threading 
import collections 
import SocketServer 
import multiprocessing 

class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer): 
    def __init__(self, server_address, client_addresses, max_migrants=1): 
     SocketServer.TCPServer.__init__(self, server_address, None) 
     self.client_addresses = client_addresses 
     self.migrants = collections.deque(maxlen=max_migrants) 
     self.allow_reuse_address = True 
     t = threading.Thread(target=self.serve_forever) 
     t.daemon = True 
     t.start() 

    def finish_request(self, request, client_address): 
     try: 
      rbufsize = -1 
      wbufsize = 0 
      rfile = request.makefile('rb', rbufsize) 
      wfile = request.makefile('wb', wbufsize) 

      data = rfile.readline().strip() 
      self.migrants.append(data) 
      print("finish_request:: From: %d To: %d MID: %d Size: %d -- %s" % (client_address[1], 
                        self.server_address[1], 
                        id(self.migrants), 
                        len(self.migrants), 
                        data)) 

      if not wfile.closed: 
       wfile.flush() 
      wfile.close() 
      rfile.close()   
     finally: 
      sys.exc_traceback = None 

    def __call__(self, random, population, args): 
     client_address = random.choice(self.client_addresses) 
     migrant_index = random.randint(0, len(population) - 1) 
     data = population[migrant_index] 
     data = uuid.uuid4().hex 
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      sock.connect(client_address) 
      sock.send(data + '\n') 
     finally: 
      sock.close() 
     print("  __call__:: From: %d To: %d MID: %d Size: %d -- %s" % (self.server_address[1], 
                       client_address[1], 
                       id(self.migrants), 
                       len(self.migrants), 
                       data)) 
     if len(self.migrants) > 0: 
      migrant = self.migrants.popleft() 
      population[migrant_index] = migrant 
     return population 


def run_it(migrator, rand, pop): 
    for i in range(10): 
     pop = migrator(r, pop, {}) 
     print("  run_it:: Port: %d MID: %d Size: %d" % (migrator.server_address[1], 
                   id(migrator.migrants), 
                   len(migrator.migrants))) 
     time.sleep(1) 


if __name__ == '__main__': 
    import random 
    r = random.Random() 
    a = ('localhost', 11111) 
    b = ('localhost', 22222) 
    c = ('localhost', 33333) 
    am = NetworkMigrator(a, [b, c], max_migrants=11) 
    bm = NetworkMigrator(b, [a, c], max_migrants=22) 
    cm = NetworkMigrator(c, [a, b], max_migrants=33) 

    fun = [am, bm, cm] 
    pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]] 
    jobs = [] 
    for f, p in zip(fun, pop): 
     pro = multiprocessing.Process(target=run_it, args=(f, r, p)) 
     jobs.append(pro) 
     pro.start() 
    for j in jobs: 
     j.join() 
    am.shutdown() 
    bm.shutdown() 
    cm.shutdown() 

인쇄의 세 가지 유형이있을 것 그 경우에 migrants 대기열이없는 경우. "From"및 "To"는 전송을 보내거나받는 포트입니다. 그리고 데이터를 임의의 16 진수 문자열로 설정하면 개별 전송을 추적 할 수 있습니다.

같은 MID라도 어느 정도 크기가 0이 아니라고 말할 것이고, 나중에 나중에 크기가 0이라고 말할 것입니다. 나는 그것이 줄기 같아야한다고 생각합니다. 호출이 다중 스레드된다는 사실로부터 이 라인 대신 최종 2 개 for 루프로 사용되는 경우, 시스템은 내가 기대하는 방식으로 작동 :

for _ in range(10): 
    for f, p in zip(fun, pop): 
     f(r, p, {}) 
     time.sleep(1) 

그래서 그것을 깨 멀티 버전 일 이죠?

답변

1

우리는 3 개의 새로운 NetworkMigrator 객체를 만들 때 각각 3 개의 새로운 스레드가 시작되어 각각 새로운 TCP 연결을 수신 대기합니다. 나중에 우리는 run_it 함수를 위해 3 개의 새로운 프로세스를 시작한다. 전체적으로 4 개의 프로세스가 있으며 첫 번째 프로세스에는 4 개의 스레드가 포함됩니다 (1 개의 주 + 3 서버). 이제 문제는 다른 3 개의 프로세스가 수신 서버 스레드에 의해 변경된 개체에 액세스하지 못한다는 것입니다. 이것은 프로세스가 기본적으로 메모리를 공유하지 않기 때문입니다.

pro = threading.Thread(target=run_it,args=(f,r,p)) 

다른 사소한 문제가있다 : 당신이 대신 프로세스의 3 개 새 스레드를 시작하면

그래서, 당신은 그 차이를 알 수 있습니다. 스레드 간의 이러한 공유는 완전히 안전하지 않습니다. 객체의 상태를 변경할 때마다 잠금을 사용하는 것이 가장 좋습니다. 다음과 같이 finish_request와 메서드를 호출하는 것이 가장 좋습니다.

lock = Lock() 
... 
lock.acquire()  
self.migrants.append(data) 
lock.release() 

당신은 멀티 스레딩에 만족하고 여기에 설명 된대로 당신은 프록시 객체를 사용할 수있는 멀티 프로세싱을 원하십니까 경우 이 http://docs.python.org/library/multiprocessing.html#proxy-objects

오브젝트 ID의이 같은 인에 관해서는, 즉 예상치 못한 없습니다. 새로운 프로세스는 그 시점에서 객체의 상태 (객체 ID 포함)에 전달됩니다. 새로운 프로세스는 이러한 객체 ID를 유지하기 위해 계속 진행되지만 서로 다른 프로세스이므로 두 개의 완전히 다른 메모리 공간에 대해 이야기하고 있습니다. 따라서 주 프로세스에서 변경 한 내용은 작성된 하위 프로세스에 반영되지 않습니다.

+0

우수한 sleuthing. 매우 명확한. 정말 고마워. – agarrett