2016-11-22 5 views
1

이 코드가 있습니다 (사과 코드는 작업 코드에서 복사 - 붙여 넣기와 거의 동일합니다. 문제가있는 곳을 모르므로 전체 여기) :파이썬 다중 처리 코드는 정상적으로 실행되지만 종료되지 않습니다

def init(Q): 
    """Serves to initialize the queue across all child processes""" 
    global q 
    q = Q 

def queue_manager(q): 
    """Listens on the queue, and writes pushed data to file""" 
    while True: 
     data = q.get() 
     if data is None: 
      break 
     key, preds = data 
     with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store: 
      out_store.append(key, preds) 

def writer(message): 
    """Pushes messages to queue""" 
    q.put(message) 

def reader(key): 
    """Reads data from store, selects required days, processes it""" 
    try: 
     # Read the data 
     with pd.HDFStore(hdf_in, mode='r') as in_store: 
      df = in_store[key] 
    except KeyError as ke: 
     # Almost guaranteed to not happen 
     return (key, pd.DataFrame()) 
    else: 
     # Executes only if exception is not raised 
     fit_df = df[(df.index >= '2016-09-11') & \ 
        (df.index < '2016-09-25') & \ 
        (df.index.dayofweek < 5)].copy() 
     pre_df = df[(df.index >= '2016-09-18') & \ 
        (df.index < '2016-10-2') & \ 
        (df.index.dayofweek < 5)].copy() 
     del df 
     # model_wrapper below is a custom function in another module. 
     # It works fine. 
     models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df) 
     if preds is not None: 
      writer((key, preds)) 
      del preds 
    return (key, models) 

def main(): 
    sensors = pd.read_csv('sens_metadata.csv', index_col=[0]) 
    nprocs = int(cpu_count() - 0) 
    maxproc = 10 
    q = Queue() 
    t = Thread(target=queue_manager, args=(q,)) 

    print("Starting process at\t{}".format(dt.now().time())) 
    sys.stdout.flush() 
    t.start() 
    with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init, 
       initargs=(q,)) as p: 
     models = p.map(reader, sensors.index.tolist(), 1) 
    print("Processing done at\t{}".format(dt.now().time())) 
    print("\nJoining Thread, and finishing writing predictions") 
    sys.stdout.flush() 
    q.put(None) 
    t.join() 
    print("Thread joined successfully at\t{}".format(dt.now().time())) 
    print("\nConcatenating models and serializing to pickle") 
    sys.stdout.flush() 
    pd.concat(dict(models)).to_pickle(path + 'models.pickle') 
    print("Pickled successfully at\t{}".format(dt.now().time())) 

if __name__ == '__main__': 
    main() 

이 코드는 심하게 편향된 동전 던지기처럼 동작합니다. 대부분의 경우 작동하지 않으며 때로는 작동합니다. 그것이 실행되면 전체 데이터 (모두 keys)를 실행하는 데 약 2.5 시간이 걸린다는 것을 알고 있습니다. 10 회 중 9 번 실행하면 모든 데이터가 처리되고 hdf_out 파일의 데이터가 표시되지만 다중 처리 풀이 결합되지 않습니다. 모든 하위 프로세스는 활성 상태이지만 작업을 수행하지 않습니다. 나는 왜 그 프로그램이 왜 그렇게 매달렸는지 이해하지 못한다.

내가 이런 메시지가 나타나면 "Processing done at ...""Joining Thread, ..." 메시지가 표시되지 않습니다. 또한, 작은 데이터 세트를 제공하면 완료됩니다. preds의 계산을 제외하면 완료됩니다. 나는 프로젝트의 나머지 부분에 도움이되지 않을 무거운 수정없이 models의 계산을 제외 할 수 없다.

왜 이런 일이 일어날 지 모르겠습니다. Linux (쿠분투 16.04)를 사용하고 있습니다.

답변

0

분명히 maxtaskperchild kwag를 삭제하면 문제가 해결됩니다. 왜 내가 분명히 이해하지 못하는 이유가 무엇인가. 포크 프로세스 (Linux의 기본값)와 스폰 프로세스 (Windows의 유일한 옵션) 간의 차이점과 관련이 있다고 가정합니다.

포크 프로세스 maxtaskperchild은 성능이 없으므로 더 이상 필요하지 않습니다. 나는 maxtaskperchild을 떨어 뜨림으로써 메모리 사용이 향상되었다는 것을 알아 차렸다. 메모리는 하위 프로세스에 의해 교착 상태가 아니지만 상위 프로세스에서 공유됩니다. 그러나 Windows를 사용해야 할 때 maxtaskperchild은 특히 긴 작업 목록으로 메모리 집약적 인 작업을 실행할 때 하위 프로세스가 부풀어 오르는 것을 막는 중요한 방법이었습니다.

어떤 일이 일어나고 있는지 잘 아는 사람은 언제든지이 답변을 편집하십시오.