2012-09-17 6 views
2

pthread를 사용하여 C++에서 스레드 풀을 구현하려고합니다. 이 스레드의 소유권을 가져 오는 한 객체에 스레드 관리와 관련된 논리를 캡슐화하려고합니다. 즉,이 오브젝트가 파기 될 때마다 스레드를 중지하고 정리해야합니다.안전하게 C++에서 Posix 스레드 풀을 소멸하는 방법

스레드를 중지하고 제거하는 가장 좋은 방법은 무엇입니까? 좋은 해결책을 멈출 때 시작하고 취소 할 때 분리됩니까? 또는 스레드를 취소하고 조인하는 것이 더 나은가? 내 코드를 참조하십시오, 나는 모든 관련 의견을 주셔서 감사합니다.

WorkerThreadManager.h :

#include "WorkerThreadManagerInterface.h" 
#include "utils/mutex.h" 
#include <queue> 
#include <semaphore.h> 

#include <iostream> 

class WorkerThreadManager : public WorkerThreadManagerInterface 
{ 
    public: 
     WorkerThreadManager(unsigned threadsNumber = 5); 
     virtual ~WorkerThreadManager(); 

     virtual void PushTask(thread_function_t A_threadFun, result_function_t A_resultFun); 
     void SignalResults(); 

    private: 
     static void* WorkerThread(void* A_data); 

     void PushResult(int A_result, result_function_t A_resultFun); 

     typedef boost::function<void()> signal_function_t; 

     struct worker_thread_data_t 
     { 
      worker_thread_data_t(thread_function_t A_threadFun, result_function_t A_resultFun) : 
       threadFun(A_threadFun), resultFun(A_resultFun) {} 
      worker_thread_data_t() {} 

      thread_function_t  threadFun; 
      result_function_t  resultFun; 
     }; 


     const unsigned      m_threadsNumber; 
     pthread_t*       m_pthreads; 

     utils::Mutex      m_tasksMutex; 
     sem_t        m_tasksSem; 
     std::queue<worker_thread_data_t> m_tasks; 

     utils::Mutex      m_resultsMutex; 
     std::queue<signal_function_t>  m_results; 
}; 

WorkerThreadManager.cpp :

#include "WorkerThreadManager.h" 
#include "gateway_log.h" 
#include <pthread.h> 

/** 
* @brief Creates semaphore and starts threads. 
*/ 
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber) : m_threadsNumber(threadsNumber) 
{ 
    if (sem_init(&m_tasksSem, 0, 0)) 
    { 
     std::stringstream ss; 
     ss << "Semaphore could not be initialized: " << errno << " - " << strerror(errno); 
     LOG_FATAL(ss); 
     throw std::runtime_error(ss.str()); 
    } 

    m_pthreads = new pthread_t[m_threadsNumber]; 
    for (unsigned i = 0; i < m_threadsNumber; ++i) 
    { 
     int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, (void*) this); 
     if(rc) 
     { 
      std::stringstream ss; 
      ss << "Pthread could not be started: " << errno << " - " << strerror(errno); 
      LOG_FATAL(ss.str()); 

      if (sem_destroy(&m_tasksSem)) 
       LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno)); 

      delete [] m_pthreads; 

      throw std::runtime_error(ss.str()); 
     } 
     else 
     { 
      LOG_DEBUG("Worker thread started " << m_pthreads[i]); 

      if(pthread_detach(m_pthreads[i])) 
       LOG_WARN("Failed to detach worker thread"); 
     } 
    } 
} 

/** 
* @brief Cancels all threads, destroys semaphore 
*/ 
WorkerThreadManager::~WorkerThreadManager() 
{ 
    LOG_DEBUG("~WorkerThreadManager()"); 

    for(unsigned i = 0; i < m_threadsNumber; ++i) 
    { 
     if (pthread_cancel(m_pthreads[i])) 
      LOG_ERROR("Worker thread cancellation failed"); 
    } 

    if (sem_destroy(&m_tasksSem)) 
     LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno)); 

    delete [] m_pthreads; 
} 

/** 
* @brief Adds new task to queue, so worker threads can 
* @param A_threadFun function which will be executed by thread 
* @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter 
*   after worker thread executes A_threadFun. 
*/ 
void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun) 
{ 
    utils::ScopedLock mutex(m_tasksMutex); 

    worker_thread_data_t data(A_threadFun, A_resultFun); 
    m_tasks.push(data); 
    sem_post(&m_tasksSem); 
    LOG_DEBUG("Task for worker threads has been added to queue"); 
} 

/** 
* @brief Executes result functions (if there are any) to give feedback 
* to classes which requested task execution in worker thread. 
*/ 
void WorkerThreadManager::SignalResults() 
{ 
    while(true) 
    { 
     signal_function_t signal; 
     { 
      utils::ScopedLock mutex(m_resultsMutex); 
      if(m_results.size()) 
      { 
       signal = m_results.front(); 
       m_results.pop(); 
      } 
      else 
       return; 
     } 

     signal(); 
    } 
} 

/** 
* @brief Enqueues result of function executed in worker thread. 
* @param A_result return value of function executed in worker thread 
* @param A_resultFun function which will be enqueued for calling with A_result as a parameter. 
*/ 
void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun) 
{ 
    utils::ScopedLock mutex(m_resultsMutex); 

    signal_function_t signal = boost::bind(A_resultFun, A_result); 
    m_results.push(signal); 
} 


/** 
* @brief worker thread body 
* @param A_data pointer to WorkerThreadManager instance 
*/ 
void* WorkerThreadManager::WorkerThread(void* A_data) 
{ 
    WorkerThreadManager* manager = reinterpret_cast<WorkerThreadManager*>(A_data); 
    LOG_DEBUG("Starting worker thread loop"); 
    while (1) 
    { 
     if (-1 == sem_wait(&manager->m_tasksSem) && errno == EINTR) 
     { 
      LOG_DEBUG("sem_wait interrupted with signal"); 
      continue; 
     } 
     LOG_DEBUG("WorkerThread:::::: about to call lock mutex"); 

     worker_thread_data_t data; 
     { 
      utils::ScopedLock mutex(manager->m_tasksMutex); 
      data = manager->m_tasks.front(); 
      manager->m_results.pop(); 
     } 

     LOG_DEBUG("WorkerThread:::::: about to call resultFun"); 
     int result = data.threadFun(); 
     LOG_DEBUG("WorkerThread:::::: after call resultFun"); 
     pthread_testcancel(); 

     manager->PushResult(result, data.resultFun); 
    } 

    return NULL; 
} 

MAIN.CPP :

#include "gateway_log.h" 
#include "WorkerThreadManager.h" 
#include <memory> 

class A { 
public: 
    int Fun() { LOG_DEBUG("Fun before sleep"); sleep(8); LOG_DEBUG("Fun after sleep");return 0; } 
    void Result(int a) { LOG_DEBUG("Result: " << a); } 
}; 


int main() 
{ 
    sd::auto_ptr<WorkerThreadManager> workerThreadManager = new WorkerThreadManager; 
    A a; 
    workerThreadManager->PushTask(boost::bind(&A::Fun, &a), boost::bind(&A::Result, &a, _1)); 
    sleep(3); 
    LOG_DEBUG("deleting workerThreadManager"); 
    workerThreadManager.reset();     // <<<--- CRASH 
    LOG_DEBUG("deleted workerThreadManager"); 
    sleep(10); 
    LOG_DEBUG("after sleep");  

    return 0; 
} 

here 설명이 코드에 문제가 있음을 유의하시기 바랍니다.

+0

의견으로는 :'pthread_cancel'은 악마입니다. :) 종료를 제어하기가 더 어려워지고 구현에 따라 C++와 잘 작동하지 않을 수도 있습니다. http://stackoverflow.com/questions/4760687/cancelling-a-thread-using-pthread-cancel-good-practice-or-bad –

+1

스레드 풀의 일부인 스레드를 종료하는 가장 좋은 방법은 대기열에 넣는 것입니다 "죽음의 일". 스레드가 대기열에서 작업을 가져올 때 "죽은 일"인지 여부를 확인한 후 종료합니다. –

+0

오 예, auto_ptr 초기화에서 "new WorkerThreadManager"가 누락되었습니다. 사과드립니다. 이 게시물을 준비하려고 할 때 응용 프로그램에서 코드를 복사하여 붙여 넣었습니다. 또한 일부 테스트 코드에서 예제를 정리 했으므로 backtrace에서 볼 수있는 행 번호는 붙여 넣은 코드와 정확히 같지 않을 수 있습니다. – Marcin

답변

2

안전 중지와 관련하여 : 나는 pthread_join을 선호합니다. pthread_cancel을 사용하지 않습니다. 특별한 중지 메시지를 사용하고 있습니다.하지만 이벤트 구동 스레드 (메시지의 일부 대기열이있는 평균 스레드)가 항상 있습니다. 스레드가 exit-message이되면 루프가 중지 된 후 조인은 내 main 코드로 돌아갑니다.

코드와 관련하여 - 단일 스레드를 캡슐화하는 class Thread을 생성하는 것이 좋습니다. 풀은 Thread 개체를 힙에 생성해야합니다. 이제 배열은 pthread_t입니다. 풀과 스레드 간의 동기화가 필요한 경우 Thread 개체가 삭제되지 않고 풀 소멸자를 종료 할 수 없습니다.

+0

어떻게 깨어야 하는가? 내가 그 (것)들에 가입하고 싶을 때 나의 일꾼 실을 위로? 가장 쉬운 해결책은 아니지만 가장 좋은 방법은 각 작업 스레드에 대해'post_sem'을 한 번 호출하는 것입니다. – Marcin

+0

당신은'exitThread' 플래그를 추가 할 수 있습니다 -'true'로 설정 한 다음 멈출 스레드만큼 많은 수의 세마포어를 게시하십시오. 그러나 이것은 매우 빠른 모양 다음에 좋은 충고 일 뿐이며 너무 심각하지는 않습니다.) – PiotrNycz