Sunday, October 11, 2020

Java Basic Util - MQTT

1

 package basic.util.mqtt;


import java.io.UnsupportedEncodingException;

import java.nio.charset.Charset;

import java.util.Properties;


import org.eclipse.paho.mqttv5.client.IMqttToken;

import org.eclipse.paho.mqttv5.client.MqttCallback;

import org.eclipse.paho.mqttv5.client.MqttClient;

import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;

import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;

import org.eclipse.paho.mqttv5.common.MqttException;

import org.eclipse.paho.mqttv5.common.MqttMessage;

import org.eclipse.paho.mqttv5.common.packet.MqttProperties;


import basic.util.FileUtil;

import basic.util.StringUtil;

import basic.util.exception.BaseException;

import basic.util.log.Log;


public class MqttHandler {

private static final String CharSetUTF8 = "UTF-8";

private volatile MqttClient mqttClient; 

private volatile MqttMessage mqttMessage;

private Properties settingProperties;

public MqttHandler(Properties prop) {

settingProperties = prop;

}

private void setSettingProperties(Properties settingProperties) {

this.settingProperties = settingProperties;

}

public void startClient() {

    try {

        //mqttClient = new MqttClient("tcp://mqtt.eclipse.org:1883", "test1");

     String host = settingProperties.getProperty("DataHost");

     String port = settingProperties.getProperty("DataPort");

     if (StringUtil.isAnyEmpty(host, port)) 

     BaseException.throwException("Host or port is invalid for mqtt.");

    

     mqttClient = new MqttClient("tcp://" + host + ":" + port, "test1");

    } catch (MqttException exception) {

     BaseException.throwException(exception);

    }

    

    MqttConnectionOptions options = new MqttConnectionOptions();

    String userName = settingProperties.getProperty("DataUserName");

    String password = settingProperties.getProperty("DataPassword");

    if (!StringUtil.isAnyEmpty(userName, password)) {

     options.setUserName(userName);

     try {

options.setPassword(password.getBytes(CharSetUTF8));

} catch (UnsupportedEncodingException exception) {

BaseException.throwException(exception);

}

    }

         

    options.setConnectionTimeout(5); 

   // options.setCleanSession(getCleanSession());

    options.setKeepAliveInterval(10);

    options.setAutomaticReconnect(true); 

    try {

        mqttClient.setCallback(new BtcMqttCallback());

        mqttClient.connect(options);

        subscribe();

    } catch (Exception exception) {

     close();

     BaseException.throwException(exception);

    }

}

public void restartClient() {

close();

startClient();

}

public void restartClient(Properties settings) {

setSettingProperties(settings);

restartClient();

}

public void close() {

if (isConnected()) {

try {

mqttClient.disconnect();

mqttClient.close(true);

} catch (MqttException exception) {

Log.severe(exception);

}

}

}

public boolean isConnected() {

if (mqttClient != null && mqttClient.isConnected()) return true;

else return false;

}

public void sendToMqtt(String data) {

if (!isConnected()) BaseException.throwException("Connection to mqtt has been closed.");

    try {

        if (mqttMessage == null) {

            mqttMessage = new MqttMessage();

            mqttMessage.setQos(getQos());

            //mqttMessage.setRetained(true);

        }

        mqttMessage.setPayload(data.getBytes(CharSetUTF8)); 

        mqttClient.publish(settingProperties.getProperty("DataTopic"), mqttMessage);

    } catch (Exception exception) {

     BaseException.throwException(exception);

    }

}


private int getQos() {

String qos = settingProperties.getProperty("DataQos");

if (StringUtil.isAnyEmpty(qos)) return 0;

else return Integer.parseInt(qos);

}

public class BtcMqttCallback implements MqttCallback {    

@Override

public void authPacketArrived(int arg0, MqttProperties properties) {

//Log.info("auth Packet Arrived:" + properties.getUserProperties());

}

@Override

public void deliveryComplete(IMqttToken token) {

Log.info("delivery Complete:" + token.isComplete());

}

@Override

public void disconnected(MqttDisconnectResponse response) {

Log.info(response.getReasonString());

}

@Override

public void mqttErrorOccurred(MqttException exception) {

Log.severe(exception);

}

@Override

public void connectComplete(boolean reconnect, String serverURI) {

Log.info("connectComplete() reconnect:" + reconnect + " serverURI:" + serverURI);

        subscribe();

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

        String msg = new String(message.getPayload(), Charset.forName(CharSetUTF8));

        Log.info( "messageArrived() topic:" + topic);

        Log.info(msg);

        FileUtil.saveJsonFile(msg, settingProperties.getProperty("DataFilePath"));

}

}

private void subscribe() {

    try {

        int[] Qos = {getQos()};

        String[] topic1 = {settingProperties.getProperty("DataTopic")};

        mqttClient.subscribe(topic1, Qos);

    } catch (Exception exception) {

     BaseException.throwException(exception);

    }

}

public static void main(String[] args) {

//Properties prop = SettingController.getSetting();

//MqHandler mqHandler = new MqHandler(prop);

//mqHandler.startClient();

}

}


2 Can use MQTTBox-win to test.

No comments:

Post a Comment