2014-10-20 1 views
0

Amazon S3에서 액세스 로그를 다운로드 중입니다. 이들은 많은 작은 파일입니다. 다운로드 시간을 줄이기 위해 스레드에서 각 파일을 읽기로 결정했습니다.스레드에서 많은 파일을 여는 긴 목록을 반복하면서 IOError를 피하는 방법은 무엇입니까?

이것은 S3에 처음 연결 한 다음 각 문서를 반복하고 각 문서의 내용을 별도의 스레드에서 읽는 내 주요 방법입니다.

def download_logs(self): 
    """ 
    Downloads logs from S3 using Boto. 
    """ 
    if self.aws_keys: 
     conn = S3Connection(*self.aws_keys) 
    else: 
     conn = S3Connection() 

    files = [] 
    mybucket = conn.get_bucket(self.input_bucket) 
    with tempdir.TempDir() as directory: 
     for item in mybucket.list(prefix=self.input_prefix): 
      local_file = os.path.join(directory, item.key.split("/")[-1]) 
      logger.debug("Downloading %s to %s" % (item.key, local_file)) 
      thread = threading.Thread(target=item.get_contents_to_filename, args=(local_file,)) 
      thread.start() 
      files.append((thread,local_file)) 

     elms = range(len(files)) 
     elemslen = len(elms) 
     while elemslen: 
      curr = random.choice(elms) 
      thread, file = files[curr] 
      if not thread.is_alive(): 
       yield file 
       elms.remove(curr) 
       elemslen -= 1 

여기서 알 수 있듯이 생성기가 생성됩니다.

물론
[2014-10-20 15:15:21,427: WARNING/Worker-2] Exception in thread Thread-710: 
Traceback (most recent call last): 
    File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner 
    self.run() 
    File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/Users/viktornagy/.virtualenvs/vidzor/lib/python2.7/site-packages/boto/s3/key.py", line 1561, in get_contents_to_filename 
    fp = open(filename, 'wb') 
IOError: [Errno 24] Too many open files: u'/var/folders/7h/9tt8cknn1qx40bs_s467hc3r0000gn/T/tmpZS9fdn/access_log-2014-10-20-11-36-20-9D6F43B122C83BD6' 

, 내가 개방의 수를 올릴 수 : 발전기는 단순히 그들에게 위의 코드는 스레드에서 제기 된 다음과 같은 경고와 함께 실패

 logs = self.download_logs() 
     for downloaded in logs: 
      self.concat_files(tempLog, downloaded) 

를 연결하는 각 파일의 내용을 읽어 처리 파일,하지만 난 오히려 뭔가 의미있는 스레드 수를 제한 할 것이다.

이제 제 질문은 어떻게 달성 할 수 있습니까? 스레드 목록을 생성하는 루프가 있습니다. 이 루프가 끝나면 목록을 소화하고 양보 할 수있는 닫힌 스레드가 있는지 확인합니다.

첫 번째 루프의 스레드 수를 제한하면 목록을 소화하기 시작할 준비가되지 않습니다.

+0

당신이해야 할 일은 '다운로드 할 파일'묶음을 큐에 넣은 소비자 대기열로 리팩터링 한 다음이 대기열에서 항목을 팝업하여 다운로드하는 '소비자 쓰레드'세트를 갖게하는 것입니다/처리 한 다음 파일을 처리 된 것으로 표시하고 다음 파일로 이동합니다. Python의 [Queue] (https://docs.python.org/2/library/queue.html) 클래스는 이러한 목적으로 만들어졌습니다. – aruisdante

+0

비록 @ dano의 대답을 받아 들였습니다. http://stackoverflow.com/questions/11983938/python-appending-to-same-file-from-의 라인을 따라 다른 종류의 리팩토링으로 끝났습니다. 여러 번 – Akasha

답변

2

당신은 threading.Thread 객체의 풀을 만들 수 multiprocessing.dummy을 사용하고 Pool의 스레드로 작업을 배포 할 수 있습니다 : 당신은 가능한 한 빨리 항복 결과를 시작할 수 있도록 imap_unordered을 사용하고

from multiprocessing.dummy import Pool 

def download_logs(self): 
    """ 
    Downloads logs from S3 using Boto. 
    """ 
    if self.aws_keys: 
     conn = S3Connection(*self.aws_keys) 
    else: 
     conn = S3Connection() 

    files = [] 
    mybucket = conn.get_bucket(self.input_bucket) 
    pool = Pool(20) # 20 threads in the pool. Tweak this as you see fit. 
    with tempdir.TempDir() as directory: 
     results = pool.imap_unordered(item.get_contents_to_filename, 
             [os.path.join(directory, item.key.split("/")[-1] 
              for item in mybucket.list(prefix=self.input_prefix)] 

     for result in results: 
      yield result 

그들이 모든 작업이 완료 될 때까지 기다릴 필요가 없습니다.