2017-11-21 16 views
0

MQTT 브로커로부터 메시지를 수신하는 프로그램을 작성하고 있습니다. 서버는 클라이언트로부터 ID를 받아 이를 주제(topic) 이름으로 사용합니다(예: topic1, topic2). 구독할 때 서버는 주제 이름을 전달한 다음 해당 주제에서 메시지를 읽습니다. 다음은 제 서버 코드입니다: (제목: 주제에서 메시지를 수신하는 MQTT)

/**
 * TCP front end that accepts client connections on a fixed port and hands each
 * accepted socket to a {@code clientThread} worker.
 *
 * <p>At most {@code maxClientsCount} workers are tracked in {@code threads}. A
 * connection that arrives while every slot is occupied receives a one-line
 * "busy" message and is closed immediately.
 */
public class AnalyticServer {

    // The server socket.
    private static ServerSocket serverSocket = null;
    // The client socket.
    private static Socket clientSocket = null;

    // This server can accept up to maxClientsCount clients' connections.
    private static final int maxClientsCount = 5;
    private static final clientThread[] threads = new clientThread[maxClientsCount];

    /**
     * Binds the listening socket and serves connections until the process is killed.
     *
     * @param args unused
     * @throws MqttException        declared for callers; not thrown by the code in this method
     * @throws InterruptedException declared for callers; not thrown by the code in this method
     */
    public static void main(String args[]) throws MqttException, InterruptedException {

        // The default port number.
        int portNumber = 4544;

        // Open the server socket. Without it there is nothing to accept on, so
        // stop here instead of falling into the accept loop with a null socket.
        try {
            serverSocket = new ServerSocket(portNumber);
        } catch (IOException e) {
            System.out.println(e);
            return;
        }

        System.out.println("Server is now listening at port " + portNumber);
        while (true) {
            try {
                // Block until a client connects.
                clientSocket = serverSocket.accept();

                System.out.println("Connected");

                int i = 0;
                // Each worker is handed a reference to this array, so guard slot
                // access with the array's monitor.
                synchronized (threads) {
                    // Find a free slot and start a worker for this connection.
                    for (i = 0; i < maxClientsCount; i++) {
                        if (threads[i] == null) {
                            (threads[i] = new clientThread(clientSocket, threads)).start();
                            break;
                        }
                    }
                }

                // No free slot: tell the client and drop the connection.
                if (i == maxClientsCount) {
                    try {
                        PrintStream os = new PrintStream(clientSocket.getOutputStream());
                        os.println("Server is busy now, please try again later");
                        os.close();
                    } finally {
                        // Close the socket even if the reply could not be written.
                        clientSocket.close();
                    }
                }
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }
}

    /**
     * Worker thread that serves exactly one accepted client connection.
     *
     * <p>Protocol as implemented here: read one byte from the client, subscribe to
     * the MQTT topic {@code "topic" + <byte value>}, then send the JPEG-encoded
     * contents of {@code image.jpg} back over the socket and close it.
     */
    class clientThread extends Thread {

        private Socket clientSocket = null;
        private final clientThread[] threads;
        private int maxClientsCount;

        /**
         * @param clientSocket the accepted connection this worker owns and closes
         * @param threads      the slot array this worker is registered in; its length
         *                     is taken as the maximum client count
         */
        public clientThread(Socket clientSocket, clientThread[] threads) {
            this.clientSocket = clientSocket;
            this.threads = threads;
            maxClientsCount = threads.length;
        }

        /**
         * Handles the connection, then always closes the socket and frees this
         * worker's slot so a later client can be accepted.
         */
        public void run() {
            try {
                InputStream input = clientSocket.getInputStream();

                // read() yields a single byte as 0-255, or -1 at end of stream.
                // NOTE(review): the topic suffix is therefore the numeric byte value
                // (a client sending the character '1' gives "topic49") - confirm
                // this matches what the client sends.
                int identifier = input.read();
                if (identifier < 0) {
                    System.out.println("Client closed the connection before sending an id");
                    return;
                }

                String topic = "topic".concat(String.valueOf(identifier));

                // Subscribe; a failure here is reported but does not stop the image reply.
                try {
                    System.out.println("subscribing");
                    new Subscribe(topic);
                } catch (MqttException e) {
                    e.printStackTrace();
                } catch (URISyntaxException e) {
                    e.printStackTrace();
                }

                // Load the image, making sure the file handle is released.
                BufferedImage bufferedImage;
                FileInputStream imgPath = new FileInputStream("image.jpg");
                try {
                    bufferedImage = ImageIO.read(imgPath);
                } finally {
                    imgPath.close();
                }
                if (bufferedImage == null) {
                    // ImageIO.read returns null when no registered reader can decode the data.
                    throw new IOException("image.jpg could not be decoded as an image");
                }

                Thread.sleep(1200);

                // Re-encode as JPEG into memory.
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ImageIO.write(bufferedImage, "jpg", baos);
                byte[] imageInByte = baos.toByteArray();

                // Send the image bytes and report how many were written.
                DataOutputStream outToClient = new DataOutputStream(clientSocket.getOutputStream());
                outToClient.write(imageInByte);
                outToClient.flush();
                System.out.println(outToClient.size());

            } catch (IOException e) {
                // Previously swallowed silently; surface it so failures are diagnosable.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                releaseSlot();
            }
        }

        /** Clears this worker's entry in the shared slot array so the slot can be reused. */
        private void releaseSlot() {
            synchronized (threads) {
                for (int i = 0; i < maxClientsCount; i++) {
                    if (threads[i] == this) {
                        threads[i] = null;
                    }
                }
            }
        }
    }

그리고 이것은 제 Subscribe 클래스입니다:

/**
 * MQTT subscriber that connects to a fixed broker, subscribes to one topic and
 * prints every message that arrives on it.
 *
 * <p>The instance registers itself as the client's {@link MqttCallback} before
 * connecting, so {@link #messageArrived(String, MqttMessage)} is invoked for
 * each delivered message.
 */
public class Subscribe implements MqttCallback {

    private final int qos = 1;
    // NOTE(review): this field is static, so it is shared by every Subscribe
    // instance and holds whichever topic was passed to the most recent
    // constructor call. Kept static so existing references still compile.
    static String topic=null;

    private MqttClient client;

    // Text of the most recently received message ("abc" until one arrives).
    String subText = "abc";

    /**
     * Connects to the broker and subscribes to {@code topic}.
     *
     * @param topic the MQTT topic to subscribe to
     * @throws MqttException      if creating the client, connecting or subscribing fails
     * @throws URISyntaxException declared for callers; not thrown by the code in this constructor
     */
    public Subscribe(String topic) throws MqttException, URISyntaxException {
        Subscribe.topic = topic;

        // NOTE(review): broker address and credentials are hard-coded in source;
        // consider loading them from configuration instead.
        String host = "tcp://m14.cloudmqtt.com:19484";
        String username = "***";
        String password = "********";
        String clientId = MqttClient.generateClientId();
        MqttConnectOptions conOpt = new MqttConnectOptions();
        conOpt.setCleanSession(true);
        conOpt.setUserName(username);
        conOpt.setPassword(password.toCharArray());

        this.client = new MqttClient(host, clientId, new MemoryPersistence());

        // Register the callback before connecting so no delivery is missed.
        this.client.setCallback(this);

        this.client.connect(conOpt);
        this.client.subscribe(topic, qos);
        // Log the argument rather than the shared static field, which another
        // instance may already have overwritten.
        System.out.println("subscribe topic: " + topic);
    }

    /**
     * Logs the cause and terminates the whole JVM with exit status 1.
     *
     * @see MqttCallback#connectionLost(Throwable)
     */
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost because: " + cause);
        System.exit(1);
    }

    /**
     * No-op; this class only subscribes and does not publish.
     *
     * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * Stores the message text in {@code subText} and prints it.
     *
     * @param topic   the topic the message was published on
     * @param message the received message
     * @throws IOException if the UTF-8 charset is unavailable
     * @see MqttCallback#messageArrived(String, MqttMessage)
     */
    public void messageArrived(String topic, MqttMessage message) throws MqttException, IOException {
        System.out.println("1");
        // Decode the payload bytes; calling toString() on the byte[] would only
        // give the array's identity string ("[B@..."), not the message content.
        subText = new String(message.getPayload(), "UTF-8");

        System.out.println("Received"+subText);
    }
}

생성자는 올바른 주제를 구독하지만, this.client.setCallback(this)가 Subscribe 클래스의 messageArrived 메서드를 호출하지 않는 것 같습니다. 그래서 아무 메시지도 수신하지 못합니다.

원인을 아시는 분 계신가요? 정말 감사합니다.

+0

가장 좋은 방법은 질문을 삭제한 뒤, 해당 정보를 숨긴 채 다시 질문하는 것입니다. – hardillb

+0

오, 잊어 버렸습니다. 정말 고마워요. –

+0

@ ngoc-anh 정보가 편집 내역에 있습니다. 완전히 질문을 삭제하고 다시 질문하십시오. 해당 사용자의 비밀번호도 변경하십시오. – hardillb

답변

0

구독자(subscriber) 코드에 대한 정보를 충분히 게시하지 않으셨지만, 다음은 정상적으로 동작하는 간단한 예제입니다:

(댓글 참고) 질문의 소스 코드에 사용자 이름과 비밀번호가 포함되어 있었습니다.
import java.io.IOException; 
import java.sql.Timestamp; 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; 

/**
 * Minimal blocking-mode Paho subscriber used as a working example.
 *
 * <p>Constructing an instance connects to the broker, subscribes to a single
 * topic, prints each message as it arrives, and disconnects once a byte is
 * read from standard input.
 */
public class TestSub implements MqttCallback {

    /**
     * Runs the example against a fixed broker, topic and set of credentials.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String brokerUrl = "tcp://iot.eclipse.org:1883";
        final String subscriberId = "TestSub_" + System.currentTimeMillis();
        final String subscribedTopic = "test/ABC/one";
        final int qualityOfService = 1;
        final boolean startClean = true;
        final String login = "myUserId";
        final String secret = "mypwd";

        try {
            new TestSub(brokerUrl, subscriberId, startClean, login, secret, subscribedTopic, qualityOfService);
        } catch (MqttException failure) {
            System.out.println(failure.getLocalizedMessage());
            System.out.println(failure.getCause());
            failure.printStackTrace();
        }
    }

    /**
     * Connects, subscribes, waits for input on stdin, then disconnects.
     *
     * <p>An {@link MqttException} raised during any of those steps is printed and
     * the JVM exits with status 1.
     *
     * @param url          broker URI
     * @param clientId     MQTT client identifier
     * @param cleanSession value passed to {@code setCleanSession}
     * @param userName     user name, or {@code null} to leave it unset
     * @param password     password, or {@code null} to leave it unset
     * @param topicName    topic to subscribe to
     * @param qos          quality of service requested for the subscription
     * @throws MqttException declared for callers
     */
    public TestSub(String url, String clientId, boolean cleanSession, String userName, String password, String topicName, int qos) throws MqttException {
        // File-backed persistence rooted in the JVM's temp directory.
        MqttDefaultFilePersistence store =
                new MqttDefaultFilePersistence(System.getProperty("java.io.tmpdir"));

        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
            options.setCleanSession(cleanSession);
            if (userName != null) {
                options.setUserName(userName);
            }
            if (password != null) {
                options.setPassword(password.toCharArray());
            }

            // Blocking-mode client with this object receiving the callbacks.
            MqttClient subscriber = new MqttClient(url, clientId, store);
            subscriber.setCallback(this);

            subscriber.connect(options);
            System.out.println("Connected to " + url + " with client ID " + subscriber.getClientId());

            System.out.println("Subscribing to topic \"" + topicName + "\" qos " + qos);
            subscriber.subscribe(topicName, qos);

            // Keep receiving messages until something is read from stdin.
            System.out.println("Press <Enter> to exit");
            awaitStdin();

            subscriber.disconnect();
            System.out.println("Disconnected");
        } catch (MqttException setupFailure) {
            setupFailure.printStackTrace();
            System.out.println("Unable to set up client: " + setupFailure.toString());
            System.exit(1);
        }
    }

    /** Blocks on a single read from stdin; a read failure is treated the same as input. */
    private static void awaitStdin() {
        try {
            System.in.read();
        } catch (IOException ignored) {
            // If we can't read we'll just exit
        }
    }

    /** Reports the lost connection and terminates the JVM with status 1. */
    public void connectionLost(Throwable cause) {
        String reason = cause.getLocalizedMessage();
        System.out.println("Connection lost! " + reason);
        System.exit(1);
    }

    /** No-op; this example never publishes. */
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * Prints the arrival time, topic and payload text of a received message.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        String stamp = new Timestamp(System.currentTimeMillis()).toString();
        String body = new String(message.getPayload());
        System.out.println("Time:\t" + stamp + " Topic:\t" + topic + " Message:\t" + body);
    }
}