2014-10-15 5 views
2

wuclient/wuserver 예제에서 zeromq에 지연 가입자를 구현하려고합니다. 클라이언트가 서버보다 느리므로 서버에서 마지막으로 보낸 메시지 만 가져와야합니다.zeromq에 Lazy pub/sub, 마지막 메시지 만 받음 ​​

지금까지 클라이언트를 분리/연결하여 내가 그렇게 찾은 유일한 방법이지만, 3ms의 주위에 각 연결에서 원치 않는 비용, 물론이 :

server.cxx는

int main() { 
    // Prepare our context and publisher 
    zmq::context_t context (1); 
    zmq::socket_t publisher (context, ZMQ_PUB); 
    publisher.bind("tcp://*:5556"); 
    int counter = 0; 
    while (1) { 
     counter++; 

     // Send message to all subscribers 
     zmq::message_t message(20); 
     snprintf ((char *) message.data(), 20 , 
        "%d", counter); 
     publisher.send(message); 
     std::cout  << counter << std::endl; 
     usleep(100000); 
     } 
     return 0; 
    } 

client.cxx

int main (int argc, char *argv[]) 
{ 
    zmq::context_t context (1); 
    zmq::socket_t subscriber (context, ZMQ_SUB); 
    while(1){ 

    zmq::message_t update; 
    int counter; 

    subscriber.connect("tcp://localhost:5556"); // This call take some milliseconds 
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 
    subscriber.recv(&update); 
    subscriber.disconnect("tcp://localhost:5556"); 

    std::istringstream iss(static_cast<char*>(update.data())); 
    iss >> counter; 

    std::cout  << counter << std::endl; 
    usleep(1000000); 
    } 
    return 0; 
} 

서버 출력 : ,691,363 7 ...

클라이언트 출력 : ... I없이 그렇게 워터 마크 (water mark)를 사용하는 것을 시도했다

공동/데코,하지만 작동하지 않습니다. 이런 종류의 코드라도 버퍼가 적어도 수백 개의 메시지에 도달 할 때만 프레임이 삭제되기 시작합니다. :

int high_water_mark = 1; 
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark)); 
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark)); 

도 밀접하게 관련되어 zeromq 데브에 this post이지만, 용액은 마지막 메시지를 선택하는 또 다른 스레드 (사용을 제공하는 것은 허용되지 않는, I는 네트워크를 통해 메시지의 톤을 전송할 수 없다 느릅 후 사용되지

답변

4

용액 (단지 비 멀티 메시지) 등이 ZMQ_CONFLATE 사용하는 것이다.

client.cxx을

#include <zmq.hpp> 
#include <iostream> 
#include <sstream> 
#include <unistd.h> 

int main (int argc, char *argv[]) 
{ 
    zmq::context_t context (1); 

    zmq::socket_t subscriber (context, ZMQ_SUB); 

    int conflate = 1; 
    subscriber.setsockopt(ZMQ_CONFLATE, &conflate, sizeof(conflate)); 
    subscriber.connect("tcp://localhost:5556"); 
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 

    while(1){ 

    zmq::message_t update; 
    int counter; 

    subscriber.recv(&update); 

    std::istringstream iss(static_cast<char*>(update.data())); 
    iss >> counter; 

    std::cout  << counter << std::endl; 
    usleep(1000000); 

    } 
    return 0; 
}