1
package basic.util.mqtt;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import basic.util.FileUtil;
import basic.util.StringUtil;
import basic.util.exception.BaseException;
import basic.util.log.Log;
/**
 * Thin wrapper around a Paho MQTT v5 {@link MqttClient} that connects to the broker described by
 * a {@link Properties} object, publishes text payloads to the configured topic and stores every
 * message received on that topic via {@link FileUtil#saveJsonFile}.
 *
 * <p>Settings keys read by this class: {@code DataHost}, {@code DataPort}, {@code DataUserName},
 * {@code DataPassword}, {@code DataTopic}, {@code DataQos} and {@code DataFilePath}.
 *
 * <p>Failures are reported through {@code BaseException.throwException(...)}.
 */
public class MqttHandler {

    /** Client identifier presented to the broker. */
    private static final String CLIENT_ID = "test1";

    /** Value passed to {@code MqttConnectionOptions.setConnectionTimeout}. */
    private static final int CONNECTION_TIMEOUT = 5;

    /** Value passed to {@code MqttConnectionOptions.setKeepAliveInterval}. */
    private static final int KEEP_ALIVE_INTERVAL = 10;

    /** QoS used when {@code DataQos} is missing or empty. */
    private static final int DEFAULT_QOS = 0;

    /** Current client, or {@code null} when no client has been started or it has been closed. */
    private volatile MqttClient mqttClient;

    /** Volatile: read from the MQTT callback methods as well as from callers of the public API. */
    private volatile Properties settingProperties;

    /**
     * Creates a handler that will use the given settings once {@link #startClient()} is called.
     *
     * @param prop connection and topic settings
     */
    public MqttHandler(Properties prop) {
        settingProperties = prop;
    }

    private void setSettingProperties(Properties settingProperties) {
        this.settingProperties = settingProperties;
    }

    /**
     * Creates a new client for {@code tcp://DataHost:DataPort}, connects it and subscribes to
     * {@code DataTopic}.
     *
     * <p>Any client left over from an earlier start is closed first so it is not leaked. If any
     * step fails, the partially initialised client is released before the failure is reported via
     * {@code BaseException.throwException}.
     */
    public void startClient() {
        // A second start without an explicit close() must not orphan the previous client.
        close();

        Properties settings = settingProperties;
        String host = settings.getProperty("DataHost");
        String port = settings.getProperty("DataPort");
        if (StringUtil.isAnyEmpty(host, port)) {
            BaseException.throwException("Host or port is invalid for mqtt.");
        }

        try {
            MqttClient client = new MqttClient("tcp://" + host + ":" + port, CLIENT_ID);
            // Publish the reference before connecting so close() can release it on failure.
            mqttClient = client;
            client.setCallback(new BtcMqttCallback());
            client.connect(buildConnectionOptions(settings));
            subscribe();
        } catch (Exception exception) {
            close();
            BaseException.throwException(exception);
        }
    }

    /** Builds the connection options (credentials, timeout, keep-alive, automatic reconnect). */
    private static MqttConnectionOptions buildConnectionOptions(Properties settings) {
        MqttConnectionOptions options = new MqttConnectionOptions();
        String userName = settings.getProperty("DataUserName");
        String password = settings.getProperty("DataPassword");
        // Credentials are only sent when both user name and password are configured.
        if (!StringUtil.isAnyEmpty(userName, password)) {
            options.setUserName(userName);
            options.setPassword(password.getBytes(StandardCharsets.UTF_8));
        }
        options.setConnectionTimeout(CONNECTION_TIMEOUT);
        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        options.setAutomaticReconnect(true);
        return options;
    }

    /** Closes the current client (if any) and starts a new one with the current settings. */
    public void restartClient() {
        close();
        startClient();
    }

    /**
     * Replaces the settings and restarts the client with them.
     *
     * @param settings new connection and topic settings
     */
    public void restartClient(Properties settings) {
        setSettingProperties(settings);
        restartClient();
    }

    /**
     * Disconnects (when connected) and always releases the current client.
     *
     * <p>The client is released even when it is not connected, e.g. after a failed connect or
     * while it is still trying to reconnect, so that it cannot linger in the background. Errors
     * during shutdown are logged and not propagated. Calling this method without a client is a
     * no-op.
     */
    public void close() {
        MqttClient client = mqttClient;
        if (client == null) {
            return;
        }
        mqttClient = null;
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException exception) {
            Log.severe(exception);
        } finally {
            // Forced close runs even if disconnect() failed, otherwise the client would leak.
            try {
                client.close(true);
            } catch (MqttException exception) {
                Log.severe(exception);
            }
        }
    }

    /**
     * @return {@code true} when a client exists and reports itself as connected
     */
    public boolean isConnected() {
        MqttClient client = mqttClient;
        return client != null && client.isConnected();
    }

    /**
     * Publishes {@code data}, encoded as UTF-8, to {@code DataTopic} using the configured QoS.
     *
     * <p>A fresh {@link MqttMessage} is created for every call so that concurrent sends cannot
     * overwrite each other's payload and the QoS always reflects the current settings.
     *
     * @param data text payload to publish
     */
    public void sendToMqtt(String data) {
        MqttClient client = mqttClient;
        if (client == null || !client.isConnected()) {
            BaseException.throwException("Connection to mqtt has been closed.");
        }
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(getQos());
            message.setPayload(data.getBytes(StandardCharsets.UTF_8));
            client.publish(settingProperties.getProperty("DataTopic"), message);
        } catch (Exception exception) {
            BaseException.throwException(exception);
        }
    }

    /**
     * Reads the QoS level from {@code DataQos}.
     *
     * @return the configured QoS, or {@code 0} when the setting is missing or empty
     * @throws NumberFormatException if the setting is present but not an integer
     */
    private int getQos() {
        String qos = settingProperties.getProperty("DataQos");
        if (StringUtil.isAnyEmpty(qos)) {
            return DEFAULT_QOS;
        }
        return Integer.parseInt(qos.trim());
    }

    /** Callback registered on the client; logs events and persists incoming messages. */
    public class BtcMqttCallback implements MqttCallback {

        @Override
        public void authPacketArrived(int arg0, MqttProperties properties) {
            // Intentionally ignored.
        }

        @Override
        public void deliveryComplete(IMqttToken token) {
            Log.info("delivery Complete:" + token.isComplete());
        }

        @Override
        public void disconnected(MqttDisconnectResponse response) {
            Log.info(response.getReasonString());
        }

        @Override
        public void mqttErrorOccurred(MqttException exception) {
            Log.severe(exception);
        }

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            Log.info("connectComplete() reconnect:" + reconnect + " serverURI:" + serverURI);
            // startClient() already subscribes after the initial connect; subscribing here as
            // well would issue a duplicate SUBSCRIBE. Only re-subscribe after a reconnect, where
            // the previous subscription may not have survived.
            if (reconnect) {
                subscribe();
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            Log.info( "messageArrived() topic:" + topic);
            Log.info(msg);
            FileUtil.saveJsonFile(msg, settingProperties.getProperty("DataFilePath"));
        }
    }

    /** Subscribes the current client to {@code DataTopic} with the configured QoS. */
    private void subscribe() {
        try {
            String[] topics = {settingProperties.getProperty("DataTopic")};
            int[] qosLevels = {getQos()};
            mqttClient.subscribe(topics, qosLevels);
        } catch (Exception exception) {
            BaseException.throwException(exception);
        }
    }

    /**
     * Manual test entry point; currently does nothing.
     *
     * <p>Intended usage: build a {@link Properties} object with the settings listed on the class,
     * construct a {@code MqttHandler} with it and call {@link #startClient()}.
     */
    public static void main(String[] args) {
        // No-op.
    }
}
No comments:
Post a Comment