Интеграция .NET Core с RabbitMQ

  • Михаил
  • 45 мин. на прочтение
  • 179
  • 26 Feb 2023
  • 01 Mar 2023

RabbitMQ — это программное обеспечение для организации очередей сообщений, также известное как брокер сообщений или диспетчер очередей. Это программное обеспечение, в котором определяются очереди и к которому подключаются приложения для передачи или публикации сообщений.

Вот краткое повторение базовой архитектуры очереди сообщений, описанной в предыдущем разделе:

  • Клиентские приложения, создающие сообщения, называются издателями (производителями) или Producer.
  • Затем эти сообщения доставляются брокеру или в очередь сообщений.
  • Потребители или Consumer — это приложения-получатели, которые подключаются к очереди и подписываются на обрабатываемые сообщения.
  • Сообщения, помещенные в очередь, сохраняются до тех пор, пока потребитель не извлечет их.

RabbitMQ — это брокер сообщений или диспетчер очередей, который облегчает передачу и доставку сообщений от производителя к потребителю. Программное обеспечение для организации очереди сообщений, такое как RabbitMQ, позволяет конечным точкам создавать и потреблять сообщения, не взаимодействуя друг с другом. Вместо этого они взаимодействуют с очередью, обеспечивая более быструю доставку и более эффективное взаимодействие.

Теперь, когда мы закончили предварительные работы, давайте перейдем к реальной работе с этим, не так ли?

 

Установка необходимых компонентов RabbitMQ

В этом руководстве я буду использовать RabbitMQ установленный в Docker, но покажу вам, как установить RabbitMQ в Windows. Его можно установить двумя способами: один — с помощью Chocolatey (оптимальный способ установки); а другой — с помощью установщика, что мы и будем делать в этом посте. Если вы используете другую операционную систему, вы можете посетить официальный сайт RabbitMQ .

  1. Загрузите и установите Erlang/OTP 22.1 (или более позднюю версию) для двоичного файла Windows и запустите его под учетной записью администратора.
  2. Загрузите и установите последний сервер RabbitMQ. На момент написания этой статьи последняя версия — 3.9.13, которую мы и будем устанавливать.
  3. После завершения установки проверьте состояние RabbitMQ, перейдя в командную строку RabbitMQ и введя:
rabbitmqctl.bat status

 

Создание демонстрационного проекта с помощью Visual Studio 2022
В этой части руководства вы узнаете, как создать демонстрационный проект с помощью Visual Studio 2022 или качаем готовый.

1. Создайте два приложения .Net Core Console в одном решении — один проект отправителя и один проект получателя.

2. Затем установите пакет RabbitMQ в оба проекта. На панели инструментов Visual Studio щелкните > Инструменты > Диспетчер пакетов Nuget > Консоль диспетчера пакетов.

 

3. Выберите целевой проект, в котором должен быть установлен пакет RabbitMQ.client. 

Install-package RabbitMQ.Client

 

4. После установки пакета в обоих проектах перейдите в проект Producer, откройте Program.cs, вставьте следующие строки кода и проверьте:

using RabbitMQ.Client;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "10.10.11.18" };
        using (var connection = factory.CreateConnection())
        {
            Console.WriteLine("Enter command 'exit' to stop sending meesages");

            string message = string.Empty;
            do
            {
                SendMessage(connection, message);
                Console.Write("Enter your next message:");
                message = Console.ReadLine();
            } 
            while (!message.Equals("exit", StringComparison.OrdinalIgnoreCase));
        }
    }
    private static void SendMessage(IConnection connection, string message)
    {
        if (string.IsNullOrEmpty(message))
            return;
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null); var body =
                                    Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

5. В проекте Consumer откройте Program.cs, вставьте следующие строки кода и проверьте:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "10.10.11.18" }; // If your broker resides on a different machine, you can specify the name or the IP address.
        using (var connection = factory.CreateConnection()) // Creates the RabbitMQ connection
        using (var channel = connection.CreateModel()) // Creates a channel, which is where most of the API for getting things done resides.
        {
            // We will also have to declare the queue here,
            // because this application might start first so we will make sure that the queue exists before receiving messages.
            channel.QueueDeclare(queue: "hello", // The name of the queue
                                 durable: false, // true if we are declaring a durable queue(the queue will survive a server restart)
                                 exclusive: false, // true if we are declaring an exclusive queue (restricted to this connection)
                                 autoDelete: false, // true if we are declaring an auto delete queue (server will delete it when no longer in use)
                                 arguments: null); // other properties (construction arguments) for the queue

            var consumer = new EventingBasicConsumer(channel);
            // Callback
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received: {0}", message);
            };

            // Start receiving messages
            channel.BasicConsume(queue: "hello", // the name of the queue
                                 autoAck: true, // true if the server should consider messages acknowledged once delivered;
                                                //false if the server should expect explicit acknowledgements
                                 consumer: consumer); // an interface to the consumer object
            Console.WriteLine("Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

 

6. После проверки запустите оба проекта. Вы можете запустить каждый проект с помощью Visual Studio, выбрав проект запуска и нажав CTRL+F5. Или в свойствах выставить запуск двух проектов.

 

7. Кроме того, вы также можете запускать проекты с помощью консоли. Для этого щелкните правой кнопкой мыши проект Receiver и выберите «Открыть папку в проводнике». В адресной строке вкладки проводника введите cmd , нажмите Enter , затем введите dotnet run . Выполните тот же процесс для проекта Sender.

 

8. Наконец, теперь вы можете начать отправлять и получать сообщения через RabbitMQ.

 

Использование имени компьютера или IP-адреса в качестве имени хоста
Могут быть случаи, когда вы предпочитаете использовать имя компьютера или IP-адрес в соединении. Однако по умолчанию RabbitMQ не позволяет гостям удаленно подключаться. Когда вы запустите приложение в этом сценарии, оно вернется с сообщением об ошибке, в котором говорится:

 «ACCESS_REFUSED — вход в систему был отклонен с использованием механизма аутентификации PLAIN. Подробнее см. в файле журнала брокера».

Обратите внимание, что гостевой пользователь по умолчанию может подключаться только с локального хоста. Это сделано для того, чтобы ограничить широко известное использование учетных данных в производственных системах. Чтобы иметь возможность использовать имя компьютера или IP-адрес в качестве имени узла в клиентских приложениях, необходимо подключиться к узлу с учетными данными.

Чтобы добавить пользователя в RabbitMQ, откройте командную строку RabbitMQ и вставьте каждую строку.

1. Чтобы создать пользователя с именем 'testuser' и паролем 'topsecretpassword': 

rabbitmqctl add_user testuser topsecretpassword

2. Чтобы назначить пользователя администратором:

rabbitmqctl set_user_tags администратор тестового пользователя

3. Чтобы установить разрешение для пользователя:

rabbitmqctl set_permissions -p/testuser ".*" ".*" ".*"

Другой способ добавить пользователя — использовать RabbitMQ Management в браузере. Мы покажем это в следующей части.

 

Использование управления RabbitMQ

 

1. Начните с включения управления RabbitMQ. В меню «Пуск» найдите командную строку RabbitMQ и введите:

rabbitmq-plugins.bat включить rabbitmq_management

Затем нажмите Enter .

2. Далее откройте браузер и перейдите по адресу http://localhost:15672 .

3. Войдите в систему, используя гостевую учетную запись по умолчанию.

Имя пользователя: guest
Пароль: guest

 

4. Перейдите на вкладку «Администратор»> разверните «Добавить пользователя»> и введите данные пользователя.

5. Нажмите «Добавить пользователя», и пользователь будет сохранен в базе данных RabbitMQ. Вы должны увидеть только что добавленного пользователя в разделе «Пользователи». Чтобы добавить разрешение для этого пользователя, щелкните имя пользователя.

 

6. Теперь вернитесь в Visual Studio. Перезапишите эти строки кода (см. рисунок ниже) в обоих проектах в Program.cs, используя новую учетную запись пользователя и пароль, которые вы только что создали с помощью командной строки или с помощью RabbitMQ Management.

 

7. Запустите два проекта, и на этот раз он не должен возвращать ранее встречавшееся сообщение об ошибке «ACCESS_REFUSED...».


Создание и использование рабочих очередей (очередей задач)
В этом разделе руководства мы покажем вам, как создать рабочую очередь, которую можно использовать для распределения трудоемких задач между несколькими работниками. Одним из преимуществ использования Task Queue является возможность легкого распараллеливания работы. Таким образом, в случае накопления незавершенной работы можно добавить больше работников, что позволит вам легко масштабироваться.

1. В предыдущих разделах мы создали проекты отправителя и получателя в одном решении. Теперь мы добавим еще два консольных проекта для нашей демонстрации рабочей очереди. В этом примере мы назовем эти проекты — Worker и NewTask.

2. Добавьте пакет RabbitMQ.Client в оба новых проекта.

3. Разверните проект Worker, перейдите в файл program.cs и вставьте следующие строки кода:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
    private const string hostName = "10.10.11.18", userName = "testuser", password = "P@ssw0rd";
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = hostName, UserName = userName, Password = password };
        using (var connection = factory.CreateConnection()) // Creates the RabbitMQ connection
        using (var channel = connection.CreateModel()) // Creates a channel, which is where most of the API for getting things done resides.
        {
            // We will also have to declare the queue here,
            // because this application might start first so we will make sure that the queue exists before receiving messages.
            channel.QueueDeclare(queue: "task_queue", // The name of the queue
                        durable: false, // true if we are declaring a durable queue(the queue will survive a server restart)
                        exclusive: false, // true if we are declaring an exclusive queue (restricted to this connection)
                        autoDelete: false, // true if we are declaring an auto delete queue (server will delete it when no longer in use)
                        arguments: null); // other properties (construction arguments) for the queue

            var consumer = new EventingBasicConsumer(channel);
            // Callback
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.Write("Received: {0}", message);

                Thread.Sleep(5000); // Our fake task that will finish every 5 seconds.
                Console.WriteLine(" [Done]"); // When task is done, displays [done].
            };

            // Start receiving messages
            channel.BasicConsume(queue: "task_queue", // the name of the queue
                        autoAck: true,  // true if the server should consider messages acknowledged once delivered;
                                        //false if the server should expect explicit acknowledgements
                        consumer: consumer); // an interface to the consumer object

            Console.WriteLine("Waiting for message/s.");
            Console.ReadLine();
        }
    }
}
 

 

4. Затем разверните проект NewTask, перейдите в файл program.cs и вставьте следующие строки кода:
 

using RabbitMQ.Client;
using System.Text;

class Program
{
    private const string hostName = "10.10.11.18", userName = "testuser", password = "P@ssw0rd";
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = hostName, UserName = userName, Password = password };
        using (var connection = factory.CreateConnection()) // Creates the RabbitMQ connection
        using (var channel = connection.CreateModel()) // Creates a channel, which is where most of the API for getting things done resides.
        {
            Console.WriteLine("Please enter your message. Type 'exit' to exit.");
            //Declares the queue
            channel.QueueDeclare(queue: "task_queue", // The name of the queue
                        durable: false, // true if we are declaring a durable queue(the queue will survive a server restart)
                        exclusive: false, // true if we are declaring an exclusive queue (restricted to this connection)
                        autoDelete: false, // true if we are declaring an auto delete queue (server will delete it when no longer in use)
                        arguments: null); // other properties for the queue

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            while (true)
            {
                string message = Console.ReadLine();
                if (message?.ToUpper() == "EXIT")
                {
                    break;
                }

                var body = Encoding.UTF8.GetBytes(message); // Converts message to byte array

                //Publish message
                channel.BasicPublish(exchange: "", // the exchange to publish the message to
                              routingKey: "task_queue", // the routing key
                              basicProperties: properties, // other properties for the message
                              body: body); // the message body
            }
        }
    }
}

 

5. Запустите проект Worker три раза и проект New Task два раза, выбрав проект (Worker или New Task) и нажав CTRL+F5.

Окна должны выглядеть так:

Введите сообщения в New Task/Sender столько раз, сколько сможете, и посмотрите, как RabbitMQ последовательно распределит каждое сообщение каждому Worker. Этот процесс называется циклическим.

 

Сообщение

Теперь, когда вы отправляете сообщения, а рабочий процесс все еще обрабатывает задачу, его уничтожение приведет к потере сообщений, которые он обрабатывал в этот момент. Поскольку вы не хотите потерять задачу, вы должны убедиться, что задача продолжается и передается новому работающему работнику.

Чтобы гарантировать, что сообщение никогда не будет потеряно, RabbitMQ поддерживает подтверждения сообщений. Подтверждение (или подтверждение) возвращается потребителем, чтобы сообщить RabbitMQ, что конкретное сообщение было получено и обработано, и что RabbitMQ может удалить его.

Если потребитель умирает ( т., его канал закрыт, соединение закрыто или соединение TCP потеряно) без отправки подтверждения RabbitMQ поймет, что сообщение не было полностью обработано, и его необходимо повторно поставить в очередь. Если в то же время в сети есть другие потребители, RabbitMQ быстро передаст их другому потребителю. Таким образом, вы можете быть уверены, что ни одно сообщение не будет потеряно, даже если работники время от времени умирают.

Нет тайм-аутов сообщений — RabbitMQ повторно доставит сообщение, когда потребитель умрет. Так что это нормально, даже если обработка сообщения занимает очень и очень много времени.

Подтверждение сообщений вручную включено по умолчанию. В предыдущих примерах мы явно отключили их, установив для параметра autoAck ("режим автоматического подтверждения") значение true, чтобы подтверждения сообщений отправлялись автоматически. Теперь мы удаляем этот флаг, и работник вручную отправляет правильное подтверждение после выполнения задачи.
 

Для этого добавьте эту строку в program.cs в проекте Worker:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

Затем установите для autoAck значение false .

 

Используя channel.BasicAck, вы можете быть уверены, что даже если вы убьете Worker (используя CTRL+C), пока он находится в середине обработки сообщения, ничего не будет потеряно. Все неподтвержденные сообщения будут повторно доставлены вскоре после смерти работника.

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

 

Параметр channel.BasicQoS позволяет указать требуемое качество обслуживания при отправке сообщений. Обратитесь к рисунку выше, чтобы узнать, что означает каждое из этих значений.

preFetchSize : сервер отправит сообщение заранее, если оно равно или меньше по размеру, чем доступный размер предварительной выборки (а также попадает в другие ограничения предварительной выборки). Это значение может быть установлено равным нулю, что означает «нет определенного ограничения», хотя другие ограничения предварительной выборки все еще могут применяться. Prefetch-size игнорируется, если установлен параметр no-ack.

preFetchCount: Задает окно предварительной выборки для целых сообщений. Это поле может использоваться в сочетании с полем размера предварительной выборки; сообщение будет отправлено заранее только в том случае, если это разрешено обоими окнами предварительной выборки (и окнами на уровне канала и соединения). Prefetch-count игнорируется, если установлен параметр no-ack.

global: «По умолчанию настройки QoS применяются только к текущему каналу. Если это поле установлено, они применяются ко всему соединению». Вместо этого RabbitMQ принимает global=false, что означает, что настройки QoS должны применяться к каждому потребителю (для новых потребителей на канале; существующие не затрагиваются), и global=true означает, что настройки QoS должны применяться к каждому каналу.