3.3Kпросмотров
20 сентября 2025 г.
📷 ФотоScore: 3.7K
😠 Concurrency Patterns. Or Done Channel Отгремев на MTC True Tech Go и собравшись с силами, мы решили порадовать вас новым постом! Сегодня вещаем о таком механизме как Or Done Channel Or Done Channel применим, когда мы выполняем какую-то операцию и зависимы от пачки ивентов завершения. Под пачкой ивентов подразумевается множество каналов, которые в неопределенный момент присылают токен завершения (к примеру пустую структуру) Пу-пу-пу, погнали! Обсудим использование этого паттерна на примере обычной перекладки значения из одного канала в другой 1. Базовый случай
func OrDoneT any <-chan T { out := make(chan T) go func() { defer close(out) for { select { case <-done: return case v, ok := <-src: if !ok { return } select { case <-done: return case out <- v: } } } }() return out
}
В простейшем примере мы просто забираем значение из src и кладем его в результирующий канал out, но при этом завершаем работу при получении ивента из done Здесь все достаточно просто:
— Создаем результирующий канал, немедленно возвращаемый из функции
— Запускаем поток вычитки из src и перекладки значения в out
— Постоянно реагируем на ивент из done как при вычитке, так и перекладке Получаем механизм, который уж очень похож на работу с переданным контекстом! Все становится сложнее, когда таких каналов с ивентами отмены становится несколько. Сейчас мы на это и взглянем! 2. Множество каналов с ивентами завершения
func OrAny(dones ...<-chan struct{}) <-chan struct{} { switch len(dones) { case 0: closed := make(chan struct{}) close(closed) return closed case 1: return dones[0] } out := make(chan struct{}) go func() { defer close(out) switch len(dones) { case 2: select { case <-dones[0]: case <-dones[1]: } } m := len(dones) / 2 select { case <-OrAny(dones[:m]...): case <-OrAny(dones[m:]...): } }() return out
}
В таком кейсе мы будем использовать рекурсивный подход 2.1 Проверяем базовые случаи рекурсии
— Если длина массива каналов с ивентами завершения равна 0, то возвращаем канал, который сразу закрывается
— Если канал с ивентом завершения всего лишь один, возвращаем его 2.2 Рекурсивный случай
— Создаем результирующий канал верхнего уровня, который будет закрыт тогда, когда одна из рекурсивных веток будет закрыта
— Запускаем поток, в котором регистрируем defer секцию с закрытием результирующего канала
— Проверяем крайний случай, когда длина массива с каналами равна двум
— Пройдя все корнер-кейсы, разбиваем массив на две части и запускаем эту же функцию рекурсивно и кладем результирующие каналы из этих частей в select. Этим мы запускаем шайтан-машину отмены! Теперь нам осталось склеить два наших кейса и получить очень удобный инструмент, давайте это и сделаем:
func OrDoneNT any <-chan T { return OrDone(src, orAny(dones...))
} Блюдо готово к подаче! Посмотрим на юзкейсы
1. Pipeline + Or Done
Имеем цепочку конвейера: stage1 -> ... -> stage# и хотим оборвать ее в случае получения ивента:
for v := range OrDone(done, in) { out <- process(v) } 2. Fan In + Or Done
Производим слияние нескольких каналов-продьюсеров в один результирующий канал и аналогично завершаемся по ивенту:
for _, ch := range inputs { go func(ch <-chan T) { for v := range OrDone(done, ch) { out <- v } }(ch)
} 3. Fan Out + Or Done
Шарим канал с тасками в множество потоков и так же завершаем наших воркеров по ивенту:
func worker(done <-chan struct{}, tasks <-chan Task) { for t := range OrDone(done, tasks) { handle(t) }
} В итоге, OrDone — это маленький, но очень мощный строительный блок для конвейеров. Он делает код безопасным к отмене и защищает от утечек ваших потоков.
OrAny расширяет его на несколько сигналов. Вместе это формирует паттерн, который стоит держать в кармане каждому Статью писали с Дашей: @dariasroom Stay tuned 🔥
#golang #concurrency