2017-11-25 1 views
1

원래 질문은 파이썬에서의 병렬 처리에 관한 질문이었습니다. 그러나 질문이 대답없이 남아 있기 때문에 그것을 삭제하고 내 결론을 요약하려고합니다. 하나 라이브러리를 멀티 프로세싱 또는 멀티 스레딩 를 사용하여 - 희망 그것은 당신의 코드가 병렬로 실행하게하는 두 가지 방법이 일반적으로단일 스레드보다 느린 파이썬에서의 다중 처리

... 사람을 도움이 될 것입니다. stackoverflow.com 라이브러리를 멀티 스레딩에 많은 게시물에 따르면

스레드를 통해 효율적으로 메모리를 공유 할 수 있지만, 단일 코어에서 스레드를 실행합니다. 따라서 병목이 I/O 작업 인 경우 주로 코드 속도를 높일 수 있습니다. 라이브러리에 많은 실제 응용 프로그램이 있는지 확실하지 않습니다.

코드가 CPU를 많이 사용하는 경우 (CPU 제한이라고도 함) 다중 처리 라이브러리가 문제의 원인 일 수 있습니다. 라이브러리는 개별 코어에 스레드를 분산시킵니다. 그러나 많은 사람들 (나를 포함해서)은 이러한 멀티 코어 코드가 싱글 코어보다 훨씬 느릴 수 있다는 것을 발견했습니다. 이 문제는 개별 스레드가 효과적으로 메모리를 공유 할 수 없다는 사실에 기인합니다. 데이터는 광범위하게 복사되므로 상당한 오버 헤드가 발생합니다. 아래의 코드에서 볼 수 있듯이 오버 헤드는 입력 데이터 유형에 크게 의존합니다. 문제는 Linux보다 Windows에서 훨씬 더 심오합니다. 필자는 병렬 처리가 가장 큰 파이썬 실망이라고 말해야합니다. 파이썬은 병렬 처리를 염두에두고 설계되지 않았습니다.

첫 번째 코드는 Process을 사용하여 코어 사이에 pandas dataframe을 할당합니다.

import numpy as np 
import math as mth 
import pandas as pd 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = np.array(bnds_info['npv']) 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy()) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = {'currency_name' : 'EUR', 'npv' : 100} 
    bnds_info = pd.DataFrame(bnds_info, index = range(1)) 
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    npv = np.array(bnds_info['npv']) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

두 번째 코드는 이전의 것과 동일 - 유일한 차이점에 대한 런타임 변경 단일 코어에 대한 실행 시간 변경 비교 (이 시간 우리가 numpy array 대신 pandas dataframe의 사용 및 성능 차이가 큰 점이다 멀티 코어).

import numpy as np 
import math as mth 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = bnds_info 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound]) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

코드의 마지막 부분은 Pool 대신 Process을 사용하고 있습니다. 실행 시간이 약간 더 좋습니다.

import numpy as np 
import time as tm 
import multiprocessing as mp 

#import pdb 
#pdb.set_trace() 

def bnd_calc_npv_dummy(bnds_info): 
    """ multiple core dummy valuation function (based on single core function) """ 

    try: 
     # get number of bonds 
     bnds_no = len(bnds_info) 
    except: 
     pass 
     bnds_no = 1 

     tm.sleep(0.0001 * bnds_no) 

    return bnds_info 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    pool = mp.Pool(processes = cores_no) 
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist()) 

    # return NPV of individual bonds  
    return npv 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100.0] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

그래서 내 결론은 병렬 파이썬 구현은 실제 생활에서 적용되지 않는 것입니다 (파이썬 2.7.13 및 윈도우 7을 사용). 안부,

Macky

PS : 누군가가 문제의 부분은 독립적으로 계산 될 수있을 때

+4

전혀 읽지 않고 다중 처리가 잘못되어 단일 프로세스보다 속도가 느립니다.[Minimal, Complete, Verifiable example] (https://stackoverflow.com/help/mcve)을 만드는 것을 고려하십시오. –

+1

I/O 바운드 작업 (예 : 파일 읽기 또는 쓰기)의 경우 '다중 처리'모듈 대신 '스레딩'모듈을 고려해야합니다. 멀티 프로세싱은 CPU 바운드 작업에 대해 더 잘 작동합니다. – ettanany

답변

1

멀티가 가장 잘 작동합니다 ... 좀 더 행복하게보다 더 내 마음을 바꿀 것입니다 코드를 변경 할 수있는 경우 , 예. multiprocessing.Pool. 풀의 모든 작업자 프로세스는 입력의 일부를 처리하고 결과를 마스터 프로세스에 반환합니다.

입력 배열 전체에 데이터를 수정해야하는 경우 manager의 동기로 오버 헤드가 멀티 프로세싱의 모든 이점을 파괴 할 수 있습니다.