<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: horiyomi</title>
    <description>The latest articles on Forem by horiyomi (@horiyomi).</description>
    <link>https://forem.com/horiyomi</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F37211%2Fdb68f62f-b185-4bcf-b950-f202c65a5a72.jpg</url>
      <title>Forem: horiyomi</title>
      <link>https://forem.com/horiyomi</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/horiyomi"/>
    <language>en</language>
    <item>
      <title>Introduction To How Kafka Works And Implementation Using Python-client</title>
      <dc:creator>horiyomi</dc:creator>
      <pubDate>Sun, 14 Mar 2021 17:12:32 +0000</pubDate>
      <link>https://forem.com/horiyomi/introduction-to-how-kafka-works-and-implementation-using-python-client-1ejo</link>
      <guid>https://forem.com/horiyomi/introduction-to-how-kafka-works-and-implementation-using-python-client-1ejo</guid>
      <description>&lt;p&gt;&lt;strong&gt;OUTLINE&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;i. A brief introduction to why we might consider Kafka for our business.&lt;/p&gt;

&lt;p&gt;ii. We explain what Kafka is.&lt;/p&gt;

&lt;p&gt;iii. A brief explanation on the &lt;strong&gt;why&lt;/strong&gt; of Kafka.&lt;/p&gt;

&lt;p&gt;iv. Highlighting why Kafka is so fast.&lt;/p&gt;

&lt;p&gt;v. A brief mention of companies using Kafka.&lt;/p&gt;

&lt;p&gt;vi. How to get started with Kafka installations, components of Kafka and what they're responsible for.&lt;/p&gt;

&lt;p&gt;vii. A walk-through tutorial on Kafka.&lt;/p&gt;

&lt;p&gt;viii. Conclusion&lt;/p&gt;




&lt;h1&gt;
  &lt;strong&gt;Set up a Python client for Kafka with kafka-python&lt;/strong&gt;
&lt;/h1&gt;

&lt;p&gt;Real-time use of data has become the order of the day for businesses and their customers alike. One key factor to consider is how a given use case handles its data: does it write more than it reads, read more than it writes, or do both heavily while needing to act on events in real time? This is where Apache Kafka comes in. In this tutorial we will go over what Kafka is, its core concepts, who is using it, how to set it up, and how to use it with a Python client (&lt;code&gt;kafka-python&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;What is Apache Kafka?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka is a distributed event streaming and messaging system consisting of servers and clients that communicate over a high-performance TCP network protocol.&lt;/p&gt;

&lt;p&gt;PS: Kafka was developed at LinkedIn and is now maintained under the Apache Software Foundation, hence the name Apache Kafka. I will refer to Apache Kafka simply as Kafka throughout this tutorial.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Event Streaming&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Event streaming is the practice of capturing data in real time as events from different sources, e.g. website clicks, databases, logging systems and IoT devices, then processing and transforming those events while ensuring a continuous flow and routing the streamed data to the various destinations that expect it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Why Kafka?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka is used in real-time event streaming architectures to provide real-time data analytics. Kafka stores messages on disk and replicates them within the cluster, which makes messages durable and reliable, and it supports multiple subscribers.&lt;/p&gt;

&lt;p&gt;Kafka streams events continuously using the publish-subscribe (pub-sub) model: events can be read (subscribed to) as soon as they are written (published), then processed or stored. Kafka also lets you configure how long the data is retained.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Why Is Kafka so fast?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka is fast for a number of reasons; some of them are highlighted below.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Zero-copy - It relies heavily on the &lt;a href="https://en.wikipedia.org/wiki/Zero-copy" rel="noopener noreferrer"&gt;zero-copy&lt;/a&gt; principle, i.e. data is moved by the OS kernel without first being copied through application buffers.&lt;/li&gt;
&lt;li&gt;Batching - Kafka groups records into batches, which enables efficient compression and reduces network and disk I/O overhead.&lt;/li&gt;
&lt;li&gt;Horizontal Scaling - A topic can have many partitions (even thousands), which can be spread across many machines, either on-premise or in the cloud. This makes Kafka very capable of handling high loads.&lt;/li&gt;
&lt;li&gt;Sequential disk writes - Rather than holding messages in application memory, Kafka appends them sequentially to an immutable commit log on disk, thereby avoiding slow disk seeks.&lt;/li&gt;
&lt;/ol&gt;
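
&lt;p&gt;To see why batching helps compression, here is a minimal, standard-library-only sketch. It does not use Kafka at all: the sample order records, the hypothetical &lt;code&gt;compressed_sizes&lt;/code&gt; helper and the choice of &lt;code&gt;zlib&lt;/code&gt; are assumptions made purely for illustration.&lt;/p&gt;

```python
import json
import zlib

# Hypothetical sample records, similar to the orders used later in this tutorial
records = [
    json.dumps({"item_name": "Nike Air", "item_id": 1543 + i, "price": 23000}).encode("utf-8")
    for i in range(100)
]


def compressed_sizes(messages):
    """Return (total bytes when each message is compressed alone,
    bytes when all messages are compressed together as one batch)."""
    individual = sum(len(zlib.compress(m)) for m in messages)
    batched = len(zlib.compress(b"\n".join(messages)))
    return individual, batched


individual, batched = compressed_sizes(records)
print("compressed one by one:", individual, "bytes")
print("compressed as a batch:", batched, "bytes")
```

&lt;p&gt;Because the records share most of their structure, the compressor can reuse patterns across the whole batch, which is roughly the effect batching has on the data a producer sends.&lt;/p&gt;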

&lt;p&gt;&lt;strong&gt;What Problem does Kafka Solve?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;With the rise of innovation in many aspects of life, from the Internet of Things (IoT), self-driving cars, artificial intelligence and blockchain solutions to robotics, to mention a few, the rate of data generation is growing exponentially, and it is not slowing down anytime soon. For businesses to innovate, understand their customers better and provide better services, the traditional way of developing software needs to be enhanced to accommodate the inflow of these huge and growing datasets from the aforementioned data sources and others. With Kafka, the various components of a system can communicate in an event-driven way, where an event in one part of the system is translated into an action in another part. The beauty of this is that it all happens in real time.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;What Companies Use Kafka?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Thousands of companies, including many in the Fortune 500, use Kafka in production. Among them are Microsoft, Netflix, Goldman Sachs, Target, Cisco, Intuit, Box, Pinterest, The New York Times and many &lt;a href="https://kafka.apache.org/powered-by" rel="noopener noreferrer"&gt;more&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Getting Started With Kafka.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka involves communication between servers and clients.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Servers&lt;/strong&gt;: Kafka runs as a cluster of one or more servers, which can be located in one or multiple data centers, on-premise or in the cloud.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Clients&lt;/strong&gt;: Kafka clients allow us to write distributed systems/applications that read, write and process streams of events in a fault-tolerant way, even in the case of network or machine failure. The clients are available as REST APIs and in various programming languages including Java, Scala, Go, Python, C/C++ and many others. In this tutorial we will focus on using the Python client.&lt;/p&gt;

&lt;p&gt;There are several clients we can use to communicate with Kafka:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Command line&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://github.com/confluentinc/confluent-kafka-python" rel="noopener noreferrer"&gt;confluent-kafka&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;kafka-python (what we will be using)&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Installation&lt;/strong&gt;:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;STEP 1:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Download Kafka from &lt;a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz" rel="noopener noreferrer"&gt;here&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Run &lt;code&gt;tar -xzf kafka_2.13-2.7.0.tgz&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Run &lt;code&gt;cd kafka_2.13-2.7.0&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;STEP 2:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;NOTE&lt;/strong&gt;: Your local environment must have Java 8+ installed.&lt;/p&gt;

&lt;p&gt;Open a terminal and run this command:&lt;/p&gt;

&lt;p&gt;Run &lt;code&gt;bin/zookeeper-server-start.sh config/zookeeper.properties&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Open another terminal and run this command&lt;/p&gt;

&lt;p&gt;Run &lt;code&gt;bin/kafka-server-start.sh config/server.properties&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;STEP 3&lt;/strong&gt;:&lt;/p&gt;

&lt;p&gt;Creating a topic to store events&lt;/p&gt;

&lt;p&gt;Run this command on another terminal&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;bin/kafka-topics.sh &lt;span class="nt"&gt;--create&lt;/span&gt; &lt;span class="nt"&gt;--topic&lt;/span&gt; quickstart-events &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Run this command to see the topic&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;bin/kafka-topics.sh &lt;span class="nt"&gt;--describe&lt;/span&gt; &lt;span class="nt"&gt;--topic&lt;/span&gt; quickstart-events &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Which should return something like this&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:

Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



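&lt;p&gt;&lt;strong&gt;STEP 4:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Run this in your terminal to write events to the topic. Each line you type is sent as a separate event; press Ctrl-C to stop the producer.&lt;br&gt;
&lt;/p&gt;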

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;bin/kafka-console-producer.sh &lt;span class="nt"&gt;--topic&lt;/span&gt; quickstart-events &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;STEP 5:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Run this in your terminal to read the events from the topic&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;bin/kafka-console-consumer.sh &lt;span class="nt"&gt;--topic&lt;/span&gt; quickstart-events &lt;span class="nt"&gt;--from-beginning&lt;/span&gt; &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;ZooKeeper&lt;/strong&gt; is a coordination service that stores configuration information in a consistent, file-system-like hierarchy. Kafka uses it to manage and coordinate the brokers in a cluster, including leader election for topic partitions.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Kafka broker&lt;/strong&gt;: A Kafka cluster is made up of one or more brokers, each with a unique id. Each broker holds topic log partitions, and connecting to any one broker (a bootstrap broker) connects a client to the entire Kafka cluster.&lt;/p&gt;

&lt;p&gt;With the steps highlighted above, we now have a running instance of Kafka on our machine. Before we continue, let’s get familiar with concepts of how Kafka works and the components it entails.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Kafka Concepts&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.hashnode.com%2Fres%2Fhashnode%2Fimage%2Fupload%2Fv1615242711686%2Fn1-PKfvNN.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.hashnode.com%2Fres%2Fhashnode%2Fimage%2Fupload%2Fv1615242711686%2Fn1-PKfvNN.png" alt="Screenshot_2021-02-26_at_20.06.15.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Events&lt;/strong&gt;: An event signifies that something has happened, i.e. data has been generated in a part of the system that we are interested in, so a record/message is written to a designated topic. Every event is recorded with a key, a value and a timestamp.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Topics&lt;/strong&gt;: A Kafka topic is split into partitions (buckets) that are spread across brokers, which may be located in different data centers or regions, to ensure fault tolerance. Within a partition, events are stored in the order they are written: newly arriving events are appended to the existing ones, and each partition is replicated across brokers. Note: each topic is identified by a topic name.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Producers&lt;/strong&gt;: Client applications, written with any of the available Kafka clients, whose sole job is to write (publish) events, i.e. messages/records, to their designated topic, which is identified by a topic name. They are written to be agnostic of the consumer, i.e. the producer is not aware of the consumer application; it does one job and does it well: writing events to the topic.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consumers&lt;/strong&gt;: Client applications that read events, i.e. messages/records, from specific topics, in the order in which they were written to each partition.&lt;/p&gt;
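
&lt;p&gt;The concepts above can be sketched as a toy, in-memory model. This is not how Kafka is implemented and it does not use &lt;code&gt;kafka-python&lt;/code&gt;: the &lt;code&gt;ToyTopic&lt;/code&gt; class, its methods and the CRC32-based partitioner are hypothetical names chosen for illustration only.&lt;/p&gt;

```python
import time
import zlib


class ToyTopic:
    """A toy stand-in for a topic: a named set of append-only partitions."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        """Append an event (key, value, timestamp) and return (partition, offset)."""
        # Events with the same key always land in the same partition
        partition = zlib.crc32(key) % len(self.partitions)
        log = self.partitions[partition]
        log.append((key, value, time.time()))
        return partition, len(log) - 1

    def read(self, partition, offset=0):
        """Return the events of one partition, in write order, starting at offset."""
        return self.partitions[partition][offset:]


topic = ToyTopic("order-topic")
p1, o1 = topic.publish(b"customer-1", b"item_id=1543")
p2, o2 = topic.publish(b"customer-1", b"item_id=1583")

# Same key, same partition; offsets grow as events are appended
print(p1 == p2, o1, o2)
for key, value, timestamp in topic.read(p1):
    print(key, value)
```

&lt;p&gt;In this model a consumer only needs to remember the last offset it has read in each partition, which is the idea behind committing offsets.&lt;/p&gt;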

&lt;p&gt;&lt;strong&gt;USING KAFKA-PYTHON&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;For this tutorial, it's assumed that you are familiar with the Python programming language and Python virtual environments. We will be using pipenv to manage our virtual environment, together with an open source Kafka client for Python called &lt;a href="https://kafka-python.readthedocs.io/" rel="noopener noreferrer"&gt;kafka-python&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;We set up our virtual environment with pipenv by running &lt;code&gt;pipenv shell&lt;/code&gt;, then install kafka-python with &lt;code&gt;pip install kafka-python&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Before we proceed, we need to briefly look at some key terms used when working with the &lt;code&gt;kafka-python&lt;/code&gt; client.&lt;/p&gt;
&lt;h3&gt;
  
  
  &lt;code&gt;KafkaProducer&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;KafkaProducer&lt;/code&gt; is the client responsible for publishing records to a Kafka cluster. It does this through the &lt;strong&gt;send&lt;/strong&gt; method, which is asynchronous: when called, it adds the record to a buffer of pending records and returns immediately. If a request fails, the producer can also retry automatically, depending on the &lt;code&gt;retries&lt;/code&gt; setting, which is one of the configuration options that can be set.&lt;/p&gt;

&lt;p&gt;Let's create a &lt;code&gt;KafkaProducer&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

# Connect to the broker started earlier; retry failed sends up to 5 times
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=5)

# send() is asynchronous and returns a future
future = producer.send('order-topic', b'item_name=Nike Air|item_id=1543|price=23000')

try:
    # Block for up to 10 seconds until the send completes
    record_metadata = future.get(timeout=10)
except KafkaError:
    # handle exception appropriately
    log.exception('Failed to send record to order-topic')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's do a quick walk-through of what is going on in the code snippet above.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;KafkaProducer&lt;/code&gt; is the class that &lt;code&gt;kafka-python&lt;/code&gt;, the Python client, provides for instantiating a connection to a Kafka cluster.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;bootstrap_servers&lt;/code&gt; is a list of host[:port] strings that the producer should contact to bootstrap initial cluster metadata.&lt;/p&gt;

&lt;p&gt;We then send a record by calling the &lt;code&gt;send&lt;/code&gt; method, which takes the topic name (a &lt;code&gt;str&lt;/code&gt;, in this case &lt;strong&gt;order-topic&lt;/strong&gt;) followed by the message value and optional arguments such as the key, partition and timestamp.&lt;/p&gt;

&lt;p&gt;To make the flow synchronous, we call &lt;code&gt;future.get(timeout=10)&lt;/code&gt;, which blocks until the send completes. If something goes wrong, for example the topic cannot be found, the &lt;code&gt;kafka-python&lt;/code&gt; client raises a &lt;code&gt;KafkaError&lt;/code&gt; exception, which we can catch and handle appropriately.&lt;/p&gt;

&lt;p&gt;We can also send serialized records by passing a &lt;code&gt;value_serializer&lt;/code&gt;: for example &lt;code&gt;msgpack.dumps&lt;/code&gt; to produce MessagePack-encoded messages, or a callable based on &lt;code&gt;json.dumps&lt;/code&gt; to produce JSON messages. Here is what that would look like&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json

import msgpack  # requires: pip install msgpack
from kafka import KafkaProducer

# produce msgpack-encoded messages
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('order-topic', {'item_name': 'Nike Air', 'item_id': 1543, 'price': 23000})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('order-topic', {'item_name': 'Nike AirForce', 'item_id': 1583, 'price': 28500})
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;PS: There are more configuration options that can be set on the &lt;code&gt;KafkaProducer&lt;/code&gt;; see the &lt;a href="https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html" rel="noopener noreferrer"&gt;documentation&lt;/a&gt; for the full list.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;KafkaConsumer&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;A consumer subscribes to (reads) records from a Kafka cluster. The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers.&lt;/p&gt;

&lt;p&gt;Let's create a &lt;code&gt;KafkaConsumer&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('order-topic', group_id='sample-group', bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's walk through what's going on in the consumer code snippet.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;KafkaConsumer&lt;/code&gt; is the class used to consume records from a Kafka cluster. Its positional arguments, here &lt;strong&gt;order-topic&lt;/strong&gt;, are the topics to subscribe to.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;bootstrap_servers&lt;/code&gt; – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;group_id&lt;/code&gt; - The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;value_deserializer&lt;/code&gt; (callback) is any callable that takes a raw message value and returns a deserialized value.&lt;/p&gt;

&lt;p&gt;There are various ways of consuming records from a topic&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json

import msgpack  # requires: pip install msgpack
from kafka import KafkaConsumer

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
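
&lt;p&gt;Because serializers and deserializers are plain callables, they can be checked without a running broker. The sketch below is only an illustration: &lt;code&gt;serialize_order&lt;/code&gt; and &lt;code&gt;deserialize_order&lt;/code&gt; are hypothetical helper names, and UTF-8 is an assumed encoding choice.&lt;/p&gt;

```python
import json


def serialize_order(order):
    """Turn a dict into JSON bytes; usable as value_serializer."""
    return json.dumps(order).encode("utf-8")


def deserialize_order(raw):
    """Turn JSON bytes back into a dict; usable as value_deserializer."""
    return json.loads(raw.decode("utf-8"))


order = {"item_name": "Nike AirForce", "item_id": 1583, "price": 28500}
raw = serialize_order(order)

print(type(raw).__name__)
print(deserialize_order(raw) == order)

# With a broker running, the same callables would be passed as:
#   KafkaProducer(value_serializer=serialize_order)
#   KafkaConsumer('order-topic', value_deserializer=deserialize_order)
```

&lt;p&gt;Keeping the two functions side by side makes it easier to ensure the producer and the consumer agree on the message format.&lt;/p&gt;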



&lt;p&gt;&lt;strong&gt;Conclusion&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Phew! If you've come this far, I say thank you. We've only scratched the surface of what we can do with Kafka. Much more can be achieved by passing additional arguments to both &lt;code&gt;KafkaProducer&lt;/code&gt; and &lt;code&gt;KafkaConsumer&lt;/code&gt;, such as authenticating over SSL and setting SSL certificates, and topics can also be added dynamically. We can explore more configuration options in the &lt;code&gt;kafka-python&lt;/code&gt; &lt;a href="https://kafka-python.readthedocs.io/en/master/usage.html" rel="noopener noreferrer"&gt;documentation&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
