2014-11-28 4 views
18

MqttClient 유형의 client을 만들었으며 아래 코드와 같이 클라이언트를 만들고 Asynchronous callback입니다. 문제는,MqttClient 객체의 동기화 및 비동기 인터페이스가 작동하지 않습니다.

1 - 내가 실행할 때, System.out.println("Client is Connected");가 나타납니다. 그러나 응답이 없습니다또는 onFailure에서 왜 응답합니까? 나는 코드에서 잘못하고있다.

2-i는 static IMqttAsyncClient asynchClientCB = new IMqttAsyncClient() 인터페이스를 구현했지만, MqttClient 유형의 클라이언트를 가지고 있기 때문에이 IMqttAsyncClient 인터페이스를 사용할 수 없습니다. 나는 mqttAsynchClien을 사용하려고 시도했지만, java를위한 프로그램이 아니기 때문에 Android을 사용할 수 없습니다. IMqttAsyncClient 인터페이스를 사용하는 방법.? 아래의 코드 "Updated_code_1"에서 Update_1

, 내가 코드를 약간 수정,하지만 난 onSuccess 동기 콜백 메시지를 인쇄 할 내가 broker에 성공적으로 연결할 때마다 기대하고 메시지 onFailure 의도적으로 네트워크 연결을 끊은 경우와 같이 연결이 끊어진 경우 인쇄 할 동기 콜백입니다. 그러나 내가 broker에 연결할 때, onSuccessonFailur도 아무 것도 배출하지 않습니다. 그래서, 그들은 무엇을 위해 설계 되었습니까?

* Update_2_17_Dec_2014

I가 내가 유선/와이어가없는 네트워크를 통해 브로커에 연결하고 있다면, 그건 문제가 않습니다있는 용액에 우리를 이끌 수 문의? 그 동기 및 비동기 리스너의 동작이 바뀌겠습니까?

Updated_1_code :

MqttConnectOptions opts = getClientOptions(); 
     client = MQTTClientFactory.newClient(broker, port, clientID); 

     if (client != null) { 
      System.out.println("Client is not Null"); 
      client.setCallback(AsynchCallBack); 
      if (opts != null) { 
       iMQTTToken = client.connectWithResult(opts); 
       publishMSG(client, TOPIC,"010101".getBytes(), QoS, pub_isRetained); 
       iMQTTToken.setActionCallback(synchCallBack); 
       if (client.isConnected()) { 
        System.out.println("Client CONNECTED."); 
        publishMSG(client, TOPIC,"010101".getBytes(), QoS, pub_isRetained); 
       } 
      } 
     } 
    .... 
    .... 
    .... 
    .... 
IMqttToken iMQTTToken = new IMqttToken() { 

    @Override 
    public void waitForCompletion(long arg0) throws MqttException { 
     // TODO Auto-generated method stub 
     System.out.println("@waitForCompletion(): waiting " + (arg0 * 1000) + " seconds for connection to be established."); 
    } 

    @Override 
    public void waitForCompletion() throws MqttException { 
     // TODO Auto-generated method stub 
     System.out.println("@waitForCompletion(): waiting for connection to be established."); 
    } 

    @Override 
    public void setUserContext(Object arg0) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void setActionCallback(IMqttActionListener arg0) { 
     // TODO Auto-generated method stub 
     arg0.onSuccess(iMQTTToken); 
     //System.out.println(" " + arg0.onSuccess()); 
     //System.out.println(" " + arg0.onSuccess(iMQTTToken)); 
     iMQTTToken.setActionCallback(synchCallBack); 
    } 

    @Override 
    public boolean isComplete() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

    @Override 
    public Object getUserContext() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public String[] getTopics() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public boolean getSessionPresent() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

    @Override 
    public MqttWireMessage getResponse() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public int getMessageId() { 
     // TODO Auto-generated method stub 
     return 0; 
    } 

    @Override 
    public int[] getGrantedQos() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public MqttException getException() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public IMqttAsyncClient getClient() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public IMqttActionListener getActionCallback() { 
     // TODO Auto-generated method stub 
     return null; 
    } 
}; 

IMqttActionListener synchCallBack = new IMqttActionListener() { 

    @Override 
    public void onSuccess(IMqttToken arg0) { 
     // TODO Auto-generated method stub 
     System.out.println("@onSuccess: Connection Successful."); 
    } 

    @Override 
    public void onFailure(IMqttToken arg0, Throwable arg1) { 
     // TODO Auto-generated method stub 
     System.out.println("@onFailure: Connection Failed."); 
     setViewEnableState(Bconnect, true); 
    } 
}; 

MqttCallback AsynchCallBack = new MqttCallback() { 

    @Override 
    public void messageArrived(String topic, MqttMessage msg) throws Exception { 
     // TODO Auto-generated method stub 
     System.out.println("@messageArrived: Message Delivered."); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     // TODO Auto-generated method stub 
     System.out.println("@deliveryComplete: Delivery Completed."); 
    } 

    @Override 
    public void connectionLost(Throwable thrw) { 
     // TODO Auto-generated method stub 
     System.out.println("@Connection Lost: Connection Lost."); 
     setViewEnableState(Bconnect, true); 
    } 
}; 

Newclient :

MqttConnectOptions opts = new MqttConnectOptions(); 
    opts.setCleanSession(CS); 
    opts.setKeepAliveInterval(KATimer); 
    HashMap<Integer, WILL> LWTData = WILLFactory.newWILL("LWT", "LWT MS".getBytes(), 1, false); 
    opts.setWill(LWTData.get(0).getWILLTopic(), 
      LWTData.get(0).getWILLPayLoad(), 
      LWTData.get(0).getWILLQoS(), 
      LWTData.get(0).isWILLRetained()); 

    client = MQTTClientFactory.newClient(IP, PORT, clientID); 

    if (client != null) { 
     System.out.println("client is not null"); 

     client.setCallback(AsynchCB); 
     IMqttToken token = client.connectWithResult(opts); 

     if (client.isConnected()) { 
      System.out.println("Client is Connected"); 

      token.setActionCallback(new IMqttActionListener() { 

       public void onSuccess(IMqttToken arg0) { 
        // TODO Auto-generated method stub 
        System.out.println("synchCB->@onSuccess(): Connection Successful"); 

        try { 
         client.subscribe(TOPIC, QoS); 
        } catch (MqttException e) { 
         // TODO Auto-generated catch block 
         e.printStackTrace(); 
        } 
        try { 
         client.disconnect(); 
        } catch (MqttException e) { 
         // TODO Auto-generated catch block 
         e.printStackTrace(); 
        } 
       } 

       public void onFailure(IMqttToken arg0, Throwable arg1) { 
        // TODO Auto-generated method stub 
        System.out.println("synchCB->@onFailure(): Connection Failed"); 
       } 
      }); 
     }else { 
      System.out.println("client is not connected"); 
     } 
    }else { 
     System.out.println("client = null"); 
    } 

비동기 콜백 :

/** 
* Asynchronous Callback to inform the user about events that might happens Asynchronously. If it is not used, any pending 
* messages destined to the client would not be received. 
*/ 
private static MqttCallback AsynchCB = new MqttCallback() { 

    public void messageArrived(String topic, MqttMessage msg) throws Exception { 
     // TODO Auto-generated method stub 
     System.out.println("AsynchCB->@messageArrived(): "); 

     System.out.println("Topic: " + topic); 
     System.out.println("MSG: " + msg.toString()); 

    } 

    public void deliveryComplete(IMqttDeliveryToken arg0) { 
     // TODO Auto-generated method stub 
     System.out.println("AsynchCB->@deliveryComplete(): "); 
    } 

    public void connectionLost(Throwable arg0) { 
     // TODO Auto-generated method stub 
     System.out.println("AsynchCB->@connectionLost(): "); 
    } 
}; 
+0

onSuccess()에서 구독을 제거하고 연결이 성공했음을 확인한 직후에 추가하면 어떻게됩니까? 내 말은이 줄을 client.subscribe (TOPIC, QoS)라고 지정하는 것입니다. isConnected()가 true를 반환하는지 확인한 직후. 콜백 리스너를 설정하는 것 이외의 다른 연결을 실제로 구독하거나 수행하기 전에 onSuccess()가 호출되기를 기대하는 이유가 조금 혼란 스럽습니다. – kha

+0

@kha 코멘트 주셔서 감사합니다. 실제로 귀하의 의견을 읽은 후에 나는 onSuccess와 onFailure가하는 일을 오해 한 것 같습니다. OnSuccess() 및 onFailure는 연결이 성공한 "onSuccess"또는 실패한 "onFailure"일 때 호출되는 동기 콜백이므로 onSucess() 내부에서 가입하고 te 연결을 설정할 때/성공한 다음 가입 할 때를 생각합니다. 나는 옳은가 틀린가? 안내해주십시오 – rmaik

+0

귀하의 연결은 이미 성공적으로 이루어져 있습니다. 이 줄에서 이미 올바르게 검사하고 있습니다. if (client.isConnected()) ... 연결이되었으므로 주제를 구독해도 좋습니다.시도해보고 주제가 제대로 작동하는지 확인하십시오. 그렇다면 해당 주제에 게시 된 메시지 수신을 시작할 수 있어야합니다. – kha

답변

2

당신의 m 콜백을 처리하는 클라이언트가있는 achine은 발신 포트가 시스템의 방화벽에 의해 차단 될 수 있습니다.

+0

게시 할 수 있기 때문에 그렇게 생각하지 않습니다. 일반적으로 – rmaik

+0

클라이언트가 발신 게시에 특정 포트를 사용하는 경우 확실하지 않은 경우 옳습니다. 확인하십시오. – aurelius

+0

예 그 포트를 사용하고 있다고 확신합니다. 1883 – rmaik