MQTT 在 Flutter 中的使用
安装 mqtt：在 pubspec.yaml 的 dependencies 中添加 mqtt_client 依赖
初始化 mqtt
// Connect to the broker; the string argument is passed to init() as its
// userId parameter.
int res = await MqttUtils.getInstance().init('客户标签Clients');
print("mqtt init res = $res");
// init() returns 0 once the client reports a connected state.
if (res == 0) {
  // Connected: subscribe / publish from here.
}
订阅 Topic，并监听该 Topic 收到的消息
// Subscribe to 'testtopic1'; callBack is invoked with the payload of incoming
// messages, decoded to a String.
MqttUtils.getInstance().subscribe('testtopic1', callBack: (e) {});
发送消息
// Publish a String payload to 'testtopic2'; callBack is invoked with the same
// string after the publish call.
// NOTE(review): `massageData` must be defined by the caller; the name is
// presumably a typo of "messageData" - confirm.
MqttUtils.getInstance().publishMessage('testtopic2', massageData, callBack: (e) {});
创建 MqttUtils 类
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
// Shared MQTT client for this file, used by MqttUtils and by onDisconnected.
// The broker host is hard-coded here. The second constructor argument
// (presumably the client identifier - confirm against the mqtt_client docs) is
// left empty; init() sets the identifier through the connect message instead.
final _client = MqttServerClient('192.168.100.119', '');
/// Singleton helper around the file-level [_client] MQTT client.
///
/// Obtain the shared instance with [getInstance], call [init] once to connect,
/// then use [subscribe], [publishMessage], [unsubscribe] and [disconnect].
class MqttUtils {
  static MqttUtils _instance;

  /// Returns the shared [MqttUtils] instance, creating it on first use.
  static MqttUtils getInstance() {
    return _instance ??= MqttUtils();
  }

  /// Configures the client and connects to the broker.
  ///
  /// [userId] is used as the MQTT client identifier.
  ///
  /// Returns `0` when the client ends up connected and `-1` otherwise. A
  /// failed connection is reported through the return value instead of
  /// terminating the process, so the caller can retry or show an error.
  Future<int> init(String userId) async {
    _client.logging(on: true);
    _client.keepAlivePeriod = 20;
    _client.onConnected = onConnected;
    _client.onDisconnected = onDisconnected;
    _client.onSubscribed = onSubscribed;
    _client.pongCallback = ping;

    final connMess = MqttConnectMessage()
        .withClientIdentifier('$userId')
        .keepAliveFor(20)
        .withWillTopic('willtopic')
        .withWillMessage('My Will message')
        .startClean()
        .withWillQos(MqttQos.atLeastOnce);
    print('EXAMPLE::Mosquitto client connecting....');
    _client.connectionMessage = connMess;

    try {
      await _client.connect();
    } on Exception catch (e) {
      // Only log here; the not-connected branch below performs the single
      // disconnect.
      print('MqttUtils::client exception - $e');
    }

    if (!isConnected()) {
      print('EXAMPLE::错误mosquito客户端连接失败-正在断开连接,状态为 ${_client.connectionStatus.state}');
      _client.disconnect();
      return -1;
    }

    print('EXAMPLE::已连接Mosquito客户端');
    // Log publish notifications once per connection rather than adding a new
    // listener on every subscribe() call.
    _client.published?.listen((MqttPublishMessage message) {
      print('MqttUtils::Published notification:: topic is ${message.variableHeader.topicName}, with Qos ${message.header.qos}');
    });
    return 0;
  }

  /// Subscribes to [topic] at [qosLevel] and forwards matching messages.
  ///
  /// [callBack], when given, is invoked with the payload (decoded to a
  /// `String`) of every received message whose topic matches [topic]; `+` and
  /// `#` wildcards in [topic] are honoured.
  ///
  /// Returns `false` without subscribing when the client is not connected,
  /// `true` otherwise.
  bool subscribe(String topic, {MqttQos qosLevel = MqttQos.atMostOnce, Function callBack}) {
    if (!isConnected()) {
      print('MqttUtils::Cannot subscribe to $topic - client is not connected');
      return false;
    }
    _client.subscribe(topic, qosLevel);
    _client.updates.listen((dynamic c) {
      for (final dynamic received in c) {
        final String receivedTopic = received.topic;
        // The updates stream carries messages for every subscription, so
        // skip the ones that belong to other topics.
        if (!_topicMatches(topic, receivedTopic)) {
          continue;
        }
        final MqttPublishMessage recMess = received.payload;
        final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
        print('EXAMPLE::Change notification:: topic is <$receivedTopic>, payload is <-- $pt -->');
        print('');
        if (callBack != null) {
          callBack(pt);
        }
      }
    });
    return true;
  }

  /// Unsubscribes from [topic]. Always returns `true`.
  bool unsubscribe(String topic) {
    print('MqttUtils::Unsubscribing$topic');
    _client.unsubscribe(topic);
    return true;
  }

  /// Disconnects the client and clears its security context.
  void disconnect() {
    print('MqttUtils::Disconnecting');
    _client.disconnect();
    _client.securityContext = null;
  }

  /// Whether the client currently reports a connected state.
  bool isConnected() {
    final status = _client.connectionStatus;
    return status != null && status.state == MqttConnectionState.connected;
  }

  /// Publishes [jsonString] to [topic] with QoS "at least once".
  ///
  /// [callBack], when given, is invoked with [jsonString] after the publish
  /// call returns.
  void publishMessage(String topic, String jsonString, { Function callBack }) {
    print(topic);
    print(jsonString);
    // NOTE(review): addString may not encode non-ASCII text as UTF-8; confirm
    // against the mqtt_client docs (see addUTF8String) before sending such
    // payloads.
    final builder = MqttClientPayloadBuilder()..addString(jsonString);
    print('EXAMPLE:: <<<< PUBLISH 1 >>>>');
    _client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload);
    if (callBack != null) {
      callBack(jsonString);
    }
  }

  /// Whether [topic] is matched by the MQTT subscription [filter].
  ///
  /// Handles the single-level `+` and multi-level `#` wildcards, and ignores a
  /// leading `$share/<group>/` shared-subscription prefix on [filter].
  static bool _topicMatches(String filter, String topic) {
    if (filter == topic) {
      return true;
    }
    var filterLevels = filter.split('/');
    if (filterLevels.length > 2 && filterLevels[0] == r'$share') {
      filterLevels = filterLevels.sublist(2);
    }
    final topicLevels = topic.split('/');
    for (var i = 0; i < filterLevels.length; i++) {
      final level = filterLevels[i];
      if (level == '#') {
        // Matches the parent level and everything below it.
        return true;
      }
      if (i >= topicLevels.length) {
        return false;
      }
      if (level != '+' && level != topicLevels[i]) {
        return false;
      }
    }
    return filterLevels.length == topicLevels.length;
  }
}
/// Connection callback: logs that the client connected.
void onConnected() =>
    print('MqttUtils::OnConnected client callback - Client connection was sucessful');
/// Subscription callback: logs the [topic] whose subscription was confirmed.
void onSubscribed(String topic) => print('已确认订阅主题 $topic');
/// Disconnection callback: logs the disconnect and, when the client's return
/// code is `solicited`, logs that the disconnect was expected.
void onDisconnected() {
  print('-客户端断开连接');
  print('MqttUtils::OnDisconnected client callback - Client disconnection');
  final wasSolicited =
      _client.connectionStatus.returnCode == MqttConnectReturnCode.solicited;
  if (!wasSolicited) {
    return;
  }
  print('MqttUtils::OnDisconnected callback is solicited, this is correct');
}
/// Pong callback: logs that a ping response was received.
void ping() => print('MqttUtils::Ping response client callback invoked');