3.7Kпросмотров
14 августа 2025 г.
📷 ФотоScore: 4.1K
☀ Concurrency Patterns. Fan Out Для начала разберемся в термине Fan Out. Частенько возникают такие ситуации, когда нам необходимо распространить одно и то же значение на несколько потоков, используя каналы. С этой задачей отлично справляется паттерн Fan Out Вы наверняка миллион раз использовали его, но вопрос только в том, насколько осознанно вы это делали. Поэтому на сегодняшней повестке дня вариативность реализаций данного паттерна В базовом случае мы имеем чот такое:
package main import ( "fmt" "math/rand" "time"
) func main() { src := make(chan int) dest1 := make(chan int) dest2 := make(chan int) // Читатели go func() { for v := range dest1 { fmt.Printf("dest1 read %d\n", v) } }() go func() { for v := range dest2 { fmt.Printf("dest2 read %d\n", v) } }() // Fan Out go func() { for v := range src { dest1 <- v dest2 <- v } close(dest1) close(dest2) }() // Генерация данных go func() { rnd := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < 5; i++ { src <- rnd.Intn(10) } close(src) }() time.Sleep(time.Second)
} Fan In в свою очередь является обратным паттерном, тобишь он собирает пачку значений из N каналов и мапит их в один Далее речь будет идти в терминах Fan Out
Будет душно. Начнем! 1. Синхронный
Так как выполнение у нас синхронное, мы просто итерируемся по каждому из каналов dests и записываем в них числа из src
Playground пример 2. Синхронный с рандомизацией
Представьте, что в вашей пачке каналов для записи есть какой-то бесячий dest, в который мы вечно долго записываем из-за какого внутреннего процессинга. Имея детерминированную последовательность dests, мы рискуем постоянно зависать в одном месте, не давая другим потокам выполнять свой контекст Решение — использовать range по мапе с каналом в качестве ключа. Так как перебор по мапе у нас недетерминирован, мы случайным образом меняем порядок обхода получателей и тем самым более равномерно распределяем записи
Playground пример 3. С пропуском записи
Суть этого метода: не блокировать поток, если какой то из dests не готов принять данные. Используем default для неблокирующего select
Playground пример 4. Bounded concurrent-parallel
Если до этого мы записывали все значения линейно, то в этом случае мы имеем утилизируем возможность распараллеливания, но с сохранением порядка записи данных через sync.WaitGroup
Playground пример 5. Unbounded concurrent-parallel Внимание: WaitGroup вынесена за scope всех циклов Unbounded — неограниченное количество горутин, при которых не сохраняется порядок записи в каналы. Это значит, что в отличие от bounded, горутины на запись разных значений из src могут замапиться на разные очереди виртуальных процессоров P. Это приводит к недетерминированной записи. К примеру: значение 1 будет записано после значения 2 Также стоит учитывать, что мы можем насоздавать безграничное число потоков, забив оперативку ресурсами, которые лежат на наших потоках
Playground пример 6. Unbounded concurrent-parallel with semaphore
Для ограничения количества порождаемых горутин можно использовать semaphore для ограничения количества потоков
Playground пример Статью писали с Дашей: @dariasroom Stay tuned 🦄
#golang #concurrency