CAP 10 · LEC 02·Patrones idiomáticos

Patrones concurrentes: worker pools, fan-in/fan-out

Goroutines y channels son los ladrillos de la concurrencia en Go. Combinándolos surgen patrones reusables — worker pools, fan-out/fan-in, pipelines y done channels — que cubren la mayoría de los problemas de concurrencia que encontrarás.

● AVANZADO9 min lecturapor Fernando Herrera · actualizado mayo de 2026
¿Encontraste un error o algo que mejorar?Editá esta lección en GitHub →

Worker pool: limitar la concurrencia

Lanzar una goroutine por cada tarea suena tentador, pero si las tareas son miles y cada una consume CPU, red o memoria, el sistema colapsa. Un worker pool lanza un número fijo de workers que consumen tareas de un channel compartido.

package main import ( "fmt" "sync" "time" ) type Job struct { ID int Value int } type Result struct { JobID int Output int } // worker consume jobs de 'jobs' y publica resultados en 'results' // El channel 'jobs' se cierra desde fuera para señalizar fin de trabajo func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { time.Sleep(10 * time.Millisecond) // simula trabajo results <- Result{JobID: job.ID, Output: job.Value * 2} _ = id } } func main() { const numWorkers = 3 const numJobs = 6 jobs := make(chan Job, numJobs) results := make(chan Result, numJobs) var wg sync.WaitGroup // Lanzar workers for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(w, jobs, results, &wg) } // Encolar jobs y cerrar el channel (señal de fin) for j := 1; j <= numJobs; j++ { jobs <- Job{ID: j, Value: j * 10} } close(jobs) // Cerrar 'results' cuando todos los workers terminen go func() { wg.Wait() close(results) }() // Recolectar resultados total := 0 for r := range results { total += r.Output } fmt.Println("Suma de outputs:", total) // (10+20+30+40+50+60) * 2 = 420 }
SalidaSuma de outputs: 420
Quién cierra el channel

Regla idiomática: el productor cierra el channel, nunca el consumidor. En el worker pool, quien encola los jobs (main) cierra jobs; quien produce results (los workers, coordinados con un WaitGroup) cierra results.

Fan-out: distribuir trabajo a N goroutines

Fan-out significa que un único productor envía tareas a varias goroutines consumidoras que trabajan en paralelo. Es la base del worker pool y se usa cuando el trabajo es CPU-bound o tiene latencia (I/O).

package main import ( "fmt" "sync" ) // producer emite los números 1..n en un channel y lo cierra func producer(n int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 1; i <= n; i++ { out <- i } }() return out } func main() { source := producer(5) // Fan-out: 3 workers procesan el mismo 'source' const numWorkers = 3 channels := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { channels[i] = squareWorker(source) } // Recolectar de los 3 channels (fan-in en la siguiente sección) var wg sync.WaitGroup wg.Add(numWorkers) total := 0 var mu sync.Mutex for _, c := range channels { go func(ch <-chan int) { defer wg.Done() for v := range ch { mu.Lock() total += v mu.Unlock() } }(c) } wg.Wait() fmt.Println("Suma de cuadrados:", total) // 1+4+9+16+25 = 55 } func squareWorker(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for v := range in { out <- v * v } }() return out }
SalidaSuma de cuadrados: 55
Channels comparten, no copian trabajo

Cuando N workers leen del mismo channel, cada valor lo recibe uno solo de ellos (la entrega es exclusiva). Así, fan-out reparte el trabajo de forma natural: el primer worker libre toma el siguiente valor.

Fan-in: mergear N channels en uno

Fan-in es la operación inversa: tomar varios channels y combinarlos en uno único. Es la pieza que falta para cerrar un pipeline con fan-out — los consumidores quieren leer de un solo lugar.

package main import ( "fmt" "sort" "sync" ) // merge implementa fan-in: combina N channels en uno solo // Lanza una goroutine por cada channel de entrada que reenvía a 'out' // y cierra 'out' cuando todas terminaron (sync.WaitGroup) func merge(channels ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) output := func(c <-chan int) { defer wg.Done() for v := range c { out <- v } } wg.Add(len(channels)) for _, c := range channels { go output(c) } // Goroutine "closer": cierra out cuando todos los inputs terminaron go func() { wg.Wait() close(out) }() return out } func source(values ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, v := range values { out <- v } }() return out } func main() { a := source(1, 2, 3) b := source(10, 20, 30) c := source(100, 200, 300) merged := merge(a, b, c) results := []int{} for v := range merged { results = append(results, v) } sort.Ints(results) // el orden de fan-in no es determinista fmt.Println(results) // [1 2 3 10 20 30 100 200 300] }
Salida[1 2 3 10 20 30 100 200 300]

Pipeline: stages conectados por channels

Un pipeline es una serie de stages donde cada uno recibe valores por un channel de entrada, los transforma y los emite por uno de salida. Cada stage es una goroutine independiente y la composición se hace conectando channels.

package main import "fmt" // Stage 1: generar números 1..n func gen(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } // Stage 2: elevar al cuadrado func sq(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out } // Stage 3: filtrar pares func evens(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 == 0 { out <- n } } }() return out } func main() { // Composición del pipeline: gen → sq → evens → consumer pipeline := evens(sq(gen(1, 2, 3, 4, 5))) for v := range pipeline { fmt.Println(v) // 4 16 (los cuadrados pares de 1..5) } }
Salida4 16

Done channel: cancelación cooperativa

Cuando un consumidor decide cortar antes de tiempo (error, timeout, cancelación del usuario), todas las goroutines del pipeline deben terminar para evitar leaks. El patrón done channel (precursor de context.Context) lo resuelve: un channel compartido que, al cerrarse, señala "abandonen todo".

package main import "fmt" func gen(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return // cancelación: salir limpio } } }() return out } func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out } func main() { done := make(chan struct{}) // defer close(done) garantiza que TODAS las goroutines del pipeline // terminen aunque el consumidor corte el for antes de tiempo defer close(done) pipeline := sq(done, gen(done, 1, 2, 3, 4, 5)) // Solo consumimos los primeros 2 valores y nos vamos count := 0 for v := range pipeline { fmt.Println(v) count++ if count == 2 { break // 'defer close(done)' cancela el resto } } }
Salida1 4
En código real: context.Context

El patrón done channel es la base conceptual de context.Context. En código de producción usá ctx.Done() en vez de un done manual: te da además deadlines, valores y la convención compartida del ecosistema.