<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Anuj Manjhi</title>
    <description>The latest articles on Forem by Anuj Manjhi (@black_eagle).</description>
    <link>https://forem.com/black_eagle</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3791522%2F1b28ed7f-003d-4a8f-99ef-5493b07576fa.png</url>
      <title>Forem: Anuj Manjhi</title>
      <link>https://forem.com/black_eagle</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/black_eagle"/>
    <language>en</language>
    <item>
      <title>SQL Debezium Kafka Snowflake</title>
      <dc:creator>Anuj Manjhi</dc:creator>
      <pubDate>Wed, 25 Feb 2026 10:25:36 +0000</pubDate>
      <link>https://forem.com/black_eagle/sql-debezium-kafka-snowflake-3d7l</link>
      <guid>https://forem.com/black_eagle/sql-debezium-kafka-snowflake-3d7l</guid>
      <description>&lt;p&gt;(Completely self-contained. No Azure. No Elasticsearch.)&lt;/p&gt;

&lt;p&gt;1️⃣ BEGINNER THEORY SECTION (Only What We Use)&lt;br&gt;
🟦 What is CDC (Change Data Capture)?&lt;/p&gt;

&lt;p&gt;CDC = CCTV camera for your database&lt;/p&gt;

&lt;p&gt;Imagine your SQL database is a bank vault.&lt;/p&gt;

&lt;p&gt;CDC acts like:&lt;/p&gt;

&lt;p&gt;🎥 A CCTV camera recording every insert, update, delete.&lt;/p&gt;

&lt;p&gt;Instead of scanning the whole database again and again,&lt;br&gt;
it only captures what changed.&lt;/p&gt;

&lt;p&gt;Why useful?&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Real-time data pipelines&lt;/li&gt;
&lt;li&gt;No heavy full-table scans&lt;/li&gt;
&lt;li&gt;Efficient replication&lt;/li&gt;
&lt;/ul&gt;
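&lt;p&gt;The idea can be sketched in a few lines of Python (a toy model of applying a change log, not real CDC machinery):&lt;/p&gt;

```python
# Toy illustration of CDC: a replica applies only the recorded changes
# instead of re-reading the whole source table. The event shape is invented.
def apply_changes(replica, change_log):
    for event in change_log:
        if event["op"] in ("insert", "update"):
            replica[event["key"]] = event["row"]
        elif event["op"] == "delete":
            replica.pop(event["key"], None)
    return replica

replica = {"2026-01-01": {"temp": 30.0}}
change_log = [
    {"op": "update", "key": "2026-01-01", "row": {"temp": 32.5}},
    {"op": "insert", "key": "2026-01-02", "row": {"temp": 28.0}},
    {"op": "delete", "key": "2026-01-01"},
]
apply_changes(replica, change_log)
print(replica)  # {'2026-01-02': {'temp': 28.0}}
```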

&lt;p&gt;🟦 What is Debezium?&lt;/p&gt;

&lt;p&gt;Debezium = Smart CDC Reader&lt;/p&gt;

&lt;p&gt;If CDC is CCTV footage,&lt;br&gt;
Debezium is the security officer watching that footage and sending alerts.&lt;/p&gt;

&lt;p&gt;It:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Reads database transaction logs&lt;/li&gt;
&lt;li&gt;Converts changes into events&lt;/li&gt;
&lt;li&gt;Sends the events to Kafka&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;🟦 What is Apache Kafka?&lt;/p&gt;

&lt;p&gt;Kafka = Post Office Sorting Center 📬&lt;/p&gt;

&lt;p&gt;Imagine thousands of letters arriving per second.&lt;/p&gt;

&lt;p&gt;Kafka:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Receives messages (events)&lt;/li&gt;
&lt;li&gt;Stores them in order&lt;/li&gt;
&lt;li&gt;Lets multiple systems read them independently&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Kafka does NOT modify data.&lt;br&gt;
It stores streams.&lt;/p&gt;

&lt;p&gt;🟦 What is a Kafka Topic?&lt;/p&gt;

&lt;p&gt;Topic = Mailbox&lt;/p&gt;

&lt;p&gt;Example:&lt;/p&gt;

&lt;p&gt;sql2016.dbo.DailyWeatherFact&lt;/p&gt;

&lt;p&gt;Each table becomes a topic.&lt;/p&gt;

&lt;p&gt;Kafka keeps events in ordered logs.&lt;/p&gt;
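&lt;p&gt;The table-to-topic mapping can be written out explicitly (a tiny illustrative helper, not part of Debezium):&lt;/p&gt;

```python
# Debezium's default topic naming: serverName.schemaName.tableName,
# matching the topic used throughout this project.
def topic_name(server, schema, table):
    return "{}.{}.{}".format(server, schema, table)

print(topic_name("sql2016", "dbo", "DailyWeatherFact"))
# sql2016.dbo.DailyWeatherFact
```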

&lt;p&gt;🟦 What is Kafka Connect?&lt;/p&gt;

&lt;p&gt;Kafka Connect = Automation Robot 🤖&lt;/p&gt;

&lt;p&gt;Instead of you writing custom code to read from Kafka and push to Snowflake, Kafka Connect runs connectors that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pull from a source&lt;/li&gt;
&lt;li&gt;Push to a destination&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;🟦 What is a Sink Connector?&lt;/p&gt;

&lt;p&gt;Sink = Delivery Truck 🚛&lt;/p&gt;

&lt;p&gt;Kafka stores mail.&lt;br&gt;
Sink connector delivers mail to final destination.&lt;/p&gt;

&lt;p&gt;In this project:&lt;br&gt;
Kafka → Snowflake&lt;/p&gt;

&lt;p&gt;🟦 What is Snowflake?&lt;/p&gt;

&lt;p&gt;Snowflake = Digital Library Warehouse 📚&lt;/p&gt;

&lt;p&gt;It:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Stores massive amounts of data&lt;/li&gt;
&lt;li&gt;Supports analytics&lt;/li&gt;
&lt;li&gt;Scales compute and storage independently&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Why not write directly to Snowflake?&lt;/p&gt;

&lt;p&gt;Because Kafka gives you:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Buffering&lt;/li&gt;
&lt;li&gt;Replay&lt;/li&gt;
&lt;li&gt;Fault tolerance&lt;/li&gt;
&lt;li&gt;Scalability&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;2️⃣ ARCHITECTURE OVERVIEW&lt;/p&gt;

&lt;p&gt;🧭 Text Diagram&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+------------------+
|   SQL Server     |
|  (Climatology)   |
+--------+---------+
         |
         | CDC (transaction logs)
         v
+------------------+
|    Debezium      |
|  Source Connector|
+--------+---------+
         |
         v
+------------------+
|      Kafka       |
|  Topic:          |
| sql2016.dbo...   |
+--------+---------+
         |
         v
+------------------+
| Kafka Connect    |
| Snowflake Sink   |
+--------+---------+
         |
         v
+------------------+
|    Snowflake     |
| DAILYWEATHERFACT |
+------------------+
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;🔌 Ports Used&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Component         Port
SQL Server        1433
Zookeeper         2181
Kafka (internal)  29092
Kafka (external)  9092
Kafka Connect     8083
Snowflake         443
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;🔁 Data Flow Step-by-Step&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Insert a row into SQL Server&lt;/li&gt;
&lt;li&gt;SQL Server writes the change to its transaction log&lt;/li&gt;
&lt;li&gt;Debezium reads the log&lt;/li&gt;
&lt;li&gt;Debezium sends an event to a Kafka topic&lt;/li&gt;
&lt;li&gt;Kafka stores the event&lt;/li&gt;
&lt;li&gt;The Snowflake sink reads the topic&lt;/li&gt;
&lt;li&gt;The sink inserts the row into the Snowflake table&lt;/li&gt;
&lt;/ol&gt;
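&lt;p&gt;The steps above can be sketched end-to-end as a toy pipeline in Python (all structures are simplified stand-ins; real Debezium events are richer JSON envelopes):&lt;/p&gt;

```python
# Toy end-to-end sketch: SQL insert -> transaction log -> Kafka topic -> sink.
transaction_log = []   # SQL Server's transaction log (simplified)
topic = []             # Kafka topic: an ordered, append-only list
warehouse = {}         # Snowflake table, keyed by date

def sql_insert(row):   # write the row and log the change
    transaction_log.append({"op": "c", "after": row})

def debezium_poll():   # read the log, publish events to the topic
    while transaction_log:
        topic.append(transaction_log.pop(0))

def sink_flush():      # consume the topic, load the warehouse
    for event in topic:
        row = event["after"]
        warehouse[row["WeatherDate"]] = row

sql_insert({"WeatherDate": "2026-01-01", "Temperature": 32.5, "Humidity": 60})
debezium_poll()
sink_flush()
print(warehouse["2026-01-01"]["Temperature"])  # 32.5
```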

&lt;p&gt;🔊 Internal vs External Listeners&lt;/p&gt;

&lt;p&gt;Kafka runs inside Docker.&lt;/p&gt;

&lt;p&gt;Internal communication (container → container):&lt;/p&gt;

&lt;p&gt;kafka:29092&lt;/p&gt;

&lt;p&gt;External communication (host machine → broker):&lt;/p&gt;

&lt;p&gt;localhost:9092&lt;/p&gt;

&lt;p&gt;Why are both needed?&lt;br&gt;
Containers resolve the broker by its service name on the Docker network,&lt;br&gt;
while your host machine reaches it through the published port.&lt;/p&gt;
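&lt;p&gt;A minimal sketch of the routing decision (the address map mirrors the listener setup in the compose file; the helper itself is illustrative):&lt;/p&gt;

```python
# Which bootstrap address a Kafka client should use, given the two listeners.
# "container" = another service on the Docker network; "host" = your machine.
ADDRESSES = {"container": "kafka:29092", "host": "localhost:9092"}

def bootstrap_for(client_location):
    return ADDRESSES[client_location]

print(bootstrap_for("container"))  # kafka:29092
print(bootstrap_for("host"))       # localhost:9092
```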

&lt;p&gt;🏷 Topic Naming Pattern&lt;br&gt;
serverName.schemaName.tableName&lt;/p&gt;
&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;

&lt;p&gt;sql2016.dbo.DailyWeatherFact&lt;/p&gt;

&lt;p&gt;3️⃣ REAL WORLD USE CASE&lt;/p&gt;

&lt;p&gt;Enterprises use this for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;ERP → data warehouse sync&lt;/li&gt;
&lt;li&gt;Banking transactions&lt;/li&gt;
&lt;li&gt;Inventory changes&lt;/li&gt;
&lt;li&gt;Real-time analytics&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Why put Kafka before Snowflake?&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Without Kafka     With Kafka
Tight coupling    Decoupled
No replay         Replay anytime
No buffering      Safe buffering
Hard scaling      Easy scaling
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Kafka stores history.&lt;br&gt;
You can replay events anytime.&lt;/p&gt;
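&lt;p&gt;Replay can be pictured with a plain list standing in for the topic (a toy model, not the Kafka consumer API):&lt;/p&gt;

```python
# A topic is an indexed, append-only log, so a consumer can re-read
# from any offset, e.g. to rebuild a table after a failure.
topic = ["evt-0", "evt-1", "evt-2", "evt-3"]

def replay(topic, from_offset=0):
    return topic[from_offset:]

print(replay(topic))      # full history: ['evt-0', 'evt-1', 'evt-2', 'evt-3']
print(replay(topic, 2))   # resume mid-stream: ['evt-2', 'evt-3']
```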

&lt;p&gt;4️⃣ FULL IMPLEMENTATION GUIDE&lt;br&gt;
Step 0 – Prerequisites&lt;/p&gt;

&lt;p&gt;OS: Windows 10/11 or Linux&lt;br&gt;
Docker: 24+&lt;br&gt;
RAM: Minimum 8GB (16GB recommended)&lt;br&gt;
Open Ports: 1433, 2181, 9092, 29092, 8083&lt;/p&gt;

&lt;p&gt;Step 1 – SQL Setup&lt;/p&gt;

&lt;p&gt;Create the test table first (CDC is enabled per table, so the table must exist):&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;USE Climatology;
GO

CREATE TABLE dbo.DailyWeatherFact (
    WeatherDate DATE PRIMARY KEY,
    Temperature FLOAT,
    Humidity FLOAT
);
GO
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Enable CDC on the database and the table:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;EXEC sys.sp_cdc_enable_db;
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name   = 'DailyWeatherFact',
    @role_name     = NULL;
GO
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Why?&lt;br&gt;
This turns on transaction-log tracking for the table.&lt;/p&gt;

&lt;p&gt;Create a login for Debezium:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE LOGIN sa2 WITH PASSWORD = '23Hammer';
CREATE USER sa2 FOR LOGIN sa2;
ALTER SERVER ROLE sysadmin ADD MEMBER sa2;
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Why sysadmin?&lt;br&gt;
Debezium needs permission to read the transaction log and the CDC tables (db_owner on the database is a tighter alternative for production).&lt;/p&gt;

&lt;p&gt;Step 2 – Docker Compose Setup&lt;/p&gt;

&lt;p&gt;📁 docker-compose.yml&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # Required for a single-broker setup (the defaults assume 3 brokers):
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Step 3 – Debezium Source Connector&lt;/p&gt;

&lt;p&gt;📁 debezium-connector.json&lt;/p&gt;

&lt;p&gt;(From uploaded file: sqlserver-cdc)&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "name": "sqlserver-cdc",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "1433",
    "database.user": "sa2",
    "database.password": "23Hammer",
    "database.names": "Climatology",
    "topic.prefix": "sql2016",
    "table.include.list": "dbo.DailyWeatherFact",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-changes.sql2016",
    "snapshot.mode": "schema_only",
    "database.encrypt": "false",
    "database.trustServerCertificate": "true"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Explanation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The server name "sql2016" becomes the topic prefix&lt;/li&gt;
&lt;li&gt;snapshot.mode = schema_only → capture the table schema but no existing rows&lt;/li&gt;
&lt;li&gt;The schema history topic records schema changes over time&lt;/li&gt;
&lt;/ul&gt;
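&lt;p&gt;A Debezium change event wraps the row in an envelope with op, before, and after fields; a small sketch of reading one (the sample event is hand-written and simplified for illustration):&lt;/p&gt;

```python
import json

# A simplified Debezium change-event value; the op codes ("c" create,
# "u" update, "d" delete) and before/after fields follow Debezium's envelope.
event_json = """{
  "op": "c",
  "before": null,
  "after": {"WeatherDate": "2026-01-01", "Temperature": 32.5, "Humidity": 60.0},
  "source": {"table": "DailyWeatherFact"}
}"""

event = json.loads(event_json)
if event["op"] in ("c", "u"):   # insert or update: take the new row image
    row = event["after"]
else:                           # delete: only the old image exists
    row = event["before"]
print(row["Temperature"])  # 32.5
```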

&lt;p&gt;Step 4 – Snowflake Sink Connector&lt;/p&gt;

&lt;p&gt;📁 snowflake-sink.json&lt;/p&gt;

&lt;p&gt;(From uploaded file: snowflake-sink-dailyweatherfact)&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "name": "snowflake-sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "sql2016.dbo.DailyWeatherFact",
    "consumer.auto.offset.reset": "earliest",
    "snowflake.url.name": "anmbgrd-cv13135.snowflakecomputing.com",
    "snowflake.user.name": "ANUJUSER",
    "snowflake.private.key": "YOUR_PRIVATE_KEY",
    "snowflake.database.name": "CDC_DB",
    "snowflake.schema.name": "RAW",
    "snowflake.warehouse.name": "CDC_WH",
    "snowflake.topic2table.map": "sql2016.dbo.DailyWeatherFact:DAILYWEATHERFACT_CDC",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "buffer.count.records": "10000",
    "buffer.flush.time": "30"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;The buffer.* settings batch records (by count and by elapsed time) before each write to Snowflake, which improves load performance.&lt;/p&gt;
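&lt;p&gt;The two flush triggers can be modeled in a few lines (a toy model of the batching idea, not the connector's actual code):&lt;/p&gt;

```python
import time

# Toy model of count/time-based batching, mirroring buffer.count.records
# and buffer.flush.time above.
class FlushBuffer:
    def __init__(self, max_records, max_seconds):
        self.max_records = max_records
        self.max_seconds = max_seconds
        self.records = []
        self.started = time.monotonic()

    def add(self, record):
        self.records.append(record)
        full = len(self.records) >= self.max_records
        stale = time.monotonic() - self.started >= self.max_seconds
        if full or stale:
            return self.flush()
        return None   # keep buffering

    def flush(self):
        batch, self.records = self.records, []
        self.started = time.monotonic()
        return batch

buf = FlushBuffer(max_records=3, max_seconds=30)
assert buf.add("r1") is None
assert buf.add("r2") is None
print(buf.add("r3"))  # ['r1', 'r2', 'r3'] -- the count threshold fired
```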

&lt;p&gt;Step 5 – Commands&lt;/p&gt;

&lt;p&gt;Start the containers:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Check that they are running:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker ps
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Create the Debezium connector:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @debezium-connector.json
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Create the Snowflake sink:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @snowflake-sink.json
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Insert a test row:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;INSERT INTO dbo.DailyWeatherFact VALUES ('2026-01-01', 32.5, 60);
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Check the topic:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker compose exec kafka \
  kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic sql2016.dbo.DailyWeatherFact \
  --from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Step 6 – Validation&lt;/p&gt;

&lt;p&gt;✔ Insert a row in SQL Server&lt;br&gt;
✔ See the event in Kafka&lt;br&gt;
✔ Check Snowflake:&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT * FROM RAW.DAILYWEATHERFACT_CDC;
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Step 7 – Common Errors&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Error                      Fix
Listener not reachable     Fix the advertised listeners
Connector class not found  Use the correct image/plugin
Snowflake auth error       Check the private key
No data flowing            Check consumer offsets
&lt;/code&gt;&lt;/pre&gt;

&lt;p&gt;Step 8 – Production Hardening&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Enable SSL/TLS&lt;/li&gt;
&lt;li&gt;Use SASL authentication&lt;/li&gt;
&lt;li&gt;Replication factor ≥ 3&lt;/li&gt;
&lt;li&gt;Configure a dead letter queue&lt;/li&gt;
&lt;li&gt;Tune retry configs&lt;/li&gt;
&lt;li&gt;Run a multi-broker cluster&lt;/li&gt;
&lt;li&gt;Monitor consumer lag&lt;/li&gt;
&lt;/ul&gt;
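&lt;p&gt;"Monitor lag" means tracking how far the sink has fallen behind the topic; the arithmetic is simple (the offsets below are illustrative numbers):&lt;/p&gt;

```python
# Consumer lag per partition = log-end offset minus the committed offset.
def consumer_lag(end_offsets, committed_offsets):
    return {p: end_offsets[p] - committed_offsets.get(p, 0) for p in end_offsets}

print(consumer_lag({0: 120, 1: 95}, {0: 118, 1: 95}))  # {0: 2, 1: 0}
```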

&lt;p&gt;✅ PROJECT 1 COMPLETE&lt;/p&gt;


&lt;/table&gt;&lt;/div&gt;

</description>
      <category>beginners</category>
      <category>database</category>
      <category>dataengineering</category>
      <category>sql</category>
    </item>
  </channel>
</rss>
