일부 Paho- 기반 클라이언트가 Vert.x MQTT 서버로 작업하려고합니다. 내 수신 클라이언트가 구독하는 테스트 주제에 게시하려고합니다. 내 고객 게시자가 내 고객에게 메시지를 전달하는 데 문제가 있습니다.Vert.x MQTT 서버로부터 메시지를 수신 할 수 없습니다.
인터넷에서 보았던 훌륭한 예제를 사용하여 MQTT Broker를 작성했습니다. Vert.x MQTT 브로커 코드에 대한 코드는 다음과 같습니다 :
public class MQTTBroker
{
public MQTTBroker()
{
MqttServerOptions opts = new MqttServerOptions();
opts.setHost("localhost");
opts.setPort(1883);
MqttServer server = MqttServer.create(Vertx.vertx(),opts);
server.endpointHandler(endpoint -> {
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
endpoint.accept(false);
if("Test_Send".equals(endpoint.clientIdentifier()))
{
doPublish(endpoint);
handleClientDisconnect(endpoint);
}
else
{
handleSubscription(endpoint);
handleUnsubscription(endpoint);
handleClientDisconnect(endpoint);
}
}).listen(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
}
protected void handleSubscription(MqttEndpoint endpoint) {
endpoint.subscribeHandler(subscribe -> {
List grantedQosLevels = new ArrayList < >();
for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
grantedQosLevels.add(s.qualityOfService());
}
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
});
}
protected void handleUnsubscription(MqttEndpoint endpoint) {
endpoint.unsubscribeHandler(unsubscribe -> {
for (String t: unsubscribe.topics()) {
System.out.println("Unsubscription for " + t);
}
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
}
protected void publishHandler(MqttEndpoint endpoint) {
endpoint.publishHandler(message -> {
endpoint.publishAcknowledge(message.messageId());
}).publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
});
}
protected void handleClientDisconnect(MqttEndpoint endpoint) {
endpoint.disconnectHandler(h -> {
System.out.println("The remote client has closed the connection.");
});
}
protected void doPublish(MqttEndpoint endpoint) {
// just as example, publish a message with QoS level 2
endpoint.publish("Test_Topic",
Buffer.buffer("Hello from the Vert.x MQTT server"),
MqttQoS.EXACTLY_ONCE,
false,
false);
publishHandler(endpoint);
// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompleteHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
});
}
}
나는 그것이 구독 토픽에서 메시지를 수신하기로하는 PAHO 클라이언트를 가지고있다. 그 코드는 다음과 같습니다 : 그들은 (그리고 것입니다) 수 있지만 내 응용 프로그램을 배포하면
public class Receiver implements MqttCallback
{
public Receiver()
{
MqttClient client = null;
try
{
MemoryPersistence persist = new MemoryPersistence();
client = new MqttClient("tcp://localhost:1883",persist);
client.setCallback(this);
client.connect();
client.subscribe("Test_Topic");
System.out.println("The receiver is initialized.");
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
@Override
public void connectionLost(Throwable arg0)
{
System.out.println("Connection is lost!");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0)
{
System.out.println("Delivered!");
}
@Override
public void messageArrived(String theStr, MqttMessage theMsg)
{
System.out.println("Message from: "+theStr);
try
{
String str = new String(theMsg.getPayload());
System.out.println("Message is: "+str);
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
}
가입자와 게시자는 브로커에서 원격 지역입니다. 내가 게시하는 데 사용하는 Paho- 기반 코드는 다음과 같습니다 :합니다 (doPublish 방법에서와 같이) 그것이 있어야하지만 나는 브로커 (테스트 용)는 하드 - 코딩 된 문자열을 게시하고
public void publish(String theMsg)
{
MqttClient nwclient = null;
try
{
ServConfigurator serv = ServConfigurator.getInstance();
MemoryPersistence persist = new MemoryPersistence();
String url = "tcp://localhost:1883";
nwclient = new MqttClient(url,"Test_Send",persist);
MqttConnectOptions option = new MqttConnectOptions();
option.setCleanSession(true);
nwclient.connect(option);
MqttMessage message = new MqttMessage(theMsg.getBytes());
message.setQos(2);
nwclient.publish("Test_Topic",message);
nwclient.disconnect();
nwclient.close();
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
주 내 엔드 포인트 처리기에서 게시자 클라이언트의 게시 시도를 포착하고 있다고 언급했습니다. 아쉽게도 브로커에 구독자와 게시자를 연결하는 데 문제가 없지만 어떤 이유로 메시지가 구독자에게 전달되지 않습니다. 나는 여러 가지를 시도했지만 게시자 클라이언트가 실제로 브로커에 게시하는 동안 브로커의 게시가 어떤 이유로 구독자에게 문자열을 보내지 못합니다.
나는 여기에서 뭔가를 놓치고 있다는 것을 확신하지만 나는 그것이 무엇인지를 생각할 수 없다. 누구든지이 일을하도록 나를 도울 수 있습니까 ???
도움이나 의견을 미리 보내 주셔서 감사합니다.
사실, 당신이 옳다는 것이 밝혀졌습니다. 나는 서버를 조립하고 내가 브로커를 사용하고 있다고 생각했다. 내가 원하는 브로커를 만들 수 있기 때문에 이것은 실제로 좋은 일입니다. 고객에게 지금 메시지를 받고 멋진 맞춤형 인프라를 구축 할 것입니다. 통찰력에 감사드립니다. –
대단히 환영합니다! – ppatierno