2017-05-09 4 views
1

나는 작업자를 생성하고, 일부 작업을 제공하고, 결과를 처리하는 기본 프로세스로 결과를 보내고, 모든 작업자를 새로운 정보로 업데이트하고 전송합니다. 그들에게 더 많은 작업. 나는 노동자들을 끝내기 전에 일정한 양의 반복을하고 싶다.다중 처리 - 작업자에게 작업 보내기, 결과 받기, 작업자 업데이트, 다시 계산

내 코드가 정상적으로 작동하는 것 같지만 모든 반복자마다 모든 작업자가 업데이트되는 것은 아닙니다. 예 : 내가받은 출력했다 :

Creating 4 workers 
Finished workin 4 
Finished workin 6 
Finished workin 3 
Finished workin 4 

받을 수있는 무언가가있을 때까지 차단해야 Pipe Documentationrecv()에 따르면. 제 예제에서 왜 이것이 사실이 아닙니까?

from multiprocessing import Process, Queue, JoinableQueue, Pipe, Event, Barrier, Value 
import time 

def worker(qu_task,qu_results,chi,ev,bar,val): 
    ext=False 
    dum=0 
    while True: 
     ev.wait() 
     while qu_task.empty() is False: 
      a=qu_task.get() # Pop work from queue 
      if a is None: #Poison pill 
       bar.wait() # Wait for all processes so that they all receive "None" 
       ext=True 
       qu_task.task_done() # 
       print("Finished workin", dum) 
       break 

      # Do stuff 
      time.sleep(.1) 
      qu_results.put(111) 
      qu_task.task_done() 

#  time.sleep(1.0) 

     if ext: # Break out if finished 
      break 

     data1, data2 = chi.recv() # Blocks until there its something to receive 
     dum+=data2 
     # Do stuff 

def main(NProc, NScen, NTime, branch): 
    qu_tasks=JoinableQueue() # To use task_done() and join() 
    qu_results=Queue() 
    workers=[] 
    val=Value('i',0) 
    conn=[] 
    bar=Barrier(NProc) 
    ev=Event() 
    print("Creating ", NProc, " workers") 
    for i in range(NProc): 
     par,chi=Pipe() 
     workers.append(Process(target=worker,args=(qu_tasks,qu_results,chi,ev,bar,val))) 
     conn.append(par) 

    for w in workers: 
     w.start() 

    for t in reversed(range(NTime)): 
     for s in range(NScen): 
      a=0.0 
      for b in branch: 
       k=(b,s,t) 
       qu_tasks.put(k) # Send work 

      ev.set() # Start workers 
      qu_tasks.join() # Synchronize workers 
      ev.clear() # Reset lock 
      for idx,i in enumerate(branch): 
       a+=qu_results.get() 
      #print(a) 

      kk=[1,2,3,5] 
      for idx,c in enumerate(conn): 
       c.send(([i*(1+idx) for i in kk],1)) # Send more data to workers 

    for i in range(NProc): # Kill workers 
     qu_tasks.put(None) 
    ev.set() 
    qu_tasks.join() 

    for w in workers: # Wait for them to finish 
     w.join() 

if __name__=="__main__": 
    branch=[i for i in range(1,7)] 
    NTime=2 
    NScen=3 

    main(4,NScen, NTime, branch) 

답변

0

Pipe.recv() 대기 중이므로 걱정하지 마십시오. None 때문에 루프가 빨리 종료되므로 6 회 호출하지 않습니다. ,

는 비동기 프로그래밍이 아니라, 비동기 기억하고 통신 + 당신이 주변에 그 time.sleep(.1)를 이동하는 경우 근로자가 Pipe.recv()

에 도달하지 않고 "다음"배치에서 소비하는 메시지를 시작 처리 패턴 등의 일정이있다 당신은 "완성 된 workin 0"도 도달 할 수 있습니다. 당신이 (Joinable)Queue.put() 루프를 늦출 경우

또한, 작업 대기열이 일시적으로 빈 중간 루프를 얻을 때 당신은 빨리 당신이 모든 작업을 파견 것보다, 상황에 도달 할 수 있습니다, 그래서 노동자들은 Pipe.recv()하지만 주에 도달 할 것 프로그램이 대기 할 것입니다. JoinableQueue.join()

제안 : 파이프 또는 (결합 가능한) 대기열을 선택하지만 둘 다 선택하지 마십시오. 그것들을 연동시키는 것은 어렵고 이익을 가져 오지 않습니다.

+0

나는 여전히 원하는 동작을 제공하는 솔루션을 찾을 수 없습니다. 코드 예제를 게시 할 수 있습니까? – martihj