2017-09-12 4 views
2

다음과 같은 Python 프로그램이 있습니다.파이썬 다중 처리 대기열을 사용하는 교착 상태

from multiprocessing import Lock, Process, Queue, current_process 
import time 

lock = Lock() 


def do_job(tasks_to_accomplish, tasks_that_are_done): 
    while not tasks_to_accomplish.empty(): 
     task = tasks_to_accomplish.get() 
     print(task) 
     lock.acquire() 
     tasks_that_are_done.put(task + ' is done by ' + current_process().name) 
     lock.release() 
     time.sleep(1) 
    return True 


def main(): 
    number_of_task = 10 
    number_of_processes = 4 
    tasks_to_accomplish = Queue() 
    tasks_that_are_done = Queue() 
    processes = [] 

    for i in range(number_of_task): 
     tasks_to_accomplish.put("Task no " + str(i)) 

    # creating processes 
    for w in range(number_of_processes): 
     p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done)) 
     processes.append(p) 
     p.start() 


    # completing process 
    for p in processes: 
     p.join() 

    # print the output 
    while not tasks_that_are_done.empty(): 
     print(tasks_that_are_done.get()) 

    return True 


if __name__ == '__main__': 
    main() 

때로는 프로그램이 완벽하게 실행되지만 때로는 멈추고 완료되지 않을 수도 있습니다. 수동으로 종료하면 다음과 같은 오류가 발생합니다.

$ python3 multiprocessing_example.py 
Task no 0 
Task no 1 
Task no 2 
Task no 3 
Task no 4 
Task no 5 
Task no 6 
Task no 7 
Task no 8 
Task no 9 
^CProcess Process-1: 
Traceback (most recent call last): 
    File "multiprocessing_example.py", line 47, in <module> 
    main() 
    File "multiprocessing_example.py", line 37, in main 
    p.join() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 121, in join 
    res = self._popen.wait(timeout) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 51, in wait 
    return self.poll(os.WNOHANG if timeout == 0.0 else 0) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 29, in poll 
    pid, sts = os.waitpid(self.pid, flag) 
KeyboardInterrupt 
Traceback (most recent call last): 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "multiprocessing_example.py", line 9, in do_job 
    task = tasks_to_accomplish.get() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 94, in get 
    res = self._recv_bytes() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes 
    buf = self._recv_bytes(maxlength) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes 
    buf = self._recv(4) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv 
    chunk = read(handle, remaining) 
KeyboardInterrupt 

누군가가 프로그램의 문제점을 말해 줄 수 있습니까? 나는 파이썬 3.6을 사용하고있다.

답변

1

: LockQueue 주위에 필요하지 않습니다.

lock.acquire() 
    tasks_that_are_done.put(task + ' is done by ' + current_process().name) 
    lock.release() 

Queue
이 모듈 큐 클래스는 모든 필요한 잠금 시맨틱을 구현한다.


질문 : ... 프로그램에 문제가 무엇입니까? empty() 국가가 도달 한 get() 때까지 변경하지 않는 것이 보장이 없기 때문에

당신은 join()를 호출에 교착 상태Queue.empty()Queue.get(), 같은 리드를 사용하고 있습니다. 경향이

교착 상태 :

대신 empty/get를 사용
while not tasks_to_accomplish.empty(): 
    task = tasks_to_accomplish.get() 

, 쌍 예를 들어 사용

import queue 
while True: 
    try: 
     task = tasks_to_accomplish.get_nowait() 
    except queue.Empty: 
     break 
    else: 
     # Handle task here 
     ... 
     tasks_to_accomplish.task_done() 
+0

감사를 업데이트를 위해, 당신은 내 문제를 해결. – Pankaj