<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Eric Kahindi</title>
    <description>The latest articles on Forem by Eric Kahindi (@eric_kahindi_cfbfda3bd0f7).</description>
    <link>https://forem.com/eric_kahindi_cfbfda3bd0f7</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3397763%2F4593e865-2d23-4d90-b151-a0f2a5b84780.png</url>
      <title>Forem: Eric Kahindi</title>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/eric_kahindi_cfbfda3bd0f7"/>
    <language>en</language>
    <item>
      <title>Implementing a CDC pipeline with Debezium</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Sat, 29 Nov 2025 21:40:23 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/implementing-a-cdc-pipeline-with-debezium-2lo1</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/implementing-a-cdc-pipeline-with-debezium-2lo1</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;This project is a comprehensive real-time data pipeline that captures, processes, and visualises cryptocurrency market data from Binance. It leverages Apache Kafka, PostgreSQL, Cassandra, and Grafana to build a scalable, event-driven architecture for financial data processing.&lt;/p&gt;

&lt;p&gt;If you need any more info, visit the &lt;a href="https://github.com/kazeric/binance_top_crypto_pipeline" rel="noopener noreferrer"&gt;repository&lt;/a&gt; to get the full picture with all the source files.&lt;/p&gt;

&lt;h3&gt;
  
  
  Technology Stack
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Data Ingestion:&lt;/strong&gt; Python with Binance API&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stream Processing:&lt;/strong&gt; Apache Kafka&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Change Data Capture:&lt;/strong&gt; Debezium PostgreSQL Connector&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Operational Database:&lt;/strong&gt; PostgreSQL&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Analytical Database:&lt;/strong&gt; Apache Cassandra&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Visualization:&lt;/strong&gt; Grafana&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Orchestration:&lt;/strong&gt; Docker &amp;amp; Docker Compose&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Data Flow Patterns
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Write Path:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Binance API → ETL Service → PostgreSQL Insert
                              ↓
                         Debezium CDC
                              ↓
                         Kafka Topics
                              ↓
                         Cassandra Sink
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Read Path:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Cassandra → Grafana Data Sources → Dashboards
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Component Interaction Flow
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Source Layer&lt;/strong&gt; (&lt;a href="https://www.binance.com/en/docs/api" rel="noopener noreferrer"&gt;Binance API&lt;/a&gt;)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Exposes RESTful endpoints for market data&lt;/li&gt;
&lt;li&gt;No built-in event streaming capability&lt;/li&gt;
&lt;li&gt;Requires polling via ETL producer&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Ingestion Layer&lt;/strong&gt; (PostgreSQL + Binance ETL)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Receives transformed data from API&lt;/li&gt;
&lt;li&gt;Enables Write-Ahead Logging (WAL) for CDC&lt;/li&gt;
&lt;li&gt;Maintains logical replication slots for consistency&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Streaming Layer&lt;/strong&gt; (Apache Kafka)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Decouples producers from consumers&lt;/li&gt;
&lt;li&gt;Provides topic-based pub/sub model&lt;/li&gt;
&lt;li&gt;Enables event replay and stream processing&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Persistence Layer&lt;/strong&gt; (Cassandra)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Optimized for time-series analytical queries&lt;/li&gt;
&lt;li&gt;Distributed storage for fault tolerance&lt;/li&gt;
&lt;li&gt;Supports high-throughput writes&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Visualization Layer&lt;/strong&gt; (Grafana)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Queries both PostgreSQL and Cassandra&lt;/li&gt;
&lt;li&gt;Real-time dashboard rendering&lt;/li&gt;
&lt;li&gt;Alert configuration capabilities&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Key Components
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. Binance ETL Producer
&lt;/h3&gt;

&lt;p&gt;The ETL Producer is the entry point that orchestrates data collection from Binance. It fetches market data for the top cryptocurrency symbols.&lt;br&gt;
The producer identifies the top 5 crypto gainers in the 24-hour period before fetching detailed data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;fetch_top5_24hr&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;tuple&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Starting to load top 5  data ...&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://api.binance.com/api/v3/ticker/24hr&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;raise_for_status&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="c1"&gt;# Filter based on the usdt tethered cryptos  
&lt;/span&gt;        &lt;span class="n"&gt;usdt_pairs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
            &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;endswith&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;USDT&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="nf"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;quoteVolume&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;1000000&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="nf"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;askQty&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;
            &lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="c1"&gt;# sort the data
&lt;/span&gt;        &lt;span class="n"&gt;sorted_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;usdt_pairs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;priceChangePercent&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]),&lt;/span&gt;
            &lt;span class="n"&gt;reverse&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;top_5&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;sorted_data&lt;/span&gt;&lt;span class="p"&gt;[:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt;

        &lt;span class="c1"&gt;# push this to the other tasks with xcoms 
&lt;/span&gt;        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sorted_data&lt;/span&gt;&lt;span class="p"&gt;[:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;top_5&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the top 5 symbols are identified, the producer fetches and transforms three additional types of market data for each symbol in the list:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Kline Data&lt;/strong&gt; (OHLCV Candlestick Data) - Fetching 5-minute candlestick data for the last 24 hours. Uses interval='5m' to capture granular price movements.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Recent Trades&lt;/strong&gt; (Individual Trade Transactions) - Fetching up to 300 most recent trades. Each trade shows: who traded, at what price, how much, and when.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Order Book&lt;/strong&gt; (Bid/Ask Levels) - Fetching the current order book with up to 300 levels on each side. Returns: &lt;code&gt;{'bids': [[price, qty], ...], 'asks': [[price, qty], ...]}&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
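&lt;p&gt;The kline step can be sketched roughly as follows. This is a hedged illustration rather than the repository's exact code: &lt;code&gt;transform_kline&lt;/code&gt; is a hypothetical helper, and the field indices follow the array layout of Binance's public &lt;code&gt;/api/v3/klines&lt;/code&gt; endpoint (open time, OHLCV, close time).&lt;/p&gt;

```python
# Hypothetical sketch: flatten one raw Binance kline row into a record
# ready for a PostgreSQL insert. Binance returns each kline as an array;
# the indices below follow the public /api/v3/klines response layout.

def transform_kline(raw, symbol, ranking):
    """Map a raw kline array to a dict keyed by column name."""
    return {
        "symbol": symbol,
        "ranking": ranking,
        "open_time": int(raw[0]),
        "open": float(raw[1]),
        "high": float(raw[2]),
        "low": float(raw[3]),
        "close": float(raw[4]),
        "volume": float(raw[5]),
        "close_time": int(raw[6]),
    }

# Example with an illustrative raw row (values are made up):
row = transform_kline(
    [1700000000000, "1.0", "2.0", "0.5", "1.5", "100.0", 1700000299999],
    "BTCUSDT",
    1,
)
```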

&lt;p&gt;The main function below then executes the fetch and transform functions for each symbol. A full run completes in approximately 5-10 seconds.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="c1"&gt;# Initialize Cassandra schema
&lt;/span&gt;    &lt;span class="nf"&gt;setup_cassandra&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c1"&gt;# Fetch top 5 gainers (runs once)
&lt;/span&gt;    &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;top_5&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;fetch_top5_24hr&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="nf"&gt;transform_load_top5_24hr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# For each of the top 5 symbols
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;enumerate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;top_5&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;ranking&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;

        &lt;span class="c1"&gt;# Fetch and load klines (25 hours of 5-min data)
&lt;/span&gt;        &lt;span class="n"&gt;klines&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;fetch_klines_24hrs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;transform_load_klines_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;klines&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ranking&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Fetch and load recent trades (300 most recent)
&lt;/span&gt;        &lt;span class="n"&gt;trades&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;fetch_recent_trades&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;transform_load_recent_trades&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;trades&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ranking&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Fetch and load order book (bid/ask levels)
&lt;/span&gt;        &lt;span class="n"&gt;ob&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;fetch_OB&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;transform_load_OB_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ob&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ranking&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  2. PostgreSQL
&lt;/h3&gt;

&lt;p&gt;PostgreSQL serves as the operational database—the first landing zone for all market data. Its role is critical: it must capture every insert with precision for downstream CDC.&lt;/p&gt;

&lt;h4&gt;
  
  
  Database Initialization
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="c1"&gt;# docker-compose.yml excerpt&lt;/span&gt;
  &lt;span class="na"&gt;postgres&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;debezium/postgres:15&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;postgres&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;5432:5432"&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;postgres -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;POSTGRES_USER&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dbz&lt;/span&gt;
      &lt;span class="na"&gt;POSTGRES_PASSWORD&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dbz&lt;/span&gt;
      &lt;span class="na"&gt;POSTGRES_DB&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;cap_stock_db&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Critical setting:&lt;/strong&gt; &lt;code&gt;wal_level=logical&lt;/code&gt; enables Logical Replication, which is required for Debezium CDC to function.&lt;/p&gt;

&lt;h3&gt;
  
  
  3. Debezium PostgreSQL Connector: The CDC Engine
&lt;/h3&gt;

&lt;p&gt;Debezium is the change data capture engine that transforms PostgreSQL into an event publisher.&lt;/p&gt;

&lt;h4&gt;
  
  
  How CDC Works
&lt;/h4&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Logical Replication Slot&lt;/strong&gt; — Debezium creates a slot that tracks WAL position&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;WAL Scanning&lt;/strong&gt; — Reads Write-Ahead Log sequentially&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event Decoding&lt;/strong&gt; — Converts log entries to JSON change events&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka Publishing&lt;/strong&gt; — Sends events to topics matching table names&lt;/li&gt;
&lt;/ol&gt;

&lt;h4&gt;
  
  
  Docker setup
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;    &lt;span class="na"&gt;connect&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/connect:3.1&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;connect&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;8083:8083"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="c1"&gt;#Bootstrap&lt;/span&gt;
      &lt;span class="na"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka:9092&lt;/span&gt;
      &lt;span class="na"&gt;GROUP_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;CONFIG_STORAGE_TOPIC&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;connect_configs&lt;/span&gt;
      &lt;span class="na"&gt;OFFSET_STORAGE_TOPIC&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;connect_offsets&lt;/span&gt;
      &lt;span class="na"&gt;STATUS_STORAGE_TOPIC&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;connect_statuses&lt;/span&gt;
      &lt;span class="na"&gt;HOST_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;connect"&lt;/span&gt;
      &lt;span class="na"&gt;ADVERTISED_HOST_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;connect"&lt;/span&gt;
      &lt;span class="na"&gt;ADVERTISED_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;8083"&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_CONNECT_PLUGIN_PATH&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/kafka/connect,/kafka/plugins,/kafka/plugins/kafka-connect-cassandra-sink-1.7.3,/debezium/connect&lt;/span&gt;

    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./plugins/kafka-connect-cassandra-sink-1.7.3:/kafka/connect/kafka-connect-cassandra-sink-1.7.3&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Connector Configuration
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cap-stock-connector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"config"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.connector.postgresql.PostgresConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.hostname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgres"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.port"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"5432"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dbz"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dbz"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.dbname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cap_stock_db"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topic.prefix"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cap_stock"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"slot.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cap_stock_slot"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"publication.autocreate.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"filtered"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"table.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public.kline_data, public.order_book, public.recent_trades, public.top_24hr"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"schema.history.internal.kafka.bootstrap.servers"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"kafka:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"schema.history.internal.kafka.topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"schema-changes.cap_stock_db"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
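&lt;p&gt;With the stack up, the configuration above can be registered against Kafka Connect's REST API. The following is a minimal sketch, assuming the &lt;code&gt;8083:8083&lt;/code&gt; port mapping from the compose file; the &lt;code&gt;register&lt;/code&gt; helper is illustrative, not part of the repository.&lt;/p&gt;

```python
import json
import urllib.request

# The connector definition mirrors the JSON configuration shown above
# (trimmed to the core properties for brevity).
connector = {
    "name": "cap-stock-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "dbz",
        "database.password": "dbz",
        "database.dbname": "cap_stock_db",
        "topic.prefix": "cap_stock",
        "slot.name": "cap_stock_slot",
    },
}

payload = json.dumps(connector)

def register(url="http://localhost:8083/connectors"):
    """POST the connector config to Kafka Connect (requires the stack running)."""
    req = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```

Calling `register()` once is enough; Kafka Connect persists the configuration in its `connect_configs` topic and restarts the task automatically on failure.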



&lt;h3&gt;
  
  
  4. Apache Kafka: The Event Streaming Backbone
&lt;/h3&gt;

&lt;p&gt;Kafka acts as the central event hub, decoupling data producers (PostgreSQL via Debezium) from consumers (Cassandra Sink Connector).&lt;/p&gt;

&lt;h4&gt;
  
  
  Kafka Setup
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;  &lt;span class="na"&gt;zookeeper&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/zookeeper:3.1&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2181:2181"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;

  &lt;span class="na"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/kafka:3.1&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;zookeeper&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;29092:29092"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_BROKER_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;ZOOKEEPER_CONNECT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper:2181&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ADVERTISED_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;INTERNAL://kafka:9092,EXTERNAL://localhost:29092&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENER_SECURITY_PROTOCOL_MAP&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_INTER_BROKER_LISTENER_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;INTERNAL&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;

  &lt;span class="na"&gt;akhq&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;tchiotludo/akhq:latest&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;8080&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;&lt;span class="nv"&gt;8080&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;AKHQ_CONFIGURATION&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;|&lt;/span&gt;
        &lt;span class="s"&gt;akhq:&lt;/span&gt;
          &lt;span class="s"&gt;connections:&lt;/span&gt;
            &lt;span class="s"&gt;local:&lt;/span&gt;
              &lt;span class="s"&gt;properties:&lt;/span&gt;
                &lt;span class="s"&gt;bootstrap.servers: "kafka:9092"&lt;/span&gt;
              &lt;span class="s"&gt;connect:&lt;/span&gt;
                &lt;span class="s"&gt;- name: "connect"&lt;/span&gt;
                  &lt;span class="s"&gt;url: "http://connect:8083"&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;connect&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Four topics are created:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Topic&lt;/th&gt;
&lt;th&gt;Partitions&lt;/th&gt;
&lt;th&gt;Retention&lt;/th&gt;
&lt;th&gt;Purpose&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;cap_stock.public.kline_data&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;5&lt;/td&gt;
&lt;td&gt;7 days&lt;/td&gt;
&lt;td&gt;Candlestick events&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;cap_stock.public.recent_trades&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;5&lt;/td&gt;
&lt;td&gt;7 days&lt;/td&gt;
&lt;td&gt;Trade events&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;cap_stock.public.order_book&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;5&lt;/td&gt;
&lt;td&gt;7 days&lt;/td&gt;
&lt;td&gt;Order book snapshots&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;cap_stock.public.top_24hr&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;5&lt;/td&gt;
&lt;td&gt;7 days&lt;/td&gt;
&lt;td&gt;24hr statistics&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;To verify, navigate to &lt;code&gt;localhost:8080&lt;/code&gt;. Here you'll find the AKHQ Kafka UI running with all the information you might need.&lt;/p&gt;
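&lt;p&gt;Each of these topics carries Debezium's standard change-event envelope: an &lt;code&gt;op&lt;/code&gt; code (&lt;code&gt;c&lt;/code&gt; = create, &lt;code&gt;u&lt;/code&gt; = update, &lt;code&gt;d&lt;/code&gt; = delete, &lt;code&gt;r&lt;/code&gt; = snapshot read) plus &lt;code&gt;before&lt;/code&gt;/&lt;code&gt;after&lt;/code&gt; row images. A hedged sketch of what a consumer sees; the row columns here are illustrative, not the exact table schema.&lt;/p&gt;

```python
import json

# An illustrative Debezium change event for an insert into kline_data.
# The envelope fields (op, before, after, source) follow Debezium's
# documented event structure; the column values are made up.
event = json.loads("""
{
  "payload": {
    "op": "c",
    "before": null,
    "after": {"symbol": "BTCUSDT", "close": 1.5, "ranking": 1},
    "source": {"table": "kline_data"}
  }
}
""")

def extract_row(change):
    """Return the post-image for inserts/updates/snapshot reads, None for deletes."""
    payload = change["payload"]
    if payload["op"] in ("c", "u", "r"):
        return payload["after"]
    return None

row = extract_row(event)
```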

&lt;h3&gt;
  
  
  5. Cassandra Database: The Analytical Store
&lt;/h3&gt;

&lt;p&gt;Cassandra is the analytics powerhouse for our dashboard, purpose-built for time-series analytics with massive write throughput and fast range queries.&lt;br&gt;
But before anything else, remember to set up the database schema first, unless you want to spend hours debugging why your queries work in the terminal but nowhere else.&lt;/p&gt;
&lt;h4&gt;
  
  
  Cassandra Initialization
&lt;/h4&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;setup_cassandra&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;basicConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;INFO&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%(asctime)s|%(levelname)s|%(name)s|%(message)s&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;handlers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
            &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;FileHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;cassandra_setup&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%Y%m%d&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.log&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;StreamHandler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Creating the connection ...&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# connecting to cassandra 
&lt;/span&gt;    &lt;span class="n"&gt;cluster&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Cluster&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;cassandra&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;cluster&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Connected ...&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Creating the keyspace cap_stock ...&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# create the key space 
&lt;/span&gt;    &lt;span class="n"&gt;create_keyspace_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    CREATE KEYSPACE IF NOT EXISTS cap_stock WITH REPLICATION = {&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;class&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt; : &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;SimpleStrategy&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;replication_factor&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt; : 1 };
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_keyspace_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;use_keyspace_query&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    USE cap_stock;
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;use_keyspace_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;using the created keyspace ...&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Creating the tables&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;create_kline_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    CREATE TABLE IF NOT EXISTS cap_stock.kline_data (
    symbol              text,
    k_open_time         timestamp,
    k_close_time        timestamp,
    open                double,
    high                double,
    low                 double,
    close               double,
    volume              double,
    quote_asset_volume  double,
    number_of_trades    int,
    tb_base_volume      double,
    tb_quote_volume     double,
    ranking             int,
    time_collected      timestamp,
    PRIMARY KEY ((symbol), k_open_time)
    ) WITH CLUSTERING ORDER BY (k_open_time DESC)
    AND compaction = {
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;class&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;TimeWindowCompactionStrategy&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_unit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;DAYS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_size&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
    };

    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;create_OB_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;   &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    CREATE TABLE IF NOT EXISTS cap_stock.order_book (
    symbol          text,
    side            text,       -- &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bids&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt; or &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;asks&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
    ranking         int,        -- rank of the crypto in top performers
    price           double,
    quantity        double,
    time_collected  timestamp,  -- when this snapshot was taken
    PRIMARY KEY ((symbol, side, time_collected), price)
    ) WITH CLUSTERING ORDER BY (price DESC);

    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="n"&gt;create_recent_trades_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    CREATE TABLE IF NOT EXISTS cap_stock.recent_trades(
    symbol          text,
    trade_time      timestamp,  
    trade_id        bigint,      
    price           double,
    qty             double,
    quote_qty       double,
    is_buyer_maker  boolean,
    is_best_match   boolean,
    ranking         int,
    time_collected  timestamp,  
    PRIMARY KEY ((symbol), trade_time, trade_id)
    ) WITH CLUSTERING ORDER BY (trade_time DESC, trade_id DESC)
    AND compaction = {
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;class&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;TimeWindowCompactionStrategy&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_unit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;HOURS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_size&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
    };
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="n"&gt;create_top_24hrs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    CREATE TABLE IF NOT EXISTS cap_stock.top_24hr (
    symbol                 text,
    time_collected         timestamp,    

    price_change           double,
    price_change_percent   double,
    weighted_avg_price     double,
    prev_close_price       double,
    last_price             double,
    last_qty               double,
    bid_price              double,
    bid_qty                double,
    ask_price              double,
    ask_qty                double,
    open_price             double,
    high_price             double,
    low_price              double,
    volume                 double,
    quote_volume           double,

    open_time              timestamp,    
    close_time             timestamp,    
    first_id               bigint,
    last_id                bigint,

    PRIMARY KEY ((symbol), time_collected)
    ) WITH CLUSTERING ORDER BY (time_collected DESC)
    AND compaction = {
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;class&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;TimeWindowCompactionStrategy&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_unit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;HOURS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,
        &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;compaction_window_size&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
    };
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_top_24hrs&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_recent_trades_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_OB_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_kline_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h3&gt;
  
  
  6. Kafka Connect (the same Connect worker that runs Debezium, repurposed)
&lt;/h3&gt;

&lt;p&gt;Kafka Connect is the middleware that runs connectors to move data between systems.&lt;/p&gt;
&lt;h4&gt;
  
  
  Cassandra Sink Connector Configuration
&lt;/h4&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;//&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;register_cassandra.json&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cassandra_sink"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"config"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"com.datastax.oss.kafka.sink.CassandraSinkConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="nl"&gt;"topics"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cap_stock.public.kline_data, cap_stock.public.recent_trades ,cap_stock.public.top_24hr, cap_stock.public.order_book "&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="nl"&gt;"contactPoints"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"cassandra"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"loadBalancing.localDc"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"datacenter1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;


    &lt;/span&gt;&lt;span class="nl"&gt;"topic.cap_stock.public.kline_data.cap_stock.kline_data.mapping"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"symbol=value.symbol,k_open_time=value.k_open_time,k_close_time=value.k_close_time,open=value.open,high=value.high,low=value.low,close=value.close,volume=value.volume,quote_asset_volume=value.quote_asset_volume,number_of_trades=value.number_of_trades,tb_base_volume=value.tb_base_volume,tb_quote_volume=value.tb_quote_volume,ranking=value.ranking,time_collected=value.time_collected"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="nl"&gt;"topic.cap_stock.public.recent_trades.cap_stock.recent_trades.mapping"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"symbol=value.symbol,trade_time=value.time,trade_id=value.id,price=value.price,qty=value.qty,quote_qty=value.quote_qty,is_buyer_maker=value.is_buyer_maker,is_best_match=value.is_best_match,ranking=value.ranking,time_collected=value.time_collected"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="nl"&gt;"topic.cap_stock.public.top_24hr.cap_stock.top_24hr.mapping"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"symbol=value.symbol, time_collected=value.time_collected,price_change=value.price_change,price_change_percent=value.price_change_percent,weighted_avg_price=value.weighted_avg_price,prev_close_price=value.prev_close_price,last_price=value.last_price,last_qty=value.last_qty,bid_price=value.bid_price,bid_qty=value.bid_qty,ask_price=value.ask_price,ask_qty=value.ask_qty,open_price=value.open_price,high_price=value.high_price,low_price=value.low_price,volume=value.volume,quote_volume=value.quote_volume,open_time=value.open_time,close_time=value.close_time,first_id=value.first_id,last_id=value.last_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="nl"&gt;"topic.cap_stock.public.order_book.cap_stock.order_book.mapping"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"symbol=value.symbol, side=value.side, ranking=value.ranking, price=value.price, quantity=value.quantity, time_collected=value.time_collected"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;


    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.storage.StringConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"unwrap"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.unwrap.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.transforms.ExtractNewRecordState"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
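&lt;p&gt;The &lt;code&gt;mapping&lt;/code&gt; entries above are long and easy to mistype. A hypothetical helper (not part of the project) that generates them from column/field pairs:&lt;/p&gt;

```python
# Hypothetical helper that builds the verbose DataStax sink
# "topic.TOPIC.KEYSPACE.TABLE.mapping" entries from a list of
# (cassandra_column, kafka_field) pairs, instead of typing them by hand.

def sink_mapping(topic, keyspace, table, pairs):
    key = "topic.{}.{}.{}.mapping".format(topic, keyspace, table)
    value = ",".join("{}=value.{}".format(col, field) for col, field in pairs)
    return {key: value}

cfg = sink_mapping(
    "cap_stock.public.order_book", "cap_stock", "order_book",
    [("symbol", "symbol"), ("side", "side"), ("price", "price")],
)
```

The resulting dict can be merged into the connector's `config` object before posting it to Kafka Connect.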

&lt;h4&gt;
  
  
  Connector Registration
&lt;/h4&gt;

&lt;p&gt;This bash script registers both connectors from their configuration files, so that Debezium listens for Postgres changes and records them to Kafka, while the Cassandra sink listens to those Kafka topics and writes the changes on to Cassandra.&lt;/p&gt;

&lt;p&gt;Verify the status of the connection at &lt;code&gt;localhost:8080&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;

&lt;span class="c"&gt;# Register Postgres and Cassandra connectors to Kafka Connect&lt;/span&gt;

curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST http://localhost:8083/connectors &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; @register-postgres.json

&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"✓ Postgres connector registered"&lt;/span&gt;

curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST http://localhost:8083/connectors &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; @register_cassandra.json

&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"✓ Cassandra connector registered"&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
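&lt;p&gt;Beyond the registration calls, Kafka Connect's REST API also exposes a status endpoint (&lt;code&gt;GET /connectors/NAME/status&lt;/code&gt;). A sketch of interpreting its response shape, with the HTTP call itself omitted:&lt;/p&gt;

```python
# Sketch of interpreting a Kafka Connect status response
# (GET /connectors/NAME/status). The JSON shape shown below matches
# the standard Connect REST format; fetching it over HTTP is omitted.

def connector_healthy(status):
    """True when the connector and all of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    return all(t.get("state") == "RUNNING" for t in status.get("tasks", []))

example = {
    "name": "cassandra_sink",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
# connector_healthy(example) is True
```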



&lt;h3&gt;
  
  
  7. Grafana: Real-Time Visualisation
&lt;/h3&gt;

&lt;p&gt;Grafana provides the user-facing dashboards that display market data in real-time.&lt;/p&gt;

&lt;h4&gt;
  
  
  Grafana Configuration
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;  &lt;span class="na"&gt;grafana&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;grafana/grafana:main-ubuntu&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;grafana&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;GF_SECURITY_ADMIN_USER&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;admin&lt;/span&gt;
      &lt;span class="na"&gt;GF_SECURITY_ADMIN_PASSWORD&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;admin&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;3001:3000"&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./grafana_data:/var/lib/grafana&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;postgres&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once Grafana is running, create a new dashboard, navigate to its settings, add a variable named &lt;code&gt;symbol&lt;/code&gt;, then add panels built on the sample queries below.&lt;br&gt;
The variable lets you switch between different crypto symbols like so:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2dgjrf6at3slm2ebc8nk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2dgjrf6at3slm2ebc8nk.png" alt=" " width="800" height="28"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Sample Dashboard Queries
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;--- Klines&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;k_open_time&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="nb"&gt;time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;open&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;high&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;low&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;close&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;volume&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;cap_stock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kline_data&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'$symbol'&lt;/span&gt;
  &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;k_open_time&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;__from&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;k_open_time&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;__to&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;


&lt;span class="c1"&gt;--- Winners at the moment &lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;  &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;price_change_percent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;volume&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;cap_stock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;top_24hr&lt;/span&gt; 
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;time_collected&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;__from&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;time_collected&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;__to&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;ALLOW&lt;/span&gt; &lt;span class="n"&gt;FILTERING&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Results
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F366bc66r8v0oxlaztjg3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F366bc66r8v0oxlaztjg3.png" alt=" " width="800" height="277"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmbs86ft5vf8f39rw2iii.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmbs86ft5vf8f39rw2iii.png" alt=" " width="800" height="415"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr38ak2b3439or0ojncvv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr38ak2b3439or0ojncvv.png" alt=" " width="800" height="415"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion: A Data Engineering Success
&lt;/h2&gt;

&lt;p&gt;The Binance Top Crypto Pipeline successfully implements a sophisticated, resilient, and performant data architecture by strategically combining a relational anchor (PostgreSQL), an immutable log (Kafka), and a distributed analytical store (Cassandra), tied together with Change Data Capture (Debezium). &lt;/p&gt;

</description>
      <category>python</category>
      <category>postgres</category>
      <category>dataengineering</category>
      <category>architecture</category>
    </item>
    <item>
      <title>Understanding Kafka Consumer Lag: Causes, Risks, and How to Fix It</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Mon, 10 Nov 2025 18:38:54 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/understanding-kafka-consumer-lag-causes-risks-and-how-to-fix-it-33e0</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/understanding-kafka-consumer-lag-causes-risks-and-how-to-fix-it-33e0</guid>
      <description>&lt;p&gt;Apache Kafka has become one of the most widely adopted distributed streaming platforms in modern event-driven architectures. At its core, Kafka acts as a highly durable, scalable message queuing system that supports real-time data pipelines, event streaming, and system decoupling. It enables producers to continuously publish messages into topics, while consumers read those messages at their own pace.&lt;/p&gt;

&lt;p&gt;Despite its distributed efficiency and fault tolerance, Kafka does not come without challenges. One of the most common performance bottlenecks encountered in Kafka systems is consumer lag.&lt;/p&gt;

&lt;h1&gt;
  
  
  What is Consumer Lag?
&lt;/h1&gt;

&lt;p&gt;Consumer lag occurs when the consumer is reading messages slower than the producer is writing them.&lt;/p&gt;

&lt;p&gt;In Kafka, each message is stored in a partition and assigned an offset (a sequential ID).&lt;br&gt;
This means at any time:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Latest produced offset&lt;/strong&gt; - The most recent message written to a partition by the producer&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Latest Consumer offset&lt;/strong&gt; - The latest message read and committed by a consumer&lt;/p&gt;

&lt;p&gt;Consumer Lag = Latest Produced Offset − Latest Consumer Offset&lt;/p&gt;
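&lt;p&gt;The formula, applied per partition and summed across a topic, looks like this in Python:&lt;/p&gt;

```python
# The lag formula above, per partition: the difference between the
# partition's latest produced (log-end) offset and the consumer
# group's committed offset.

def consumer_lag(log_end_offsets, committed_offsets):
    """Return per-partition lag and the total across partitions."""
    lag = {p: log_end_offsets[p] - committed_offsets.get(p, 0)
           for p in log_end_offsets}
    return lag, sum(lag.values())

per_partition, total = consumer_lag(
    {0: 1500, 1: 1200, 2: 900},   # latest produced offsets
    {0: 1400, 1: 1200, 2: 650},   # latest committed offsets
)
# per_partition == {0: 100, 1: 0, 2: 250}, total == 350
```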

&lt;p&gt;If left unresolved, lag accumulates, delaying downstream processing, analytics, notifications, and system reactions.&lt;/p&gt;

&lt;p&gt;This could be detrimental, especially in safety-critical systems that rely on reliable, accurate, and timely messaging.&lt;/p&gt;

&lt;p&gt;Imagine sitting in a self-driving car that receives the instruction to turn left only after it has already passed the turn.  &lt;/p&gt;

&lt;h1&gt;
  
  
  Why Consumer Lag Happens and How to Stop It
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Sudden Traffic Spikes
&lt;/h2&gt;

&lt;p&gt;An abrupt increase in message production, such as from a viral event or sensor data surge, can overwhelm consumers if the system isn't scaled well. For instance, IoT applications might experience this during peak hours.&lt;br&gt;
&lt;strong&gt;Symptom&lt;/strong&gt; - Rapid rise in log-end offsets relative to committed consumer offsets (growing lag)&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt; - Auto-scale consumers or use elastic resources like Kubernetes autoscaling. &lt;/p&gt;
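&lt;p&gt;A minimal, illustrative threshold rule for such scaling (the numbers and the cap at the partition count are assumptions, not Kafka defaults):&lt;/p&gt;

```python
# Illustrative scaling rule, not Kubernetes-specific: one consumer per
# LAG_PER_CONSUMER messages of backlog, capped at the partition count
# (extra consumers beyond that would sit idle).

def desired_consumers(total_lag, lag_per_consumer, partitions, current=1):
    needed = max(current, -(-total_lag // lag_per_consumer))  # ceiling division
    return min(needed, partitions)

desired_consumers(10_000, 2_000, partitions=5)  # 5
desired_consumers(3_000, 2_000, partitions=5)   # 2
```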

&lt;h2&gt;
  
  
  Partition Imbalances and Skew
&lt;/h2&gt;

&lt;p&gt;Usually, having more consumers is a good thing because of parallelism, but only if it is matched by enough partitions. Without proper partitioning, extra consumers ironically become a problem: each partition is consumed by at most one member of a group, so surplus consumers sit idle while still adding coordination overhead on the Kafka broker.&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt; - Size the partition count against the number of consumers in the group&lt;/p&gt;

&lt;h2&gt;
  
  
  Slow Consumer Processing
&lt;/h2&gt;

&lt;p&gt;Inefficient code within consumers, such as waiting for external APIs, complex transformations, or bugs causing retries, slows down message handling. If processing logic isn't optimized (e.g., breaking tasks into unnecessary steps), idle time accumulates.&lt;br&gt;
&lt;strong&gt;Symptom&lt;/strong&gt; - Prolonged processing times per message&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt; - write better code :), i.e., profile and optimise the processing logic, and implement asynchronous processing so that multiple messages can be handled concurrently&lt;/p&gt;
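&lt;p&gt;The asynchronous idea can be sketched in plain Python with asyncio; &lt;code&gt;handle_message&lt;/code&gt; here is a hypothetical stand-in for your real per-message work:&lt;/p&gt;

```python
import asyncio

async def handle_message(msg):
    # Stand-in for real per-message work (e.g., an external API call).
    await asyncio.sleep(0.01)
    return msg.upper()

async def process_batch(messages):
    # Process a polled batch concurrently instead of one by one,
    # so slow I/O on one message does not stall the others.
    return await asyncio.gather(*(handle_message(m) for m in messages))

results = asyncio.run(process_batch(["a", "b", "c"]))
print(results)  # ['A', 'B', 'C']
```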

&lt;h2&gt;
  
  
  Resource Constraints
&lt;/h2&gt;

&lt;p&gt;Insufficient CPU, memory, or network bandwidth on consumer hosts can bottleneck performance. This doesn't only apply to local machines; containerized environments with misconfigured limits exacerbate this.&lt;br&gt;
&lt;strong&gt;Symptom&lt;/strong&gt; - High system utilization metrics&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt; - Increase allocations; monitor with tools like Prometheus for CPU/memory usage, then scale up resources accordingly if need be &lt;/p&gt;

&lt;h2&gt;
  
  
  Configuration Issues
&lt;/h2&gt;

&lt;p&gt;Suboptimal settings, such as small fetch sizes or improper offset management (e.g., auto-commit enabled without careful tuning), can lead to frequent but small polls, reducing throughput.&lt;br&gt;
&lt;strong&gt;Symptoms&lt;/strong&gt; - Frequent small fetches, commit failures&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Consumer Side: Increase &lt;code&gt;fetch.max.bytes&lt;/code&gt; and &lt;code&gt;max.partition.fetch.bytes&lt;/code&gt; to fetch larger batches, reducing poll frequency. Adjust &lt;code&gt;fetch.max.wait.ms&lt;/code&gt; to wait longer for data if needed. Use manual offset commits for better control.&lt;/li&gt;
&lt;li&gt;Producer Side: Employ balanced partitioners like &lt;code&gt;RoundRobinPartitioner&lt;/code&gt; to distribute messages evenly. Reduce &lt;code&gt;batch.size&lt;/code&gt; to avoid overwhelming consumers with large bursts.&lt;/li&gt;
&lt;li&gt;Broker Side: Tune &lt;code&gt;num.network.threads&lt;/code&gt; and &lt;code&gt;num.io.threads&lt;/code&gt; for better request handling. Set &lt;code&gt;group.initial.rebalance.delay.ms&lt;/code&gt; to minimize unnecessary rebalances.&lt;/li&gt;
&lt;/ul&gt;
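&lt;p&gt;As a hedged sketch, the consumer-side settings above might look like this as &lt;code&gt;kafka-python&lt;/code&gt; &lt;code&gt;KafkaConsumer&lt;/code&gt; keyword arguments. The broker address, group ID, and the specific numbers are placeholders; tune them against your own workload:&lt;/p&gt;

```python
# Consumer-side settings discussed above, as kafka-python KafkaConsumer kwargs.
consumer_config = {
    "bootstrap_servers": "localhost:9092",   # placeholder broker address
    "group_id": "analytics-consumers",       # placeholder group ID
    "fetch_max_bytes": 52_428_800,           # fetch larger batches (50 MB total)
    "max_partition_fetch_bytes": 1_048_576,  # per-partition fetch cap (1 MB)
    "fetch_max_wait_ms": 500,                # wait longer for fuller fetches
    "enable_auto_commit": False,             # commit offsets manually instead
}

# Requires a running broker, so left commented out here:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("my-topic", **consumer_config)
print(consumer_config["enable_auto_commit"])  # False
```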

&lt;h2&gt;
  
  
  Partition Rebalancing
&lt;/h2&gt;

&lt;p&gt;When consumers join or leave a group, Kafka reassigns partitions, temporarily halting processing and spiking lag. Frequent rebalances due to unstable consumers amplify this.&lt;br&gt;
&lt;strong&gt;Mitigation&lt;/strong&gt; - Use sticky assignors (e.g., the cooperative sticky assignor); stabilize consumer instances to reduce churn.&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;Kafka enables real-time streaming at massive scale — but real-time only holds true if consumers can keep up. Consumer lag is not a system failure; it is a signal that the pipeline has hit a scaling or processing bottleneck.&lt;/p&gt;

&lt;p&gt;By monitoring offsets, scaling consumers, optimizing workload, and tuning Kafka configurations, you can transform lagging systems into high-performance streaming pipelines with predictable throughput.&lt;/p&gt;

</description>
      <category>systemdesign</category>
      <category>performance</category>
      <category>dataengineering</category>
      <category>monitoring</category>
    </item>
    <item>
      <title>Containerization for Data Engineering: A Practical Guide with Docker and Docker Compose</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Mon, 13 Oct 2025 08:52:51 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/containerization-for-data-engineering-a-practical-guide-with-docker-and-docker-compose-3fb4</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/containerization-for-data-engineering-a-practical-guide-with-docker-and-docker-compose-3fb4</guid>
      <description>&lt;h1&gt;
  
  
  Introduction
&lt;/h1&gt;

&lt;p&gt;Docker is a very useful tool, not just for data engineers but for all developers alike. But before we get to it, let's try to understand the concept it was built on &lt;/p&gt;

&lt;h2&gt;
  
  
  What is Containerisation?
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;This is essentially packaging a piece of software alongside all its dependencies, including environment variables, and even an operating system, and running it as its own isolated instance, separate from your machine. &lt;/li&gt;
&lt;li&gt;It's decoupling to the max, ensuring maximum portability for your application&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Why containerise in data engineering
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Simplified Dependency Management&lt;/strong&gt; - A data workflow usually depends on various Python libraries, JVM versions, and specific tool versions. A container can encapsulate all of this into a single image, eliminating the need to set up each component manually.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Scalability for Data pipelines&lt;/strong&gt; - Containerisation allows data pipelines to be scaled up dynamically when used with tools like Kubernetes. This can be done individually for each stage of the data pipeline or for the whole thing &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Portability and easy integration with Cloud&lt;/strong&gt; - Containers are running instances of a data pipeline completely decoupled from the machine running them, which means that they can run on any machine without that annoying, "It works on my machine", phrase. This means that it can easily be deployed on any cloud environment as well &lt;/li&gt;
&lt;/ul&gt;

&lt;h1&gt;
  
  
  Getting Started with Docker
&lt;/h1&gt;

&lt;p&gt;OK, so here's a basic rundown of everything you need to know before we get our hands dirty.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Docker&lt;/strong&gt; is one of the tools used to perform containerisation. &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Docker Engine&lt;/strong&gt; is the part of Docker that lives on your machine; it accesses your file system and applications and converts them into images. It also facilitates the running of these containers.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Docker Hub&lt;/strong&gt; is an online registry of container images. It's where people can share the images they created for others to reuse, similar to GitHub&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Volumes&lt;/strong&gt; are a way to ensure that the storage of a running container is persisted beyond the life span of the container. It's like mapping a directory on your local machine to one on the container, such that changes made in that directory happen on both sides. Docker simply picks up where it left off when the container is restarted&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;A Dockerfile&lt;/strong&gt; is a simple text file we use to give Docker instructions on how to build the image for the container.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;An Image&lt;/strong&gt; is a snapshot of your application and its dependencies that can be called to create a Docker container, similar to an operating system image or a blueprint for a container&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Caches&lt;/strong&gt; - Docker images are built layer by layer; each layer is stored locally so it can be reused when rebuilding a container. This means the initial build might take a long time, but subsequent builds are lightning fast.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;A Container&lt;/strong&gt; is a running instance of the image. Docker uses an image to build and run a container.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Remember this: &lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frrkn663s4adz357o6qdg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frrkn663s4adz357o6qdg.png" alt=" " width="663" height="218"&gt;&lt;/a&gt;&lt;br&gt;
We only have to worry about creating the Dockerfile; then we build the image and run the container using Docker commands&lt;/p&gt;

&lt;p&gt;To install Docker, you can follow this &lt;a href="https://docs.docker.com/engine/install/ubuntu/" rel="noopener noreferrer"&gt;article&lt;/a&gt; on their official website, or you can follow this &lt;a href="https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-on-ubuntu-20-04" rel="noopener noreferrer"&gt;Digital Ocean article&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Running a Data Processing Script
&lt;/h2&gt;

&lt;p&gt;To understand the rest of Docker, we'll take a practical approach and learn as we go. &lt;br&gt;
The goal is to create a simple data processing script that loads data into a pandas data frame, drops a column, and loads the result to a new, cleaned CSV.&lt;br&gt;
We'll also use docker volumes to persist the data &lt;/p&gt;

&lt;p&gt;Create a home folder for Docker and then move into it&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mkdir docker
cd docker 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In here, you can clone a repository I made on GitHub for this purpose&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/kazeric/docker_for_dataengineering
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This should create a folder with the following files &lt;br&gt;
&lt;strong&gt;app.py file&lt;/strong&gt; contains the code for the app you want to containerise.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanuvji68fzrhoagd04c9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanuvji68fzrhoagd04c9.png" alt=" " width="800" height="463"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;cars.csv file&lt;/strong&gt; contains the data to be transformed &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6zlkjxba1ln7ynwhj9i1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6zlkjxba1ln7ynwhj9i1.png" alt=" " width="800" height="463"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dockerfile&lt;/strong&gt; contains the instructions for Docker to create the image &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvnreaalq250pn27asfq2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvnreaalq250pn27asfq2.png" alt=" " width="800" height="264"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here is a quick rundown of what it's doing &lt;br&gt;
&lt;code&gt;FROM python:3.10&lt;/code&gt; - this pulls a base Docker image from Docker Hub. &lt;em&gt;You can search "python:3.10 docker" and you'll find it in Docker Hub.&lt;/em&gt; Anything after this line modifies the base image, the specifications we need to run our application &lt;/p&gt;

&lt;p&gt;&lt;code&gt;COPY requirements.txt .&lt;/code&gt; - this line copies the file requirements.txt to "." which is the current working directory&lt;/p&gt;

&lt;p&gt;&lt;code&gt;RUN pip install -r requirements.txt&lt;/code&gt; - this line tells the container to run the pip command in the terminal&lt;/p&gt;

&lt;p&gt;The rest are well explained by the comments&lt;/p&gt;
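&lt;p&gt;Putting the lines just described together, the Dockerfile in the screenshot looks roughly like this. Treat it as a sketch reconstructed from the explanation (the &lt;code&gt;WORKDIR&lt;/code&gt; and &lt;code&gt;CMD&lt;/code&gt; lines are assumptions based on the repo layout), not a verbatim copy:&lt;/p&gt;

```dockerfile
# Base image pulled from Docker Hub
FROM python:3.10

# Assumed working directory inside the container
WORKDIR /app

# Install dependencies first so this layer is cached between builds
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the application code and data into the image
COPY app.py cars.csv ./

# Assumed entry point: run the script when the container starts
CMD ["python", "app.py"]
```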

&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt; &lt;br&gt;
The convention in Docker is that whenever there is a mapping, it reads host (source) on the left and container (destination) on the right, e.g., in ports &lt;code&gt;5432:5432&lt;/code&gt;, in commands like &lt;code&gt;COPY . .&lt;/code&gt;, and in volumes &lt;code&gt;./data:/opt/var/grafana/data&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Now that we have it set up, let's run the container &lt;br&gt;
Build the image&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker build -t my_app .
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9227h49zowdhs7909lmi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9227h49zowdhs7909lmi.png" alt=" " width="800" height="255"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Run the container&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker run my_app
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3hni6n51r4yjjryedjg4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3hni6n51r4yjjryedjg4.png" alt=" " width="800" height="186"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Volumes
&lt;/h2&gt;

&lt;p&gt;Now, let's explore the new CSV created in Docker volumes &lt;/p&gt;

&lt;p&gt;Find the specific container built from the image&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker ps -a
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Foqvfx6rsvtwb1hdhq8o7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Foqvfx6rsvtwb1hdhq8o7.png" alt=" " width="800" height="29"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Get the container ID and use it to copy the container data to the host machine&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker cp &amp;lt;container_id&amp;gt;:/app/data/cleaned_cars.csv ./data/cleaned_cars.csv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Your new CSV should look like this &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyjp9ai40iihah0upbdym.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyjp9ai40iihah0upbdym.png" alt=" " width="800" height="308"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Finally, clean up all the unused resources to free up space on your machine&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker system prune -a 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F70xsobeh8nazbl1emsnc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F70xsobeh8nazbl1emsnc.png" alt=" " width="800" height="436"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h1&gt;
  
  
  Best Practices
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Use small base images:&lt;/strong&gt; Start from lightweight images like python:3.10-slim or alpine instead of full OS images to reduce build size and speed up deployment.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Clean up unused resources:&lt;/strong&gt; Regularly remove unused containers, images, and networks with:&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker system prune -a
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This keeps your environment tidy and reduces disk usage.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Secure your containers:&lt;/strong&gt; Avoid running containers as root. Use a non-root user and apply least-privilege principles. Keep your images updated to patch vulnerabilities.&lt;/li&gt;
&lt;/ul&gt;
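&lt;p&gt;For the non-root point, a minimal Dockerfile addition might look like this (the user name is illustrative):&lt;/p&gt;

```dockerfile
# Create an unprivileged user and run the container as that user
RUN useradd --create-home appuser
USER appuser
```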

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;Docker is a powerful tool that transforms how data engineers build, test, and deploy data pipelines — enabling consistent, reproducible environments across teams.&lt;/p&gt;

&lt;p&gt;By containerizing ETL jobs, analytics tools, or machine learning models, teams save time and avoid “it works on my machine” issues.&lt;/p&gt;

&lt;p&gt;For the next steps, you can check out Docker Compose, a simple yet powerful addition that makes running multi-container applications easy.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>containers</category>
      <category>tutorial</category>
      <category>docker</category>
    </item>
    <item>
      <title>A Beginner’s Guide to Big Data Analytics with Apache Spark and PySpark</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Mon, 29 Sep 2025 21:51:47 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/a-beginners-guide-to-big-data-analytics-with-apache-spark-and-pyspark-49f3</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/a-beginners-guide-to-big-data-analytics-with-apache-spark-and-pyspark-49f3</guid>
      <description>&lt;h1&gt;
  
  
  What is Apache Spark?
&lt;/h1&gt;

&lt;p&gt;We all know about pandas data frames and how they make handling data so quick and easy. You can transform your data (drop columns, change data types, filter nulls) all in just a few lines of code.&lt;/p&gt;

&lt;p&gt;But have you ever wondered what’s happening under the hood?&lt;br&gt;
Where is this data actually stored while you’re manipulating it?&lt;br&gt;
It’s definitely not in your database.&lt;br&gt;
And once your Python script ends, where does all of it go?&lt;/p&gt;

&lt;p&gt;The answer is simple: &lt;strong&gt;main memory (RAM)&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;All those transformations you run in pandas are only possible because your dataset is small enough to fit in your RAM.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Now here’s the problem:&lt;/strong&gt; what happens when you’re working on massive projects, like training your own general-purpose LLM or crunching billions of rows of data?&lt;/p&gt;

&lt;p&gt;The reality is—you can’t (at least not efficiently) with pandas. Sure, you can try streaming data or working in batches (I personally tried both for Lingua Connect), but it quickly becomes complex for no real reason.&lt;/p&gt;

&lt;p&gt;And that’s where Apache Spark comes in.&lt;/p&gt;
&lt;h1&gt;
  
  
  Enter Spark
&lt;/h1&gt;

&lt;p&gt;Apache Spark is the hero you call when your data is so massive that your machine can’t handle it all at once.&lt;/p&gt;

&lt;p&gt;At its core, Apache Spark is an open-source, unified analytics engine designed for large-scale data processing.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;In short: Just add more machines!&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Here’s what I mean:&lt;/p&gt;

&lt;p&gt;Instead of relying on one machine’s memory, Spark distributes your dataset across multiple machines (nodes). Each node processes a chunk of the data in parallel, and Spark combines the results. To you, it feels like working on one logical machine—but behind the scenes, it’s a cluster doing the heavy lifting.&lt;/p&gt;

&lt;p&gt;The diagram below summarises its architecture in a nutshell&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6fdtpbmdey6rxr7vuzli.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6fdtpbmdey6rxr7vuzli.png" alt=" " width="757" height="422"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h1&gt;
  
  
  The Dimensions of Apache Spark
&lt;/h1&gt;

&lt;p&gt;Spark isn’t just about running queries faster. It’s a whole ecosystem. &lt;br&gt;
So after this article, you can decide which dimensions of Spark to follow, but here are the main ones.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Spark Core&lt;/strong&gt; – The foundation that handles memory management, job scheduling, and distributed task execution.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Spark SQL&lt;/strong&gt; – For working with structured data (tables, DataFrames, SQL queries).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Spark Streaming&lt;/strong&gt; – For real-time data processing (think logs, IoT data, live feeds).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;MLlib&lt;/strong&gt; – Spark’s built-in machine learning library for scalable ML models.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;GraphX&lt;/strong&gt; – For graph computations (social networks, recommendations, relationships).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now let's look into Spark Core and Spark SQL through the simplest path: PySpark &lt;/p&gt;
&lt;h1&gt;
  
  
  Apache Spark vs PySpark
&lt;/h1&gt;

&lt;p&gt;Now you might be wondering—what’s the difference between Apache Spark and PySpark?&lt;/p&gt;

&lt;p&gt;Put simply:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Apache Spark is the engine. Think of it like the car’s engine—the part that actually does the work and propels the car forward.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;PySpark is the Python API for Spark. It’s the steering wheel you use to control the engine.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;So while Spark itself is written in Scala and Java, PySpark gives Python developers the ability to harness Spark’s distributed power without leaving the comfort of Python.&lt;/p&gt;
&lt;h1&gt;
  
  
  Set up
&lt;/h1&gt;

&lt;p&gt;I highly recommend using a virtual environment while working on this. You can set up the venv at the base of your working directory and activate it before installing PySpark.&lt;/p&gt;
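&lt;p&gt;If you haven't set one up before, a typical venv workflow looks like this (the folder name &lt;code&gt;.venv&lt;/code&gt; is just a common choice):&lt;/p&gt;

```shell
python3 -m venv .venv        # create the virtual environment
source .venv/bin/activate    # activate it for the current shell session
```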

&lt;p&gt;We'll start by installing the latest version of Spark  (This might take a while)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;wget https://dlcdn.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We then unzip the Spark file&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;tar -xzf spark-4.0.1-bin-hadoop3.tgz
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We then rename the Spark folder to make it easier to use and then move into it (the Spark home)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mv spark-4.0.1-bin-hadoop3/ spark/
cd spark
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Run this command to verify that your Spark version was downloaded correctly&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;./bin/spark-submit --version
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should get an output like this.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkogw9r7xtqd0wsidxrti.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkogw9r7xtqd0wsidxrti.png" alt=" " width="800" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Finally, let's install PySpark&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install pyspark
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Take Pyspark for a spin
&lt;/h1&gt;

&lt;p&gt;Create a new folder at the base of your working directory named files. This is where your code and data go&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;touch spark.ipynb
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Open it up in a text editor and connect to the virtual environment in which you installed PySpark&lt;br&gt;
Now, let's get started&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql import SparkSession
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's start the Spark session.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark = SparkSession.builder.appName('demo').getOrCreate()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This starts up everything, i.e., the entire architecture described above, like turning the key to start the engine.&lt;/p&gt;

&lt;p&gt;Create a dataframe&lt;br&gt;
An interesting thing to note: unlike in pandas, you typically build a Spark dataframe from a list of tuples (one tuple per row) plus a list of column names&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;data=[("Eric", 25), ("Jane", 29), ("Sam", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fre69iq6b6x90h5nhyvzx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fre69iq6b6x90h5nhyvzx.png" alt=" " width="369" height="193"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here are some useful commands&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#identify the schema
df.printSchema()
#list out the columns 
df.columns
#count the number of rows 
df.count()
#creating data from CSV
df = spark.read.csv("demo.csv", header=True, inferSchema=True)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Spark SQL
&lt;/h1&gt;

&lt;p&gt;Now we can interact with our data using SQL, which is really cool &lt;br&gt;
First, we create a view from our dataframe (synonymous with database views)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;df.createOrReplaceTempView("Demo")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's it! Now you can write your queries here &lt;br&gt;
Note that we're accessing the view from the Spark session we created above.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.sql("SELECT * FROM Demo").show(5)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>beginners</category>
      <category>datascience</category>
      <category>python</category>
    </item>
    <item>
      <title>Apache Kafka Deep Dive: Core Concepts, Data Engineering Applications, and Real-World Production Practices</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Sun, 21 Sep 2025 17:28:52 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/apache-kafka-deep-dive-core-concepts-data-engineering-applications-and-real-world-production-2e55</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/apache-kafka-deep-dive-core-concepts-data-engineering-applications-and-real-world-production-2e55</guid>
      <description>&lt;p&gt;Welcome to the world of data streaming. The world of Kafka. Leave all of your previous data storage knowledge and prepare to "subscribe" to &lt;br&gt;
a whole new line of thought &lt;/p&gt;
&lt;h1&gt;
  
  
  What is Kafka
&lt;/h1&gt;

&lt;p&gt;Apache Kafka was originally created at LinkedIn and later donated to the Apache Software Foundation under an open-source license. This means you can take Kafka and modify it however you like to suit your needs. &lt;/p&gt;

&lt;p&gt;It is an open-source data streaming platform that uses the publish and subscribe model to decouple applications and reduce dependency on them.&lt;br&gt;
It does this by keeping logs of events between microservices in an application&lt;/p&gt;
&lt;h1&gt;
  
  
  Scenario
&lt;/h1&gt;

&lt;p&gt;Imagine you are the owner of a certain business with the structure shown below&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Froi3k90tmlfi3iimgz8f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Froi3k90tmlfi3iimgz8f.png" alt=" " width="800" height="367"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Everything might seem ok at first, but what happens when one node in the system fails? &lt;br&gt;
Suppose, for instance, that the payment microservice fails. This situation might leave your clients waiting on a loading screen once the order is placed, for a payment fulfillment that will never come.&lt;br&gt;
Worse still, this order might be logged onto the analytics section, corrupting your data &lt;/p&gt;

&lt;p&gt;This is where Kafka shines&lt;br&gt;
It places itself between these microservices, acting as a middleman, which:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Eliminates the single points of failure &lt;/li&gt;
&lt;li&gt;Improves recoverability&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fobtz2mzssfvna2wkvh0t.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fobtz2mzssfvna2wkvh0t.png" alt=" " width="800" height="299"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h1&gt;
  
  
  Kafka core concepts
&lt;/h1&gt;

&lt;p&gt;There are a few concepts you may need to wrap your head around when dealing with Kafka &lt;br&gt;
&lt;strong&gt;Publish-subscribe model&lt;/strong&gt; - a messaging architectural pattern where applications that generate data (publishers) send messages to an intermediary called a message broker without knowing who will receive them.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;This leads to the decoupling advantage above&lt;/li&gt;
&lt;li&gt;It also brings about interoperability as systems only need to talk to Kafka instead of creating custom integrations for each system &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Event-first architecture&lt;/strong&gt; - This way of thinking shifts the focus from requesting data or calling services to reacting to facts and business events as they occur. &lt;/p&gt;
&lt;h1&gt;
  
  
  Kafka Architectural elements
&lt;/h1&gt;

&lt;p&gt;Ok, now that that's out of the way, let's peer into the inner workings of Kafka by briefly describing its constitution.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Record&lt;/strong&gt; - This, also known as an event, is the actual message that is produced by the publishing microservice &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Producer&lt;/strong&gt; - This is the publishing microservice that creates and sends the message to Kafka &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Consumer&lt;/strong&gt; - This is the subscribing microservice that listens for and receives messages that come from Kafka&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Topic&lt;/strong&gt; - This is the Kafka equivalent of a database table. It is an immutable log of events that a consumer and producer subscribe to and publish to. You can have an orders topic for the orders microservice &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Broker&lt;/strong&gt; - This is the actual server on which Kafka runs &lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka cluster&lt;/strong&gt; - A single Kafka instance can run on multiple servers (nodes/brokers). These servers make up the Kafka cluster&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Partitioning&lt;/strong&gt; - This is the logical subdivision of a topic into partitions, which can be spread across the various nodes in the Kafka cluster&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Replication&lt;/strong&gt; - Partitions can be copied across multiple nodes to create replicas that can act as backups in case one node fails&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Zookeeper&lt;/strong&gt; - Partitioning and replication can be tricky business, especially when it comes to issues of data consistency. Zookeeper is an external resource that solves this problem, handling the coordination and synchronization of Kafka.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;KRaft&lt;/strong&gt; - This is a newer, built-in consensus mechanism, production-ready since Kafka 3.3, which handles Zookeeper's functions and eliminates Kafka's dependence on it.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Consumer Group&lt;/strong&gt; - Suppose your system replicates microservices to improve scalability and availability. Production is straightforward, as each producer replica simply sends its records to a topic. But if the consumer replicas each subscribe to that topic individually, every replica receives every message, so each record gets processed multiple times. This is a problem. 
&lt;strong&gt;Consumer groups&lt;/strong&gt; solve this issue, and here's how. 
Multiple consumer instances that belong to the same logical application or service are configured with the same group ID. This group ID identifies them as part of the same consumer group.
This lets Kafka coordinate consumption so that the topic's partitions are split among the replicas and messages are processed in parallel, each by exactly one member of the group.&lt;/li&gt;
&lt;/ul&gt;
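&lt;p&gt;To build intuition for how a group splits work, here is a toy sketch in plain Python. This is not the real Kafka rebalance protocol, just an illustration of partitions being divided round-robin among the consumers that share one group ID:&lt;/p&gt;

```python
# Toy illustration only: Kafka's actual assignment is done by the group
# coordinator, but the effect is similar to this round-robin split.
def assign_partitions(partitions, consumers):
    """Give each partition to exactly one consumer in the group."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# two replicas of the same service share the group ID "orders-service"
print(assign_partitions([0, 1, 2, 3], ["orders-a", "orders-b"]))
# {'orders-a': [0, 2], 'orders-b': [1, 3]}
```

&lt;p&gt;Each record is then processed by exactly one member of the group, instead of by every replica.&lt;/p&gt;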
&lt;h1&gt;
  
  
  Set up
&lt;/h1&gt;

&lt;p&gt;Awesome, now that we're all caught up, let's get hands-on by installing and running an instance of Kafka in our terminal&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;&lt;br&gt;
&lt;em&gt;The Kafka console tools should not be used in a production environment unless absolutely necessary. Avoid them in production at all costs&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;But since we're running a simple single-partition Kafka instance on our PC, it should be fine. Follow the steps below.&lt;/p&gt;

&lt;p&gt;Install Java&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt install default-jre
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Confirm Java installation&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;java --version
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Make sure to use Java 11 or 17&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The commands below download Kafka, unzip it, and rename the extracted folder to the more readable name kafka, which will act as Kafka's home directory. Finally, move into the kafka directory&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzvf kafka_2.13-3.3.1.tgz 
mv kafka_2.13-3.3.1/ kafka/
cd kafka/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start up Zookeeper&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/zookeeper-server-start.sh config/zookeeper.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start up Kafka itself (i.e., the Kafka server)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-server-start.sh config/server.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let's create a test topic in Kafka&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --create   --topic test-topic   --bootstrap-server localhost:9092   --partitions 1   --replication-factor 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a producer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;This will open up an interactive console where you can type in messages on your created test topic&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Create a consumer in a different terminal&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, when you type anything in the producer's interactive terminal, it will show up on the consumer's side. &lt;strong&gt;You are now streaming data with Kafka!&lt;/strong&gt;&lt;/p&gt;

&lt;h1&gt;
  
  
  Use cases
&lt;/h1&gt;

&lt;p&gt;&lt;strong&gt;Kafka console - (special case)&lt;/strong&gt;&lt;br&gt;
Producers and consumers are usually applications or microservices; however, the Kafka console allows the developer to act as the producer or consumer.&lt;/p&gt;

&lt;p&gt;This is not ideal in production because any wrong or misspelled record a developer types becomes an irreversible entry in that topic. &lt;br&gt;
Furthermore, if multiple microservices subscribe to the topic, it may trigger an unwanted, potentially catastrophic chain of events&lt;/p&gt;

&lt;p&gt;Nevertheless, it's still a useful feature in some scenarios&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Testing&lt;/strong&gt;&lt;br&gt;
The console is handy when you need to confirm that the Kafka service you want is working as expected, just as we did in the setup section.&lt;br&gt;
This is useful when: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You have deployed a new cluster and want to try it out&lt;/li&gt;
&lt;li&gt;You are debugging an issue with an existing cluster.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Backfilling data&lt;/strong&gt;&lt;br&gt;
Suppose your orders microservice crashed, and a few orders were placed before they could be pushed to the topic. Not to worry: if you keep a backup saved in a CSV file in case of failure, you can simply run&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-producer \
    --topic example-topic \
    --bootstrap-server localhost:9092 \
    &amp;lt; your_prepared_backorders.csv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Provided the schema aligns with the topic schema&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Real-world use cases
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Netflix&lt;/strong&gt;&lt;br&gt;
When you think about Netflix, you think about instant access to your favorite movie or series. But what happens behind the scenes when you hit that “Play” button?&lt;br&gt;
Netflix uses Kafka to handle real-time monitoring and event processing across its entire platform.&lt;/p&gt;

&lt;p&gt;Every time you play, pause, fast-forward, or even hover over a title, an event is generated.&lt;/p&gt;

&lt;p&gt;Kafka acts as the middleman, transporting billions of these events per day into different services:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Recommendations engine&lt;/strong&gt; – to suggest what you should watch next&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Quality of Service (QoS) monitoring&lt;/strong&gt; – to make sure the video resolution adjusts smoothly to your network&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Operational alerts&lt;/strong&gt; – so engineers can act if something breaks in the delivery pipeline&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Without Kafka, the massive amount of real-time events would overwhelm individual services. By centralizing these events, Netflix achieves scalability, resilience, and real-time personalization.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Uber&lt;/strong&gt;&lt;br&gt;
Uber is not just a ride-hailing app. It’s a real-time logistics platform moving people, food, and even packages around cities worldwide.&lt;/p&gt;

&lt;p&gt;Here’s how Kafka fits in:&lt;/p&gt;

&lt;p&gt;Every trip generates a constant stream of GPS events from both driver and rider apps.&lt;/p&gt;

&lt;p&gt;Kafka ingests and streams these events in real-time to different services:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Matching service – to connect you with the nearest driver&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;ETA calculation – to update arrival times dynamically as traffic changes&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Surge pricing – to adjust fares instantly during high demand&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Fraud detection – to flag suspicious activity as it happens&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Kafka enables Uber to handle millions of concurrent, low-latency events across geographies, ensuring rides, deliveries, and payments work seamlessly without bottlenecks.&lt;/p&gt;

&lt;h1&gt;
  
  
  Production Best Practices
&lt;/h1&gt;

&lt;p&gt;Running Kafka on your laptop is fun for demos, but in production the story is very different. Major companies have learned (sometimes the hard way) that keeping Kafka reliable at scale requires following certain best practices&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Partitioning and replication&lt;/strong&gt;&lt;br&gt;
In production, companies run clusters of multiple brokers spread across different machines or even data centers.&lt;/p&gt;

&lt;p&gt;Topics are partitioned for horizontal scalability and replicated (usually with a replication factor of 3) for fault tolerance.&lt;/p&gt;

&lt;p&gt;This way, even if one broker crashes, the cluster can continue without data loss.&lt;/p&gt;
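&lt;p&gt;As a sketch, the same &lt;code&gt;kafka-topics.sh&lt;/code&gt; tool we used earlier accepts &lt;code&gt;--partitions&lt;/code&gt; and &lt;code&gt;--replication-factor&lt;/code&gt; flags; on a cluster with at least three brokers, a production-style topic (the name &lt;code&gt;orders&lt;/code&gt; here is just an example) could be created like this:&lt;/p&gt;

```shell
bin/kafka-topics.sh --create \
    --topic orders \
    --bootstrap-server localhost:9092 \
    --partitions 6 \
    --replication-factor 3
```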

&lt;p&gt;&lt;strong&gt;Retention policy and capacity&lt;/strong&gt;&lt;br&gt;
Topics can grow endlessly in production, which is a problem when your machines have finite storage, so it is good to have a form of garbage collection ready.&lt;br&gt;
In production, teams set retention policies (e.g., 7 days or 30 days) to automatically delete old messages.&lt;/p&gt;

&lt;p&gt;They tune log compaction to retain only the latest value for each key when needed.&lt;/p&gt;
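&lt;p&gt;Both of these are per-topic settings. For example, with the stock &lt;code&gt;kafka-configs.sh&lt;/code&gt; tool (the topic name here is illustrative), a 7-day retention is 604800000 ms:&lt;/p&gt;

```shell
bin/kafka-configs.sh --alter \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name orders \
    --add-config retention.ms=604800000
```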

&lt;p&gt;Storage, disk throughput, and network bandwidth are carefully planned before scaling up.&lt;/p&gt;

&lt;h1&gt;
  
  
  References
&lt;/h1&gt;

&lt;p&gt;&lt;strong&gt;Kafka Tutorial for Beginners | Everything you need to get started, TechWorld with Nana&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Kafka Tutorial—Multi-chapter guide, Redpanda&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Featuring Apache Kafka in the Netflix Studio and Finance World, Confluent&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;Kafka retention—Types, challenges, alternatives, Redpanda&lt;/strong&gt;&lt;/p&gt;

</description>
      <category>architecture</category>
      <category>dataengineering</category>
      <category>microservices</category>
    </item>
    <item>
      <title>Creating your own data lake (MINIO+TRINO+GRAFANA)</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Fri, 19 Sep 2025 11:13:26 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/creating-your-own-data-lake-miniotrinografana-1no1</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/creating-your-own-data-lake-miniotrinografana-1no1</guid>
      <description>&lt;p&gt;Welcome, the full article might be a bit long so Ill break it into 3 parts &lt;br&gt;
For this one, we'll focus on miniO (the backbone of your datalake)&lt;/p&gt;

&lt;h1&gt;
  
  
  Getting Started with MinIO: Your Private S3-Compatible Data Lake
&lt;/h1&gt;

&lt;p&gt;MinIO is an open-source, high-performance object storage system that is fully compatible with the Amazon S3 API. Instead of storing data as files in directories, MinIO stores them as objects inside buckets. This makes it incredibly flexible and scalable, especially when working with large datasets.&lt;/p&gt;

&lt;p&gt;MinIO can be the storage layer of a data lake.&lt;br&gt;
On top of it, you connect tools like:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Apache Spark / Dask for big data processing&lt;/li&gt;
&lt;li&gt;Presto/Trino / Athena for SQL queries&lt;/li&gt;
&lt;li&gt;Grafana / Superset for visualization&lt;/li&gt;
&lt;li&gt;Airflow for orchestration&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  The setup
&lt;/h2&gt;

&lt;p&gt;Install it &lt;/p&gt;

&lt;p&gt;&lt;code&gt;wget https://dl.min.io/server/minio/release/linux-amd64/minio&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Make it executable &lt;/p&gt;

&lt;p&gt;&lt;code&gt;chmod +x minio&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Move the executable so that it can be run from anywhere &lt;/p&gt;

&lt;p&gt;&lt;code&gt;sudo mv minio /usr/local/bin&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Export the root user and password (start from here when restarting your server after installation)&lt;/p&gt;

&lt;p&gt;&lt;code&gt;export MINIO_ROOT_USER=&amp;lt;your-user-name&amp;gt;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;export MINIO_ROOT_PASSWORD=&amp;lt;your-password&amp;gt;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Start the server &lt;/p&gt;

&lt;p&gt;&lt;code&gt;minio server ~/minio-data --console-address ":9001"&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;One thing to note: MinIO exposes its S3 API at "localhost:9000", which is why we pass ":9001" as the console address, keeping the API and the web console on separate ports &lt;br&gt;
Go to "localhost:9001" for your web UI &lt;/p&gt;
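&lt;p&gt;If you also install MinIO's command-line client, &lt;code&gt;mc&lt;/code&gt;, you can point it at the API port (9000, not the console's 9001) and manage buckets from the terminal; the alias name &lt;code&gt;local&lt;/code&gt; below is arbitrary:&lt;/p&gt;

```shell
mc alias set local http://localhost:9000 your-user-name your-password
mc mb local/coin-vis-automated   # create a bucket
mc ls local                      # list buckets
```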

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F546444vpjh0kzn3xbun4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F546444vpjh0kzn3xbun4.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then log in to your server to get a browser console like this &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fty4wy4c9g35pcgymcylw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fty4wy4c9g35pcgymcylw.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Add data to the server
&lt;/h2&gt;

&lt;p&gt;In this case, we can create an Airflow DAG that will load the data for us. &lt;br&gt;
I'm using CoinGecko to get data on a few coins (get your API key first, add it to a .env file, then load it as I did)&lt;br&gt;
Then transforming the data using pandas to put it in the right format &lt;br&gt;
Finally, saving it into MinIO as Parquet files&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;em&gt;If you're not familiar with Airflow, you can just create a separate file and run the "coin_vis_etl" function&lt;/em&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;em&gt;Notice I'm using port 9000, not 9001&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from airflow import DAG
from airflow.operators.python import PythonOperator 
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.fs  # needed so that pa.fs is available below
import pyarrow.parquet as pq
import os
from dotenv import load_dotenv
from datetime import datetime


def coin_vis_etl():
    load_dotenv()


    MY_API_KEY = os.getenv("MY_API_KEY")
    crypto_list = ['bitcoin', 'solana', 'ethereum', 'hyperliquid', 'binancecoin']

    for crypto in crypto_list:
        #load the data from the api 
        url = f"https://api.coingecko.com/api/v3/coins/{crypto}/market_chart?vs_currency=usd&amp;amp;days=30"
        headers = {
            'accepts':'application/json',
            'x-cg-demo-api-key': MY_API_KEY    
            }
        response = requests.get(url, headers=headers)

        #define custom metadata
        # use .encode() if you're adding a formatted string instead of a bytes literal
        custom_metadata ={
            b"source": b"coingecko API",
            b"coin_name": f"{crypto}".encode()
        }

        if response.status_code == 200:
            data = response.json()
            temp = pd.DataFrame(data['prices'], columns=[f"{crypto}_timestamps", f"{crypto}_prices"])

            # we can just add the columns since the time stamps are the same
            temp[f"{crypto}_market_caps"]= [x[1] for x in data["market_caps"]]
            temp[f"{crypto}_total_volumes"] = [x[1] for x in data["total_volumes"]]

            # change timestamps to the real ones 
            temp[f"{crypto}_timestamps"] = pd.to_datetime(temp[f"{crypto}_timestamps"], unit='ms')

            # use pyarrow to change them parquets 
            table = pa.Table.from_pandas(temp)

            # add the custom metadata to the table's schema
            existing_metadata = table.schema.metadata or {}
            new_metadata = {**custom_metadata, **existing_metadata}
            table = table.replace_schema_metadata(new_metadata)

            # finally, write the data to MinIO using pyarrow
            fs = pa.fs.S3FileSystem(
                access_key="eric",
                secret_key='eric1234',
                endpoint_override ="http://localhost:9000"
            )

            pq.write_table(
                table, 
                f"coin-vis-automated/coin_vis_{crypto}.parquet",
                filesystem=fs
            )
            print(f'{crypto} is done')

        else:
            print(f"API error at {crypto}, {response.content}")


with DAG(
    dag_id = 'coin_vis_etl',
    start_date = datetime(2025, 9, 15),
    schedule_interval= '@hourly',
    catchup= False

) as dag:
    runetl=PythonOperator(
        task_id = 'coin_vis_etl',
        python_callable = coin_vis_etl
    )

    runetl
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And there you go, you've just created your own S3-compatible object storage &lt;/p&gt;

</description>
      <category>minio</category>
      <category>trino</category>
      <category>grafana</category>
    </item>
    <item>
      <title>Why I’m Switching to Parquet for Data Storage</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Mon, 15 Sep 2025 05:49:08 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/why-im-switching-to-parquet-for-data-storage-3ckm</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/why-im-switching-to-parquet-for-data-storage-3ckm</guid>
      <description>&lt;p&gt;The first time I came across Parquet files was during my fourth-year project. I kept seeing Hugging Face recommend them whenever I uploaded a custom dataset, and I wondered: why are they so obsessed with this file format?&lt;/p&gt;

&lt;p&gt;Fast forward to today, as I dive deeper into object storage and data lakes, Parquet shows up everywhere again. After some research and hands-on work, I finally get it: this format is not just hype. It’s genuinely better for large-scale data.&lt;/p&gt;

&lt;p&gt;First, let's get the &lt;strong&gt;benefits&lt;/strong&gt; out of the way &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;They’re simple&lt;/strong&gt; – easy to read/write with common libraries.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;They’re fast&lt;/strong&gt; – optimized for data retrieval queries.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;They’re compact&lt;/strong&gt; – storing the same data in less storage.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;They’re schema-aware&lt;/strong&gt; – built-in metadata and structure make them perfect for data lakes.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When I saw all this, my thoughts were: this can't be right... right?&lt;/p&gt;

&lt;h1&gt;
  
  
  So let's unpack the benefits
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Simple to Use
&lt;/h2&gt;

&lt;p&gt;Converting a CSV to Parquet takes just a few lines of Python:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Example dataframe
df = pd.DataFrame({
    "timestamp": ["2025-09-10", "2025-09-11"],
    "symbol": ["BTC", "BTC"],
    "close": [57800, 58800]
})

table = pa.Table.from_pandas(df)
pq.write_table(table, "crypto.parquet")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That’s it — you now have a Parquet file.&lt;/p&gt;

&lt;h2&gt;
  
  
  Faster Queries
&lt;/h2&gt;

&lt;p&gt;Unlike CSVs, Parquet is a columnar storage format. Instead of organizing data row by row, it stores values by column, just like a pandas dataframe.&lt;br&gt;
Think of it like this: CSV is like a text document; Parquet is like a database table optimized for analytics.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Need just one column? Parquet can read only that column instead of scanning the whole file.&lt;/li&gt;
&lt;li&gt;Query engines (Spark, DuckDB, etc.) can skip irrelevant chunks entirely.
This makes querying large datasets significantly faster.&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  More Compact
&lt;/h2&gt;

&lt;p&gt;Parquet files store the same data in less space than a CSV would, and here's how they achieve it.&lt;br&gt;
Parquet is highly compressed by design. A few tricks it uses:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Integers - binary encoding (fewer bytes than text).&lt;/li&gt;
&lt;li&gt;Strings - dictionary encoding (e.g., "BTC" stored once, then referenced by index).&lt;/li&gt;
&lt;li&gt;Repeated values - run-length encoding (RLE). &lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Schema and Metadata
&lt;/h2&gt;

&lt;p&gt;This is where data lakes come in. &lt;br&gt;
Parquet files are &lt;strong&gt;schema-aware&lt;/strong&gt;: every file carries a schema, which is basically the blueprint of the data inside it&lt;br&gt;
It describes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Column names&lt;/li&gt;
&lt;li&gt;Data types (int, float, string, timestamp, boolean, etc.)&lt;/li&gt;
&lt;li&gt;Nullable or not&lt;/li&gt;
&lt;li&gt;Nested structures (if any)
You can even define your own schema:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    ("timestamp", pa.timestamp("s")),
    ("symbol", pa.string()),
    ("price", pa.float64())
])

data = [
    ["2025-09-10 00:00:00", "BTC", 57800.0],
    ["2025-09-11 00:00:00", "ETH", 1800.5],
]

# build one Arrow array per column, using the types from the schema
columns = [pa.array(col, type=field.type) for col, field in zip(zip(*data), schema)]
table = pa.Table.from_arrays(columns, schema=schema)
pq.write_table(table, "crypto_with_schema.parquet")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Parquet also supports &lt;strong&gt;rich metadata&lt;/strong&gt;, which is essential in data lakes. Without metadata, a data lake quickly turns into a “data swamp.”&lt;br&gt;
Every Parquet file already stores basic metadata:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Row count&lt;/li&gt;
&lt;li&gt;Column count&lt;/li&gt;
&lt;li&gt;Data types&lt;/li&gt;
&lt;li&gt;Compression info&lt;/li&gt;
&lt;li&gt;Column statistics (min/max values, null counts)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;But you can also add custom metadata:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pq.write_table(
    table,
    "btc_prices.parquet",
    metadata={
        b"source": b"CoinGecko",
        b"pipeline": b"airflow-crypto-etl",
        b"tokens": b"BTC,ETH,SOL,HYPE,BNB"
    }
)

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Querying in a Data Lake
&lt;/h1&gt;

&lt;p&gt;Once your Parquet files are in object storage (e.g., S3), you can query them directly with modern engines:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT timestamp, close
FROM 's3://crypto-data/*.parquet'
WHERE symbol = 'BTC' AND timestamp &amp;gt;= '2025-09-01'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This makes Parquet a natural fit for tools like Spark, DuckDB, and Presto.&lt;/p&gt;

&lt;h1&gt;
  
  
  Bottom Line
&lt;/h1&gt;

&lt;p&gt;Parquet isn’t just another file format. It’s a compact, fast, and schema-aware way of storing data that plays perfectly with modern data lakes.&lt;br&gt;
If you care about performance, storage efficiency, and long-term scalability, Parquet is a no-brainer.&lt;/p&gt;

</description>
      <category>parquet</category>
      <category>datalake</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Why you need to learn Apache Airflow - right now</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Mon, 08 Sep 2025 15:41:39 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/why-you-need-to-learn-apache-airflow-right-now-502j</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/why-you-need-to-learn-apache-airflow-right-now-502j</guid>
      <description>&lt;h1&gt;
  
  
  The standard
&lt;/h1&gt;

&lt;h2&gt;
  
  
  What it is and how we got here
&lt;/h2&gt;

&lt;p&gt;Apache Airflow is an open-source workflow orchestration platform, used to author, schedule, and monitor complex data workflows in a reliable and scalable way.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Fun fact:&lt;/strong&gt; Apache Airflow is a tool that was developed by Airbnb, yes, the one with the houses and apartments.&lt;/p&gt;

&lt;p&gt;In a nutshell, Airflow allows you to define workflows as Directed Acyclic Graphs (DAGs) of tasks, written in Python. Each task might involve data extraction, transformation, loading (ETL), model training, reporting, or any other step in a data pipeline&lt;/p&gt;

&lt;p&gt;Ever since its inception, it has only grown more popular, becoming a staple for companies and individual data engineers alike.&lt;br&gt;
In my opinion, there's quite a bit of a learning curve, especially in the setup, but once you get past that, it's smooth sailing. Here's why: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Python-based&lt;/strong&gt; – Workflows are defined in Python code, which means it's relatively easy to pick up, even for beginners &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Flexibility&lt;/strong&gt; – Because it’s Python-based, it integrates easily with existing systems and APIs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scalability&lt;/strong&gt; – Suitable for startups and individual developers, to enterprises and large corporations.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Debugability&lt;/strong&gt; – The UI and logs make debugging pipelines straightforward. It's really intuitive &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Ecosystem Support&lt;/strong&gt; – Many cloud providers (AWS MWAA, Google Cloud Composer, Astronomer) offer managed Airflow services.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Proven Track Record&lt;/strong&gt; – Used by tech giants and enterprises for mission-critical pipelines.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;h1&gt;
  
  
  A brief tour
&lt;/h1&gt;

&lt;p&gt;Let's walk through how to set up and run a data pipeline. I'll explain more as we go along &lt;br&gt;
The pipelines are defined as Directed Acyclic Graphs (DAGs)&lt;/p&gt;

&lt;p&gt;So first, Airflow has two sides: the web server UI and the scheduler&lt;/p&gt;
&lt;h2&gt;
  
  
  Web server
&lt;/h2&gt;

&lt;p&gt;The web server is the main GUI, which acts as command central. This is where we can do all sorts of things with the DAGs, like debugging and monitoring them.&lt;br&gt;
Run&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;apache web-server
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhbbahppp30ga99re4zeo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhbbahppp30ga99re4zeo.png" alt="A picture or the out come" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once the server is started, you can view the GUI at the default port 8080. It should look something like this &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj7zk70s0sowwjjj0fi9h.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj7zk70s0sowwjjj0fi9h.png" alt=" " width="800" height="450"&gt;&lt;/a&gt; &lt;/p&gt;

&lt;h2&gt;
  
  
  Scheduler
&lt;/h2&gt;

&lt;p&gt;This is the core of Apache Airflow. It decides when and what should be done &lt;/p&gt;

&lt;p&gt;Think of the web server as the dashboard (the speedometer), and the scheduler as the engine that makes the car move, driving the dashboard readings&lt;/p&gt;

&lt;p&gt;It parses the DAGs, schedules tasks, manages dependencies, dispatches work, and handles catchup &amp;amp; backfill&lt;/p&gt;

&lt;p&gt;In a separate terminal, run&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;airflow scheduler
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4qvlim5bbsebx41r3h79.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4qvlim5bbsebx41r3h79.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  A simple DAG
&lt;/h2&gt;

&lt;p&gt;Now, in the previous screenshots, the line "export AIRFLOW_HOME=$(pwd)/airflow" is a simple organisational step.&lt;br&gt;
Airflow automatically creates an airflow folder; this line tells Airflow to create it in your current directory instead.&lt;/p&gt;

&lt;p&gt;In a separate terminal, inside this "airflow" folder, create a folder named dags. There, create a Python file for your DAGs with the text editor of your choice.&lt;/p&gt;
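&lt;p&gt;In shell terms, the steps above amount to something like the following, run from your project directory (the DAG filename here is illustrative):&lt;/p&gt;

```shell
# Keep Airflow's metadata and config alongside the project
export AIRFLOW_HOME=$(pwd)/airflow

# Airflow discovers DAG files in $AIRFLOW_HOME/dags
mkdir -p "$AIRFLOW_HOME/dags"

# Create an empty Python file to hold the DAG definition
touch "$AIRFLOW_HOME/dags/fetch_load.py"
```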

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs0ft8zvpt6jczzf00y2q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs0ft8zvpt6jczzf00y2q.png" alt=" " width="800" height="203"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then, proceed to write your DAG.&lt;br&gt;
This is a simple DAG, in Python, for the extraction step of the ETL process.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs45j4u7v3r0bucjhmraa.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs45j4u7v3r0bucjhmraa.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Run your DAG
&lt;/h2&gt;

&lt;p&gt;Now restart your scheduler and web server, and confirm that your DAG is present.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ff7p1x6w81cbn1y8yx9y3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ff7p1x6w81cbn1y8yx9y3.png" alt=" " width="800" height="162"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The fetch load DAG is present, and you can click the play button to run it.&lt;br&gt;
Click on your DAG's name for closer inspection or more options for that specific DAG.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Figwg4bwzgg8w04hoh2cv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Figwg4bwzgg8w04hoh2cv.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4k1i1bar0qs5odx3m1vc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4k1i1bar0qs5odx3m1vc.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I find the logs to be the most helpful part, especially when debugging.&lt;br&gt;
For instance, in the failed run below, I hadn't properly defined the schema before executing.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq318gobjam32n7e3d889.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq318gobjam32n7e3d889.png" alt=" " width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>airflow</category>
      <category>python</category>
    </item>
    <item>
      <title>Setting Up PostgreSQL on a Virtual Machine</title>
      <dc:creator>Eric Kahindi</dc:creator>
      <pubDate>Sun, 03 Aug 2025 17:54:17 +0000</pubDate>
      <link>https://forem.com/eric_kahindi_cfbfda3bd0f7/setting-up-postgresql-on-a-virtual-machine-231f</link>
      <guid>https://forem.com/eric_kahindi_cfbfda3bd0f7/setting-up-postgresql-on-a-virtual-machine-231f</guid>
      <description>&lt;p&gt;In this guide, we’ll walk through the steps to set up a PostgreSQL server on a virtual machine (VM). This is ideal for anyone learning backend development, data engineering or deploying small projects in a cloud environment.&lt;/p&gt;

&lt;h1&gt;
  
  
  Creating the Virtual Machine
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Choose Your Cloud Provider
&lt;/h2&gt;

&lt;p&gt;Before installing PostgreSQL, we need a VM running on a cloud provider.&lt;br&gt;
While you can use any cloud provider of your choice (AWS, Google Cloud, DigitalOcean, etc.), we'll focus on Microsoft Azure for this tutorial.&lt;br&gt;
Azure provides a generous free tier for trying out its products, with a special offer for students, who get about $100 in free credit for a whole year.&lt;/p&gt;
&lt;h2&gt;
  
  
  Create your account
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Visit the Azure for Students page&lt;/li&gt;
&lt;li&gt;Sign up using your student email address&lt;/li&gt;
&lt;li&gt;Verify your student status&lt;/li&gt;
&lt;li&gt;Complete the registration process&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Create a Virtual Machine
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Log into the Azure Portal&lt;/li&gt;
&lt;li&gt;Click "Create a resource"&lt;/li&gt;
&lt;li&gt;Search for "Virtual Machine" and select it&lt;/li&gt;
&lt;li&gt;Choose your preferred Linux distribution (Ubuntu 20.04 LTS recommended)&lt;/li&gt;
&lt;li&gt;Select an appropriate VM size (B1s or B2s for testing purposes)&lt;/li&gt;
&lt;li&gt;Configure authentication (SSH public key recommended)&lt;/li&gt;
&lt;li&gt;Configure networking settings&lt;/li&gt;
&lt;li&gt;Review and create your VM&lt;/li&gt;
&lt;/ul&gt;
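&lt;p&gt;If you prefer the command line over the portal, the same VM can be sketched with the Azure CLI. The resource group, VM name, and image alias below are illustrative; pick the Ubuntu LTS image and size you actually want:&lt;/p&gt;

```shell
# Create a resource group, then a small Ubuntu VM with SSH key auth
az group create --name pg-demo-rg --location eastus

az vm create \
  --resource-group pg-demo-rg \
  --name pg-vm \
  --image Ubuntu2204 \
  --size Standard_B1s \
  --admin-username azureuser \
  --generate-ssh-keys
```

&lt;p&gt;The --generate-ssh-keys flag creates a key pair if you don't already have one, matching the SSH-key authentication recommended above.&lt;/p&gt;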
&lt;h1&gt;
  
  
  Connect and set up PostgreSQL
&lt;/h1&gt;

&lt;p&gt;Once your VM is running, you can connect to it using SSH:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ssh username@your-vm-public-ip
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Install PostgreSQL
&lt;/h2&gt;

&lt;p&gt;Update your system packages first:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt update
sudo apt upgrade -y
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then install PostgreSQL with the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt install postgresql postgresql-contrib -y
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This installs PostgreSQL along with the contrib package, which provides additional extensions and utilities for customising PostgreSQL.&lt;/p&gt;

&lt;h2&gt;
  
  
  Start PostgreSQL
&lt;/h2&gt;

&lt;p&gt;First, start the PostgreSQL instance. Then enable it so that it starts automatically when the server boots, and finally check its status to confirm the changes have taken effect.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo systemctl start postgresql
sudo systemctl enable postgresql
sudo systemctl status postgresql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Log in to PostgreSQL instance
&lt;/h2&gt;

&lt;p&gt;Switch to the default postgres user to use the PostgreSQL command-line interface:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo -i -u postgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Launch the PostgreSQL interactive terminal:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;psql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Navigate and Use PostgreSQL
&lt;/h2&gt;

&lt;p&gt;Now that we're logged into psql, we can execute meta-commands or SQL queries.&lt;br&gt;
Here is a list of common ones.&lt;br&gt;
List all available databases:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;\l
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a new user:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE USER lux WITH PASSWORD '****';
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Give the user superuser privileges:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ALTER USER lux WITH SUPERUSER;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Exit the PostgreSQL shell:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;\q
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Exit back to your regular Linux user:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;exit
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Configure PostgreSQL for Remote Access
&lt;/h1&gt;

&lt;p&gt;By default, PostgreSQL only accepts connections from localhost. Let’s change that.&lt;/p&gt;

&lt;p&gt;Navigate to the PostgreSQL configuration directory:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cd /etc/postgresql/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Find the directory for your PostgreSQL version and navigate into it, then into the main configuration directory, and open postgresql.conf:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo nano postgresql.conf
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Find the line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;listen_addresses = 'localhost'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Change it to:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;listen_addresses = '*'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This allows any host to connect to the PostgreSQL instance; if you prefer, you can instead list the specific IP addresses that should be allowed.&lt;br&gt;
Save and exit the file (Ctrl + X in nano).&lt;/p&gt;
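&lt;p&gt;Note that listen_addresses alone isn't enough: PostgreSQL also consults pg_hba.conf for client authentication rules, and a restart is needed for the changes to take effect. A minimal (and permissive) sketch, assuming the default Debian/Ubuntu layout:&lt;/p&gt;

```shell
# Allow any IPv4 client to authenticate with a password
# (tighten the 0.0.0.0/0 CIDR to specific addresses in production)
echo "host  all  all  0.0.0.0/0  md5" | sudo tee -a /etc/postgresql/*/main/pg_hba.conf

# Restart so both postgresql.conf and pg_hba.conf changes take effect
sudo systemctl restart postgresql

# From your local machine, test the connection
psql -h your-vm-public-ip -U lux -d postgres
```

&lt;p&gt;You may also need to open port 5432 in your VM's network security group before remote connections will reach the server.&lt;/p&gt;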

&lt;h1&gt;
  
  
  Done
&lt;/h1&gt;

&lt;p&gt;You now have a fully functioning PostgreSQL server running on a virtual machine.&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
