<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Bill Bejeck</title>
    <description>The latest articles on Forem by Bill Bejeck (@bbejeck).</description>
    <link>https://forem.com/bbejeck</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F542288%2F8e0d4f77-c0ed-406b-a67f-9feca375998b.jpg</url>
      <title>Forem: Bill Bejeck</title>
      <link>https://forem.com/bbejeck</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/bbejeck"/>
    <language>en</language>
    <item>
      <title>Windowing for event streams</title>
      <dc:creator>Bill Bejeck</dc:creator>
      <pubDate>Mon, 22 Jan 2024 16:59:46 +0000</pubDate>
      <link>https://forem.com/bbejeck/windowing-for-event-streams-55d6</link>
      <guid>https://forem.com/bbejeck/windowing-for-event-streams-55d6</guid>
      <description>&lt;p&gt;Stream processing is the best way to work with event data. While batch&lt;br&gt;
processing still has its use cases, and probably always will, only stream processing offers the ability to respond in real-time to events.&lt;/p&gt;

&lt;p&gt;But if we zoom in, what does it look like to respond to events? By now,&lt;br&gt;
I'm sure you're familiar with the oft-quoted fraud scenario: a person with nefarious intent gets hold of an unaware consumer's credit card number, but thanks to the bank's responsive stream processing system, the fraudulent charge gets declined.&lt;/p&gt;

&lt;p&gt;Other uses of stream processing require an immediate response but are&lt;br&gt;
not tied to one single event. Consider monitoring the heat of a&lt;br&gt;
manufacturing process; if the average temperature reaches a certain&lt;br&gt;
threshold in a given period, then the monitoring process should generate&lt;br&gt;
an alert. But this isn't about one temperature spike. It's about a&lt;br&gt;
consistent upward trend. In other words, what are the temperature&lt;br&gt;
readings doing during a fixed period?&lt;/p&gt;
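&lt;p&gt;To make the idea concrete, here's a minimal, library-free sketch of that monitoring scenario: timestamped temperature readings are bucketed into fixed-size (tumbling) windows and any window whose average crosses a threshold raises an alert. All names here are illustrative, not part of any streaming API.&lt;/p&gt;

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: bucket timestamped temperature readings into fixed-size windows
// and flag any window whose average crosses a threshold.
public class WindowedAverages {

    // A tumbling window is identified by its start: ts - (ts % windowSize).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Average the readings per window, keyed by window start time.
    static Map<Long, Double> averagePerWindow(long[] timestampsMs, double[] temps,
                                              long windowSizeMs) {
        Map<Long, double[]> acc = new TreeMap<>(); // windowStart -> {sum, count}
        for (int i = 0; i < timestampsMs.length; i++) {
            double[] sumCount = acc.computeIfAbsent(
                windowStart(timestampsMs[i], windowSizeMs), k -> new double[2]);
            sumCount[0] += temps[i];
            sumCount[1]++;
        }
        Map<Long, Double> averages = new TreeMap<>();
        acc.forEach((start, sc) -> averages.put(start, sc[0] / sc[1]));
        return averages;
    }

    public static void main(String[] args) {
        long[] ts = {0, 60_000, 300_000, 360_000};  // spans two 5-minute windows
        double[] temps = {100.0, 102.0, 130.0, 140.0};
        averagePerWindow(ts, temps, 300_000L).forEach((start, avg) -> {
            if (avg > 120.0) {                      // assumed alert threshold
                System.out.println("ALERT: window at " + start + " averaged " + avg);
            }
        });
    }
}
```

&lt;p&gt;The point is that each reading only contributes to the window its timestamp falls in, so a spike shows up in one window's average rather than being diluted across all time.&lt;/p&gt;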

&lt;p&gt;If you have not guessed by now, I'm talking about windowing in event streams. While&lt;br&gt;
aggregations (an aggregation is a grouping of events by a&lt;br&gt;
common attribute) are a vital tool for leveraging an event stream, an&lt;br&gt;
aggregation over all time doesn't shed any light on specific periods of&lt;br&gt;
activity. Consider the following illustration:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fscip3umm2s2jpb2yidcj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fscip3umm2s2jpb2yidcj.png" alt="Average temperature aggregated over all time" width="800" height="279"&gt;&lt;/a&gt;&lt;br&gt;
The average temperature reading has increased some over time,&lt;br&gt;
but it doesn't tell the whole story. Now let's take a look at capturing&lt;br&gt;
the average temp readings over specific intervals:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyxml4ff2numumme5qzk6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyxml4ff2numumme5qzk6.png" alt="Windowed average temp readings" width="800" height="322"&gt;&lt;/a&gt;&lt;br&gt;
Now, by aggregating readings over specific intervals (windows), you can spot the&lt;br&gt;
issue: a large jump in the average value.&lt;/p&gt;

&lt;p&gt;This is not to say that an aggregation over all time isn't helpful, but&lt;br&gt;
that, in many cases, you'll want to aggregate over specific intervals.&lt;br&gt;
In other cases, you'll want an aggregation not defined by fixed time&lt;br&gt;
boundaries but by behavior, e.g., session windows whose boundaries are&lt;br&gt;
based on periods of &lt;em&gt;inactivity.&lt;/em&gt; We'll get into session windows in a&lt;br&gt;
post later in the blog series.&lt;/p&gt;

&lt;p&gt;This blog post marks the first in a series about windowing in the two&lt;br&gt;
dominant stream processing technologies today: &lt;a href="https://kafka.apache.org/36/documentation/streams/developer-guide/"&gt;Kafka Streams&lt;/a&gt; and &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/overview/"&gt;Flink (specifically, Flink SQL)&lt;/a&gt;.&lt;br&gt;
It's important to note that the point of this blog series is not a&lt;br&gt;
direct comparison between the two APIs. Instead, it is a resource for&lt;br&gt;
windowed operations in Kafka Streams and Flink SQL. While comparing the&lt;br&gt;
two in a competitive analysis is natural, it's not the main focus here.&lt;/p&gt;

&lt;p&gt;The blog series will discuss:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The different types of windowing, semantics, and potential use&lt;br&gt;
cases.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Time semantics&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Interpretation of the results&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Testing windowed applications&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I will assume basic familiarity with Kafka Streams and Flink SQL, so the&lt;br&gt;
examples will start by covering windowing.&lt;/p&gt;

&lt;p&gt;But before we get into windowing, let's discuss how Kafka Streams and&lt;br&gt;
Flink SQL structure windowing applications. We'll only cover this level&lt;br&gt;
of detail in this initial post, and subsequent ones will assume knowledge of how to assemble the program and focus on the windowing aspect.&lt;/p&gt;
&lt;h2&gt;
  
  
  Kafka Streams windowing
&lt;/h2&gt;

&lt;p&gt;You'll need to specify an&lt;br&gt;
&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#aggregating"&gt;aggregation&lt;/a&gt; to do any windowing in Kafka Streams. An aggregation is a function that&lt;br&gt;
combines smaller components into a larger composition, clustered around&lt;br&gt;
some attribute, which in Kafka Streams will be the key in the key-value&lt;br&gt;
pairs. You can also perform a reduce, a specialized form of aggregation in which the result has the same type as the input components; in general, an aggregation can return a completely different type from its inputs. Since windowing operates the same way for either a reduce or an aggregation, we'll use an aggregation in the examples throughout the blog series.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;KStream&amp;lt;String,Double&amp;gt; iotHeatSensorStream =
  builder.stream("heat-sensor-input",
    Consumed.with(stringSerde, doubleSerde));
iotHeatSensorStream.groupByKey() &amp;lt;1&amp;gt;
      .windowedBy(&amp;lt;window specification&amp;gt;) &amp;lt;2&amp;gt;
        .aggregate(() -&amp;gt; new IotSensorAggregation(tempThreshold), &amp;lt;3&amp;gt;
         aggregator,
         Materialized.with(stringSerde, aggregationSerde))
         .toStream().to("sensor-agg-output",
           Produced.with(windowedSerde, sensorAggregationSerde))&amp;lt;4&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's walk through the essential points of setting up the Kafka Streams&lt;br&gt;
window aggregation:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;The first step is to group all records by key; this is required before performing any aggregation. Here you're using&lt;br&gt;
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KStream.html#groupByKey--"&gt;KStream.groupByKey&lt;/a&gt; which assumes the underlying key-value pairs have the correct keys needed for clustering together. If not, you could use the&lt;br&gt;
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-"&gt;KStream.groupBy&lt;/a&gt; function where you pass a&lt;br&gt;
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KeyValueMapper.html"&gt;KeyValueMapper&lt;/a&gt; instance that maps the current key-value pair into a new one which allows you to create a new key suitable for the aggregation&lt;br&gt;
grouping. Note that changing the key for a group-by will lead to a&lt;br&gt;
re-partitioning of the records.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;You are specifying the windowing - we'll cover the specific types in&lt;br&gt;
later posts.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Point three is where you're specifying how to aggregate records. The&lt;br&gt;
first parameter is an Initializer represented as a lambda function,&lt;br&gt;
which provides the initial value. The second parameter is the&lt;br&gt;
Aggregator instance, which performs the aggregation action you&lt;br&gt;
specify. Here, it's a simple average and tracking the highest and&lt;br&gt;
lowest values seen. The third parameter is a Materialized instance&lt;br&gt;
specifying how to store the aggregation. Since the value type&lt;br&gt;
differs from the incoming value, you must provide the appropriate&lt;br&gt;
Serde instance for Kafka Streams to use when (de)serializing&lt;br&gt;
records.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The final point is where you provide the Serde instances for&lt;br&gt;
producing the results back to Kafka. The key Serde is a different&lt;br&gt;
type as Kafka Streams wraps the incoming record key in a Windowed&lt;br&gt;
instance.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
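&lt;p&gt;For point three, here's a plausible sketch of what the IotSensorAggregation value class might look like; the field names and update logic are my assumptions, not code from the post. The Aggregator instance would call something like update(reading) once per record in the window, producing a running average plus the highest and lowest values seen.&lt;/p&gt;

```java
// A hypothetical aggregation value class: running average plus the
// highest and lowest readings seen, with a threshold flag.
public class IotSensorAggregation {
    final double tempThreshold;
    double highestSeen = Double.NEGATIVE_INFINITY;
    double lowestSeen = Double.POSITIVE_INFINITY;
    long readingCount = 0;
    double readingSum = 0.0;
    double averageReading = 0.0;
    boolean thresholdExceeded = false;

    public IotSensorAggregation(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    // Fold one temperature reading into the running aggregation.
    public IotSensorAggregation update(double reading) {
        highestSeen = Math.max(highestSeen, reading);
        lowestSeen = Math.min(lowestSeen, reading);
        readingCount++;
        readingSum += reading;
        averageReading = readingSum / readingCount;
        thresholdExceeded = averageReading >= tempThreshold;
        return this;
    }
}
```

&lt;p&gt;Note that the result type (an IotSensorAggregation) differs from the incoming Double values, which is exactly why point 3 requires a dedicated Serde in the Materialized instance.&lt;/p&gt;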

&lt;p&gt;What's not apparent from this aggregation example is where the&lt;br&gt;
timestamps for the window are. But there's a big hint in the explanation&lt;br&gt;
of the aggregation example. At point four of the aggregation&lt;br&gt;
description, Kafka Streams wraps the original key in a&lt;br&gt;
&lt;a href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/latest/org/apache/kafka/streams/kstream/Windowed.html"&gt;Windowed&lt;/a&gt;&lt;br&gt;
object.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffmuiini0apawwbpxfo1r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffmuiini0apawwbpxfo1r.png" alt="Windowed object" width="800" height="421"&gt;&lt;/a&gt;&lt;br&gt;
As shown in this illustration, the Windowed object contains the original&lt;br&gt;
key and the&lt;br&gt;
&lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Window.html"&gt;Window&lt;/a&gt;&lt;br&gt;
instance for the aggregation values. The Window object has the start and&lt;br&gt;
end time for the aggregation window. It doesn't contain the window size,&lt;br&gt;
but you can easily calculate the size by subtracting the start time from&lt;br&gt;
the end. We'll cover reporting and analyzing the aggregation window&lt;br&gt;
times in a follow-on blog post.&lt;/p&gt;

&lt;p&gt;Wrapping the original key in a Windowed object changes the type, meaning&lt;br&gt;
you'll have to tell Kafka Streams how to serialize the results.&lt;br&gt;
Fortunately, Kafka Streams provides the&lt;br&gt;
&lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/WindowedSerdes.html"&gt;WindowedSerdes&lt;/a&gt;&lt;br&gt;
utility class, which makes it easy to get the correct Serde for producing&lt;br&gt;
results back to Kafka:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Serde&amp;lt;Windowed&amp;lt;String&amp;gt;&amp;gt; windowedSerde =
        WindowedSerdes.timeWindowedSerdeFrom(String.class, &amp;lt;1&amp;gt;
                                              60_000L &amp;lt;2&amp;gt;
                                            );

KStream&amp;lt;String,Double&amp;gt; iotHeatSensorStream =
  builder.stream("heat-sensor-input",
    Consumed.with(stringSerde, doubleSerde));
  iotHeatSensorStream.groupByKey() &amp;lt;1&amp;gt;
         .windowedBy(&amp;lt;window specification&amp;gt;)
         .aggregate(() -&amp;gt; new IotSensorAggregation(tempThreshold),
              aggregator,
               Materialized.with(stringSerde, aggregationSerde))
         .toStream().to("sensor-agg-output",
           Produced.with(windowedSerde, sensorAggregationSerde))&amp;lt;3&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;&lt;p&gt;The class type for the original key&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The size of the window in milliseconds&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Providing the Serde for the Windowed key&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;So, by using the WindowedSerdes class, you provide the proper&lt;br&gt;
serialization strategy for Kafka Streams to produce windowed results&lt;br&gt;
back to Kafka. Producing windowed results to a topic implies downstream&lt;br&gt;
consumers will need to know how to handle the windowed results as well. We'll&lt;br&gt;
cover that situation in a subsequent post in this series on reporting.&lt;/p&gt;

&lt;p&gt;Now, let's move on to Flink SQL aggregation windows.&lt;/p&gt;
&lt;h2&gt;
  
  
  Flink SQL windowing
&lt;/h2&gt;

&lt;p&gt;Flink offers windowing for event stream data as windowing table-valued&lt;br&gt;
functions (TVFs). The Flink TVFs implement the &lt;br&gt;
&lt;a href="https://sigmodrecord.org/publications/sigmodRecord/1806/pdfs/08_Industry_Michels.pdf"&gt;SQL 2016 standard polymorphic table functions&lt;/a&gt;&lt;br&gt;
(PTFs). In a nutshell, a PTF is a user-defined function that takes a table as input and returns a table.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fauszjlrapkqem7rm3dny.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fauszjlrapkqem7rm3dny.png" alt="PFT in action" width="800" height="311"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The exciting thing about PTFs is that the schema of the table returned by the function is dynamic; it's determined at runtime by the function&lt;br&gt;
output. PTFs are what enable windowing and aggregation functions on existing tables, which is precisely what we get with Flink SQL windowing. The windowing TVFs in Flink replace the now-deprecated &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#group-window-aggregation"&gt;Group Window Functions&lt;/a&gt;.&lt;br&gt;
Window TVFs provide more powerful window-based calculations like &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-topn/"&gt;Window TopN&lt;/a&gt;&lt;br&gt;
and &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-deduplication/"&gt;Window Deduplication&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Now, let's move on to how you execute a windowed aggregation in Flink SQL. As with the Kafka Streams example, we'll review the structure of a windowed aggregation, with specific window implementations covered in later posts.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT window_start,
       window_end,
       device_id,
       AVG(reading) AS avg_reading   &amp;lt;1&amp;gt;

FROM TABLE(&amp;lt;2&amp;gt;
           &amp;lt;Window Function&amp;gt; ( &amp;lt;3&amp;gt;
                              TABLE device_readings,   &amp;lt;4&amp;gt;
                              DESCRIPTOR(ts),    &amp;lt;5&amp;gt;
                              INTERVAL '5' MINUTES,  &amp;lt;6&amp;gt;
                              [INTERVAL '10' MINUTES]
                            )
           )
GROUP BY window_start, &amp;lt;7&amp;gt;
         window_end,
         device_id
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here's the breakdown of the query:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Selecting the columns and the aggregation using the Flink SQL AVG function and providing a descriptive name; these columns form the schema of the returned table.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The TABLE function&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Here, you give a specific window function: TUMBLE, HOP, or CUMULATE. Support for a SESSION type is coming soon. We'll cover the specific types in later posts.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Next are the parameters for the window function, starting with the table to use for the input&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The DESCRIPTOR is the time attribute column the function uses for the window.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Depending on the window function, the next one or two parameters determine either just the window size, or the window advance and the size.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;As with standard SQL aggregate functions, every non-aggregated column in the SELECT clause must also appear in the GROUP BY clause.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Flink SQL inserts three additional columns into windowed operations: &lt;code&gt;window_start&lt;/code&gt;, &lt;code&gt;window_end&lt;/code&gt;, and &lt;code&gt;window_time&lt;/code&gt;. Flink SQL determines &lt;code&gt;window_time&lt;/code&gt; by subtracting 1 ms from the &lt;code&gt;window_end&lt;/code&gt; value.&lt;/p&gt;
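&lt;p&gt;The relationship between those three columns is simple arithmetic; here's a quick illustrative sketch (not Flink code) for a tumbling window of a given size containing a given event timestamp, all in epoch milliseconds:&lt;/p&gt;

```java
// Sketch of how window_start, window_end, and window_time relate for a
// tumbling window containing a given event timestamp (epoch millis).
public class WindowColumns {
    // Returns {window_start, window_end, window_time}.
    static long[] tumblingWindowColumns(long eventTsMs, long windowSizeMs) {
        long windowStart = eventTsMs - (eventTsMs % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        long windowTime = windowEnd - 1; // window_end minus 1 ms
        return new long[]{windowStart, windowEnd, windowTime};
    }

    public static void main(String[] args) {
        // An event at 310,000 ms falls in the second 5-minute window:
        long[] cols = tumblingWindowColumns(310_000L, 300_000L);
        System.out.println("window_start=" + cols[0]
            + ", window_end=" + cols[1] + ", window_time=" + cols[2]);
    }
}
```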

&lt;p&gt;This concludes our introduction to the structure of windowing&lt;br&gt;
applications in Kafka Streams and Flink SQL. In the next edition, we'll cover hopping and tumbling windows.&lt;/p&gt;

&lt;h2&gt;
  
  
  Resources
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink on Confluent Cloud&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafkastreams</category>
      <category>flink</category>
      <category>flinksql</category>
      <category>programming</category>
    </item>
    <item>
      <title>A Critical Detail about Kafka Partitioners</title>
      <dc:creator>Bill Bejeck</dc:creator>
      <pubDate>Mon, 17 Apr 2023 16:11:20 +0000</pubDate>
      <link>https://forem.com/bbejeck/a-critical-detail-about-kafka-partitioners-21cb</link>
      <guid>https://forem.com/bbejeck/a-critical-detail-about-kafka-partitioners-21cb</guid>
      <description>&lt;p&gt;Apache Kafka® is the de facto standard for event streaming today.  Part of what makes Kafka so successful is its ability to handle tremendous volumes of data, with a throughput of millions of records per second, not unheard of in production environments.  One part of Kafka's design that makes this possible is partitioning.  &lt;/p&gt;

&lt;p&gt;Kafka uses partitions to spread the load of data across brokers in a cluster, and it's also the unit of parallelism; more partitions mean higher throughput.  Since Kafka works with key-value pairs, getting records with the same key on the same partition is essential.  &lt;/p&gt;

&lt;p&gt;Think of a banking application that uses the customer ID for each transaction it produces to Kafka.  It's critical to get all those events on the same partition; that way, consumer applications process records in the order they arrive.  The mechanism to guarantee records with the same key land on the correct partition is a simple but effective process: take the hash of the key modulo the number of partitions.  Here’s an illustration showing this concept in action:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--gYegYx2m--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uflcd4abu6u74u4zsp9g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--gYegYx2m--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uflcd4abu6u74u4zsp9g.png" alt="Partitioners use hashing to determine the correct partition for a key" width="800" height="268"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At a high level, a hash function such as CRC32 or Murmur2 takes an input and produces a fixed-size output, such as a 32-bit number.  The same input always produces the same output, whether implemented in Java, Python, or any other language. Partitioners use the hash result to choose a partition consistently, so the same record key will always map to the same Kafka partition. I won’t go into more details in this blog, but it’s enough to know that several hashing algorithms are available.&lt;/p&gt;
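&lt;p&gt;Here's the hash-modulo idea as a small sketch using the JDK's built-in CRC32 class. (The Java producer's default partitioner actually uses Murmur2; CRC32 appears here only because it ships with the JDK, so treat this as an illustration of the concept, not the real partitioner.)&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustration only: pick a partition by hashing the key modulo the
// partition count, the same scheme Kafka partitioners use.
public class PartitionDemo {
    static int partitionFor(String key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() returns the unsigned 32-bit checksum in a long,
        // so the modulo result is never negative.
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println("customer-42 -> " + partitionFor("customer-42", 6));
        System.out.println("customer-42 -> " + partitionFor("customer-42", 6));
    }
}
```

&lt;p&gt;Run it twice, on any machine, and the key maps to the same partition, which is exactly what keeps records with the same key together.&lt;/p&gt;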

&lt;p&gt;I want to talk today not about how partitions work but about the partitioner in Kafka producer clients.  The producer uses a partitioner to determine the correct partition for a given key, so using the same partitioner strategy across your producer clients is critical.  &lt;/p&gt;

&lt;p&gt;Since producer clients have a default partitioner setting, this requirement shouldn't be an issue.  For example, when using the Java producer client with the Apache Kafka distribution, the &lt;code&gt;KafkaProducer&lt;/code&gt; class provides a default partitioner that &lt;a href="https://github.com/apache/kafka/blob/440a53099d6aed34bc9f407b5fb3d74484f1e167/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L328"&gt;uses the Murmur2&lt;/a&gt; &lt;a href="https://en.wikipedia.org/wiki/MurmurHash#MurmurHash2"&gt;hash function&lt;/a&gt; to determine the partition for a given key.&lt;/p&gt;

&lt;p&gt;But what about Kafka producer clients in other languages?  The excellent &lt;a href="https://github.com/confluentinc/librdkafka/tree/master"&gt;librdkafka project&lt;/a&gt; is a C/C++ implementation of Kafka clients and is widely used for non-JVM Kafka applications.  Additionally, Kafka clients in other languages (Python, C#) build on top of it.  The default partitioner for librdkafka uses the CRC32 hash function to get the correct partition for a key.  &lt;/p&gt;

&lt;p&gt;This situation in and of itself is not an issue, but it easily could be.  The Kafka broker is agnostic to the client's language; as long as it follows the Kafka protocol, you can use clients in any language, and the broker happily accepts their produce and consume requests.  Given today's polyglot programming environments, you can have development teams within an organization working in different languages, say Python and Java.  But without any changes, both groups will end up using different partitioning strategies in the form of different hashing algorithms: librdkafka producers with CRC32 and Java producers with Murmur2, so records with the same key will land in different partitions!  So, what's the remedy to this situation? &lt;/p&gt;

&lt;p&gt;The Java &lt;code&gt;KafkaProducer&lt;/code&gt; only provides one hashing algorithm via a default partitioner; since implementing a partitioner is tricky, it's best to leave it at the default.  But the &lt;a href="https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md"&gt;librdkafka producer client provides multiple options&lt;/a&gt;.  One of those options is the murmur2_random partitioner, which uses the murmur2 hash function and assigns null keys to a random partition, the equivalent behavior to the Java default partitioner.&lt;/p&gt;

&lt;p&gt;For example, if you're using the Kafka producer client in C#, you can set the partitioning strategy with this line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ProducerConfig.Partitioner = Partitioner.Murmur2Random;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And now your C# and Java producer clients use compatible partitioning approaches!&lt;/p&gt;

&lt;p&gt;When using a non-Java Kafka client, enabling the same partitioning strategy as the Java producer client is an excellent idea to ensure that all producers choose consistent partitions for the same keys. &lt;/p&gt;

</description>
      <category>java</category>
      <category>apachekafka</category>
      <category>dotnet</category>
    </item>
    <item>
      <title>5 Things Every Apache Kafka Developer Should Know</title>
      <dc:creator>Bill Bejeck</dc:creator>
      <pubDate>Wed, 06 Jan 2021 16:10:26 +0000</pubDate>
      <link>https://forem.com/confluentinc/5-things-every-apache-kafka-developer-should-know-4nb</link>
      <guid>https://forem.com/confluentinc/5-things-every-apache-kafka-developer-should-know-4nb</guid>
      <description>&lt;p&gt;&lt;em&gt;This post was originally published on the &lt;a href="%E2%80%9Dhttps://www.confluent.io/blog/5-things-every-kafka-developer-should-know/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka%E2%80%9C"&gt;Confluent blog&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Apache Kafka&lt;sup&gt;®&lt;/sup&gt; is an event streaming platform used by more than 30%&lt;br&gt;
of the Fortune 500 today. There are numerous features of Kafka that make&lt;br&gt;
it the de facto standard for an event streaming platform, and in this&lt;br&gt;
blog post, I explain what I think are the top five things every Kafka&lt;br&gt;
developer should know. Some items in our top five are performance&lt;br&gt;
related, while others are about the key architectural concepts that make&lt;br&gt;
Kafka tick. I hope that at the end of this blog post, you’ll walk away&lt;br&gt;
with a deeper understanding of how Kafka works, as well as with a new&lt;br&gt;
trick or two up your sleeve.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;span id="what-we-will-cover"&gt;&lt;/span&gt;What we'll cover
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;  Tip #1: Understand message delivery and durability
guarantees
&lt;/li&gt;
&lt;li&gt;  Tip #2: Learn about the new sticky partitioner in the producer
API
&lt;/li&gt;
&lt;li&gt;  Tip #3: Avoid “stop-the-world” consumer group rebalances by using
cooperative rebalancing
&lt;/li&gt;
&lt;li&gt;  Tip #4: Master the command line
tools

&lt;ul&gt;
&lt;li&gt;  Kafka console producer
&lt;/li&gt;
&lt;li&gt;  Kafka console consumer
&lt;/li&gt;
&lt;li&gt;  Dump log
&lt;/li&gt;
&lt;li&gt;  Delete records
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;  Tip #5: Use the power of record&lt;br&gt;
headers

&lt;ul&gt;
&lt;li&gt;  Adding headers to Kafka records
&lt;/li&gt;
&lt;li&gt;  Retrieving headers
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;  Recap
&lt;/li&gt;

&lt;/ul&gt;

&lt;h2&gt;
  
  
  &lt;span id="tip-1-message-delivery-and-durability"&gt;&lt;/span&gt;Tip #1: Understand message delivery and durability guarantees
&lt;/h2&gt;

&lt;p&gt;For data durability, the &lt;code&gt;KafkaProducer&lt;/code&gt; has the configuration setting&lt;br&gt;
&lt;code&gt;acks&lt;/code&gt;. The &lt;code&gt;acks&lt;/code&gt; configuration specifies how many acknowledgments the&lt;br&gt;
producer receives to consider a record delivered to the broker. The&lt;br&gt;
options to choose from are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;code&gt;acks=0&lt;/code&gt; (none): The producer considers the records successfully delivered
once it sends the records to the broker. This is basically “fire and
forget.”&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;acks=1&lt;/code&gt; (one): The producer waits for the lead broker to acknowledge that it
has written the record to its log.&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;acks=all&lt;/code&gt;: The producer waits for an acknowledgment from the lead broker
and from the follower brokers that they have successfully written
the record to their logs.&lt;/li&gt;
&lt;/ul&gt;
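&lt;p&gt;In the Java client, this choice is a single producer setting. Here's a minimal sketch; the bootstrap address is a placeholder, and a real application would typically use the ProducerConfig constants from the Kafka client library rather than raw strings:&lt;/p&gt;

```java
import java.util.Properties;

// Sketch: choosing the durability guarantee for a Java producer.
// Only java.util.Properties is used, so this has no Kafka dependency.
public class DurabilityConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // "0" = fire and forget, "1" = leader only, "all" = all in-sync replicas
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("acks=" + producerProps().getProperty("acks"));
    }
}
```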

&lt;p&gt;As you can see, there is a trade-off to make here—and that’s by design&lt;br&gt;
because different applications have different requirements. You can opt&lt;br&gt;
for higher throughput with a chance for data loss, or you may prefer a&lt;br&gt;
very high data durability guarantee at the expense of a lower&lt;br&gt;
throughput. Now let’s take a second to talk a little bit about the&lt;br&gt;
&lt;code&gt;acks=all&lt;/code&gt; scenario. If you produce records with &lt;code&gt;acks&lt;/code&gt; set to all to a&lt;br&gt;
cluster of three Kafka brokers, it means that under ideal conditions,&lt;br&gt;
Kafka contains three replicas of your data—one for the lead broker and&lt;br&gt;
one each for two followers. When the logs of each of these replicas all&lt;br&gt;
have the same record offsets, they are considered to be in &lt;em&gt;sync&lt;/em&gt;. In&lt;br&gt;
other words, these &lt;em&gt;in-sync&lt;/em&gt; &lt;em&gt;replicas&lt;/em&gt; have the same content for a&lt;br&gt;
given topic partition. Take a look at the following illustration to&lt;br&gt;
clearly picture what’s going&lt;br&gt;
on:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fproducer-ack-1024x691.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fproducer-ack-1024x691.png" alt="Under ideal circumstances, the leader sends an Ack to the producer after both followers have sent an Ack to the leader."&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;But there’s some subtlety to using the &lt;code&gt;acks=all&lt;/code&gt; configuration. What it&lt;br&gt;
doesn’t specify is &lt;em&gt;how many&lt;/em&gt; replicas need to be in sync. The lead&lt;br&gt;
broker will always be in &lt;em&gt;sync&lt;/em&gt; with itself. But you could have a&lt;br&gt;
situation where the two follower brokers can’t keep up due to network&lt;br&gt;
partitions, record load, etc. So when a producer has a successful send,&lt;br&gt;
the actual number of acknowledgments could have come from only one&lt;br&gt;
broker! If the two followers are not &lt;em&gt;in sync&lt;/em&gt;, the producer still&lt;br&gt;
receives the required number of acks, but it’s only the leader in this&lt;br&gt;
case. For example:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fin-sync-replicas-1024x710.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fin-sync-replicas-1024x710.png" alt="If for some reason the followers are down or can't fetch from the leader, the produce request still succeeds as all of the &amp;quot;in-sync&amp;quot; replicas have acknowledged the record."&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;By setting &lt;code&gt;acks=all&lt;/code&gt;, you are placing a premium on the durability of your&lt;br&gt;
data. So if the replicas aren’t keeping up, it stands to reason that you&lt;br&gt;
want to raise an exception for new records until the replicas are caught&lt;br&gt;
up. In a nutshell, having only one in-sync replica follows the "letter of&lt;br&gt;
the law" but not the "spirit of the law." What we need is a guarantee that,&lt;br&gt;
when using the &lt;code&gt;acks=all&lt;/code&gt; setting, a successful send involves at least&lt;br&gt;
a majority of the available in-sync brokers. There just so happens to be&lt;br&gt;
one such configuration: &lt;code&gt;min.insync.replicas&lt;/code&gt;. The &lt;code&gt;min.insync.replicas&lt;/code&gt;&lt;br&gt;
configuration enforces the number of replicas that must be in sync for&lt;br&gt;
the write to proceed. Note that the &lt;code&gt;min.insync.replicas&lt;/code&gt; configuration&lt;br&gt;
is set at the broker or topic level and is not a producer configuration.&lt;br&gt;
The default value for &lt;code&gt;min.insync.replicas&lt;/code&gt; is one. So to avoid the&lt;br&gt;
scenario described above, in a three-broker cluster, you’d want to&lt;br&gt;
increase the value to two. Let’s revisit our previous example and see&lt;br&gt;
the difference:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fnot-enough-replicas-1024x726.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fnot-enough-replicas-1024x726.png" alt="Not enough replicas | Replica acknowledgments haven't been received"&gt;&lt;/a&gt; If&lt;br&gt;
the number of replicas that are in sync is below the configured amount,&lt;br&gt;
the lead broker won’t attempt to append the record to its log. The leader&lt;br&gt;
throws either a &lt;code&gt;NotEnoughReplicasException&lt;/code&gt; or&lt;br&gt;
&lt;code&gt;NotEnoughReplicasAfterAppendException&lt;/code&gt;, forcing the producer to retry&lt;br&gt;
the write. Having replicas out of sync with the leader is considered a&lt;br&gt;
retryable error, so the producer will continue to retry and send the&lt;br&gt;
records up to the configured &lt;a href="https://kafka.apache.org/documentation/#delivery.timeout.ms" rel="noopener noreferrer"&gt;delivery&lt;br&gt;
timeout&lt;/a&gt;. So by setting the &lt;code&gt;min.insync.replicas&lt;/code&gt; and producer &lt;code&gt;acks&lt;/code&gt; configurations&lt;br&gt;
to work together in this way, you’ve increased the durability of your&lt;br&gt;
data. Now let’s move on to the next items on our list: improvements to&lt;br&gt;
the Kafka clients. Over the past year, the Kafka producer and Kafka consumer APIs have added some new features that every Kafka developer should know.&lt;/p&gt;
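&lt;p&gt;To see how the two settings combine, here is a minimal Python sketch of the check the partition leader performs for an &lt;code&gt;acks=all&lt;/code&gt; produce request. This is an illustration only, not Kafka’s actual implementation; the exception class is a stand-in for the real &lt;code&gt;NotEnoughReplicasException&lt;/code&gt;.&lt;/p&gt;

```python
class NotEnoughReplicasError(Exception):
    """Stand-in for Kafka's NotEnoughReplicasException."""

def handle_produce(in_sync_replicas: int, min_insync_replicas: int) -> str:
    """Leader-side check for an acks=all write.

    The leader counts itself among the in-sync replicas, so the
    count is never below one.
    """
    if in_sync_replicas < min_insync_replicas:
        # Retryable error: the producer keeps resending the batch
        # until the delivery timeout expires.
        raise NotEnoughReplicasError(
            f"only {in_sync_replicas} in sync, need {min_insync_replicas}")
    return "appended"

# Default min.insync.replicas=1: a lone leader still acknowledges the write,
# even though both followers are lagging.
assert handle_produce(in_sync_replicas=1, min_insync_replicas=1) == "appended"

# With min.insync.replicas=2, the same lagging cluster rejects the write.
try:
    handle_produce(in_sync_replicas=1, min_insync_replicas=2)
    raise AssertionError("expected NotEnoughReplicasError")
except NotEnoughReplicasError:
    pass
```

&lt;p&gt;The point of the sketch is the asymmetry: &lt;code&gt;acks&lt;/code&gt; is a producer setting, but the enforcement happens on the broker, governed by a broker- or topic-level configuration.&lt;/p&gt;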

&lt;h2&gt;
  
  
  &lt;span id="tip-2-new-sticky-partitioner"&gt;&lt;/span&gt;Tip #2: Learn about the new sticky partitioner in the producer API
&lt;/h2&gt;

&lt;p&gt;Kafka uses partitions to increase throughput and spread the load of messages to all brokers in a cluster. Kafka records are in a key/value&lt;br&gt;
format, where the keys can be null. Kafka producers don’t immediately&lt;br&gt;
send records, instead placing them into partition-specific batches to be&lt;br&gt;
sent later. Batches are an effective means of increasing network&lt;br&gt;
utilization. There are three ways the partitioner determines into which&lt;br&gt;
partition the records should be written. The partition can be explicitly&lt;br&gt;
provided in the &lt;code&gt;ProducerRecord&lt;/code&gt; object via the overloaded &lt;code&gt;ProducerRecord&lt;/code&gt;&lt;br&gt;
constructor. In this case, the producer always uses this partition. If no&lt;br&gt;
partition is provided, and the &lt;code&gt;ProducerRecord&lt;/code&gt; has a key, the producer&lt;br&gt;
takes the hash of the key modulo the number of partitions. The resulting&lt;br&gt;
number from that calculation is the partition that the producer will&lt;br&gt;
use. If there is no key and no partition present in the &lt;code&gt;ProducerRecord&lt;/code&gt;,&lt;br&gt;
then previously Kafka used a round-robin approach to assign messages&lt;br&gt;
across partitions. The producer would assign the first record in the&lt;br&gt;
batch to partition zero, the second to partition one, and so on, until&lt;br&gt;
the end of the partitions. The producer would then start over with&lt;br&gt;
partition zero and repeat the entire process for all remaining&lt;br&gt;
records. The following illustration depicts this&lt;br&gt;
process:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fproducer-partition-updated.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fproducer-partition-updated.png" alt="Producer | Batches assigned to a partition"&gt;&lt;/a&gt; The&lt;br&gt;
round-robin approach works well for even distribution of records across&lt;br&gt;
partitions. But there’s one drawback. Due to this "fair" round-robin&lt;br&gt;
approach, you can end up sending multiple sparsely populated batches.&lt;br&gt;
It’s more efficient to send fewer batches with more records in each&lt;br&gt;
batch. Fewer batches mean less queuing of produce requests, hence less&lt;br&gt;
load on the brokers. Let’s look at a simplified example where you have a&lt;br&gt;
topic with three partitions to explain this. For the sake of simplicity,&lt;br&gt;
let’s assume that your application produced nine records with no key,&lt;br&gt;
all arriving at the same&lt;br&gt;
time:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fsparse_batches_sent-updated.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fsparse_batches_sent-updated.png" alt="The order of Partition Assignment when the producer uses Partition 2, it starts over at 0"&gt;&lt;/a&gt; As&lt;br&gt;
you can see above, the nine incoming records will result in three&lt;br&gt;
batches of three records. But it would be better if we could send one&lt;br&gt;
batch of nine records. As stated before, fewer batches result in less&lt;br&gt;
network traffic and less load on the brokers. Apache Kafka 2.4.0 added&lt;br&gt;
the &lt;a href="https://www.confluent.io/blog/apache-kafka-2-4-latest-version-updates?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka#kip-480" rel="noopener noreferrer"&gt;sticky partitioner&lt;br&gt;
approach&lt;/a&gt;,&lt;br&gt;
which makes this possible. Instead of using a round-robin approach&lt;br&gt;
per record, the sticky partitioner assigns records to the same partition&lt;br&gt;
until the batch is sent. Then, after sending a batch, the sticky&lt;br&gt;
partitioner increments the partition to use for the next batch. Let’s&lt;br&gt;
revisit our illustration from above but updated using the sticky&lt;br&gt;
partitioner:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fbatch-partition-updated.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fbatch-partition-updated.png" alt="Batch partition assignment order"&gt;&lt;/a&gt;&lt;/p&gt;
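&lt;p&gt;The three partition-selection rules can be sketched in a few lines of Python. This is an illustration only; the real Java client hashes keys with murmur2, whereas the sketch substitutes CRC32 for a self-contained example.&lt;/p&gt;

```python
import zlib  # CRC32 stands in for the Java client's murmur2 hash

def choose_partition(key, explicit_partition, num_partitions, round_robin_counter):
    """Pick a partition for a record, following the three rules above."""
    if explicit_partition is not None:
        return explicit_partition  # rule 1: caller chose the partition
    if key is not None:
        # rule 2: hash of the key modulo the number of partitions
        return zlib.crc32(key.encode()) % num_partitions
    # rule 3 (pre-2.4 behavior): plain per-record round-robin
    return round_robin_counter % num_partitions

# The same key always lands on the same partition...
assert choose_partition("order-42", None, 3, 0) == choose_partition("order-42", None, 3, 99)
# ...an explicit partition always wins...
assert choose_partition("order-42", 1, 3, 0) == 1
# ...and keyless records cycle over every partition.
assert {choose_partition(None, None, 3, i) for i in range(3)} == {0, 1, 2}
```

&lt;p&gt;Keeping records with the same key on the same partition is what preserves per-key ordering, which is why only the keyless case is free to rotate.&lt;/p&gt;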

&lt;p&gt;By using the same partition until a batch is full or otherwise completed,&lt;br&gt;
we’ll send fewer produce requests, which reduces the load on the request&lt;br&gt;
queue and reduces latency of the system as well. It’s worth noting that&lt;br&gt;
the sticky partitioner still results in an even distribution of records.&lt;br&gt;
The even distribution occurs over time, as the partitioner sends a batch&lt;br&gt;
to each partition. You can think of it as a “per-batch” round-robin or “eventually even” approach. To learn more about the sticky partitioner,&lt;br&gt;
you can read the &lt;a href="https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;Apache Kafka Producer Improvements with the Sticky Partitioner&lt;/a&gt; blog post and the related&lt;br&gt;
 &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner" rel="noopener noreferrer"&gt;KIP-480&lt;/a&gt; design document. &lt;br&gt;
Now let’s move on to the consumer changes.&lt;/p&gt;
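&lt;p&gt;The batching difference is easy to see in a toy simulation. This is illustrative only: the real producer accounts for batches in bytes (&lt;code&gt;batch.size&lt;/code&gt;) and time (&lt;code&gt;linger.ms&lt;/code&gt;), not record counts as assumed here.&lt;/p&gt;

```python
from collections import defaultdict

def batches_sent(num_records, num_partitions, sticky, batch_capacity):
    """Count how many batches a stream of keyless records produces."""
    open_batches = defaultdict(int)  # partition -> records in its open batch
    sent = 0
    sticky_partition = 0
    for i in range(num_records):
        # Sticky: stay on one partition until its batch fills.
        # Round-robin: rotate partitions on every record.
        p = sticky_partition if sticky else i % num_partitions
        open_batches[p] += 1
        if open_batches[p] == batch_capacity:
            sent += 1
            open_batches[p] = 0
            if sticky:
                sticky_partition = (sticky_partition + 1) % num_partitions
    # Partially filled batches are flushed at the end.
    return sent + sum(1 for count in open_batches.values() if count)

# Nine keyless records, three partitions, batches holding up to nine records:
assert batches_sent(9, 3, sticky=False, batch_capacity=9) == 3  # three sparse batches
assert batches_sent(9, 3, sticky=True, batch_capacity=9) == 1   # one full batch
```

&lt;p&gt;Same nine records either way; the sticky strategy just sends them in one produce request instead of three.&lt;/p&gt;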

&lt;h2&gt;
  
  
  &lt;span id="tip-3-cooperative-rebalancing"&gt;&lt;/span&gt;Tip #3: Avoid “stop-the-world” consumer group rebalances by using cooperative rebalancing
&lt;/h2&gt;

&lt;p&gt;Kafka is a distributed system, and one of the key things distributed&lt;br&gt;
systems need to do is deal with failures and disruptions—not just&lt;br&gt;
anticipate failures, but fully embrace them. A great example of how&lt;br&gt;
Kafka handles this expected disruption is the consumer group protocol,&lt;br&gt;
which manages multiple instances of a consumer for a single logical&lt;br&gt;
application. If an instance of a consumer stops, by design or otherwise,&lt;br&gt;
Kafka will &lt;em&gt;rebalance&lt;/em&gt; and make sure another instance of the consumer&lt;br&gt;
takes over the work. As of version 2.4, Kafka introduced a new rebalance&lt;br&gt;
protocol: cooperative rebalancing. But before we dive into the new&lt;br&gt;
protocol, let’s look in a bit more detail at the consumer group&lt;br&gt;
basics. Let’s assume you have a distributed application with several&lt;br&gt;
consumers subscribed to a topic. Any set of consumers configured with&lt;br&gt;
the same &lt;code&gt;group.id&lt;/code&gt; forms one logical consumer called a consumer group.&lt;br&gt;
Each consumer in the group is responsible for consuming from one or more&lt;br&gt;
partitions of the subscribed topic(s). These partitions are assigned by&lt;br&gt;
the leader of the consumer group. Here’s an illustration demonstrating&lt;br&gt;
this&lt;br&gt;
concept:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fsix-partitions-1024x451.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fsix-partitions-1024x451.png" alt="6 partitions | Topic | Consumer 1, 2 and 3"&gt;&lt;/a&gt; From&lt;br&gt;
the above illustration, you can see that under optimal conditions, all&lt;br&gt;
three consumers are processing records from two partitions each. But&lt;br&gt;
what happens if one of the applications suffers an error or can’t&lt;br&gt;
connect to the network anymore? Does processing for those topic&lt;br&gt;
partitions stop until you can restore the application in question?&lt;br&gt;
Fortunately, the answer is &lt;em&gt;no&lt;/em&gt;, thanks to the consumer rebalancing&lt;br&gt;
protocol. Here’s another illustration showing the consumer group protocol&lt;br&gt;
in&lt;br&gt;
action:&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fminus-consumer-2-1024x431.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn.confluent.io%2Fwp-content%2Fuploads%2Fminus-consumer-2-1024x431.png" alt="6 partitions | Partitions: 0, 1, 3 "&gt;&lt;/a&gt; As&lt;br&gt;
you can see, Consumer 2 fails for some reason and either misses a poll&lt;br&gt;
or triggers a session timeout. The group coordinator removes it from the&lt;br&gt;
group and triggers what is known as a rebalance. A rebalance is a&lt;br&gt;
mechanism that attempts to evenly distribute (balance) the workload&lt;br&gt;
across all available members of a consumer group. In this case, since&lt;br&gt;
Consumer 2 left the group, the rebalance assigns its previously owned&lt;br&gt;
partitions to the other active members of the group. So as you can see,&lt;br&gt;
losing a consumer application for a particular group ID doesn’t result&lt;br&gt;
in a loss of processing on those topic partitions. There is, however, a&lt;br&gt;
drawback of the default rebalancing approach. Each consumer gives up its&lt;br&gt;
entire assignment of topic partitions, and no processing takes place&lt;br&gt;
until the topic partitions are reassigned—sometimes referred to as a&lt;br&gt;
“stop-the-world” rebalance. To compound the issue, depending on the&lt;br&gt;
instance of the &lt;code&gt;ConsumerPartitionAssignor&lt;/code&gt; used, consumers may simply be&lt;br&gt;
reassigned the same topic partitions that they owned prior to the&lt;br&gt;
rebalance, the net effect being that there was no need to pause work on&lt;br&gt;
those partitions. This implementation of the rebalance protocol is called&lt;br&gt;
&lt;a href="https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;&lt;code&gt;eager rebalancing&lt;/code&gt;&lt;/a&gt;&lt;br&gt;
because it prioritizes the importance of ensuring that no consumers in&lt;br&gt;
the same group claim ownership over the same topic partitions. Ownership&lt;br&gt;
of the same topic partition by two consumers in the same group would&lt;br&gt;
result in undefined behavior. While it is critical to keep any two&lt;br&gt;
consumers from claiming ownership over the same topic partition, it&lt;br&gt;
turns out that there is a better approach that provides safety without&lt;br&gt;
compromising on time spent not processing: &lt;a href="https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;incremental cooperative&lt;br&gt;
rebalancing&lt;/a&gt;.&lt;br&gt;
First introduced to Kafka Connect in &lt;a href="https://www.confluent.io/blog/whats-new-in-apache-kafka-2-3?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka#kip-415" rel="noopener noreferrer"&gt;Apache Kafka&lt;br&gt;
2.3&lt;/a&gt;,&lt;br&gt;
this has now been implemented for the consumer group protocol too. With&lt;br&gt;
the cooperative approach, consumers don’t automatically give up&lt;br&gt;
ownership of all topic partitions at the start of the rebalance.&lt;br&gt;
Instead, all members encode their current assignment and forward the&lt;br&gt;
information to the group leader. The group leader then determines which&lt;br&gt;
partitions need to change ownership, instead of producing an entirely new&lt;br&gt;
assignment from scratch. Now a second rebalance is issued, but this time,&lt;br&gt;
only the topic partitions that need to change ownership are involved. It&lt;br&gt;
could be revoking topic partitions that are no longer assigned or adding&lt;br&gt;
new topic partitions. For the topic partitions that are in both the new&lt;br&gt;
and old assignment, nothing has to change, which means continued&lt;br&gt;
processing for topic partitions that aren’t moving. The bottom line is&lt;br&gt;
that eliminating the "stop-the-world" approach to rebalancing and only&lt;br&gt;
stopping the topic partitions involved means less costly rebalances, thus&lt;br&gt;
reducing the total time to complete the rebalance. Even long rebalances&lt;br&gt;
are less painful now that processing can continue throughout them. This&lt;br&gt;
positive change in rebalancing is made possible by using the&lt;br&gt;
&lt;a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html" rel="noopener noreferrer"&gt;&lt;code&gt;CooperativeStickyAssignor&lt;/code&gt;&lt;/a&gt;.&lt;br&gt;
The &lt;code&gt;CooperativeStickyAssignor&lt;/code&gt; makes the trade-off of having a second&lt;br&gt;
rebalance but with the benefit of a faster return to normal&lt;br&gt;
operations. To enable this new rebalance protocol, you need to set the&lt;br&gt;
&lt;code&gt;partition.assignment.strategy&lt;/code&gt; to use the new&lt;br&gt;
&lt;code&gt;CooperativeStickyAssignor&lt;/code&gt;. Also, note that this change is entirely on&lt;br&gt;
the client side. To take advantage of the new rebalance protocol, you&lt;br&gt;
only need to update your client version. If you’re a Kafka Streams user,&lt;br&gt;
there is even better news. Kafka Streams enables the cooperative&lt;br&gt;
rebalance protocol by default, so there is nothing else to do.&lt;/p&gt;
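&lt;p&gt;A rough way to see the difference between the two protocols: under eager rebalancing every member revokes its entire assignment, while the cooperative protocol revokes only the partitions that actually change hands. Here’s a minimal Python sketch of that diff computation (not the actual assignor code; the consumer names and assignments are hypothetical):&lt;/p&gt;

```python
def cooperative_diff(old_assignment, new_assignment):
    """Per consumer, compute which partitions must be revoked or newly
    acquired; everything in 'keep' continues processing through the
    rebalance."""
    changes = {}
    for member in set(old_assignment) | set(new_assignment):
        old = set(old_assignment.get(member, ()))
        new = set(new_assignment.get(member, ()))
        changes[member] = {"revoke": old - new,
                           "acquire": new - old,
                           "keep": old & new}
    return changes

# Consumer c2 leaves the group; its partitions 2 and 3 move to the survivors.
old = {"c1": [0, 1], "c2": [2, 3], "c3": [4, 5]}
new = {"c1": [0, 1, 2], "c3": [3, 4, 5]}
diff = cooperative_diff(old, new)
assert diff["c1"]["keep"] == {0, 1}    # c1 never pauses on 0 and 1
assert diff["c1"]["acquire"] == {2}
assert diff["c3"]["acquire"] == {3}
assert diff["c2"]["revoke"] == {2, 3}  # only the departed member's work moves
```

&lt;p&gt;Under the eager protocol, the "revoke" set for every member would be its whole assignment, which is exactly the stop-the-world pause the cooperative approach avoids.&lt;/p&gt;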

&lt;h2&gt;
  
  
  &lt;span id="tip-4-master-command-line-tools"&gt;&lt;/span&gt;Tip #4: Master the command line tools
&lt;/h2&gt;

&lt;p&gt;The Apache Kafka binary installation includes several tools located in&lt;br&gt;
the &lt;code&gt;bin&lt;/code&gt; directory. While you’ll find several tools in that directory, I&lt;br&gt;
want to show you the four tools that I think will have the most impact&lt;br&gt;
on your day-to-day work. I’m referring to the &lt;code&gt;console-consumer&lt;/code&gt;,&lt;br&gt;
&lt;code&gt;console-producer&lt;/code&gt;, &lt;code&gt;dump-log&lt;/code&gt;, and &lt;code&gt;delete-records&lt;/code&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;span id="kafka-console-producer"&gt;&lt;/span&gt;Kafka console producer
&lt;/h3&gt;

&lt;p&gt;The console producer allows you to produce records to a topic directly&lt;br&gt;
from the command line. Producing from the command line is a great way to&lt;br&gt;
quickly test new consumer applications when you aren’t producing data to&lt;br&gt;
the topics yet. To start the console producer, run this command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-producer --topic &amp;lt;topic-name&amp;gt; \
--broker-list &amp;lt;broker-host:port&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After you execute the command, there’s an empty prompt waiting for your&lt;br&gt;
input—just type in some characters and hit enter to produce a message.&lt;br&gt;
Using the command line producer in this way does not send any keys, only&lt;br&gt;
values. Luckily, there is a way to send keys as well. You just have to&lt;br&gt;
update the command to include the necessary flags:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-producer --topic &amp;lt;topic-name&amp;gt; \
--broker-list &amp;lt;broker-host:port&amp;gt; \
--property parse.key=true \
--property key.separator=":"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The choice of the &lt;code&gt;key.separator&lt;/code&gt; property is arbitrary. You can use any&lt;br&gt;
character. And now, you can send full key/value pairs from the command&lt;br&gt;
line! If you are using &lt;a href="https://docs.confluent.io/current/schema-registry/index.html?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;Confluent Schema&lt;br&gt;
Registry&lt;/a&gt;,&lt;br&gt;
there are &lt;a href="https://docs.confluent.io/current/schema-registry/serdes-develop/index.html?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka#command-line-utilities-and-json-encoding-of-messages" rel="noopener noreferrer"&gt;command line&lt;br&gt;
producers&lt;/a&gt;&lt;br&gt;
available to send records in Avro, Protobuf, and JSON Schema formats. Now&lt;br&gt;
let’s take a look at the other side of the coin: consuming records from&lt;br&gt;
the command line.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;span id="kafka-console-consumer"&gt;&lt;/span&gt;Kafka console consumer
&lt;/h3&gt;

&lt;p&gt;The console consumer gives you the ability to consume records from a&lt;br&gt;
Kafka topic directly from the command line. Being able to quickly start a&lt;br&gt;
consumer can be an invaluable tool in prototyping or debugging. Consider&lt;br&gt;
building a new microservice. To quickly confirm that your producer&lt;br&gt;
application is sending messages, you can simply run this command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --topic &amp;lt;topic-name&amp;gt; \
--bootstrap-server &amp;lt;broker-host:port&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After you run this command, you’ll start seeing records scrolling across&lt;br&gt;
your screen (so long as data is currently being produced to the topic).&lt;br&gt;
If you want to see all the records from the start, you can add a&lt;br&gt;
&lt;code&gt;--from-beginning&lt;/code&gt; flag to the command, and you’ll see all records&lt;br&gt;
produced to that topic.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --topic &amp;lt;topic-name&amp;gt; \
--bootstrap-server &amp;lt;broker-host:port&amp;gt; \
--from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;If you are using &lt;a href="https://docs.confluent.io/current/schema-registry/index.html?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;Schema&lt;br&gt;
Registry&lt;/a&gt;,&lt;br&gt;
there are &lt;a href="https://docs.confluent.io/current/schema-registry/serdes-develop/index.html?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka#command-line-utilities-and-json-encoding-of-messages" rel="noopener noreferrer"&gt;command line&lt;br&gt;
consumers&lt;/a&gt;&lt;br&gt;
available for Avro, Protobuf, and JSON Schema encoded records. The&lt;br&gt;
Schema Registry command line consumers are intended for working with&lt;br&gt;
records in the Avro, Protobuf or JSON formats, while the plain consumers&lt;br&gt;
work with records of primitive Java types: String, Long, Double,&lt;br&gt;
Integer, etc. The default format expected for keys and values by the&lt;br&gt;
plain console consumer is the String type. If the keys or values are not&lt;br&gt;
strings, you’ll need to provide the deserializers via the command line&lt;br&gt;
flags &lt;code&gt;--key-deserializer&lt;/code&gt; and &lt;code&gt;--value-deserializer&lt;/code&gt; with the fully&lt;br&gt;
qualified class names of the respective deserializers. You may well have&lt;br&gt;
noticed that by default, the console consumer only prints the value&lt;br&gt;
component of the messages to the screen. If you want to see the keys as&lt;br&gt;
well, you can do so by including the necessary flags:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --topic &amp;lt;topic-name&amp;gt; \
--bootstrap-server &amp;lt;broker-host:port&amp;gt; \
--property print.key=true \
--property key.separator=":"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;As with the producer, the value used for the key separator is arbitrary,&lt;br&gt;
so you can choose any character you want to use.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;span id="dump-log"&gt;&lt;/span&gt;Dump log
&lt;/h3&gt;

&lt;p&gt;Sometimes when you’re working with Kafka, you may find yourself needing&lt;br&gt;
to manually inspect the underlying logs of a topic. Whether you’re just&lt;br&gt;
curious about Kafka internals or you need to debug an issue and verify&lt;br&gt;
the content, the &lt;code&gt;kafka-dump-log&lt;/code&gt; command is your friend. Here’s a&lt;br&gt;
command used to view the log of an example topic aptly named &lt;code&gt;example&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-dump-log \
--print-data-log \
--files ./var/lib/kafka/data/example-0/00000000000000000000.log
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;  The &lt;code&gt;--print-data-log&lt;/code&gt; flag specifies to print the data in the log.&lt;/li&gt;
&lt;li&gt;  The &lt;code&gt;--files&lt;/code&gt; flag is required. This could also be a comma-separated
list of files.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For a full list of options and a description of what each option does,&lt;br&gt;
run &lt;code&gt;kafka-dump-log&lt;/code&gt; with the &lt;code&gt;--help&lt;/code&gt; flag. Running the command above&lt;br&gt;
yields something like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Dumping ./var/lib/kafka/data/example-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1599775774460 size: 81 magic: 2 compresscodec: NONE crc: 3162584294 isvalid: true
| offset: 0 CreateTime: 1599775774460 keysize: 3 valuesize: 10 sequence: -1 headerKeys: [] key: 887 payload: -2.1510235
baseOffset: 1 lastOffset: 9 count: 9 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1599775774468 size: 252 magic: 2 compresscodec: NONE crc: 2796351311 isvalid: true
| offset: 1 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 5 payload: 33.440664
| offset: 2 CreateTime: 1599775774463 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 60024247 payload: 9.1408728
| offset: 3 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 1 payload: 45.348946
| offset: 4 CreateTime: 1599775774464 keysize: 6 valuesize: 10 sequence: -1 headerKeys: [] key: 241795 payload: -63.786373
| offset: 5 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 53596698 payload: 69.431393
| offset: 6 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 33219463 payload: 88.307875
| offset: 7 CreateTime: 1599775774466 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 0 payload: 39.940350
| offset: 8 CreateTime: 1599775774467 keysize: 5 valuesize: 9 sequence: -1 headerKeys: [] key: 78496 payload: 74.180098
| offset: 9 CreateTime: 1599775774468 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 89866187 payload: 79.459314
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;There’s lots of information available from the &lt;code&gt;dump-log&lt;/code&gt; command. You&lt;br&gt;
can clearly see the &lt;code&gt;key&lt;/code&gt; and &lt;code&gt;payload&lt;/code&gt; (value), offset, and timestamp for&lt;br&gt;
each record. Keep in mind that this data is from a demo topic that&lt;br&gt;
contains only 10 messages, so with a real topic, there will be&lt;br&gt;
substantially more data. Also note that in this example, the keys and&lt;br&gt;
values for the topic are strings. To run the dump-log tool with key or&lt;br&gt;
value types other than strings, you’ll need to use either the&lt;br&gt;
&lt;code&gt;--key-decoder-class&lt;/code&gt; or the &lt;code&gt;--value-decoder-class&lt;/code&gt; flags.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;span id="delete-records"&gt;&lt;/span&gt;Delete records
&lt;/h3&gt;

&lt;p&gt;Kafka stores records for topics on disk and retains that data even once&lt;br&gt;
consumers have read it. However, records aren’t stored in one big file&lt;br&gt;
but are broken up into segments by partition where the offset order is&lt;br&gt;
continuous across segments for the same topic partition. Because servers&lt;br&gt;
do not have infinite amounts of storage, Kafka provides settings to&lt;br&gt;
control how much data is retained, based on time and size:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  The time configuration controlling data retention is
&lt;code&gt;log.retention.hours&lt;/code&gt;, which defaults to 168 hours (one week)&lt;/li&gt;
&lt;li&gt;  The size configuration &lt;code&gt;log.retention.bytes&lt;/code&gt; controls how large
segments can grow before they are eligible for deletion&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;However&lt;/em&gt;, the default setting for &lt;code&gt;log.retention.bytes&lt;/code&gt; is -1, which&lt;br&gt;
allows the log segment size to be unlimited. If you’re not careful and&lt;br&gt;
haven’t configured the retention &lt;em&gt;size&lt;/em&gt; as well as the retention time,&lt;br&gt;
you could run out of disk space.&lt;br&gt;
Remember, you &lt;em&gt;never&lt;/em&gt; want to go into the filesystem and manually delete&lt;br&gt;
files. Instead, we want a controlled and supported way to delete records&lt;br&gt;
from a topic in order to free up space. Fortunately, Kafka ships with a&lt;br&gt;
tool that deletes data as required. The &lt;code&gt;kafka-delete-records&lt;/code&gt; command has two&lt;br&gt;
mandatory parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;code&gt;--bootstrap-server:&lt;/code&gt; the broker(s) to connect to for bootstrapping&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;--offset-json-file:&lt;/code&gt; a JSON file containing the deletion settings&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here’s an example of the JSON file:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "partitions": [
    {"topic": "example", "partition": 0, "offset": -1}
  ],
  "version": 1
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;As you can see, the format of the JSON is simple. It’s an array of JSON&lt;br&gt;
objects. Each JSON object has three properties:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;strong&gt;Topic&lt;/strong&gt;: the topic to delete from&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Partition&lt;/strong&gt;: the partition to delete from&lt;/li&gt;
&lt;li&gt;  &lt;strong&gt;Offset&lt;/strong&gt;: the offset you want the delete to start from, moving
backward to lower offsets&lt;/li&gt;
&lt;/ul&gt;
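&lt;p&gt;If you’re scripting deletions across many topics or partitions, generating this file is straightforward. Here’s a small Python sketch; the helper name and the list-of-tuples input are just for illustration, and the topic/partition values come from the example above.&lt;/p&gt;

```python
import json

def build_offsets_json(deletions):
    """deletions: list of (topic, partition, offset) tuples.

    An offset of -1 means 'delete up to the high watermark',
    i.e. all records currently in the topic partition.
    """
    payload = {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition, offset) in deletions
        ],
        "version": 1,
    }
    return json.dumps(payload, indent=2)

# Write the file that --offset-json-file expects.
contents = build_offsets_json([("example", 0, -1)])
parsed = json.loads(contents)
assert parsed["partitions"][0] == {"topic": "example", "partition": 0, "offset": -1}
assert parsed["version"] == 1
```

&lt;p&gt;Saving the returned string as &lt;code&gt;offsets.json&lt;/code&gt; gives you exactly the file used in the command below.&lt;/p&gt;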

&lt;p&gt;For this example, I’m reusing the same topic from the dump-log tool, so&lt;br&gt;
it’s a very simple JSON file. If you had more partitions or topics, you&lt;br&gt;
would simply expand on the JSON config file above. I want to discuss how&lt;br&gt;
to choose the &lt;code&gt;offset&lt;/code&gt; in the JSON config file. Because the example topic&lt;br&gt;
contains only 10 records, you could easily calculate the starting offset&lt;br&gt;
to start the deletion process. But in practice, you most likely won’t&lt;br&gt;
know off the top of your head what offset to use. Also bear in mind that&lt;br&gt;
offset != message number, so you can’t just delete from “message 42.” If&lt;br&gt;
you supply a &lt;code&gt;-1&lt;/code&gt;, then the offset of the &lt;code&gt;high watermark&lt;/code&gt; is used,&lt;br&gt;
which means you will delete all the data currently in the topic. The&lt;br&gt;
&lt;code&gt;high watermark&lt;/code&gt; is the highest available offset for consumption (the&lt;br&gt;
offset of the last successfully replicated message, plus one). Now to run&lt;br&gt;
the command, just enter this on the command line:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-delete-records --bootstrap-server &amp;lt;broker-host:port&amp;gt; \                     
--offset-json-file offsets.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After running this command, you should see something like this on the&lt;br&gt;
console:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Executing records delete operationRecords delete operation completed:partition: example-0  low_watermark: 10
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The results of the command show that Kafka deleted all records from the&lt;br&gt;
topic partition &lt;code&gt;example-0&lt;/code&gt;. The &lt;code&gt;low_watermark&lt;/code&gt; value of 10 indicates&lt;br&gt;
the &lt;em&gt;lowest&lt;/em&gt; offset available to consumers. Because there were only 10&lt;br&gt;
records in the &lt;code&gt;example&lt;/code&gt; topic, we know that the offsets ranged from 0&lt;br&gt;
to 9 and no consumer can read those records again. For more background&lt;br&gt;
on how deletes are implemented, you can read&lt;br&gt;
&lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient" rel="noopener noreferrer"&gt;KIP-107&lt;/a&gt; and &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API" rel="noopener noreferrer"&gt;KIP-204&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;span id="tip-5-record-headers"&gt;&lt;/span&gt;Tip #5: Use the power of record headers
&lt;/h2&gt;

&lt;p&gt;Apache Kafka 0.11 introduced the concept of &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers" rel="noopener noreferrer"&gt;record&lt;br&gt;
headers&lt;/a&gt;.&lt;br&gt;
Record headers give you the ability to add some metadata about the Kafka&lt;br&gt;
record, without adding any extra information to the key/value pair of&lt;br&gt;
the record itself. Consider if you wanted to embed some information in a&lt;br&gt;
message, such as an identifier for the system from which the data&lt;br&gt;
originated. Perhaps you want this for lineage and audit purposes and in&lt;br&gt;
order to facilitate routing of the data downstream. Why not just append&lt;br&gt;
this information to the key? Then you could extract the part needed and&lt;br&gt;
you would be able to route data accordingly. But adding artificial data&lt;br&gt;
to the key poses two potential problems.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; First, if you are using a compacted topic, adding information to
the key would make the record incorrectly appear as unique. Thus,
compaction would not function as intended.&lt;/li&gt;
&lt;li&gt; For the second issue, consider the effect if one particular system
identifier dominates in the records sent. You now have a situation
where you could have significant key skew. Depending on how you are
consuming from the partitions, the uneven distribution of keys could
have an impact on processing by increasing latency.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;These are two situations where you might want to use headers. The&lt;br&gt;
&lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers" rel="noopener noreferrer"&gt;original&lt;br&gt;
KIP&lt;/a&gt;&lt;br&gt;
proposing headers provides some additional cases as well:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  Automated routing of messages based on header information between
clusters&lt;/li&gt;
&lt;li&gt;  Enterprise APM tools (e.g., Appdynamics or Dynatrace) need to stitch
in “magic” transaction IDs for them to provide end-to-end
transaction flow monitoring.&lt;/li&gt;
&lt;li&gt;  Audit metadata is recorded with the message, for example, the
client-id that produced the record.&lt;/li&gt;
&lt;li&gt;  Business payload needs to be encrypted end to end and signed without
tamper, but ecosystem components need access to metadata to achieve tasks.&lt;/li&gt;
&lt;/ul&gt;
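
&lt;p&gt;The first use case, routing on header values, can be sketched in plain Java with no Kafka dependency at all. The header key &lt;code&gt;source-system&lt;/code&gt; and the destination topic names below are hypothetical, chosen purely for illustration:&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;

// Toy sketch (no Kafka dependency) of routing a record based on the raw
// bytes of a "source-system" header. The header key and topic names are
// hypothetical.
public class HeaderRouter {

    // Decide the destination topic from the header value;
    // null means the header was absent on the record.
    static String route(byte[] sourceSystemHeader) {
        if (sourceSystemHeader == null) {
            return "unrouted-events"; // fallback topic
        }
        String system = new String(sourceSystemHeader, StandardCharsets.UTF_8);
        if (system.equals("billing")) {
            return "billing-events";
        }
        if (system.equals("inventory")) {
            return "inventory-events";
        }
        return "unrouted-events";
    }

    public static void main(String[] args) {
        byte[] header = "billing".getBytes(StandardCharsets.UTF_8);
        System.out.println(route(header)); // prints billing-events
    }
}
```

&lt;p&gt;In a real consumer you would pull the bytes out of the record’s headers and hand them to a routing function like this one, leaving the key and value untouched.&lt;/p&gt;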

&lt;p&gt;Now that I’ve made a case for using headers, let’s walk through how you&lt;br&gt;
can add headers to your Kafka records.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;span id="adding-headers"&gt;&lt;/span&gt;Adding headers toKafka records
&lt;/h3&gt;

&lt;p&gt;Here’s the Java code to add headers to a&lt;br&gt;
&lt;a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html" rel="noopener noreferrer"&gt;&lt;code&gt;ProducerRecord&lt;/code&gt;&lt;/a&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ProducerRecord&amp;lt;String, String&amp;gt; producerRecord = new ProducerRecord&amp;lt;&amp;gt;("bizops", "value"); 
producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8)); producerRecord.headers().add("data-file", "incoming-data.txt".getBytes(StandardCharsets.UTF_8)); 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;// Details left out for clarity&lt;br&gt;
producer.send(producerRecord);&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  Create an instance of the &lt;code&gt;ProducerRecord&lt;/code&gt; class&lt;/li&gt;
&lt;li&gt;  Call the &lt;code&gt;ProducerRecord.headers()&lt;/code&gt; method and add the key and value
for the header&lt;/li&gt;
&lt;li&gt;  Add another header the same way&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;There are a few things to point out in this code example.&lt;br&gt;
The &lt;a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/common/header/Header.html" rel="noopener noreferrer"&gt;header&lt;/a&gt;&lt;br&gt;
interface expects a &lt;code&gt;String&lt;/code&gt; key and the value as a byte array. Even&lt;br&gt;
though you provide a key, you can add multiple headers with the same key&lt;br&gt;
if needed. Duplicate keys will &lt;em&gt;not&lt;/em&gt; overwrite previous entries with the&lt;br&gt;
same key. Also, there are overloaded &lt;code&gt;ProducerRecord&lt;/code&gt; constructors that&lt;br&gt;
accept an &lt;code&gt;Iterable&amp;lt;Header&amp;gt;&lt;/code&gt;. You could create your own concrete class&lt;br&gt;
that implements the &lt;code&gt;Header&lt;/code&gt; interface and pass in a collection that&lt;br&gt;
implements the &lt;code&gt;Iterable&lt;/code&gt; interface. However, in practice, the simple&lt;br&gt;
method shown here should suffice. Now that you know how to add headers,&lt;br&gt;
let’s take a look at how you can access headers from the consumer side&lt;br&gt;
of things.&lt;/p&gt;
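
&lt;p&gt;Before moving on, the duplicate-key behavior above can be made concrete with a small plain-Java sketch (no Kafka dependency), mimicking the append-only semantics just described: adding an entry with an existing key appends a new entry rather than overwriting the old one:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of append-only header semantics: adding an entry
// with an existing key appends instead of overwriting.
public class HeadersSketch {

    private final List keys = new ArrayList();   // raw types keep the sketch minimal
    private final List values = new ArrayList();

    HeadersSketch add(String key, byte[] value) {
        keys.add(key);      // always append...
        values.add(value);  // ...never replace
        return this;
    }

    // How many entries share this key?
    int count(String key) {
        int matches = 0;
        for (Object k : keys) {
            if (k.equals(key)) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        HeadersSketch headers = new HeadersSketch()
            .add("client-id", "2334".getBytes())
            .add("client-id", "5678".getBytes());
        System.out.println(headers.count("client-id")); // prints 2
    }
}
```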

&lt;h3&gt;
  
  
  &lt;span id="retrieving-headers"&gt;&lt;/span&gt;Retrieving headers
&lt;/h3&gt;

&lt;p&gt;This is how you can access headers when consuming records:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;//Details left out for clarity
ConsumerRecords&amp;lt;String, String&amp;gt; consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord&amp;lt;String, String&amp;gt; consumerRecord : consumerRecords) {     
    for (Header header : consumerRecord.headers()) {          
        System.out.println("header key " + header.key() + " header value " + new String(header.value())); 
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;  Iterate over the &lt;code&gt;ConsumerRecords&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;  For each &lt;code&gt;ConsumerRecord&lt;/code&gt;, iterate over its headers&lt;/li&gt;
&lt;li&gt;  Process each header&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;From the code above, you can see that to process the headers, simply use&lt;br&gt;
the &lt;a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#headers--" rel="noopener noreferrer"&gt;&lt;code&gt;ConsumerRecord.headers()&lt;/code&gt;&lt;/a&gt;&lt;br&gt;
method to return the headers. In our example above, we’re printing the&lt;br&gt;
headers out to the console for demonstration purposes. Once you have&lt;br&gt;
access to the headers, you can process them as needed. For reading&lt;br&gt;
headers from the command line, &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter" rel="noopener noreferrer"&gt;KIP-431&lt;/a&gt;&lt;br&gt;
adds support for optionally printing headers from the ConsoleConsumer,&lt;br&gt;
which will be available in the Apache Kafka 2.7.0 release. You can also&lt;br&gt;
use &lt;a href="https://github.com/edenhill/kafkacat" rel="noopener noreferrer"&gt;kafkacat&lt;/a&gt; to view headers&lt;br&gt;
from the command line. Here’s an example command:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafkacat -b kafka-broker:9092 -t my_topic_name -C \&lt;br&gt;&lt;br&gt;
-f '\nKey (%K bytes): %k  Value (%S bytes): %s  Timestamp: %T  Partition: %p  Offset: %o  Headers: %h\n'&lt;br&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;span id="recap"&gt;&lt;/span&gt;Recap
&lt;/h2&gt;

&lt;p&gt;You have now read the top five tips for working with Apache Kafka. To recap, we covered:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Message durability and its relationship with delivery guarantees&lt;/li&gt;
&lt;li&gt; The sticky partitioner in the producer API&lt;/li&gt;
&lt;li&gt; The command line tools&lt;/li&gt;
&lt;li&gt; The power of record headers&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;And yet, there is still so much more to learn! Head over to &lt;a href="https://developer.confluent.io/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-%0Adeveloper-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;Confluent&lt;br&gt;
Developer&lt;/a&gt;&lt;br&gt;
and &lt;a href="https://kafka-tutorials.confluent.io/?utm_source=dev&amp;amp;utm_medium=blogpost&amp;amp;utm_campaign=tm.devx_ch.top-5-things-every-kafka-developer-should-know_content.apache-kafka" rel="noopener noreferrer"&gt;Kafka Tutorials&lt;/a&gt; to see what’s going on.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafka</category>
      <category>microservices</category>
      <category>eventstreaming</category>
    </item>
  </channel>
</rss>
