Полное руководство по обменникам, очередям и их настройкам

  • Михаил
  • 12 мин. на прочтение
  • 630
  • 17 Dec 2019
  • 06 Feb 2026

RabbitMQ следует модели "Издатель (Publisher) → Обменник (Exchange) → Очередь (Queue) → Потребитель (Consumer)".
Ключевая идея: Издатели никогда не отправляют сообщения напрямую в очередь. Они отправляют сообщения в обменник, который, следуя определенным правилам (тип обменника и привязки), решает, в какие очереди это сообщение попадет.

Основные компоненты:

Обменник (Exchange): "Почтовое отделение". Получает сообщения и решает, куда их направить.

Очередь (Queue): "Почтовый ящик". Хранит сообщения до обработки потребителем.

Привязка (Binding): "Правило доставки". Связывает обменник с очередью и определяет условие маршрутизации.

Ключ маршрутизации (Routing Key): "Адрес на конверте". Строка, которую издатель прикрепляет к сообщению. Обменник использует её для принятия решения.


Часть 1: Типы обменников и когда их использовать

Выбор типа обменника — самый важный архитектурный при работе с RabbitMQ. Он определяет логику доставки сообщений.

1. Direct Exchange (Прямой обменник)

Принцип работы: Сравнивает ключ маршрутизации сообщения с ключом привязки очереди. Сообщение попадает в очередь только при точном совпадении.

Аналогия: Доставка письма по конкретному адресу (ID заказа, имя пользователя).

Когда использовать:

Точечная (point-to-point) отправка: Одно сообщение — одной конкретной очереди.

Маршрутизация задач: Разные типы задач (routing_key: "image.resize", "email.send", "report.generate") направляются в разные воркер-очереди.

Уведомления для конкретного пользователя/сессии.

Пример:

# Привязки: Queue_A -> ключ "orders.paid", Queue_B -> ключ "orders.cancelled"
channel.basic_publish(exchange='direct_logs',
                     routing_key='orders.paid',  # Сообщение уйдёт ТОЛЬКО в Queue_A
                     body=message)

2. Fanout Exchange (Широковещательный обменник)

Принцип работы: Игнорирует ключ маршрутизации. Рассылает копию сообщения во все привязанные очереди.

Аналогия: Радиовещание. Все, кто настроился на волну, получают сигнал.

Когда использовать:

Событийная архитектура: Одно событие (например, "user.registered") должно быть обработано несколькими независимыми сервисами (отправка приветственного email, создание профиля в CRM, начисление бонусов).

Трансляция обновлений состояния: Рассылка обновлений цены всем подключенным клиентам.

Логирование в несколько мест.

Пример:

# Привязки: Queue_Emails, Queue_Analytics, Queue_Backup привязаны к fanout-обменнику 'events'
channel.basic_publish(exchange='events',  # Ключ routing_key не важен
                     routing_key='',  # Все три очереди получат копию сообщения
                     body=message)

3. Topic Exchange (Тематический обменник)

Принцип работы: Использует шаблоны (паттерны) для ключей маршрутизации и привязки. Сообщение попадает в очередь, если ключ сообщения соответствует шаблону привязки.

Синтаксис шаблонов:

* (звездочка) — заменяет ровно одно слово.

# (решетка) — заменяет ноль или больше слов.

Слова разделяются точкой (.). Пример: "usa.nyc.weather".

Аналогия: Подписка на теги или категории.

Когда использовать:

Фильтрация событий по нескольким критериям: Система уведомлений, где потребители подписываются на интересующие их категории.

Агрегация логов разного уровня и из разных источников.

Сложная маршрутизация, где комбинируются несколько атрибутов (регион.тип.действие).

Пример:

# Привязки:
# Queue_AllLogs -> ключ "#" (все логи)
# Queue_Errors -> ключ "*.error"
# Queue_USA_News -> ключ "usa.news.*"
channel.basic_publish(exchange='topic_logs',
                     routing_key='app.backend.error',  # Попадёт в Queue_AllLogs и Queue_Errors
                     body=message)
channel.basic_publish(exchange='topic_logs',
                     routing_key='usa.news.sports',  # Попадёт в Queue_AllLogs и Queue_USA_News
                     body=message)

4. Headers Exchange (Заголовочный обменник)

Принцип работы: Игнорирует ключ маршрутизации. Решение принимается на основе заголовков сообщения (headers). Привязка указывает условия совпадения (x-match: all — все заголовки должны совпасть, x-match: any — достаточно любого).

Аналогия: Фильтр по атрибутам в каталоге товаров.

Когда использовать:

Когда логика маршрутизации сложна и зависит от множества параметров (версия протокола, тип клиента, флаги), которые неудобно упаковывать в строку ключа.

Встречается реже, чем direct/topic.

Пример (привязка):

Аргументы привязки очереди: { "format": "pdf", "department": "finance", "x-match": "all" }

Сообщение с заголовками {"format": "pdf", "department": "finance"} будет доставлено. Сообщение с {"format": "pdf"} — нет (т.к. x-match: all).


Часть 2: Типы очередей и их настройки

После того как обменник решил, в какую очередь отправить сообщение, важно правильно её настроить.

1. Классические очереди (Classic Queues)

Что это: Стандартный, проверенный временем тип. Хорошо подходит для большинства сценариев с умеренной нагрузкой.

Когда использовать:

Небольшие и средние нагрузки.

Когда не требуется высокая доступность при отказе узлов (или используется Mirrored Queues, но это устаревший подход).

Временные или эксклюзивные очереди.

2. Очереди Quorum (Quorum Queues) — РЕКОМЕНДУЕМЫЙ ВЫБОР

Что это: Современный тип очередей на основе алгоритма консенсуса Raft. Сообщения реплицируются между большинством узлов кластера (N/2 + 1). Очередь остается доступной для записи и чтения, даже если часть узлов отключится.

Когда использовать (почти всегда для production):

Требуется гарантия сохранности данных и высокая доступность.

Работа в кластере.

Новые проекты.

Критичные бизнес-процессы.

Особенности:

Всегда Durable.

Имеют небольшие накладные расходы на репликацию (латентность, дисковое IO).

Не поддерживают автоматическое удаление (auto-delete) и некоторые другие экзотические функции классических очередей.

3. Очереди Streams (Потоки) — для "логов событий"

Что это: Не совсем очередь в классическом понимании. Это реплицируемый, упорядоченный, append-only журнал сообщений. Потребители могут независимо "перематывать" и читать с любого места.

Когда использовать:

Хранение истории событий (event sourcing).

Передача больших объемов данных (например, телеметрии, логов) множеству потребителей.

Когда нужно многократно перечитывать одни и те же сообщения разными системами.

Аналог Apache Kafka в мире RabbitMQ.

Сводная таблица выбора типа очереди

 

КритерийКлассическая очередьОчередь QuorumПоток (Stream)
Гарантии доставкиБазовые (зависит от настроек)Высокие (репликация Raft)Высокие (репликация)
Доступность в кластереОграниченная (mirrored устарели)ОтличнаяХорошая
Пропускная способностьОчень высокаяВысокаяВысокая (для своего сценария)
Задержка (latency)НизкаяСредняя (из-за консенсуса)Низкая/Средняя
Потребление памятиЭффективноеВыше (хранит индекс)Зависит от удержания
Основной сценарийПростые задачи, воркеры, RPCНадёжные бизнес-процессы, событияЖурнал событий, данные временных рядов

Часть 3: Практические настройки и паттерны использования

Паттерн 1: Распределение задач между воркерами (Work Queue)

Обменник: Direct (или Fanout для широковещания задачи всем).

Очередь: Quorum (для надёжности) или Classic.

Ключ: Используется имя очереди. Несколько воркеров подписываются на одну и ту же очередь.

Настройки очереди:

Durable=True

Ручные подтверждения (Manual Ack) для гарантии обработки.

QoS Prefetch Count=1 (или небольшое число) для честного распределения задач между воркерами.

Dead Letter Exchange (DLX) для переноса "проваленных" задач в очередь ошибок.

Паттерн 2: Публикация/Подписка на события (Pub/Sub)

Обменник: Fanout (простая рассылка) или Topic (интеллектуальная подписка по темам).

Очередь: Quorum для важных событий, Classic для временных данных.

Ключевой момент: Каждый сервис-подписчик создаёт свою собственную очередь и привязывает её к обменнику. Это гарантирует, что отключение одного сервиса не повлияет на других.

Паттерн 3: Маршрутизация по темам (Сложная система уведомлений)

Обменник: Topic.

Очередь: Quorum.

Пример: Система уведомлений для трейдинговой платформы.

"stocks.#" → очередь для всех событий по акциям.

"#.alert" → очередь для всех критических алертов.

"stocks.nyse.ibm" → очередь для конкретной акции.

Паттерн 4: Отложенная обработка (Delayed Messages)

Плагин: Требуется плагин rabbitmq_delayed_message_exchange.

Обменник: x-delayed-message (с указанием типа direct, topic и т.д.).

Механизм: Сообщение публикуется в этот обменник с заголовком x-delay (в миллисекундах). Обменник задерживает сообщение и только потом маршрутизирует его в целевую очередь.

Важно: Не путать с TTL (Time-To-Live), который просто удаляет старые сообщения. x-delayed-message обеспечивает именно отложенную доставку.

Критические настройки для Production (на уровне очереди или политики):

TTL (Message/Queue): x-message-ttl. Защита от накопления старых данных. Удаляет сообщения, которые не были обработаны вовремя.

Ограничение длины: x-max-length (сообщения) или x-max-length-bytes (байты). Защита от переполнения памяти при падении потребителей.

Dead Letter Exchange (DLX): x-dead-letter-exchange. Куда отправлять сообщения, которые были отклонены (NACK), истекли по TTL или не поместились из-за ограничения длины. Обязательно настраивайте DLX для обработки ошибок!

Приоритеты: x-max-priority. Позволяет назначать сообщениям приоритет (от 0 до 255). Высокоприоритетные сообщения обрабатываются первыми.


Итоговая шпаргалка по выбору

 

Ваш сценарийРекомендуемый обменникРекомендуемый тип очередиКлючевые настройки
"Отправить задачу свободному воркеру"directquorumManual Ack, Prefetch=1, DLX
"Уведомить все сервисы о событии"fanoutquorumСвоя очередь на сервис
"Отправлять уведомления по подписке (регион, тип)"topicquorumПаттерны привязки: region.type.*
"Гарантированно сохранить историю всех событий"direct/fanoutstreamРазмер удержания (retention)
"Выполнить операцию через 5 минут"x-delayed-messagequorumЗаголовок x-delay
"Временные данные для сессии клиента"directclassicexclusive=True, auto-delete=True

Главный принцип: Начинайте проектирование с событий и их потока, выбирайте подходящий обменник для маршрутизации, а затем настраивайте надёжные очереди (Quorum) с необходимыми политиками (TTL, DLX). Мониторьте длину очередей и скорость обработки.