I cannot get logging to a single file working with multiprocessing.Pool.apply_async. I am trying to adapt this example from the Logging Cookbook, but it only works with multiprocessing.Process: passing the logging queue into apply_async does not seem to have any effect. I would like to use a Pool so that I can easily manage the number of simultaneous workers.

I have tried many slight variations, using multiprocessing.Manager, multiprocessing.Queue, multiprocessing.get_logger, and apply_async(...).get(), but the following adaptation does not work: nothing from the workers ever reaches the log file (see the debugging note after the snippet):
def main_with_pool():
    start_time = time.time()
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10)/2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    for i, sleep_time in enumerate(job_list):
        name = str(i)
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))
    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    ))

if __name__ == "__main__":
    main_with_pool()
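One note on debugging this (an editorial aside, not from the original post): apply_async returns an AsyncResult, and the loop above discards it, so any exception raised while dispatching or running a task disappears silently. A variant of the dispatch loop that keeps the results and calls get() would at least surface such errors:

# Hypothetical replacement for the dispatch loop in main_with_pool():
# keep the AsyncResult objects so task failures are re-raised here
# instead of being silently discarded.
results = []
for i, sleep_time in enumerate(job_list):
    results.append(pool.apply_async(worker_function,
                                    args=(sleep_time, str(i), queue, worker_configurer)))
for res in results:
    res.get(timeout=60)  # re-raises any exception the task hit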
For reference, here is the full working example the snippet above was adapted from, using multiprocessing.Process. It works for me, except that I do not get log messages from the main process, and I do not think it will work well when I have 100 big jobs:
import logging
import logging.handlers
import numpy as np
import time
import multiprocessing
import pandas as pd

log_file = 'PATH_TO_FILE/log_file.log'

def listener_configurer():
    root = logging.getLogger()
    h = logging.FileHandler(log_file)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
# This is the worker process top-level function: it logs a start message,
# sleeps for the given interval, then logs a finish message before returning.
# The log messages are just so you know it's doing something!
def worker_function(sleep_time, name, queue, configurer):
    configurer(queue)
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time)
    logging.info(start_message)
    time.sleep(sleep_time)
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time)
    logging.info(success_message)

def main_with_process():
    start_time = time.time()
    single_thread_time = 0.
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        name = str(i)
        sleep_time = np.random.randint(10)/2
        single_thread_time += sleep_time
        worker = multiprocessing.Process(target=worker_function,
                                         args=(sleep_time, name, queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    )
    print(final_message)

if __name__ == "__main__":
    main_with_process()
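A detail that may be the crux here (this is my understanding of multiprocessing, not something stated in the post): a multiprocessing.Queue is not picklable as a task argument, so handing one to apply_async makes the pool's dispatch machinery raise RuntimeError("Queue objects should only be shared between processes through inheritance"), and since the AsyncResult is discarded, the error is never seen. A Manager-backed queue is a picklable proxy, so a sketch of the usual workaround is to build the queue like this and pass it through unchanged:

# Hypothetical substitution for queue = multiprocessing.Queue(-1) in
# main_with_pool(): a managed queue proxy survives pickling into Pool tasks.
manager = multiprocessing.Manager()
queue = manager.Queue(-1)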
I assume there must be a ready-made solution for this. Should I try celery instead?

Thanks.
Following torek's advice I solved this. I have an [example](https://github.com/ClayCampaigne/multiprocessing-pool-logging/blob/master/pool_logging.py) on GitHub. – GrayOnGray
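For readers who do not follow the link, here is a minimal sketch of what that resolution likely looks like. It is reconstructed from torek's advice rather than copied from the repository, so treat the linked example as authoritative. The two changes from main_with_pool() above are the Manager-backed queue and the close()/join() calls, which keep the None sentinel from reaching the listener before the workers have logged:

def main_with_pool_fixed():
    # Sketch only; see the linked GitHub example for the actual solution.
    start_time = time.time()
    manager = multiprocessing.Manager()
    queue = manager.Queue(-1)  # picklable proxy, unlike multiprocessing.Queue
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10)/2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    for i, sleep_time in enumerate(job_list):
        pool.apply_async(worker_function,
                         args=(sleep_time, str(i), queue, worker_configurer))
    pool.close()            # no further tasks will be submitted
    pool.join()             # wait for every worker to finish...
    queue.put_nowait(None)  # ...before telling the listener to quit
    listener.join()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        time.time() - start_time, single_thread_time))

One remaining wrinkle worth checking against the linked example: a Pool reuses its worker processes, so worker_configurer() adds a fresh QueueHandler on every task, which can duplicate log records; guarding root.addHandler() with a check for an existing QueueHandler avoids that.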