Walid LAGGOUNE

Understanding Go Concurrency Pipelines

#go

When I first started learning Go, the idea of concurrency sounded exciting but honestly, a bit mysterious too. Goroutines, channels, fan-out, fan-in... all the buzzwords were floating around, but I didn’t know how to piece them together in a meaningful way.

So I decided to build a small project to help things click. The goal was simple: generate random numbers, keep only the primes, and do it as fast as possible by using all the CPU cores. Along the way, I ended up using most of the core concurrency concepts that Go offers. Let me walk you through what I built and what I learned.

The Big Idea

Here’s what the program does:

  • Continuously generates random integers
  • Uses multiple workers to check whether each number is prime
  • Combines the output of those workers into one stream
  • Stops after printing 10 prime numbers

Along the way, we’ll use channels, goroutines, and cancellation signals, and we’ll see how naturally these pieces fit together in Go.

The Full Code

Here's the complete code.

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "sync"
)

// Generator: produces values using a function until canceled
func generator[T any, K any](done <-chan K, fn func() T) <-chan T {
    stream := make(chan T)
    go func() {
        defer close(stream)
        for {
            select {
            case <-done:
                return
            case stream <- fn():
            }
        }
    }()
    return stream
}

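// Take: forwards at most n values from a stream, then closes its output
func take[T any, K any](done <-chan K, stream <-chan T, n int) <-chan T {
    taken := make(chan T)
    go func() {
        defer close(taken)
        for i := 0; i < n; i++ {
            select {
            case <-done:
                return
            case v, ok := <-stream:
                if !ok {
                    return // source closed before n values arrived
                }
                // Send in its own select so done is honored while waiting on the reader
                select {
                case <-done:
                    return
                case taken <- v:
                }
            }
        }
    }()
    return taken
}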

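// Prime finder: filters primes from a stream of integers
func primeFinder(done <-chan int, randIntStream <-chan int) <-chan int {
    // Deliberately slow trial division: tries every divisor from n-1 down to 2
    isPrime := func(n int) bool {
        if n < 2 {
            return false // 0 and 1 are not prime
        }
        for i := n - 1; i > 1; i-- {
            if n%i == 0 {
                return false
            }
        }
        return true
    }

    primes := make(chan int)
    go func() {
        defer close(primes)
        for {
            select {
            case <-done:
                return
            case n, ok := <-randIntStream:
                if !ok {
                    return // input closed: nothing more to check
                }
                if !isPrime(n) {
                    continue
                }
                // Select on the send too, so a canceled pipeline cannot block this goroutine
                select {
                case <-done:
                    return
                case primes <- n:
                }
            }
        }
    }()
    return primes
}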

// Mux (fan-in): merges multiple channels into one
func mux[T any](done <-chan int, channels ...<-chan T) <-chan T {
    var wg sync.WaitGroup
    out := make(chan T)

    transfer := func(c <-chan T) {
        defer wg.Done()
        for v := range c {
            select {
            case <-done:
                return
            case out <- v:
            }
        }
    }

    for _, c := range channels {
        wg.Add(1)
        go transfer(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    done := make(chan int)
    defer close(done)

    // Generate random integers
    randInt := func() int {
        return rand.Intn(10000000)
    }
    generatorStream := generator(done, randInt)

    // Launch one prime finder per CPU core
    cpuCount := runtime.NumCPU()
    primeFinderChannels := make([]<-chan int, cpuCount)
    for i := 0; i < cpuCount; i++ {
        primeFinderChannels[i] = primeFinder(done, generatorStream)
    }

    // Merge all prime finders into one output
    for prime := range take(done, mux(done, primeFinderChannels...), 10) {
        fmt.Println(prime)
    }
}


Now Let’s Break It Down

1. Generator

The generator function produces values continuously by calling a function (fn()) and sending the result on a channel.

Why the done channel? It allows us to cancel the generator from the outside, which is important to avoid goroutines running forever.

generatorStream := generator(done, randInt)

This gives us a stream of random numbers.
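
To watch the cancellation happen, here is a minimal sketch that feeds the same generator a deterministic counter instead of random numbers. The `counter` closure and the `firstThree` helper are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// generator is the same function as in the full listing.
func generator[T any, K any](done <-chan K, fn func() T) <-chan T {
    stream := make(chan T)
    go func() {
        defer close(stream)
        for {
            select {
            case <-done:
                return
            case stream <- fn():
            }
        }
    }()
    return stream
}

// firstThree (hypothetical helper) reads three values, then cancels the generator.
func firstThree() []int {
    done := make(chan struct{})
    n := 0
    counter := func() int { n++; return n } // only called from the generator goroutine

    stream := generator(done, counter)
    var got []int
    for i := 0; i < 3; i++ {
        got = append(got, <-stream)
    }
    close(done) // signal the generator goroutine to return
    return got
}

func main() {
    fmt.Println(firstThree()) // prints [1 2 3]
}
```

One detail worth knowing: `fn()` is evaluated before the select blocks, so the generator computes one extra value that is discarded when `done` is closed.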

2. Take

The take function just reads the first n values from a stream, and then stops. It's like saying, “I only want 10 results.”

take(done, stream, 10)

This is how we make sure the program ends after getting 10 primes.
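
As a small sketch of how take behaves at its edges, the example below runs a version of `take` that checks `done` around both the receive and the send against finite sources. The `fromSlice` and `collect` helpers are hypothetical and exist only for this demonstration.

```go
package main

import "fmt"

// take forwards at most n values from stream. It stops early if the
// stream closes or if done is closed.
func take[T any, K any](done <-chan K, stream <-chan T, n int) <-chan T {
    taken := make(chan T)
    go func() {
        defer close(taken)
        for i := 0; i < n; i++ {
            select {
            case <-done:
                return
            case v, ok := <-stream:
                if !ok {
                    return
                }
                select {
                case <-done:
                    return
                case taken <- v:
                }
            }
        }
    }()
    return taken
}

// fromSlice (hypothetical helper) returns a closed, pre-filled channel.
func fromSlice(vals ...int) <-chan int {
    c := make(chan int, len(vals)) // buffered, so no goroutine is needed
    for _, v := range vals {
        c <- v
    }
    close(c)
    return c
}

// collect (hypothetical helper) drains a channel into a slice.
func collect(c <-chan int) []int {
    var out []int
    for v := range c {
        out = append(out, v)
    }
    return out
}

func main() {
    done := make(chan struct{})
    defer close(done)

    fmt.Println(collect(take(done, fromSlice(1, 2, 3, 4, 5), 2))) // prints [1 2]
    fmt.Println(collect(take(done, fromSlice(1, 2, 3), 10)))      // prints [1 2 3]
}
```

The second call shows that asking for more values than the source provides simply ends the stream early instead of blocking.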

3. Prime Checker

This part listens to random numbers and filters the ones that are prime.

func isPrime(n int) bool {
    // Very simple primality check
}

It's not the most optimized algorithm: it tries every possible divisor below n, so the work grows linearly with n. That makes it a good fit for this demo, because the check is intentionally slow and the benefit of running it in parallel is easy to see.
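
For comparison, here is a sketch of the slow check next to a common faster variant that stops at the square root of n. The names `isPrimeNaive` and `isPrimeSqrt` are hypothetical; the second function is an alternative for illustration, not part of the original program.

```go
package main

import "fmt"

// isPrimeNaive mirrors the deliberately slow check:
// it tries every candidate divisor from n-1 down to 2.
func isPrimeNaive(n int) bool {
    if n < 2 {
        return false // 0, 1, and negative numbers are not prime
    }
    for i := n - 1; i > 1; i-- {
        if n%i == 0 {
            return false
        }
    }
    return true
}

// isPrimeSqrt (hypothetical alternative) only tries divisors up to sqrt(n),
// because any factor larger than sqrt(n) pairs with one smaller than it.
func isPrimeSqrt(n int) bool {
    if n < 2 {
        return false
    }
    for i := 2; i*i <= n; i++ {
        if n%i == 0 {
            return false
        }
    }
    return true
}

func main() {
    for _, n := range []int{1, 2, 9, 97, 7919} {
        fmt.Println(n, isPrimeNaive(n), isPrimeSqrt(n))
    }
}
```

Both functions agree on every input; only the amount of work differs, which is why the slow one is the better choice when the goal is to make parallel speedups visible.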

4. Fan-Out: Run Multiple Prime Checkers

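Instead of having just one goroutine checking for primes, we spin up one per logical CPU, as reported by runtime.NumCPU(). All workers receive from the same generatorStream, so each random number is checked by exactly one worker, and the Go scheduler can run the workers in parallel across cores.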

cpuCount := runtime.NumCPU()
for i := 0; i < cpuCount; i++ {
    primeFinderChannels[i] = primeFinder(done, generatorStream)
}

Now we have several independent workers filtering primes at the same time.
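
A point that is easy to miss is that workers sharing one channel compete for values rather than each seeing every value. The sketch below, built around a hypothetical `fanOutCount` helper, counts how many items each worker handled to make that visible.

```go
package main

import (
    "fmt"
    "sync"
)

// fanOutCount (hypothetical helper) sends `items` integers on one channel,
// lets `workers` goroutines compete for them, and returns how many items
// each worker handled.
func fanOutCount(workers, items int) []int {
    in := make(chan int)
    counts := make([]int, workers)

    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for range in {
                counts[id]++ // each worker writes only its own slot
            }
        }(w)
    }

    for i := 0; i < items; i++ {
        in <- i // delivered to exactly one of the workers
    }
    close(in)
    wg.Wait()
    return counts
}

func main() {
    counts := fanOutCount(4, 1000)
    total := 0
    for _, c := range counts {
        total += c
    }
    fmt.Println(counts, total)
}
```

The per-worker counts vary from run to run, but they always add up to the number of items sent, since no value is duplicated or dropped.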

5. Fan-In: Merging Results

Once we have multiple channels from the prime checkers, we want to combine them into one stream. That’s what mux does: it starts one goroutine per input channel to forward values to a shared output, and it closes that output once every input has been drained.

mux(done, primeFinderChannels...)

This gives us one unified stream of prime numbers from all workers.
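
Here is a minimal sketch of mux merging three small finite sources. The `emit` and `mergedSorted` helpers are hypothetical. Because the arrival order across inputs is not deterministic, the result is sorted before printing.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// mux is the same fan-in function as in the full listing.
func mux[T any](done <-chan int, channels ...<-chan T) <-chan T {
    var wg sync.WaitGroup
    out := make(chan T)

    transfer := func(c <-chan T) {
        defer wg.Done()
        for v := range c {
            select {
            case <-done:
                return
            case out <- v:
            }
        }
    }

    for _, c := range channels {
        wg.Add(1)
        go transfer(c)
    }

    go func() {
        wg.Wait()
        close(out) // safe: all senders have returned
    }()

    return out
}

// emit (hypothetical helper) returns a channel that yields vals, then closes.
func emit(vals ...int) <-chan int {
    c := make(chan int)
    go func() {
        defer close(c)
        for _, v := range vals {
            c <- v
        }
    }()
    return c
}

// mergedSorted (hypothetical helper) merges three sources and sorts the result.
func mergedSorted() []int {
    done := make(chan int)
    defer close(done)

    var got []int
    for v := range mux(done, emit(1, 4), emit(2, 5), emit(3, 6)) {
        got = append(got, v)
    }
    sort.Ints(got)
    return got
}

func main() {
    fmt.Println(mergedSorted()) // prints [1 2 3 4 5 6]
}
```

The WaitGroup is what makes closing `out` safe: the channel is closed only after every forwarding goroutine has finished sending.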

6. Final Output

Now we just take the first 10 primes from the merged stream and print them.

for prime := range take(done, mux(...), 10) {
    fmt.Println(prime)
}

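Once we’ve printed 10 primes, the loop ends and main returns, which runs the deferred close(done) and signals every stage to stop. In this small program the process exits as soon as main returns, so the goroutines are simply torn down; in a longer-running program you would also wait for them to finish.
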
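
For a program that keeps running after the pipeline is done, one way to confirm that shutdown really happened is to pair the done channel with a sync.WaitGroup. The sketch below is an assumption about how that could look, not something the original program does; `runAndStop` is a hypothetical helper.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// runAndStop (hypothetical helper) starts n workers that block until done
// is closed, then closes done and waits for all of them to return.
// It reports how many workers observed the cancellation.
func runAndStop(n int) int64 {
    done := make(chan struct{})
    var wg sync.WaitGroup
    var stopped int64

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            <-done // a closed channel unblocks every receiver
            atomic.AddInt64(&stopped, 1)
        }()
    }

    close(done) // broadcast the cancellation signal
    wg.Wait()   // block until every worker has actually returned
    return atomic.LoadInt64(&stopped)
}

func main() {
    fmt.Println(runAndStop(8)) // prints 8
}
```

Closing a channel acts as a broadcast, which is why a single close(done) is enough to release any number of goroutines.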
What I Learned

This small project helped me understand some important Go concurrency patterns:

  • Generators are a clean way to build infinite or cancellable streams.

  • Fan-out helps you distribute slow tasks across multiple goroutines.

  • Fan-in lets you merge results from those goroutines back into one place.

  • Channels and select statements give you a ton of control without needing locks or semaphores.

  • And most importantly: clean shutdown matters. The done channel gives every goroutine a way to exit, provided each blocking send and receive also selects on it.

Final Thoughts

Even a small project like this can teach you how to structure concurrent programs in an elegant and idiomatic way.
