2016-10-25 4 views
2

우리 회사는 C++ 11에서 레거시 C 코드의 대부분을 다시 작성하고 있습니다. (또한 C++을 배우는 C 프로그래머입니다.) 메시지 처리기에 대한 조언이 필요합니다.C++의 효율적인 메시지 팩토리 및 핸들러

우리는 분산 시스템을 가지고 있습니다 - 서버 프로세스는 TCP를 통해 압축 된 메시지를 클라이언트 프로세스로 보냅니다.

는 C 코드에서이 이루어지고 있었다 : - 유형/하위 유형 당 하나의 핸들러가 항상 처음 2 개 필드

- call a handler as handler[type](Message *msg) 

- handler creates temporary struct say, tmp_struct to hold the parsed values and .. 

- calls subhandler[type][subtype](tmp_struct) 

있습니다 유형 및 하위 유형에 따라 메시지를 구문 분석합니다.

C++ 11 및 다중 스레드 환경으로 이동. 내가 가진 기본 아이디어는 다음과 같습니다.

1) 각 유형/유형 유형 조합에 대해 프로세서 객체를 등록하십시오. 이것은 실제로 벡터의 벡터
인 - 벡터 < 벡터>

class MsgProcessor { 

    // Factory function 
    virtual Message *create(); 
    virtual Handler(Message *msg) 
} 

이것은 벡터의 벡터로 조회를하여 상기 프로세서를 받기 다른 메시지 프로세서

class AMsgProcessor : public MsgProcessor { 

     Message *create() override(); 
     handler(Message *msg); 
} 

2

) 상속 될 . 오버로드 된 create() 팩토리 함수를 사용하여 메시지를 가져옵니다. 실제 메시지와 구문 분석 된 값을 메시지 안에 보관할 수 있습니다.

3) 이제 해킹 비트가 발생합니다.이 메시지는 무거운 처리를 위해 다른 스레드로 보내야합니다. 벡터를 다시 검색하지 않으려면 메시지 안에 proc에 대한 포인터를 추가해야합니다.

class Message { 
    const MsgProcessor *proc; // set to processor, 
           // which we got from the first lookup 
           // to get factory function. 
}; 

그래서 다른 스레드, 단지

Message->proc->Handler(Message *); 

이 나쁜 보이지만, 희망,이 공장에서 별도의 메시지 핸들러에 도움이 될 것입니다을 할 것입니다. 여러 유형/하위 유형이 동일한 Message를 작성하려고하지만 다르게 처리해야하는 경우입니다.

나는 이것에 대해 검색하고 건너 온되었다

http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1

그것은 완전히 핸들러에서 메시지를 분리하는 방법을 제공합니다. 그러나 위의 단순한 계획이 수용 가능한 디자인으로 간주되는지 아닌지 궁금합니다. 또한 내가 원하는 것을 성취하는 잘못된 방법일까요?

효율성과 마찬가지로 속도는이 응용 프로그램에서 가장 중요한 요구 사항입니다. 이미 우리는 몇 가지 메모리 Jumbs => 2 벡터 + 가상 함수를 호출하여 메시지를 작성하고 있습니다. 처리기에 도착하는 데 2 ​​가지 견해가 있는데, 캐싱의 관점에서는 좋지 않습니다.

+0

불쾌감을 느끼지 않습니다. 사용 사례를 명확하게 설명하지 않았고 디자인에 빨리 뛰어 들었을 것입니다. 적어도 어떤 종류의 부하에서 성능에 대한 기대치를 충족시켜야합니까? 이것에 대한 간단한 글을 쓰고 싶니? – DAG

+0

로드가 의미하는 바를 모르겠습니다. 우리 시스템에서는 초당 200,000 개의 메시지를 처리해야합니다.실제 제품 세부 사항에 들어 가지 않고. 서버는 짧은 구성 메시지를 클라이언트에 전송합니다. 구성 메시지에는 유형, 부속 유형 및 데이터가 있습니다. 큰 번호가 있습니다. 유형/하위 유형 조합의 - 약 1000라고 말합니다. 이들은 다른 번호로 소비됩니다. 모듈의. 따라서 모든 모듈이 메시지를 등록하고 수신 할 수 있도록 깨끗한 ​​인터페이스를 갖는 것이 중요합니다. – MGH

+0

언제 'create'메서드를 호출 할 것으로 기대합니까? 그리고 반환 된 '메시지 *'는 무엇입니까? 파싱 ​​된 데이터로 채워질 빈 메시지일까요? – Arunmu

답변

1

귀하의 요구 사항이 분명하지는 않지만 귀하가 찾고자하는 디자인을 가지고 있다고 생각합니다.

본격적인 예를 보려면 http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261을 확인하십시오.

그것은 다음과 같은 한 구성 요소 : 메시지 프로세서 IMessageProcessor에 대한

  1. 인터페이스 클래스입니다.
  2. 메시지를 나타내는 기본 클래스. Message
  3. (Type, Subtype) 쌍에 해당하는 메시지 프로세서를 저장하기위한 기본적으로 하나의 등록 클래스입니다. Registrator. unordered_map에 매핑을 저장합니다. 더 나은 성능을 위해 약간 조정할 수도 있습니다. 노출 된 모든 API는 Registrator이며 std::mutex으로 보호됩니다.
  4. MessageProcessor의 구체적인 구현. 이 경우 AMsgProcessorBMsgProcessor입니다.
  5. simulate은 모두 어떻게 맞는지 보여줍니다.

뿐만 아니라 여기에 코드를 붙여 넣기 : 혼란의 주요 소스가 여기에서도 시스템의 구조를 알 수 있기 때문에 것으로 보인다 1

/* 
* http://stackoverflow.com/questions/40230555/efficient-message-factory-and-handler-in-c 
*/ 

#include <iostream> 
#include <vector> 
#include <tuple> 
#include <mutex> 
#include <memory> 
#include <cassert> 
#include <unordered_map> 

class Message; 

class IMessageProcessor 
{ 
public: 
    virtual Message* create() = 0; 
    virtual void handle_message(Message*) = 0; 
    virtual ~IMessageProcessor() {}; 
}; 

/* 
* Base message class 
*/ 
class Message 
{ 
public: 
    virtual void populate() = 0; 
    virtual ~Message() {}; 
}; 

using Type = int; 
using SubType = int; 
using TypeCombo = std::pair<Type, SubType>; 
using IMsgProcUptr = std::unique_ptr<IMessageProcessor>; 

/* 
* Registrator class maintains all the registrations in an 
* unordered_map. 
* This class owns the MessageProcessor instance inside the 
* unordered_map. 
*/ 
class Registrator 
{ 
public: 
    static Registrator* instance(); 

    // Diable other types of construction 
    Registrator(const Registrator&) = delete; 
    void operator=(const Registrator&) = delete; 

public: 
    // TypeCombo assumed to be cheap to copy 
    template <typename ProcT, typename... Args> 
    std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args) 
    { 
    auto proc = std::make_unique<ProcT>(std::forward<Args>(args)...); 
    bool ok; 
    { 
     std::lock_guard<std::mutex> _(lock_); 
     std::tie(std::ignore, ok) = registrations_.insert(std::make_pair(typ, std::move(proc))); 
    } 
    return (ok == true) ? std::make_pair(true, nullptr) : 
          // Return the heap allocated instance back 
          // to the caller if the insert failed. 
          // The caller now owns the Processor 
          std::make_pair(false, std::move(proc)); 
    } 

    // Get the processor corresponding to TypeCombo 
    // IMessageProcessor passed is non-owning pointer 
    // i.e the caller SHOULD not delete it or own it 
    std::pair<bool, IMessageProcessor*> processor(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 

    auto fitr = registrations_.find(typ); 
    if (fitr == registrations_.end()) { 
     return std::make_pair(false, nullptr); 
    } 
    return std::make_pair(true, fitr->second.get()); 
    } 

    // TypeCombo assumed to be cheap to copy 
    bool is_type_used(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 
    return registrations_.find(typ) != registrations_.end(); 
    } 

    bool deregister_proc(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 
    return registrations_.erase(typ) == 1; 
    } 

private: 
    Registrator() = default; 

private: 
    std::mutex lock_; 
    /* 
    * Should be replaced with a concurrent map if at all this 
    * data structure is the main contention point (which I find 
    * very unlikely). 
    */ 
    struct HashTypeCombo 
    { 
    public: 
    std::size_t operator()(const TypeCombo& typ) const noexcept 
    { 
     return std::hash<decltype(typ.first)>()(typ.first)^
      std::hash<decltype(typ.second)>()(typ.second); 
    } 
    }; 

    std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_; 
}; 

Registrator* Registrator::instance() 
{ 
    static Registrator inst; 
    return &inst; 
    /* 
    * OR some other DCLP based instance creation 
    * if lifetime or creation of static is an issue 
    */ 
} 


// Define some message processors 

class AMsgProcessor final : public IMessageProcessor 
{ 
public: 
    class AMsg final : public Message 
    { 
    public: 
    void populate() override { 
     std::cout << "Working on AMsg\n"; 
    } 

    AMsg() = default; 
    ~AMsg() = default; 
    }; 

    Message* create() override 
    { 
    std::unique_ptr<AMsg> ptr(new AMsg); 
    return ptr.release(); 
    } 

    void handle_message(Message* msg) override 
    { 
    assert (msg); 
    auto my_msg = static_cast<AMsg*>(msg); 

    //.... process my_msg ? 
    //.. probably being called in some other thread 
    // Who owns the msg ?? 
    (void)my_msg; // only for suppressing warning 

    delete my_msg; 

    return; 
    } 

    ~AMsgProcessor(); 
}; 

AMsgProcessor::~AMsgProcessor() 
{ 
} 

class BMsgProcessor final : public IMessageProcessor 
{ 
public: 
    class BMsg final : public Message 
    { 
    public: 
    void populate() override { 
     std::cout << "Working on BMsg\n"; 
    } 

    BMsg() = default; 
    ~BMsg() = default; 
    }; 

    Message* create() override 
    { 
    std::unique_ptr<BMsg> ptr(new BMsg); 
    return ptr.release(); 
    } 

    void handle_message(Message* msg) override 
    { 
    assert (msg); 
    auto my_msg = static_cast<BMsg*>(msg); 

    //.... process my_msg ? 
    //.. probably being called in some other thread 
    //Who owns the msg ?? 
    (void)my_msg; // only for suppressing warning 

    delete my_msg; 

    return; 
    } 

    ~BMsgProcessor(); 
}; 

BMsgProcessor::~BMsgProcessor() 
{ 
} 


TypeCombo read_from_network() 
{ 
    return {1, 2}; 
} 


struct ParsedData { 
}; 

Message* populate_message(Message* msg, ParsedData& pdata) 
{ 
    // Do something with the message 
    // Calling a dummy populate method now 
    msg->populate(); 
    (void)pdata; 
    return msg; 
} 

void simulate() 
{ 
    TypeCombo typ = read_from_network(); 
    bool ok; 
    IMessageProcessor* proc = nullptr; 

    std::tie(ok, proc) = Registrator::instance()->processor(typ); 
    if (!ok) { 
    std::cerr << "FATAL!!!" << std::endl; 
    return; 
    } 

    ParsedData parsed_data; 
    //..... populate parsed_data here .... 

    proc->handle_message(populate_message(proc->create(), parsed_data)); 
    return; 
} 


int main() { 

    /* 
    * TODO: Not making use or checking the return types after calling register 
    * its a must in production code!! 
    */ 
    // Register AMsgProcessor 
    Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1)); 
    Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2)); 

    simulate(); 

    return 0; 
} 

UPDATE.

모든 자기 존중 이벤트 시스템 아키텍처는 다음과 같을 것이다 :

  1. 소켓 기술자에 스레드 폴링의 풀.
  2. 타이머 관련 이벤트를 처리하기위한 스레드 풀입니다.
  3. 긴 차단 작업을 수행하는 스레드 수가 비교적 적습니다 (응용 프로그램에 따라 다름).

따라서, 귀하의 경우 :

  1. 당신은 epoll_wait 또는 select 또는 poll을하고있는 스레드에서 네트워크 이벤트를 얻을 것이다.
  2. 패킷을 완전히 읽고 Registrator::get_processor 호출을 사용하여 프로세서를 가져옵니다. 참고 : get_processor 내부에서 unordered_map이 수정되지 않는다고 보장 할 수 있다면 잠금없이 호출 할 수 있습니다. 즉 이벤트 수신을 시작하면 새로운 삽입이 이루어지지 않습니다.
  3. 얻은 프로세서를 사용하여 Message을 가져 와서 채울 수 있습니다.
  4. 이제는 내가 원하는 방식을 잘 모르겠다. 현재 epoll_wait을 수행중인 스레드 즉, 현재 스레드 (즉, epoll_wait)에서 handle_message을 호출하거나 대기열을 수신하는 스레드에 작업 (프로세서 및 메시지)을 게시하여 다른 스레드로 전달할 수있는 processor이 있습니다.
+0

답변 해 주셔서 감사합니다. 그 이유는 내가 프로세서에 대한 포인터를 가지고 있었기 때문이며, 나는이 질문에서 이것을 언급했다. 우리는 다른 스레드에서 핸들러를 조정할 필요가있다. 수신 및 구문 분석 후 메시지는 처리를 위해 다른 스레드로 전송됩니다. 우리는 모든 스레드가 메시지를 수신하고 처리 할 수있는 설계를 할 수 있었지만 일종의 잠금이 필요했습니다. 따라서 우리는 그것을 파이프 라인으로 설계했습니다. 메시지에서 proc를 가리키는 것이 좋다고 생각하십니까? – MGH

+0

이것은 프로세서가 메시지를보고 inturn이 프로세서를보아야하는 순환 의존성의 종류를 만듭니다. 아마도 전방 선언으로 해결되었을 것입니다. 내 원래 코드에서는 각 모듈에서 프로세서를 인스턴스화했습니다. IMessageProcessor와 같은 프로세서의 기본 클래스는 등록자에게 자신을 추가합니다. 등록/등록 취소는 신규/삭제로 수행됩니다. 이것은 파생 클래스에 대한 생성자 인수를 다루지 않아도되었습니다. 가변적 인 템플릿 매개 변수를 사용하여 해결했습니다. – MGH

+0

내 계획에있어서의 단점. 내가 생각하기에, 등록시의 에러는 리턴 타입이 아닌 예외로 처리되어야한다는 것입니다. 두 가지 계획 모두에 대해 의견이 있으십니까? – MGH