일련의 프로세스가 있으며 서로 통신해야하는 A, B 및 C라고 부르 자고합니다. A는 B와 C와 통신해야합니다. B는 A와 C와 통신해야합니다. C는 A와 B와 통신해야합니다. A, B, C는 서로 다른 기계 또는 동일한 시스템에 위치 할 수 있습니다.SocketServer 인스턴스와의 파이썬 다중 처리 통신
제 생각에 소켓을 통해 통신하고 모든 컴퓨터가 동일한 경우 (예 : A 포트 11111, B 포트 22222 등) "localhost"를 사용하는 것이 었습니다. 이렇게하면 비 로컬 프로세스가 로컬 프로세스처럼 처리됩니다. 이를 위해 A, B 및 C 각각에 대해 SocketServer 인스턴스를 설정하고 각각 다른 인스턴스의 주소를 알고 있다고 생각했습니다. 통신이 필요한 경우 (예 : A에서 B), A는 B 소켓을 열고 데이터를 씁니다. 그러면 B가 끊임없이 운영되는 서버가 데이터를 읽고 나중에 필요할 때 사용할 수 있도록 목록에 저장합니다.
내가 겪고있는 문제는 저장된 정보가 finish_request
메서드 (듣기 처리 중)와 __call__
(처리 중임) 메서드간에 공유되지 않는다는 것입니다. (서버 클래스는 다른 것을 필요로하기 때문에 호출 할 수 있습니다. 문제의 원인과 관련이 있다고 생각하지 않습니다.)
내가 상상 한 것처럼 내 질문에 답변 할 수 있을까요? multiprocessing
, threading
및 socketserver
이 모두 같은 기기에서 잘 작동합니까? (Queue
또는 Pipe
같은) 프로세스간에 통신하는 다른 메커니즘을 사용하는 데 관심이 없습니다. 나는 그것들과 함께 작동하는 해결책을 가지고있다. 효율성이 떨어지더라도이 방법이 가능한지 알고 싶습니다. 그리고 만약 그것이 그렇다면, 내가 뭘 잘못해서 일하는 것을 막고 있습니까?
문제를 나타내는 최소한의 예는 아래와 같다 :
가 run_it:: Port: 11111 MID: 3071227860 Size: 0
__call__:: From: 11111 To: 22222 MID: 3071227860 Size: 0 -- e00e0891e0714f99b86e9ad743731a00
finish_request:: From: 60782 To: 22222 MID: 3071227972 Size: 10 -- e00e0891e0714f99b86e9ad743731a00
"MID"는 id
이다 :이 예에서 출력 찾고
import uuid
import sys
import socket
import time
import threading
import collections
import SocketServer
import multiprocessing
class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, server_address, client_addresses, max_migrants=1):
SocketServer.TCPServer.__init__(self, server_address, None)
self.client_addresses = client_addresses
self.migrants = collections.deque(maxlen=max_migrants)
self.allow_reuse_address = True
t = threading.Thread(target=self.serve_forever)
t.daemon = True
t.start()
def finish_request(self, request, client_address):
try:
rbufsize = -1
wbufsize = 0
rfile = request.makefile('rb', rbufsize)
wfile = request.makefile('wb', wbufsize)
data = rfile.readline().strip()
self.migrants.append(data)
print("finish_request:: From: %d To: %d MID: %d Size: %d -- %s" % (client_address[1],
self.server_address[1],
id(self.migrants),
len(self.migrants),
data))
if not wfile.closed:
wfile.flush()
wfile.close()
rfile.close()
finally:
sys.exc_traceback = None
def __call__(self, random, population, args):
client_address = random.choice(self.client_addresses)
migrant_index = random.randint(0, len(population) - 1)
data = population[migrant_index]
data = uuid.uuid4().hex
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(client_address)
sock.send(data + '\n')
finally:
sock.close()
print(" __call__:: From: %d To: %d MID: %d Size: %d -- %s" % (self.server_address[1],
client_address[1],
id(self.migrants),
len(self.migrants),
data))
if len(self.migrants) > 0:
migrant = self.migrants.popleft()
population[migrant_index] = migrant
return population
def run_it(migrator, rand, pop):
for i in range(10):
pop = migrator(r, pop, {})
print(" run_it:: Port: %d MID: %d Size: %d" % (migrator.server_address[1],
id(migrator.migrants),
len(migrator.migrants)))
time.sleep(1)
if __name__ == '__main__':
import random
r = random.Random()
a = ('localhost', 11111)
b = ('localhost', 22222)
c = ('localhost', 33333)
am = NetworkMigrator(a, [b, c], max_migrants=11)
bm = NetworkMigrator(b, [a, c], max_migrants=22)
cm = NetworkMigrator(c, [a, b], max_migrants=33)
fun = [am, bm, cm]
pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]]
jobs = []
for f, p in zip(fun, pop):
pro = multiprocessing.Process(target=run_it, args=(f, r, p))
jobs.append(pro)
pro.start()
for j in jobs:
j.join()
am.shutdown()
bm.shutdown()
cm.shutdown()
인쇄의 세 가지 유형이있을 것 그 경우에 migrants
대기열이없는 경우. "From"및 "To"는 전송을 보내거나받는 포트입니다. 그리고 데이터를 임의의 16 진수 문자열로 설정하면 개별 전송을 추적 할 수 있습니다.
같은 MID라도 어느 정도 크기가 0이 아니라고 말할 것이고, 나중에 나중에 크기가 0이라고 말할 것입니다. 나는 그것이 줄기 같아야한다고 생각합니다. 호출이 다중 스레드된다는 사실로부터 이 라인 대신 최종 2 개 for
루프로 사용되는 경우, 시스템은 내가 기대하는 방식으로 작동 :
for _ in range(10):
for f, p in zip(fun, pop):
f(r, p, {})
time.sleep(1)
그래서 그것을 깨 멀티 버전 일 이죠?
우수한 sleuthing. 매우 명확한. 정말 고마워. – agarrett