2016-12-20 3 views
1

Mqtt paho 드라이버와 고민 중입니다 ...ACK가 MQTT의 게시 메시지에 해당한다고 어떻게 알 수 있습니까?

게시를 수신 할 때마다 IMqttDeliveryToken을 사용하여 서버에서 응답을받습니다.

실제 게시 메시지와 비교하려면 IMqttDeliveryToken에서 ID를 검색하기 위해 MqttMessage에 ID를 설정합니다 ...하지만 작동하지 않습니다 ... IMqttDeliveryToken.getMessageId()는 잘못된 ID와 QoS가 0이 아닌 IMqttDeliveryToken.getMessage() 후에 ID를 얻으려고하면 NPE를 반환합니다. 메시지가 전달 된

까지 전달되는 메시지가 반환됩니다

는 자바 독을 읽은 후, 나는 그것이 일반적인 행동입니다 읽어 보시기 바랍니다. 메시지가 전달되면 null이 리턴됩니다.

다른 질문으로 안내합니다. 브로커가 Acknowledgement를 보낸 후에 deliveryComplete() 메소드가 실제로 호출 되었습니까?

QoS의
client.setCallback(new MqttCallback() { 
    @Override 
    public void connectionLost(Throwable thrwbl) { } 

    @Override 
    public void messageArrived(String string, MqttMessage mm) throws Exception { } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     try { 
      System.out.println("Message ID from getMessageId() method : " + token.getMessageId()); 
      MqttMessage message = token.getMessage(); 
      System.out.println("Message ID from getMessage() method : " + message.getId()); 
     } catch (MqttException ex) { 
      System.out.println(ex); 
     } catch (Exception ex) { 
      System.out.println(ex); 
     } 
    } 
}); 

MqttMessage message = new MqttMessage(); 
message.setId(76); 
message.setPayload("pouet".getBytes()); 
message.setQos(0); 

client.publish("TEST", message); 

0 :

QoS의
Message ID from getMessageId() method : 1 
Message ID from getMessage() method : 76 

1 : 여기

내 코드

Message ID from getMessageId() method : 1 
java.lang.NullPointerException 

답변

3

힘내에서 언급했듯이 MqttMessage.java

/** 
* This is only to be used internally to provide the MQTT id of a message 
* received from the server. Has no effect when publishing messages. 
* @param messageId 
*/ 
public void setId(int messageId) { 
    this.messageId = messageId; 
} 

메시지를 게시하는 동안 사용되는 곳이 없습니다. 이제 getMessageId() 메소드의 Message ID가 왜 발생했는지 이해하려면 다음을 살펴보십시오.

public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException, 
      MqttPersistenceException { 
     final String methodName = "publish"; 
     //@TRACE 111=< topic={0} message={1}userContext={1} callback={2} 
     log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback}); 

     //Checks if a topic is valid when publishing a message. 
     MqttTopic.validate(topic, false/*wildcards NOT allowed*/); 

     MqttDeliveryToken token = new MqttDeliveryToken(getClientId()); 
     token.setActionCallback(callback); 
     token.setUserContext(userContext); 
     token.setMessage(message); 
     token.internalTok.setTopics(new String[] {topic}); 

     MqttPublish pubMsg = new MqttPublish(topic, message); 
     comms.sendNoWait(pubMsg, token); 

     //@TRACE 112=< 
     log.fine(CLASS_NAME,methodName,"112"); 

     return token; 
    } 

MqttDeliveryToken 내부적 MqttWireMessage.java에 다단계 연장 값은 기본적으로 0으로 설정되어 생성 MqttPublish 인스턴스를 발행 여기서 특기 할 메시지 ID를 설정하지 않는다.

public MqttWireMessage(byte type) { 
     this.type = type; 
     // Use zero as the default message ID. Can't use -1, as that is serialized 
     // as 65535, which would be a valid ID. 
     this.msgId = 0; 
    } 
최종 보내기가 ClientState.java에서 MQTT 출판 호출됩니다

, MqttWireMessage 인스턴스 MSGID = 0 때문에이 getNextMessageId이()는 조건이 참되며 경우 어떤 전달됩니다 있던 (내부적으로 MqttPublish 코드 위에서 보낼 수 있습니다) called는 1을 반환합니다 (첫 번째 메시지이기 때문에 마지막 msg ID에 따라 후속 값을 반환했을 것입니다). 그리고 코드에서 token에 추적 코드로 설정되어 있습니다 (deliveryComplete()에서 추적 중입니다).

public void send(MqttWireMessage message, MqttToken token) throws MqttException { 
     final String methodName = "send"; 
     if (message.isMessageIdRequired() && (message.getMessageId() == 0)) { 
      message.setMessageId(getNextMessageId()); 
     } 
     if (token != null) { 
      try { 
       token.internalTok.setMessageID(message.getMessageId()); 
      } catch (Exception e) { 
      } 
     } 

     /////...... 
    } 
+0

깨달음 주셔서 감사합니다. MqttCallback 대신 게시 함수의 매개 변수로 전달 된 IMqttActionCallback을 사용했습니다. – Phoste

0

다음 쿼리에 응답 : 브로커가 Acknowledgement를 보낸 후 deliveryComplete() 메서드가 실제로 호출되는지 여부 --- 예 !!

배달 완료 코드가이 작은 조각에서 호출됩니다.

private void handleActionComplete(MqttToken token) 
      throws MqttException { 
     final String methodName = "handleActionComplete"; 
     synchronized (token) { 
      // @TRACE 705=callback and notify for key={0} 
      log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() }); 
      if (token.isComplete()) { 
       // Finish by doing any post processing such as delete 
       // from persistent store but only do so if the action 
       // is complete 
       clientState.notifyComplete(token); 
      } 

      // Unblock any waiters and if pending complete now set completed 
      token.internalTok.notifyComplete(); 

      if (!token.internalTok.isNotified()) { 
       // If a callback is registered and delivery has finished 
       // call delivery complete callback. 
       if (mqttCallback != null 
        && token instanceof MqttDeliveryToken 
        && token.isComplete()) { 
         mqttCallback.deliveryComplete((MqttDeliveryToken) token); 
       } 
       // Now call async action completion callbacks 
       fireActionEvent(token); 
      } 

      // Set notified so we don't tell the user again about this action. 
      if (token.isComplete()){ 
       if (token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener) { 
        token.internalTok.setNotified(true); 
       } 
      } 



     } 
    } 

즉 ack가 수신되었거나 notifyComplete가 완료되면 플래그가 설정되고 deliveryComplete 메소드가 호출됩니다.