714просмотров
70.0%от подписчиков
11 февраля 2026 г.
📷 ФотоScore: 785
💭Привет, это Катя, Flutter Dev Friflex В прошлом посте мы разобрали основы работы с библиотекой dart_amqp 0.3.1: подключение к серверу, создание каналов, работу с очередями и обменниками. Сегодня я покажу, как работать с несколькими RabbitMQ инстансами одновременно в проекте. AmqpMessage
Класс AmqpMessage — это обертка над входящими сообщениями, которая предоставляет удобные методы для работы с данными. consumer.listen((AmqpMessage message) {
// Сырые байты
Uint8List rawBytes = message.payload;
// UTF8 строка
String text = message.payloadAsString;
print('Текст: $text');
// JSON
Map<String, dynamic> json = message.payloadAsJson;
print('User ID: ${json['userId']}');
// Метаданные
print('Exchange: ${message.exchangeName}');
print('Routing key: ${message.routingKey}');
}); Publisher Confirms
Publisher Confirms — это механизм подтверждения от брокера о том, что сообщение было принято и обработано. Важно: ACK от брокера означает, что сообщение сохранено на сервере, но не гарантирует, что его получил потребитель Зачем нужно несколько RabbitMQ соединений?
В микросервисной архитектуре часто возникает необходимость подключаться к разным RabbitMQ серверам:
✔️Разные окружения (dev, staging, production)
✔️Географически распределенные кластеры
✔️Разделение ответственности между сервисами
✔️Миграция между серверами Паттерн Registry для управления соединениями
Вместо создания множества отдельных клиентов, используйте паттерн Registry — централизованное хранилище соединений с разными RabbitMQ инстансами: class RabbitRegistry {
RabbitRegistry(); final Map<String, RabbitService> _registry = {}; /// Инициализация соединений для всех хостов
void saveAll(List<String> hosts) {
_removeAllExcept(hosts.toSet());
for (final host in hosts) {
_registry[host] ??= RabbitService(
host: host,
)..initialize();
}
} /// Получить соединение по хосту
RabbitService read(String host, {bool canCreate = false}) {
if (canCreate) {
return _registry.putIfAbsent(
host,
() => RabbitService(
host: host,
)..initialize(),
);
} final rabbit = _registry[host];
if (rabbit == null) {
throw StateError(
'RabbitService для "$host" не найден. '
'Инициализируйте его через saveAll().',
);
}
return rabbit;
} /// Удалить соединение
void remove(String host) {
_registry.remove(host)?.dispose();
} /// Закрыть все соединения
void dispose() {
for (final service in _registry.values) {
service.dispose();
}
_registry.clear();
}
} Как это работает
1️⃣ Инициализация реестра при старте приложения
При запуске приложения создаем экземпляр RabbitRegistry и передаем ему список хостов RabbitMQ-серверов из конфигурации. Реестр автоматически создаст и инициа