Базовый MQTT с C#
В этом посте мы рассмотрим, как работать с MQTT и Csharp. Создание Брокера и Клиента, который отправляет ему данные.
Но сначала...
Что такое MQTT?
Согласно MQTT.org :
MQTT (сокращение от Message Queuing Telemetry Transport) — это стандартный протокол обмена сообщениями OASIS для Интернета вещей (IoT). Он разработан как чрезвычайно легкий транспорт обмена сообщениями публикации/подписки, который идеально подходит для подключения удаленных устройств с небольшим объемом кода и минимальной пропускной способностью сети. MQTT сегодня используется в самых разных отраслях, таких как автомобилестроение, производство, телекоммуникации, нефть и газ и т. д.
Кодирование
Структура проекта
Структура проекта содержит .sln
файл и два консольных проекта C#, как показано ниже:
.
├── MQTTFirstLook.Broker
├── MQTTFirstLook.Client
└── MQTTFirstLook.sln
Используемые библиотеки
MQTTNet
MQTTnet.Extensions.ManagedClient
Serilog
Serilog.Sinks.Console
Newtonsoft.Json
Брокерская реализация
Эта небольшая реализация брокера отвечает за считывание данных, полученных от подключенных клиентов, и отображение их на экране.
Для этого мы создадим новый MQTTServer
, который прослушивает порт 707 на локальном хосте.
Вот что нужно:
Использование
Это библиотеки, используемые сервером.
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using Serilog;
Выполнение
Вот как мы создаем новый MQTTServer. В этом примере этот код находится внутри void Main(string[] args)
метода.
// Create the options for our MQTT Broker
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
// port used will be 707
.WithDefaultEndpointPort(707)
// handler for new connections
.WithConnectionValidator(OnNewConnection)
// handler for new messages
.WithApplicationMessageInterceptor(OnNewMessage);
// creates a new mqtt server
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
Обработчики
Обработчики — это обратные вызовы, которые MQTT вызывает всякий раз, когда вызывается действие. В этом примере у нас есть обработчик новых подключений и всякий раз, когда сервер получает новое сообщение.
public static void OnNewConnection(MqttConnectionValidatorContext context)
{
Log.Logger.Information(
"New connection: ClientId = {clientId}, Endpoint = {endpoint}",
context.ClientId,
context.Endpoint);
}
public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
MessageCounter++;
Log.Logger.Information(
"MessageId: {MessageCounter} - TimeStamp: {TimeStamp} -- Message: ClientId = {clientId}, Topic = {topic}, Payload = {payload}, QoS = {qos}, Retain-Flag = {retainFlag}",
MessageCounter,
DateTime.Now,
context.ClientId,
context.ApplicationMessage?.Topic,
payload,
context.ApplicationMessage?.QualityOfServiceLevel,
context.ApplicationMessage?.Retain);
}
Клиент
В нашем клиенте мы собираемся создать новый MQTTClient
экземпляр. Этот экземпляр будет подключаться к нашему брокеру на локальном хосте: 707 и отправлять сообщения в тему.Dev.to/topic/json
Что необходимо для этой реализации ниже:
Использование
Это библиотеки, используемые сервером.
using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using Newtonsoft.Json;
using Serilog;
Выполнение
Наш MQTTClient подключится к нашему брокеру через TCP. И так же, как сервер, этот код будет внутри void Main(string[] args)
метода.
// Creates a new client
MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
.WithClientId("Dev.To")
.WithTcpServer("localhost", 707);
// Create client options objects
ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
.WithClientOptions(builder.Build())
.Build();
// Creates the client object
IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();
// Set up handlers
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
_mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);
// Starts a connection with the Broker
_mqttClient.StartAsync(options).GetAwaiter().GetResult();
// Send a new message to the broker every second
while (true)
{
string json = JsonConvert.SerializeObject(new { message = "Heyo :)", sent= DateTimeOffset.UtcNow });
_mqttClient.PublishAsync("dev.to/topic/json", json);
Task.Delay(1000).GetAwaiter().GetResult();
}
Обработчики
Обработчики клиента вызываются, когда клиент получает сигнал о подключении, сбое подключения и отключении.
public static void OnConnected(MqttClientConnectedEventArgs obj)
{
Log.Logger.Information("Successfully connected.");
}
public static void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
{
Log.Logger.Warning("Couldn't connect to broker.");
}
public static void OnDisconnected(MqttClientDisconnectedEventArgs obj)
{
Log.Logger.Information("Successfully disconnected.");
}
Заключение.
Вы можете найти этот проект на Github
Только полноправные пользователи могут оставлять комментарии. Аутентифицируйтесь пожалуйста, используя сервисы.