2

While True 루프에서 영원히 실행되는 스레드 또는 프로세스를 만들고 싶습니다.Queue()/deque()와 통신을위한 클래스 변수 및 "poison pill"사용과 관련한 프로세스 대 스레드

다중 처리 .Queue() 또는 collections.deque() 대기열에 대한 양식으로 작업자에게 데이터를 전송하고 수신해야합니다. 나는 상당히 빠르기 때문에 collections.deque()를 사용하는 것을 선호한다.

또한 작업자를 결국 죽일 수 있어야합니다 (while 루프에서 실행 됨). 다음은 스레드, 프로세스, 대기열 및 큐의 차이점을 이해하기 위해 함께 사용한 테스트 코드입니다. 당신이 볼 수 있듯이 ..

import time 
from multiprocessing import Process, Queue 
from threading import Thread 
from collections import deque 

class ThreadingTest(Thread): 

    def __init__(self, q): 
     super(ThreadingTest, self).__init__() 
     self.q = q 
     self.toRun = False 

    def run(self): 
     print("Started Thread") 
     self.toRun = True 
     while self.toRun: 
      if type(self.q) == type(deque()): 
       if self.q: 
        i = self.q.popleft() 
        print("Thread deque: " + str(i)) 
      elif type(self.q) == type(Queue()): 
       if not self.q.empty(): 
        i = self.q.get_nowait() 
        print("Thread Queue: " + str(i)) 

    def stop(self): 
     print("Trying to stop Thread") 
     self.toRun = False 
     while self.isAlive(): 
      time.sleep(0.1) 
     print("Stopped Thread") 

class ProcessTest(Process): 

    def __init__(self, q): 
     super(ProcessTest, self).__init__() 
     self.q = q 
     self.toRun = False 
     self.ctr = 0 

    def run(self): 
     print("Started Process") 
     self.toRun = True 
     while self.toRun: 
      if type(self.q) == type(deque()): 
       if self.q: 
        i = self.q.popleft() 
        print("Process deque: " + str(i)) 
      elif type(self.q) == type(Queue()): 
       if not self.q.empty(): 
        i = self.q.get_nowait() 
        print("Process Queue: " + str(i)) 

    def stop(self): 
     print("Trying to stop Process") 
     self.toRun = False 
     while self.is_alive(): 
      time.sleep(0.1) 
     print("Stopped Process") 

if __name__ == '__main__': 
    q = Queue() 
    t1 = ProcessTest(q) 
    t1.start() 

    for i in range(10): 
     if type(q) == type(deque()): 
      q.append(i) 
     elif type(q) == type(Queue()): 
      q.put_nowait(i) 
     time.sleep(1) 
    t1.stop() 
    t1.join() 

    if type(q) == type(deque()): 
     print(q) 
    elif type(q) == type(Queue()): 
     while q.qsize() > 0: 
      print(str(q.get_nowait())) 

, T1은 ThreadingTest, 또는 ProcessTest이 될 수 있습니다. 또한, 전달 큐 수 중 하나 multiprocessing.Queue 또는 collections.deque합니다. 작동 ThreadingTest

을 Queue 또는 deque() 또한 stop() 메서드가 호출 될 때 run()을 제대로 종료합니다.

Started Thread 
Thread deque: 0 
Thread deque: 1 
Thread deque: 2 
Thread deque: 3 
Thread deque: 4 
Thread deque: 5 
Thread deque: 6 
Thread deque: 7 
Thread deque: 8 
Thread deque: 9 
Trying to stop Thread 
Stopped Thread 
deque([]) 

ProcessTest는 멀티 프로세싱 유형의 큐 인 경우에만 큐에서 읽을 수 있습니다. collections.deque에서는 작동하지 않습니다. 또한 stop()을 사용하여 프로세스를 종료 할 수 없습니다.

Process Queue: 0 
Process Queue: 1 
Process Queue: 2 
Process Queue: 3 
Process Queue: 4 
Process Queue: 5 
Process Queue: 6 
Process Queue: 7 
Process Queue: 8 
Process Queue: 9 
Trying to stop Process 

왜 그럴까요? 또한, deque를 프로세스와 함께 사용하는 가장 좋은 방법은 무엇입니까? 그리고, 어떤 종류의 stop() 메소드를 사용하여 프로세스를 죽이는 방법은 어떻게 될까요?

+0

나는 다음과 같은 방법으로 프로세스를 종료 할 수 있습니다 self.killQ = 대기열() : 가 다른 클래스 변수를 작성하는 대신 토룬 동안의 을, 내가 할 수있는 동안 self.killQ.empty() : 정지에 () 메서드 나는 큐에 dummy 객체를 추가 할 수 있습니다. self.killQ.put_nowait (0) 하지만 ... toRun과 같은 클래스 변수가 작동하지 않는 이유는 무엇입니까? 그것은 stop() 메소드에서 self.toRun = False의 할당과 관련이 있습니까? –

+0

프로세스에 대한 또 다른 관찰. 프로세스 내에서 deque를 사용하여 물건을 전달할 수 있으며 정상적으로 작동합니다. 프로세스 사이에서 통신하는 데 deque를 사용할 수 없습니다 (이 예제에서는 주 ProcessTest). –

+0

제쳐두고,'Queue'가 소비되기 전에 비어 있는지 확인하는 것은 일반적으로 나쁜 습관입니다. 여러 스레드가'Queue'에서 읽는다면 크기를 확인할 때와 실제로'get' 할 때 사이에 빈 상태가 될 수 있기 때문에 경쟁 조건에 취약합니다. 이 경우'q.get()'을 직접 호출하는 것이 나을 것 같다. 그런 식으로 당신은 끊임없이 루핑을하지 않으므로 큐에있는 것을 기다리는 동안 자식의 CPU를 사용하게됩니다. – dano

답변

4

collections.deque은 프로세스를 인식하지 못하기 때문에 collections.deque 두 개의 multiprocessing.Process 인스턴스간에 데이터를 전달할 수 없습니다. multiprocessing.Queue은 내용을 내부적으로 multiprocessing.Pipe에 기록합니다. 즉, 내부의 데이터를 한 번 프로세스에서 대기열에 추가하고 다른 프로세스에서 검색 할 수 있습니다. collections.deque에는 그런 종류의 배관이 없으므로 작동하지 않습니다. 한 프로세스에서 deque에 쓸 때 다른 프로세스의 deque 인스턴스는 전혀 영향을받지 않습니다. 그들은 완전히 별개의 인스턴스입니다.

stop() 방법에도 비슷한 문제가 발생합니다. 주 프로세스에서 toRun의 값을 변경하고 있지만 이는 하위에 전혀 영향을 미치지 않습니다. 그것들은 완전히 별개의 인스턴스입니다. 아이를 끝내는 가장 좋은 방법은 Queue에 약간의 감시 신호를 보내는 것입니다. 당신이 아이의 감시를받을 때, 무한 루프의 탈옥 :

def run(self): 
    print("Started Process") 
    self.toRun = True 
    while self.toRun: 
     if type(self.q) == type(deque()): 
      if self.q: 
       i = self.q.popleft() 
       print("Process deque: " + str(i)) 
     elif type(self.q) == type(Queue()): 
      if not self.q.empty(): 
       i = self.q.get_nowait() 
       if i is None: 
        break # Got sentinel, so break 
       print("Process Queue: " + str(i)) 

def stop(self): 
    print("Trying to stop Process") 
    self.q.put(None) # Send sentinel 
    while self.is_alive(): 
     time.sleep(0.1) 
    print("Stopped Process") 

편집 : 당신은 실제로 두 과정 사이에 deque 의미를해야하는 경우

, 당신은을 만들기 위해 custom multiprocessing.Manager()을 사용할 수 있습니다 Manager 과정에서 deque을 공유하고 Process 각각의 인스턴스는에 Proxy을 얻을 것이다 :

import time 
from multiprocessing import Process 
from multiprocessing.managers import SyncManager 
from collections import deque 

SyncManager.register('deque', deque) 

def Manager(): 
    m = SyncManager() 
    m.start() 
    return m 

class ProcessTest(Process): 
    def __init__(self, q): 
     super(ProcessTest, self).__init__() 
     self.q = q 
     self.ctr = 0 

    def run(self): 
     print("Started Process") 
     self.toRun = True 
     while self.toRun: 
      if self.q._getvalue(): 
       i = self.q.popleft() 
       if i is None: 
        break 
       print("Process deque: " + str(i)) 

    def stop(self): 
     print("Trying to stop Process") 
     self.q.append(None) 
     while self.is_alive(): 
      time.sleep(0.1) 
     print("Stopped Process") 

if __name__ == '__main__': 
    m = Manager() 
    q = m.deque() 
    t1 = ProcessTest(q) 
    t1.start() 

    for i in range(10): 
     q.append(i) 
     time.sleep(1) 
    t1.stop() 
    t1.join() 

    print(q) 

deque에 액세스 할 때마다 IPC 비용이 발생하기 때문에 이것은 아마도 multiprocessing.Queue보다 빠르지 않을 것입니다. 또한 메시지를 전달하는 데있어 자연스러운 데이터 구조가 아닙니다.

+0

이것이 내가 알아야 할 필요가있는 것입니다. 감사! –