Базовый MQTT с C#

  • Михаил
  • 12 мин. на прочтение
  • 174
  • 19 Dec 2019
  • 19 Dec 2019

В этом посте мы рассмотрим, как работать с MQTT и Csharp. Создание Брокера и Клиента, который отправляет ему данные.

Но сначала...

Что такое MQTT?

Согласно MQTT.org :

MQTT (сокращение от Message Queuing Telemetry Transport) — это стандартный протокол обмена сообщениями OASIS для Интернета вещей (IoT). Он разработан как чрезвычайно легкий транспорт обмена сообщениями публикации/подписки, который идеально подходит для подключения удаленных устройств с небольшим объемом кода и минимальной пропускной способностью сети. MQTT сегодня используется в самых разных отраслях, таких как автомобилестроение, производство, телекоммуникации, нефть и газ и т. д.

Кодирование

Структура проекта

Структура проекта содержит .slnфайл и два консольных проекта C#, как показано ниже:
 

.
├── MQTTFirstLook.Broker
├── MQTTFirstLook.Client
└── MQTTFirstLook.sln

 

Используемые библиотеки

MQTTNet
MQTTnet.Extensions.ManagedClient
Serilog
Serilog.Sinks.Console
Newtonsoft.Json

Брокерская реализация

Эта небольшая реализация брокера отвечает за считывание данных, полученных от подключенных клиентов, и отображение их на экране.

Для этого мы создадим новый MQTTServer, который прослушивает порт 707 на локальном хосте.

Вот что нужно:

Использование

Это библиотеки, используемые сервером.
 

using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using Serilog;

 

Выполнение

Вот как мы создаем новый MQTTServer. В этом примере этот код находится внутри void Main(string[] args)метода.
 


// Create the options for our MQTT Broker
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
                                     // set endpoint to localhost
                                     .WithDefaultEndpoint()
                                     // port used will be 707
                                     .WithDefaultEndpointPort(707) 
                                     // handler for new connections
                                     .WithConnectionValidator(OnNewConnection) 
                                     // handler for new messages
                                     .WithApplicationMessageInterceptor(OnNewMessage);

// creates a new mqtt server     
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();

// start the server with options  
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();

// keep application running until user press a key
Console.ReadLine();

 

Обработчики

Обработчики — это обратные вызовы, которые MQTT вызывает всякий раз, когда вызывается действие. В этом примере у нас есть обработчик новых подключений и всякий раз, когда сервер получает новое сообщение.
 

public static void OnNewConnection(MqttConnectionValidatorContext context)
{
    Log.Logger.Information(
            "New connection: ClientId = {clientId}, Endpoint = {endpoint}",
            context.ClientId,
            context.Endpoint);
}

public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
    var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);

    MessageCounter++;

    Log.Logger.Information(
        "MessageId: {MessageCounter} - TimeStamp: {TimeStamp} -- Message: ClientId = {clientId}, Topic = {topic}, Payload = {payload}, QoS = {qos}, Retain-Flag = {retainFlag}",
        MessageCounter,
        DateTime.Now,
        context.ClientId,
        context.ApplicationMessage?.Topic,
        payload,
        context.ApplicationMessage?.QualityOfServiceLevel,
        context.ApplicationMessage?.Retain);
}

 

Клиент

В нашем клиенте мы собираемся создать новый MQTTClientэкземпляр. Этот экземпляр будет подключаться к нашему брокеру на локальном хосте: 707 и отправлять сообщения в тему.Dev.to/topic/json

Что необходимо для этой реализации ниже:

Использование

Это библиотеки, используемые сервером.
 

using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using Newtonsoft.Json;
using Serilog;

 

Выполнение

Наш MQTTClient подключится к нашему брокеру через TCP. И так же, как сервер, этот код будет внутри void Main(string[] args) метода.
 


// Creates a new client
MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
                                        .WithClientId("Dev.To")
                                        .WithTcpServer("localhost", 707);

// Create client options objects
ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
                        .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
                        .WithClientOptions(builder.Build())
                        .Build();

// Creates the client object
IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();

// Set up handlers
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
_mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);

// Starts a connection with the Broker
_mqttClient.StartAsync(options).GetAwaiter().GetResult();

// Send a new message to the broker every second
while (true)
{
    string json = JsonConvert.SerializeObject(new { message = "Heyo :)", sent= DateTimeOffset.UtcNow });
    _mqttClient.PublishAsync("dev.to/topic/json", json);

    Task.Delay(1000).GetAwaiter().GetResult();
}

 

Обработчики

Обработчики клиента вызываются, когда клиент получает сигнал о подключении, сбое подключения и отключении.
 

public static void OnConnected(MqttClientConnectedEventArgs obj)
{
    Log.Logger.Information("Successfully connected.");
}

public static void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
{
    Log.Logger.Warning("Couldn't connect to broker.");
}

public static void OnDisconnected(MqttClientDisconnectedEventArgs obj)
{
    Log.Logger.Information("Successfully disconnected.");
}

 

Заключение.

Вы можете найти этот проект на Github