2017-10-14 6 views
1

입력을 받아 처리하는 일부 프로세스를 설정하려고합니다.이 결과는 처리해야하는 또 다른 작업입니다. 본질적으로 각 작업은 0 또는 여러 개의 새 작업 (동일한 유형의 작업)을 초래하므로 결국 모든 작업에서 새로운 작업이 생성되지 않습니다.모든 프로세스가 대기열에서 가져 오려고하고 대기열이 비어있을 때 처리를 종료합니까?

대기열이 좋을 것으로 생각하여 입력 대기열과 결과 대기열을 사용하여 새로운 결과가없는 작업을 추가했습니다. 언제든지 대기열이 비어있을 수 있지만 다른 프로세스가 작업을하고 있으면 대기열이 추가 될 수 있습니다.

따라서 모든 프로세스가 동시에 입력 큐에서 가져 오려고 할 때만 끝내기를 원합니다.

저는 파이썬 다중 프로세싱과 일반적으로 멀티 프로세싱에 완전히 새로운 것입니다.

class Consumer(Process): 
    def __init__(self, name): 
     super().__init__(name=name) 

    def run(): 
     # This is where I would have the task try to get a new task off of the 
     # queue and then calculate the results and put them into the queue 
     # After which it would then try to get a new task and repeat 

     # If this an all other processes are trying to get and the queue is 
     # empty That is the only time I know that everything is complete and can 
     # continue 
     pass 

def start_processing(): 
    in_queue = Queue() 
    results_queue = Queue() 
    consumers = [Consumer(str(i)) for i in range(cpu_count())] 

    for i in consumers: 
     i.start() 

    # Wait for the above mentioned conditions to be true before continuing 
+0

는 난 단지 절대 골격 코드가 있습니다. 기본적으로 프로세스와 대기열을 만들었습니다. –

+0

몇 가지 개요 코드를 추가했습니다. –

답변

1

JoinableQueue이 목적에 맞게 설계되었습니다 :

편집은 내가 무슨 뜻인지에 대한 기본적인 개요를 추가 할 수 있습니다. JoinableQueue에 참여하면 진행중인 작업이있을 때까지 차단됩니다.

다음과 같이 사용할 수 있습니다. 주 프로세스는 특정 양의 작업자 프로세스를 할당하여 JoinableQueue을 할당합니다. 작업자 프로세스는 대기열을 사용하여 새 작업을 생성하고 사용합니다. 주 프로세스는 더 이상 작업이 진행될 때까지 대기열에 참여하여 대기합니다. 그런 다음 작업 프로세스를 종료하고 종료합니다.

아주 간단한 예 (의사) :이 내가 앞으로 이동하기 전에 해결해야 할 일처럼 보이기 때문에이 시점에서

def consumer(queue): 
    for task in queue.get(): 
     results = process_task(task) 

     if 'more_tasks' in results: 
      for new_task in results['more_tasks']: 
       queue.put(new_task) 

     # signal the queue that a task has been completed 
     queue.task_done() 

def main(): 
    queue = JoinableQueue() 

    processes = start_processes(consumer, queue) 

    for task in initial_tasks: 
     queue.put(task) 

    queue.join() # block until all work is done 

    terminate_processes(processes)