<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: SNEHASISH DUTTA</title>
    <description>The latest articles on Forem by SNEHASISH DUTTA (@datasosneh).</description>
    <link>https://forem.com/datasosneh</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1649477%2Fae4410a9-fa5e-414b-a077-bc87ddff5528.png</url>
      <title>Forem: SNEHASISH DUTTA</title>
      <link>https://forem.com/datasosneh</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/datasosneh"/>
    <language>en</language>
    <item>
      <title>From Reddit Trolls to Real-Time Analytics: Building an LLM-Powered Flink Deployment System</title>
      <dc:creator>SNEHASISH DUTTA</dc:creator>
      <pubDate>Thu, 29 May 2025 20:51:30 +0000</pubDate>
      <link>https://forem.com/datasosneh/from-reddit-trolls-to-real-time-analytics-building-an-llm-powered-flink-deployment-system-25o3</link>
      <guid>https://forem.com/datasosneh/from-reddit-trolls-to-real-time-analytics-building-an-llm-powered-flink-deployment-system-25o3</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6u4w1tkdofnl3o11pr5t.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6u4w1tkdofnl3o11pr5t.jpeg" alt="sshot" width="800" height="1244"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;The Origin Story: When Reddit Roasts Spark Innovation&lt;/h2&gt;

&lt;p&gt;Picture this: You're a data engineer scrolling through Reddit, genuinely asking about emerging AI trends to stay ahead of the curve. You post a thoughtful question about what new technologies you should learn, expecting insights about MLOps, vector databases, or maybe the latest streaming frameworks.&lt;/p&gt;

&lt;p&gt;Instead, you get: &lt;em&gt;"You still have time to sell yourself on OnlyFans."&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Most people would roll their eyes and move on. But sometimes, the most ridiculous comments spark the most interesting ideas. What if we took that sarcastic comment and turned it into a legitimate technical challenge? What if we built a sophisticated real-time data processing system that could handle the scale and complexity of a content platform, complete with an AI-powered deployment interface?&lt;/p&gt;

&lt;p&gt;That's exactly what happened here, and the result is a fascinating exploration of modern data engineering architecture that combines LLM-powered DevOps automation with Apache Flink stream processing.&lt;/p&gt;

&lt;h2&gt;The Technical Vision: Beyond the Meme&lt;/h2&gt;

&lt;p&gt;What started as a Reddit joke evolved into a comprehensive demonstration of cutting-edge data engineering patterns:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Natural Language DevOps&lt;/strong&gt;: Using OpenAI GPT-4 to parse deployment commands and automatically provision Apache Flink jobs&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Real-Time Stream Processing&lt;/strong&gt;: Apache Flink jobs processing events with sub-second latency&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Modern Data Lakehouse&lt;/strong&gt;: Apache Iceberg tables providing ACID transactions and schema evolution&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event-Driven Architecture&lt;/strong&gt;: Kafka-based event streaming with automatic scaling&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Infrastructure as Code&lt;/strong&gt;: Complete Docker Compose orchestration for reproducible deployments&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The system architecture demonstrates enterprise-grade patterns while maintaining the flexibility to experiment with emerging technologies.&lt;/p&gt;

&lt;h2&gt;System Architecture: Three Pillars of Modern Data Processing&lt;/h2&gt;

&lt;h3&gt;1. Event Publisher: The Data Generator&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌─────────────────┐    ┌──────────────┐    ┌─────────────────┐
│  Go Publisher   │───▶│   Redpanda   │───▶│  Event Topics   │
│                 │    │   (Kafka)    │    │                 │
│ • GPU Temp Sim  │    │              │    │ • content       │
│ • Configurable  │    │ • Multi-node │    │ • creator       │
│ • Docker Ready  │    │ • Web UI     │    │ • temperature   │
└─────────────────┘    └──────────────┘    └─────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The first component simulates realistic event streams. While themed around content platforms, it actually generates GPU temperature data, a perfect proxy for any time-series monitoring system. The publisher includes:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Smart Simulation Features:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Configurable anomaly injection (5% abnormal readings by default)&lt;/li&gt;
&lt;li&gt;Multiple device simulation (scalable from 1 to N devices)&lt;/li&gt;
&lt;li&gt;Adjustable publishing intervals (milliseconds to minutes)&lt;/li&gt;
&lt;li&gt;Built-in Docker orchestration&lt;/li&gt;
&lt;/ul&gt;
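&lt;p&gt;The anomaly injection above can be sketched in a few lines of Go. This is a minimal illustration, not the project's actual publisher; the function name, temperature ranges, and the &lt;code&gt;abnormalRate&lt;/code&gt; parameter are assumptions:&lt;/p&gt;

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// TemperatureReading mirrors the event schema published to Kafka.
type TemperatureReading struct {
	DeviceID    string    `json:"device_id"`
	Temperature float64   `json:"temperature"`
	IsAbnormal  bool      `json:"is_abnormal"`
	Timestamp   time.Time `json:"timestamp"`
}

// newReading emits a normal reading (40-80°C) most of the time, and an
// abnormal spike (95-120°C) with probability abnormalRate (e.g. 0.05 for 5%).
func newReading(deviceID string, abnormalRate float64) TemperatureReading {
	r := TemperatureReading{DeviceID: deviceID, Timestamp: time.Now()}
	if rand.Float64() < abnormalRate {
		r.IsAbnormal = true
		r.Temperature = 95 + rand.Float64()*25
	} else {
		r.Temperature = 40 + rand.Float64()*40
	}
	return r
}

func main() {
	fmt.Printf("%+v\n", newReading("gpu-1", 0.05))
}
```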

&lt;p&gt;&lt;strong&gt;Production-Ready Architecture:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;TemperatureReading&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;DeviceID&lt;/span&gt;    &lt;span class="kt"&gt;string&lt;/span&gt;    &lt;span class="s"&gt;`json:"device_id"`&lt;/span&gt;
    &lt;span class="n"&gt;Temperature&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt;   &lt;span class="s"&gt;`json:"temperature"`&lt;/span&gt;
    &lt;span class="n"&gt;IsAbnormal&lt;/span&gt;  &lt;span class="kt"&gt;bool&lt;/span&gt;      &lt;span class="s"&gt;`json:"is_abnormal"`&lt;/span&gt;
    &lt;span class="n"&gt;Timestamp&lt;/span&gt;   &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Time&lt;/span&gt; &lt;span class="s"&gt;`json:"timestamp"`&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The publisher demonstrates real-world patterns for event generation, including proper error handling, graceful shutdowns, and configurable parameters through environment variables.&lt;/p&gt;

&lt;h3&gt;2. LLM-Powered Deployment Service: The AI Operations Layer&lt;/h3&gt;

&lt;p&gt;This is where things get interesting. Instead of traditional deployment scripts or complex CI/CD pipelines, the system uses OpenAI GPT-4 to interpret natural language commands and automatically deploy Apache Flink jobs.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌─────────────────┐    ┌──────────────┐    ┌─────────────────┐
│   Chat Input    │───▶│  OpenAI GPT  │───▶│  Flink Jobs     │
│                 │    │              │    │                 │
│ "deploy content │    │ • Parse NL   │    │ • Auto Deploy   │
│  event"         │    │ • Validate   │    │ • Docker/CLI    │
│                 │    │ • Generate   │    │ • Monitoring    │
└─────────────────┘    └──────────────┘    └─────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Natural Language Processing Examples:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;em&gt;"deploy content event processor"&lt;/em&gt; → Launches content stream processing job&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;"I need creator analytics running"&lt;/em&gt; → Deploys creator event processor&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;"start processing video events"&lt;/em&gt; → Spins up video content pipeline&lt;/li&gt;
&lt;/ul&gt;
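&lt;p&gt;Whatever the model replies, the service still has to validate it against a closed set of deployable processors before touching infrastructure. A sketch of that guard follows; the allow-list and function name are assumptions, not the project's actual code:&lt;/p&gt;

```go
package main

import (
	"fmt"
	"strings"
)

// knownProcessors is the closed set of Flink jobs the service may deploy.
var knownProcessors = map[string]bool{
	"content":     true,
	"creator":     true,
	"temperature": true,
}

// resolveEventType maps a (possibly noisy) LLM answer onto an allowed
// processor, rejecting anything outside the allow-list.
func resolveEventType(llmOutput string) (string, error) {
	normalized := strings.ToLower(strings.TrimSpace(llmOutput))
	for name := range knownProcessors {
		if strings.Contains(normalized, name) {
			return name, nil
		}
	}
	return "", fmt.Errorf("unknown event type in %q", llmOutput)
}

func main() {
	et, err := resolveEventType("Deploy the creator analytics processor")
	fmt.Println(et, err)
}
```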

&lt;p&gt;&lt;strong&gt;Dual Deployment Strategies:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Docker-Based Deployment:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;DockerClient&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;deployFlinkJob&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventType&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;JobInfo&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;containerName&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"flink-%s-processor-%s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"20060102-150405"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="c"&gt;// Create container with automatic port assignment&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;container&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Config&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Image&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"flink-event-processor:latest"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;ExposedPorts&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;nat&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PortSet&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"8081/tcp"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;{}{}},&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;createAndStartContainer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;containerName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;CLI-Based Deployment:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;FlinkClient&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;submitJob&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventType&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;JobInfo&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;cmd&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;exec&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Command&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"flink"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"run"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="s"&gt;"--jobmanager"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JobManagerAddress&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="s"&gt;"--class"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"com.eventprocessor.FlinkStreamingJob"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JarPath&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="s"&gt;"--event-type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;executeWithMonitoring&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cmd&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The service provides intelligent error handling, automatic retry logic, and comprehensive monitoring integration.&lt;/p&gt;

&lt;h3&gt;3. Flink Event Processor: The Stream Processing Engine&lt;/h3&gt;

&lt;p&gt;The heart of the system is a sophisticated Apache Flink application that processes multiple event types in real time. This isn't a toy example: it's a production-ready streaming application with proper error handling, exactly-once processing guarantees, and multiple sink strategies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌─────────────────┐    ┌──────────────┐    ┌─────────────────┐
│  Kafka Source   │───▶│ Flink Stream │───▶│ Iceberg Tables  │
│                 │    │              │    │                 │
│ • Content Events│    │ • Transform  │    │ • ACID Trans    │
│ • Creator Events│    │ • Validate   │    │ • Time Travel   │
│ • Temp Events   │    │ • Enrich     │    │ • Schema Evolve │
└─────────────────┘    └──────────────┘    └─────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Event Processing Architecture:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;FlinkStreamingJob&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;StreamExecutionEnvironment&lt;/span&gt; &lt;span class="n"&gt;env&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StreamExecutionEnvironment&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getExecutionEnvironment&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

        &lt;span class="c1"&gt;// Configure for production&lt;/span&gt;
        &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;enableCheckpointing&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  &lt;span class="c1"&gt;// 30-second checkpoints&lt;/span&gt;
        &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCheckpointConfig&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
           &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setCheckpointingMode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CheckpointingMode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;EXACTLY_ONCE&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="c1"&gt;// Create event-specific processors&lt;/span&gt;
        &lt;span class="nc"&gt;EventProcessorFactory&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;EventProcessorFactory&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;BaseEventProcessor&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createProcessor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="c1"&gt;// Execute streaming pipeline&lt;/span&gt;
        &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;buildPipeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;execute&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Multi-Event Support:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The system processes different event types with specialized handling:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// Content Events&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ContentEvent&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;creatorId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;title&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;contentType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;BigDecimal&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;viewCount&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Boolean&lt;/span&gt; &lt;span class="n"&gt;isLocked&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// ... additional fields&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;// Creator Events  &lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CreatorEvent&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;displayName&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Boolean&lt;/span&gt; &lt;span class="n"&gt;isVerified&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;subscriberCount&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;BigDecimal&lt;/span&gt; &lt;span class="n"&gt;monthlyPrice&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;category&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// ... additional fields&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Advanced Storage Integration:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The system supports multiple storage strategies, from simple file sinks to full Apache Iceberg integration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;IcebergTableManager&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;createTable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Schema&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Table&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;catalog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;buildTable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TableIdentifier&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"default"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withPartitionSpec&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;PartitionSpec&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builderFor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;day&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"created_at"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TableProperties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FORMAT_VERSION&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;DataStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;createIcebergSink&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;DataStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sinkTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;IcebergSinks&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forRow&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TableSchema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromTypeInfo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;typeInfo&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;Key Technical Innovations&lt;/h2&gt;

&lt;h3&gt;1. LLM-Powered Infrastructure Automation&lt;/h3&gt;

&lt;p&gt;The most fascinating aspect of this system is the natural language interface for infrastructure deployment. Instead of remembering complex CLI commands or navigating web UIs, operators can use conversational language:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;User: "deploy creator event"
System: 🤖 Processing: deploy creator event 
        📋 Parsed command: deploy creator event  
        🚀 Submitting creator processing job...
        ✅ Successfully submitted creator processor job: flink-creator-processor-20240529-143022
        🌐 Monitor at: http://localhost:8081
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This demonstrates a powerful pattern for the future of DevOps: using LLMs to abstract away the complexity of infrastructure management while maintaining full control and visibility.&lt;/p&gt;
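&lt;p&gt;One robust way to keep that control is to ask the model for structured JSON rather than free text, then unmarshal and validate it server-side. The contract below is an assumption about how such a schema could look, not the project's actual prompt format:&lt;/p&gt;

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DeployCommand is the JSON contract the LLM is asked to fill in.
type DeployCommand struct {
	Action    string `json:"action"`     // e.g. "deploy"
	EventType string `json:"event_type"` // e.g. "creator"
}

// parseDeployCommand decodes a model response and rejects anything
// that is not an explicit deploy request.
func parseDeployCommand(raw string) (DeployCommand, error) {
	var cmd DeployCommand
	if err := json.Unmarshal([]byte(raw), &cmd); err != nil {
		return cmd, fmt.Errorf("model returned invalid JSON: %w", err)
	}
	if cmd.Action != "deploy" || cmd.EventType == "" {
		return cmd, fmt.Errorf("refusing non-deploy command: %+v", cmd)
	}
	return cmd, nil
}

func main() {
	cmd, err := parseDeployCommand(`{"action":"deploy","event_type":"creator"}`)
	fmt.Println(cmd, err)
}
```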

&lt;h3&gt;2. Hybrid Deployment Architecture&lt;/h3&gt;

&lt;p&gt;The system supports both containerized and traditional CLI-based deployments, providing flexibility for different operational environments:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Docker Deployment&lt;/strong&gt;: Perfect for development, testing, and containerized production environments&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;CLI Deployment&lt;/strong&gt;: Integrates with existing Flink clusters and traditional operational workflows&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;3. Modern Data Lakehouse Patterns&lt;/h3&gt;

&lt;p&gt;The Apache Iceberg integration showcases modern data lakehouse architecture:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;ACID Transactions&lt;/strong&gt;: Ensuring data consistency even with concurrent writers&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Schema Evolution&lt;/strong&gt;: Adding new fields without breaking existing queries
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Time Travel&lt;/strong&gt;: Querying historical states of data&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Partition Management&lt;/strong&gt;: Automatic daily partitioning for optimal query performance&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;Performance and Scalability Considerations&lt;/h2&gt;

&lt;p&gt;The system is designed with production scalability in mind:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Flink Configuration:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// Optimized for throughput&lt;/span&gt;
&lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setParallelism&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getConfig&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setLatencyTrackingInterval&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;// Memory management&lt;/span&gt;
&lt;span class="nc"&gt;Configuration&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Configuration&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"taskmanager.memory.process.size"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2g"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jobmanager.memory.process.size"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"1g"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Kafka Integration:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// High-throughput consumer configuration&lt;/span&gt;
&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"fetch.min.bytes"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"1048576"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  &lt;span class="c1"&gt;// 1MB&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"fetch.max.wait.ms"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"500"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"max.partition.fetch.bytes"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"10485760"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  &lt;span class="c1"&gt;// 10MB&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Monitoring and Observability:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The system includes comprehensive monitoring:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Flink Web UI for job monitoring and metrics&lt;/li&gt;
&lt;li&gt;Structured logging with configurable levels&lt;/li&gt;
&lt;li&gt;Docker container health checks&lt;/li&gt;
&lt;li&gt;Kafka consumer lag monitoring&lt;/li&gt;
&lt;/ul&gt;
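&lt;p&gt;Consumer lag (the gap between a partition's end offset and the group's last committed offset) is the key signal behind that last bullet. A minimal Go sketch of the computation, with an illustrative threshold; in practice the offsets come from the Kafka admin API or tools like &lt;code&gt;rpk&lt;/code&gt;:&lt;/p&gt;

```go
package main

import "fmt"

// consumerLag is the standard definition: how far behind the
// group's committed offset is from the partition's end offset.
func consumerLag(endOffset, committedOffset int64) int64 {
	return endOffset - committedOffset
}

func main() {
	lag := consumerLag(10500, 10200)
	if lag > 250 { // hypothetical alert threshold
		fmt.Printf("consumer lag %d exceeds threshold\n", lag)
	}
}
```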

&lt;h2&gt;
  
  
  Real-World Applications
&lt;/h2&gt;

&lt;p&gt;While the "OnlyFans" theming is obviously humorous, the underlying architecture patterns are applicable to numerous real-world scenarios:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Content Platforms:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Video streaming analytics&lt;/li&gt;
&lt;li&gt;User engagement tracking&lt;/li&gt;
&lt;li&gt;Content recommendation engines&lt;/li&gt;
&lt;li&gt;Creator monetization systems&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;IoT and Monitoring:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Sensor data processing (the GPU temperature simulation)&lt;/li&gt;
&lt;li&gt;Infrastructure monitoring&lt;/li&gt;
&lt;li&gt;Anomaly detection systems&lt;/li&gt;
&lt;li&gt;Predictive maintenance&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Financial Services:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Transaction processing&lt;/li&gt;
&lt;li&gt;Risk assessment&lt;/li&gt;
&lt;li&gt;Fraud detection&lt;/li&gt;
&lt;li&gt;Regulatory reporting&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;E-commerce:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;User behavior analytics&lt;/li&gt;
&lt;li&gt;Inventory management&lt;/li&gt;
&lt;li&gt;Price optimization&lt;/li&gt;
&lt;li&gt;Recommendation systems&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Lessons Learned and Technical Insights
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. LLM Integration Complexity
&lt;/h3&gt;

&lt;p&gt;Integrating LLMs into operational systems requires careful consideration of:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Error Handling&lt;/strong&gt;: What happens when the LLM misinterprets a command?&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Cost Management&lt;/strong&gt;: OpenAI API costs can accumulate quickly in production&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Latency&lt;/strong&gt;: An LLM call typically adds 1-3 seconds to deployment workflows&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Security&lt;/strong&gt;: Ensuring the LLM can't be tricked into executing malicious commands&lt;/li&gt;
&lt;/ul&gt;
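&lt;p&gt;The security point deserves emphasis: the safest pattern is to let the LLM select from a fixed whitelist of actions rather than emit free-form commands. A minimal Go sketch of that idea (the action names here are hypothetical, not the system's actual command set):&lt;/p&gt;

```go
package main

import (
	"fmt"
	"strings"
)

// allowedActions whitelists the operations the LLM may trigger.
// Anything outside this map is refused, no matter what the model says.
var allowedActions = map[string]bool{
	"deploy": true,
	"stop":   true,
	"status": true,
}

// validateAction normalizes the LLM's output and rejects
// anything that is not explicitly whitelisted.
func validateAction(raw string) (string, error) {
	action := strings.ToLower(strings.TrimSpace(raw))
	if allowedActions[action] {
		return action, nil
	}
	return "", fmt.Errorf("action %q is not permitted", action)
}

func main() {
	if a, err := validateAction("Deploy"); err == nil {
		fmt.Println("accepted:", a)
	}
	if _, err := validateAction("drop all tables"); err != nil {
		fmt.Println("rejected:", err)
	}
}
```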

&lt;h3&gt;
  
  
  2. Multi-Language Microservices
&lt;/h3&gt;

&lt;p&gt;The combination of Go (for the LLM service) and Java (for Flink processing) demonstrates the power of polyglot architectures:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Go&lt;/strong&gt;: Excellent for HTTP services, concurrent operations, and simple deployment&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Java&lt;/strong&gt;: Rich ecosystem for data processing, mature Flink integration, and a robust type system&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  3. Stream Processing Design Patterns
&lt;/h3&gt;

&lt;p&gt;The Flink application showcases several important patterns:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Factory Pattern&lt;/strong&gt;: For creating event-specific processors&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Strategy Pattern&lt;/strong&gt;: For different sink implementations (files vs. Iceberg)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Builder Pattern&lt;/strong&gt;: For configuring complex streaming pipelines&lt;/li&gt;
&lt;/ul&gt;
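&lt;p&gt;The actual processors are Java/Flink classes, but the factory idea is language-agnostic. A minimal Go sketch (the event types and processors are illustrative, not the project's real ones):&lt;/p&gt;

```go
package main

import "fmt"

// EventProcessor is the behavior every event-specific processor shares.
type EventProcessor interface {
	Process(payload string) string
}

type subscriptionProcessor struct{}

func (subscriptionProcessor) Process(p string) string { return "subscription: " + p }

type contentViewProcessor struct{}

func (contentViewProcessor) Process(p string) string { return "content-view: " + p }

// NewProcessor is the factory: it maps an event type to the
// processor that knows how to handle it.
func NewProcessor(eventType string) (EventProcessor, error) {
	switch eventType {
	case "subscription":
		return subscriptionProcessor{}, nil
	case "content_view":
		return contentViewProcessor{}, nil
	}
	return nil, fmt.Errorf("unknown event type %q", eventType)
}

func main() {
	p, err := NewProcessor("subscription")
	if err == nil {
		fmt.Println(p.Process("user-42"))
	}
}
```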

&lt;h2&gt;
  
  
  Future Enhancements and Roadmap
&lt;/h2&gt;

&lt;p&gt;The system provides a solid foundation for several interesting extensions:&lt;/p&gt;

&lt;h3&gt;
  
  
  1. Advanced LLM Capabilities
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Multi-step Deployments&lt;/strong&gt;: "Deploy a content processing pipeline with anomaly detection"&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Resource Optimization&lt;/strong&gt;: LLM-driven resource allocation based on workload patterns&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Troubleshooting Assistant&lt;/strong&gt;: AI-powered diagnosis of failing jobs&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  2. Enhanced Stream Processing
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Machine Learning Integration&lt;/strong&gt;: Real-time feature engineering and model serving&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Complex Event Processing&lt;/strong&gt;: Pattern detection across multiple event streams&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Auto-scaling&lt;/strong&gt;: Dynamic parallelism adjustment based on throughput&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  3. Operational Excellence
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;GitOps Integration&lt;/strong&gt;: Version control for deployment configurations&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Multi-tenancy&lt;/strong&gt;: Support for multiple teams and environments&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Advanced Monitoring&lt;/strong&gt;: Custom metrics and alerting integrations&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion: From Meme to Modern Architecture
&lt;/h2&gt;

&lt;p&gt;What started as a sarcastic Reddit comment evolved into a legitimate exploration of cutting-edge data engineering patterns. The system demonstrates several important trends in modern data infrastructure:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;AI-Powered Operations&lt;/strong&gt;: Using LLMs to simplify complex operational tasks&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event-Driven Architecture&lt;/strong&gt;: Building resilient, scalable systems around event streams&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Modern Data Lakehouse&lt;/strong&gt;: Combining the flexibility of data lakes with the reliability of data warehouses&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Polyglot Microservices&lt;/strong&gt;: Choosing the right tool for each specific task&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The technical implementation showcases production-ready patterns while maintaining the experimental spirit needed to explore emerging technologies. It proves that sometimes the best innovations come from the most unexpected inspirations.&lt;/p&gt;

&lt;p&gt;Whether you're building content platforms, IoT systems, or financial services, the architectural patterns demonstrated here provide a solid foundation for modern real-time data processing systems. And if nothing else, it's a reminder that great engineering can emerge from the most unlikely sources - even Reddit trolls.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Technologies Used:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Apache Flink 1.17.1 (Stream Processing)&lt;/li&gt;
&lt;li&gt;Apache Iceberg 1.3.1 (Data Lakehouse)&lt;/li&gt;
&lt;li&gt;Apache Kafka / Redpanda (Event Streaming)&lt;/li&gt;
&lt;li&gt;OpenAI GPT-4 (Natural Language Processing)&lt;/li&gt;
&lt;li&gt;Go 1.21+ (LLM Service)&lt;/li&gt;
&lt;li&gt;Java 11+ (Stream Processing)&lt;/li&gt;
&lt;li&gt;Docker &amp;amp; Docker Compose (Orchestration)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Repository Links:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://github.com/snepar/onlyfans-event-publisher" rel="noopener noreferrer"&gt;Event Publisher&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/snepar/go-llm-service" rel="noopener noreferrer"&gt;LLM Service&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/snepar/onlyfans-flink-event-processor" rel="noopener noreferrer"&gt;Flink Processor&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;Built with ❤️ and a healthy sense of humor about Reddit comments.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>eventdriven</category>
      <category>dataengineering</category>
      <category>llm</category>
      <category>go</category>
    </item>
    <item>
      <title>"Our GPUs Are Melting": Building a Real-Time Streaming System with Go and Redpanda Ghibli Style</title>
      <dc:creator>SNEHASISH DUTTA</dc:creator>
      <pubDate>Wed, 02 Apr 2025 19:55:00 +0000</pubDate>
      <link>https://forem.com/datasosneh/kafka-with-go-ghibli-style-3m04</link>
      <guid>https://forem.com/datasosneh/kafka-with-go-ghibli-style-3m04</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In the world of AI research and development, computational resources are pushed to their limits. This fact was humorously highlighted by OpenAI CEO Sam Altman's memorable tweet: &lt;strong&gt;"our GPUs are melting"&lt;/strong&gt; - a testament to the intense workloads these specialized processors endure during advanced AI training.&lt;br&gt;
While Altman's tweet was likely hyperbolic, the concern about GPU health is very real. Modern data centers and AI research facilities invest millions in GPU infrastructure, making temperature monitoring a critical operational concern. Overheating can lead to hardware damage, reduced lifespan, and even catastrophic failures that interrupt crucial workloads.&lt;br&gt;
This article explores a practical implementation of a GPU temperature monitoring system built with Go (Golang). We'll demonstrate how to create a complete pipeline for:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Generating simulated GPU temperature data&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Publishing these events to Redpanda (a Kafka API-compatible streaming platform)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Consuming these events in real-time&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Triggering alerts when temperatures exceed critical thresholds&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Go's combination of performance, concurrency model, and straightforward syntax makes it an excellent choice for building such systems. &lt;/p&gt;

&lt;p&gt;The solution we'll explore is lightweight yet robust, capable of handling thousands of events per second while maintaining low latency - perfect for mission-critical monitoring applications.&lt;/p&gt;

&lt;p&gt;Whether you're managing a small GPU cluster for research or a massive data center for production AI workloads, the patterns demonstrated here can be adapted to build a reliable early warning system that might just prevent your own GPUs from metaphorically (or literally) melting down.&lt;/p&gt;
&lt;h2&gt;
  
  
  Architecture Overview
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm0pt55k57yagjsy19xdy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm0pt55k57yagjsy19xdy.png" alt="architecture" width="785" height="117"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The GPU Temperature Monitoring System employs an event-driven microservices architecture to track GPU temperatures in real-time and alert on potentially dangerous conditions. Built with Go, it leverages Redpanda (Kafka API-compatible) as the central event streaming platform.&lt;/p&gt;
&lt;h3&gt;
  
  
  Core Components
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Temperature Publisher Service&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Purpose&lt;/strong&gt;: Simulates/collects GPU temperature readings and publishes to Redpanda&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Key Features&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Configurable simulation of multiple GPU devices&lt;/li&gt;
&lt;li&gt;Randomized temperature patterns with occasional anomalies&lt;/li&gt;
&lt;li&gt;Efficient event serialization using JSON&lt;/li&gt;
&lt;li&gt;Batched publishing for throughput optimization&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Alert Consumer Service&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Purpose:&lt;/strong&gt; Monitors temperature stream and triggers alerts when thresholds are exceeded&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Key Features&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Real-time consumption of temperature events&lt;/li&gt;
&lt;li&gt;Threshold-based alerting logic with configurable thresholds&lt;/li&gt;
&lt;li&gt;Cooldown mechanism to prevent alert storms&lt;/li&gt;
&lt;li&gt;Integration with Slack for immediate notifications&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Redpanda (Event Streaming Platform)&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Purpose&lt;/strong&gt;: Provides reliable, high-throughput messaging between services&lt;br&gt;
&lt;strong&gt;Configuration&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Two-node cluster for redundancy&lt;/li&gt;
&lt;li&gt;Topic-based event segregation&lt;/li&gt;
&lt;li&gt;Persistent storage of temperature data&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Data Flow&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;GPU temperature readings are generated/collected by the Publisher&lt;/li&gt;
&lt;li&gt;Events are serialized to JSON and published to the gpu-temperature topic&lt;/li&gt;
&lt;li&gt;Alert Consumer subscribes to the topic and processes each reading&lt;/li&gt;
&lt;li&gt;When temperatures exceed thresholds, the Consumer formats and sends Slack alerts&lt;/li&gt;
&lt;li&gt;All temperature data remains available in Redpanda for historical analysis&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Technical Considerations
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Scalability:&lt;/strong&gt; Both services can be horizontally scaled to handle more GPUs&lt;br&gt;
&lt;strong&gt;Resilience:&lt;/strong&gt; Components reconnect automatically after network disruptions&lt;br&gt;
&lt;strong&gt;Observability:&lt;/strong&gt; Structured logging throughout the pipeline&lt;br&gt;
&lt;strong&gt;Configuration:&lt;/strong&gt; Environment-based configuration for deployment flexibility&lt;br&gt;
&lt;strong&gt;Containerization:&lt;/strong&gt; Docker-based deployment for consistent environments&lt;/p&gt;

&lt;p&gt;This architecture demonstrates how Go's concurrency model and Redpanda's streaming capabilities can be combined into an efficient, real-time monitoring system that helps prevent hardware damage from overheating GPUs.&lt;/p&gt;


&lt;h2&gt;
  
  
  Temperature Publisher Service :: Producer
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Repository:&lt;/strong&gt; &lt;a href="https://github.com/snepar/gpu-temp-publisher" rel="noopener noreferrer"&gt;https://github.com/snepar/gpu-temp-publisher&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Temperature Publisher Service forms the foundation of our GPU monitoring system. Written in Go, this service simulates multiple GPU devices and publishes temperature readings to Redpanda. Let's explore the core components with key code snippets.&lt;/p&gt;
&lt;h3&gt;
  
  
  Data Model
&lt;/h3&gt;

&lt;p&gt;At the heart of our system is the temperature reading model:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;type TemperatureReading struct {
    DeviceID     string    `json:"device_id"`
    Temperature  float64   `json:"temperature"`
    Timestamp    time.Time `json:"timestamp"`
    GPUModel     string    `json:"gpu_model"`
    HostName     string    `json:"host_name"`
    PowerConsume float64   `json:"power_consume"`
    GPUUtil      float64   `json:"gpu_util"`
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This structure encapsulates all the essential data points for monitoring GPU health.&lt;/p&gt;

&lt;h3&gt;
  
  
  Temperature Simulation
&lt;/h3&gt;

&lt;p&gt;The service generates realistic temperature patterns using a combination of baseline values, utilization-based adjustments, and random fluctuations:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func (s *TemperatureSimulator) generateDeviceReading(deviceIndex int, device GPUDevice) TemperatureReading {
    // Decide if this reading should be abnormal
    isAbnormal := s.rng.Float64() &amp;lt; s.abnormalProbability

    // Update GPU utilization with realistic movements
    targetUtil := s.utilization[deviceIndex]
    if s.rng.Float64() &amp;lt; 0.1 {
        // 10% chance of changing target utilization significantly
        targetUtil = s.rng.Float64() * 100
    }

    // Calculate temperature based on utilization and trend
    baseTemp := device.BaseTemp
    utilizationFactor := s.utilization[deviceIndex] / 100 * 40
    temperature := baseTemp + utilizationFactor + s.tempTrend[deviceIndex]

    // Apply abnormal spike if needed
    if isAbnormal {
        temperature += s.rng.Float64() * 100
    }

    return TemperatureReading{
        DeviceID:     device.ID,
        Temperature:  temperature,
        Timestamp:    time.Now(),
        GPUModel:     device.Model,
        HostName:     device.Host,
        PowerConsume: s.utilization[deviceIndex] * 3.5 + temperature/4,
        GPUUtil:      s.utilization[deviceIndex],
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This simulation logic creates realistic temperature patterns that mimic actual GPU behavior, including occasional temperature spikes.&lt;/p&gt;

&lt;h3&gt;
  
  
  Publishing to Redpanda
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// NewRedpandaPublisher creates a new Redpanda publisher
func NewRedpandaPublisher(ctx context.Context, brokers, topic string) (*RedpandaPublisher, error) {
    // Create client options
    opts := []kgo.Opt{
        kgo.SeedBrokers(strings.Split(brokers, ",")...),
        kgo.DefaultProduceTopic(topic),
        kgo.AllowAutoTopicCreation(),
        kgo.ProducerBatchMaxBytes(1024 * 1024), // 1MB
        kgo.ProducerLinger(5 * time.Millisecond),
        kgo.RecordRetries(5),
    }

    // Create client with retry logic
    client, err := kgo.NewClient(opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to create Redpanda client: %w", err)
    }

    return &amp;amp;RedpandaPublisher{
        client: client,
        topic:  topic,
    }, nil
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The actual publishing of readings happens through a simple but effective method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// PublishReading publishes a temperature reading to Redpanda
func (p *RedpandaPublisher) PublishReading(ctx context.Context, reading TemperatureReading) error {
    // Marshal reading to JSON
    data, err := json.Marshal(reading)
    if err != nil {
        return fmt.Errorf("failed to marshal reading: %w", err)
    }

    // Create record with metadata
    record := &amp;amp;kgo.Record{
        Key:   []byte(reading.DeviceID),
        Value: data,
        Topic: p.topic,
        Headers: []kgo.RecordHeader{
            {Key: "content-type", Value: []byte("application/json")},
        },
    }

    // Produce record with error handling
    result := p.client.ProduceSync(ctx, record)
    if err := result.FirstErr(); err != nil {
        return fmt.Errorf("failed to produce record: %w", err)
    }

    return nil
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Main Service Loop
&lt;/h3&gt;

&lt;p&gt;The service's main loop ties everything together:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func main() {
    // Load configuration
    cfg, err := config.Load()
    if err != nil {
        log.Fatalf("Failed to load configuration: %v", err)
    }

    // Create temperature simulator
    sim := simulator.NewTemperatureSimulator(cfg.NumDevices, cfg.AbnormalProbability)

    // Create publisher
    pub, err := publisher.NewRedpandaPublisher(ctx, cfg.RedpandaBrokers, cfg.Topic)
    if err != nil {
        log.Fatalf("Failed to create Redpanda publisher: %v", err)
    }
    defer pub.Close()

    // Set up ticker for regular publishing
    ticker := time.NewTicker(time.Duration(cfg.IntervalMs) * time.Millisecond)
    defer ticker.Stop()

    // Main loop
    for {
        select {
        case &amp;lt;-ticker.C:
            // Generate and publish temperature readings
            readings := sim.GenerateReadings()
            for _, reading := range readings {
                if err := pub.PublishReading(ctx, reading); err != nil {
                    log.Printf("Error publishing reading: %v", err)
                } else if reading.Temperature &amp;gt; 80.0 {
                    log.Printf("Published HIGH TEMP: Device %s - %.2f°C", 
                        reading.DeviceID, reading.Temperature)
                }
            }
        case &amp;lt;-sigChan:
            return // Graceful shutdown
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Configuration via Environment Variables
&lt;/h3&gt;

&lt;p&gt;The service is easily configurable through environment variables:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func Load() (*Config, error) {
    config := &amp;amp;Config{
        RedpandaBrokers:     getEnv("REDPANDA_BROKERS", "localhost:9092"),
        Topic:               getEnv("REDPANDA_TOPIC", "gpu-temperature"),
        NumDevices:          getEnvAsInt("NUM_DEVICES", 5),
        IntervalMs:          getEnvAsInt("INTERVAL_MS", 1000),
        AbnormalProbability: getEnvAsFloat("ABNORMAL_PROBABILITY", 0.05),
    }
    return config, nil
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This approach makes the service highly configurable while maintaining sensible defaults.&lt;/p&gt;

&lt;h3&gt;
  
  
  Containerization
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /gpu-temp-publisher ./cmd/publisher

FROM alpine:3.18
COPY --from=builder /gpu-temp-publisher .
ENTRYPOINT ["/gpu-temp-publisher"]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Alert Generator Service :: Consumer
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Repository:&lt;/strong&gt; &lt;a href="https://github.com/snepar/gpu-temp-alert" rel="noopener noreferrer"&gt;https://github.com/snepar/gpu-temp-alert&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Temperature Alert Service is the vigilant guardian in our GPU monitoring system. This service consumes temperature readings from Redpanda, analyzes them in real-time, and triggers alerts when temperatures reach dangerous levels. Let's explore how this service works with key code snippets.&lt;/p&gt;

&lt;h3&gt;
  
  
  Consumer Implementation
&lt;/h3&gt;

&lt;p&gt;The core of the alert service is the Redpanda consumer that processes the stream of temperature readings:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// RedpandaConsumer consumes temperature readings from Redpanda
type RedpandaConsumer struct {
    client               *kgo.Client
    topic                string
    temperatureThreshold float64
    notifier             *notifier.SlackNotifier
    seenAlerts           map[string]time.Time
    seenAlertsMutex      sync.Mutex
    alertCooldown        time.Duration
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This structure maintains the Redpanda client connection, configuration values, and a map to track recent alerts to prevent alert storms.&lt;/p&gt;

&lt;h3&gt;
  
  
  Connecting to Redpanda
&lt;/h3&gt;

&lt;p&gt;The consumer establishes a connection to Redpanda with careful error handling and retry logic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func NewRedpandaConsumer(ctx context.Context, brokers, topic, consumerGroup string, 
                        temperatureThreshold float64, notifier *notifier.SlackNotifier) (*RedpandaConsumer, error) {

    // Create Redpanda client options
    opts := []kgo.Opt{
        kgo.SeedBrokers(strings.Split(brokers, ",")...),
        kgo.ConsumerGroup(consumerGroup),
        kgo.ConsumeTopics(topic),
        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), // Start from newest messages
        kgo.DisableAutoCommit(),                         // Manual commit for better control
        kgo.SessionTimeout(30 * time.Second),
    }

    // Create client with retry logic
    client, err := kgo.NewClient(opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to create Redpanda client: %w", err)
    }

    return &amp;amp;RedpandaConsumer{
        client:               client,
        topic:                topic,
        temperatureThreshold: temperatureThreshold,
        notifier:             notifier,
        seenAlerts:           make(map[string]time.Time),
        alertCooldown:        5 * time.Minute, // Prevent alert storms
    }, nil
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Processing Temperature Readings
&lt;/h3&gt;

&lt;p&gt;The service continuously polls for new temperature readings and processes them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func (c *RedpandaConsumer) Start(ctx context.Context) error {
    log.Printf("Starting to consume from topic %s", c.topic)
    log.Printf("Monitoring for temperatures above %.2f°C", c.temperatureThreshold)

    for {
        select {
        case &amp;lt;-ctx.Done():
            return nil // Graceful shutdown

        default:
            // Poll for messages
            fetches := c.client.PollFetches(ctx)
            if fetches.IsClientClosed() {
                return fmt.Errorf("client closed")
            }

            // Process each record
            fetches.EachRecord(func(record *kgo.Record) {
                c.processRecord(ctx, record)
            })

            // Commit offsets
            if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
                log.Printf("Error committing offsets: %v", err)
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Temperature Threshold Detection
&lt;/h3&gt;

&lt;p&gt;The core logic for detecting high temperatures and triggering alerts:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func (c *RedpandaConsumer) processRecord(ctx context.Context, record *kgo.Record) {
    // Parse the temperature reading
    var reading model.TemperatureReading
    if err := json.Unmarshal(record.Value, &amp;amp;reading); err != nil {
        log.Printf("Error parsing record: %v", err)
        return
    }

    // Check if temperature exceeds threshold
    if reading.Temperature &amp;gt; c.temperatureThreshold {
        // Check if we've already alerted for this device recently
        if c.shouldSendAlert(reading.DeviceID) {
            log.Printf("HIGH TEMPERATURE ALERT: Device %s - %.2f°C exceeds threshold of %.2f°C",
                reading.DeviceID, reading.Temperature, c.temperatureThreshold)

            // Create alert event
            alert := &amp;amp;model.AlertEvent{
                DeviceID:            reading.DeviceID,
                Temperature:         reading.Temperature,
                Timestamp:           reading.Timestamp,
                TemperatureThreshold: c.temperatureThreshold,
                GPUModel:            reading.GPUModel,
                HostName:            reading.HostName,
                PowerConsume:        reading.PowerConsume,
                GPUUtil:             reading.GPUUtil,
            }

            // Send notification to Slack
            if err := c.notifier.SendTemperatureAlert(ctx, alert); err != nil {
                log.Printf("Failed to send alert to Slack: %v", err)
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Alert Cooldown Mechanism
&lt;/h3&gt;

&lt;p&gt;To prevent alert storms, the service implements a cooldown period for each device:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func (c *RedpandaConsumer) shouldSendAlert(deviceID string) bool {
    c.seenAlertsMutex.Lock()
    defer c.seenAlertsMutex.Unlock()

    now := time.Now()
    if lastAlerted, exists := c.seenAlerts[deviceID]; exists {
        // If last alert was less than cooldown period ago, don't alert
        if now.Sub(lastAlerted) &amp;lt; c.alertCooldown {
            return false
        }
    }

    // Update last alerted time
    c.seenAlerts[deviceID] = now
    return true
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This prevents notification fatigue by limiting alerts for the same device to a reasonable frequency.&lt;/p&gt;

&lt;h3&gt;
  
  
  Slack Integration
&lt;/h3&gt;

&lt;p&gt;The service integrates with Slack to deliver immediate alerts when GPUs are overheating:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func (s *SlackNotifier) SendTemperatureAlert(ctx context.Context, alert *model.AlertEvent) error {
    // Determine color based on severity
    color := "#FF0000" // Default to red
    if alert.Temperature &amp;lt; alert.TemperatureThreshold+20 {
        color = "#FFA500" // Orange for less severe
    }

    // Create attachment
    attachment := Attachment{
        Fallback:  fmt.Sprintf("GPU IS MELTING: %.2f°C on %s", alert.Temperature, alert.DeviceID),
        Color:     color,
        Title:     "🔥 GPU IS MELTING 🔥",
        Text:      fmt.Sprintf("Temperature of *%.2f°C* detected, exceeding threshold of *%.2f°C*", 
                   alert.Temperature, alert.TemperatureThreshold),
        Timestamp: alert.Timestamp.Unix(),
        Fields: []Field{
            {
                Title: "Device ID",
                Value: alert.DeviceID,
                Short: true,
            },
            {
                Title: "Temperature",
                Value: fmt.Sprintf("%.2f°C", alert.Temperature),
                Short: true,
            },
            // Additional fields omitted for brevity
        },
    }

    slackMsg := SlackMessage{
        Channel:     s.channel,
        Username:    "GPU Temperature Monitor",
        IconEmoji:   ":fire:",
        Attachments: []Attachment{attachment},
    }

    return s.sendMessage(ctx, slackMsg)
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This creates visually striking alerts that draw immediate attention to overheating GPUs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Main Service Initialization
&lt;/h3&gt;

&lt;p&gt;The service's main function handles setup and graceful shutdown:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func main() {
    // Load configuration
    cfg, err := config.Load()
    if err != nil {
        log.Fatalf("Failed to load configuration: %v", err)
    }

    // Create context that can be cancelled
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create Slack notifier
    slack, err := notifier.NewSlackNotifier(cfg.SlackWebhookURL, cfg.SlackChannel)
    if err != nil {
        log.Fatalf("Failed to create Slack notifier: %v", err)
    }

    // Create consumer
    cons, err := consumer.NewRedpandaConsumer(
        ctx, 
        cfg.RedpandaBrokers, 
        cfg.SourceTopic, 
        cfg.ConsumerGroup,
        cfg.TemperatureThreshold,
        slack,
    )
    if err != nil {
        log.Fatalf("Failed to create Redpanda consumer: %v", err)
    }
    defer cons.Close()

    // Start consuming in a separate goroutine
    go func() {
        if err := cons.Start(ctx); err != nil {
            log.Printf("Consumer error: %v, shutting down", err)
            cancel()
        }
    }()

    // Wait for shutdown signal (SIGINT or SIGTERM); requires the
    // os/signal and syscall packages
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    &amp;lt;-sigChan
    log.Println("Shutting down gracefully")
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Environment-Based Configuration
&lt;/h3&gt;

&lt;p&gt;Like the publisher service, the alert service uses environment variables for configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;func Load() (*Config, error) {
    config := &amp;amp;Config{
        RedpandaBrokers:      getEnv("REDPANDA_BROKERS", "localhost:9092"),
        SourceTopic:          getEnv("SOURCE_TOPIC", "gpu-temperature"),
        ConsumerGroup:        getEnv("CONSUMER_GROUP", "gpu-temp-alert-group"),
        TemperatureThreshold: getEnvAsFloat("TEMPERATURE_THRESHOLD", 190.0),
        SlackWebhookURL:      getEnv("SLACK_WEBHOOK_URL", ""),
        SlackChannel:         getEnv("SLACK_CHANNEL", "#alerts"),
    }

    // Validation omitted for brevity
    return config, nil
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The Temperature Alert Service exemplifies how Go's strong concurrency model and straightforward error handling make it ideal for building reliable monitoring systems. By consuming temperature data from Redpanda and triggering immediate alerts when thresholds are exceeded, this service provides the critical "last mile" in preventing GPU damage.&lt;br&gt;
The combination of efficient stream processing, intelligent alert suppression, and actionable notifications creates a monitoring solution that can help prevent the scenario that Sam Altman humorously tweeted about - truly melting GPUs.&lt;/p&gt;


&lt;h2&gt;
  
  
  Implementation Guide
&lt;/h2&gt;

&lt;p&gt;This guide will walk you through setting up and running both services of our GPU temperature monitoring system. Follow these steps to deploy the publisher for generating temperature readings and the alert service for monitoring critical temperatures.&lt;/p&gt;
&lt;h3&gt;
  
  
  Prerequisites
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Docker and Docker Compose installed&lt;/li&gt;
&lt;li&gt;A Slack workspace with webhook URL configured&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Build and Run the Publisher
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;This will start the Redpanda cluster and the publisher service.&lt;/p&gt;
&lt;h3&gt;
  
  
  Monitor the Redpanda Console
&lt;/h3&gt;

&lt;p&gt;Open your browser and navigate to &lt;code&gt;http://localhost:8080&lt;/code&gt; to access the Redpanda Console.&lt;/p&gt;
&lt;h3&gt;
  
  
  Verify Publisher is Running
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker logs -f gpu-temp-publisher
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;You should see output similar to this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5i6me8zvp0trsa0aeikz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5i6me8zvp0trsa0aeikz.png" alt="logs-gpu-temp-publisher" width="800" height="613"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Build and Run the Alert Consumer
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h3&gt;
  
  
  Verify Alert Service is Running
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker logs -f gpu-temp-alert
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;You should see output similar to this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4y016kr1vhst3jc6rp5y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4y016kr1vhst3jc6rp5y.png" alt="logs-gpu-temp-alert" width="800" height="191"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Update Slack Webhook URL
&lt;/h3&gt;

&lt;p&gt;Edit &lt;code&gt;docker-compose.yml&lt;/code&gt; and replace &lt;code&gt;YOUR_WEBHOOK_URL&lt;/code&gt; with your actual Slack webhook URL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Check Slack for Alerts
&lt;/h3&gt;

&lt;p&gt;When a temperature exceeds the threshold, you should see an alert in your configured Slack channel:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjij02g82holjzb4d64oz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjij02g82holjzb4d64oz.png" alt="Slack" width="800" height="385"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;p&gt;This GPU temperature monitoring system, while inspired by Sam Altman's humorous "our GPUs are melting" tweet, is primarily a hypothetical scenario designed to demonstrate how Go and Kafka-compatible messaging systems like Redpanda can be effectively combined to build real-time event processing pipelines. Through this example, we've explored fundamental patterns in event-driven architecture, stream processing, and alerting that can be applied to many real-world monitoring challenges, all while learning practical Go programming techniques.&lt;/p&gt;

</description>
      <category>go</category>
      <category>kafka</category>
      <category>eventdriven</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Data Engineer as a Real-Time Algo Trader – Turning Pipelines into Profit (or at Least Trying)!</title>
      <dc:creator>SNEHASISH DUTTA</dc:creator>
      <pubDate>Mon, 09 Dec 2024 09:25:59 +0000</pubDate>
      <link>https://forem.com/datasosneh/data-engineer-as-a-real-time-algo-trader-turning-pipelines-into-profit-or-at-least-trying-3phl</link>
      <guid>https://forem.com/datasosneh/data-engineer-as-a-real-time-algo-trader-turning-pipelines-into-profit-or-at-least-trying-3phl</guid>
      <description>&lt;p&gt;In an era where data drives decisions, this project explores the intersection of trading and real-time analytics. &lt;/p&gt;

&lt;p&gt;Using Alpaca’s paper trading API, transactional data is streamed into Redpanda and analyzed with Apache Flink to extract actionable insights. &lt;br&gt;
Sentiment analysis powers buy and sell signals, seamlessly delivered via Slack, creating a streamlined and responsive trading workflow.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Data Flow Diagram (Architecture)
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm616qi08wgh74a6f3u13.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm616qi08wgh74a6f3u13.png" alt="Architecture" width="800" height="283"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Introducing the Building Blocks
&lt;/h2&gt;

&lt;p&gt;The following components are the building blocks of this system; here is what each one does.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Alpaca ::&lt;/strong&gt; Alpaca Trading APIs offer commission-free, programmatic access to U.S. stock and ETF trading through a modern REST interface. The platform stands out for its developer-friendly paper trading environment, which allows risk-free testing of trading strategies using real market data. With comprehensive SDKs in multiple languages (Python, JavaScript, Go), real-time WebSocket data streams, and support for various order types (market, limit, stop), Alpaca enables developers to quickly build and test algorithmic trading systems. Whether you're developing a trading strategy or building a full-scale automated trading platform, Alpaca's combination of zero-cost paper trading and production-ready infrastructure makes it an ideal choice for both learning and deployment.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;VADER ::&lt;/strong&gt;  (Valence Aware Dictionary and sEntiment Reasoner) is a powerful sentiment analysis tool specifically designed for social media text. It's part of the NLTK library and uses a combination of lexical features and rule-based analysis to assess text sentiment.&lt;br&gt;
Key Features:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pre-built sentiment lexicon with emotion-word ratings&lt;/li&gt;
&lt;li&gt;Handles informal language, emojis, and social media context&lt;/li&gt;
&lt;li&gt;Considers punctuation and capitalization for emphasis&lt;/li&gt;
&lt;li&gt;Returns scores for positive, negative, neutral, and compound sentiment&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;No training required (rule-based approach)&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Advantages:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Social Media Optimized:&lt;/strong&gt; Accurately handles slang, emoticons, and informal language common in social media posts&lt;/li&gt;
&lt;li&gt;Fast Processing: Being rule-based, it's computationally efficient and doesn't require model training&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Context Sensitive:&lt;/strong&gt; Understands sentiment intensifiers, contractions, and negations&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Multiple Scores:&lt;/strong&gt; Provides granular sentiment analysis with separate scores for different sentiments&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Easy Integration:&lt;/strong&gt; Simple to use with just a few lines of code&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Domain Adaptable:&lt;/strong&gt; Works well across various domains, from social media to product reviews&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The compound score (-1 to +1) makes it particularly useful for quick sentiment classification, while the individual positive, negative, and neutral scores provide deeper insight into the text's emotional content.&lt;/p&gt;
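&lt;p&gt;As a quick illustration of using the compound score, the conventional VADER cutoffs treat scores of 0.05 and above as positive and -0.05 and below as negative, with everything in between neutral:&lt;/p&gt;

```python
# Classify a VADER compound score (-1 to +1) using the conventional
# cutoffs: 0.05 and above is positive, -0.05 and below is negative.
def classify(compound: float) -> str:
    if compound >= 0.05:
        return "positive"
    if compound > -0.05:
        return "neutral"
    return "negative"

print(classify(0.6))   # positive
print(classify(0.0))   # neutral
print(classify(-0.4))  # negative
```

&lt;p&gt;The trading signals later in this article use a simpler split at zero, but the same idea applies: the compound score collapses the per-class scores into a single actionable number.&lt;/p&gt;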

&lt;p&gt;&lt;strong&gt;Redpanda ::&lt;/strong&gt; Redpanda is a modern streaming data platform designed as a Kafka API-compatible alternative with significantly improved performance and simplified operations. It's written in C++ and provides a zero-copy architecture, eliminating the need for a separate JVM, Zookeeper, or replication controller. Redpanda offers real-time data streaming with sub-millisecond latency, making it ideal for high-throughput scenarios like algorithmic trading. The platform stands out for its self-tuning capabilities, transparent data replication, and ability to handle millions of events per second while maintaining data consistency.&lt;br&gt;
Key advantages include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Kafka API compatibility without configuration overhead&lt;/li&gt;
&lt;li&gt;Single binary deployment with no external dependencies&lt;/li&gt;
&lt;li&gt;Hardware optimized performance with lower resource consumption&lt;/li&gt;
&lt;li&gt;Built-in disaster recovery and data durability&lt;/li&gt;
&lt;li&gt;Simple scaling and maintenance without complex configurations&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Flink-SQL ::&lt;/strong&gt; Flink SQL is a powerful query interface in Apache Flink that enables real-time stream processing and analytics using standard SQL syntax. It allows developers to write SQL queries that can analyze both streaming and batch data without changing the underlying code. What sets Flink SQL apart is its ability to handle continuous queries over streaming data, with support for event time processing, windowing operations, and complex event pattern matching.&lt;br&gt;
Key features include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Real-time continuous querying&lt;/li&gt;
&lt;li&gt;Advanced window operations (sliding, tumbling, session)&lt;/li&gt;
&lt;li&gt;Stream-table joins and temporal table joins&lt;/li&gt;
&lt;li&gt;Pattern detection using MATCH_RECOGNIZE&lt;/li&gt;
&lt;li&gt;Built-in connectors for various data sources/sinks&lt;/li&gt;
&lt;li&gt;Dynamic table concepts for stream processing&lt;/li&gt;
&lt;li&gt;Support for user-defined functions (UDFs)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Py-Flink ::&lt;/strong&gt; PyFlink is Python's API for Apache Flink, offering a powerful stream processing framework with the accessibility of Python. It enables developers to write scalable stream processing applications using familiar Python syntax while leveraging Flink's robust distributed computing capabilities. PyFlink supports both the DataStream API for low-level stream processing and the Table API/SQL for declarative data analytics, making it particularly valuable for real-time data analysis and complex event processing.&lt;br&gt;
Key features include:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Native Python UDFs (User Defined Functions)&lt;/li&gt;
&lt;li&gt;Seamless integration with Python data science libraries&lt;/li&gt;
&lt;li&gt;Support for both batch and stream processing&lt;/li&gt;
&lt;li&gt;Real-time data analytics using SQL&lt;/li&gt;
&lt;li&gt;Stateful stream processing capabilities&lt;/li&gt;
&lt;li&gt;Event-time processing and windowing operations&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The combination of Python's ease of use with Flink's performance makes PyFlink an excellent choice for building real-time data pipelines and streaming analytics applications.&lt;/p&gt;
&lt;h2&gt;
  
  
  Goal
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Setup Paper Trading:&lt;/strong&gt; using Alpaca&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Ingestion:&lt;/strong&gt; using Kafka APIs into RedPanda&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Streaming Extract Transform and Aggregate:&lt;/strong&gt; using Flink SQL &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Generate Trade Signals:&lt;/strong&gt; using Flink Source APIs to &lt;strong&gt;Slack&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Backtest:&lt;/strong&gt; Algorithmic Trading Strategies&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Let Us Begin
&lt;/h2&gt;
&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Paper-Trading With Alpaca
&lt;/h2&gt;

&lt;p&gt;Sign Up for Trading API&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fufn4w8cuy3xdw0owzkfh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fufn4w8cuy3xdw0owzkfh.png" alt="AlpacaLogin" width="800" height="265"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Copy The API Keys&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9n2ryktqvzo8jjcomriv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9n2ryktqvzo8jjcomriv.png" alt="AlpacaKeys" width="565" height="352"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Configure the keys in your Python Project&lt;/p&gt;

&lt;p&gt;&lt;code&gt;pip install alpaca_trade_api&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;config = {
    'key_id': ' API KEY ',
    'secret_key': ' SECRET KEY ',
    'redpanda_brokers': 'localhost:9092,localhost:9093',
    'base_url': 'https://data.alpaca.markets/v1beta1/',
    'trade_api_base_url': 'https://paper-api.alpaca.markets/v2',
    'slack_token': '',
    'slack_channel_id': ''
} 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; ::  &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/alpaca_config/keys.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/alpaca_config/keys.py&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Infrastructure With Docker
&lt;/h2&gt;

&lt;p&gt;Configure &lt;code&gt;docker-compose.yml&lt;/code&gt;&lt;br&gt;
And &lt;code&gt;Dockerfile-sql&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;services:
  redpanda-1:
...
  redpanda-console:
      ports:
        - 8080:8080
# Flink cluster
  jobmanager:
    container_name: jobmanager
    build:
      context: .
      dockerfile: Dockerfile-sql
    ports:
      - 8081:8081
  sql-client:
    container_name: sql-client
    build:
      context: .
      dockerfile: Dockerfile-sql
    command:
      - /opt/flink/bin/sql-client.sh
...

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Execute Command &lt;code&gt;docker compose up -d --build&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Check the Status of RedPanda&lt;/strong&gt;&lt;br&gt;
&lt;code&gt;http://localhost:8080/overview&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj2q4zerz4y1dw7nulfi8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj2q4zerz4y1dw7nulfi8.png" alt="redpanda-overview" width="800" height="352"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Check the Status of  Flink SQL&lt;/strong&gt; &lt;br&gt;
&lt;code&gt;http://localhost:8081/#/overview&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3uziphhuuxnewacgoenk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3uziphhuuxnewacgoenk.png" alt="flink-overview" width="800" height="256"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Test Flink-SQL Client&lt;/strong&gt;&lt;br&gt;
Execute Command &lt;code&gt;docker compose run sql-client&lt;/code&gt;&lt;br&gt;
Execute Test Query &lt;code&gt;Flink SQL&amp;gt; select 'hello world';&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7ejyx7gp05hm1d1qtm4y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7ejyx7gp05hm1d1qtm4y.png" alt="flink-sql-hw" width="800" height="234"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/docker-compose.yml" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/docker-compose.yml&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; The Redpanda Producer With Sentiment Analysis from Past News Headlines
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Install the NLTK Library&lt;/strong&gt;&lt;br&gt;
&lt;code&gt;pip install nltk&lt;/code&gt;&lt;br&gt;
Then download the VADER lexicon: &lt;code&gt;python -m nltk.downloader vader_lexicon&lt;/code&gt;&lt;br&gt;
&lt;strong&gt;Or, if SSL certificate verification fails, bypass verification and run the interactive downloader:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import nltk
import ssl

try:
  _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
  pass

else:
  ssl._create_default_https_context = _create_unverified_https_context

nltk.download()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Select The Appropriate Model from the list&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8hiy5jh0fiudj1qh50yv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8hiy5jh0fiudj1qh50yv.png" alt="vader-downloader" width="712" height="418"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Define the sentiment analyser function&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from nltk.sentiment.vader import SentimentIntensityAnalyzer as SIA

sia = SIA()


def get_sentiment(text):
    scores = sia.polarity_scores(text)
    return scores['compound']
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Install Kafka Libraries to Produce Data in RedPanda&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;pip install kafka-python requests pandas&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def get_producer(brokers: List[str]):
    producer = KafkaProducer(
        bootstrap_servers=brokers,
        key_serializer=str.encode,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    return producer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
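&lt;p&gt;To see what these serializers put on the wire, they can be exercised standalone: the key is UTF-8 encoded as-is, and each value dict becomes UTF-8 JSON bytes:&lt;/p&gt;

```python
import json

# The producer's serializers, reproduced outside KafkaProducer so their
# output can be inspected directly.
key_serializer = str.encode
value_serializer = lambda v: json.dumps(v).encode('utf-8')

# A sample record shaped like the enriched news articles (illustrative data)
record = {"headline": "Apple beats earnings estimates", "sentiment": 0.66}

print(key_serializer("AAPL"))  # b'AAPL'
print(value_serializer(record))
```

&lt;p&gt;Serializing to JSON on the producer side is what lets the Flink tables later in the article declare &lt;code&gt;'format' = 'json'&lt;/code&gt; and map fields by name.&lt;/p&gt;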



&lt;p&gt;Define a producer function that fetches historical news from Alpaca for a given date range and publishes it to a RedPanda topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def produce_historical_news(
        redpanda_client: KafkaProducer,
        start_date: str,
        end_date: str,
        symbols: List[str],
        topic: str
    ):
    key_id = config['key_id']
    secret_key = config['secret_key']
    base_url = config['base_url']

    api = REST(key_id=key_id, secret_key=secret_key, base_url=URL(base_url)) ... 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/news-producer.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/news-producer.py&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Invoke the sentiment function to calculate a sentiment score from each news headline, skipping articles that do not mention the target symbols:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;article = row._raw
            should_proceed = any(term in article['headline'] for term in symbols)
            if not should_proceed:
                continue
article['sentiment'] = get_sentiment(article['headline'])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Publish to RedPanda Topic &lt;code&gt;market-news&lt;/code&gt; For the Company Name &lt;code&gt;Apple / AAPL&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;produce_historical_news(
        get_producer(config['redpanda_brokers']),
        topic='market-news',
        start_date='2024-01-01',
        end_date='2024-12-08',
        symbols=['AAPL', 'Apple']
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Check the Topic &lt;code&gt;market-news&lt;/code&gt; in RedPanda UI at &lt;code&gt;http://localhost:8080/topics/market-news&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0kii7nx1dleexrec2tti.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0kii7nx1dleexrec2tti.png" alt="histnews" width="800" height="432"&gt;&lt;/a&gt;&lt;/p&gt;


&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; The Redpanda Producer for Historical and Real-Time Stock Price Changes
&lt;/h2&gt;

&lt;p&gt;Use the Alpaca &lt;code&gt;StockBarsRequest&lt;/code&gt; API&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
granularity = TimeFrame.Minute

request_params = StockBarsRequest(
        symbol_or_symbols=symbol,
        timeframe=granularity,
        start=start_date,
        end=end_date)

prices_df = api.get_stock_bars(request_params).df
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/prices-producer.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/prices-producer.py&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The response contains the fields below, including the volume-weighted average price (VWAP):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "symbol":"AAPL"
  "timestamp":1707246180000
  "open":188.7199
  "high":188.74
  "low":188.585
  "close":188.64
  "volume":108418
  "trade_count":1313
  "vwap":188.660311
  "provider":"alpaca"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
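&lt;p&gt;The &lt;code&gt;vwap&lt;/code&gt; field is the volume-weighted average price of the bar: each trade price weighted by its volume. Alpaca precomputes it, but the metric can be recomputed from individual trades to make it concrete (sample numbers below are illustrative, not Alpaca data):&lt;/p&gt;

```python
# VWAP weights each trade price by its traded volume, so large trades
# pull the average more than small ones.
def vwap(trades):
    """trades: list of (price, volume) pairs."""
    total_volume = sum(volume for _, volume in trades)
    weighted = sum(price * volume for price, volume in trades)
    return weighted / total_volume

trades = [(188.72, 100), (188.60, 300), (188.66, 200)]
print(round(vwap(trades), 2))  # 188.64
```

&lt;p&gt;This is why VWAP is a popular execution benchmark: it reflects where the volume actually traded, not just the midpoint of the bar.&lt;/p&gt;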



&lt;p&gt;Check the Topic &lt;code&gt;stock-prices&lt;/code&gt; in RedPanda UI at &lt;code&gt;http://localhost:8080/topics/stock-prices&lt;/code&gt; for key &lt;code&gt;AAPL&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flmyhi8s8o7h0j3iyu3k6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flmyhi8s8o7h0j3iyu3k6.png" alt="pricehist" width="800" height="356"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Flink SQL Based Table Creation on RedPanda Topics
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Market News Table&lt;/strong&gt;&lt;br&gt;
This table captures financial news data from Kafka with sentiment analysis:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Stores news articles with metadata (author, headline, source)&lt;/li&gt;
&lt;li&gt;Includes sentiment scores for each news item&lt;/li&gt;
&lt;li&gt;Uses event time processing with 5-second watermark&lt;/li&gt;
&lt;li&gt;Connects to Redpanda topic &lt;code&gt;market-news&lt;/code&gt; for data streaming
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE TABLE market_news (
    id BIGINT,
    author VARCHAR,
    headline VARCHAR,
    source VARCHAR,
    summary VARCHAR,
    data_provider VARCHAR,
    `url` VARCHAR,
    symbol VARCHAR,
    sentiment DECIMAL,
    timestamp_ms BIGINT,
    event_time AS TO_TIMESTAMP_LTZ(timestamp_ms, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'market-news',
    'properties.bootstrap.servers' = 'redpanda-1:29092,redpanda-2:29092',
    'properties.group.id' = 'test-group',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json'
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;strong&gt;Stock Prices Table&lt;/strong&gt;&lt;br&gt;
Captures real-time stock price data&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Stores OHLCV (Open, High, Low, Close, Volume) data&lt;/li&gt;
&lt;li&gt;Includes additional metrics like VWAP and trade count&lt;/li&gt;
&lt;li&gt;Uses event time processing with 5-second watermark&lt;/li&gt;
&lt;li&gt;Streams data from a separate RedPanda topic &lt;code&gt;stock-prices&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE TABLE stock_prices (
    symbol VARCHAR,
    `open` FLOAT,
    high FLOAT,
    low FLOAT,
    `close` FLOAT,
    volume DECIMAL,
    trade_count FLOAT,
    vwap DECIMAL,
    provider VARCHAR,
    `timestamp` BIGINT,
    event_time AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'stock-prices',
    'properties.bootstrap.servers' = 'redpanda-1:29092,redpanda-2:29092',
    'properties.group.id' = 'test-group',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json'
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Real-Time Aggregation Using Flink-SQL and Simple Moving Average (Algorithm)
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Moving Average Views (sma_20 and sma_50)&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Creates 20-period and 50-period simple moving averages&lt;/li&gt;
&lt;li&gt;Uses window functions for continuous calculation&lt;/li&gt;
&lt;li&gt;Partitions by symbol for multiple stock analysis&lt;/li&gt;
&lt;li&gt;Maintains temporal order using event_time
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE VIEW sma_20 AS
SELECT symbol, `close`, event_time,
    AVG(`close`) OVER (PARTITION BY symbol ORDER BY event_time ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS sma_20
FROM stock_prices;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE VIEW sma_50 AS
SELECT
    symbol,
    `close`,
    event_time,
    AVG(`close`) OVER (PARTITION BY symbol ORDER BY event_time ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) AS sma_50
FROM stock_prices;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
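&lt;p&gt;The windowed &lt;code&gt;AVG(`close`) OVER (... ROWS BETWEEN 19 PRECEDING AND CURRENT ROW)&lt;/code&gt; above computes, for each row, the mean of the current close and up to 19 prior closes. A pure-Python equivalent makes the per-row semantics concrete:&lt;/p&gt;

```python
# Per-row simple moving average, mirroring Flink's
# AVG(close) OVER (PARTITION BY symbol ORDER BY event_time
#                  ROWS BETWEEN 19 PRECEDING AND CURRENT ROW):
# early rows average only the closes seen so far, just as the SQL does.
def rolling_sma(closes, window=20):
    out = []
    for i in range(len(closes)):
        start = max(0, i - window + 1)
        chunk = closes[start:i + 1]
        out.append(sum(chunk) / len(chunk))
    return out

print(rolling_sma([1.0, 2.0, 3.0, 4.0], window=2))  # [1.0, 1.5, 2.5, 3.5]
```

&lt;p&gt;Note that the SQL view emits a value for every row, including the first 19, where the "20-period" average is really an average over fewer points.&lt;/p&gt;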


&lt;p&gt;&lt;strong&gt;Price with Moving Averages View&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Combines both SMAs (20 and 50 period)&lt;/li&gt;
&lt;li&gt;Joins the moving averages on symbol and event_time&lt;/li&gt;
&lt;li&gt;Provides a consolidated view of price and technical indicators
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE VIEW price_with_movavg AS
SELECT
    s20.symbol,
    s20.`close`,
    s20.event_time,
    s20.sma_20,
    s50.sma_50
FROM sma_20 s20
JOIN sma_50 s50
    ON s20.symbol = s50.symbol AND s20.event_time = s50.event_time;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;strong&gt;News and Prices View&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Correlates news events with price movements&lt;/li&gt;
&lt;li&gt;Uses a 2-minute window (±1 minute) to match news with prices&lt;/li&gt;
&lt;li&gt;Combines sentiment data with technical indicators&lt;/li&gt;
&lt;li&gt;Enables analysis of news impact on price
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE VIEW news_and_prices AS
SELECT
    n.symbol,
    n.headline,
    n.sentiment,
    p.`close`,
    p.sma_20,
    p.sma_50,
    n.event_time AS news_time,
    p.event_time AS price_time
FROM market_news n
JOIN price_with_movavg p
    ON n.symbol = p.symbol
    AND n.event_time BETWEEN p.event_time - INTERVAL '1' MINUTE AND p.event_time + INTERVAL '1' MINUTE;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
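&lt;p&gt;The ±1 minute matching condition can be expressed in plain Python (a hypothetical helper over epoch-millisecond timestamps, for illustration):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def within_one_minute(news_time_ms, price_time_ms):
    # Mirrors: n.event_time BETWEEN p.event_time - INTERVAL '1' MINUTE
    #                            AND p.event_time + INTERVAL '1' MINUTE
    return abs(news_time_ms - price_time_ms) &lt;= 60_000
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Note that BETWEEN is inclusive on both ends, so a news event exactly one minute away still matches.&lt;/p&gt;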


&lt;p&gt;&lt;strong&gt;Trading Signals View&lt;/strong&gt;&lt;br&gt;
Implements the trading strategy:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;BUY Signal Conditions:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Positive sentiment (sentiment &amp;gt; 0)&lt;/li&gt;
&lt;li&gt;Price crosses below SMA20 (current &amp;lt; SMA20 &amp;amp;&amp;amp; previous &amp;gt;= SMA20)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;SELL Signal Conditions:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Negative sentiment (sentiment &amp;lt; 0)&lt;/li&gt;
&lt;li&gt;Price crosses above SMA20 (current &amp;gt; SMA20 &amp;amp;&amp;amp; previous &amp;lt;= SMA20)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The &lt;code&gt;LAG&lt;/code&gt; function detects the crossover by comparing each close with the previous one for the same symbol.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE VIEW trading_signals AS
SELECT
    symbol,
    news_time,
    `close`,
    1 as quantity,
    CASE
        WHEN sentiment &amp;gt; 0 AND `close` &amp;lt; sma_20 AND lag(`close`, 1) OVER (PARTITION BY symbol ORDER BY news_time) &amp;gt;= sma_20 THEN 'BUY'
        WHEN sentiment &amp;lt; 0 AND `close` &amp;gt; sma_20 AND lag(`close`, 1) OVER (PARTITION BY symbol ORDER BY news_time) &amp;lt;= sma_20 THEN 'SELL'
        ELSE NULL
    END AS signal
FROM news_and_prices;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
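&lt;p&gt;The CASE expression above can be mirrored in plain Python (a hypothetical helper for illustration, with the previous close passed in explicitly in place of LAG):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def signal(sentiment, close, prev_close, sma_20):
    # BUY: positive sentiment and price crossing below SMA-20
    if sentiment &gt; 0 and close &lt; sma_20 and prev_close &gt;= sma_20:
        return 'BUY'
    # SELL: negative sentiment and price crossing above SMA-20
    if sentiment &lt; 0 and close &gt; sma_20 and prev_close &lt;= sma_20:
        return 'SELL'
    return None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;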



&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Publish Trading Signals to a Topic in RedPanda using Flink-SQL
&lt;/h2&gt;

&lt;p&gt;Create a topic in RedPanda named &lt;code&gt;trading-signals&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3c8kkwhxghgwi2ofepss.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3c8kkwhxghgwi2ofepss.png" alt="createtopic" width="800" height="381"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Create a Table using Flink SQL&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE TABLE trading_signals_sink (
    symbol STRING,
    signal_time TIMESTAMP_LTZ,
    signal STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'trading-signals',
    'properties.bootstrap.servers' = 'redpanda-1:29092,redpanda-2:29093',
    'properties.group.id' = 'test-group',
    'format' = 'json'
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start Publishing Trade Signals to the topic&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;INSERT INTO trading_signals_sink
SELECT symbol, news_time, signal
FROM trading_signals
WHERE signal IS NOT NULL;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Verify Data from RedPanda UI&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fznka268cbpihdwxtcdyo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fznka268cbpihdwxtcdyo.png" alt="trade-signal-topic" width="800" height="537"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; PyFlink to Consume Trade Signals
&lt;/h2&gt;

&lt;p&gt;Install Apache Flink Library &lt;code&gt;pip install apache-flink&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create Flink Kafka Consumer Group&lt;/li&gt;
&lt;li&gt;Add Relevant JAVA JARs
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    env.add_jars('&amp;lt;&amp;lt;location to&amp;gt;&amp;gt;/flink-sql-connector-kafka-3.1.0-1.18.jar')

    kafka_consumer_properties = {
        'bootstrap.servers': 'localhost:9092,localhost:9093',
        'group.id': 'news_trading_consumer_group',
        'auto.offset.reset': 'earliest'
    }

    kafka_consumer = FlinkKafkaConsumer(
        topics='trading-signals',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_consumer_properties
    )

kafka_stream = env.add_source(kafka_consumer, type_info=Types.STRING())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Process The Event
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;message_dict = json.loads(message)
        symbol = message_dict.get('symbol', 'N/A')
        signal_time = message_dict.get('signal_time', 'N/A')
        signal = message_dict.get('signal', 'N/A')

        formatted_message = """
        =============================
        ALERT ⚠️ New Trading Signal!\n
        Symbol: {symbol} \n
        Signal: {signal} \n
        Time: {signal_time}
        =============================
        """.format(
            symbol=symbol,
            signal=signal,
            signal_time=signal_time
        )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py&lt;/a&gt; &lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Slack to Post Alerts
&lt;/h2&gt;

&lt;p&gt;Configure a Slack channel with a bot to publish messages: &lt;code&gt;OAuth Scope = chat:write&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjhnjbmp2dqu0mg5cr2oi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjhnjbmp2dqu0mg5cr2oi.png" alt="SlackSettings" width="720" height="687"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Start Pushing Alert Messages to This Channel&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def send_to_slack(message, token, channel_id):
    url = 'https://slack.com/api/chat.postMessage'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}'
    }

    data = {
        'channel': channel_id,
        'text': message
    }

    response = requests.post(url, headers=headers, json=data)

    if response.status_code != 200:
        raise ValueError(f'Failed to send message to slack, {response.status_code}, response: {response.text}')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Alerts in Slack&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs3jcmmxtnot5jbgmjgbo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs3jcmmxtnot5jbgmjgbo.png" alt="SlackAlerts" width="800" height="574"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Place Order Based on Trade Signals
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Configure the Trade API to place order based on Buy and Sell Signal in Alpaca Broker
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def place_order(symbol, qty, side, order_type, time_in_force):
    try:
        order = api.submit_order(
            symbol=symbol,
            qty=qty,
            side=side,
            type=order_type,
            time_in_force=time_in_force
        )
        print(f'Order submitted: {order}')
        return order
    except Exception as e:
        print(f'An error occurred while submitting order: {e}')
        return None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/signal_handler.py&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Console Log &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flket41cqhr95eadgu6ke.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flket41cqhr95eadgu6ke.png" alt="order" width="738" height="1230"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Alpaca Dashboard&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fni0hpnmpbxtunn525kro.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fni0hpnmpbxtunn525kro.png" alt="AlpacaDashboard1" width="800" height="480"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2ijsfsw5tog8epntql7e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2ijsfsw5tog8epntql7e.png" alt="AlpacaDashboard2" width="800" height="776"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; BackTesting The Algorithm Strategies
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;All Weather Strategy&lt;/strong&gt; :: This implements Ray Dalio's All Weather strategy, which aims to perform well in any economic environment by balancing growth assets with protection against different economic scenarios (inflation, deflation, growth, recession).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/strategies/AllWeatherStrategy.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/strategies/AllWeatherStrategy.py&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import backtrader as bt

class AllWeatherStrategy(bt.Strategy):
    def __init__(self):
        self.year_last_rebalanced = -1
        self.weights = {"VTI": 0.30, 'TLT': 0.40, 'IEF': 0.15, 'GLD': 0.075, 'DBC': 0.075}

    def next(self):
        if self.datetime.date().year == self.year_last_rebalanced:
            return

        self.year_last_rebalanced = self.datetime.date().year

        for i, d in enumerate(self.datas):
            symbol = d._name
            self.order_target_percent(d, target=self.weights[symbol])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code defines a trading strategy class &lt;strong&gt;AllWeatherStrategy&lt;/strong&gt; that inherits from Backtrader's Strategy class. The strategy implements annual portfolio rebalancing with predefined asset allocations:&lt;/p&gt;

&lt;p&gt;In the &lt;code&gt;__init__&lt;/code&gt; method:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;self.year_last_rebalanced = -1&lt;/code&gt; tracks the last rebalancing year, and the &lt;code&gt;self.weights&lt;/code&gt; dictionary defines the asset allocation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;30% in VTI (Vanguard Total Stock Market ETF) - Growth&lt;/li&gt;
&lt;li&gt;40% in TLT (Long-term Treasury Bonds) - Deflation protection&lt;/li&gt;
&lt;li&gt;15% in IEF (Intermediate Treasury Bonds) - Income&lt;/li&gt;
&lt;li&gt;7.5% in GLD (Gold) - Inflation protection&lt;/li&gt;
&lt;li&gt;7.5% in DBC (Commodity Index) - Inflation protection&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The &lt;code&gt;next&lt;/code&gt; method executes the rebalancing logic: it checks whether we are in a new year compared to the last rebalance, and if so:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Updates the last rebalanced year&lt;/li&gt;
&lt;li&gt;Iterates through each asset in the portfolio&lt;/li&gt;
&lt;li&gt;Uses &lt;code&gt;order_target_percent&lt;/code&gt; to adjust each position to match its target weight&lt;/li&gt;
&lt;/ul&gt;
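&lt;p&gt;A quick sanity check on the allocation above: the five target weights sum to exactly 100%, so the portfolio is always fully invested. In plain Python:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;weights = {"VTI": 0.30, "TLT": 0.40, "IEF": 0.15, "GLD": 0.075, "DBC": 0.075}
# target percentages passed to order_target_percent must sum to 1.0
assert abs(sum(weights.values()) - 1.0) &lt; 1e-9
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;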

&lt;p&gt;&lt;strong&gt;Golden Cross Strategy&lt;/strong&gt; :: A trend-following strategy that buys when a fast EMA crosses above a slow EMA and exits on the reverse crossover.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/strategies/GoldenCrossStrategy.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/strategies/GoldenCrossStrategy.py&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import backtrader as bt

class GoldenCrossStrategy(bt.Strategy):
    params = (
        ('short_window', 50),
        ('long_window', 200)
    )

    def __init__(self):
        self.short_ema = bt.indicators.EMA(self.datas[0].close, period=self.params.short_window)
        self.long_ema = bt.indicators.EMA(self.datas[0].close, period=self.params.long_window)
        self.crossover = bt.indicators.CrossOver(self.short_ema, self.long_ema)

    def next(self):
        if not self.position:
            if self.crossover &amp;gt; 0:
                self.buy()
        elif self.crossover &amp;lt; 0:
            self.close()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This implements the Golden Cross trading strategy, a popular technical analysis method.&lt;/p&gt;

&lt;p&gt;The GoldenCrossStrategy class defines a trend-following strategy based on exponential moving average (EMA) crossovers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Strategy Parameters (params):&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;short_window = 50: the 50-day EMA period&lt;/li&gt;
&lt;li&gt;long_window = 200: the 200-day EMA period&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;In the &lt;code&gt;__init__&lt;/code&gt; method:
&lt;strong&gt;Creates two EMAs using closing prices:&lt;/strong&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;short_ema: 50-day EMA (faster moving average)&lt;/li&gt;
&lt;li&gt;long_ema: 200-day EMA (slower moving average)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Creates a crossover indicator to detect when the EMAs cross&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;next&lt;/code&gt; method contains the trading logic:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;If no position is held (&lt;code&gt;if not self.position&lt;/code&gt;): buys when the short EMA crosses above the long EMA (crossover &amp;gt; 0)&lt;/li&gt;
&lt;li&gt;If holding a position: sells when the short EMA crosses below the long EMA (crossover &amp;lt; 0)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This strategy follows the principle that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A "Golden Cross" (short EMA crossing above long EMA) signals an uptrend and triggers a buy&lt;/li&gt;
&lt;li&gt;A "Death Cross" (short EMA crossing below long EMA) signals a downtrend and triggers a sell&lt;/li&gt;
&lt;/ul&gt;
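&lt;p&gt;For reference, an EMA of the kind used here can be computed recursively. This plain-Python sketch seeds with the first close; note that Backtrader's EMA indicator seeds itself with an SMA over the first &lt;code&gt;period&lt;/code&gt; bars, so early values differ slightly:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def ema(closes, period):
    # Standard recursive EMA with smoothing alpha = 2 / (period + 1)
    alpha = 2.0 / (period + 1)
    value = closes[0]
    out = [value]
    for close in closes[1:]:
        value = alpha * close + (1 - alpha) * value
        out.append(value)
    return out
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The higher the period, the smaller alpha becomes, and the more slowly the average reacts to new prices.&lt;/p&gt;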
&lt;p&gt;&lt;strong&gt;MomentumStrategy Backtesting Implementation&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Key features of this strategy:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Uses momentum to identify strong upward price movements&lt;/li&gt;
&lt;li&gt;Uses EMA as a trailing stop mechanism&lt;/li&gt;
&lt;li&gt;Combines momentum and trend following concepts&lt;/li&gt;
&lt;li&gt;Momentum &amp;gt; 100 indicates price is moving up significantly&lt;/li&gt;
&lt;li&gt;Price below EMA suggests trend might be weakening&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The strategy aims to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Catch strong upward price movements (momentum &amp;gt; 100)&lt;/li&gt;
&lt;li&gt;Stay in the trade while trend remains positive&lt;/li&gt;
&lt;li&gt;Exit when trend weakens (price &amp;lt; EMA)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/strategies/MomentumStrategy.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/strategies/MomentumStrategy.py&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import backtrader as bt

class MomentumStrategy(bt.Strategy):
    params = (
        ('momentum_period', 12),
        ('exit_period', 26)
    )

    def __init__(self):
        self.momentum = bt.indicators.MomentumOscillator(
            self.datas[0].close, period=self.params.momentum_period)
        self.exit_signal = bt.indicators.EMA(
            self.datas[0].close, period=self.params.exit_period)

    def next(self):
        if not self.position:
            if self.momentum &amp;gt; 100:
                self.buy()
        elif self.datas[0].close[0] &amp;lt; self.exit_signal[0]:
            self.close()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Test Using Backtester (Momentum Strategy)&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Backtrader backtesting script integrates with Alpaca's API.&lt;/li&gt;
&lt;li&gt;Creates Backtrader's main engine (Cerebro)&lt;/li&gt;
&lt;li&gt;Sets initial cash amount&lt;/li&gt;
&lt;li&gt;Adds the specified trading strategy&lt;/li&gt;
&lt;li&gt;Runs the backtest&lt;/li&gt;
&lt;li&gt;Prints initial and final portfolio values&lt;/li&gt;
&lt;li&gt;Calculates percentage return&lt;/li&gt;
&lt;li&gt;Reports Sharpe Ratio&lt;/li&gt;
&lt;li&gt;Generates performance plots
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def run_backtest(strategy, symbols, start, end, timeframe, cash):
    rest_api = REST(config['key_id'], config['secret_key'], base_url=config['trade_api_base_url'])

    #initialize backtrader broker
    cerebro = bt.Cerebro(stdstats=True)
    cerebro.broker.setcash(cash)

    # add strategy
    cerebro.addstrategy(strategy)

    # add analytics
    cerebro.addobserver(bt.observers.Value)
    cerebro.addobserver(bt.observers.BuySell)

    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='mysharpre')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
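&lt;p&gt;The percentage return and Sharpe ratio the script reports can be sketched in plain Python (hypothetical helpers for intuition, not Backtrader's own analyzers, which also handle annualisation and risk-free rates):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import statistics

def pct_return(initial_value, final_value):
    # Percentage return between starting and final portfolio value
    return (final_value - initial_value) / initial_value * 100.0

def sharpe_ratio(period_returns, risk_free=0.0):
    # Simple (unannualised) Sharpe ratio: mean excess return over its stdev
    excess = [r - risk_free for r in period_returns]
    return statistics.mean(excess) / statistics.stdev(excess)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;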



&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-algo-trading/blob/main/backtester.py" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-algo-trading/blob/main/backtester.py&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5cpdy4gxm9l0y1kucvk6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5cpdy4gxm9l0y1kucvk6.png" alt="Momentum" width="800" height="442"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &amp;gt;&amp;gt; Paper Vs Real Verifications
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Paper Trading&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsv71t3dj9vwyjq39pfec.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsv71t3dj9vwyjq39pfec.png" alt="PaperView" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Real Trading&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd0s6009hmqzhrwkzpp7u.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd0s6009hmqzhrwkzpp7u.png" alt="TradingView" width="800" height="492"&gt;&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Realtime Algorithmic Trading with Apache Flink
&lt;a href="https://www.youtube.com/watch?v=7r_oO_uLbSM" rel="noopener noreferrer"&gt;https://www.youtube.com/watch?v=7r_oO_uLbSM&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>sql</category>
      <category>python</category>
      <category>dataengineering</category>
      <category>eventdriven</category>
    </item>
    <item>
      <title>Apache Spark-Structured Streaming :: Cab Aggregator Use-case</title>
      <dc:creator>SNEHASISH DUTTA</dc:creator>
      <pubDate>Sun, 30 Jun 2024 17:50:09 +0000</pubDate>
      <link>https://forem.com/datasosneh/apache-spark-structured-streaming-cab-aggregator-use-case-2od0</link>
      <guid>https://forem.com/datasosneh/apache-spark-structured-streaming-cab-aggregator-use-case-2od0</guid>
      <description>&lt;p&gt;&lt;em&gt;Building helps you retain more knowledge.&lt;br&gt;
But teaching helps you retain even more. Teaching is another modality that locks in the experience you gain from building.--Dan Koe&lt;/em&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Objective
&lt;/h2&gt;

&lt;p&gt;Imagine a very simple system that can automatically warn cab companies whenever a driver rejects a bunch of rides in a short time. This system would use Kafka to send ride information (accepted, rejected) and Spark Structured Streaming to analyze it in real-time. If a driver rejects too many rides, the system would trigger an alert so the cab company can investigate.&lt;/p&gt;
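&lt;p&gt;Before wiring up Kafka and Spark, the alert rule itself can be sketched in plain Python (the threshold and window values here are hypothetical, purely for illustration):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from collections import deque

def make_rejection_monitor(threshold, window_ms):
    # Alert when one driver rejects `threshold` or more rides
    # within a sliding window of `window_ms` milliseconds
    rejections = {}

    def on_event(id_driver, tour_status, event_date):
        if tour_status != 'rejected':
            return False
        q = rejections.setdefault(id_driver, deque())
        q.append(event_date)
        # drop rejections that fell out of the window
        while q and event_date - q[0] &gt; window_ms:
            q.popleft()
        return len(q) &gt;= threshold

    return on_event
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For example, &lt;code&gt;make_rejection_monitor(3, 60_000)&lt;/code&gt; flags a driver on the third rejection within one minute.&lt;/p&gt;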
&lt;h2&gt;
  
  
  What is Spark Structured Streaming ?
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjvnndwdl0ulog4bexcxg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjvnndwdl0ulog4bexcxg.png" alt="Structured" width="800" height="354"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Spark Structured Streaming is a powerful tool for processing data streams in real-time. It's built on top of Apache Spark SQL, which means it leverages the familiar DataFrame and Dataset APIs you might already use for batch data processing in Spark. This offers several advantages:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Unified Programming Model:&lt;/strong&gt; You can use the same set of operations for both streaming and batch data, making it easier to develop and maintain code.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Declarative API:&lt;/strong&gt; Spark Structured Streaming lets you describe what you want to achieve with your data processing, rather than writing complex low-level code to handle the streaming aspects.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Fault Tolerance:&lt;/strong&gt; Spark Structured Streaming ensures your processing jobs can recover from failures without losing data. It achieves this through techniques like checkpointing and write-ahead logs.&lt;/p&gt;

&lt;p&gt;Here's a breakdown of how Spark Structured Streaming works:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Streaming Data Source:&lt;/strong&gt; Your data comes from a streaming source like Kafka, Flume, or custom code that generates a continuous stream of data.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Micro-Batching:&lt;/strong&gt; Spark Structured Streaming breaks down the continuous stream into small chunks of data called micro-batches.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Structured Processing:&lt;/strong&gt; Each micro-batch is processed like a regular DataFrame or Dataset using Spark SQL operations. This allows you to perform transformations, aggregations, and other data manipulations on the streaming data.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Updated Results:&lt;/strong&gt; As new micro-batches arrive, the processing continues, and the results are constantly updated, reflecting the latest data in the stream.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sinks:&lt;/strong&gt; The final output can be written to various destinations like databases, dashboards, or other streaming systems for further analysis or action.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Benefits of Spark Structured Streaming:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Real-time Insights:&lt;/strong&gt; Analyze data as it arrives, enabling quicker decision-making and proactive responses to events.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scalability:&lt;/strong&gt; Handles large volumes of streaming data efficiently by leveraging Spark's distributed processing capabilities.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Ease of Use:&lt;/strong&gt; The familiar DataFrame/Dataset API makes it easier to develop and maintain streaming applications.&lt;/p&gt;

&lt;p&gt;In essence, Spark Structured Streaming bridges the gap between batch processing and real-time analytics, allowing you to analyze data as it's generated and gain valuable insights from continuous data streams.&lt;/p&gt;
&lt;h2&gt;
  
  
  Project Architecture
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2mw8k9f6919h4insllwi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2mw8k9f6919h4insllwi.png" alt="Architecture" width="699" height="90"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Extract&lt;/strong&gt; From : Apache Kafka&lt;br&gt;
&lt;strong&gt;Transform&lt;/strong&gt; Using : Apache Spark&lt;br&gt;
&lt;strong&gt;Load&lt;/strong&gt; Into : Apache Kafka&lt;/p&gt;
&lt;h2&gt;
  
  
  Producer and Infrastructure
&lt;/h2&gt;

&lt;p&gt;Repository : &lt;a href="https://github.com/snepar/cab-producer-infra" rel="noopener noreferrer"&gt;https://github.com/snepar/cab-producer-infra&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;It is a simple application that ingests data into Kafka, emitting random events marked either accepted or rejected.&lt;/p&gt;

&lt;p&gt;Sample event&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{ 
   "id": 3949106,
   "event_date": 1719749696532,
   "tour_value": 29.75265579847153,
   "id_driver": 3,
   "id_passenger": 11,
   "tour_status": rejected
} 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start the Infrastructure&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker compose up
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Random Events Generator&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val statuses = List("accepted", "rejected")
    val status = statuses(Random.nextInt(statuses.length))
    while (true) {
      val topic = "ride"
      val r = scala.util.Random
      val id = r.nextInt(10000000)
      val tour_value = r.nextDouble() * 100
      val id_driver = r.nextInt(10)
      val id_passenger = r.nextInt(100)
      val event_date = System.currentTimeMillis
      val payload =
        s"""{"id":$id,"event_date":$event_date,"tour_value":$tour_value,"id_driver":$id_driver,"id_passenger":$id_passenger,"tour_status":"$status"}""".stripMargin

      EventProducer.send(topic, payload)
      Thread.sleep(1000)
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Send Random Events to Producer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def send(topic: String, payload: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, payload)
    producer.send(record)
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;See the produced events from Topic named &lt;strong&gt;ride&lt;/strong&gt; in the Docker Terminal&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --topic ride --bootstrap-server broker:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4btbuyegia986ok7ejie.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4btbuyegia986ok7ejie.png" alt="Ride" width="800" height="334"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Spark Structured Streaming Application
&lt;/h2&gt;

&lt;p&gt;Repository : &lt;a href="https://github.com/snepar/spark-streaming-cab" rel="noopener noreferrer"&gt;https://github.com/snepar/spark-streaming-cab&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Create Spark Session to Execute the application locally ::&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val spark = SparkSession.builder()
      .appName("Integrating Kafka")
      .master("local[2]")
      .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Configure the reader and writer Kafka topics&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    val kafkahost = "localhost:9092"
    val inputTopic = "ride"
    val outputTopic = "rejectalert"
    val props = new Properties()
    props.put("host", kafkahost)
    props.put("input_topic",inputTopic)
    props.put("output_host", kafkahost)
    props.put("output_topic",outputTopic)
    props.put("checkpointLocation","/tmp")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
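The job later reads these keys back with `getProperty` and a fall-back default, so a missing key does not crash the application. A minimal sketch of that pattern with plain `java.util.Properties` (no Spark required):

```scala
import java.util.Properties

// Same Properties bag as above; "checkpointLocation" is deliberately left
// out here to show the fall-back default kicking in.
val props = new Properties()
props.put("host", "localhost:9092")
props.put("input_topic", "ride")
props.put("output_topic", "rejectalert")

// getProperty(key, default) returns the default when the key is absent
val checkpoint = props.getProperty("checkpointLocation", "/tmp")
```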



&lt;p&gt;Define Schema for the Events&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true), 
      StructField("event_date", LongType, nullable = false), 
      StructField("tour_value", DoubleType, nullable = true), 
      StructField("id_driver", StringType, nullable = false), 
      StructField("id_passenger", IntegerType, nullable = false), 
      StructField("tour_status", StringType, nullable = false) 
    ))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Read from the Kafka topic and create the streaming DataFrame&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers","localhost:9092")
      .option("failOnDataLoss","false")
      .option("startingOffsets", "latest")
      .option("subscribe", "ride").load()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Parse the DataFrame with the schema and keep only the events marked as rejected.&lt;br&gt;
&lt;strong&gt;A rejected event signifies that a driver has rejected a ride.&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val parsedDF = df.selectExpr("cast(value as string) as value")
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*").where("tour_status='rejected'")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Aggregate, over a 1-minute window grouped by driver ID, how many rides were rejected, and also calculate how much money was lost to those rejections&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val driverPerformance: DataFrame = parsedDF.groupBy(
      window(to_utc_timestamp(from_unixtime(col("event_date") / 1000, "yyyy-MM-dd HH:mm:ss"), "UTC+1")
        .alias("event_timestamp"),
        "1 minute"),
      col("id_driver"))
      .agg(count(col("id")).alias("total_rejected_tours"),
        sum("tour_value").alias("total_loss"))
      .select("id_driver", "total_rejected_tours", "total_loss")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
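The tumbling-window aggregation can be sketched in plain Scala: bucket each event's epoch-millis timestamp to the start of its minute, then group by (window, driver) and count rejections / sum the lost value. `RideEvent` and `windowStart` are illustrative names, not from the repository:

```scala
// Illustrative stand-in for one parsed, already-rejected event
case class RideEvent(id: Int, eventDate: Long, tourValue: Double, idDriver: String)

// Start of the 1-minute tumbling window containing this timestamp
def windowStart(epochMillis: Long): Long = epochMillis - (epochMillis % 60000L)

// (window, driver) -> (total_rejected_tours, total_loss)
def driverPerformance(rejected: Seq[RideEvent]): Map[(Long, String), (Int, Double)] =
  rejected
    .groupBy(e => (windowStart(e.eventDate), e.idDriver))
    .map { case (key, es) => key -> (es.size, es.map(_.tourValue).sum) }
```

Spark evaluates the same grouping incrementally over the stream; this batch version only shows the per-window arithmetic.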



&lt;p&gt;Set a threshold of 3 rejections; if a driver exceeds it within the window, generate an alert event&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val thresholdCrossedDF = driverPerformance.where(col("total_rejected_tours").gt(3))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Write this DataFrame to a Kafka Topic &lt;strong&gt;rejectalert&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;thresholdCrossedDF.selectExpr("CAST(id_driver AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers",
      prop.getProperty("output_host","localhost:9092"))
      .option("topic",prop.getProperty("output_topic","rejectalert"))
.outputMode("update".option("checkpointLocation",prop.getProperty("checkpoint","/tmp"))
      .start().awaitTermination()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Run the Complete Application : &lt;a href="https://github.com/snepar/spark-streaming-cab/blob/master/src/main/scala/rideevent/AlertGenerator.scala" rel="noopener noreferrer"&gt;https://github.com/snepar/spark-streaming-cab/blob/master/src/main/scala/rideevent/AlertGenerator.scala&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Using a consumer on the Kafka broker, subscribe to these alerts&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcr3m9ujzr95gz5szqb9f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcr3m9ujzr95gz5szqb9f.png" alt="Reject" width="800" height="380"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html" rel="noopener noreferrer"&gt;https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/rockthejvm/spark-streaming" rel="noopener noreferrer"&gt;https://github.com/rockthejvm/spark-streaming&lt;/a&gt;&lt;/p&gt;

</description>
      <category>apachespark</category>
      <category>dataengineering</category>
      <category>streaming</category>
      <category>realtimedata</category>
    </item>
    <item>
      <title>Apache Flink :: E-commerce Data Pipeline Usecase</title>
      <dc:creator>SNEHASISH DUTTA</dc:creator>
      <pubDate>Wed, 19 Jun 2024 07:35:15 +0000</pubDate>
      <link>https://forem.com/datasosneh/apache-flink-e-commerce-data-pipeline-usecase-3ha9</link>
      <guid>https://forem.com/datasosneh/apache-flink-e-commerce-data-pipeline-usecase-3ha9</guid>
      <description>&lt;p&gt;In today's data-driven world, real-time streaming is essential for businesses to gain instant insights and respond swiftly to market changes. Technologies like Apache Flink, Kafka, Postgres, Elasticsearch, and Kibana combine to create a powerful stack for processing, storing, searching, and visualizing streaming data in real time. &lt;/p&gt;

&lt;p&gt;This article explores the integration of these technologies to build a robust real-time streaming architecture for an e-commerce business.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;GitHub&lt;/strong&gt; :: &lt;a href="https://github.com/snepar/flink-ecom" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-ecom&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introducing the Building Blocks
&lt;/h2&gt;

&lt;p&gt;A concise overview of each component's purpose follows.&lt;br&gt;
&lt;strong&gt;Apache Kafka ::&lt;/strong&gt; A distributed streaming platform that excels in handling real-time data feeds. It enables the seamless transmission of data across systems, ensuring high-throughput, low-latency, fault-tolerant communication.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Apache Flink ::&lt;/strong&gt; A powerful real-time stream-processing engine (with APIs in Java/Scala/Python) for low-latency analysis &amp;amp; transformations on massive data streams.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;PostgreSQL (Postgres) ::&lt;/strong&gt; An advanced open-source relational database known for its robustness, scalability, and SQL compliance. It supports complex queries, ACID transactions, and extensibility with custom functions and types.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Elasticsearch ::&lt;/strong&gt; A distributed, open-source search and analytics engine. It excels at full-text search, real-time data indexing, and querying. It is highly scalable and ideal for log and event data analysis.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Kibana ::&lt;/strong&gt; A powerful visualization tool for Elasticsearch. It provides real-time insights with interactive charts, graphs, and dashboards, making it easy to explore and analyze large datasets visually.&lt;/p&gt;
&lt;h2&gt;
  
  
  Data Flow Diagram (Architecture)
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpqoje8fwheifbf7st0bf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpqoje8fwheifbf7st0bf.png" alt="Data Flow Diagram" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Goal
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Data Ingestion:&lt;/strong&gt; using Kafka&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Streaming Extract:&lt;/strong&gt; using Flink Source APIs&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Transform and Aggregate:&lt;/strong&gt; using Flink SerDe and transformations&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Load:&lt;/strong&gt; using Flink Sink APIs to Postgres and Elasticsearch&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Visualise:&lt;/strong&gt; using Kibana&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Let Us Begin With the Infrastructure
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Install the following&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;Python 3, pip&lt;/li&gt;
&lt;li&gt;Scala 2.12, sbt&lt;/li&gt;
&lt;li&gt;Docker Desktop&lt;/li&gt;
&lt;li&gt;Docker Compose&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Refer to this repository for the complete setup &lt;br&gt;
&lt;a href="https://github.com/snepar/flink-ecom-infra" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-ecom-infra&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Execute command &lt;code&gt;docker compose up -d&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Run the Python file &lt;code&gt;main.py&lt;/code&gt; to generate events to Kafka&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx7t6gjvlbzmihybu6fyj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx7t6gjvlbzmihybu6fyj.png" alt="main-py-output" width="800" height="150"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Execute the Kafka consumer from your Docker service to verify that the events are flowing as expected&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;code&gt;kafka-console-consumer --topic financial_transactions --bootstrap-server broker:9092&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0rav3k1tjtw5rt9u1n1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0rav3k1tjtw5rt9u1n1.png" alt="kafka-consumer-log" width="800" height="155"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Set Up Apache Flink
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;I have used Flink 1.16.3 &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Run Flink Locally using -&amp;gt;&lt;br&gt;
&lt;a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/" rel="noopener noreferrer"&gt;https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  The Flink Project Deep Dive
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;build.sbt&lt;/strong&gt; manages all the dependencies the project needs to connect to Kafka, Postgres, and Elasticsearch.&lt;br&gt;
&lt;a href="https://github.com/snepar/flink-ecom/blob/master/build.sbt" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-ecom/blob/master/build.sbt&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Flink Kafka Connector&lt;/strong&gt; uses &lt;code&gt;org.apache.flink.connector.kafka.source.KafkaSource&lt;/code&gt;&lt;br&gt;
Example Snippet::&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;KafkaSource.builder[Transaction]()
      .setBootstrapServers("localhost:9092")
      .setTopics(topic)
      .setGroupId("ecom-group")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new JSONValueDeserializationSchema())
      .build()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;JSON Deserializer&lt;/strong&gt; is used to deserialize the Kafka payload into &lt;code&gt;Transaction&lt;/code&gt;.&lt;br&gt;
It can be referred to here :: &lt;a href="https://github.com/snepar/flink-ecom/blob/master/src/main/scala/ecom/deserializer/JSONValueDeserializationSchema.scala" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-ecom/blob/master/src/main/scala/ecom/deserializer/JSONValueDeserializationSchema.scala&lt;/a&gt;&lt;br&gt;
&lt;em&gt;Important:&lt;/em&gt; since we are dealing with Scala case classes, register the &lt;code&gt;DefaultScalaModule&lt;/code&gt;:&lt;br&gt;
&lt;code&gt;import com.fasterxml.jackson.module.scala.DefaultScalaModule&lt;br&gt;
mapper.registerModule(DefaultScalaModule)&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;See the data consumed from Kafka in Flink using&lt;/strong&gt; &lt;code&gt;transactionStream.print()&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbuhtaec09n746bxotoyu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbuhtaec09n746bxotoyu.png" alt="ide-output" width="800" height="283"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Flink-Postgres Sink&lt;/strong&gt; &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;DDLs are Defined here : &lt;a href="https://github.com/snepar/flink-ecom/tree/master/src/main/scala/ecom/generators/DDL" rel="noopener noreferrer"&gt;https://github.com/snepar/flink-ecom/tree/master/src/main/scala/ecom/generators/DDL&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Aggregation Examples (Monthly)&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def writeToDBsalesPerMonth(transactionStream: DataStream[Transaction]) = {
    transactionStream.addSink(JdbcSink.sink(
      DDL.SalesPerMonthSQL.createTable,
      new JdbcStatementBuilder[Transaction] {
        // the statement here is the CREATE TABLE DDL itself, so there are no parameters to bind
        override def accept(t: PreparedStatement, u: Transaction): Unit = {
        }
      },
      execOptions,
      connOptions
    ))

    transactionStream.map(transaction =&amp;gt;
      {
        val transactionDate = new Date(transaction.transactionDate.getTime);
        val year = transactionDate.toLocalDate().getYear();
        val month = transactionDate.toLocalDate().getMonth().getValue();
        SalesPerMonth(year, month, totalSales = transaction.totalAmount)
      }
    ).keyBy(spm=&amp;gt;(spm.year,spm.month)).reduce((acc,curr) =&amp;gt; acc.copy(totalSales = acc.totalSales + curr.totalSales))
      .addSink(JdbcSink.sink(
        DDL.SalesPerMonthSQL.insertStmt,
        new JdbcStatementBuilder[SalesPerMonth] {
          override def accept(preparedStatement: PreparedStatement, salesPerMonth: SalesPerMonth): Unit = {
            preparedStatement.setInt(1, salesPerMonth.year)
            preparedStatement.setInt(2, salesPerMonth.month)
            preparedStatement.setDouble(3, salesPerMonth.totalSales)
          }
        },
        execOptions,
        connOptions
      ))

  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
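The keyBy/reduce rollup above can be sketched in plain Scala without Flink. `Txn` is an illustrative stand-in for the repository's `Transaction` class:

```scala
import java.time.{Instant, ZoneOffset}

// Illustrative stand-ins for the repository's Transaction / SalesPerMonth
case class Txn(transactionDate: Long, totalAmount: Double)

// (year, month) -> running totalSales, mirroring keyBy((year, month)).reduce(_ + _)
def rollup(txns: Seq[Txn]): Map[(Int, Int), Double] =
  txns.groupBy { t =>
    val d = Instant.ofEpochMilli(t.transactionDate).atZone(ZoneOffset.UTC).toLocalDate
    (d.getYear, d.getMonthValue)
  }.map { case (ym, ts) => ym -> ts.map(_.totalAmount).sum }
```

Flink maintains the same running sum incrementally per key; this batch version only shows the key derivation and the fold.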


&lt;p&gt;&lt;strong&gt;Flink-Elastic Sink&lt;/strong&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Important :: When defining the emitter function, an explicit type annotation is required (the documentation examples do not mention this)&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def writeToElastic(transactionStream: DataStream[Transaction]) = {

    val sink: ElasticsearchSink[Transaction] = new Elasticsearch7SinkBuilder[Transaction]
      .setHosts(new HttpHost("localhost", 9200, "http"))
      .setBulkFlushMaxActions(2)
      .setBulkFlushInterval(10L)
      .setEmitter[Transaction]{
        (transaction, context, indexer) =&amp;gt; {
          val mapper = new ObjectMapper()
          mapper.registerModule(DefaultScalaModule)
          val json: String = mapper.writeValueAsString(transaction)

          val indexRequest = Requests.indexRequest()
            .index("transactions")
            .id(transaction.transactionId)
            .source(json, XContentType.JSON);
          indexer.add(indexRequest)
        }
      }.build()

    transactionStream.sinkTo(sink)
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Flink Job Execution
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;KafkaPGESIntegrationEcom.scala&lt;/code&gt; : can be run directly from the IDE,
OR&lt;/li&gt;
&lt;li&gt;install a Flink cluster and deploy using &lt;code&gt;$ flink run -c ecom.KafkaPGESIntegrationEcom flink-ecom_2.12-0.1.jar&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzy49k2ol8dm8r9ihkppm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzy49k2ol8dm8r9ihkppm.png" alt="From_IDE_Run" width="800" height="283"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  A Glimpse into Postgres SQL
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;transactions&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu8t1dm5d9e7dcq6pqh7j.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu8t1dm5d9e7dcq6pqh7j.png" alt="transactions" width="800" height="288"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;sales_per_category&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmgrqg58gslsdbbbamu23.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmgrqg58gslsdbbbamu23.png" alt="sales_per_category" width="778" height="514"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;sales_per_day&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs2lfyxdh2wfvg6jogfw6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs2lfyxdh2wfvg6jogfw6.png" alt="sales_per_day" width="726" height="254"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;sales_per_month&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3qt1v24cj0upag35dba4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3qt1v24cj0upag35dba4.png" alt="sales_per_month" width="718" height="184"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Data on Elasticsearch
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Indexing ::&lt;/strong&gt; here is the structure of the &lt;code&gt;transactions&lt;/code&gt; index on Elasticsearch.&lt;br&gt;
You can get this by running &lt;code&gt;GET transactions&lt;/code&gt; in the Dev Tools.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanx6gi16qcuw3r9idgd6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanx6gi16qcuw3r9idgd6.png" alt="Indexing" width="800" height="399"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can query them by running &lt;code&gt;GET transactions/_search&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpzg8v7ui49lwcbep47fb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpzg8v7ui49lwcbep47fb.png" alt="_search" width="800" height="325"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Reindexing Data on Elasticsearch
&lt;/h2&gt;

&lt;p&gt;To get a readable transaction date, we need to reindex into a different index. To reindex data on Elasticsearch, we use the &lt;code&gt;_reindex&lt;/code&gt; API.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;POST _reindex
{
 "source": {"index": "transactions"}, 
 "dest": {"index": "transaction_part1"},
 "script": {"source":"""
   ctx._source.transactionDate = new 
   Date(ctx._source.transactionDate).toString();
"""}
}

GET transaction_part1/_search
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkr6oa70o7zic19c5hfk0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkr6oa70o7zic19c5hfk0.png" alt="reindex" width="800" height="248"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, &lt;code&gt;toString()&lt;/code&gt; gives us little control over the format, so we need a more robust way to format the date.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;POST _reindex
{
"source": {"index": "transactions"}, 
"dest": {"index": "transaction_part2"},
"script": {"source": 
 """SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
  formatter.setTimeZone(TimeZone.getTimeZone('UTC'));
  ctx._source.transactionDate = formatter.format (new 
  Date(ctx._source.transactionDate));"""
 }
}

GET transaction_part2/_search
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
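The Painless script above uses the same `java.text` API that is available on any JVM, so the formatting logic can be checked standalone in Scala (a sketch, not code from the repository):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Same pattern and timezone as the _reindex script: epoch millis -> ISO-like string in UTC
def formatTransactionDate(epochMillis: Long): String = {
  val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
  formatter.format(new Date(epochMillis))
}
```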



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs5f00fj4i9zvvgsg88lh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs5f00fj4i9zvvgsg88lh.png" alt="_reindex2" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Dashboarding in Real Time With Kibana
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Index on transaction_part2&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Creating Donut Chart&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fei6gd49qsbwf8w2oophh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fei6gd49qsbwf8w2oophh.png" alt="Donut" width="800" height="348"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Number Of Transactions&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpbsf3a9upm3kh5jbtlou.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpbsf3a9upm3kh5jbtlou.png" alt="Count" width="800" height="364"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Top Brands&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffli944wup25qfeozes38.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffli944wup25qfeozes38.png" alt="Brands" width="800" height="291"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Final Dashboard&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frs4tkv2nncukmz2ulcyt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frs4tkv2nncukmz2ulcyt.png" alt="Dashboard" width="800" height="177"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Thank You For Spending Some Time Here
&lt;/h2&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/towards-data-engineering/real-time-streaming-at-scale-integrating-apache-flink-kafka-postgres-elasticsearch-kibana-and-132a7fd59e00" rel="noopener noreferrer"&gt;https://medium.com/towards-data-engineering/real-time-streaming-at-scale-integrating-apache-flink-kafka-postgres-elasticsearch-kibana-and-132a7fd59e00&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/rockthejvm/flink" rel="noopener noreferrer"&gt;https://github.com/rockthejvm/flink&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Image Reference&lt;/strong&gt; &lt;a href="https://www.youtube.com/watch?v=deepQRXnniM" rel="noopener noreferrer"&gt;https://www.youtube.com/watch?v=deepQRXnniM&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>dataengineering</category>
      <category>streaming</category>
      <category>scala</category>
      <category>apacheflink</category>
    </item>
  </channel>
</rss>
