2017-10-28 17 views
1

다운로드 할 500 개의 링크가 있으며 예를 들어 10 개의 항목별로 배치 할 수 있습니다.luigi 일괄 처리 모듈이 직선 일괄 처리입니다. 작업

이 의사 코드는 어떻게 될 것입니까?

class BatchJobTask(luigi.Task) 
    items = luigi.Parameter() 
    def run(self): 
     listURLs = [] 
     with ('urls_chunk', 'r') as urls 
      for line in urls: 
       listURLs.append('http://ggg'+line+'.org') 
      10_urls = listURLs[0:items] #10 items here 
      for i in 10_urls: 
       req = request.get(url) 
       req.contents 
    def output(self): 
     return self.LocalTarger("downloaded_filelist.txt") 

class BatchWorker(luigi.Task) 
    def run(self) 
     # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc... 

어떻게 될까요?

+0

귀하의 URL 목록은 어디에 있습니까? – MattMcKnight

+0

첫 번째 게시물 – GarfieldCat

+0

을 업데이트했습니다.이 URL 목록은 어디에 저장 되었습니까? 큐, 데이터베이스, 파일? 당신이해야 할 일은 그 일에 얼마나 많은 사람들이 있는지를 알아 내고 그곳에서 당신의 덩어리를 만드는 것입니다. 아래에 예제를 만들 겠지만 문제의 관련 부분을 지정하지 않았으므로 문제와 관련성이 낮습니다. – MattMcKnight

답변

1

다음은 원하는 것을 수행하는 방법이지만 파일 목록에는 문자열 목록이 별도로 저장되어 있습니다.

import luigi 
import requests 

BATCH_SIZE = 10 


class BatchProcessor(luigi.Task): 
    items = luigi.ListParameter() 
    max = luigi.IntParameter() 

    def requires(self): 
     return None 

    def output(self): 
     return luigi.LocalTarget('processed'+str(max)+'.txt') 

    def run(self): 
     for item in self.items: 
      req = requests.get('http://www.'+item+'.org') 
      # do something useful here 
      req.contents 
     open("processed"+str(max)+".txt",'w').close() 


class BatchCreator(luigi.Task): 
    file_with_urls = luigi.Parameter() 

    def requires(self): 
     required_tasks = [] 
     f = open(self.file_with_urls) 
     batch_index = 0 
     total_index = 0 
     lines = [] 
     while True: 
      line = f.readline() 
      if not line: break 
      total_index += 1 
      if batch_index < BATCH_SIZE: 
       lines.append(line) 
       batch_index += 1 
      else: 
       required_tasks.append(BatchProcessor(batch=lines)) 
       lines = [line] 
       batch_index = 1 
     return required_tasks 

    def output(self): 
     return luigi.LocalTarget(str(self.file_with_urls) + 'processed') 

    def run(self): 
     open(str(self.file_with_urls) + 'processed', 'w').close() 
1

나는 이것을했다.

class GetListtask(luigi.Task) 
    def run(self): 
     ... 
    def output(self): 
    return luigi.LocalTarget(self.outputfile) 

class GetJustOneFile(luigi.Task): 
    fid = luigi.IntParameter() 
    def requires(self): 
     pass 

    def run(self): 
     url = 'http://my-server.com/test' + str(self.fid) + '.txt' 
     download_file = requests.get(url, stream=True) 
     with self.output().open('w') as downloaded_file: 
      downloaded_file.write(str(download_file.content)) 

    def output(self): 
     return luigi.LocalTarget("test{}.txt".format(self.fid)) 


class GetAllFiles(luigi.WrapperTask): 
    def requires(self): 
     listoffiles = [] # 0..999 
     for i in range(899): 
      listoffiles.append(i) 
     return [GetJustOneFile(fid=fileid) for fileid in listoffiles] 

이 코드는 끔찍한가요?

+0

글쎄, 그것은 일괄 처리를하지 않지만 작동해야합니다. – MattMcKnight

+0

미리 정의 된 목록 대신 GetAllFiles에서 GetListTask의 파일을 어떻게 입력합니까? – GarfieldCat

+0

그건 내'BatchCreator' 태스크의'requires' 메소드에서 보여주었습니다. 파일의 각 라인이 다양한 URN 컴포넌트 인 파일을 가지고 있다고 가정합니다. – MattMcKnight