2017-03-08 3 views
0

내가 파이썬에서 멀티 프로세싱 이해하려고 노력하고 있지만 현재 다음이 고민하고있는 큐를 확장 발전기 기능을 대기열로 변환 한 다음 작업자가 소비합니다. 이것은 잘 작동하지만, 이제 직원들이 대기열에 작업을 추가 할 수 있도록 프로그램을 확장하려고합니다. 그러나 이것은 첫 번째 루프에서 추가 한 작업이 두 번째 루프에 추가 된 정지 코드 바로 뒤에 오르기 때문에 문제가있는 부분입니다 (예제 코드 참조). 작업자가 추가 한 모든 추가 작업이 절대로 실행되지 않는다는 것을 의미합니다 ...는시키는 노동자

대기열이 비어 있고 작업자 중 아무 것도 수행하지 않는 경우에만 확인하는 방법이 필요하다고 생각합니다. 마지막으로 for 루프를 계속하기 전에 작업자를 중지시킵니다. 그러나 나는이 일을하는 노동자의 상태를 확인하는 방법을 모른다. 내가 원하는 것을 구현하기 위해 관리 카운팅 세마포어 값을 사용

import multiprocessing, time, random 

def f(queue): 
    worker_name = multiprocessing.current_process().name 
    print "Started: {}".format(worker_name) 

    while True: 
     value = queue.get() 
     if value is None: 
      break 

     print "{} is processing '{}'".format(worker_name, value) 
     # compute(value) 
     time.sleep(1) 

     # Worker may add additional work to queue 
     if random.random() > 0.7: 
      queue.put("Extra work!") 

    print "Stopping: {}".format(worker_name) 


n_workers = 4 
queue = multiprocessing.Queue() 
pool = multiprocessing.Pool(n_workers, f, (queue,)) 

# Feed large objects from generator 
for i in xrange(20): 
    queue.put(i) 

# All extra work is skipped 

# Terminate workers after finishing work 
for __ in xrange(n_workers): 
    queue.put(None) 

pool.close() 
pool.join() 

print "Finished!" 
print queue.get() # Will yield 'Extra Work!' should be empty 

답변

0

:

최소한의 코드 예제를 표시합니다. 이 값을 증가/감소시켜 각 작업자의 작업을 추적하고 대기열이 비어있는 경우 즉시 작업자가 더 이상 아무 것도 처리하지 않습니다.

모든 조언을 주시면 감사하겠습니다.

예제 코드 :

import multiprocessing, time, random 

def f(queue, semaphore): 
    worker_name = multiprocessing.current_process().name 
    print "Started: {}".format(worker_name) 

    while True: 
     value = queue.get() 
     if value is None: 
      break 

     with semaphore.get_lock(): 
      semaphore.value -= 1 

     print "{} is processing '{}'".format(worker_name, value) 
     # compute(value) 
     time.sleep(1) 

     # Worker may add additional work to queue 
     if random.random() > 0.7: 
      queue.put("Extra work!") 

     with semaphore.get_lock(): 
      semaphore.value += 1 

    print "Stopping: {}".format(worker_name) 


n_workers = 4 
semaphore = multiprocessing.Value('i', n_workers) 
queue = multiprocessing.Queue() 
pool = multiprocessing.Pool(n_workers, f, (queue, semaphore)) 

# Feed large objects from generator 
for i in xrange(20): 
    queue.put(i) 

while not queue.empty() or semaphore.value != n_workers: 
    time.sleep(0.2) 

# Terminate workers after finishing work 
for __ in xrange(n_workers): 
    queue.put(None) 

pool.close() 
pool.join() 

print "Finished!" 
print queue.empty() # True