2017-12-11 27 views
0

Pool.apply_async를 사용하여 호출 된 함수에 Queue 객체를 전달하면 ApplyResult.successful()으로 표시되고 인쇄 출력이없는 것처럼 함수가 실패합니다. 이 경우 함수의 본문이 전혀 실행되지 않는 것처럼 보입니다.Queue를 전달하면 Pool.apply_async를 호출 할 때이 함수가 실패하는 이유는 무엇입니까?

대기열을 사용하여 별도의 프로세스에서 결과 집합을 suggested by the multiprocessing documentation으로 동기화하려고했지만 실제로이 기능에서 실제로 사용되지는 않아도 대기열이 실패합니다.

from multiprocessing import Pool, Queue 
import time 
from random import randint 

def sample_function(name, results): 
    delay_ms = randint(1, 10) 
    print ("{} starting with delay {:d}".format(name, int(delay_ms))) 
    time.sleep(delay_ms) 
    # results argument is unused! 
    #results.put("{} result".format(name)) 
    print ("{} ending".format(name)) 

resultsQueue = Queue() 
jobs = ['one','two','three','four', 'five', 'six'] 

pool = Pool(processes=4) 
# fails 
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs] 
# succeeds 
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs] 

pool.close() 
print('closing: no more tasks') 
pool.join() 

for status in jobStatuses: 
    print (status.ready(), status.successful()) 

while not resultsQueue.empty(): 
    print(resultsQueue.get()) 
print('All finished') 

나는 Pool.apply_async없이 동일한 기능을 호출 할 수 있으며 성공 : sample_function('test without pool', resultsQueue)합니다. 또한 문자열로 Pool.apply_async 함수를 호출 할 수 있으며 성공합니다.

답변

3

apply_async 전화에 RuntimeError 전화가 걸려 있고 multiprocessing.Queue 전화가 끊어집니다.

for status in job_statuses: 
    print(status.__dict__) 

출력 :
코드를 변경 조금 나는 그것을 추적 할 수 있었다

{ '_value'RuntimeError에 ('큐 객체는 상속을 통해 프로세스간에 공유되어야한다 '),'_success '거짓'_callback '없음'_cache '{}'_job '0'_error_callback '없음'_event '}

X6 시간.

프로세스간에 공유 할 수있는 Manager().Queue()을 사용하면 해결됩니다.