Jones Charles

Mastering GoFrame's gqueue: A Practical Guide to In-Memory Queues in Go 🚀

Hey there, Gophers! 👋

Ever found yourself needing a lightweight, in-memory queue in Go that's both powerful and easy to use? Well, that's exactly what we're going to explore today with GoFrame's gqueue component!

What's gqueue and Why Should You Care? 🤔

First off, let me paint a picture. You're building a service that needs to:

  • Process tasks asynchronously
  • Handle traffic spikes gracefully
  • Manage background jobs efficiently

This is where gqueue shines! It's an in-memory queue implementation that comes bundled with GoFrame, offering a perfect balance of simplicity and power.

Quick Start: Your First gqueue 🌱

Let's dive right in with a simple example:

package main

import (
    "fmt"

    "github.com/gogf/gf/v2/container/gqueue"
)

func main() {
    // Create a bounded queue that holds at most 10 items
    q := gqueue.New(10)

    // Push some data
    q.Push("Hello, gqueue!")

    // Pop blocks until an item is available and returns nil
    // once the queue has been closed
    if value := q.Pop(); value != nil {
        fmt.Printf("Got: %v\n", value)
    }
}

Simple, right? One heads-up before we go further: Pop behaves like a channel receive. It blocks until an item is available, and it returns nil once the queue has been closed. Keep that in mind, because there's so much more we can do! 🎨

The Cool Features You'll Love ✨

Here's what makes gqueue stand out from plain channels or a hand-rolled slice-plus-mutex queue:

  1. Thread-Safe by Default - No more mutex headaches!
  2. Bounded or Unbounded - Pass a limit to New for a fixed capacity, or omit it and the queue grows as needed
  3. Channel Integration - The underlying read channel queue.C drops straight into a select statement
  4. Timeout Support - Combine queue.C with time.After and you never get stuck waiting (see the sketch below)

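Item 4 deserves a quick demo. gqueue exposes the channel that Pop reads from as the exported field queue.C, so a timed pop is just a select. A minimal sketch:

package main

import (
    "fmt"
    "time"

    "github.com/gogf/gf/v2/container/gqueue"
)

func main() {
    q := gqueue.New()

    // Try to pop with a 500ms deadline by selecting on the
    // queue's exported channel instead of calling Pop()
    select {
    case v := <-q.C:
        fmt.Println("got:", v)
    case <-time.After(500 * time.Millisecond):
        fmt.Println("timed out waiting for an item")
    }
}

If nothing arrives before the deadline, the select falls through to the timeout branch instead of blocking forever.
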
Real-World Example: Building a Task Processor 🛠️

Let's build something more practical - a task processing system that you might actually use in production:

package main

import (
    "fmt"
    "time"

    "github.com/gogf/gf/v2/container/gqueue"
)

type Task struct {
    ID      string
    Payload interface{}
}

func NewTaskProcessor() *gqueue.Queue {
    // Create a queue with decent capacity
    q := gqueue.New(1000)

    // Start the worker
    go func() {
        for {
            // Pop blocks until a task arrives; nil means the
            // queue was closed, so the worker exits cleanly
            data := q.Pop()
            if data == nil {
                return
            }
            if task, ok := data.(Task); ok {
                // Process the task
                fmt.Printf("Processing task: %s\n", task.ID)
                // Your processing logic here
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    // Add some tasks
    for i := 0; i < 5; i++ {
        q.Push(Task{
            ID:      fmt.Sprintf("task-%d", i),
            Payload: fmt.Sprintf("data-%d", i),
        })
    }

    // Return the queue so the caller can Close() it on shutdown
    return q
}
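
Because Pop returns nil once the queue is closed, Close() doubles as the worker's shutdown signal. A quick sketch of wiring it up (the sleep is just to let the demo tasks drain):

func main() {
    q := NewTaskProcessor()

    // Give the worker a moment to process the demo tasks, then
    // close the queue so Pop returns nil and the worker exits
    time.Sleep(time.Second)
    q.Close()
}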

More Real-World Examples 🌟

Building a Rate Limiter

Here's how you can build a simple rate limiter using gqueue:

type RateLimiter struct {
    q      *gqueue.Queue
    rate   int
    window time.Duration
}

func NewRateLimiter(rate int, window time.Duration) *RateLimiter {
    rl := &RateLimiter{
        q:      gqueue.New(rate * 2), // Buffer for bursts
        rate:   rate,
        window: window,
    }

    // Periodically clean up old timestamps
    go rl.cleanup()
    return rl
}

// evictExpired drops timestamps that have fallen out of the window.
// Pop blocks on an empty queue, so every call is guarded by Len().
func (rl *RateLimiter) evictExpired(now time.Time) {
    for rl.q.Len() > 0 {
        ts := rl.q.Pop()
        if ts == nil {
            return // queue closed
        }
        if now.Sub(ts.(time.Time)) <= rl.window {
            // Still within the window: put it back and stop.
            // (Order is only roughly FIFO after a push-back,
            // which is fine for a coarse limiter like this.)
            rl.q.Push(ts)
            return
        }
    }
}

func (rl *RateLimiter) Allow() bool {
    now := time.Now()
    rl.evictExpired(now)

    // Admit the request if we're still under the rate
    if rl.q.Len() < int64(rl.rate) {
        rl.q.Push(now)
        return true
    }
    return false
}

func (rl *RateLimiter) cleanup() {
    ticker := time.NewTicker(rl.window / 2)
    defer ticker.Stop()
    for range ticker.C {
        rl.evictExpired(time.Now())
    }
}
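
And a quick usage sketch (the numbers are arbitrary):

func main() {
    // Allow 5 requests per rolling one-second window
    rl := NewRateLimiter(5, time.Second)

    for i := 1; i <= 8; i++ {
        fmt.Printf("request %d allowed: %v\n", i, rl.Allow())
    }
}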

Event Processing Pipeline

Here's an example of building an event processing pipeline with retry logic:

type Event struct {
    ID          string
    Data        interface{}
    Attempts    int
    LastAttempt time.Time // set whenever processing fails
    LastError   error
}

type Pipeline struct {
    mainQueue   *gqueue.Queue
    retryQueue  *gqueue.Queue
    maxAttempts int
}

func NewPipeline(maxAttempts int) *Pipeline {
    p := &Pipeline{
        mainQueue:   gqueue.New(1000),
        retryQueue:  gqueue.New(1000),
        maxAttempts: maxAttempts,
    }

    // Start retry handler
    go p.handleRetries()
    // Start main processor
    go p.processEvents()

    return p
}

func (p *Pipeline) handleRetries() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        // Drain what's currently waiting; Pop blocks on an empty
        // queue, so the loop is bounded by Len()
        for i := p.retryQueue.Len(); i > 0; i-- {
            event := p.retryQueue.Pop()
            if event == nil {
                return // queue closed
            }
            e := event.(Event)
            // Exponential backoff: wait 2^attempts seconds between tries
            if time.Since(e.LastAttempt) > time.Second*time.Duration(1<<e.Attempts) {
                p.mainQueue.Push(e)
            } else {
                p.retryQueue.Push(e)
            }
        }
    }
}

func (p *Pipeline) processEvents() {
    for {
        data := p.mainQueue.Pop()
        if data == nil {
            return // queue closed
        }
        event := data.(Event)
        if err := p.processEvent(event); err != nil {
            event.Attempts++
            event.LastError = err
            event.LastAttempt = time.Now()

            if event.Attempts < p.maxAttempts {
                p.retryQueue.Push(event)
            } else {
                // Out of retries: hand off to the fatal-error path
                p.handleFatalError(event)
            }
        }
    }
}
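
The pipeline calls two methods we haven't defined. Here's a minimal hedged sketch of what they might look like; the bodies are placeholders, not part of the original design:

// processEvent is where real business logic would live; this
// placeholder just logs and succeeds. Return an error to send
// the event down the retry path.
func (p *Pipeline) processEvent(e Event) error {
    fmt.Printf("processing event %s (attempt %d)\n", e.ID, e.Attempts+1)
    return nil
}

// handleFatalError deals with events that exhausted their retries,
// e.g. logging them or writing them to a dead-letter store.
func (p *Pipeline) handleFatalError(e Event) {
    log.Printf("event %s failed permanently after %d attempts: %v",
        e.ID, e.Attempts, e.LastError)
}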

Batch Processing with Timeouts

Here's a more sophisticated batch processing implementation:

type BatchProcessor struct {
    q         *gqueue.Queue
    batchSize int
    timeout   time.Duration
    processor func([]interface{}) error
}

func NewBatchProcessor(batchSize int, timeout time.Duration, processor func([]interface{}) error) *BatchProcessor {
    return &BatchProcessor{
        q:         gqueue.New(batchSize * 10),
        batchSize: batchSize,
        timeout:   timeout,
        processor: processor,
    }
}

// Add enqueues an item for batching
func (bp *BatchProcessor) Add(item interface{}) {
    bp.q.Push(item)
}

func (bp *BatchProcessor) Start(ctx context.Context) {
    batch := make([]interface{}, 0, bp.batchSize)
    timer := time.NewTimer(bp.timeout)
    defer timer.Stop()

    // flush hands the current batch to the processor; error
    // handling is omitted here for brevity
    flush := func() {
        if len(batch) > 0 {
            _ = bp.processor(batch)
            batch = make([]interface{}, 0, bp.batchSize)
        }
    }

    for {
        select {
        case <-ctx.Done():
            flush()
            return

        case <-timer.C:
            // Flush a partial batch when the timeout fires
            flush()
            timer.Reset(bp.timeout)

        // Receive from the queue's channel directly: a blocking
        // Pop() in a default branch would starve the timer case
        // whenever the queue is empty
        case item := <-bp.q.C:
            if item == nil {
                flush()
                return // queue closed
            }
            batch = append(batch, item)
            if len(batch) >= bp.batchSize {
                flush()
                timer.Reset(bp.timeout)
            }
        }
    }
}
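
A quick usage sketch, relying on the small Add helper defined above (the numbers are arbitrary):

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    bp := NewBatchProcessor(10, time.Second, func(items []interface{}) error {
        fmt.Printf("flushing batch of %d items\n", len(items))
        return nil
    })
    go bp.Start(ctx)

    // 25 items: two full batches flush immediately, and the
    // remaining 5 flush when the one-second timer fires
    for i := 0; i < 25; i++ {
        bp.Add(i)
    }
    time.Sleep(2 * time.Second)
}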

Pro Tips from the Trenches 💡

After using gqueue in several production systems, here are some tips I've learned:

1. Size Your Queue Right

These are rough starting points, not magic numbers; size against your measured consume rate, and remember that Push blocks once a bounded queue is full:

// For small services (< 1,000 req/s)
q := gqueue.New(1000)

// For medium services (1,000-5,000 req/s)
q := gqueue.New(5000)

// For high-load services (5,000+ req/s)
q := gqueue.New(10000)

// Or omit the limit for an unbounded queue. Convenient, but
// the memory ceiling becomes your problem.
q := gqueue.New()

2. Implement Graceful Shutdown

func main() {
    q := gqueue.New(1000)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start worker
    go func() {
        for {
            select {
            case <-ctx.Done():
                // Clean up and exit
                return
            // Receive from q.C directly: a blocking Pop() in a
            // default branch would keep the worker from ever
            // seeing ctx.Done()
            case data := <-q.C:
                if data == nil {
                    return // queue closed
                }
                // Process data
            }
        }
    }()
}
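
Since Pop returns nil once a queue is closed, Close() by itself also works as a shutdown signal. A minimal sketch, assuming a bounded queue (where closing lets the worker drain whatever is still buffered):

func closeBasedShutdown() {
    q := gqueue.New(1000)

    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            data := q.Pop()
            if data == nil {
                return // Close() was called and the queue is drained
            }
            // Process data
        }
    }()

    q.Push("last item")
    q.Close() // unblocks Pop once the buffered items run out
    <-done
}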

3. Handle Backpressure

// Track the capacity you passed to gqueue.New yourself; the
// queue only reports its current length, not its limit
const queueCap = 1000

func pushWithBackpressure(q *gqueue.Queue, data interface{}) error {
    if q.Len() > int64(float64(queueCap)*0.8) {
        // Queue is getting full: reject and let the caller back off
        return fmt.Errorf("queue is at high capacity")
    }
    q.Push(data)
    return nil
}

Troubleshooting Guide and Common Pitfalls ⚠️

1. Memory Leaks

Symptom: Increasing memory usage over time
Common Causes:

  • Forgotten goroutines still processing queue items
  • Large items not being released from memory

Solution:

func preventMemoryLeaks() {
    q := gqueue.New(1000)

    // Proper cleanup with context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        for {
            select {
            case <-ctx.Done():
                // Drain remaining items so nothing pins memory
                for q.Len() > 0 {
                    _ = q.Pop()
                }
                return
            case item := <-q.C:
                if item == nil {
                    return // queue closed
                }
                // Process item. Once this iteration ends the
                // reference is dropped and the GC can reclaim it;
                // there's no need to nil it out manually.
            }
        }
    }()
}

2. Queue Capacity Issues

Symptom: Application becomes slow or unresponsive
Common Causes:

  • Queue filling up faster than it's being processed
  • No backpressure mechanism

Solution:

func handleBackpressure() {
    // gqueue doesn't report its own limit, so keep it in a constant
    const queueCap = 1000
    q := gqueue.New(queueCap)

    // Monitor queue usage
    go func() {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for range ticker.C {
            if q.Len() > int64(float64(queueCap)*0.8) {
                // Alert on high usage
                log.Printf("Queue at %d%% capacity",
                    int(float64(q.Len())/float64(queueCap)*100))

                // Take action (e.g., slow down producers)
                throttleProducers()
            }
        }
    }()
}

func throttleProducers() {
    // Implement throttling logic
}

3. Deadlocks and Hanging

Symptom: Workers stop processing
Common Causes:

  • Infinite loops in processing logic
  • Missing error handling

Solution:

func preventDeadlocks() {
    q := gqueue.New(1000)

    // Use timeouts around each unit of work
    go func() {
        for {
            // Pop outside the inner goroutine so only one item
            // is claimed at a time
            item := q.Pop()
            if item == nil {
                return // queue closed
            }

            // Buffered so a timed-out worker can still send its
            // result without blocking forever
            done := make(chan bool, 1)
            go func() {
                processWithTimeout(item)
                done <- true
            }()

            // Time out if processing takes too long
            select {
            case <-done:
                // Processing completed normally
            case <-time.After(5 * time.Second):
                // Handle the timeout. Note the worker goroutine keeps
                // running; actually cancelling it requires context
                // support inside the processing logic.
                log.Println("Processing timeout")
            }
        }
    }()
}

func processWithTimeout(item interface{}) {
    // Your processing logic here
}

4. Data Loss During Shutdown

Symptom: Items disappear when application stops
Common Causes:

  • Improper shutdown handling
  • Not waiting for queue to empty

Solution:

func gracefulShutdown() {
    q := gqueue.New(1000)
    ctx, cancel := context.WithCancel(context.Background())
    _ = ctx // producers should watch ctx.Done() and stop pushing

    // Graceful shutdown handler
    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
        <-sigChan

        // Cancel the context so producers stop adding new items
        cancel()

        // Wait for consumers to drain the queue
        for q.Len() > 0 {
            time.Sleep(100 * time.Millisecond)
        }

        // Now safe to exit
        os.Exit(0)
    }()
}

5. Performance Degradation

Symptom: Processing becomes slower over time
Common Causes:

  • Too many goroutines
  • Inefficient batch processing
  • Memory pressure

Solution:

func optimizePerformance() {
    q := gqueue.New(1000)

    // Use a semaphore to cap concurrency
    const workerCount = 5
    sem := make(chan struct{}, workerCount)

    go func() {
        for {
            sem <- struct{}{} // Limit concurrent workers
            go func() {
                defer func() { <-sem }()

                // Pop blocks until an item is available
                item := q.Pop()
                if item == nil {
                    return // queue closed
                }
                // Process with proper error handling
                if err := processItem(item); err != nil {
                    log.Printf("Error processing item: %v", err)
                }
            }()
        }
    }()
}

func processItem(item interface{}) error {
    // Your processing logic here
    return nil
}
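
A simpler shape for the same idea is a fixed pool of long-lived workers, which avoids spawning a goroutine per item. A sketch, reusing the processItem stub above:

func fixedWorkerPool(q *gqueue.Queue, workerCount int) {
    for i := 0; i < workerCount; i++ {
        go func(id int) {
            for {
                item := q.Pop() // blocks until an item arrives
                if item == nil {
                    return // queue closed
                }
                if err := processItem(item); err != nil {
                    log.Printf("worker %d: %v", id, err)
                }
            }
        }(i)
    }
}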

Performance Tips 🚄

Batching can significantly boost throughput because it amortizes per-item overhead (think one bulk database insert instead of one write per item). Here's a quick pattern:

func batchProcess(q *gqueue.Queue, batchSize int) {
    batch := make([]interface{}, 0, batchSize)

    // Collect items. Pop blocks until each item arrives and
    // returns nil once the queue is closed.
    for i := 0; i < batchSize; i++ {
        item := q.Pop()
        if item == nil {
            break // queue closed
        }
        batch = append(batch, item)
    }

    // Process the whole batch in one go
    if len(batch) > 0 {
        processBatch(batch)
    }
}

func processBatch(batch []interface{}) {
    // Your batch logic here, e.g. one bulk database insert
    // instead of len(batch) individual ones
}

When to Use gqueue (And When Not To) 🤔

Perfect for:

  • ✅ Background task processing
  • ✅ Message buffering
  • ✅ Event handling
  • ✅ Rate limiting

Maybe not for:

  • โŒ Persistent storage needs
  • โŒ Distributed systems (use Kafka/RabbitMQ instead)
  • โŒ Critical transaction processing

Let's Wrap It Up! 🎁

gqueue is a fantastic tool when you need a lightweight, in-memory queue in Go. It's perfect for those scenarios where a full-blown message queue system would be overkill, but plain channels aren't quite enough.

Your Turn! 🎯

Now I'd love to hear from you:

  • Have you used gqueue in your projects?
  • What other queue implementations have you tried in Go?
  • Any cool patterns or tips to share?

Drop your thoughts in the comments below! And if you found this useful, don't forget to give it a ❤️


P.S. If you want to dive deeper into GoFrame, check out my other articles in the series! 📚
