2017-05-19 3 views
1

S3 버킷에 덤프를 업로드하고 공유하는 일상적인 작업이 있습니다. 아래의 코드가 작동하는 동안 어떤 이유로 파일을 덮어 쓰지 않으려합니다. - 그것은 S3에 대한하지 않습니다는 로컬 파일에 대한 작동하지만 path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max)Luigi가 S3에 덮어 쓰기

2) resources = {'overwrite_resource': 1}

를 추가 : 워드 프로세서

, 나는 두 개의 병렬 실행을위한 솔루션을 정의) 1이 필요합니다.

class report_to_S3(luigi.Task): 
    client = S3Client() 
    path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max) 
    local_dump_path = '../../../data/local_db_dump.csv' 
    resources = {'overwrite_resource': 1} 

    def requires(self): 
     return upload_tweets(path=self.path) 

    def output(self): 
     self.s3_path = "s3://qclm-nyc-ct/dump/dump.csv" 
     return S3Target(self.s3_path, client=self.client) 

    def run(self): 
     c = sqa.create_engine('postgresql:///qc_soc_media') 
     df = pd.read_sql_query('SELECT id, user_id, timestamp, lat, lon, ct FROM tweets WHERE ct IS NOT NULL', c) 
     N = len(df) 
     df.to_csv(self.local_dump_path, index=None) 
     self.client.put(self.local_dump_path, self.output().path, 
         headers={'Content-Type': 'application/csv'}) 
     send_S3_report(N) 


if __name__ == '__main__': 
    luigi.run(local_scheduler=True, main_task_cls=report_to_S3) 

답변

1

output() 메서드에서 지정한 대상이 이미 있으면 run() 메서드가 실행되지 않습니다. 파일 이름에 타임 스탬프를 사용하거나 작업이 완료되었음을 나타내는 다른 센티널/플래그를 만들 수 있습니다.

+0

감사합니다. 불행히도, 나는 그것을 원하지 않는다. 원본 Luigi는 로컬 타겟에 대해 resources = { 'overwrite_resource': 1}를 사용하여이 동작을 덮어 쓸 수 있지만, S3 버킷 (버그)에 대해 작동하지 않는 것 같습니다. –

+1

'overwrite_resource'는 리소스를 만드는 기술 일뿐입니다 여러 작업이 동시에 실행되는 것을 방지합니다. 거기에 문자열을 사용할 수 있습니다. (일반적으로 리소스는 동시 API 연결 등의 제한 사항입니다). 파일 이름에 타임 스탬프를 사용하지 않으려면 output() 메서드에서 사용할 다른 표시기를 만듭니다. 출력이 이미 존재하면 작업이 실행되지 않습니다. – MattMcKnight

+0

감사합니다. 대상으로 로그 파일을 만들었습니다. –