4.9Kпросмотров
21 сентября 2025 г.
📷 ФотоScore: 5.4K
👻 Concurrency Patterns. Bridge Следующая ступень после Or Done Channel паттерна — Bridge. Сначала разберемся с какой ситуацией помогает бороться эта модель Представим, что у нас время от времени динамически появляются каналы с данными, и их число заранее неизвестно. Все эти стримы мы хотим слить в единый канал, откуда и планируем получать данные. Наш потребитель как раз и будет высасывать все-все из этого стрима Кратенько:
— Источники (каналы) создаются динамически (по событиям / шардам / тайм-слайсам)
— Внутренние каналы имеют свой lifecycle, по истечении которого закрываются
— Потребителю нужна единая точка чтения, без координации множества каналов Перед освоением данного материала рекомендуем обратиться к статье про Or Done Channel Ну что ж, поехали! 1. Synchronous Bridge
В синхронной версии бриджа мы будем дрейнить данные из поступающих стримов последовательно, то есть вычитывать все из полученного стрима и только потом переходить к следующему. Так же мы будем использовать Or Done паттерн, чтобы избавить себя от лишних мук с обработкой завершения вычитки:
Playground пример Что мы здесь видим?
— Результирующий канал out, мгновенно возвращающийся из функции
— Канал done, содержащий ивенты завершения нашей работы
— Канал streams, спавнящий нам каналы для вычитки данных и последующей перекладки в out Обсудим процесс подробнее:
— Внутри нашего потока мы создаем переменную stream, хранящую текущий стрим вычитки
— Спинимся в бесконечном цикле в поисках нового стрима, реагируя на ивент завершения
— Как только находим новый стрим для вычитки, мы мигом оборачиваем его в OrDone, чтобы закрыть канал по окончании вычитки
— Далее мы просто вычитываем из него данные и перекладываем в результирующий канал В целом, ничего сложного, но есть нюанс. Нюанс возникает тогда, когда в канал streams насыпают ну ож очень много стримов на единицу времени. Синхронная модель в данном случае будет наступать нам на горло, находясь в ожидании N-го стрима. После профилирования лочек на каналах вы покрутите у виска и придете к мысли, что надо бы эту проблему решить. А решение просто — распараллелить обработку стримов 2. Asynchronous Bridge
Исходя из наших потребностей мы пишем вот такую портянку:
func BridgeSemaphoreT any <-chan T { if parallel <= 0 { parallel = 1 } out := make(chan T) go func() { defer close(out) var wg sync.WaitGroup sem := make(chan struct{}, parallel) fwd := func(stream <-chan T) { defer func() { <-sem }() for v := range OrDone(done, stream) { select { case <-done: return case out <- v: } } } for { select { case <-done: wg.Wait() return case s, ok := <-streams: if !ok { wg.Wait() return } sem <- struct{}{} wg.Go(func() { fwd(s) }) } } }() return out
}
Видим мы уж очень много знакомого, но стоит обсудить нововведения, а именно: 2.1 WaitGroup
Необходимость наличия группы здесь обусловлена нуждой дождаться всех наших воркеров, усердно сливающих данные в канал out. Именно поэтому в ветках select'a при выходе из функции дочернего потока мы прожимаем wg.Wait() 2.2 Semaphore
При резво-растущем количестве стримов мы можем столкнуться с проблемой, когда дочерних потоков будет создаваться безмерное количество. С целью подавить это мы используем самый простецкий семафор, сделанный на канале пустых структур. Можно было бы это провернуть, используя Worker Pool:
Playground пример 2.3 Forwarder Closure
Функция fwd инкапсулирует логику вычитки из стрима с использованием семафора и OrDone канала Получили уж очень хороший мостик! Сегодня вы заимели в арсенал еще один паттерн, чему мы очень рады Статью писали с Дашей:
@dariasroom Спасибо, что читаете и остаетесь с нами!
Stay tuned 🧑💻 #golang #concurrency