3

python 2.7을 실행하면 dev 환경은 OS X이지만 생산은 Linux입니다.OSError : 멀티 프로세싱을 사용하는 코드에서 longish 테스트 스위트를 실행할 때 열려있는 파일이 너무 많습니다.

멀티 프로세싱 속도를 높이려는 코드가 있는데 잘 작동하고 원하는 이론적 속도 향상을 관찰했습니다. 그런 다음 테스트 스위트를 실행하여 몇 가지 테스트를 거친 후 모든 후속 테스트에서 위의 OSError를 가져 오기 시작했습니다. 오류가 발생하기 시작한 시점부터 테스트를 실행하면 몇 가지 오류가 발생하고 오류가 다시 발생합니다. 상당히 논리적이며 정당성 검사입니다.

하는 것은 잘못 무슨 일이 있었는지 알아 내려고 시도하기 위해, 나는 __builtin__openclose

import __builtin__ 
import traceback 
import sys 
openfiles = set() 
oldfile = __builtin__.file 
class newfile(oldfile): 
    def __init__(self, *args): 
     self.x = args[0] 
     print "### OPENING %s ###" % str(self.x) 
     traceback.print_stack(limit=20) 
     print 
     sys.stdout.flush() 
     oldfile.__init__(self, *args) 
     openfiles.add(self) 
    def close(self): 
     print "### CLOSING %s ###" % str(self.x) 
     oldfile.close(self) 
     openfiles.remove(self) 
oldopen = __builtin__.open 
def newopen(*args): 
    return newfile(*args) 
__builtin__.file = newfile 
__builtin__.open = newopen 

(https://stackoverflow.com/a/2023709/3543200의 조언을 다음)를 인쇄하고 내가 무엇을 보았는가 사람과 통화를하지만, 수백 수백 교체 라인은 ### OPENING /dev/null ###입니다.

동일한 작업을 수행하지만 멀티 프로세싱을 수행하지 않는 코드에 대해 동일한 작업을 수행 할 때 이러한 파일 연결이 없으므로 멀티 프로세싱에 문제가있는 것입니다. 그것이 무엇을 위해,

def _bootstrap(self): 
    from . import util 
    global _current_process 

    try: 
     self._children = set() 
     self._counter = itertools.count(1) 
     try: 
      sys.stdin.close() 
      sys.stdin = open(os.devnull) 
     except (OSError, ValueError): 
      pass 
     _current_process = self 
     util._finalizer_registry.clear() 
     util._run_after_forkers() 
     util.info('child process calling self.run()') 
     try: 
      self.run() 
      exitcode = 0 
     finally: 
      util._exit_function() 
    except SystemExit, e: 
     if not e.args: 
      exitcode = 1 
     elif isinstance(e.args[0], int): 
      exitcode = e.args[0] 
     else: 
      sys.stderr.write(str(e.args[0]) + '\n') 
      sys.stderr.flush() 
      exitcode = 1 
    except: 
     exitcode = 1 
     import traceback 
     sys.stderr.write('Process %s:\n' % self.name) 
     sys.stderr.flush() 
     traceback.print_exc() 

    util.info('process exiting with exitcode %d' % exitcode) 
    return exitcode 

그리고 :

File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 250, in _bootstrap 
    sys.stdin = open(os.devnull) 

그것이 도움이 단지의 경우, 여기에 multiprocessing::process.py::_bootstrap 함수의 코드를 게시 : 이것은 범인이 여기에 있음을 제안하는 traceback 호출에 의해 지원됩니다 가치, 나는 코드로 멀티를 호출하고있어 다음과 같이 보이 :

num_cpus = multiprocessing.cpu_count() 
pool = multiprocessing.Pool(processes=num_cpus) 
num_per_job = len(input_data)/num_cpus + 1 
chunks = [input_data[num_per_job*i:num_per_job*(i+1)] for i in range(num_cpus)] 
# TODO: ^^^ make this a list of generators 
data = pool.map(get_output_from_input, chunks) 
return itertools.chain.from_iterable(data) 

그래서, 질문 :이 multiprocessing 버그, 또는이다 나는 끔찍한 잘못을 저질렀 는가? 나는 다음주에 multiprocessing 코드를 파고 그 작동 원리를 알아내는 핑계를 정말로 환영 할 것이다. 그러나 이것이 나의 시간의 타당한 사용이라는 상향식의 확신을 가질 수는있다. 경험을 가진 사람을 도우십시오!

+2

풀이 닫히지 않아 해당 하위 프로세스가 종료되지 않습니다. 테스트 스위트가 단일 프로세스에서 동일한 코드를 계속 호출하면 리소스가 부족합니다. – tdelaney

+0

그래, 방금 pool.close() 함수를 찾았습니다. 감사! 별도의 게시물을 만들고 싶다면 답으로 받아 들일 것입니다. – gmoss

답변

1

자식 프로세스를 종료하고 해당 프로세스와 통신하는 데 사용되는 파이프를 확보하려면 풀을 닫아야합니다. contextlib.closing으로 처리하면 닫기를 건너 뛰는 예외에 대해 걱정할 필요가 없습니다. closing은 예외가 발생하여 종료 될 때를 포함하여 with 블록의 끝에서 풀을 닫습니다. 따라서, 당신은 결코 가까이서 전화 할 필요가 없습니다.

또한 Pool.map은 요청을 청크하므로 사용자가 직접 처리 할 필요가 없습니다. 나는 약간의 코드를 제거했지만 get_output_from_input 서명이 맞지 않을 수도 있습니다 (이는 입력 항목별로 한 번만 호출됩니다). 따라서 일부 수정을해야 할 수도 있습니다.

import contextlib 
num_cpus = multiprocessing.cpu_count() 
with contextlib.closing(multiprocessing.Pool(processes=num_cpus)) as pool: 
    data = pool.map(get_output_from_input, input_data) 
    return itertools.chain.from_iterable(data) 
+0

아름다운, 고마워! – gmoss