2017-12-25 14 views
0

모든 소스에서 데이터를 가져 오는 스레드 (생산자)가 t mSec입니다. 데이터가 수집되고 준비되면 다른 스레드 (소비자)가 데이터를 가져와 처리해야합니다.떨어지는 소비자 생산자

그러나, 스레드가 빠른 보장이 (생산자가 소비자보다 느리게 또는 빠르게하지 않을 수 있습니다

내가 무슨 짓을 :. 소비자를 고려

//Producer 
while(is_enabled_) { 
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now(); 
    std::unique_lock<std::mutex> lk(mutex_); 
    ready_ = false; 
    //acquiring the data 
    ready_ = true; 
    lk.unlock(); 
    cv_.notify_all(); 
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now(); 
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count(); 
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration)); 
} 

모두 않습니다 같은 :

//A consumer 
while(is_enabled_){ 
    std::unique_lock<std::mutex> lk(mutex_); 
    cv_.wait(lk, [this] {return this->ready_; }); 
    //Process the data 
} 

대기열이 없습니다. 마지막으로 획득 한 데이터 만 처리해야하며, 각 데이터는 한 번만 처리해야합니다. 일부 데이터가 수집되어 소비자가 시간을 찾지 못했습니다. e를 처리하려면 데이터가 삭제되고 다른 데이터가 제작자에 의해 덮어 씁니다.

반면에 소비자가 생산자보다 빠르면 소비자는 이전 데이터를 처리하는 대신 새 데이터가 준비 될 때까지 기다려야합니다.

생산자가 새로운 데이터를 생성 할만큼 충분히 빠르지 않은 경우 소비자가 생산자가 만든 동일한 이전 데이터를 사용하고 있다는 문제가 있습니다.

구현이 부족합니다.

+0

가장 최근의 데이터 패킷을 가리키는 글로벌'shared_pointer '가 있습니다. 생산자는 새로운 패킷을 준비하고 그것을 전역 적으로'atomic_store'합니다. Consumer는 그것을'atomic_load' (또는 하나의 소비자 만이 그것을 가져야한다면 널 포인터로'atomic_exchange') 할 것입니다. –

+0

mutex_와 point_cloud_mutex_ 사이, 그리고 cv_와 point_cloud_cv_ 사이의 관계는 무엇입니까? 아무도 설정하지 않을 때 point_cloud_ready_가 true가되기를 기다리는 이유는 무엇입니까? –

+0

받은 오류는 무엇입니까? –

답변

0

당신은 당신의 목표를 달성하기 위해 이런 일을 인해 수 :

글로벌 변수 : 생산자에 대한 std::vector<bool> newData;

: while 루프 전에 소비자에

while(is_enabled_) { 
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now(); 
    std::unique_lock<std::mutex> lk(mutex_); 
    ready_ = false; 
    //acquiring the data 
    ready_ = true; 
    std::fill(newData.begin(), newData.end(), true); 
    lk.unlock(); 
    cv_.notify_all(); 
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now(); 
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count(); 
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration)); 
} 

는 수행

int index; 
std::unique_lock<std::mutex> lk(mutex_); 
index = newData.size(); 
newData.push_back(false); 
lk.unlock(); 

그러면 몸은 이렇게됩니다.

while(is_enabled_){ 
    std::unique_lock<std::mutex> lk(mutex_); 
    if(newData[index]) { 
     cv_.wait(lk, [this] {return this->ready_; }); 
     newData[index] = false; 
    //Process the data 
    }else { 
      lk.unlock(); 
      std::this_thread::sleep_for(std::chrono::milliseconds(50); 
    } 
} 

희망이 있습니다.

+0

여기에는 데이터 경쟁이 있습니다. 생산자가 이러한 내용을 수정하는 동안에도 소비자는 자물쇠 밖의'newData'의 내용에 접근합니다. –

+0

@IgorTandetnik ok 고맙습니다. 고쳐 주셔서 감사합니다. –

+0

이제 데이터를 처리 한 후 소비자 스레드가 뮤텍스를 독점하는 바쁜 대기 루프를 시작합니다. –