While True 루프에서 영원히 실행되는 스레드 또는 프로세스를 만들고 싶습니다.Queue()/deque()와 통신을위한 클래스 변수 및 "poison pill"사용과 관련한 프로세스 대 스레드
다중 처리 .Queue() 또는 collections.deque() 대기열에 대한 양식으로 작업자에게 데이터를 전송하고 수신해야합니다. 나는 상당히 빠르기 때문에 collections.deque()를 사용하는 것을 선호한다.
또한 작업자를 결국 죽일 수 있어야합니다 (while 루프에서 실행 됨). 다음은 스레드, 프로세스, 대기열 및 큐의 차이점을 이해하기 위해 함께 사용한 테스트 코드입니다. 당신이 볼 수 있듯이 ..
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
, T1은 ThreadingTest, 또는 ProcessTest이 될 수 있습니다. 또한, 전달 큐 수 중 하나 multiprocessing.Queue 또는 collections.deque합니다. 작동 ThreadingTest
을 Queue 또는 deque() 또한 stop() 메서드가 호출 될 때 run()을 제대로 종료합니다.
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest는 멀티 프로세싱 유형의 큐 인 경우에만 큐에서 읽을 수 있습니다. collections.deque에서는 작동하지 않습니다. 또한 stop()을 사용하여 프로세스를 종료 할 수 없습니다.
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
왜 그럴까요? 또한, deque를 프로세스와 함께 사용하는 가장 좋은 방법은 무엇입니까? 그리고, 어떤 종류의 stop() 메소드를 사용하여 프로세스를 죽이는 방법은 어떻게 될까요?
나는 다음과 같은 방법으로 프로세스를 종료 할 수 있습니다 self.killQ = 대기열() : 가 다른 클래스 변수를 작성하는 대신 토룬 동안의 을, 내가 할 수있는 동안 self.killQ.empty() : 정지에 () 메서드 나는 큐에 dummy 객체를 추가 할 수 있습니다. self.killQ.put_nowait (0) 하지만 ... toRun과 같은 클래스 변수가 작동하지 않는 이유는 무엇입니까? 그것은 stop() 메소드에서 self.toRun = False의 할당과 관련이 있습니까? –
프로세스에 대한 또 다른 관찰. 프로세스 내에서 deque를 사용하여 물건을 전달할 수 있으며 정상적으로 작동합니다. 프로세스 사이에서 통신하는 데 deque를 사용할 수 없습니다 (이 예제에서는 주 ProcessTest). –
제쳐두고,'Queue'가 소비되기 전에 비어 있는지 확인하는 것은 일반적으로 나쁜 습관입니다. 여러 스레드가'Queue'에서 읽는다면 크기를 확인할 때와 실제로'get' 할 때 사이에 빈 상태가 될 수 있기 때문에 경쟁 조건에 취약합니다. 이 경우'q.get()'을 직접 호출하는 것이 나을 것 같다. 그런 식으로 당신은 끊임없이 루핑을하지 않으므로 큐에있는 것을 기다리는 동안 자식의 CPU를 사용하게됩니다. – dano