2017-03-13 11 views
1
class Download(Task): 

    date_interval = DateIntervalParameter() 

    def output(self): 
      return LocalTarget("data/user_{0}.tar.bz2".format(self.date_interval)) 

    def run(self): 
      #import pdb; pdb.set_trace() 
      SENTENCE_URL = 'http://downloads.org/exports/user_lists.tar.bz2' 
      sentence_file = download(SENTENCE_URL, out=self.output().path) 

class Uncompress(Task): 

    date_interval = DateIntervalParameter() 

    def output(self): 
      return LocalTarget("data/user_{0}.tar".format(self.date_interval)) 

    def requires(self): 
      return Download(self.date_interval) 

    def run(self): 
      with open(self.output().path, 'wb') as tar_file, open(self.input().path, 'rb') as file: 
        decompressor = BZ2Decompressor() 
        #loop over each tar file in the bzip file 
        for data in iter(lambda : file.read(100 * 1024), b''): 
          tar_file.write(decompressor.decompress(data)) 

내 첫 번째 작업은 인터넷에서 파일을 다운로드합니다. 다음 작업은 압축을 해제하는 것입니다. 필자가 작성하려고하는 다음 작업은 tar 파일에있는 CSV 파일에서 읽고 여러 파일로 구문 분석합니다. 즉 data/file_ {var}, data/faile_ {var2} .. 등등.하지만 작업 3은 다른 작업으로 전달할 날짜 간격을 가져야합니다.Python Luigi 작업 구조

내 작업을 구성하는 방법이나 더 좋은 방법이 있습니까?

답변

0

몇 가지 작업을 수행 할 수 있습니다. 워드 프로세서에서 : 계층 구조에서 다른 작업에 매개 변수를 전달하는

luigi Uncompress --Download-dateinverval 2017-02-03 

: http://luigi.readthedocs.io/en/stable/parameters.html

Parameters are resolved in the following order of decreasing priority: 

1. Any value passed to the constructor, or task level value set on the command line (applies on an instance level) 
2. Any value set on the command line (applies on a class level) 
3. Any configuration option (applies on a class level) 
4. Any default value provided to the parameter (applies on a class level) 

명령 줄에서, 당신은 같은 일을 할 수 있습니다.