0
MQTT 중개자로부터 메시지를 수신하는 프로그램을 작성 중입니다. 서버가 클라이언트에서 ID를 가져 와서 주제의 이름이되도록하십시오 (예 : topic1, topic2). 구독 할 때 서버는 주제의 이름을 전달한 다음 해당 주제에서 메시지를 읽습니다. 이 내 서버입니다 :주제에서 메시지를 수신하는 MQTT
public class AnalyticServer {
// The server socket.
private static ServerSocket serverSocket = null;
// The client socket.
private static Socket clientSocket = null;
// This server can accept up to maxClientsCount clients' connections.
private static final int maxClientsCount = 5;
private static final clientThread[] threads = new clientThread[maxClientsCount];
public static void main(String args[]) throws MqttException, InterruptedException {
// The default port number.
int portNumber = 4544;
//Open Server
try {
serverSocket = new ServerSocket(portNumber);
} catch (IOException e) {
System.out.println(e);
}
//
//When server is listening
System.out.println("Server is now listening at port 4544");
while (true) {
try {
//Make connection
clientSocket = serverSocket.accept();
System.out.println("Connected");
int i = 0;
//Find thread null to run the connection
for (i = 0; i < maxClientsCount; i++) {
if (threads[i] == null) {
(threads[i] = new clientThread(clientSocket, threads)).start();
break;
}
}
if (i == maxClientsCount) {
PrintStream os = new PrintStream(clientSocket.getOutputStream());
os.println("Server is now, please try again later");
os.close();
clientSocket.close();
}
} catch (IOException e) {
System.out.println(e);
}
}
}
}
//Thread control each Request
class clientThread extends Thread {
private Socket clientSocket = null;
private final clientThread[] threads;
private int maxClientsCount;
public clientThread(Socket clientSocket, clientThread[] threads) {
this.clientSocket = clientSocket;
this.threads = threads;
maxClientsCount = threads.length;
}
public void run() {
int maxClientsCount = this.maxClientsCount;
clientThread[] threads = this.threads;
try {
int identifier=0;
//get id
InputStream input = null;
input = clientSocket.getInputStream();
identifier=input.read();
String topic="topic".concat(String.valueOf(identifier));
//Subscribe
try {
System.out.println("subscribing");
Subscribe receive=new Subscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// open image
FileInputStream imgPath = new FileInputStream("image.jpg");
BufferedImage bufferedImage = ImageIO.read(imgPath);
Thread.sleep(1200);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(bufferedImage, "jpg", baos);
baos.flush();
byte[] imageInByte = baos.toByteArray();
baos.close();
//SendImage
DataOutputStream outToClient = new DataOutputStream(clientSocket.getOutputStream());
outToClient.write(imageInByte);
System.out.println(outToClient.size());
clientSocket.close();
} catch (IOException e) {
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
'그리고 이것은 나의
public class Subscribe implements MqttCallback {
private final int qos = 1;
static String topic=null;
private MqttClient client;
String subText = "abc";
public Subscribe(String topic) throws MqttException, URISyntaxException {
this.topic=topic;
String host = "tcp://m14.cloudmqtt.com:19484";
String username = "***";
String password = "********";
String clientId = MqttClient.generateClientId();
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
conOpt.setUserName(username);
conOpt.setPassword(password.toCharArray());
this.client = new MqttClient(host, clientId, new MemoryPersistence());
;
this.client.setCallback(this);
this.client.connect(conOpt);
this.client.subscribe(topic,1);
System.out.println("subscribe topic: " +this.topic);
}
/**
* @see MqttCallback#connectionLost(Throwable)
*/
public void connectionLost(Throwable cause) {
System.out.println("Connection lost because: " + cause);
System.exit(1);
}
/**
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
*/
public void deliveryComplete(IMqttDeliveryToken token) {
}
/**
* @throws IOException
* @see MqttCallback#messageArrived(String, MqttMessage)
*/
public void messageArrived(String topic, MqttMessage message) throws MqttException, IOException {
System.out.println("1");
subText = message.getPayload().toString();
System.out.println("Received"+subText);
}
}
생성자가 올바른 가입의 주제, 그러나 this.client.setCallback (이)가 호출 할 것 같다 클래스를 구독하다 messageArrived 메소드. 그래서 나는 아무것도받을 수 없다.
아무도 몰라요? 당신에게 너무
, 가장 좋은 방법은 질문을 삭제하는 것입니다 그 정보를 숨겨둔 채 다시 물어보십시오. – hardillb
오, 잊어 버렸습니다. 정말 고마워요. –
@ ngoc-anh 정보가 편집 내역에 있습니다. 완전히 질문을 삭제하고 다시 질문하십시오. 해당 사용자의 비밀번호도 변경하십시오. – hardillb