<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Abdelrahman Ahmed</title>
    <description>The latest articles on Forem by Abdelrahman Ahmed (@abdelrahman_ahmed).</description>
    <link>https://forem.com/abdelrahman_ahmed</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1576772%2Fb5a12439-6978-4b09-94a9-446bb62f3e4b.jpeg</url>
      <title>Forem: Abdelrahman Ahmed</title>
      <link>https://forem.com/abdelrahman_ahmed</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/abdelrahman_ahmed"/>
    <language>en</language>
    <item>
      <title>From AWS Kinesis to Apache Kafka: Building a Real-Time Streaming Bridge</title>
      <dc:creator>Abdelrahman Ahmed</dc:creator>
      <pubDate>Thu, 27 Nov 2025 08:18:09 +0000</pubDate>
      <link>https://forem.com/abdelrahman_ahmed/from-aws-kinesis-to-apache-kafka-building-a-real-time-streaming-bridge-96b</link>
      <guid>https://forem.com/abdelrahman_ahmed/from-aws-kinesis-to-apache-kafka-building-a-real-time-streaming-bridge-96b</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In one of my recent projects, I needed to replicate data from an AWS Kinesis stream into an Apache Kafka topic. The goal was to allow downstream systems to consume the same events from Kafka that were originally being produced to Kinesis.&lt;/p&gt;

&lt;p&gt;The core question here is: &lt;strong&gt;How can we replicate data from an AWS Kinesis stream to a Kafka topic in real time, with minimal latency and high reliability?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I explored different solutions, including &lt;strong&gt;Kafka Connect&lt;/strong&gt;, &lt;strong&gt;MirrorMaker&lt;/strong&gt;, and other integration tools. However, most of them came with complex configurations and limitations that didn’t fit our infrastructure setup.&lt;/p&gt;

&lt;p&gt;So, I decided to build a custom Kinesis-to-Kafka &lt;strong&gt;bridge&lt;/strong&gt; that would be flexible, lightweight, highly configurable, and would plug perfectly into our infrastructure setup.&lt;/p&gt;

&lt;p&gt;If you’re new to &lt;strong&gt;Amazon Kinesis&lt;/strong&gt;, here’s a quick overview of it (feel free to skip this section if you’re already familiar):&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Amazon Kinesis&lt;/strong&gt; is a family of streaming services within Amazon Web Services (AWS) designed to stream real-time data at scale. It includes services such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Amazon Kinesis Data Streams&lt;/li&gt;
&lt;li&gt;Amazon Kinesis Data Firehose&lt;/li&gt;
&lt;li&gt;Amazon Kinesis Video Streams&lt;/li&gt;
&lt;li&gt;Amazon Managed Service for Apache Flink&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;In this article, we will focus on &lt;strong&gt;Kinesis Data Streams&lt;/strong&gt;, which is a serverless (fully managed by AWS) streaming service designed to handle high-throughput, low-latency streaming use-cases like events, logs, and clickstreams.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;If you’re also new to &lt;strong&gt;Apache Kafka&lt;/strong&gt;, here’s a quick overview (feel free to skip this section if you’re already familiar):&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Apache Kafka&lt;/strong&gt; is a distributed event streaming platform that handles large volumes of real-time data. It provides durability, scalability, and strong ordering guarantees. It is commonly used to build data pipelines and streaming applications at different scales, and it is dominant in event-driven architectures.&lt;br&gt;&lt;br&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  The Replicator Schema
&lt;/h2&gt;

&lt;p&gt;The replicator consists of three core stages, as shown in the following diagram:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌──────────────────────────┐
│     Kinesis Consumer     │  ← Consumes records from AWS Kinesis stream
└────────────┬─────────────┘
             │
             ▼
┌──────────────────────────┐
│     Data Processing      │  ← Applies business logic to transform/validate/parse the records
└────────────┬─────────────┘
             │
             ▼
┌──────────────────────────┐
│    Producing to Kafka    │  ← Produces processed events to Kafka topic/topics
└──────────────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Kinesis Consumer:&lt;/strong&gt; Consuming data from a Kinesis stream requires more than just reading records; it requires manual offset management, since Kinesis doesn’t provide a built-in offset tracking mechanism for consumers. Instead, consumers are responsible for tracking their own progress, typically by storing checkpoint data in a DynamoDB table, often referred to as a &lt;strong&gt;lease table&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;This approach, called &lt;strong&gt;leasing&lt;/strong&gt;, allows multiple consumers to coordinate access to shards (the basic unit of a Kinesis stream) and avoid processing the same records. You can implement this logic manually (very complex) or use a ready-made library such as the Amazon &lt;a href="https://github.com/awslabs/amazon-kinesis-client" rel="noopener noreferrer"&gt;Kinesis Client Library (KCL)&lt;/a&gt;, which handles lease management, DynamoDB table creation, and offset tracking out of the box.&lt;/p&gt;

&lt;p&gt;In my case, I used KCL for Python (&lt;a href="https://github.com/awslabs/amazon-kinesis-client-python" rel="noopener noreferrer"&gt;KCLPY&lt;/a&gt;) to consume from Kinesis so that I can avoid re-implementing low-level coordination and state management logic.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data Processing:&lt;/strong&gt; This stage is where your core business logic lives. After data is fetched from Kinesis but before it’s forwarded to Kafka, you may need to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Add any required metadata to the records&lt;/li&gt;
&lt;li&gt;Validate or filter the records&lt;/li&gt;
&lt;li&gt;Transform the records to meet Kafka consumer expectations&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If no transformation is needed, the data can be streamed directly to Kafka.&lt;/p&gt;
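
&lt;p&gt;As an illustration, a processing step might look like the following sketch. This is hypothetical logic, not the replicator’s actual transformations; the &lt;strong&gt;source&lt;/strong&gt; and &lt;strong&gt;replicated_at&lt;/strong&gt; field names are illustrative assumptions:&lt;/p&gt;

```python
import json
import time

def transform_record(raw_bytes, partition_key):
    """Hypothetical processing step: parse, validate, and enrich a record
    before it is forwarded to Kafka. Not the replicator's actual logic."""
    try:
        event = json.loads(raw_bytes.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # filter out records that are not valid JSON

    # Add replication metadata (illustrative field names)
    event["source"] = "kinesis"
    event["replicated_at"] = time.time()
    return json.dumps(event).encode("utf-8")
```

&lt;p&gt;Records for which the transform returns None would simply be skipped instead of being produced to Kafka.&lt;/p&gt;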

&lt;p&gt;&lt;strong&gt;Producing to Kafka:&lt;/strong&gt; This is the final stage, in which records are published to the designated Kafka topic.&lt;/p&gt;

&lt;p&gt;At the implementation level, this is typically achieved using one of the Kafka SDKs. In my case, I used the &lt;strong&gt;confluent_kafka&lt;/strong&gt; client library for Python.&lt;/p&gt;

&lt;p&gt;After this stage, the records are serialized and published to Kafka, allowing downstream systems to consume the data in real time.&lt;br&gt;&lt;br&gt;&lt;/p&gt;
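
&lt;p&gt;As a sketch, the producer configuration can be assembled from environment variables. The variable names below mirror the sample env file shown later in this article; the exact mapping and defaults are assumptions:&lt;/p&gt;

```python
import os

def build_kafka_config():
    """Assemble a confluent_kafka Producer config dict from environment
    variables; the variable names follow the sample env file in this
    article, and the defaults are illustrative."""
    return {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVERS"],
        "client.id": os.environ.get("KAFKA_CLIENT_ID", "replicator-client"),
        "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT"),
        "sasl.mechanism": os.environ.get("KAFKA_SASL_MECHANISM", "SCRAM-SHA-512"),
        "sasl.username": os.environ.get("KAFKA_SASL_USERNAME", ""),
        "sasl.password": os.environ.get("KAFKA_SASL_PASSWORD", ""),
    }

# The producer itself would then be created with:
#   from confluent_kafka import Producer
#   producer = Producer(build_kafka_config())
```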

&lt;h2&gt;
  
  
  Tech Stack and Project Structure
&lt;/h2&gt;

&lt;p&gt;This project was developed using Python, kclpy, confluent_kafka, and Docker.&lt;/p&gt;

&lt;p&gt;Below is an overview of the project structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kinesis-to-kafka-replicator/
├── kinesis_replicator/
│   ├── __init__.py
│   └── record_processor.py
│
├── config/
│   ├── kcl.properties.template
│   └── replicator_configs.template
│
├── amazon_kclpy/
│   └── jars/
│
├── run.sh
├── Dockerfile
├── requirements.txt
└── README.md
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Configuration Breakdown
&lt;/h3&gt;

&lt;p&gt;The configuration is separated into two parts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;KCLPY Configuration:&lt;/strong&gt; Located in kcl.properties.template and contains all the required settings for the Kinesis client library (KCL), such as &lt;strong&gt;streamName&lt;/strong&gt;, &lt;strong&gt;initialPositionInStream&lt;/strong&gt;, and the AWS authentication parameters. A more detailed list can be found &lt;a href="https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Replicator Configuration:&lt;/strong&gt; Located in replicator_configs.template and includes the Kafka-specific settings, such as &lt;strong&gt;bootstrap_servers&lt;/strong&gt;, &lt;strong&gt;kafka_topic&lt;/strong&gt;, and &lt;strong&gt;client_id&lt;/strong&gt;, along with any global parameters required for tuning the replicator, such as &lt;strong&gt;sleep_seconds&lt;/strong&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
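
&lt;p&gt;To make this concrete, here is a hypothetical replicator_configs.template. The exact structure is illustrative; the ${VAR} placeholders are substituted by envsubst at startup:&lt;/p&gt;

```json
{
  "bootstrap_servers": "${KAFKA_BOOTSTRAP_SERVERS}",
  "kafka_topic": "${KAFKA_TOPIC}",
  "client_id": "${KAFKA_CLIENT_ID}",
  "security_protocol": "${KAFKA_SECURITY_PROTOCOL}",
  "sasl_mechanism": "${KAFKA_SASL_MECHANISM}",
  "sleep_seconds": "${SLEEP_SECONDS}",
  "checkpoint_retries": "${CHECKPOINT_RETRIES}"
}
```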

&lt;blockquote&gt;
&lt;p&gt;All Python dependencies, such as &lt;strong&gt;boto3&lt;/strong&gt;, &lt;strong&gt;confluent_kafka&lt;/strong&gt;, and &lt;strong&gt;amazon_kclpy&lt;/strong&gt;, are listed in &lt;strong&gt;requirements.txt&lt;/strong&gt;.&lt;br&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Core Logic: record_processor.py
&lt;/h3&gt;

&lt;p&gt;This is the heart of the replicator’s code base.&lt;/p&gt;

&lt;p&gt;The RecordProcessor class is responsible for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Consuming/Receiving the records from the Kinesis stream&lt;/li&gt;
&lt;li&gt;Applying any business logic or transformations&lt;/li&gt;
&lt;li&gt;Publishing the resulting data to Kafka&lt;/li&gt;
&lt;li&gt;Handling retries, checkpointing, and logging&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A more detailed sample can be found &lt;a href="https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample_kclpy_app.py" rel="noopener noreferrer"&gt;&lt;strong&gt;here&lt;/strong&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The following is a simplified breakdown of what’s happening:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;➤ Processing Kinesis Records&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Example for KCL process_records method with leasing functionality (checkpointing)
&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;process_records&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;process_records_input&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;process_records_input&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;binary_data&lt;/span&gt;
                &lt;span class="n"&gt;seq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sequence_number&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;sub_seq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sub_sequence_number&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition_key&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;process_record&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;seq&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sub_seq&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# Business logic is being imlemented in this methos
&lt;/span&gt;                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;should_update_sequence&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;seq&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sub_seq&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
                    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_largest_seq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;seq&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sub_seq&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="c1"&gt;# Checkpoints every N seconds
&lt;/span&gt;            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_last_checkpoint_time&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_CHECKPOINT_FREQ_SECONDS&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;checkpoint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;process_records_input&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;checkpointer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_largest_seq&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]),&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_largest_seq&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_last_checkpoint_time&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Encountered an exception while processing records. Exception was {e}&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
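
&lt;p&gt;The checkpoint call above is typically wrapped with retry logic, as in the KCLPY sample app. Below is a simplified sketch: the real sample inspects amazon_kclpy.kcl.CheckpointError to distinguish shutdown, throttling, and invalid-state cases, while this version simply retries any failure:&lt;/p&gt;

```python
import time

def checkpoint_with_retries(checkpointer, sequence_number, sub_sequence_number,
                            retries=5, sleep_seconds=5):
    """Simplified retrying checkpoint helper, modeled on the KCLPY sample.

    The real sample distinguishes shutdown, throttling, and invalid-state
    errors via amazon_kclpy.kcl.CheckpointError; here any failure is
    retried up to 'retries' times with a fixed sleep between attempts."""
    for attempt in range(retries):
        try:
            checkpointer.checkpoint(sequence_number, sub_sequence_number)
            return True
        except Exception:
            if attempt + 1 == retries:
                raise  # give up after the final attempt
            time.sleep(sleep_seconds)
```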



&lt;p&gt;&lt;strong&gt;➤ Producing to Kafka&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# A code snippet from the process_record method mentioned above that shows how to produce to Kafka
&lt;/span&gt;
            &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;partition_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;callback&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;BufferError&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;warning&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Kafka producer queue full, draining and retrying for record &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;sequence_number&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;1.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;partition_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;callback&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This ensures the record is safely published to Kafka, with retry logic in case of temporary backpressure.&lt;/p&gt;
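
&lt;p&gt;The callback argument passed to produce() in the snippet above is a delivery callback, which confluent_kafka invokes (from poll() or flush()) once per message, with err set on failure. A minimal sketch, assuming simple print-based logging:&lt;/p&gt;

```python
def delivery_callback(err, msg):
    """Sketch of a confluent_kafka delivery callback; invoked once per
    message from poll()/flush(). Print-based logging is an assumption."""
    if err is not None:
        print("Delivery failed: {}".format(err))
    else:
        print("Delivered to {} [{}] at offset {}".format(
            msg.topic(), msg.partition(), msg.offset()))
```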

&lt;p&gt;&lt;strong&gt;amazon_kclpy/jars/:&lt;/strong&gt; This folder contains the Java dependencies used by KCLPY. These are required to enable the bridge between Python and the underlying KCL lease coordination.&lt;br&gt;
The jar files can be generated by following the instructions in the KCLPY repository linked above.&lt;br&gt;&lt;br&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Deployment and Startup
&lt;/h2&gt;

&lt;p&gt;To ensure portability and seamless deployment, the replicator is containerized using Docker. Here is a breakdown of how to build and run it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dockerfile:&lt;/strong&gt; Defines how to build a container image for the replicator that includes the required Python environment, libraries, and the Kinesis Client Library (KCL) dependencies.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;➤ Sample Dockerfile&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;FROM&lt;/span&gt;&lt;span class="s"&gt; python:3.9-slim&lt;/span&gt;

&lt;span class="c"&gt;# Install system dependencies&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;apt-get update &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; apt-get &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-y&lt;/span&gt; openjdk-11-jre curl unzip &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;    &lt;span class="nb"&gt;rm&lt;/span&gt; &lt;span class="nt"&gt;-rf&lt;/span&gt; /var/lib/apt/lists/&lt;span class="k"&gt;*&lt;/span&gt;

&lt;span class="c"&gt;# Set workdir and copy code&lt;/span&gt;
&lt;span class="k"&gt;WORKDIR&lt;/span&gt;&lt;span class="s"&gt; /app&lt;/span&gt;
&lt;span class="k"&gt;COPY&lt;/span&gt;&lt;span class="s"&gt; . /app&lt;/span&gt;

&lt;span class="c"&gt;# Install Python requirements&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--no-cache-dir&lt;/span&gt; &lt;span class="nt"&gt;-r&lt;/span&gt; requirements.txt

&lt;span class="c"&gt;# Set entrypoint&lt;/span&gt;
&lt;span class="k"&gt;ENTRYPOINT&lt;/span&gt;&lt;span class="s"&gt; ["./run.sh"]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;run.sh:&lt;/strong&gt; The &lt;strong&gt;&lt;em&gt;run.sh&lt;/em&gt;&lt;/strong&gt; script acts as the &lt;strong&gt;&lt;em&gt;entrypoint&lt;/em&gt;&lt;/strong&gt; to launch the replicator. First, the script renders the configuration templates into concrete configuration files, then it bootstraps the KCL MultiLangDaemon with that configuration.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;&lt;em&gt;run.sh&lt;/em&gt;&lt;/strong&gt; script includes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Template rendering for the config files using the &lt;em&gt;&lt;strong&gt;envsubst&lt;/strong&gt;&lt;/em&gt; command&lt;/li&gt;
&lt;li&gt;Launching the KCL Python daemon with the provided configuration&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;➤ Sample run.sh&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;
&lt;span class="nb"&gt;set&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt;

&lt;span class="c"&gt;# Rendering the configuration templates&lt;/span&gt;
envsubst &amp;lt; config/replicator_configs.template &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; config/replicator_configs.json
envsubst &amp;lt; config/kcl.properties.template &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; config/kcl.properties

&lt;span class="c"&gt;# Start KCL daemon&lt;/span&gt;
java &lt;span class="nt"&gt;-cp&lt;/span&gt; &lt;span class="s2"&gt;"amazon_kclpy/jars/*"&lt;/span&gt; software.amazon.kinesis.multilang.MultiLangDaemon config/kcl.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Build and Run&lt;/strong&gt;&lt;br&gt;&lt;/p&gt;

&lt;p&gt;Pre-requisites:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Make sure that your Kinesis stream has data&lt;/li&gt;
&lt;li&gt;Make sure that your Kafka topic exists and is ready to receive data&lt;/li&gt;
&lt;li&gt;The number of shards in the stream should equal the number of partitions in the Kafka topic to help preserve record ordering during replication&lt;/li&gt;
&lt;li&gt;The KCL requires DynamoDB tables to track Kinesis offsets; you can pre-create them or grant the KCL permission to create them through the AWS IAM role used.&lt;/li&gt;
&lt;/ul&gt;
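
&lt;p&gt;On the shard/partition point: Kafka routes records with the same key to the same partition, so forwarding each record with its original Kinesis partition key preserves per-key ordering end to end. A toy illustration of that idea (Kafka’s real partitioner uses a murmur2 hash; crc32 here is only a stand-in):&lt;/p&gt;

```python
import zlib

def pick_partition(partition_key, num_partitions):
    """Toy stand-in for Kafka's partitioner (Kafka actually uses a
    murmur2 hash): a stable hash of the key modulo the partition count,
    so records sharing a key always land on the same partition."""
    return zlib.crc32(partition_key.encode("utf-8")) % num_partitions
```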

&lt;p&gt;&lt;strong&gt;1) Build the Docker image&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Go to the Dockerfile path and run the following command&lt;/span&gt;

docker build &lt;span class="nt"&gt;-t&lt;/span&gt; kinesis-to-kafka-replicator &lt;span class="nb"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;2) Prepare the required environment variables&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Example for KCL configuration&lt;/span&gt;
&lt;span class="nv"&gt;KCL_STREAM_NAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kafka-kinesis-replicator-test
&lt;span class="nv"&gt;KCL_APPLICATION_NAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;replicator-app-name
&lt;span class="nv"&gt;KCL_REGION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;eu-west-1
&lt;span class="nv"&gt;KCL_INITIAL_POSITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;LATEST
&lt;span class="nv"&gt;KCL_LOG_LEVEL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;DEBUG
&lt;span class="nv"&gt;KCL_AWS_CREDENTIALS_PROVIDER&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;DefaultCredentialsProvider

&lt;span class="c"&gt;# Example for Kafka and global configuration&lt;/span&gt;
&lt;span class="nv"&gt;SLEEP_SECONDS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;5
&lt;span class="nv"&gt;CHECKPOINT_RETRIES&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;5
&lt;span class="nv"&gt;CHECKPOINT_FREQ_SECONDS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;30
&lt;span class="nv"&gt;KAFKA_TOPIC&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kinesis_kafka_replication_test
&lt;span class="nv"&gt;KAFKA_BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;bootstrap-server:9094
&lt;span class="nv"&gt;KAFKA_SASL_USERNAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kafka-user
&lt;span class="nv"&gt;KAFKA_SASL_PASSWORD&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kafka-password
&lt;span class="nv"&gt;KAFKA_SASL_MECHANISM&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SCRAM-SHA-512
&lt;span class="nv"&gt;KAFKA_SECURITY_PROTOCOL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SASL_PLAINTEXT
&lt;span class="nv"&gt;KAFKA_CLIENT_ID&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;replicator-client
&lt;span class="nv"&gt;NUMBER_OF_SHARDS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;4
&lt;span class="nv"&gt;AWS_ACCESS_KEY_ID&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;example-key-id
&lt;span class="nv"&gt;AWS_SECRET_ACCESS_KEY&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;example-secret-key
&lt;span class="nv"&gt;AWS_SESSION_TOKEN&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;example-token
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; The above environment variables are saved into a kinesis_kafka_replicator.env file, which is passed to the container in the following Docker run command.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3) Run the container&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;--env-file&lt;/span&gt; kinesis_kafka_replicator.env &lt;span class="nt"&gt;-it&lt;/span&gt; kinesis-to-kafka-replicator
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Verify the container is in a running state.&lt;br&gt;&lt;br&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Verification and Observability
&lt;/h2&gt;

&lt;p&gt;Once the replicator container is up and running, we can check the logs to make sure that the app inside the container is working properly.&lt;/p&gt;

&lt;p&gt;1) &lt;strong&gt;The application started successfully:&lt;/strong&gt; The highlighted log indicates that the replicator started successfully, and it is processing the correct stream.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D13GQpb22IlZeHl1q0v6bX9CviJY-8WCVD" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D13GQpb22IlZeHl1q0v6bX9CviJY-8WCVD" alt="replicator started successfully" width="1028" height="276"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;2) &lt;strong&gt;Monitor the DynamoDB table creation:&lt;/strong&gt; The following images confirm that the replicator recognizes the pre-created tables and will not create them again.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1orJqs_Tql6XqhRXmA4zfnDf2D0xlO9JA" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1orJqs_Tql6XqhRXmA4zfnDf2D0xlO9JA" alt="ddb table1" width="1537" height="342"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1Ib1dLP59Y_m8BwSY6TFn2tPfeHRt0Rhc" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1Ib1dLP59Y_m8BwSY6TFn2tPfeHRt0Rhc" alt="ddb table2" width="1534" height="229"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;3) &lt;strong&gt;KCL leader election:&lt;/strong&gt; The replicator elected a worker leader and assigned it to lead the consumption from a shard.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1UAWiE2U95MCk2JA-Svga3di6WTEJSJcV" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1UAWiE2U95MCk2JA-Svga3di6WTEJSJcV" alt="leader election" width="1541" height="392"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;4) &lt;strong&gt;KCL Kinesis consumer:&lt;/strong&gt; The replicator started to consume the records from the Kinesis stream and push them to Kafka.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D103bJftOBW6KtNAh66HPy0_btAkOK24MI" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D103bJftOBW6KtNAh66HPy0_btAkOK24MI" alt="kinesis consumer" width="1532" height="451"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;5) &lt;strong&gt;Observing the Kafka topic:&lt;/strong&gt; By watching the Kafka topic in the Kafka UI, we can see that messages have started to appear.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1CevRqzuJIOm_xjiIZWfQ9ADc09SaufD5" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdrive.google.com%2Fuc%3Fexport%3Dview%26id%3D1CevRqzuJIOm_xjiIZWfQ9ADc09SaufD5" alt="kafka topic ui" width="1720" height="730"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;All of this confirms that our replicator is working properly and can mirror the data between the two platforms successfully.&lt;br&gt;&lt;br&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This approach enabled real-time streaming from Kinesis to Kafka without over-engineering. In this guide, we built a lightweight replicator using Python, the KCL, and Kafka client libraries to bridge data between the two systems in real time, with reliability and configurability in mind.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>kafka</category>
      <category>kinesis</category>
    </item>
    <item>
      <title>AWS 50% discount for Associate Certifications</title>
      <dc:creator>Abdelrahman Ahmed</dc:creator>
      <pubDate>Sun, 18 Aug 2024 16:07:51 +0000</pubDate>
      <link>https://forem.com/abdelrahman_ahmed/aws-50-discount-for-associate-certifications-1nm3</link>
      <guid>https://forem.com/abdelrahman_ahmed/aws-50-discount-for-associate-certifications-1nm3</guid>
      <description>&lt;p&gt;AWS is offering a 50% discount for all associate certifications, this comes under the &lt;strong&gt;AWS Certified: Associate Challenge&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;You can get the voucher by registering for this challenge from the following AWS link: &lt;a href="https://pages.awscloud.com/GLOBAL-ln-GC-Traincert-Associate-Certification-Challenge-Registration-2024.html" rel="noopener noreferrer"&gt;Associate Challenge&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Happy Certifying!&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
