G
go-with-me
@golangwithus1.9K подп.
9.9Kпросмотров
31 августа 2025 г.
📷 ФотоScore: 10.9K
⚡️ Concurrency Patterns. Fan In В предыдущем посте мы немного напутали с определением такого многопоточного паттерна как Fan Out На деле мы показывали Tee, который распространяет одно и то же значение V из канала-источника на N каналов-потребителей Отличие Fan Out от Tee в том, что на N каналов распространяются разные значения из одного канала-источника. Тобишь, воркеры тянут значения из одного канала, борясь за них насмерть Лирическое отступление закончено, наша совесть чиста, а сегодняшняя тема будет посвящена Fan In Этот паттерн является обратным для Fan-Out. Мы собираем данные из нескольких каналов-источников и направляем их в один общий канал-потребитель 1. Default Итак, что мы имеем? — Есть воркеры — они кладут значения в N каналов и являются продьюсерами — Каждый из этих N каналов будет получать значения от своего продьюсера. Назовем такие каналы "стоковыми" — Есть один общий канал out, туда будет нужно отправить все значения из стоковых каналов — Для этого мы запускаем N потоков, каждый из которых слушает свой стоковый канал, куда кладет значения продьюсер и редиректит все значения в out Playground пример Но что же будет, если какой-то наш продьюсер потух и больше не шлет никаких значений, а переданный контекст не отменен? Как бы нам понять, что воркер не является активным и перезапустить его? — в этом нам поможет такой механизм как "Heartbeats" Heartbeat — это регулярное сообщение от продьюсера/воркера, подтверждающее, что он жив и работает 2. Heartbeats Приступим к рассмотрению этого чуда! Основная идея проста: — Имеем структуру, которая хранит в себе стоковый канал, используемый как пайп между воркером и стоком, и канал "сердцебиений" — Функция Supervise ответственна за отслеживание "сердцебиений" и перезапуск воркера при их отсутствии по TTL — Функция FanIn принимает на вход стоковые каналы и возвращает результирующий канал, из которого можно читать данные Всмотримся в наши функции поподробнее 2.1. FanIn — Не отклоняемся от цели: выкачиваем данные из стоковых каналов и перекладываем в out, реагируя на контекст и неблокирующе отправляя "сердцебиение" нашему супервизору, который пристально наблюдает за нашими воркерами — WaitGroup здесь так же используется для того, чтобы дождаться конца работы наших стоков и отдать управление основному потоку после дренажа всех "живых" значений 2.2 Supervise — Создаем стоковый канал и канал "сердцебиений", агрегируем эти значения в структуре Source и возвращаем ее — В отдельном потоке запускаем нашу рутину по отслеживанию и перезапуску воркеров 2.2.1 Смотрим на внутренности запущенного потока внутри Supervise — Изначально происходит создание дочернего контекста с отменой для нашего воркера. Этот контекст будет рулить в тот момент, когда наш TTL пройдет и надо будет потушить воркера — Создаем ticker, который будет слать ивенты, семантически значащие следующее: "в нашего воркера стреляли и он упал в лужу на..." После получения ивента мы отменяем контекст и воркер окончательно "задыхается в луже" — Первично запускаем работягу в отдельном потоке — Если ловим ивент от тикера: производим отмену контекста, переназначаем этот же контекст и функцию отмены, сбрасываем таймер, и запускаем нового воркера в отдельном потоке — В случае, когда из стокового потока нам пришло "сердцебиение" мы просто сбрасываем таймер и движемся дальше! Стоит отметить, что воркеры должны "реагировать" на переданный контекст. Без этого мы получим утечку потоков и черт его знает, чем нам это грозит (профилированием и устранением проблемы, которой можно было бы и избежать) Таким образом, мы получаем более надежный Fan In, где все источники данных контролируемы и восстанавливаемы при зависаниях Playground пример Статью писали с Дашей: @dariasroom Stay tuned 😏 #golang #concurrency
9.9K
просмотров
3778
символов
Да
эмодзи
Да
медиа

Другие посты @golangwithus

Все посты канала →
⚡️ Concurrency Patterns. Fan In В предыдущем посте мы немног — @golangwithus | PostSniper