이 코드가 있습니다 (사과 코드는 작업 코드에서 복사 - 붙여 넣기와 거의 동일합니다. 문제가있는 곳을 모르므로 전체 여기) :파이썬 다중 처리 코드는 정상적으로 실행되지만 종료되지 않습니다
def init(Q):
"""Serves to initialize the queue across all child processes"""
global q
q = Q
def queue_manager(q):
"""Listens on the queue, and writes pushed data to file"""
while True:
data = q.get()
if data is None:
break
key, preds = data
with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store:
out_store.append(key, preds)
def writer(message):
"""Pushes messages to queue"""
q.put(message)
def reader(key):
"""Reads data from store, selects required days, processes it"""
try:
# Read the data
with pd.HDFStore(hdf_in, mode='r') as in_store:
df = in_store[key]
except KeyError as ke:
# Almost guaranteed to not happen
return (key, pd.DataFrame())
else:
# Executes only if exception is not raised
fit_df = df[(df.index >= '2016-09-11') & \
(df.index < '2016-09-25') & \
(df.index.dayofweek < 5)].copy()
pre_df = df[(df.index >= '2016-09-18') & \
(df.index < '2016-10-2') & \
(df.index.dayofweek < 5)].copy()
del df
# model_wrapper below is a custom function in another module.
# It works fine.
models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df)
if preds is not None:
writer((key, preds))
del preds
return (key, models)
def main():
sensors = pd.read_csv('sens_metadata.csv', index_col=[0])
nprocs = int(cpu_count() - 0)
maxproc = 10
q = Queue()
t = Thread(target=queue_manager, args=(q,))
print("Starting process at\t{}".format(dt.now().time()))
sys.stdout.flush()
t.start()
with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init,
initargs=(q,)) as p:
models = p.map(reader, sensors.index.tolist(), 1)
print("Processing done at\t{}".format(dt.now().time()))
print("\nJoining Thread, and finishing writing predictions")
sys.stdout.flush()
q.put(None)
t.join()
print("Thread joined successfully at\t{}".format(dt.now().time()))
print("\nConcatenating models and serializing to pickle")
sys.stdout.flush()
pd.concat(dict(models)).to_pickle(path + 'models.pickle')
print("Pickled successfully at\t{}".format(dt.now().time()))
if __name__ == '__main__':
main()
이 코드는 심하게 편향된 동전 던지기처럼 동작합니다. 대부분의 경우 작동하지 않으며 때로는 작동합니다. 그것이 실행되면 전체 데이터 (모두 keys
)를 실행하는 데 약 2.5 시간이 걸린다는 것을 알고 있습니다. 10 회 중 9 번 실행하면 모든 데이터가 처리되고 hdf_out
파일의 데이터가 표시되지만 다중 처리 풀이 결합되지 않습니다. 모든 하위 프로세스는 활성 상태이지만 작업을 수행하지 않습니다. 나는 왜 그 프로그램이 왜 그렇게 매달렸는지 이해하지 못한다.
내가 이런 메시지가 나타나면 "Processing done at ..."
과 "Joining Thread, ..."
메시지가 표시되지 않습니다. 또한, 작은 데이터 세트를 제공하면 완료됩니다. preds
의 계산을 제외하면 완료됩니다. 나는 프로젝트의 나머지 부분에 도움이되지 않을 무거운 수정없이 models
의 계산을 제외 할 수 없다.
왜 이런 일이 일어날 지 모르겠습니다. Linux (쿠분투 16.04)를 사용하고 있습니다.