1
class Download(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar.bz2".format(self.date_interval))
def run(self):
#import pdb; pdb.set_trace()
SENTENCE_URL = 'http://downloads.org/exports/user_lists.tar.bz2'
sentence_file = download(SENTENCE_URL, out=self.output().path)
class Uncompress(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar".format(self.date_interval))
def requires(self):
return Download(self.date_interval)
def run(self):
with open(self.output().path, 'wb') as tar_file, open(self.input().path, 'rb') as file:
decompressor = BZ2Decompressor()
#loop over each tar file in the bzip file
for data in iter(lambda : file.read(100 * 1024), b''):
tar_file.write(decompressor.decompress(data))
내 첫 번째 작업은 인터넷에서 파일을 다운로드합니다. 다음 작업은 압축을 해제하는 것입니다. 필자가 작성하려고하는 다음 작업은 tar 파일에있는 CSV 파일에서 읽고 여러 파일로 구문 분석합니다. 즉 data/file_ {var}, data/faile_ {var2} .. 등등.하지만 작업 3은 다른 작업으로 전달할 날짜 간격을 가져야합니다.Python Luigi 작업 구조
내 작업을 구성하는 방법이나 더 좋은 방법이 있습니까?