2016-07-10 4 views
0

첫 번째 버전 인 두 개의 스레드 선택 정렬은 모든 반복마다 새 스레드를 시작합니다.멀티 스레드 정렬 알고리즘을 효율적으로 구현하는 데 필요한 핵심 요소는 무엇입니까? 순진한 구현이 제대로 작동하지 않습니다.

일부 비교 (왼쪽 - 하나 개의 스레드를 마우스 오른쪽 - 두 개의 스레드 버전, 시간 - MS) : 두 개의 스레드가 빠르게 작업

Size: 500  time 1 time 5 
Size: 1000  time 1 time 9 
Size: 1500  time 4 time 12 
Size: 2000  time 5 time 16 
Size: 2500  time 10 time 22 
Size: 3000  time 14 time 26 
Size: 3500  time 19 time 30 
Size: 4000  time 24 time 36 
Size: 4500  time 30 time 43 
Size: 5000  time 37 time 49 
Size: 5500  time 46 time 57 
Size: 6000  time 55 time 66 
Size: 6500  time 63 time 76 
Size: 7000  time 74 time 80 
Size: 7500  time 85 time 92 
Size: 8000  time 96 time 102 
Size: 8500  time 108  time 109 
Size: 9000  time 122  time 124 
Size: 9500  time 135  time 132 
Size: 10000  time 150  time 144 
Size: 10500  time 165  time 156 
Size: 11000  time 181  time 174 
Size: 11500  time 200  time 177 
Size: 12000  time 218  time 195 
Size: 12500  time 235  time 205 
Size: 13000  time 255  time 214 
Size: 13500  time 273  time 226 
Size: 14000  time 296  time 245 

9500 크기의 배열 후. 두 번째 구현에서는 스레드가 한 번 시작됩니다. 그러나 그것은 믿을 수없는 성과를 가지고 있습니다.

내 CPU에는 4 개의 코어가 있습니다.

Size: 0 time 0 time 0 
Size: 50  time 0 time 151 
Size: 100  time 0 time 1276 
Size: 150  time 0 time 2089 
Size: 200  time 0 time 3925 
Size: 250  time 0 time 5303 

코드 :

//one thread 
template<class ItType> 
void selectionSortThreadsHelper2(ItType beg, ItType end) 
{ 
    //sorting element by element 
    for (auto it = beg; it != end; ++it) { 

     ItType middleIt = it + std::distance(it, end)/2; 

     auto search = [&] { return std::min_element(it, middleIt); }; 
     //search 
     std::future<ItType> minFirstHalfResult(std::async(std::launch::async , search)); 

     //wait searching 
     ItType minSecondHalfIt = std::min_element(middleIt, end); 
     ItType minFirstHalfIt = minFirstHalfResult.get(); 

     //swap if 
     ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt; 
     if (minIt != it) 
      std::iter_swap(minIt, it); 
    } 
} 

//two thread 
template<class ItType> 
void selectionSortThreadsHelper3(ItType beg, ItType end) 
{ 
    bool quit = false; 
    bool readyFlag = false; 
    bool processed = false; 
    std::mutex readyMutex; 
    std::condition_variable readyCondVar; 

    ItType it; 
    ItType middleIt; 
    ItType minFirstHalfIt; 
    auto search = [&]() { 
     while (true) { 
      std::unique_lock<std::mutex> ul(readyMutex); 
      readyCondVar.wait(ul, [&] {return readyFlag; }); 

      if (quit) 
       return; 

      minFirstHalfIt = std::min_element(it, middleIt); 

      processed = true; 

      ul.unlock(); 
      readyCondVar.notify_one(); 
     } 
    }; 

    std::future<void> f(std::async(std::launch::async, search)); 

    //sorting element by element 
    for (it = beg; it != end; ++it) { 

     middleIt = it + std::distance(it, end)/2; 

     //say second thread to start searching 
     { 
      std::lock_guard<std::mutex> lg(readyMutex); 
      readyFlag = true; 
     } 
     readyCondVar.notify_one(); 
     //std::this_thread::yield(); 

     ItType minSecondHalfIt = std::min_element(middleIt, end); 

     //wait second thread 
     { 
      std::unique_lock<std::mutex> ul(readyMutex); 
      readyCondVar.wait(ul, [&] { return processed; }); 
      processed = false; 
      readyFlag = false; 
     } 
     readyCondVar.notify_all(); 

     //swap if 
     ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt; 
     if (minIt != it) 
      std::iter_swap(minIt, it); 
    } 

    //quit thread 
    { 
     std::lock_guard<std::mutex> lg(readyMutex); 
     readyFlag = true; 
     quit = true; 
    } 
    readyCondVar.notify_all(); 

    f.get(); 
} 
+2

키가 사용 가능한 코어 수와 정확하게 균형을 잡습니다. –

+2

코드에서 스레드를 전혀 사용하지 않습니다. 'std :: async'를 호출하여 미래를 만들고 나서, 미래가 끝나고 결과를 전달할 때까지 기다린다. 그것은 스레드 또는 선물없이 순차적으로 검색을 수행하는 것과 다르지 않습니다. –

+0

아, 문제가 있습니다. 나는 그것을 해결하고 두 개의 스레드 버전이 더 빨리 9000 사이즈 배열로 작동한다. 하지만 두 번째 구현은 무엇입니까? – Stark

답변

2

자신을 작성하지 않고 병렬 종류를 얻을하는 방법에는 여러 가지가 있습니다. 먼저, sort(par, data.begin(), data.end())을 말할 수 실험 병렬 네임 스페이스가 : 네임 스페이스 C++ 17 표준에 병합되고, 그래서 어떤 시점 (https://parallelstl.codeplex.com/)에서 std:: 네임 스페이스에해야한다고 http://en.cppreference.com/w/cpp/experimental/parallelism/existing#sort

. 또한 OpenMP를 기반으로하는 비표준 GNU g ++ 병렬 정렬 구현이 있습니다. https://gcc.gnu.org/onlinedocs/libstdc++/manual/parallel_mode.html

마지막으로 C++ 11에서 사용자 고유의 병렬 정렬을 작성하는 방법을 온라인으로 설명하는 많은 페이지가 있습니다. 검색을 시도하십시오. 다음은 매우 포괄적 인 페이지입니다. https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp

+0

감사합니다.하지만 직접 코딩하는 것은 흥미 롭습니다. 공부 목적으로. 나는 그것이 쉽지 않다는 것을 이해했다 : ( – Stark

1

예 : Windows 스레딩 인터페이스 (이 경우 4 개 이상의 코어가있는 프로세서를위한 4 개의 스레드)를 사용하는 멀티 스레드 상향식 병합 정렬. 배열의 크기에 따라 단일 스레드 병합 정렬보다 약 3 배 빠릅니다. 대개 각 코어의 로컬 L1 및 L2 캐시에서 발생하는 작업 때문입니다. 세마포어는 벤치마킹 목적으로 동시에 모든 스레드를 시작하는 데 사용됩니다. 내 시스템 (Intel 2600K 3.4GHz)에서 단일 스레드 병합 정렬의 경우 1.5 초에 비해 1600 만 개의 32 비트 정수를 정렬하는 데 약 0.5 초가 소요됩니다.

#include <cstdlib> 
#include <ctime> 
#include <iostream> 
#include <windows.h> 

#define SIZE (16*1024*1024)    // must be multiple of 4 

static HANDLE hs0;      // semaphore handles 
static HANDLE hs1; 
static HANDLE hs2; 
static HANDLE hs3; 
static HANDLE ht1;      // thread handles 
static HANDLE ht2; 
static HANDLE ht3; 

static DWORD WINAPI Thread0(LPVOID); // thread functions 
static DWORD WINAPI Thread1(LPVOID); 
static DWORD WINAPI Thread2(LPVOID); 
static DWORD WINAPI Thread3(LPVOID); 

static int *pa;      // pointers to buffers 
static int *pb; 

void BottomUpMergeSort(int a[], int b[], size_t n); 
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee); 
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr); 
size_t GetPassCount(size_t n); 

int main() 
{ 
int *array = new int[SIZE]; 
int *buffer = new int[SIZE]; 
clock_t ctTimeStart;     // clock values 
clock_t ctTimeStop; 
    pa = array; 
    pb = buffer; 
    for(int i = 0; i < SIZE; i++){  // generate pseudo random data 
     int r; 
     r = (((int)((rand()>>4) & 0xff))<< 0); 
     r += (((int)((rand()>>4) & 0xff))<< 8); 
     r += (((int)((rand()>>4) & 0xff))<<16); 
     r += (((int)((rand()>>4) & 0x7f))<<24); 
     array[i] = r; 
    } 

    hs0 = CreateSemaphore(NULL,0,1,NULL); 
    hs1 = CreateSemaphore(NULL,0,1,NULL); 
    hs2 = CreateSemaphore(NULL,0,1,NULL); 
    hs3 = CreateSemaphore(NULL,0,1,NULL); 
    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0); 
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0); 
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0); 

    ctTimeStart = clock(); 
    ReleaseSemaphore(hs0, 1, NULL);  // start sorts 
    ReleaseSemaphore(hs1, 1, NULL); 
    ReleaseSemaphore(hs2, 1, NULL); 
    ReleaseSemaphore(hs3, 1, NULL); 
    Thread0((LPVOID)NULL); 
    WaitForSingleObject(ht2, INFINITE); 
    // merge 1st and 2nd halves 
    BottomUpMerge(pb, pa, 0, SIZE>>1, SIZE); 
    ctTimeStop = clock(); 
    std::cout << "Number of ticks " << (ctTimeStop - ctTimeStart) << std::endl; 

    for(int i = 1; i < SIZE; i++){  // check result 
     if(array[i-1] > array[i]){ 
      std::cout << "failed" << std::endl; 
     } 
    } 
    CloseHandle(ht3); 
    CloseHandle(ht2); 
    CloseHandle(ht1); 
    CloseHandle(hs3); 
    CloseHandle(hs2); 
    CloseHandle(hs1); 
    CloseHandle(hs0); 
    delete[] buffer; 
    delete[] array; 
    return 0; 
} 

static DWORD WINAPI Thread0(LPVOID lpvoid) 
{ 
    WaitForSingleObject(hs0, INFINITE); // wait for semaphore 
    // sort 1st quarter 
    BottomUpMergeSort(pa + 0*(SIZE>>2), pb + 0*(SIZE>>2), SIZE>>2); 
    WaitForSingleObject(ht1, INFINITE); // wait for thead 1 
    // merge 1st and 2nd quarter 
    BottomUpMerge(pa + 0*(SIZE>>1), pb + 0*(SIZE>>1), 0, SIZE>>2, SIZE>>1); 
    return 0; 
} 

static DWORD WINAPI Thread1(LPVOID lpvoid) 
{ 
    WaitForSingleObject(hs1, INFINITE); // wait for semaphore 
    // sort 2nd quarter 
    BottomUpMergeSort(pa + 1*(SIZE>>2), pb + 1*(SIZE>>2), SIZE>>2); 
    return 0; 
} 

static DWORD WINAPI Thread2(LPVOID lpvoid) 
{ 
    WaitForSingleObject(hs2, INFINITE); // wait for semaphore 
    // sort 3rd quarter 
    BottomUpMergeSort(pa + 2*(SIZE>>2), pb + 2*(SIZE>>2), SIZE>>2); 
    WaitForSingleObject(ht3, INFINITE); // wait for thread 3 
    // merge 3rd and 4th quarter 
    BottomUpMerge(pa + 1*(SIZE>>1), pb + 1*(SIZE>>1), 0, SIZE>>2, SIZE>>1); 
    return 0; 
} 

static DWORD WINAPI Thread3(LPVOID lpvoid) 
{ 
    WaitForSingleObject(hs3, INFINITE); // wait for semaphore 
    // sort 4th quarter 
    BottomUpMergeSort(pa + 3*(SIZE>>2), pb + 3*(SIZE>>2), SIZE>>2); 
    return 0; 
} 

void BottomUpMergeSort(int a[], int b[], size_t n) 
{ 
size_t s = 1;        // run size 
    if(GetPassCount(n) & 1){    // if odd number of passes 
     for(s = 1; s < n; s += 2)   // swap in place for 1st pass 
      if(a[s] < a[s-1]) 
       std::swap(a[s], a[s-1]); 
     s = 2; 
    } 
    while(s < n){       // while not done 
     size_t ee = 0;      // reset end index 
     while(ee < n){      // merge pairs of runs 
      size_t ll = ee;     // ll = start of left run 
      size_t rr = ll+s;    // rr = start of right run 
      if(rr >= n){     // if only left run 
       rr = n; 
       BottomUpCopy(a, b, ll, rr); // copy left run 
       break;      // end of pass 
      } 
      ee = rr+s;      // ee = end of right run 
      if(ee > n) 
       ee = n; 
      BottomUpMerge(a, b, ll, rr, ee); 
     } 
     std::swap(a, b);     // swap a and b 
     s <<= 1;       // double the run size 
    } 
} 

void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee) 
{ 
    size_t o = ll;       // b[]  index 
    size_t l = ll;       // a[] left index 
    size_t r = rr;       // a[] right index 
    while(1){        // merge data 
     if(a[l] <= a[r]){     // if a[l] <= a[r] 
      b[o++] = a[l++];    // copy a[l] 
      if(l < rr)      // if not end of left run 
       continue;     //  continue (back to while) 
      do        // else copy rest of right run 
       b[o++] = a[r++]; 
      while(r < ee); 
      break;       //  and return 
     } else {       // else a[l] > a[r] 
      b[o++] = a[r++];    // copy a[r] 
      if(r < ee)      // if not end of right run 
       continue;     //  continue (back to while) 
      do        // else copy rest of left run 
       b[o++] = a[l++]; 
      while(l < rr); 
      break;       //  and return 
     } 
    } 
} 

void BottomUpCopy(int a[], int b[], size_t ll, size_t rr) 
{ 
    do          // copy left run 
     b[ll] = a[ll]; 
    while(++ll < rr); 
} 

size_t GetPassCount(size_t n)    // return # passes 
{ 
    size_t i = 0; 
    for(size_t s = 1; s < n; s <<= 1) 
     i += 1; 
    return(i); 
}