2017-05-05 5 views
0

현재 내부 루프에 결과를 저장해야하는 조건이있는 경우 루프 집합을 병렬화하는 빠르고 안정적인 방법을 찾으려고합니다. 코드는 3D 그리드에서 엄청난 양의 점들을 통과해야합니다. 이 볼륨 내의 일부 지점에 대해 다른 조건 (각도 확인)을 확인해야하며이 조건이 충족되면 밀도를 계산해야합니다. Openmp는 순서대로 출력 된 for 루프를 중첩했습니다.

가장 빠른 방법

지금까지 최대 규모의 루프뿐만 아니라뿐만 아니라 CPU를 많이 사용하는 함수를 호출하는 가장 안쪽 루프 (phiInd)에 대한 #pragma omp parallel for private (x,y,z) collapse(3) 루프에 대한 모든 외부 또는 #pragma omp parallel for했다.

밀도 루프의 내부에 밀도 값을 저장해야합니다. 그런 다음 densityarray는 나중에 별도로 저장됩니다. 내 문제는 지금 내가 설정 한 스레드의 수에 따라 y 밀도 배열이 달라진다는 것입니다. 직렬 버전과 단 하나의 스레드 만있는 openmp 실행은 동일한 결과를 보입니다. 스레드 수를 늘리면 같은 지점에서 결과가 나오지만 그 결과는 직렬 버전과 다릅니다.

나는 #pragma omp for ordered이 있다는 것을 알고 있지만 계산 속도가 너무 느립니다. 내 포인트 (x, y, z)에 따라 검색 결과를 정렬하면서이 루프를 병렬화 할 수 있습니까? 또는 더 명확하게 : 스레드 번호를 늘리면 내 결과가 변경되는 이유는 무엇입니까?

double phipoint, Rpoint, zpoint; 
double phiplane; 
double distphi = 2.0 * M_PI/nPlanes; //set desired distace to phi to assign point to fluxtubeplane 
double* densityarr = new double[max_x_steps * max_y_steps * max_z_steps]; 


for (z = 0; z < max_z_steps; z++) { 
    for (x = 0; x < max_x_steps; x++) { 
     for (y = 0; y < max_y_steps; y++) { 
      double x_center = x * stepSizeGrid - max_x/2; 
      double y_center = y * stepSizeGrid - max_y/2; 
      double z_center = z * stepSizeGrid - max_z/2; 
      cartesianCoordinate* pos = new cartesianCoordinate(x_center, y_center, z_center); 
      linearToroidalCoordinate* tor = linearToroidal(*pos); 
      simpleToroidalCoordinate* stc = simpleToroidal(*pos); 

      phipoint = tor->phi; 
      if (stc->r <= 0.174/*0.175*/) {//check if point is in vessel 

       for (int phiInd = 0; phiInd < nPlanes; ++phiInd) { 
        phiplane = phis[phiInd]; 

        if (abs(phipoint - phiplane) <= distphi) {//find right plane for point 
         Rpoint = tor->R; 
         zpoint = tor->z;     
         densityarr[z * max_y_steps * max_x_steps + x * max_y_steps + y] = TubePlanes[phiInd].getMinDistDensity(Rpoint, zpoint); 

        } 
       } 
      } 

      delete pos, tor, stc; 
     } 
    } 
} 
+0

여기서'phipoint' /'phipane'이 정의되어 있습니다. 'densityarr'는 어떤 타입입니까? – Zulan

+0

그 루프 전에. 'phipoint'와'phipane'은 모두 두 배입니다. 'densityarr'는 double *입니다 .. .. 'double phiplane, phipoint; double * densityarr = new double [max_x_steps * max_y_steps * max_z_steps]; ' – LeBo

답변

0

먼저 병렬 버전의 오류를 해결해야합니다. 공유 변수 phipoint (병렬 외부 루프) 및 phiplane, Rpoint, zpoint (모든 루프 병렬)에 대한 경쟁 조건 작성. 그러한 것들을 비공개로 선언하거나 더 나은 방법으로 선언해야한다. 그렇게하면 코드가 훨씬 쉽게 추론 될 수 있습니다. 이는 병렬 코드에서 매우 중요합니다.

당신이 설명하는 것처럼 외부 루프를 병렬 처리하는 것이 명백하고 매우 가능성이 가장 효율적인 방법입니다. 로드 불균형이 심한 경우 (stc->r <= 0.174가 포인트간에 균등하게 분배되지 않음) schedule(dynamic)을 사용할 수 있습니다.

내부 루프를 병렬 처리하는 것이 필요없는 것 같습니다. 일반적으로 외부 루프는 충분한 병렬 작업을 노출시키지 않고, 경쟁 조건을 가지지 않으며, 종속성을 가지거나, 캐시 문제를 갖지 않는 한 오버 헤드가 적기 때문에 더 나은 효율성을 제공합니다. 그러나 그것을 시도하고 측정하는 것은 가치있는 운동 일 것입니다. 그러나 둘 이상의 phi가 조건을 충족 시키면 densityarr으로 작성하여 경쟁 조건이있을 수 있습니다. 전반적으로 루프가 조금 이상하게 보입니다. 결과적으로 최대 하나 인 densityarr 만 사용하기 때문에 처음 루프를 찾으면 루프를 되돌리고 취소 할 수 있습니다. 이는 직렬 실행을 많이하는 데 도움이되지만 병렬 처리를 방해 할 수 있습니다. 또한 조건을 만족시키는 파이를 찾지 못했거나 포인트가 선박에없는 경우 densityarr의 각 항목은 초기화되지 않은 채로 남아 있습니다. 이는 값이 유효한지 나중에 판단 할 수 없기 때문에 매우 위험 할 수 있습니다. 아니.

일반적으로 사용자가 필요하지 않은 한 new으로 개체를 할당하지 마십시오. pos을 스택에 놓으면 성능이 향상됩니다. (병렬) 루프 내에서 메모리를 할당하는 것은 성능 문제 일 수 있으므로 Toroidal을 얻는 방법을 다시 생각해 볼 수 있습니다.

TubePlanes[phiInd].getMinDistDensity에는 부작용이 없다고 가정합니다. 그렇지 않으면 병렬 처리가 문제가 될 수 있습니다.

+0

내 주를 저장했습니다! 고맙습니다! 내 변수를 로컬로 선언하고 0으로 densityarr을 초기화했습니다. 나의 업보는 아직 집계되지 않았지만 당신의 대답이 내 문제를 해결했습니다. – LeBo

+0

@LeBo 도와 줘서 기뻐요! 답의 왼쪽에있는 체크 표시를 클릭하여 문제를 해결 된 것으로 표시하십시오. – Zulan