2017-05-21 3 views
1

대기열에 저장된 요소의 컨테이너로 std :: vector를 사용하고 스레딩/동기화에 Boost를 사용하여 다음과 같은 블로킹 대기열 구현을 생각해 냈습니다. 나는 또한 유사한 게시물 here을 언급했다.부스트가있는 C++ 블로킹 큐

template<typename T> 
class BlockingQueue 
{ 
public: 
    explicit BlockingQueue(const std::vector<T>& buf): 
    buffer(buf) 
    {} 
    explicit BlockingQueue(): buffer() 
    {} 
    void push(const T& elem); 
    T pop(); 
    ~BlockingQueue() 
    {} 

private: 
    boost::mutex mutex;        // mutex variable 
    boost::condition_variable_any notEmptyCond;  // condition variable, to check whether the queue is empty 
    std::vector<T> buffer; 
}; 

template<typename T> 
void BlockingQueue<T>::push(const T& elem) 
{ 
    boost::mutex::scoped_lock lock(mutex); 
    buffer.push_back(elem); 
    notEmptyCond.notify_one();      // notifies one of the waiting threads which are blocked on the queue 
    // assert(!buffer.empty()); 
} 

template<typename T> 
T BlockingQueue<T>::pop() 
{ 
    boost::mutex::scoped_lock lock(mutex); 
    notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); }); // waits for the queue to get filled and for a notification, to resume consuming 
    T elem = buffer.front(); 
    buffer.erase(buffer.begin()); 
    return elem; 
} 

나는 두 개의 스레드 (생산자/소비자) 파일에서 하나의 읽기 문자열을했습니다와 BlockingQueue의로 채우기, 다른 하나는 BlockingQueue의에서 문자열을 제거하고이를 인쇄 할 수 있습니다. 이 두 가지 모두 아래에 정의 된 클래스에서 초기화됩니다.

실행 1 :

inserted AZ 
inserted yezjAdCeV 
inserted icKU 
inserted q 
inserted b 
inserted DRQL 
inserted aaOj 
inserted CqlNRv 
inserted e 
inserted XuDemby 
inserted rE 
inserted YPk 
inserted dLd 
inserted xb 
inserted bSrZdf 
inserted sCQiRna 
... 

실행 4 :

consumed jfRnjSxrw 
inserted INdmXSCr 
consumed oIDlu 
inserted FfXdARGu 
consumed tAO 
inserted mBq 
consumed I 
inserted aoXNhP 
consumed OOAf 
inserted Qoi 
consumed wCxJXGWJu 
inserted WZGYHluTV 
consumed oIFOh 
inserted kkIoFF 
consumed ecAYyjHh 
inserted C 
consumed KdrBIixw 
inserted Ldeyjtxe 
... 

내 문제 : 소비자 스레드가

class FileProcessor 
{ 
public: 
    explicit FileProcessor():bqueue(),inFile("random.txt") 
    { 
    rt = boost::thread(boost::bind(&FileVerifier::read, this)); 
    pt1 = boost::thread(boost::bind(&FileVerifier::process, this)); 
    } 

    volatile ~FileProcessor() 
    { 
    rt.interrupt(); 
    pt1.interrupt(); 
    rt.join(); 
    pt1.join(); 
    } 

    /* Read strings from a file, populate them in the blocking-queue */ 
    void read() 
    { 
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
    boost::iostreams::filtering_istream in; 
    if (file.fail()) { 
     std::cout << "couldn't open the input file.. please check its name and read permissions\n"; 
     return; 
    } 
    try { 
     in.push(file);      
     for(std::string inputStr; std::getline(in,inputStr);) 
     { 
     bqueue.push(inputStr); 
     std::cout << "inserted " << inputStr << "\n"; 
     } 
    } 
    catch(std::exception& e) { 
     std::cout << "exception occurred while reading file\n" << e.what() << "\n"; 
    } 
    } 

    /* Process the elements (dequeue and print) */ 
    void process() 
    { 
    while (true) 
    { 
     std::string rstr = bqueue.pop(); 
     std::cout << "consumed " << rstr << "\n"; 
    } 
    } 

private: 
    boost::mutex mutex; 
    boost::thread rt; 
    boost::thread pt1; 
    BlockingQueue<std::string> bqueue; 
    std::string inFile;  // input file name from where the strings are read 
}; 

나는 (스냅 샷 만 포함) 다음과 같은 출력을 관찰 때로는 대기열의 자원 (대기열에서 빼내고 인쇄 할 수 있음)을 제어 할 수 있으며 때로는 그렇지 않을 수도 있습니다. 왜 이런 일이 일어날 지 모르겠습니다. 대기열의 디자인 결함에 대한 모든 힌트는 크게 감사하겠습니다. 감사!

관찰 : 즉, 예상대로 스레드가 (FileProcessor) 클래스 '의 ctor에서 초기화되지

  1. , 그들은이 BlockingQueue의 리소스에 액세스하고 자신의 읽기/쓰기 작업을 수행 할 동작합니다. 이 동작을위한 변경 사항은 아래 스 니펫을 참조하십시오.

  2. 생산자 - 소비자 스레드는 @ n.m이 생산자가 명시 적으로 소비자에게 양보하지 않는다고 언급 했으므로 대체 선회를하지 않습니다. 위의 관찰에 따라, 각각의 출력은 클래스의 ctor 외부에서 초기화

    inserted DZxcOw 
    consumed inserted DZxcOw 
    consumed robECjOp 
    robECjOp 
    inserted BaILFsVaA 
    inserted HomURR 
    inserted PVjLPb 
    consumed BaILFsVaA 
    consumed HomURR 
    consumed PVjLPb 
    inserted SHdBVSEyU 
    consumed SHdBVSEyU 
    consumed JaEH 
    inserted JaEH 
    inserted g 
    inserted MwEgOVB 
    inserted qlohoszv 
    consumed g 
    consumed MwEgOVB 
    consumed qlohoszv 
    consumed AsQgq 
    inserted AsQgq 
    inserted tbm 
    inserted iriADeEL 
    inserted Zoxs 
    consumed tbm 
    

아래에 주어진 것과 같은 일이었다.

#include <iostream> 
#include <threading/file_processor.h> //has the FileProcessor class declaration 

int main() 
{ 
    FileProcessor fp; //previously, I had only this statement which called the class constructor, from where the threads were initialized. 
    boost::thread rt(boost::bind(&FileProcessor::read, &fp)); 
    boost::thread pt1(boost::bind(&FileProcessor::process, &fp)); 
    rt.join(); 
    pt1.join(); 
    return 0; 
} 

수정 FileProcessor 클래스 (제거 된 ctor에에서 스레드 초기화) 편집

#include <boost/iostreams/filtering_stream.hpp> 
#include <threading/blocking_queue.h> //has the BlockingQueue class 

using namespace boost::iostreams; 

    class FileProcessor 
    { 
    public: 
     explicit FileProcessor():bqueue(),inFile("random.txt") 
     {} 

    ~FileProcessor() 
    {} 

    void read() 
    { 
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
    filtering_istream in; 
    if (file.fail()) { 
     std::cout << "couldn't open the input file.. please check its name and read permissions\n"; 
     return; 
    } 
    try { 
     in.push(file); 
     for(std::string inputStr; std::getline(in,inputStr);) 
     { 
     bqueue.push(inputStr); 
     std::cout << "inserted " << inputStr << "\n"; 
     } 
    } 
    catch(std::exception& e) { 
     std::cout << "exception occurred while reading file\n" << e.what() << "\n"; 
    } 
    } 

    void process() 
    { 
    while (true) 
    { 
     std::string rstr = bqueue.pop(); 
     std::cout << "consumed " << rstr << "\n"; 
    } 
    } 

private: 
    BlockingQueue<std::string> bqueue; 
    std::string inFile;  // input file name from where the strings are read 
    }; 

: 부정확 한 주석을 제거 "로 전체 파일 내용을 가져옵니다 2017

5월 24일 버퍼 ".

+1

당신이 무엇을 의미하는지 불분명하다. 당신이 관찰 한 것을 설명하십시오. 결론을 내리지 말고, 우리가 무엇을 보는 지 알려주십시오 (프로그램 출력, 디버거 출력 등). [mcve]는 게임 추측에 참여할 필요가없는 다른 사람들이 귀하의 문제를 들여다 볼 수있는 좋은 방법입니다. –

+1

'boost :: concurrent :: sync_queue'를 고려해야합니다. – sbabbi

+0

생산자/소비자는 shared_mutex 및 읽기/쓰기 잠금에 대한 일반적인 시나리오입니다. – didiz

답변

0

실제로 설계상의 결함은 없으며, OS가 스레드를 예약하는 방법에 대한 결함이 있습니다.

최대 큐 "깊이"(capacity)를 추가하고 큐가 용량에 도달하면 푸시 블록을 만드는 버전입니다. 그런 다음 데모는 완벽한 턴 바이 턴 (turn-by-turn) 소비를 보여주기 위해 1의 용량을 사용합니다 (물론 이것은 차선책이지만 성능면에서는 현저합니다).

내가 할 수 있기 때문에 _any 조건을 정규 조건으로 바꿨습니다.잠시 iostreams를 사용했습니다 (주석 // gets the whole file content into an input stream buffer은 완전히 부정확합니다).

Live On Coliru

#include <boost/thread.hpp> 
#include <deque> 
#include <fstream> 

#include <iostream> 
static boost::mutex s_iomutex; 

template <typename T> class BlockingQueue { 
    public: 
    explicit BlockingQueue(size_t capacity) : _buffer(), _capacity(capacity) { 
     assert(capacity>0); 
    } 

    void push(const T &elem) { 
     boost::unique_lock<boost::mutex> lock(_mutex); 
     _pop_event.wait(lock, [&] { return _buffer.size() < _capacity; }); 
     _buffer.push_back(elem); 
     _push_event.notify_one(); // notifies one of the waiting threads which are blocked on the queue 
     // assert(!_buffer.empty()); 
    } 

    T pop() { 
     boost::unique_lock<boost::mutex> lock(_mutex); 
     _push_event.wait(lock, [&] { return _buffer.size() > 0; }); 

     T elem = _buffer.front(); 
     _buffer.pop_front(); 
     _pop_event.notify_one(); 
     return elem; 
    } 

    private: 
    boost::mutex _mutex; 
    boost::condition_variable _push_event, _pop_event; 
    std::deque<T> _buffer; 
    size_t _capacity; 
}; 

class FileProcessor { 
    public: 
    explicit FileProcessor(size_t capacity = 10) : bqueue(capacity), inFile("random.txt") {} 

    /* Read strings from a file, populate them in the blocking-queue */ 
    void read() { 
     try { 
      std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
      for (std::string inputStr; std::getline(file, inputStr);) { 
       bqueue.push(inputStr); 

       boost::lock_guard<boost::mutex> lock(s_iomutex); 
       std::cout << "inserted " << inputStr << "\n"; 
      } 
     } catch (std::exception &e) { 
      std::cerr << "exception occurred while reading file\n" << e.what() << "\n"; 
     } 
    } 

    /* Process the elements (dequeue and print) */ 
    void process() { 
     while (true) { 
      std::string rstr = bqueue.pop(); 
      boost::lock_guard<boost::mutex> lock(s_iomutex); 
      std::cout << "consumed " << rstr << "\n"; 
     } 
    } 

    private: 
    BlockingQueue<std::string> bqueue; 
    std::string inFile; // input file name from where the strings are read 
}; 

int main() { 
    FileProcessor fp(1); 
    boost::thread rt(boost::bind(&FileProcessor::read, &fp)); 
    boost::thread pt1(boost::bind(&FileProcessor::process, &fp)); 
    rt.join(); 

    pt1.interrupt(); 
    pt1.join(); 
} 

인쇄

inserted 1 15786 
inserted 2 2099 
consumed 1 15786 
consumed 2 2099 
inserted 3 23963 
consumed 3 23963 
inserted 4 6928 
consumed 4 6928 
inserted 5 16279 
consumed 5 16279 
inserted 6 26787 
consumed 6 26787 
inserted 7 13463 
consumed 7 13463 
inserted 8 14099 
consumed 8 14099 
inserted 9 21808 
consumed 9 21808 
inserted 10 22618 
consumed 10 22618 
inserted 11 10618 
consumed 11 10618 
inserted 12 8211 
consumed 12 8211 
inserted 13 32033 
consumed 13 32033 
inserted 14 14512 
consumed 14 14512 
inserted 15 17734 
consumed 15 17734 
inserted 16 3632 
consumed 16 3632 
inserted 17 8265 
consumed 17 8265 
inserted 18 17922 
consumed 18 17922 
inserted 19 15753 
consumed 19 15753 
inserted 20 7474 
consumed 20 7474 
inserted 21 20136 
consumed 21 20136 
inserted 22 12334 
consumed 22 12334 
inserted 23 23299 
consumed 23 23299 
inserted 24 4066 
consumed 24 4066 
inserted 25 5173 
consumed 25 5173 
inserted 26 17640 
consumed 26 17640 
inserted 27 19218 
consumed 27 19218 
inserted 28 26387 
consumed 28 26387 
inserted 29 26357 
consumed 29 26357 
inserted 30 15206 
consumed 30 15206 
inserted 31 28714 
consumed 31 28714 
inserted 32 32648 
consumed 32 32648 
inserted 33 1500 
consumed 33 1500 
inserted 34 20941 
consumed 34 20941 
inserted 35 3838 
consumed 35 3838 
inserted 36 29680 
consumed 36 29680 
inserted 37 24626 
consumed 37 24626 
inserted 38 14824 
consumed 38 14824 
inserted 39 19690 
consumed 39 19690 
inserted 40 27815 
consumed 40 27815 
inserted 41 6760 
consumed 41 6760 
inserted 42 21322 
consumed 42 21322 
inserted 43 17966 
consumed 43 17966 
inserted 44 15292 
consumed 44 15292 
inserted 45 23321 
consumed 45 23321 
inserted 46 7437 
consumed 46 7437 
inserted 47 5444 
consumed 47 5444 
inserted 48 26785 
consumed 48 26785 
inserted 49 22430 
consumed 49 22430 
inserted 50 25417 
consumed 50 25417 
inserted 51 10408 
consumed 51 10408 
inserted 52 32096 
consumed 52 32096 
inserted 53 489 
consumed 53 489 
inserted 54 7083 
consumed 54 7083 
inserted 55 21555 
consumed 55 21555 
inserted 56 3759 
consumed 56 3759 
inserted 57 20811 
consumed 57 20811 
inserted 58 20176 
consumed 58 20176 
inserted 59 31305 
consumed 59 31305 
inserted 60 9894 
consumed 60 9894 
inserted 61 5515 
consumed 61 5515 
inserted 62 9978 
consumed 62 9978 
inserted 63 1981 
consumed 63 1981 
inserted 64 22286 
consumed 64 22286 
inserted 65 11081 
consumed 65 11081 
inserted 66 4392 
consumed 66 4392 
inserted 67 2252 
consumed 67 2252 
inserted 68 16714 
consumed 68 16714 
inserted 69 16003 
consumed 69 16003 
inserted 70 16695 
consumed 70 16695 
inserted 71 11288 
consumed 71 11288 
inserted 72 4788 
consumed 72 4788 
inserted 73 14454 
consumed 73 14454 
inserted 74 29920 
consumed 74 29920 
inserted 75 25154 
consumed 75 25154 
inserted 76 6206 
consumed 76 6206 
inserted 77 14444 
consumed 77 14444 
inserted 78 2921 
consumed 78 2921 
inserted 79 26908 
consumed 79 26908 
inserted 80 24148 
consumed 80 24148 
inserted 81 8487 
consumed 81 8487 
inserted 82 11371 
consumed 82 11371 
inserted 83 31047 
consumed 83 31047 
inserted 84 27749 
consumed 84 27749 
inserted 85 13548 
consumed 85 13548 
inserted 86 13807 
consumed 86 13807 
inserted 87 9411 
consumed 87 9411 
inserted 88 21999 
consumed 88 21999 
inserted 89 24386 
consumed 89 24386 
inserted 90 10190 
consumed 90 10190 
inserted 91 2472 
consumed 91 2472 
inserted 92 17149 
consumed 92 17149 
inserted 93 14288 
consumed 93 14288 
inserted 94 31625 
consumed 94 31625 
inserted 95 4732 
consumed 95 4732 
inserted 96 20273 
consumed 96 20273 
inserted 97 29036 
consumed 97 29036 
inserted 98 4425 
consumed 98 4425 
inserted 99 1563 
consumed 99 1563 
inserted 100 2796 
consumed 100 2796 
inserted 101 24374 
consumed 101 24374 
inserted 102 8151 
consumed 102 8151 
inserted 103 31361 
consumed 103 31361 
inserted 104 22466 
consumed 104 22466 
inserted 105 23365 
consumed 105 23365 
inserted 106 23762 
consumed 106 23762 
inserted 107 3616 
consumed 107 3616 
inserted 108 7711 
consumed 108 7711 
inserted 109 23178 
consumed 109 23178 
inserted 110 18791 
consumed 110 18791 
inserted 111 13371 
consumed 111 13371 
inserted 112 14553 
consumed 112 14553 
inserted 113 32026 
consumed 113 32026 
inserted 114 4567 
consumed 114 4567 
inserted 115 22178 
consumed 115 22178 
inserted 116 23947 
inserted 117 5928 
consumed 116 23947 
consumed 117 5928 
inserted 118 25606 
consumed 118 25606 
inserted 119 5141 
consumed 119 5141 
inserted 120 17681 
consumed 120 17681 
inserted 121 8024 
consumed 121 8024 
inserted 122 9094 
consumed 122 9094 
inserted 123 24878 
consumed 123 24878 
inserted 124 27800 
consumed 124 27800 
inserted 125 10225 
consumed 125 10225 
inserted 126 1157 
consumed 126 1157 
inserted 127 28217 
consumed 127 28217 
inserted 128 15144 
consumed 128 15144 
inserted 129 25692 
consumed 129 25692 
inserted 130 250 
consumed 130 250 
inserted 131 17432 
consumed 131 17432 
inserted 132 10055 
consumed 132 10055 
inserted 133 24279 
consumed 133 24279 
inserted 134 9445 
consumed 134 9445 
inserted 135 4149 
consumed 135 4149 
inserted 136 23240 
consumed 136 23240 
inserted 137 23146 
consumed 137 23146 
inserted 138 8576 
consumed 138 8576 
inserted 139 11469 
consumed 139 11469 
inserted 140 27250 
consumed 140 27250 
inserted 141 12203 
consumed 141 12203 
inserted 142 21730 
consumed 142 21730 
inserted 143 30824 
consumed 143 30824 
inserted 144 11197 
consumed 144 11197 
inserted 145 11076 
consumed 145 11076 
inserted 146 6960 
consumed 146 6960 
inserted 147 7313 
consumed 147 7313 
inserted 148 16701 
consumed 148 16701 
inserted 149 21044 
consumed 149 21044 
inserted 150 9934 
consumed 150 9934 
inserted 151 18562 
consumed 151 18562 
inserted 152 3559 
consumed 152 3559 
inserted 153 5541 
consumed 153 5541 
inserted 154 16024 
consumed 154 16024 
inserted 155 9877 
consumed 155 9877 
inserted 156 18443 
consumed 156 18443 
inserted 157 6312 
consumed 157 6312 
inserted 158 24237 
consumed 158 24237 
inserted 159 27685 
consumed 159 27685 
inserted 160 6154 
consumed 160 6154 
inserted 161 32723 
consumed 161 32723 
inserted 162 8358 
consumed 162 8358 
inserted 163 5518 
consumed 163 5518 
inserted 164 15857 
consumed 164 15857 
inserted 165 26383 
consumed 165 26383 
inserted 166 13179 
consumed 166 13179 
inserted 167 29919 
consumed 167 29919 
inserted 168 5135 
consumed 168 5135 
inserted 169 7147 
consumed 169 7147 
inserted 170 4383 
consumed 170 4383 
inserted 171 13147 
consumed 171 13147 
inserted 172 15658 
consumed 172 15658 
inserted 173 18478 
consumed 173 18478 
inserted 174 29793 
consumed 174 29793 
inserted 175 16003 
consumed 175 16003 
inserted 176 12804 
consumed 176 12804 
inserted 177 25713 
consumed 177 25713 
inserted 178 28108 
consumed 178 28108 
inserted 179 8518 
consumed 179 8518 
inserted 180 9874 
consumed 180 9874 
inserted 181 30731 
consumed 181 30731 
inserted 182 15582 
consumed 182 15582 
inserted 183 12589 
consumed 183 12589 
inserted 184 15839 
consumed 184 15839 
inserted 185 19505 
consumed 185 19505 
inserted 186 20543 
consumed 186 20543 
inserted 187 6331 
consumed 187 6331 
inserted 188 25289 
consumed 188 25289 
inserted 189 14877 
consumed 189 14877 
inserted 190 25571 
consumed 190 25571 
inserted 191 10873 
consumed 191 10873 
inserted 192 13568 
consumed 192 13568 
inserted 193 16319 
consumed 193 16319 
inserted 194 28590 
consumed 194 28590 
inserted 195 22303 
consumed 195 22303 
inserted 196 20685 
consumed 196 20685 
inserted 197 1528 
consumed 197 1528 
inserted 198 5200 
consumed 198 5200 
inserted 199 25689 
consumed 199 25689 
inserted 200 25140 
consumed 200 25140 
+0

안녕하세요. 귀하의 대답과 사용자 n.m의 의견을 통해 차단 대기열에서 달성하고자하는 바에 따라 구현이 변경 될 수 있음을 이해했습니다. 스레드가 클래스 내에 캡슐화 될 때 대기열에 대한 단일 제작자/소비자의 액세스를 교대로 테스트하려고했습니다. 귀하의 접근 방식은 깔끔하게 보입니다. 그 부정확 한 의견을 삭제했습니다. 메모리 맵핑 된 IO를 시도했다. – freax

+0

작업 기반 처리 만 원하는 경우 Asio를 고려하십시오. http://coliru.stacked-crooked.com/a/7e9361f5397c8e8e (다중 스레드 및 단일 스레드 데모) – sehe

+1

오, 여기 FileProcessor 인스턴스 2 개를 사용하여 데모를 계속합니다. 단일 스레드 http://coliru.stacked-crooked.com/a/1cffb328e414a38e – sehe