나는 작업자를 생성하고, 일부 작업을 제공하고, 결과를 처리하는 기본 프로세스로 결과를 보내고, 모든 작업자를 새로운 정보로 업데이트하고 전송합니다. 그들에게 더 많은 작업. 나는 노동자들을 끝내기 전에 일정한 양의 반복을하고 싶다.다중 처리 - 작업자에게 작업 보내기, 결과 받기, 작업자 업데이트, 다시 계산
내 코드가 정상적으로 작동하는 것 같지만 모든 반복자마다 모든 작업자가 업데이트되는 것은 아닙니다. 예 : 내가받은 출력했다 :
Creating 4 workers
Finished workin 4
Finished workin 6
Finished workin 3
Finished workin 4
받을 수있는 무언가가있을 때까지 차단해야 Pipe Documentationrecv()
에 따르면. 제 예제에서 왜 이것이 사실이 아닙니까?
from multiprocessing import Process, Queue, JoinableQueue, Pipe, Event, Barrier, Value
import time
def worker(qu_task,qu_results,chi,ev,bar,val):
ext=False
dum=0
while True:
ev.wait()
while qu_task.empty() is False:
a=qu_task.get() # Pop work from queue
if a is None: #Poison pill
bar.wait() # Wait for all processes so that they all receive "None"
ext=True
qu_task.task_done() #
print("Finished workin", dum)
break
# Do stuff
time.sleep(.1)
qu_results.put(111)
qu_task.task_done()
# time.sleep(1.0)
if ext: # Break out if finished
break
data1, data2 = chi.recv() # Blocks until there its something to receive
dum+=data2
# Do stuff
def main(NProc, NScen, NTime, branch):
qu_tasks=JoinableQueue() # To use task_done() and join()
qu_results=Queue()
workers=[]
val=Value('i',0)
conn=[]
bar=Barrier(NProc)
ev=Event()
print("Creating ", NProc, " workers")
for i in range(NProc):
par,chi=Pipe()
workers.append(Process(target=worker,args=(qu_tasks,qu_results,chi,ev,bar,val)))
conn.append(par)
for w in workers:
w.start()
for t in reversed(range(NTime)):
for s in range(NScen):
a=0.0
for b in branch:
k=(b,s,t)
qu_tasks.put(k) # Send work
ev.set() # Start workers
qu_tasks.join() # Synchronize workers
ev.clear() # Reset lock
for idx,i in enumerate(branch):
a+=qu_results.get()
#print(a)
kk=[1,2,3,5]
for idx,c in enumerate(conn):
c.send(([i*(1+idx) for i in kk],1)) # Send more data to workers
for i in range(NProc): # Kill workers
qu_tasks.put(None)
ev.set()
qu_tasks.join()
for w in workers: # Wait for them to finish
w.join()
if __name__=="__main__":
branch=[i for i in range(1,7)]
NTime=2
NScen=3
main(4,NScen, NTime, branch)
나는 여전히 원하는 동작을 제공하는 솔루션을 찾을 수 없습니다. 코드 예제를 게시 할 수 있습니까? – martihj