2017-10-03 10 views
1

luigi에서 응용 프로그램을 실행하는 동안 로깅은 표준 출력과 파일로 전달됩니다. 이것은 노동자 = 1에서 잘 작동합니다. 그러나 worker = 4로 설정하자마자 응용 프로그램 로깅을 얻지 못합니다.여러 작업자와 함께 luigi에서 로깅을 구성하는 방법은 무엇입니까?

여러 작업자가 기록 할 수 있습니까?

+1

Github의 문제에서 언급했듯이 이것은 Windows 관련 문제이며 Windows 운영 체제에서는 os.fork()와 관련이 있습니다. – hirolau

답변

1

luigi에는 간단한 해결책이없는 것 같지만 다음과 같은 작업이 있습니다. 모든 실행 방법의 시작 부분에서 "활성화"를 부르는 것을 기억해야합니다. 나는 이것을 피하기위한 해결책을 찾지 못했습니다.

import logging 
import luigi 
from luigi.interface import build, setup_interface_logging 

from multiprocessing import Queue 
from logging.handlers import QueueHandler, QueueListener 

########### enable multiprocess logging ############################ 

q = Queue() 

def run(tasks, *args, **kwargs): 
    """ run tasks with queuelistener to handle logging """ 
    setup_interface_logging.has_run = True 
    workers = kwargs.get("workers", 1) > 1 
    if workers: 
     log=logging.getLogger() 
     listener = QueueListener(q, *log.handlers) 
     listener.start() 
     build(tasks, *args, **kwargs) 
     listener.stop() 
    else: 
     build(tasks, *args, **kwargs) 

class Task(luigi.Task): 
    """ redirect logging to queue """ 
    def __init__(self): 
     """ add q to process """ 
     super().__init__() 
     self.q = q 

    def enable(self): 
     """ call at start of each run process to initialise settings and redirect logging to queue """ 

     # for 1 worker leave settings alone 
     log = logging.getLogger() 
     if log.handlers: 
      return 

     # for multiple workers load settings but replace handlers with queue 
     from logcon import log 
     log.handlers = [] 
     log.addHandler(QueueHandler(self.q)) 

###################################################################### 

class Test(Task): 

    def complete(self): 
     return False 

    def run(self): 
     self.enable() 

     log = logging.getLogger() 
     log.debug("running") 
     log.info("running") 
     log.warning("running") 

     log = logging.getLogger("runlog") 
     log.debug("running") 
     log.info("running") 
     log.warning("running") 

     log = logging.getLogger("luigi-interface") 
     log.debug("running") 
     log.info("running") 
     log.warning("running") 

    def requires(self): 
     return []