When I first started learning Go, the idea of concurrency sounded exciting but honestly, a bit mysterious too. Goroutines, channels, fan-out, fan-in... all the buzzwords were floating around, but I didn’t know how to piece them together in a meaningful way.
So I decided to build a small project to help things click. The goal was simple: generate random numbers, filter out the prime ones, and do it as fast as possible by using all the CPU cores. Along the way, I ended up using most of the core concurrency concepts that Go offers. Let me walk you through what I built and what I learned.
The Big Idea
Here’s what the program does:
- Continuously generates random integers
- Uses multiple workers to check if each number is a prime
- Combines the output of those workers into one stream
- Stops after printing 10 prime numbers
Along the way, we’ll use channels, goroutines, and cancellation signals and we’ll see how concurrency in Go just fits together naturally.
The Full Code
Here's the complete code.
package main
import (
"fmt"
"math/rand"
"runtime"
"sync"
)
// Generator: produces values using a function until canceled
func generator[T any, K any](done <-chan K, fn func() T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)
for {
select {
case <-done:
return
case stream <- fn():
}
}
}()
return stream
}
// Take: limits how many values we read from a stream
func take[T any, K any](done <-chan K, stream <-chan T, n int) <-chan T {
taken := make(chan T)
go func() {
defer close(taken)
for i := 0; i < n; i++ {
select {
case <-done:
return
case taken <- <-stream:
}
}
}()
return taken
}
// Prime finder: filters primes from a stream of integers
func primeFinder(done <-chan int, randIntStream <-chan int) <-chan int {
isPrime := func(n int) bool {
for i := n - 1; i > 1; i-- {
if n%i == 0 {
return false
}
}
return true
}
primes := make(chan int)
go func() {
defer close(primes)
for {
select {
case <-done:
return
case n := <-randIntStream:
if isPrime(n) {
primes <- n
}
}
}
}()
return primes
}
// Mux (fan-in): merges multiple channels into one
func mux[T any](done <-chan int, channels ...<-chan T) <-chan T {
var wg sync.WaitGroup
out := make(chan T)
transfer := func(c <-chan T) {
defer wg.Done()
for v := range c {
select {
case <-done:
return
case out <- v:
}
}
}
for _, c := range channels {
wg.Add(1)
go transfer(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
done := make(chan int)
defer close(done)
// Generate random integers
randInt := func() int {
return rand.Intn(10000000)
}
generatorStream := generator(done, randInt)
// Launch one prime finder per CPU core
cpuCount := runtime.NumCPU()
primeFinderChannels := make([]<-chan int, cpuCount)
for i := 0; i < cpuCount; i++ {
primeFinderChannels[i] = primeFinder(done, generatorStream)
}
// Merge all prime finders into one output
for prime := range take(done, mux(done, primeFinderChannels...), 10) {
fmt.Println(prime)
}
}
Now Let’s Break It Down
1. Generator
The generator function produces values continuously by calling a function (fn()) and sending the result on a channel.
Why the done channel? It allows us to cancel the generator from the outside, which is important to avoid goroutines running forever.
generatorStream := generator(done, randInt)
This gives us a stream of random numbers.
2. Take
The take function just reads the first n values from a stream, and then stops. It's like saying, “I only want 10 results.”
take(done, stream, 10)
This is how we make sure the program ends after getting 10 primes.
3. Prime Checker
This part listens to random numbers and filters the ones that are prime.
func isPrime(n int) bool {
// Very simple primality check
}
It's not the most optimized algorithm, but it’s perfect for this demo, especially because it’s intentionally slow, so we can see the benefit of running it in parallel.
4. Fan-Out: Run Multiple Prime Checkers
Instead of having just one goroutine checking for primes, we spin up one per CPU core. This way, each core can work independently and in parallel.
cpuCount := runtime.NumCPU()
for i := 0; i < cpuCount; i++ {
primeFinderChannels[i] = primeFinder(done, generatorStream)
}
Now we have several independent workers filtering primes at the same time.
5. Fan-In: Merging Results
Once we have multiple channels from the prime checkers, we want to combine them into one stream. That’s what mux does it merges multiple input channels into a single output.
mux(done, primeFinderChannels...)
This gives us one unified stream of prime numbers from all workers.
6. Final Output
Now we just take the first 10 primes from the merged stream and print them.
for prime := range take(done, mux(...), 10) {
fmt.Println(prime)
}
Once we’ve printed 10 primes, we close the done channel, and everything shuts down gracefully.
What I Learned
This small project helped me understand some important Go concurrency patterns:
Generators are a clean way to build infinite or cancellable streams.
Fan-out helps you distribute slow tasks across multiple goroutines.
Fan-in lets you merge results from those goroutines back into one place.
Channels and select statements give you a ton of control without needing locks or semaphores.
And most importantly: clean shutdown matters. The done channel ensures no goroutine is left behind.
Final Thoughts
Even a small project like this can teach you how to structure concurrent programs in an elegant and idiomatic way.
Top comments (0)