2017-12-03 2 views
0

대기열이 비어 있고 모든 스레드가 이미 작업 처리를 완료했는지 (예 : task_done()이 실행되었는지) 확인하는 옵션이 있습니까? 두 조건이 모두 충족되는 경우에만 추가 작업을 추가하고 싶습니다.Python - 대기열의 모든 작업이 완료되었는지 확인

나는 더 많은 작업을 추가 할 수 있기를 원하며 사용하지 않은 스레드를 종료하고 activeCount()를 사용할 수 없습니다. 또한 나는 실행의 진행 상황을 적극적으로 모니터링 할 수 있기를 원하기 때문에 대기열에 가입()하려고하지 않습니다. 여기

는 예제 코드입니다 :

from Queue import Queue 
from threading import Thread 
import time 

queue = Queue() 

def my_method(queue): 
    while True: 
     task = queue.get() 

     time.sleep((task + 2) * 3) 

     queue.task_done() 

num_queue_threads = 2 
queue_threads = [None] * num_queue_threads 

for i in range(num_queue_threads): 
    queue_threads[i] = Thread(target=my_method, args=(queue,)) 
    queue_threads[i].setDaemon(True) 
    queue_threads[i].start() 

for task in range(3): 
    queue.put(task) 

#queue.join() #need to wait actively 

while True: 
    print("queue.qsize(): {}, queue.empty(): {}".format(queue.qsize(), queue.empty())) 

    time.sleep(1) 

큐가 즉시 마지막 작업의 실행이 시작되면 비어 있습니다.

답변

0

여기에는 공용 인터페이스가 없습니다. 누군가 큐에서 미완성 된 태스크를 추적하기 위해 사용하는 내부 속성을 파고 드는 취약한 솔루션을 게시 할 것이지만, 그렇게하지 마십시오. 이 속성은 문서화 된 API의 일부가 아니며 향후 버전에서 이름을 변경하거나 다시 디자인 할 수 있습니다.

작업 완료를 직접 추적하십시오.

from Queue import Queue 
from threading import Thread 
import time 

task_queue = Queue() 
completion_queue = Queue() 

def my_method(in_queue, out_queue): 
    while True: 
     task = in_queue.get() 
     time.sleep((task + 2) * 3) 
     in_queue.task_done() 

     # Send completion message 
     out_queue.put(task) 

num_queue_threads = 2 
queue_threads = [None] * num_queue_threads 

for i in range(num_queue_threads): 
    queue_threads[i] = Thread(target=my_method, args=(task_queue, completion_queue)) 
    queue_threads[i].setDaemon(True) 
    queue_threads[i].start() 

for task in range(3): 
    task_queue.put(task) 

for _ in range(3): 
    completion_queue.get() 
    completion_queue.task_done() 
    print("One task done!") 

print("All done!") 
+0

을 : 그것은 할당 된 작업의 수와 같은 메시지의 번호를받은 때까지 하나의 옵션은 노동자가 지휘자에 ​​"작업 완료"메시지를 보낼 수있는 별도의 큐 및 오케 스트레이터 대기를하는 것입니다 여기서 유일한 문제는 completion_queue.get()이 소요되는 시간을 제어 할 수 없으므로 정기 활동을 수행하지 못할 수도 있다는 것입니다. 그러나 적어도 미완성 된 작업을 추적하는 다른 권장 방법은 없다는 것을 알고 있습니다. 아마 대신 SQL 대신 sqlite에서 작업 상태를 사용하도록 논리를 전환 할 것입니다 (이미이 메타 데이터를 저장하기 위해이 db를 사용함). 감사! –

+0

또한 completion_queue.get()을 타임 아웃과 함께 실행하고, 빈 예외를 무시하고, 필요한 정기 활동을 실행하고 다음 작업을 기다리는 것으로 돌아갈 수 있습니다. –