<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Onkar</title>
    <description>The latest articles on Forem by Onkar (@rakno).</description>
    <link>https://forem.com/rakno</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3903579%2F2780481a-5e16-40ac-8840-126059da6cde.png</url>
      <title>Forem: Onkar</title>
      <link>https://forem.com/rakno</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/rakno"/>
    <language>en</language>
    <item>
      <title>I Built a CRM AI Assistant in Go From Scratch — No LangChain, No Shortcuts</title>
      <dc:creator>Onkar</dc:creator>
      <pubDate>Tue, 05 May 2026 06:28:12 +0000</pubDate>
      <link>https://forem.com/rakno/i-built-a-crm-ai-assistant-in-go-from-scratch-no-langchain-no-shortcuts-4jh5</link>
      <guid>https://forem.com/rakno/i-built-a-crm-ai-assistant-in-go-from-scratch-no-langchain-no-shortcuts-4jh5</guid>
      <description>&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;TL;DR:&lt;/strong&gt; I built Aria — a production-grade AI assistant for CRM agents. Agents log in with Google, type natural language questions about their leads, tasks, and bookings, and get real answers streamed from a live database. No hallucinations. No canned responses. Just SQL executed against real data, formatted by GPT-4o, and streamed token by token. Built entirely in Go, with a Python schema intelligence pipeline, pgvector for semantic search, Redis for caching and sessions, and a Next.js frontend. Here's everything I learned.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  Why I built this
&lt;/h2&gt;

&lt;p&gt;I work as a backend engineer at a PropTech company. Every day I watch pre-sales agents open five different tabs to answer one question — leads dashboard, task list, booking tracker, payment sheet, activity log. The data exists. It's all in PostgreSQL. But to answer "which of my leads haven't been contacted in 3 days?", an agent has to mentally join four tables.&lt;/p&gt;

&lt;p&gt;I wanted to build something that let an agent just ask that question — in plain English — and get a real answer from the live database. Not a chatbot that makes things up. An AI that runs actual SQL and tells you what it found.&lt;/p&gt;

&lt;p&gt;That's Aria.&lt;/p&gt;




&lt;h2&gt;
  
  
  What Aria actually does
&lt;/h2&gt;

&lt;p&gt;Before I get into the technical decisions, let me show you what it looks like in practice.&lt;/p&gt;

&lt;p&gt;An agent logs in with Google. They see a chat interface. They type:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"How many leads are assigned to me?"&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Aria responds:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"You have 5 leads assigned to you."&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;With a "SQL executed" toggle that reveals the actual query that ran. Then they ask:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"What tasks are due today?"&lt;/p&gt;

&lt;p&gt;"There is one task due today. It is a high-priority call titled 'Welcome call — Priya,' which involves an intro call and confirming the budget range. The task is currently pending and has not been completed yet."&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Then:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"Recent activity on my leads"&lt;/p&gt;

&lt;p&gt;"Recent activities on your leads include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Email Activity&lt;/strong&gt; — Type: Outbound Email, Subject: Sent brochure, Body: PDF with Manchester options, Occurred: May 4, 2026 at 12:43 PM&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Call Activity&lt;/strong&gt; — Type: Inbound Call, Subject: Inbound budget check, Body: Student asked about bills-inclusive options, Occurred: May 3, 2026 at 4:43 PM, Outcome: Connected"&lt;/li&gt;
&lt;/ol&gt;
&lt;/blockquote&gt;

&lt;p&gt;Real data. Live database. The agent never wrote a query.&lt;/p&gt;




&lt;h2&gt;
  
  
  The core technical problem — and why RAG isn't the answer
&lt;/h2&gt;

&lt;p&gt;My first instinct was RAG (Retrieval Augmented Generation) — the pattern everyone uses for document question-answering. Embed all your CRM data as vectors, find the most similar documents when someone asks a question, feed them to GPT-4.&lt;/p&gt;

&lt;p&gt;This is completely wrong for structured CRM data.&lt;/p&gt;

&lt;p&gt;RAG is for &lt;strong&gt;unstructured data&lt;/strong&gt; — PDFs, notes, articles. The answer to "how many leads are assigned to me?" isn't found by semantic similarity. It requires an exact COUNT query with a WHERE clause. Semantic search on a vector of "John Smith, high priority, interested state, assigned to agent_42" doesn't give you a count. It gives you a similar-sounding document.&lt;/p&gt;

&lt;p&gt;The right approach is &lt;strong&gt;Text-to-SQL&lt;/strong&gt; — translate the natural language question into an actual SQL query, execute it, get real rows, format the result.&lt;/p&gt;

&lt;p&gt;But raw Text-to-SQL (just dumping your schema into a prompt and asking GPT-4 to write SQL) breaks down on complex schemas with business-specific terminology. "High priority leads" only maps to &lt;code&gt;priority = 'HIGH'&lt;/code&gt; if the LLM knows your schema well enough. "Stale leads" needs to know that means &lt;code&gt;last_activity_at &amp;lt; NOW() - INTERVAL '3 days'&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The solution I landed on: &lt;strong&gt;RAG over the schema, not over the data rows.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I build a semantic layer — plain-English descriptions of every table, every column, every enum value, and what they mean in business terms. That semantic layer gets embedded as vectors. When an agent asks a question, I retrieve the most relevant schema descriptions first, then ask GPT-4o to generate SQL using that focused context. The LLM sees exactly what it needs, not the entire 15-table schema.&lt;/p&gt;
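
&lt;p&gt;In the Go API, that retrieval step boils down to one pgvector query. A minimal sketch, assuming the question has already been embedded (the helper and the &lt;code&gt;description&lt;/code&gt; column name are illustrative):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// Illustrative sketch: fetch the five schema docs closest to the question.
// Uses pgx plus the pgvector-go type; qVec is the question's embedding.
func retrieveSchemaDocs(ctx context.Context, pool *pgxpool.Pool, qVec []float32) ([]string, error) {
    rows, err := pool.Query(ctx,
        `SELECT description
           FROM schema_embeddings
          ORDER BY embedding &amp;lt;=&amp;gt; $1  -- pgvector cosine distance
          LIMIT 5`,
        pgvector.NewVector(qVec))
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var docs []string
    for rows.Next() {
        var d string
        if err := rows.Scan(&amp;amp;d); err != nil {
            return nil, err
        }
        docs = append(docs, d)
    }
    return docs, rows.Err()
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;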




&lt;h2&gt;
  
  
  Why Go
&lt;/h2&gt;

&lt;p&gt;I'd been using Ruby on Rails professionally for over a year. I wanted to learn Go properly — not from tutorials, but by building something real with real constraints.&lt;/p&gt;

&lt;p&gt;Go turned out to be the right call for this specific project for three reasons.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Goroutines for SSE streaming.&lt;/strong&gt; LLM responses stream token by token. In Go, you open a channel, pump tokens from the OpenAI stream into it, and flush each one as an SSE event. The pattern is clean and idiomatic. The &lt;code&gt;http.Flusher&lt;/code&gt; interface handles the rest. Zero boilerplate.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Type safety for LLM tool calling.&lt;/strong&gt; OpenAI's function calling returns JSON that maps to your defined tool schema. In Go, you define a struct, and the type system ensures you're handling the response correctly. No runtime surprises from dynamic typing.&lt;/p&gt;
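
&lt;p&gt;For example, the arguments for the &lt;code&gt;query_crm_database&lt;/code&gt; tool deserialise into a plain struct. A sketch, where the &lt;code&gt;toolCall.Function.Arguments&lt;/code&gt; path follows the shape Go OpenAI clients expose:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// Mirrors the tool schema: query_crm_database(sql, explanation).
type QueryCRMArgs struct {
    SQL         string `json:"sql"`
    Explanation string `json:"explanation"`
}

// The model returns arguments as a JSON string; decode it or fail loudly.
var args QueryCRMArgs
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &amp;amp;args); err != nil {
    return fmt.Errorf("malformed tool arguments: %w", err)
}
// From here on, args.SQL is a typed value, not a map[string]interface{} dig.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;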

&lt;p&gt;&lt;strong&gt;Concurrency for parallel queries.&lt;/strong&gt; Some questions require multiple SQL queries — "show me my leads and today's bookings." Go's &lt;code&gt;sync.WaitGroup&lt;/code&gt; + goroutines make parallel execution natural. Run both queries simultaneously, merge results, format the combined answer. Python could do this, but Go makes it feel like the default way to think.&lt;/p&gt;
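
&lt;p&gt;Here's a sketch of that fan-out. I'm showing &lt;code&gt;errgroup&lt;/code&gt; rather than a bare &lt;code&gt;WaitGroup&lt;/code&gt; because it bundles the error propagation and cancellation I talk about later (&lt;code&gt;runQuery&lt;/code&gt; and the SQL strings are stand-ins):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// Run two independent CRM queries concurrently; the first error
// cancels the sibling through ctx.
g, ctx := errgroup.WithContext(ctx)

var leads, bookings []map[string]any
g.Go(func() error {
    var err error
    leads, err = runQuery(ctx, leadsSQL)
    return err
})
g.Go(func() error {
    var err error
    bookings, err = runQuery(ctx, bookingsSQL)
    return err
})
if err := g.Wait(); err != nil {
    return err
}
// Merge both result sets, then hand them to GPT-4o for formatting.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;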

&lt;p&gt;The one place I kept Python: the schema intelligence pipeline that reads the DB schema and generates semantic documentation via GPT-4o. The Python AI ecosystem (psycopg2, pgvector library, openai SDK) is simply more mature for that one-time build step.&lt;/p&gt;




&lt;h2&gt;
  
  
  The architecture
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Browser (Next.js)
      │
      │ Google OAuth → JWT → httpOnly refresh cookie
      │ POST /chat → SSE stream
      │
Go API (Chi router)
      │
      ├── JWT middleware (validates + blacklists in Redis)
      ├── Rate limiter (30 req/min per agent, Redis)
      │
      ├── Schema retriever
      │   → embed question → pgvector cosine search → top-5 relevant schema docs
      │
      ├── Intent example retriever
      │   → cosine search on pre-seeded Q→SQL examples
      │
      ├── GPT-4o tool calling
      │   → system prompt: agent context + retrieved schema + examples + history
      │   → tool: query_crm_database(sql, explanation)
      │
      ├── SQL validator
      │   → must be SELECT only
      │   → inject WHERE assigned_agent_id = $1 (agent isolation)
      │
      ├── pgx executor (read-only role, 5s timeout)
      │
      ├── Redis query cache (5 min TTL, invalidated on thumbs-down)
      ├── Redis session history (last 10 messages, 2h TTL)
      │
      └── SSE streamer → tokens flow to browser

PostgreSQL
  - 15 tables: leads, tasks, bookings, payments, users, partners, properties, activities...
  - pgvector extension: schema_embeddings, intent_examples
  - Read-only role: aria_readonly (SELECT only at DB level)

Python schema pipeline (runs once on setup)
  - Introspects PostgreSQL schema
  - GPT-4o generates plain-English column descriptions
  - Embeds + stores in schema_embeddings
  - Auto-generates 30 example Q→SQL pairs → intent_examples
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  The decision that matters most: agent data isolation
&lt;/h2&gt;

&lt;p&gt;This is not a technical trick. It's a security requirement.&lt;/p&gt;

&lt;p&gt;When agent A asks "show me my leads," they must only see their leads. Not agent B's. Not all agents' combined. This sounds obvious but there are three places where it could fail:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The LLM layer.&lt;/strong&gt; GPT-4o might forget to include a WHERE clause. The LLM is non-deterministic — you cannot trust it to always include the agent filter.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The application layer.&lt;/strong&gt; This is where I enforce it. After the LLM generates SQL, before it ever hits the database, I inject &lt;code&gt;AND assigned_agent_id = $1&lt;/code&gt; and pass the authenticated agent's ID as a parameterised argument. The agent ID comes from the JWT — which is cryptographically signed. This runs regardless of what the LLM produced.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The database layer.&lt;/strong&gt; The application connects as &lt;code&gt;aria_readonly&lt;/code&gt; — a PostgreSQL role with only SELECT permissions. Even if somehow a non-SELECT query got through, it would fail at the database level.&lt;/p&gt;

&lt;p&gt;Three independent layers. Any one of them catches a failure in the others.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="c"&gt;// After SQL is generated by LLM, before execution&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;injectAgentFilter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;agentID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c"&gt;// Replace :agent_id placeholder if LLM included it&lt;/span&gt;
    &lt;span class="c"&gt;// If not, inject the filter regardless&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Contains&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ToUpper&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="s"&gt;"ASSIGNED_AGENT_ID"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c"&gt;// Wrap the query: SELECT * FROM (...) WHERE assigned_agent_id = $1&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"SELECT * FROM (%s) AS q WHERE q.assigned_agent_id = $1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ReplaceAll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;":agent_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"$1"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  The schema intelligence pipeline
&lt;/h2&gt;

&lt;p&gt;This is the piece I'm most proud of, because it solves a real problem that existing tools don't.&lt;/p&gt;

&lt;p&gt;The problem: Text-to-SQL tools expect you to manually write documentation about your schema — what each column means, what each enum value represents. For a 15-table CRM with columns like &lt;code&gt;ai_ineligible_reason&lt;/code&gt;, &lt;code&gt;source_details&lt;/code&gt;, &lt;code&gt;meta_ai_call_status&lt;/code&gt;, manually writing that documentation is hours of work. And every time you add a column, you have to update the docs.&lt;/p&gt;

&lt;p&gt;My solution: a Python script that introspects the PostgreSQL schema and sends each table's DDL to GPT-4o, which generates the documentation automatically.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;generate_table_doc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ddl&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;foreign_keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sample_rows&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;prompt&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    You are documenting a CRM database for a student accommodation company.

    Here is the DDL for the &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt; table:
    &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ddl&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;

    Foreign keys: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;foreign_keys&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;
    Sample rows (3): &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;sample_rows&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;

    Generate a JSON object with:
    - table purpose (1-2 sentences)
    - for each column: plain English description, possible values if enum
    - common business questions this table answers
    - relationships to other tables
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# Returns structured JSON
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The generated documentation for the &lt;code&gt;leads&lt;/code&gt; table includes things like: "state = 'interested' means the student has shown interest but has not yet booked. pre_sales_agent_id is the agent currently responsible for converting this lead." These descriptions get embedded as vectors and stored in &lt;code&gt;schema_embeddings&lt;/code&gt;, ready to be retrieved as context at SQL-generation time.&lt;/p&gt;

&lt;p&gt;The pipeline is incremental — it hashes each table's DDL. On re-run, only tables whose schema has changed get re-documented. Add a column to &lt;code&gt;leads&lt;/code&gt;, only &lt;code&gt;leads&lt;/code&gt; gets reprocessed. The rest is instant.&lt;/p&gt;
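
&lt;p&gt;The check itself is tiny. A sketch of the idea, with fingerprint helpers that are my shorthand rather than the pipeline's actual function names:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import hashlib

def ddl_fingerprint(ddl: str) -&amp;gt; str:
    # Stable hash of the table's DDL; equal hashes mean an unchanged schema.
    return hashlib.sha256(ddl.encode()).hexdigest()

for table_name, ddl in introspect_schema(conn):  # helper names illustrative
    fp = ddl_fingerprint(ddl)
    if fp == load_fingerprint(conn, table_name):
        continue  # unchanged: skip the GPT-4o call entirely
    doc = generate_table_doc(client, table_name, ddl, foreign_keys, sample_rows)
    embed_and_store(conn, table_name, doc)
    save_fingerprint(conn, table_name, fp)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;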




&lt;h2&gt;
  
  
  The dual-write problem and why I store conversations in both Redis and PostgreSQL
&lt;/h2&gt;

&lt;p&gt;Conversation history lives in two places:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Redis&lt;/strong&gt; (&lt;code&gt;session:{user_id}:{session_id}&lt;/code&gt;, TTL 2 hours) — the last 10 messages, for fast lookup on every API call. The LLM needs this context to answer follow-up questions.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;PostgreSQL&lt;/strong&gt; (&lt;code&gt;conversations&lt;/code&gt; and &lt;code&gt;messages&lt;/code&gt; tables) — permanent storage for audit, feedback training, and conversation history display.&lt;/p&gt;

&lt;p&gt;Why both? Speed vs durability. Redis is fast — loading session history from Redis takes ~1ms. PostgreSQL with a JOIN takes ~10ms. For something that happens on every LLM call, that 9ms difference matters. But Redis is ephemeral — TTL expires, memory pressure evicts keys. The PostgreSQL record is permanent and queryable.&lt;/p&gt;
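
&lt;p&gt;The Redis write path is a three-command pipeline (a sketch against go-redis v9; the key shape matches the one above):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// Append the new message, keep only the last 10, refresh the 2h TTL.
key := fmt.Sprintf("session:%s:%s", userID, sessionID)

pipe := rdb.TxPipeline()
pipe.RPush(ctx, key, messageJSON)
pipe.LTrim(ctx, key, -10, -1)
pipe.Expire(ctx, key, 2*time.Hour)
if _, err := pipe.Exec(ctx); err != nil {
    return err
}
// The permanent PostgreSQL insert happens separately, off this hot path.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;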

&lt;p&gt;This is the same pattern I use at my day job for a different problem. The tradeoff is always: what needs to be fast, and what needs to be permanent?&lt;/p&gt;




&lt;h2&gt;
  
  
  Streaming with SSE in Go
&lt;/h2&gt;

&lt;p&gt;Server-Sent Events is the right choice for streaming LLM responses. It's one-directional (server to client), works over standard HTTP, and auto-reconnects on disconnect. No WebSocket handshake overhead.&lt;/p&gt;

&lt;p&gt;The Go implementation is clean:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;streamResponse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ResponseWriter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;openaiStream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;openai&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ChatCompletionStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Header&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Content-Type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"text/event-stream"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Header&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Cache-Control"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"no-cache"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Header&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"X-Accel-Buffering"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"no"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;flusher&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Flusher&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;openaiStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Recv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;io&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EOF&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"data: [DONE]&lt;/span&gt;&lt;span class="se"&gt;\n\n&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;flusher&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Flush&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;token&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Choices&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Delta&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Content&lt;/span&gt;
        &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;`{"type":"token","text":%q}`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"data: %s&lt;/span&gt;&lt;span class="se"&gt;\n\n&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;flusher&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Flush&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The frontend uses the &lt;code&gt;EventSource&lt;/code&gt; API with a custom hook that parses event types (&lt;code&gt;sql&lt;/code&gt;, &lt;code&gt;token&lt;/code&gt;, &lt;code&gt;done&lt;/code&gt;, &lt;code&gt;error&lt;/code&gt;) and updates React state accordingly. The agent sees tokens appearing one by one — exactly like ChatGPT, but with real CRM data behind it.&lt;/p&gt;




&lt;h2&gt;
  
  
  The feedback loop
&lt;/h2&gt;

&lt;p&gt;Every AI response has thumbs-up / thumbs-down buttons. This isn't just UX polish — it's the learning mechanism.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Thumbs up:&lt;/strong&gt; The Q→SQL pair gets stored in &lt;code&gt;query_feedback&lt;/code&gt; with &lt;code&gt;is_helpful = true&lt;/code&gt;. After 3 upvotes on similar question patterns, the pair gets auto-promoted to &lt;code&gt;intent_examples&lt;/code&gt; — it becomes part of the example set that future queries retrieve during the intent-example retrieval step. The system gets smarter from usage.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Thumbs down:&lt;/strong&gt; The agent can optionally add a correction note. The bad Q→SQL pair is flagged. The Redis cache entry for that question is immediately invalidated — the next time someone asks the same question, it goes to the LLM fresh instead of serving the cached wrong answer.&lt;/p&gt;
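
&lt;p&gt;The invalidation is a single Redis delete keyed on the question. A sketch, with the cache-key shape being my assumption:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// On thumbs-down: drop the cached answer so the next identical question
// goes back through the full retrieval + LLM path.
func invalidateCachedAnswer(ctx context.Context, rdb *redis.Client, question string) error {
    sum := sha256.Sum256([]byte(question))
    cacheKey := fmt.Sprintf("qcache:%x", sum) // key shape assumed
    return rdb.Del(ctx, cacheKey).Err()
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;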

&lt;p&gt;A nightly job clusters negative feedback to identify recurring failure patterns — questions the system consistently gets wrong — and flags them to &lt;code&gt;intent_gaps&lt;/code&gt; for manual review.&lt;/p&gt;




&lt;h2&gt;
  
  
  What I'd do differently
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Add OpenTelemetry from the start.&lt;/strong&gt; I added structured logging but not distributed tracing. Being able to see a full trace from HTTP request → schema retrieval → LLM call → SQL execution → SSE stream with timing at each step would have made debugging much faster.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Add a "confidence score" to responses.&lt;/strong&gt; Right now, Aria answers with equal confidence whether it's 100% sure or guessing. A mechanism to say "I'm not sure this is right — here's what I think you're asking" would make the product more trustworthy.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Separate the schema pipeline into its own service.&lt;/strong&gt; It's a Python CLI script today. Making it a proper FastAPI service that the Go API can call on-demand (when a new table is detected) would make the incremental update flow smoother.&lt;/p&gt;




&lt;h2&gt;
  
  
  What this project taught me about Go
&lt;/h2&gt;

&lt;p&gt;Four things I didn't expect before building this:&lt;/p&gt;

&lt;p&gt;Go's error handling forces you to think about every failure mode. There's no &lt;code&gt;try/catch&lt;/code&gt; that lets you defer the "what if this fails?" question. Every database call, every API call, every Redis operation has an explicit error path. The code is more verbose but the failure modes are all visible.&lt;/p&gt;

&lt;p&gt;Interfaces in Go make dependency injection natural. My &lt;code&gt;AIService&lt;/code&gt; depends on a &lt;code&gt;SchemaRetriever&lt;/code&gt; interface, not a concrete type. Swapping implementations for tests, or switching from pgvector to a different vector store, requires changing one line.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;context&lt;/code&gt; package is everywhere and it's the right abstraction. Timeouts, cancellation, request-scoped values (like the authenticated user ID) — all flow through &lt;code&gt;context.Context&lt;/code&gt;. Once you understand why, you stop fighting it.&lt;/p&gt;

&lt;p&gt;Goroutines are cheap but coordination is hard. Running parallel SQL queries with &lt;code&gt;sync.WaitGroup&lt;/code&gt; is easy. Making sure errors from each goroutine are correctly propagated and the context cancels properly when one fails — that took more thought.&lt;/p&gt;




&lt;h2&gt;
  
  
  The numbers
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;15 tables&lt;/strong&gt; with relational integrity&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;500 seeded leads&lt;/strong&gt;, 300 tasks, 200 bookings, 150 payments&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;30 pre-seeded intent examples&lt;/strong&gt; from the schema pipeline&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;~90 schema embeddings&lt;/strong&gt; covering every table and column&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Sub-100ms&lt;/strong&gt; response time on cache hits&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;~1.5–2s&lt;/strong&gt; to first token on cache misses (LLM latency)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;5 second&lt;/strong&gt; SQL execution timeout, hard limit at DB level&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Read-only&lt;/strong&gt; PostgreSQL role — no write access possible&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  The stack
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Backend:&lt;/strong&gt; Go 1.22, Chi router, pgx v5, go-redis v9, official openai-go SDK, golang-jwt&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;AI layer:&lt;/strong&gt; GPT-4o for SQL generation and response formatting, text-embedding-3-small for schema embeddings, pgvector (HNSW index) for similarity search&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Schema pipeline:&lt;/strong&gt; Python 3.12, psycopg2, pgvector Python client, openai Python SDK&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Frontend:&lt;/strong&gt; Next.js 14, TypeScript, Tailwind CSS, Zustand for auth state, React Query&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Infrastructure:&lt;/strong&gt; Docker Compose, PostgreSQL 16, Redis 7&lt;/p&gt;




&lt;h2&gt;
  
  
  The code
&lt;/h2&gt;

&lt;p&gt;Everything is on GitHub: &lt;strong&gt;github.com/Deonkar/Aria&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;It runs with &lt;code&gt;docker compose up&lt;/code&gt;. Run &lt;code&gt;make seed&lt;/code&gt; to populate the database. Run &lt;code&gt;make schema-pipeline&lt;/code&gt; to generate the semantic documentation and embeddings. Then ask it anything about your leads.&lt;/p&gt;

&lt;p&gt;The thing to try first: ask it the same question twice. Watch the first response stream token by token. Watch the second response come back instantly from cache with the "SQL executed" badge still visible. That's the difference between a demo and a system.&lt;/p&gt;




&lt;h2&gt;
  
  
  What's next
&lt;/h2&gt;

&lt;p&gt;The natural next step is the RAG layer — adding support for unstructured data like call transcripts, email threads, and agent notes. "What did we discuss with this lead last week?" can't be answered by SQL alone. That question needs semantic search over unstructured text — which is where pgvector plus proper chunking comes in. The infrastructure is already there. The schema pipeline already uses pgvector for embeddings. Extending it to cover activity bodies and notes is the obvious v2.&lt;/p&gt;

&lt;p&gt;The other direction is multi-agent context — letting admins ask questions across all agents ("which agent has the most stale leads this week?") while keeping agent-level queries scoped. The agent-isolation layer already carries the information it needs via the &lt;code&gt;role&lt;/code&gt; claim in the JWT. It's a configuration change, not an architecture change.&lt;/p&gt;
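
&lt;p&gt;Concretely, it's one branch in the filter-injection step (a sketch; the claim names are assumptions):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;// Admins skip the per-agent filter; everyone else gets it injected.
func scopeSQL(claims jwt.MapClaims, sql string) string {
    if role, _ := claims["role"].(string); role == "admin" {
        return sql // cross-agent queries allowed
    }
    agentID, _ := claims["sub"].(string)
    return injectAgentFilter(sql, agentID)
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;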




&lt;p&gt;&lt;em&gt;If you're building something similar or have thoughts on the architecture decisions — particularly around the schema intelligence pipeline or the agent isolation approach — I'd like to hear from you. Drop a comment or find me on LinkedIn.&lt;/em&gt;&lt;/p&gt;




</description>
      <category>go</category>
      <category>ai</category>
      <category>systemdesign</category>
      <category>llm</category>
    </item>
    <item>
      <title>Why I Stopped Using SQS and Built a Kafka System From Scratch</title>
      <dc:creator>Onkar</dc:creator>
      <pubDate>Fri, 01 May 2026 13:10:56 +0000</pubDate>
      <link>https://forem.com/rakno/why-i-stopped-using-sqs-and-built-a-kafka-system-from-scratch-43m5</link>
      <guid>https://forem.com/rakno/why-i-stopped-using-sqs-and-built-a-kafka-system-from-scratch-43m5</guid>
      <description>&lt;h1&gt;
  
  
  Why I Stopped Using SQS and Built a Kafka System From Scratch
&lt;/h1&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;TL;DR:&lt;/strong&gt; I'd been using SQS and Sidekiq at work for a year. They worked fine — until I needed five services to independently react to the same event without knowing each other existed. This is the story of what broke, what I built, and what Kafka taught me that queues fundamentally cannot.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  The Setup: What I Already Knew
&lt;/h2&gt;

&lt;p&gt;I'm a backend engineer at a PropTech startup. Over the past year I've shipped a lot of infrastructure — event-driven pipelines using AWS SQS, background job queues with Sidekiq and Resque, dead-letter queues, circuit breakers, the works.&lt;/p&gt;

&lt;p&gt;I thought I understood event-driven architecture.&lt;/p&gt;

&lt;p&gt;I didn't. I understood &lt;strong&gt;queues&lt;/strong&gt;. Those are different things.&lt;/p&gt;

&lt;p&gt;The difference didn't click until I tried to build a system where one payment event needed to trigger five completely independent downstream effects — and I realised that with SQS, the architecture I had in my head was simply not possible.&lt;/p&gt;

&lt;p&gt;That realisation sent me down a two-week rabbit hole building &lt;strong&gt;TxFlow&lt;/strong&gt; — a payment event orchestrator built entirely from scratch with Kafka (specifically Redpanda), FastAPI, PostgreSQL, Redis, and a Next.js observability dashboard.&lt;/p&gt;

&lt;p&gt;This post is what I learned.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Problem: One Event, Five Reactions
&lt;/h2&gt;

&lt;p&gt;Here's the scenario that broke my mental model.&lt;/p&gt;

&lt;p&gt;A payment comes in. You need five things to happen:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Fraud scoring&lt;/strong&gt; — check if the amount is suspicious&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Wallet update&lt;/strong&gt; — debit the sender's balance&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Notification&lt;/strong&gt; — email the user&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Audit log&lt;/strong&gt; — write an immutable record for compliance&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Analytics&lt;/strong&gt; — increment payment counters&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;With SQS, my instinct was: fire five separate messages, one per job type. But that immediately creates problems:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What if the wallet update succeeds but the audit log message never gets enqueued?&lt;/li&gt;
&lt;li&gt;What if you add a sixth service next month? You have to go back and modify the producer code&lt;/li&gt;
&lt;li&gt;If the analytics service is down, does it block everything? Or does that message just disappear?&lt;/li&gt;
&lt;li&gt;You want to replay three days of events through a new fraud model — how do you do that if the messages were deleted after consumption?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The SQS mental model is a &lt;strong&gt;pipe&lt;/strong&gt;. One message goes in, one consumer picks it up, it's gone.&lt;/p&gt;

&lt;p&gt;Kafka's mental model is a &lt;strong&gt;log&lt;/strong&gt;. The message is written and &lt;em&gt;stays&lt;/em&gt;. Any number of independent consumers can read it at their own offset, at their own pace, completely unaware of each other.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SQS:
Producer → [queue] → Consumer A
                   (message deleted)

Kafka:
Producer → [topic: payments.initiated]
                ↓              ↓              ↓              ↓              ↓
          Consumer A      Consumer B      Consumer C      Consumer D      Consumer E
          (fraud)         (wallet)        (notify)        (audit)         (analytics)
          offset: 42      offset: 38      offset: 42      offset: 41      offset: 40
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each consumer has its own offset — its own "bookmark" in the log. They move independently. Consumer B being slow doesn't affect Consumer A. Consumer D being down for an hour doesn't lose any messages — it just falls behind, and catches up when it restarts.&lt;/p&gt;

&lt;p&gt;This is the thing that took me a week to fully internalise.&lt;/p&gt;




&lt;h2&gt;
  
  
  Building TxFlow: The Architecture
&lt;/h2&gt;

&lt;p&gt;The system is simple by design. It runs entirely with &lt;code&gt;docker compose up&lt;/code&gt; — no cloud, no real money, no external services. The point is to understand the concepts, not manage infrastructure.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl → FastAPI (POST /payment)
         │
         ▼
    PostgreSQL        ← outbox_events table (more on this shortly)
         │
         ▼
    Redpanda (Kafka-compatible broker)
    Topic: payments.initiated (3 partitions, 7-day retention)
         │
    ─────┼──────────────────────────────────────────────────
         │         │           │            │           │
    fraud       wallet      notifier      audit     analytics
    consumer    consumer    consumer    consumer    consumer
    group:fraud group:wallet group:notify group:audit group:analytics
         │         │
         ▼         ▼
    PostgreSQL  PostgreSQL
    fraud_      wallets
    assessments table
         │
    failures → payments.dlq topic → dlq_handler consumer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Stack:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Redpanda (Kafka-compatible, runs as a single Docker container — no Zookeeper)&lt;/li&gt;
&lt;li&gt;FastAPI + Python (producer + DLQ handler API)&lt;/li&gt;
&lt;li&gt;PostgreSQL (state: wallets, fraud assessments, audit log, outbox)&lt;/li&gt;
&lt;li&gt;Redis (deduplication keys + analytics counters)&lt;/li&gt;
&lt;li&gt;Next.js + TypeScript (observability dashboard)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let me walk through the three decisions that taught me the most.&lt;/p&gt;




&lt;h2&gt;
  
  
  Lesson 1: The Dual-Write Problem (and why the Outbox pattern exists)
&lt;/h2&gt;

&lt;p&gt;When I first wrote the payment producer, I did what felt natural:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# The naive approach — DO NOT DO THIS
&lt;/span&gt;&lt;span class="nd"&gt;@app.post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/payment&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_payment&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;PaymentRequest&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;payment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;save_to_database&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;   &lt;span class="c1"&gt;# Step 1: write to DB
&lt;/span&gt;    &lt;span class="nf"&gt;publish_to_kafka&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;          &lt;span class="c1"&gt;# Step 2: publish to Kafka
&lt;/span&gt;    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;accepted&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This looks fine. It has a critical flaw.&lt;/p&gt;

&lt;p&gt;These are two separate systems. They are not in the same transaction. If the app crashes between step 1 and step 2 — power cut, OOM kill, deployment — the DB has the payment record, but Kafka never received the event. Five consumers are waiting. None of them will ever process this payment. Silently. No error. The data just never flows.&lt;/p&gt;

&lt;p&gt;This is called the &lt;strong&gt;dual-write problem&lt;/strong&gt;. You cannot atomically write to two systems that don't share a transaction boundary.&lt;/p&gt;

&lt;p&gt;The solution is the &lt;strong&gt;Outbox pattern&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@app.post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/payment&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_payment&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;PaymentRequest&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="nf"&gt;save_to_database&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Write to outbox IN THE SAME TRANSACTION
&lt;/span&gt;        event = insert_outbox_event(event_id=..., payload=req, published=False)

    &lt;span class="c1"&gt;# Transaction committed — the outbox record is the source of truth
&lt;/span&gt;    &lt;span class="c1"&gt;# NOW publish to Kafka (outside the transaction)
&lt;/span&gt;    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;publish_to_kafka&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;mark_outbox_published&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;KafkaException&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;  &lt;span class="c1"&gt;# Background poller will retry this
&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;accepted&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And a background poller runs every 30 seconds:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;poll_outbox&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;unpublished&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_unpublished_events&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# WHERE published = FALSE
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;unpublished&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;publish_to_kafka&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;mark_outbox_published&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;event_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now if the app crashes between the Kafka publish and the &lt;code&gt;mark_outbox_published&lt;/code&gt; call, the poller picks it up on the next cycle. The event might be published twice — but that's intentional. "At-least-once delivery" is the guarantee. The consumers handle idempotency (more on that next).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The insight:&lt;/strong&gt; the DB is your single source of truth. Kafka is your delivery mechanism. Never treat them as equals — make one subordinate to the other.&lt;/p&gt;




&lt;h2&gt;
  
  
  Lesson 2: Manual Offset Commits (the thing that actually guarantees delivery)
&lt;/h2&gt;

&lt;p&gt;This was the hardest concept to get right, and it's the one most tutorials gloss over.&lt;/p&gt;

&lt;p&gt;Kafka tracks where each consumer group is in the log via an &lt;strong&gt;offset&lt;/strong&gt; — a simple incrementing integer. If consumer group "wallet" is at offset 42, it has processed messages 0 through 41 and is waiting for message 42.&lt;/p&gt;

&lt;p&gt;By default, Kafka &lt;strong&gt;auto-commits&lt;/strong&gt; offsets on a timer (every 5 seconds). This creates a dangerous window:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;T=0s  — message 42 polled from Kafka
T=1s  — processing begins
T=3s  — AUTO-COMMIT fires — offset 42 committed as "done"
T=4s  — processing throws an exception
T=4s  — consumer restarts
T=4s  — consumer reads from offset 43
          message 42 is GONE. Permanently skipped.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A payment was silently dropped because the offset was committed before the side effect completed.&lt;/p&gt;

&lt;p&gt;With &lt;strong&gt;manual offset commit&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;T=0s  — message 42 polled
T=1s  — processing begins
T=4s  — processing throws an exception
T=4s  — consumer restarts (offset NOT committed — still at 41)
T=4s  — consumer reads message 42 AGAIN and retries it
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In code, the base consumer pattern looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;1.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;continue&lt;/span&gt;

    &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;value&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

    &lt;span class="c1"&gt;# Check Redis dedup first
&lt;/span&gt;    &lt;span class="n"&gt;dedup_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;wallet:processed:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;event_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exists&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dedup_key&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;commit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;   &lt;span class="c1"&gt;# already processed — commit and skip
&lt;/span&gt;        &lt;span class="k"&gt;continue&lt;/span&gt;

    &lt;span class="c1"&gt;# Try to process
&lt;/span&gt;    &lt;span class="n"&gt;success&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;MAX_RETRIES&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;process_payment&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;   &lt;span class="c1"&gt;# DB write happens here
&lt;/span&gt;            &lt;span class="n"&gt;success&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
            &lt;span class="k"&gt;break&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;wait&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;BASE_DELAY_MS&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;
            &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;wait&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;success&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;setex&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dedup_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DEDUP_TTL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;publish_to_dlq&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;consumer_group&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;wallet&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="c1"&gt;# ALWAYS commit offset — even on failure (DLQ handles the event now)
&lt;/span&gt;    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;commit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Two things to notice:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. Commit happens last, always.&lt;/strong&gt; Even on failure — once we've published to DLQ, we don't want to keep reprocessing a poison pill event that will never succeed. Commit it, move on.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. Redis deduplication is required.&lt;/strong&gt; Because we have at-least-once delivery (the producer can publish the same event twice via the outbox poller), and because a consumer might process an event and then crash before committing the offset, the same event can arrive at a consumer multiple times. Without Redis dedup, a payment could debit a wallet twice.&lt;/p&gt;

&lt;p&gt;The contract is: &lt;strong&gt;Kafka guarantees at-least-once. Your consumer guarantees exactly-once side effects via idempotency.&lt;/strong&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  Lesson 3: The Fan-Out Test (the moment Kafka clicked)
&lt;/h2&gt;

&lt;p&gt;This is the test I ran that made everything concrete.&lt;/p&gt;

&lt;p&gt;I fired a single payment event:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST http://localhost:8000/payment &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{"user_id":"user_001","amount":500,"currency":"USD","idempotency_key":"test-fanout-001"}'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then I checked every downstream system:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Wallet debited&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;balance&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;wallets&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'user_001'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;-- 9500.00 ✓&lt;/span&gt;

&lt;span class="c1"&gt;-- Fraud assessed&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;risk_level&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;fraud_assessments&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;event_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'...'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;-- CLEAR ✓&lt;/span&gt;

&lt;span class="c1"&gt;-- Audit record written (immutable, append-only)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;audit_log&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;event_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'...'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;-- row present ✓&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Notification logged&lt;/span&gt;
docker compose logs consumer-notifier | &lt;span class="nb"&gt;grep &lt;/span&gt;test-fanout-001
&lt;span class="c"&gt;# {"event":"notification_sent","user_id":"user_001","amount":500.0,...} ✓&lt;/span&gt;

&lt;span class="c"&gt;# Analytics incremented&lt;/span&gt;
docker &lt;span class="nb"&gt;exec &lt;/span&gt;txflow-redis redis-cli GET analytics:total_payments
&lt;span class="c"&gt;# 1 ✓&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One API call. Five side effects. Five completely independent services. Zero coupling — none of the consumers know the others exist. The fraud scorer doesn't call the wallet updater. The audit logger doesn't call the notifier. They all just read from the same Kafka topic, each at their own pace.&lt;/p&gt;
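
&lt;p&gt;That zero coupling falls out of consumer groups: same topic, different &lt;code&gt;group.id&lt;/code&gt;, independent offsets. A hedged sketch of the wiring, assuming the confluent-kafka Python client (the group names are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from confluent_kafka import Consumer

def make_consumer(service: str) -&gt; Consumer:
    # Each service gets its own group.id, so each group receives
    # a full, independent copy of the payments.initiated stream.
    return Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": f"txflow-{service}",
        "auto.offset.reset": "earliest",   # new groups start from the beginning
        "enable.auto.commit": False,       # commit manually, after side effects
    })

consumers = {
    s: make_consumer(s) for s in ("wallet", "fraud", "notifier", "audit", "analytics")
}
for c in consumers.values():
    c.subscribe(["payments.initiated"])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;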

&lt;p&gt;Then I ran the replayability test.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Stop the audit consumer&lt;/span&gt;
docker compose stop consumer-audit

&lt;span class="c"&gt;# Fire 10 more events&lt;/span&gt;
./scripts/fire_bulk.sh

&lt;span class="c"&gt;# Redpanda Console shows: audit consumer group lag = 10&lt;/span&gt;
&lt;span class="c"&gt;# Every other consumer: lag = 0 (they processed normally)&lt;/span&gt;

&lt;span class="c"&gt;# Restart audit consumer&lt;/span&gt;
docker compose start consumer-audit

&lt;span class="c"&gt;# Watch it replay all 10 missed events from its last committed offset&lt;/span&gt;
docker compose logs &lt;span class="nt"&gt;-f&lt;/span&gt; consumer-audit
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Every single missed event was processed. The audit log caught up completely.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;This is impossible with SQS.&lt;/strong&gt; When a message is consumed from SQS, it's deleted. There's no log to replay. There's no offset to reset. If the audit service was down while the other workers consumed those messages, then from the queue's perspective the events are gone. You'd have to build your own replay mechanism from scratch.&lt;/p&gt;

&lt;p&gt;With Kafka, replay is not a feature you implement. It's the default behaviour. The log is the database.&lt;/p&gt;




&lt;h2&gt;
  
  
  Lesson 4: Partitions Are the Scaling Unit
&lt;/h2&gt;

&lt;p&gt;The &lt;code&gt;payments.initiated&lt;/code&gt; topic has 3 partitions. Every payment event is published with &lt;code&gt;user_id&lt;/code&gt; as the partition key.&lt;/p&gt;

&lt;p&gt;This means:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;All events for &lt;code&gt;user_001&lt;/code&gt; always land on partition 0 (hash of "user_001" % 3 = 0)&lt;/li&gt;
&lt;li&gt;All events for &lt;code&gt;user_002&lt;/code&gt; always land on partition 2&lt;/li&gt;
&lt;li&gt;Within a partition, events are strictly ordered&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Why does this matter? Because the wallet consumer needs to process &lt;code&gt;user_001&lt;/code&gt;'s events in order. If two payments arrive near-simultaneously and get processed in the wrong order, the balance calculations could be wrong. Partitioning by &lt;code&gt;user_id&lt;/code&gt; gives you a per-user ordering guarantee for free.&lt;/p&gt;
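
&lt;p&gt;On the producer side, all it takes to claim that guarantee is setting the message key. A minimal sketch with the confluent-kafka client (the topic name matches the article; everything else is illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_payment(event: dict):
    producer.produce(
        "payments.initiated",
        key=event["user_id"].encode(),     # same key, same partition, same order
        value=json.dumps(event).encode(),
    )
    producer.flush()  # blocking; fine for a demo, batch deliveries in production
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;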

&lt;p&gt;The other thing partitions determine: &lt;strong&gt;how many parallel consumers you can have in a group.&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Scale wallet consumer to 4 instances&lt;/span&gt;
docker compose up &lt;span class="nt"&gt;--scale&lt;/span&gt; consumer-wallet&lt;span class="o"&gt;=&lt;/span&gt;4
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What happens: 3 instances each own 1 partition. The 4th sits idle. You can never have more active consumers in a group than you have partitions. This is Kafka's fundamental scaling model — you scale by adding partitions, not just by adding consumers.&lt;/p&gt;

&lt;p&gt;I ran this test with 4 instances, watched the Redpanda Console, and saw exactly one instance sitting there doing nothing. Ten minutes of reading about this is worth less than watching it happen once.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Observability Dashboard
&lt;/h2&gt;

&lt;p&gt;One thing that separates "I ran Kafka" from "I understand Kafka operations" is knowing what to watch.&lt;/p&gt;

&lt;p&gt;The most important metric in Kafka is &lt;strong&gt;consumer lag&lt;/strong&gt; — the difference between the latest message in a topic and the consumer group's current position. Lag = 0 means the consumer is keeping up. Lag = 500 means the consumer is 500 messages behind the producer.&lt;/p&gt;

&lt;p&gt;Consumer lag is Kafka's equivalent of Sidekiq queue depth. If it's rising, something is wrong — either your consumer is too slow, there's an exception in the processing logic, or the consumer is down entirely.&lt;/p&gt;
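
&lt;p&gt;If you want the number programmatically rather than from the console, lag is just the high watermark minus the committed offset, summed over partitions. A sketch using confluent-kafka (group and topic names are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from confluent_kafka import Consumer, TopicPartition

def group_lag(group_id, topic, num_partitions):
    c = Consumer({"bootstrap.servers": "localhost:9092", "group.id": group_id})
    tps = [TopicPartition(topic, p) for p in range(num_partitions)]
    lag = 0
    for tp in c.committed(tps, timeout=5):
        low, high = c.get_watermark_offsets(tp, timeout=5)
        position = tp.offset if tp.offset &gt;= 0 else low   # no commit yet
        lag += high - position
    c.close()
    return lag

print(group_lag("txflow-audit", "payments.initiated", 3))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;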

&lt;p&gt;The dashboard polls the Redpanda Admin API every 5 seconds and colour-codes lag per consumer group:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Green&lt;/strong&gt; — lag = 0, all caught up&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Amber&lt;/strong&gt; — lag 1–20, slight backlog&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Red&lt;/strong&gt; — lag &amp;gt; 20, needs attention&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I also wired in the DLQ event table and analytics counters. Running &lt;code&gt;fire_bulk.sh&lt;/code&gt; (20 events in rapid succession) and watching the lag spike and drain in real time made the whole system feel alive in a way that just reading logs never does.&lt;/p&gt;




&lt;h2&gt;
  
  
  What I'd Do Differently
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Use a schema registry from the start.&lt;/strong&gt; I serialised events as plain JSON. That's fine for a POC, but in production it's a trap: change the event schema and you silently break consumers. Redpanda ships with a built-in schema registry that speaks the Confluent Schema Registry API and supports Avro. I plan to add it as a stretch goal, but I wish I'd built it in from day one.&lt;/p&gt;
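
&lt;p&gt;For reference, registering an Avro schema against that registry is a single REST call to the Confluent-compatible API. A hedged sketch in Python (8081 is Redpanda's default registry port; the field list is my guess at the event shape, not the repo's actual schema):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
import requests

schema = {
    "type": "record",
    "name": "PaymentInitiated",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"},
    ],
}

resp = requests.post(
    "http://localhost:8081/subjects/payments.initiated-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(schema)},
)
print(resp.json())  # {"id": 1} on first registration
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;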

&lt;p&gt;&lt;strong&gt;Add consumer lag alerting earlier.&lt;/strong&gt; Rising lag is the first signal of a problem. I'd add a simple threshold alert (lag &amp;gt; 50 for &amp;gt; 60 seconds = log a structured alert) as part of the base consumer, not as an afterthought.&lt;/p&gt;
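
&lt;p&gt;That alert needs almost no machinery. A sketch using the 50-message / 60-second thresholds from above (the state handling is mine):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
import time

LAG_LIMIT, WINDOW_SECS = 50, 60
_breach_started = None

def check_lag(lag):
    """Log a structured alert when lag stays above LAG_LIMIT for WINDOW_SECS."""
    global _breach_started
    if lag &lt;= LAG_LIMIT:
        _breach_started = None                 # recovered: reset the window
        return
    now = time.time()
    if _breach_started is None:
        _breach_started = now                  # breach just started
    elif now - _breach_started &gt; WINDOW_SECS:
        print(json.dumps({"alert": "consumer_lag_high", "lag": lag}))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;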

&lt;p&gt;&lt;strong&gt;Test the poison pill scenario deliberately.&lt;/strong&gt; A poison pill is a message that will never succeed — malformed data, a downstream service that's permanently broken for that record type. Without handling it explicitly, a poison pill will cause your consumer to retry forever, never advancing its offset, completely blocking that partition. I added DLQ handling but I should've stress-tested it earlier.&lt;/p&gt;
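
&lt;p&gt;The shape of the fix is bounded retries: try a few times, then route the message to the DLQ and commit so the partition keeps moving. A sketch (the retry count and DLQ topic name are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from confluent_kafka import Producer

dlq = Producer({"bootstrap.servers": "localhost:9092"})
MAX_ATTEMPTS = 3

def process_with_dlq(consumer, msg, process):
    for attempt in range(MAX_ATTEMPTS):
        try:
            process(msg)
            break
        except Exception as exc:
            if attempt == MAX_ATTEMPTS - 1:
                # Park the raw bytes plus the error for later inspection.
                dlq.produce("payments.dlq", key=msg.key(), value=msg.value(),
                            headers=[("error", str(exc).encode())])
                dlq.flush()
    consumer.commit(msg)  # always advance, even past the poison pill
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;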




&lt;h2&gt;
  
  
  SQS vs Kafka — When to Use Which
&lt;/h2&gt;

&lt;p&gt;I want to be clear: this isn't "Kafka is better than SQS." They solve different problems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Use SQS when:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You have one consumer per job type&lt;/li&gt;
&lt;li&gt;Messages can be deleted after processing&lt;/li&gt;
&lt;li&gt;You don't need replayability&lt;/li&gt;
&lt;li&gt;You're already on AWS and want managed infrastructure&lt;/li&gt;
&lt;li&gt;The scale is modest and operational simplicity matters&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Use Kafka when:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Multiple independent consumers need to react to the same event&lt;/li&gt;
&lt;li&gt;You need replayability (new service, bug fix, backfill)&lt;/li&gt;
&lt;li&gt;Event ordering within a key matters&lt;/li&gt;
&lt;li&gt;You want to decouple producers from consumers at an architectural level&lt;/li&gt;
&lt;li&gt;You're building something that will grow to high throughput&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The payment system I described — where fraud scoring, wallet updates, notifications, audit logging, and analytics all react to the same event independently — is a textbook Kafka use case. SQS would work, but you'd be fighting the tool.&lt;/p&gt;




&lt;h2&gt;
  
  
  What Building This Taught Me About Backend Engineering
&lt;/h2&gt;

&lt;p&gt;The deeper lesson isn't about Kafka specifically. It's about what "event-driven architecture" actually means.&lt;/p&gt;

&lt;p&gt;Before this project, I used the phrase "event-driven" to mean "I use a message queue." That's not wrong, but it's incomplete. True event-driven architecture means the event is a first-class citizen. Services don't call each other; they react to facts that have already been recorded. The payment happened. The event is a statement of that fact. Any service that cares about it can react. Any service that doesn't, ignores it. Adding a new service next month requires touching zero existing code.&lt;/p&gt;

&lt;p&gt;The log is the system. Everything else is a view.&lt;/p&gt;

&lt;p&gt;That's the mental model shift that Kafka forces, and it's worth building a project from scratch just to internalise it.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Code
&lt;/h2&gt;

&lt;p&gt;Everything is on GitHub: &lt;strong&gt;github.com/Deonkar/txFlow_build_tasks&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;It runs with a single &lt;code&gt;docker compose up&lt;/code&gt;. Fire a payment with &lt;code&gt;./scripts/fire_event.sh&lt;/code&gt;. Watch five things happen simultaneously.&lt;/p&gt;

&lt;p&gt;If you're coming from SQS or RabbitMQ or Sidekiq, the thing to run first is the replayability test — stop one consumer, fire 10 events, restart it, watch it catch up. That 30-second test will reframe how you think about message processing.&lt;/p&gt;




&lt;p&gt;&lt;em&gt;If you found this useful or have questions about any of the implementation decisions, drop a comment. I'm particularly interested in hearing from people who've run Kafka at production scale — there's definitely more to learn about partition rebalancing, exactly-once semantics, and schema evolution.&lt;/em&gt;&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Tags:&lt;/strong&gt; &lt;code&gt;kafka&lt;/code&gt; &lt;code&gt;backend&lt;/code&gt; &lt;code&gt;python&lt;/code&gt; &lt;code&gt;systemdesign&lt;/code&gt; &lt;code&gt;distributedsystems&lt;/code&gt;&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>python</category>
      <category>systemdesign</category>
      <category>distributedsystems</category>
    </item>
  </channel>
</rss>
