2017-04-13 5 views
1

Boost.Asio의 스트랜드와 우선 순위가 지정된 래퍼를 동시에 사용하고 싶습니다.Boost Asio에 스트랜드 래퍼와 우선 래퍼를 결합하는 방법

Boost asio priority and strand

boost::asio and Active Object

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Why do I need strand per connection when using boost::asio?

내가 래퍼 접근 방식을 사용하고 싶습니다 : 내 코드를 작성하기 전에

, 나는 다음과 같은 정보를 읽었습니다 왜냐하면 async_read, a와 같은 다양한 비동기 API를 사용하고 싶기 때문입니다. sync_write 및 async_connect. http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531에 따르면 우선 순위 래퍼와 스트랜드 래퍼를 결합 할 수 있습니다.

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     wrapped_handler(handler_priority_queue& q, int p, Handler h) 
      : queue_(q), priority_(p), handler_(std::move(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_PRIORITY 
       ) 
#endif 
#if ENABLE_STRAND 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

래퍼는 다음 매크로로 사용할 수 있습니다 :

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 
다음 http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

내 코드입니다 :

그래서 나는 다음과 같은 예에 따라 코드를 작성

두 매크로를 모두 사용하면 다음과 같은 결과가 나타납니다. t :

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
execute(3) 
execute(2) 
execute(1) 
execute(0) 
before run_one() 
before run_one() 
before run_one() 

내가

[called] 1,140512649541376 

[called] priority,thread_id 

출력을 가지고하지만 난 그것을하지 않았다 기대합니다.

execute()의 경우 function_()이 호출되었지만 wrapped_handler::operator()이 호출되지 않은 것처럼 보입니다. (이 기능 execute()는 내 코드에 pq.execute_all();에서 호출됩니다.) function_() 호출 한 후에 나는 순서를 추적

void execute() { 
    std::cout << "execute(" << priority_ << ")" << std::endl; 
    function_(); // It is called. 
} 

template <typename Handler> 
class wrapped_handler { 
public: 

    template <typename... Args> 
    void operator()(Args&&... args) { // It is NOT called 
     std::cout << "operator() " << std::endl; 
     handler_(std::forward<Args>(args)...); 
    } 

.

다음 기능

호출된다 함수 bool strand_service::do_dispatch(implementation_type& impl, operation* op)에이어서 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55는 동작 op 호출하지만 다음 줄하는 int 대기열에 가압되지 않는다 :

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

function_()이 strand_service로 발송되는 이유가 확실하지 않습니다.나는 가닥 래퍼가 이미 내 코드에 다음과 같은 점에서 unwraped되었다고 생각 : 난 단지 우선 순위 래퍼를 사용하도록 설정 한 경우

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

, 나는 다음과 같은 결과를 얻었다. 예상대로 작동하는 것 같습니다.

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
operator() 
[called] 4,140512649541376 
execute(3) 
operator() 
[called] 3,140512649541376 
execute(2) 
operator() 
[called] 2,140512649541376 
execute(1) 
operator() 
[called] 1,140512649541376 
execute(0) 
operator() 
[called] 0,140512649541376 
before run_one() 
before run_one() 
before run_one() 

스트랜드 래퍼 만 사용 설정하면 다음과 같은 결과가 나타납니다. 예상대로 작동하는 것 같습니다.

before run_one() 
[called] 0,140127385941760 
before poll_one() 
[called] 1,140127385941760 
[called] 2,140127385941760 
[called] 3,140127385941760 
[called] 4,140127385941760 
before execute_all() 
before run_one() 
before run_one() 
before run_one() 

아이디어가 있으십니까?

답변

1

나는이 문제를 해결했다.

function_()이 strand_service로 전달되는 이유가 확실하지 않습니다. 나는 가닥 래퍼가 이미 내 코드에 다음과 같은 점에서 unwraped되었다고 생각 :

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

매개 변수 f 원래 핸들러입니다. 이는 우선 순위 대기열이 랩핑되고 스트랜드 랩핑 처리기를 의미합니다. 스트랜드 래퍼가 밖에 있습니다. 따라서 f을 호출하면 strand_service로 전달됩니다. 이 프로세스는 동일한 strand_service에서 발생하므로 핸들러가 호출되지 않습니다.

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

handler_ 클래스 템플릿 wrapped_handler의 멤버 변수이다

다음과 같이 f을 우선 순위 큐에 h->handler_를 추가하는 대신,이 문제를 해결한다. 래핑되지 않은 핸들러를 보유합니다. 여기

는 전체 코드입니다 :

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     template <typename HandlerArg> 
     wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h) 
      : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_STRAND 
       ) 
#endif 
#if ENABLE_PRIORITY 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

그리고 여기에 출력 :

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
[called] 4,139903315736320 
execute(3) 
[called] 3,139903315736320 
execute(2) 
[called] 2,139903315736320 
execute(1) 
[called] 1,139903315736320 
execute(0) 
[called] 0,139903315736320 
before run_one() 
before run_one() 
before run_one()