0

func의 생성 결과를 pool.apply_async() 메쏘드에 넣고 싶다면 모든 것이 잘되어있는 것처럼 보이지만 오류는 많이 혼란스러워합니다.대기열이 apply_async func에 추가 될 때 멀티 프로세싱의 이상한 동작이 발생합니까?

제 목적은 여러 개의 비동기 프로듀서 (여기서는 올바르지 않을 수 있음)와 여러 소비자를 만드는 것입니다.

> Process PoolWorker-1: Process PoolWorker-2: Traceback (most recent 
> call last): Traceback (most recent call last): File 
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in 
> _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
>  self.run() 
>  self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File 
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
>  self._target(*self._args, **self._kwargs) 
>  self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
> File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
>  task = get() 
>  task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get 
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get 
>  return recv() 
>  return recv() AttributeError: 'module' object has no attribute 'fetch_page' AttributeError: 'module' object has no attribute 
> 'fetch_page' start 0 thread.. start 1 thread.. 

나는 그것이 매우 이상한 this answer을 읽을 수는 있지만 발견하고 this answer 내 우분투 시스템에서 작동하지 않습니다

from multiprocessing import Pool 
import multiprocessing 
from threading import Thread 

from six.moves import xrange 
pool = Pool(processes=2, maxtasksperchild=1000) 


# resp_queue = multiprocessing.Queue(1000) 
manager = multiprocessing.Manager() 
resp_queue = manager.Queue() 

rang = 10000 


def fetch_page(url): 
    resp_queue.put(url) 


def parse_response(): 
    url = resp_queue.get() 
    print(url) 

r_threads = [] 


def start_processing(): 
    for i in range(2): 
     r_threads.append(Thread(target=parse_response)) 
     print("start %s thread.." % i) 
     r_threads[-1].start() 


urls = map(lambda x: "this is url %s" % x, xrange(rang)) 
for i in xrange(rang): 
    pool.apply_async(fetch_page, (urls[i],)) 

start_processing() 

pool.close() 
pool.join() 

오류가 있음을 읽

은 여기 내 장난감 예이다.

모든 의견을 보내 주시면 감사하겠습니다. 매우 감사합니다.

+2

이 작업을 정의있어 함수를 미리 선언해야합니다. – georgexsh

+0

@scriptboy 및 georgexsh 당신 말이 맞습니다! [풀이를 만들 때 현재 프로세스를 포크로 만들어서 작업자를 만듭니다]라는 [이 답변] (https://stackoverflow.com/a/18947948/3552975)에서 알 수 있습니다. – lerner

답변

1

아래 코드를 살펴보십시오. 나는 당신의 버전에 대한 변경 사항 : 그것이 반복 가능한을 얻고 분할 멋지게 노동자 사이에 작업 할

  • 내가 대신 applymap를 사용하고 있습니다.
  • parse_resp 함수 (지금은 get_url)에 while 루프를 추가하여 각 스레드가 대기열에서 소모되는 값을 얻을 수 있도록했습니다.
  • 풀 인스턴스화 & 호출은 __name__ == '__main__' 이후입니다. 파이썬 다중 처리에 필요한 윈도우 해킹입니다 (제가 아는 한 많이 우분투에 잘못 될 수도 있습니다).

    시작 0 스레드 ...
    시작 1 개 실 ..
    스레드 : 멀티 가져 오기 풀에서

수입은 수입 스레드에게

manager = multiprocessing.Manager() 
url_queue = manager.Queue() 

rang = 10000 


def put_url(url): 
    url_queue.put(url) 


def get_url(thread_id): 
    while not url_queue.empty(): 
     print('Thread {0} got url {1}'.format(str(thread_id), url_queue.get())) 


r_threads = [] 


def start_threading(): 
    for i in range(2): 
     r_threads.append(Thread(target=get_url, args=(i,))) 
     print("start %s thread.." % i) 
     r_threads[-1].start() 
    for i in r_threads: 
     i.join() 


urls = ["url %s" % x for x in range(rang)] 


if __name__ == '__main__': 
    pool = Pool(processes=2, maxtasksperchild=1000) 
    pool.map_async(put_url, urls) 
    start_threading() 
    pool.close() 
    pool.join() 

인쇄 스레딩에서 멀티 프로세싱 0 개의 URL이 있습니다.
스레드 0 URL을 가지고 1
스레드 1 URL이
스레드 0 가지고있어 URL 3
스레드 0 얻었다 URL 4
스레드 1 얻었다 URL 5
스레드 0 URL을 6

+0

대단히 감사합니다. – lerner