2017-02-11 1 views
0

큰 CSV 파일의 병렬 컴퓨팅 데이터에 문제가 있습니다. 문제는 파일에서 읽기가 병렬 처리 될 수 없지만 병렬 컴퓨팅을 위해 파일의 데이터 청크가 전달 될 수 있다는 것입니다. Multiprocessing.Pool을 사용하여 결과를 얻지 못했습니다 (Pool.imap이 yield 생성기를 허용하지 않습니다).파일의 병렬 프로세스 데이터

파일에서 데이터 청크를 읽는 제너레이터가 있습니다. 그것은 ca. 3 초. 파일에서 하나의 데이터 청크를 가져옵니다. 이 데이터 덩어리는 마녀가 처리합니다. 2 초. 나는 파일로부터 50 개의 덩어리의 데이터를 얻는다. 다음 덩어리의 파일을 기다리면서 이전 덩어리 "병렬"을 계산할 수있었습니다.

Let`s 개념에서 몇 가지 코드를 가지고 (그러나 실제로는 작동하지 않습니다) :

def file_data_generator(path): 
    # file reading chunk by chunk 
    yield datachunk 

def compute(datachunk): 
    # some heavy computation 2.sec 
    return partial_result 

from multiprocessing import Pool 
p = Pool() 
result = p.imap(compute, file_data_generator(path)) # yield is the issue? 

을 내가 잘못 뭐하는 거지? 다른 도구를 사용해야합니까? 감사 그것은`Python3.5

간단한 코드 개념/골격 :

답변

2

당신은 매우 가까웠다. yield의 생성기 비트가 올바른지 확인하십시오. 을 사용하면이 생성기를 인수로 사용하고 next()이 실행되므로 yield이이 문맥에서 정확합니다.

누락 된 부분은 imap이 블로킹되지 않는다는 것입니다. 즉, 프로세스가 아직 완료되지 않은 경우에도 result = p.imap 호출이 반환됨을 의미합니다. 당신이 중 하나는

p.close() 
p.join() 

을 할 필요가 그리고 전체 results 뭔가를 수행하거나 단순히 결과를 반복. 다음은 작동 예제입니다.

from multiprocessing import Pool, Queue 

def compute(line): 
    # some heavy computation 2.sec 
    return len(line) 

def file_data_generator(path): 
    # file reading chunk by chunk 
    with open('book.txt') as f: 
     for line in f: 
      yield line.strip() 

if __name__ == '__main__': 
    p = Pool() 
    # start processes, they are still blocked because queue is empty 
    # results is a generator and is empty at the start 
    results = p.imap(compute, file_data_generator('book.txt')) 

    # now we tell pool that we finished filling the queue 
    p.close() 
    for res in results: 
     print(res) 
+0

헬리오, 빠른 응답을 보내 주셔서 감사합니다. 이 질문은 더 큰 문제의 일부입니다. 어쩌면 당신은 나에게 시작 방향을 가르쳐 줄 수 있습니다. 문제는 –

+0

입니다. @MaciejskiPawel 여기에서 답을 삭제했지만 새로운 질문을 어디에서 열어 봤습니까? 나는 그것을 볼 수 없으며 나는 이미 해답을 얻었습니다 ;-) – hansaplast