2012-02-26 4 views
4

짧은 버전 :lock-free 대기열과 boost :: atomic -이 작업을 수행하고 있습니까?

나는 표준 : here에서 잠금 무료, 단일 생산자, 단일 소비자 큐의 구현에 사용 된 C++ (11)에서 원자를 대체하기 위해 노력하고있어. 이것을 boost::atomic으로 바꾸려면 어떻게해야합니까?

긴 버전 :

나는 작업자 스레드와 함께 우리의 응용 프로그램에서 더 나은 성능을 얻기 위해 노력하고있어. 각 스레드는 자체 작업 큐를 가지고 있습니다. 우리는 각 작업을 큐에서 빼내거나 큐에 넣기 전에 잠금을 사용하여 동기화해야합니다.

그런 다음 허브 스터 (Lock Sutter)의 기사가 잠금 해제 대기열에 있음을 발견했습니다. 이상적인 대체품 인 것 같습니다. 그러나이 코드는 C++ 11의 std::atomic을 사용합니다. 현재로서는이 프로젝트를 소개 할 수 없습니다.

더 많은 인터넷 검색 결과가 this one for Linux (echelon's)this one for Windows (TINESWARE's)과 같이 나타납니다. 둘 다 WinAPI의 InterlockedExchangePointer 및 GCC의 __sync_lock_test_and_set과 같은 플랫폼의 특정 구문을 사용합니다.

Windows & 리눅스 만 지원하면됩니다. #ifdef 님과 같이 갈 수 있습니다. 하지만 boost::atomic이 제공하는 것을 사용하는 것이 더 좋을 것이라고 생각했습니다. Boost Atomic은 아직 공식 Boost 라이브러리에 포함되어 있지 않습니다. 그래서 http://www.chaoticmind.net/~hcb/projects/boost.atomic/에서 소스를 다운로드하고 프로젝트에 포함 파일을 사용합니다.

#pragma once 

#include <boost/atomic.hpp> 

template <typename T> 
class LockFreeQueue 
{ 
private: 
    struct Node 
    { 
     Node(T val) : value(val), next(NULL) { } 
     T value; 
     Node* next; 
    }; 
    Node* first; // for producer only 
    boost::atomic<Node*> divider; // shared 
    boost::atomic<Node*> last; // shared 

public: 
    LockFreeQueue() 
    { 
     first = new Node(T()); 
     divider = first; 
     last= first; 
    } 

    ~LockFreeQueue() 
    { 
     while(first != NULL) // release the list 
     { 
      Node* tmp = first; 
      first = tmp->next; 
      delete tmp; 
     } 
    } 

    void Produce(const T& t) 
    { 
     last.load()->next = new Node(t); // add the new item 
     last = last.load()->next; 
     while(first != divider) // trim unused nodes 
     { 
      Node* tmp = first; 
      first = first->next; 
      delete tmp; 
     } 
    } 

    bool Consume(T& result) 
    { 
     if(divider != last) // if queue is nonempty 
     { 
      result = divider.load()->next->value; // C: copy it back 
      divider = divider.load()->next; 
      return true; // and report success 
     } 
     return false; // else report empty 
    } 
}; 

일부 수정주의하는 것이 :

boost::atomic<Node*> divider; // shared 
boost::atomic<Node*> last; // shared 

last.load()->next = new Node(t); // add the new item 
    last = last.load()->next; 

 result = divider.load()->next->value; // C: copy it back 
     divider = divider.load()->next; 

내가 지금까지 무엇을 얻을

boost :: atomic의 load() (및 암시 적 저장소())가 올바르게 적용 되었습니까? 이것이 Sutter의 원래 C++ 11 lock-free 큐와 동일하다고 말할 수 있습니까?

추신. 그래서 많은 스레드를 연구했지만 아무도 boost :: atomic & lock-free 큐에 대한 예제를 제공하지 않는 것 같습니다.

+0

잠금없는 호넷의 둥지를 파킹하기 전에 작업자 스레드에 더 큰 페이로드를 부여하여 뮤텍스 경합에 소요되는 시간이 덜 중요하게되는 것을 고려하십시오. 예를 들어 데이터 스트림을 처리하는 경우 작업자 스레드로 전달하기 전에 큰 청크로 데이터를 수집하십시오. –

+0

FWIF, Lockfree 라이브러리가 Boost에 수락되었음을 유의하십시오. http://lists.boost.org/boost-announce/2011/08/0331.php http://tim.klingt.org/boost_lockfree/ –

+0

@EmileCormier 나는 이것에 관해서 일할 것이다. – Gant

답변

1

시도해 보셨습니까 Intel Thread Building Blocks'atomic<T>? 크로스 플랫폼 및 무료. 당신의 선형화 포인트가 하나의 운영자가 될 수 있기 때문에

...

단일 생산자/단일 소비자가 문제가 훨씬 쉬워집니다. 제한된 대기열을 허용 할 준비가되면 여전히 쉽게됩니다.

바운드 큐은 히트를 극대화하기 위해 캐시 정렬 메모리 블록을 예약 할 수 있기 때문에 캐시 성능에 이점을 제공합니다.:

#include <vector> 
#include "tbb/atomic.h" 
#include "tbb/cache_aligned_allocator.h"  

template< typename T > 
class SingleProdcuerSingleConsumerBoundedQueue { 
    typedef vector<T, cache_aligned_allocator<T> > queue_type; 

public: 
    BoundedQueue(int capacity): 
     queue(queue_type()) { 
     head = 0; 
     tail = 0; 
     queue.reserve(capacity); 
    } 

    size_t capacity() { 
     return queue.capacity(); 
    } 

    bool try_pop(T& result) { 
     if(tail - head == 0) 
      return false; 
     else { 
      result = queue[head % queue.capacity()]; 
      head.fetch_and_increment(); //linearization point 
      return(true); 
     } 
    } 

    bool try_push(const T& source) { 
     if(tail - head == queue.capacity()) 
      return(false); 
     else { 
      queue[tail % queue.capacity()] = source; 
      tail.fetch_and_increment(); //linearization point 
      return(true); 
     } 
    } 

    ~BoundedQueue() {} 

private: 
    queue_type queue; 
    atomic<int> head; 
    atomic<int> tail; 
}; 
+0

TBB에는 동시 대기열 구현이 이미 제공됩니다. – Puppy

+0

참으로 그것은 않습니다 concurrent_bounded_queue 템플릿 클래스 멋진 솔루션 드롭 것입니다하지만 OP 최적화 된 솔루션을 따라서 나는 여전히 위의 제안을 생각합니다 –

+0

꼬리 % queue.capacity() 꼬리 후 부정적인 결과를 생성하지 않을 것 같아요 구르다. 에반 당신이 서명되지 않은 int로 전환한다면 용량은 MAX_INT의 제수 일 필요가 없을까요? – odinthenerd

0

체크 아웃 문서에서이 boost.atomic ringbuffer example :

#include <boost/atomic.hpp> 

template <typename T, size_t Size> 
class ringbuffer 
{ 
public: 
    ringbuffer() : head_(0), tail_(0) {} 

    bool push(const T & value) 
    { 
     size_t head = head_.load(boost::memory_order_relaxed); 
     size_t next_head = next(head); 
     if (next_head == tail_.load(boost::memory_order_acquire)) 
      return false; 
     ring_[head] = value; 
     head_.store(next_head, boost::memory_order_release); 
     return true; 
    } 

    bool pop(T & value) 
    { 
     size_t tail = tail_.load(boost::memory_order_relaxed); 
     if (tail == head_.load(boost::memory_order_acquire)) 
      return false; 
     value = ring_[tail]; 
     tail_.store(next(tail), boost::memory_order_release); 
     return true; 
    } 

private: 
    size_t next(size_t current) 
    { 
     return (current + 1) % Size; 
    } 

    T ring_[Size]; 
    boost::atomic<size_t> head_, tail_; 
}; 

// How to use  
int main() 
{ 
    ringbuffer<int, 32> r; 

    // try to insert an element 
    if (r.push(42)) { /* succeeded */ } 
    else { /* buffer full */ } 

    // try to retrieve an element 
    int value; 
    if (r.pop(value)) { /* succeeded */ } 
    else { /* buffer empty */ } 
} 

코드의 유일한 제한은 버퍼 길이가 컴파일시에 알려 져야한다는 것이다 (또는 건설 시간에, 당신은 배열을 교체하는 경우에 의해). 버퍼가 늘어나고 축소되는 것을 허용하는 것은 내가 이해하는 한 사소한 것이 아니다.