0

내가 concurrent.futures.ProcessPoolExecutor 꽤 평범한 구현을 처리 _get_records_save_output 방법은 다양의파이썬 concurrent.futures (파이썬 3.6을 사용하여) 같은 것을 아이 취급 예외

def process(self, file): 
    log.debug(f"Processing source file {file.name}.") 
    with DBConnection(self.db_url) as session: 
     file = session.merge(file) 
     session.refresh(file) 
     self._set_file(file) 
     timer = perf_counter() 
     try: 
      self.records = self._get_records() 
      self._save_output() 
     except Exception as ex: 
      log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}") 
      self.error_time = time.time() 
      self.records = None 
     else: 
      process_duration = perf_counter() - timer 
      log.info(f'File {file.name} processed in {process_duration:.6f} seconds.') 
      file.process_duration = process_duration 
     session.commit() 

: 구현 가능한 프로세서 클래스, 그들은 모두 같은 약 보이는 process 방법을 공유 클래스 당하지만 내 문제는 오류 처리입니다. 의도적으로 두 가지 방법 중 하나에서 메모리가 부족해 지도록 테스트하고 있지만 위의 블록 except이 그것을 잡아서 다음 파일을 옮길 것으로 기대합니다. 그리고 이것은 코드를 실행할 때 정확히 어떻게됩니까? 단일 프로세스.

전술 한 바와 같이 나는 ProcessPoolExecutor를 사용하는 경우

, 그것은 BrokenProcessPool 예외가 발생하고 모든 실행 죽이기 :

Traceback (most recent call last): 
    File "/vagrant/myapp/myapp.py", line 94, in _process 
    list(executor.map(processor.process, files)) 
    File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists 
    for element in iterable: 
    File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator 
    yield fs.pop().result() 
    File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result 
    return self.__get_result() 
    File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result 
    raise self._exception 
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. 

나는 물론 호출 코드에 BrokenProcessPool를 잡을 수 있지만, 나는 오류를 처리하는 것을 선호 것 내부적으로 다음 파일로 진행합니다.

는 또한 다음과 같이 표준 multiprocessing.Pool 객체를 사용하여 시도 :

이때
with multiprocessing.Pool() as pool: 
    pool.map(processor.process, files) 

가 동작에도 괴상이다 메모리 부족 오류를 발생 제 두 파일을 처리 시작 후, 더 작은 파일을 처리하는 것으로 넘어 가서 완전히 처리됩니다. 그러나 except 블록은 결코 트리거되지 않으며 (로그 메시지 없음, error_time) 응용 프로그램이 수동으로 종료 될 때까지 멈추거나 아무 것도하지 않고 중단됩니다.

try..except 블록이 각 프로세스를 독립적으로 만들어 주 응용 프로그램에 영향을 미치지 않고 자체 오류를 처리하기를 바랬습니다. 어떤 아이디어를 달성하는 방법?

+1

내부의'process' 함수/rout에서 메인 플로우 (** caller ** scope) 레벨에서 던져진 예외를 잡을 수 없다고 가정합니다. 너. 위에서 언급 한 오류가 발생할 때'executor' 객체를 디버그 해보십시오. – RomanPerekhrest

답변

0

많은 디버깅 후 (그리고 executor 개체를 확인하기위한 @ RomanPerekhrest의 제안으로 인해) 많은 원인을 발견했습니다. 이 질문에서 설명한 것처럼 테스트 데이터는 많은 파일 (두 개는 1 백만 라인이 넘는 CSV)로 구성됩니다. 그것들은 각각 내 테스트 기계 (2GB VM)가 초크를 일으키는 원인이되었지만, 다른 방법으로 - 더 큰 첫 번째 메모리는 except에 의해 처리 될 메모리 오류를 정기적으로 발생 시켰지만, 두 번째는 단순히 sigkill을 발생 시켰습니다. 너무 많은 것을 탐험하지 않으면, 더 큰 파일은 단순히 읽을 때 메모리에 맞지 않을 수 있다고 생각합니다 (작은 파일은 수행 할 수 있지만 _save_output에서 완료) caused the overflow을 처리하고 프로세스를 종료합니다.

내 해결책은 단순히 BrokenProcessPool 예외를 잡아 사용자에게 알리는 것이 었으며 한 프로세스에서 처리 작업을 실행하는 옵션을 추가했습니다.이 경우 너무 큰 파일에는 오류가있는 것으로 표시됩니다 :

files = get_files() 
processor = get_processor_instance() 
results = [] 
if args.nonconcurrent: 
    results = list(map(processor.process, files)) 
else: 
    with concurrent.futures.ProcessPoolExecutor() as executor: 
     try: 
      results = list(executor.map(processor.process, files)) 
     except concurrent.futures.process.BrokenProcessPool as ex: 
      raise MyCustomProcessingError(
       f"{ex} This might be caused by limited system resources. " 
       "Try increasing system memory or disable concurrent processing " 
       "using the --nonconcurrent option." 
      )