1

비 병렬 버전과 병렬 버전의 실행 시간을 비교하려고합니다. 문제는 병렬 함수가 스레딩 패키지와 잘 작동하지만 다중 처리 패키지로 전환 한 후에 프로세스가 시작되지 않는다는 것입니다. 이것이 내 컴파일러 또는 다른 것 때문에 발생하는지 궁금합니다. 다른 사람이 내 코드를 실행하여 다른 환경에서 작동하는지 확인할 수 있습니까? 그리고 그렇지 않다면 코드 내에서 어떤 문제가 있습니까?행렬의 외란을 계산하는 함수에 대해 다중 처리가 작동하지 않습니다.

import numpy as np 
from multiprocessing import Process 

def single_row(a,b,output): 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j]=output[j]+a[k]*b[k][j] 

#Parallel Matrix Cross Multiplication 
def cross_parallel(a,b): 
    if len(a[0])==len(b): 
     tasks=[None]*len(a) 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      tasks[i]=Process(target=single_row,args=(a[i],b,T[i])) 
     for task in tasks: 
      task.start() 
     for task in tasks: 
      task.join() 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

#Non-parallel Matrix Cross Multiplication 
def cross_basic(a,b): 
    if len(a[0])==len(b): 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j]=T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

if __name__ == '__main__':  
    x=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    y=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    print cross_basic(x,y) 
    print cross_parallel(x,y) 

결과 :

[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 
[[0 0 0 0] 
[0 0 0 0] 
[0 0 0 0] 
[0 0 0 0]] 

(라인 15에서 만 다른) 작동 스레딩 패키지를 사용하여 버전 :

import numpy as np 
from threading import Thread 

def single_row(a,b,output): 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j]=output[j]+a[k]*b[k][j] 

#Parallel Matrix Cross Multiplication 
def cross_parallel(a,b): 
    if len(a[0])==len(b): 
     tasks=[None]*len(a) 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      tasks[i]=Thread(target=single_row,args=(a[i],b,T[i])) 
     for task in tasks: 
      task.start() 
     for task in tasks: 
      task.join() 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

#Non-parallel Matrix Cross Multiplication 
def cross_basic(a,b): 
    if len(a[0])==len(b): 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j]=T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

if __name__ == '__main__':  
    x=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    y=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    print cross_basic(x,y) 
    print cross_parallel(x,y) 

결과 : 당신이

[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 
[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 

답변

1

스레드를 사용하면 결과 행렬 T은 스레드에서 공유됩니다 (즉, 동일한 메모리 슬롯을 사용하여 기본적으로 동일한 개체 임). 따라서 을 자식 Thread에서 수정하면 T의 로컬 버전이 수정되고 올바른 결과를 얻게됩니다.

하위 프로세스의 경우 각 자식 ProcessT의 새 복사본을 가져옵니다. 따라서 서브 프로세스에서 T의 수정은 로컬 버전을 수정하지 않습니다. 올바른 결과를 얻으려면 예를 들어 Queue을 사용하여 계산 결과를 다시 보내야합니다. 그러나 결과를 결정하는 것이 아니라 결과를 얻는 순서에주의해야합니다.

import numpy as np 
from multiprocessing import Process, Queue 


def single_row(a, b, idx, q): 
    N = len(b[0]) 
    output = np.zeros(N) 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j] = output[j]+a[k]*b[k][j] 
    q.put((idx, output)) 

# Parallel Matrix Cross Multiplication 


def cross_parallel(a, b): 
    M = len(a) 
    q = Queue() 
    if len(a[0]) == len(b): 
     tasks = [None]*M 
     T = np.array([[0]*len(b[0])]*len(a)) 
     for i in range(M): 
      tasks[i] = Process(target=single_row, args=(a[i], b, i, q)) 
     for task in tasks: 
      task.start() 
     T = [] 
     for i in range(M): 
      T += [q.get()] 
     for task in tasks: 
      task.join() 
     T.sort() 
     T = np.array([v[1] for v in T]) 
     return T 
    else: 
     print('Error: Invalid Matrices') 

# Non-parallel Matrix Cross Multiplication 


def cross_basic(a, b): 
    if len(a[0]) == len(b): 
     T = np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j] = T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print('Error: Invalid Matrices') 

if __name__ == '__main__': 
    x = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] 
    y = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] 
    print(cross_basic(x, y)) 
    print(cross_parallel(x, y)) 

이 방식을 사용하면 프로세스를 결합하기 전에 큐를 비우하지 않는 경우, 예를 들어, 사용하는 것이 불안 할 수있다, 당신은 당신의 프로그램에서 교착 상태가 발생할 수 있습니다. 더 나은 방법으로 통신/프로세스 수를 관리하는 높은 수준의 프로토콜을 얻으려면 multiprocessing.Pool 또는 concurrents.futures.ProcessPoolExecutor을 보는 것이 좋습니다.

def single_row2(a, b): 
    N = len(b[0]) 
    output = np.zeros(N) 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j] = output[j]+a[k]*b[k][j] 
    return output 

def cross_parallel2(a, b): 
    import itertools 
    from concurrent.futures import ProcessPoolExecutor 
    executor = ProcessPoolExecutor(max_workers=4) 
    M = len(a) 
    if len(a[0]) == len(b): 
     res = executor.map(single_row2, a, itertools.repeat(b)) 

     return np.array([row for row in res]) 
    else: 
     print('Error: Invalid Matrices')