1
S3 버킷에 덤프를 업로드하고 공유하는 일상적인 작업이 있습니다. 아래의 코드가 작동하는 동안 어떤 이유로 파일을 덮어 쓰지 않으려합니다. - 그것은 S3에 대한하지 않습니다는 로컬 파일에 대한 작동하지만 path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max)
Luigi가 S3에 덮어 쓰기
2) resources = {'overwrite_resource': 1}
를 추가 : 워드 프로세서
, 나는 두 개의 병렬 실행을위한 솔루션을 정의) 1이 필요합니다.
class report_to_S3(luigi.Task):
client = S3Client()
path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max)
local_dump_path = '../../../data/local_db_dump.csv'
resources = {'overwrite_resource': 1}
def requires(self):
return upload_tweets(path=self.path)
def output(self):
self.s3_path = "s3://qclm-nyc-ct/dump/dump.csv"
return S3Target(self.s3_path, client=self.client)
def run(self):
c = sqa.create_engine('postgresql:///qc_soc_media')
df = pd.read_sql_query('SELECT id, user_id, timestamp, lat, lon, ct FROM tweets WHERE ct IS NOT NULL', c)
N = len(df)
df.to_csv(self.local_dump_path, index=None)
self.client.put(self.local_dump_path, self.output().path,
headers={'Content-Type': 'application/csv'})
send_S3_report(N)
if __name__ == '__main__':
luigi.run(local_scheduler=True, main_task_cls=report_to_S3)
감사합니다. 불행히도, 나는 그것을 원하지 않는다. 원본 Luigi는 로컬 타겟에 대해 resources = { 'overwrite_resource': 1}를 사용하여이 동작을 덮어 쓸 수 있지만, S3 버킷 (버그)에 대해 작동하지 않는 것 같습니다. –
'overwrite_resource'는 리소스를 만드는 기술 일뿐입니다 여러 작업이 동시에 실행되는 것을 방지합니다. 거기에 문자열을 사용할 수 있습니다. (일반적으로 리소스는 동시 API 연결 등의 제한 사항입니다). 파일 이름에 타임 스탬프를 사용하지 않으려면 output() 메서드에서 사용할 다른 표시기를 만듭니다. 출력이 이미 존재하면 작업이 실행되지 않습니다. – MattMcKnight
감사합니다. 대상으로 로그 파일을 만들었습니다. –