대기열에 저장된 요소의 컨테이너로 std :: vector를 사용하고 스레딩/동기화에 Boost를 사용하여 다음과 같은 블로킹 대기열 구현을 생각해 냈습니다. 나는 또한 유사한 게시물 here을 언급했다.부스트가있는 C++ 블로킹 큐
template<typename T>
class BlockingQueue
{
public:
explicit BlockingQueue(const std::vector<T>& buf):
buffer(buf)
{}
explicit BlockingQueue(): buffer()
{}
void push(const T& elem);
T pop();
~BlockingQueue()
{}
private:
boost::mutex mutex; // mutex variable
boost::condition_variable_any notEmptyCond; // condition variable, to check whether the queue is empty
std::vector<T> buffer;
};
template<typename T>
void BlockingQueue<T>::push(const T& elem)
{
boost::mutex::scoped_lock lock(mutex);
buffer.push_back(elem);
notEmptyCond.notify_one(); // notifies one of the waiting threads which are blocked on the queue
// assert(!buffer.empty());
}
template<typename T>
T BlockingQueue<T>::pop()
{
boost::mutex::scoped_lock lock(mutex);
notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); }); // waits for the queue to get filled and for a notification, to resume consuming
T elem = buffer.front();
buffer.erase(buffer.begin());
return elem;
}
나는 두 개의 스레드 (생산자/소비자) 파일에서 하나의 읽기 문자열을했습니다와 BlockingQueue의로 채우기, 다른 하나는 BlockingQueue의에서 문자열을 제거하고이를 인쇄 할 수 있습니다. 이 두 가지 모두 아래에 정의 된 클래스에서 초기화됩니다.
실행 1 :
inserted AZ
inserted yezjAdCeV
inserted icKU
inserted q
inserted b
inserted DRQL
inserted aaOj
inserted CqlNRv
inserted e
inserted XuDemby
inserted rE
inserted YPk
inserted dLd
inserted xb
inserted bSrZdf
inserted sCQiRna
...
실행 4 :
consumed jfRnjSxrw
inserted INdmXSCr
consumed oIDlu
inserted FfXdARGu
consumed tAO
inserted mBq
consumed I
inserted aoXNhP
consumed OOAf
inserted Qoi
consumed wCxJXGWJu
inserted WZGYHluTV
consumed oIFOh
inserted kkIoFF
consumed ecAYyjHh
inserted C
consumed KdrBIixw
inserted Ldeyjtxe
...
내 문제 : 소비자 스레드가
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{
rt = boost::thread(boost::bind(&FileVerifier::read, this));
pt1 = boost::thread(boost::bind(&FileVerifier::process, this));
}
volatile ~FileProcessor()
{
rt.interrupt();
pt1.interrupt();
rt.join();
pt1.join();
}
/* Read strings from a file, populate them in the blocking-queue */
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
boost::iostreams::filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
/* Process the elements (dequeue and print) */
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
boost::mutex mutex;
boost::thread rt;
boost::thread pt1;
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
나는 (스냅 샷 만 포함) 다음과 같은 출력을 관찰 때로는 대기열의 자원 (대기열에서 빼내고 인쇄 할 수 있음)을 제어 할 수 있으며 때로는 그렇지 않을 수도 있습니다. 왜 이런 일이 일어날 지 모르겠습니다. 대기열의 디자인 결함에 대한 모든 힌트는 크게 감사하겠습니다. 감사!
관찰 : 즉, 예상대로 스레드가 (FileProcessor) 클래스 '의 ctor에서 초기화되지
, 그들은이 BlockingQueue의 리소스에 액세스하고 자신의 읽기/쓰기 작업을 수행 할 동작합니다. 이 동작을위한 변경 사항은 아래 스 니펫을 참조하십시오.
생산자 - 소비자 스레드는 @ n.m이 생산자가 명시 적으로 소비자에게 양보하지 않는다고 언급 했으므로 대체 선회를하지 않습니다. 위의 관찰에 따라, 각각의 출력은 클래스의 ctor 외부에서 초기화
inserted DZxcOw consumed inserted DZxcOw consumed robECjOp robECjOp inserted BaILFsVaA inserted HomURR inserted PVjLPb consumed BaILFsVaA consumed HomURR consumed PVjLPb inserted SHdBVSEyU consumed SHdBVSEyU consumed JaEH inserted JaEH inserted g inserted MwEgOVB inserted qlohoszv consumed g consumed MwEgOVB consumed qlohoszv consumed AsQgq inserted AsQgq inserted tbm inserted iriADeEL inserted Zoxs consumed tbm
아래에 주어진 것과 같은 일이었다.
#include <iostream>
#include <threading/file_processor.h> //has the FileProcessor class declaration
int main()
{
FileProcessor fp; //previously, I had only this statement which called the class constructor, from where the threads were initialized.
boost::thread rt(boost::bind(&FileProcessor::read, &fp));
boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
rt.join();
pt1.join();
return 0;
}
수정 FileProcessor 클래스 (제거 된 ctor에에서 스레드 초기화) 편집
#include <boost/iostreams/filtering_stream.hpp>
#include <threading/blocking_queue.h> //has the BlockingQueue class
using namespace boost::iostreams;
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{}
~FileProcessor()
{}
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
: 부정확 한 주석을 제거 "로 전체 파일 내용을 가져옵니다 2017
5월 24일 버퍼 ".
당신이 무엇을 의미하는지 불분명하다. 당신이 관찰 한 것을 설명하십시오. 결론을 내리지 말고, 우리가 무엇을 보는 지 알려주십시오 (프로그램 출력, 디버거 출력 등). [mcve]는 게임 추측에 참여할 필요가없는 다른 사람들이 귀하의 문제를 들여다 볼 수있는 좋은 방법입니다. –
'boost :: concurrent :: sync_queue'를 고려해야합니다. – sbabbi
생산자/소비자는 shared_mutex 및 읽기/쓰기 잠금에 대한 일반적인 시나리오입니다. – didiz