DEV Community

Akarshan Gandotra
Akarshan Gandotra

Posted on

3 1 1 1 1

Building a Robust Redis Client: Async Redis + Tenacity + Circuit Breaker

Image description

Redis is the backbone of many high-performance applications, but network hiccups and temporary outages can wreak havoc on your system. What if I told you there's a way to make your Redis operations virtually bulletproof? 🛡️

Today, we'll dive deep into building a production-ready Redis client with comprehensive retry logic, circuit breaker patterns, and connection pooling that can handle the chaos of distributed systems.

🚨 The Problem: When Redis Goes Down

Picture this: Your application is humming along perfectly, handling thousands of requests per second. Suddenly, Redis experiences a brief network hiccup. Your application starts throwing exceptions, users see errors, and your monitoring dashboard lights up like a Christmas tree. 🎄💥

Common Redis failure scenarios:

  • Network timeouts 🌐⏰
  • Connection drops 🔌❌
  • Redis server restarts 🔄
  • Memory pressure 💾⚠️
  • Temporary unavailability 🚫

🏗️ The Solution: A Resilient Redis Client

Our solution combines several battle-tested patterns:

1. Exponential Backoff Retry with Tenacity 📈

We leverage the powerful Tenacity library for sophisticated retry logic. Tenacity provides a clean, declarative way to handle retries with various strategies:

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential

@dataclass(frozen=True)
class RedisRetryConfig:
    max_attempts: int = settings.REDIS_MAX_ATTEMPTS
    base_delay: float = settings.REDIS_BASE_DELAY
    max_delay: float = settings.REDIS_MAX_DELAY
    exponential_base: float = settings.REDIS_EXPONENTIAL_BASE
    retry_exceptions: tuple = field(
        default_factory=lambda: (
            redis.ConnectionError,
            redis.TimeoutError,
            redis.RedisError,
            ConnectionRefusedError,
            TimeoutError,
        )
    )
Enter fullscreen mode Exit fullscreen mode

Why Tenacity? 🎯

  • Declarative syntax - Easy to read and configure
  • Multiple retry strategies - Exponential, linear, random jitter
  • Exception filtering - Retry only on specific exceptions
  • Async support - Perfect for modern Python applications
  • Battle-tested - Used in production by thousands of applications

2. Circuit Breaker Pattern 🔌

class CircuitBreaker:
    """Simple circuit breaker for Redis operations."""

    def __init__(self, threshold: int, timeout: float):
        self.threshold = threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open
Enter fullscreen mode Exit fullscreen mode

3. Connection Pooling 🏊‍♂️

self._connection_pool = ConnectionPool.from_url(
    settings.REDIS_URL,
    max_connections=getattr(settings, "REDIS_CONNECTION_POOL", 20),
    retry_on_timeout=True,
    socket_keepalive=True,
    health_check_interval=30,
)
Enter fullscreen mode Exit fullscreen mode

📚 Key Dependencies

This implementation leverages several powerful Python libraries:

  • 🔄 Tenacity - The star of our retry logic! Provides declarative retry decorators with exponential backoff, jitter, and exception filtering
  • 🔴 Redis-py - Official async Redis client for Python
  • ⚙️ Asyncio - For async/await support and thread-safe operations
  • 📊 Dataclasses - Clean, immutable configuration objects
pip install tenacity redis[hiredis]
Enter fullscreen mode Exit fullscreen mode

🎯 Key Features

Automatic Retry Logic

The redis_retry decorator automatically wraps Redis operations with intelligent retry logic using Tenacity:

def redis_retry(
    config: Optional[RedisRetryConfig] = None,
    operation_name: Optional[str] = None,
    use_circuit_breaker: bool = True,
) -> Callable:
    """Decorator for Redis operations with retry logic and circuit breaker."""

    retry_config = config or RedisRetryConfig()

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            async def execute_with_retry():
                try:
                    # Tenacity's AsyncRetrying provides the retry logic
                    async for attempt in AsyncRetrying(
                        stop=stop_after_attempt(retry_config.max_attempts),
                        wait=wait_exponential(
                            multiplier=retry_config.base_delay,
                            max=retry_config.max_delay,
                            exp_base=retry_config.exponential_base,
                        ),
                        retry=retry_if_exception_type(retry_config.retry_exceptions),
                        reraise=True,
                    ):
                        with attempt:
                            return await func(*args, **kwargs)
                except RetryError as e:
                    logger.error(f"Failed after {retry_config.max_attempts} attempts")
                    raise e.last_attempt.exception() from e

            if circuit_breaker:
                return await circuit_breaker.call(execute_with_retry)
            else:
                return await execute_with_retry()

        return wrapper
    return decorator
Enter fullscreen mode Exit fullscreen mode

Tenacity Features in Action: 🚀

  • stop_after_attempt() - Limits total retry attempts
  • wait_exponential() - Implements exponential backoff with jitter
  • retry_if_exception_type() - Only retries on specific exceptions
  • AsyncRetrying - Async-first design for modern Python apps

🔄 Circuit Breaker States

Image description

  • Closed 🟢: Normal operation, requests pass through
  • Open 🔴: Failing fast, requests immediately rejected
  • Half-Open 🟡: Testing if service recovered

🏊‍♂️ Enhanced Redis Client

class RedisWithRetry(redis.Redis):
    """Enhanced Redis client with comprehensive retry logic and circuit breaker."""

    def __getattribute__(self, name: str) -> Any:
        """Override attribute access to automatically add retry logic."""
        attr = super().__getattribute__(name)

        if (
            name in self._core_methods
            and callable(attr)
            and asyncio.iscoroutinefunction(attr)
        ):
            retry_config = super().__getattribute__("retry_config")
            return redis_retry(retry_config, name)(attr)

        return attr
Enter fullscreen mode Exit fullscreen mode

This clever __getattribute__ override automatically wraps all Redis operations with retry logic! 🎩✨

📊 Health Monitoring

The client includes comprehensive health checking:

async def health_check(self) -> Dict[str, Any]:
    """Comprehensive health check for Redis connection."""
    try:
        start_time = time.time()
        await self.ping()
        ping_time = time.time() - start_time

        info = await self.info()

        return {
            "status": "healthy",
            "ping_time_ms": round(ping_time * 1000, 2),
            "connected_clients": info.get("connected_clients", 0),
            "used_memory": info.get("used_memory", 0),
            "used_memory_human": info.get("used_memory_human", "0B"),
            "redis_version": info.get("redis_version", "unknown"),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "error_type": type(e).__name__,
        }
Enter fullscreen mode Exit fullscreen mode

🔧 Advanced Tenacity Configurations

Tenacity offers incredible flexibility for different retry scenarios:

Custom Wait Strategies ⏱️

from tenacity import wait_fixed, wait_random, wait_combine

# Fixed delay
wait=wait_fixed(2)  # Always wait 2 seconds

# Random jitter to prevent thundering herd
wait=wait_random(min=1, max=3)

# Combine strategies
wait=wait_combine(
    wait_exponential(multiplier=1, max=10),
    wait_random(min=0, max=2)  # Add jitter
)
Enter fullscreen mode Exit fullscreen mode

Stop Conditions 🛑

from tenacity import stop_after_delay, stop_never

# Stop after total time
stop=stop_after_delay(30)  # Stop after 30 seconds total

# Stop after attempts OR time
stop=(stop_after_attempt(5) | stop_after_delay(30))
Enter fullscreen mode Exit fullscreen mode

Custom Retry Conditions 🎯

from tenacity import retry_if_result, retry_if_not_result

# Retry based on return value
@retry(retry=retry_if_result(lambda x: x is None))
async def get_data():
    return await redis_client.get("key")

# Retry on specific exception attributes
@retry(retry=retry_if_exception(
    lambda e: isinstance(e, redis.ConnectionError) and "timeout" in str(e)
))
async def redis_operation():
    # Your Redis operation here
    pass
Enter fullscreen mode Exit fullscreen mode

🔧 Usage Examples

Basic Usage

# Get the singleton client
client = await get_redis_client()

# All operations now have automatic retry logic!
await client.set("key", "value")
value = await client.get("key")
Enter fullscreen mode Exit fullscreen mode

Pipeline Operations

async with redis_pipeline() as pipeline:
    pipeline.set("key1", "value1")
    pipeline.set("key2", "value2")
    results = await pipeline.execute()  # Retries included!
Enter fullscreen mode Exit fullscreen mode

Health Monitoring

health = await redis_health_check()
print(f"Redis status: {health['status']}")
print(f"Ping time: {health['ping_time_ms']}ms")
Enter fullscreen mode Exit fullscreen mode

📈 Performance Benefits

Scenario Without Retry With Retry + Circuit Breaker
Network blip (100ms) ❌ Immediate failure ✅ Transparent recovery
Redis restart (5s) ❌ 5s of errors ✅ Fast failure → recovery
Sustained outage ❌ Continuous timeouts ✅ Fail fast after threshold

🛠️ Configuration Options

# Custom retry configuration
config = RedisRetryConfig(
    max_attempts=5,
    base_delay=0.1,
    max_delay=60.0,
    exponential_base=2.0,
    circuit_breaker_threshold=5,
    circuit_breaker_timeout=30.0
)

client = await get_redis_client(config)
Enter fullscreen mode Exit fullscreen mode

🔐 Thread Safety & Singleton Pattern

The RedisClientManager ensures thread-safe singleton behavior:

class RedisClientManager:
    """Thread-safe singleton manager for Redis client."""

    def __init__(self):
        self._client: Optional[RedisWithRetry] = None
        self._lock = asyncio.Lock()

    async def get_client(self) -> RedisWithRetry:
        if self._client is None:
            async with self._lock:
                if self._client is None:
                    await self._initialize_client()
        return self._client
Enter fullscreen mode Exit fullscreen mode

🚀 Production Tips

1. Monitor Circuit Breaker States 📊

# Add metrics collection
if circuit_breaker.state == "open":
    metrics.counter("redis.circuit_breaker.open").increment()
Enter fullscreen mode Exit fullscreen mode

2. Tune Retry Parameters ⚙️

  • Start with conservative values
  • Monitor success/failure rates
  • Adjust based on your Redis setup

3. Connection Pool Sizing 🏊‍♂️

# Rule of thumb: 2-3x your concurrent request count
max_connections = concurrent_requests * 2.5
Enter fullscreen mode Exit fullscreen mode

4. Health Check Integration 🏥

# Add to your application health endpoint
@app.get("/health")
async def health_check():
    redis_health = await redis_health_check()
    return {"redis": redis_health}
Enter fullscreen mode Exit fullscreen mode

🎭 Error Handling Hierarchy

Image description

📝 Conclusion

Building resilient systems requires more than just hoping nothing goes wrong. With this Redis client, you get:

  • 🔄 Automatic retries with exponential backoff
  • 🔌 Circuit breaker protection
  • 🏊‍♂️ Connection pooling for performance
  • 📊 Health monitoring for observability
  • 🛡️ Production-ready error handling

Your Redis operations will now gracefully handle network issues, temporary outages, and other distributed system challenges. No more 3 AM alerts for transient Redis hiccups! 🌙😴

Full Code Snippet

Coherence image

Authenticated AI Chat, Just 15 Lines of Code

Multi-modal streaming chat (including charts) with your existing backend data. Choose from 10+ models from all leading providers. Total control and visibility.

Learn more

Top comments (1)

Collapse
 
amit_dubey_ec930ef81c7435 profile image
Amit Dubey

@akarshan Very well explained. Keep up the good work.

Developer-first embedded dashboards

Developer-first embedded dashboards

Embed in minutes, load in milliseconds, extend infinitely. Import any chart, connect to any database, embed anywhere. Scale elegantly, monitor effortlessly, CI/CD & version control.

Get early access

👋 Kindness is contagious

Sign in to DEV to enjoy its full potential.

Unlock a customized interface with dark mode, personal reading preferences, and more.

Okay