Patrones concurrentes: worker pools, fan-in/fan-out
Goroutines y channels son los ladrillos de la concurrencia en Go. Combinándolos surgen patrones reusables — worker pools, fan-out/fan-in, pipelines y done channels — que cubren la mayoría de los problemas de concurrencia que encontrarás.
Worker pool: limitar la concurrencia
Lanzar una goroutine por cada tarea suena tentador, pero si las tareas son miles y cada una consume CPU, red o memoria, el sistema colapsa. Un worker pool lanza un número fijo de workers que consumen tareas de un channel compartido.
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Value int
}
type Result struct {
JobID int
Output int
}
// worker consume jobs de 'jobs' y publica resultados en 'results'
// El channel 'jobs' se cierra desde fuera para señalizar fin de trabajo
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
time.Sleep(10 * time.Millisecond) // simula trabajo
results <- Result{JobID: job.ID, Output: job.Value * 2}
_ = id
}
}
func main() {
const numWorkers = 3
const numJobs = 6
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// Lanzar workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Encolar jobs y cerrar el channel (señal de fin)
for j := 1; j <= numJobs; j++ {
jobs <- Job{ID: j, Value: j * 10}
}
close(jobs)
// Cerrar 'results' cuando todos los workers terminen
go func() {
wg.Wait()
close(results)
}()
// Recolectar resultados
total := 0
for r := range results {
total += r.Output
}
fmt.Println("Suma de outputs:", total) // (10+20+30+40+50+60) * 2 = 420
}Suma de outputs: 420Regla idiomática: el productor cierra el channel, nunca el consumidor. En el worker pool, quien encola los jobs (main) cierra jobs; quien produce results (los workers, coordinados con un WaitGroup) cierra results.
Fan-out: distribuir trabajo a N goroutines
Fan-out significa que un único productor envía tareas a varias goroutines consumidoras que trabajan en paralelo. Es la base del worker pool y se usa cuando el trabajo es CPU-bound o tiene latencia (I/O).
package main
import (
"fmt"
"sync"
)
// producer emite los números 1..n en un channel y lo cierra
func producer(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- i
}
}()
return out
}
func main() {
source := producer(5)
// Fan-out: 3 workers procesan el mismo 'source'
const numWorkers = 3
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
channels[i] = squareWorker(source)
}
// Recolectar de los 3 channels (fan-in en la siguiente sección)
var wg sync.WaitGroup
wg.Add(numWorkers)
total := 0
var mu sync.Mutex
for _, c := range channels {
go func(ch <-chan int) {
defer wg.Done()
for v := range ch {
mu.Lock()
total += v
mu.Unlock()
}
}(c)
}
wg.Wait()
fmt.Println("Suma de cuadrados:", total) // 1+4+9+16+25 = 55
}
func squareWorker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
out <- v * v
}
}()
return out
}Suma de cuadrados: 55Cuando N workers leen del mismo channel, cada valor lo recibe uno solo de ellos (la entrega es exclusiva). Así, fan-out reparte el trabajo de forma natural: el primer worker libre toma el siguiente valor.
Fan-in: mergear N channels en uno
Fan-in es la operación inversa: tomar varios channels y combinarlos en uno único. Es la pieza que falta para cerrar un pipeline con fan-out — los consumidores quieren leer de un solo lugar.
package main
import (
"fmt"
"sort"
"sync"
)
// merge implementa fan-in: combina N channels en uno solo
// Lanza una goroutine por cada channel de entrada que reenvía a 'out'
// y cierra 'out' cuando todas terminaron (sync.WaitGroup)
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// Goroutine "closer": cierra out cuando todos los inputs terminaron
go func() {
wg.Wait()
close(out)
}()
return out
}
func source(values ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, v := range values {
out <- v
}
}()
return out
}
func main() {
a := source(1, 2, 3)
b := source(10, 20, 30)
c := source(100, 200, 300)
merged := merge(a, b, c)
results := []int{}
for v := range merged {
results = append(results, v)
}
sort.Ints(results) // el orden de fan-in no es determinista
fmt.Println(results) // [1 2 3 10 20 30 100 200 300]
}[1 2 3 10 20 30 100 200 300]Pipeline: stages conectados por channels
Un pipeline es una serie de stages donde cada uno recibe valores por un channel de entrada, los transforma y los emite por uno de salida. Cada stage es una goroutine independiente y la composición se hace conectando channels.
package main
import "fmt"
// Stage 1: generar números 1..n
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: elevar al cuadrado
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Stage 3: filtrar pares
func evens(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// Composición del pipeline: gen → sq → evens → consumer
pipeline := evens(sq(gen(1, 2, 3, 4, 5)))
for v := range pipeline {
fmt.Println(v) // 4 16 (los cuadrados pares de 1..5)
}
}4
16Done channel: cancelación cooperativa
Cuando un consumidor decide cortar antes de tiempo (error, timeout, cancelación del usuario), todas las goroutines del pipeline deben terminar para evitar leaks. El patrón done channel (precursor de context.Context) lo resuelve: un channel compartido que, al cerrarse, señala "abandonen todo".
package main
import "fmt"
func gen(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return // cancelación: salir limpio
}
}
}()
return out
}
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
func main() {
done := make(chan struct{})
// defer close(done) garantiza que TODAS las goroutines del pipeline
// terminen aunque el consumidor corte el for antes de tiempo
defer close(done)
pipeline := sq(done, gen(done, 1, 2, 3, 4, 5))
// Solo consumimos los primeros 2 valores y nos vamos
count := 0
for v := range pipeline {
fmt.Println(v)
count++
if count == 2 {
break // 'defer close(done)' cancela el resto
}
}
}1
4El patrón done channel es la base conceptual de context.Context. En código de producción usá ctx.Done() en vez de un done manual: te da además deadlines, valores y la convención compartida del ecosistema.