<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Ilya Voronin</title>
    <description>The latest articles on Forem by Ilya Voronin (@ivoronin).</description>
    <link>https://forem.com/ivoronin</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3592181%2Fcc21e59f-78fb-4f7a-b287-6b0d62f114b0.png</url>
      <title>Forem: Ilya Voronin</title>
      <link>https://forem.com/ivoronin</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/ivoronin"/>
    <language>en</language>
    <item>
      <title>Celery + SQS: Stop Broken Workers from Monopolizing Your Queue with Circuit Breakers</title>
      <dc:creator>Ilya Voronin</dc:creator>
      <pubDate>Sat, 01 Nov 2025 08:33:59 +0000</pubDate>
      <link>https://forem.com/ivoronin/celery-sqs-stop-broken-workers-from-monopolizing-your-queue-with-circuit-breakers-11dj</link>
      <guid>https://forem.com/ivoronin/celery-sqs-stop-broken-workers-from-monopolizing-your-queue-with-circuit-breakers-11dj</guid>
      <description>&lt;h2&gt;
  
  
  The Problem
&lt;/h2&gt;

&lt;p&gt;You have 10 Celery workers processing tasks from SQS. One worker's GPU fails. Here's what happens:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Healthy worker:&lt;/strong&gt; Takes 60 seconds to process a task&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Broken worker:&lt;/strong&gt; Fails in 0.5 seconds, immediately grabs next task&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The broken worker is &lt;strong&gt;100x faster&lt;/strong&gt; at consuming (and failing) tasks. In one minute, it handles 90% of your queue and fails everything.&lt;/p&gt;

&lt;p&gt;This is &lt;strong&gt;systemic failure amplification&lt;/strong&gt; — one broken component paralyzes the entire system.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Solution
&lt;/h2&gt;

&lt;p&gt;When a worker keeps failing, &lt;strong&gt;stop it from taking new tasks&lt;/strong&gt; for a timeout period.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The challenge:&lt;/strong&gt; Celery + SQS doesn't have a built-in way to pause consumption based on worker health. There's no API to say "stop fetching messages for 60 seconds because this worker is broken."&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Our approach:&lt;/strong&gt; Patch Celery's consumption mechanism using bootsteps + shared memory for cross-process signaling.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;multiprocessing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Value&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ctypes&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;celery&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;bootsteps&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;MethodType&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pybreaker&lt;/span&gt;

&lt;span class="c1"&gt;# 1. Shared memory between all processes
&lt;/span&gt;&lt;span class="n"&gt;_paused_until&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Value&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctypes&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;c_double&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# 2. When circuit opens, write pause timestamp
&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CircuitBreakerPauseController&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pybreaker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CircuitBreakerListener&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;state_change&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cb&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;old_state&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;new_state&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;new_state&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;pybreaker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;STATE_OPEN&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;pause_until_ts&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;cb&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;reset_timeout&lt;/span&gt;
            &lt;span class="n"&gt;_paused_until&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pause_until_ts&lt;/span&gt;

&lt;span class="c1"&gt;# 3. Patch can_consume() to block SQS fetching during pause
&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CircuitBreakerConsumptionGate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bootsteps&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StartStopStep&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;requires&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;celery.worker.consumer.tasks:Tasks&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;parent&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;channel_qos&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;parent&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;qos&lt;/span&gt;
        &lt;span class="n"&gt;original&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;channel_qos&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;can_consume&lt;/span&gt;

        &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;can_consume&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="n"&gt;pause_until_ts&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_paused_until&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;pause_until_ts&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;pause_until_ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;  &lt;span class="c1"&gt;# Block fetching
&lt;/span&gt;            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;original&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

        &lt;span class="n"&gt;channel_qos&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;can_consume&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;MethodType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;can_consume&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;channel_qos&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# 4. Circuit breaker with listeners
&lt;/span&gt;&lt;span class="n"&gt;breaker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pybreaker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;CircuitBreaker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;fail_max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;reset_timeout&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;60&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;throw_new_error_on_trip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Keep original exceptions
&lt;/span&gt;    &lt;span class="n"&gt;listeners&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nc"&gt;CircuitBreakerPauseController&lt;/span&gt;&lt;span class="p"&gt;()],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# 5. Register bootstep and wrap tasks
&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;steps&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;consumer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CircuitBreakerConsumptionGate&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@app.task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bind&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;process_workflow&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;workflow&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;breaker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_execute_workflow&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;workflow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;How it works:&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Worker fails 3 times → circuit opens&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;PauseController&lt;/code&gt; writes &lt;code&gt;_paused_until = now + 60s&lt;/code&gt; to shared memory&lt;/li&gt;
&lt;li&gt;Main process checks &lt;code&gt;_paused_until&lt;/code&gt; in &lt;code&gt;can_consume()&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Returns &lt;code&gt;False&lt;/code&gt; → &lt;strong&gt;no SQS fetching for 60 seconds&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;After timeout, circuit tries one task (HALF_OPEN state)&lt;/li&gt;
&lt;li&gt;Success → close circuit, failure → pause again&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The pattern transforms failing workers from system-killers into isolated incidents.&lt;/p&gt;

</description>
      <category>celery</category>
      <category>sqs</category>
      <category>python</category>
      <category>sre</category>
    </item>
  </channel>
</rss>
