<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Olena Kutsenko</title>
    <description>The latest articles on Forem by Olena Kutsenko (@olena_kutsenko).</description>
    <link>https://forem.com/olena_kutsenko</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F601720%2F9dfb66ac-aa96-4905-aa6d-24272c4ae85b.png</url>
      <title>Forem: Olena Kutsenko</title>
      <link>https://forem.com/olena_kutsenko</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/olena_kutsenko"/>
    <language>en</language>
    <item>
      <title>Using Apache Kafka® and OpenSearch® to explore Mastodon data</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Tue, 21 Mar 2023 17:14:17 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/using-apache-kafkar-and-opensearchr-to-explore-mastodon-data-j1j</link>
      <guid>https://forem.com/olena_kutsenko/using-apache-kafkar-and-opensearchr-to-explore-mastodon-data-j1j</guid>
      <description>&lt;p&gt;In an earlier blog post we talked about &lt;a href="https://aiven.io/blog/mastodon-kafka-js?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;how to stream data from the Mastodon timeline into an Apache Kafka® cluster&lt;/a&gt;. If you missed it, read it to learn how to connect to Mastodon and stream data into an Apache Kafka topic. This article assumes that you already have an Apache Kafka environment running with data in a topic that is called &lt;strong&gt;Mastodon&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Having a large volume of data coming non-stop from a source (the Mastodon timeline, in our case!) brings a new challenge: how do you make sense of all this data? For instance, you may want to observe trends, search for particular entries, or filter and aggregate data to understand it.&lt;/p&gt;

&lt;p&gt;The biggest advantage of bringing data into an Apache Kafka topic is that Apache Kafka provides a convenient mechanism to plug in other applications and reuse the data for use cases such as analytics, visualisations, metrics or long-term storage. All of this can be achieved with Apache Kafka® Connect with almost no code involved.&lt;/p&gt;

&lt;p&gt;When talking about tools for search, aggregations and visualisations, a great place to start is OpenSearch®. OpenSearch is an open-source search and analytics suite with a powerful visual interface for working with data. It is straightforward to set up and start using, so why not let OpenSearch analyse the data coming from Mastodon?&lt;/p&gt;

&lt;p&gt;To give you a visual picture, below is the architectural diagram connecting all the building blocks used in the previous article and the ones we will add in the current one:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--FmAvDt-_--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jybrbaqu5m8i1jivz3cc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--FmAvDt-_--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jybrbaqu5m8i1jivz3cc.png" alt="A diagram that visualises two steps of data transport. The first step (described in detail in the previous article) is bringing data from a Mastodon public feed into an Apache Kafka topic using the Mastodon API, masto.js and node-rdkafka. The second step (described in this article) is using an OpenSearch sink connector into OpenSearch." width="880" height="360"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this post you will learn how to reuse the data you have in Apache Kafka with OpenSearch for visualisations and aggregations.&lt;/p&gt;

&lt;h2&gt;
  
  
  Set up OpenSearch
&lt;/h2&gt;

&lt;p&gt;Both Apache Kafka and OpenSearch are available on the Aiven platform, so not only can you run them side by side, but you can also use a managed OpenSearch sink connector to connect the two services.&lt;/p&gt;

&lt;p&gt;Create an &lt;a href="https://aiven.io/opensearch?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven for OpenSearch&lt;/a&gt; service from &lt;a href="https://console.aiven.io/?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven Console&lt;/a&gt; (read more about &lt;a href="https://docs.aiven.io/docs/products/opensearch.html"&gt;OpenSearch in Aiven docs&lt;/a&gt;). &lt;/p&gt;

&lt;p&gt;Once your service is created, make a note of the connection settings; you'll need them in the next section.&lt;/p&gt;

&lt;h2&gt;
  
  
  Use Kafka Connect to bring Apache Kafka and OpenSearch together
&lt;/h2&gt;

&lt;p&gt;The easiest way to connect Apache Kafka with other tools is to use one of the already available connectors. Conveniently, there is &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/opensearch-sink.html"&gt;an open-source sink connector for OpenSearch&lt;/a&gt; that you can use out of the box. &lt;/p&gt;

&lt;p&gt;To add connectors to the running Apache Kafka cluster, &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-connect"&gt;enable &lt;strong&gt;Apache Kafka Connect&lt;/strong&gt;&lt;/a&gt; in your Aiven for Apache Kafka service or &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/best-practices#consider-a-standalone-apache-kafka-connect-service"&gt;consider using a standalone Apache Kafka Connect service&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Navigate to the &lt;strong&gt;Connectors&lt;/strong&gt; tab to create a new connector. In the long list of available options, select &lt;strong&gt;OpenSearch sink&lt;/strong&gt;. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--sHYtMyj3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3d3wsvvy6zm4ycunkkz9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--sHYtMyj3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3d3wsvvy6zm4ycunkkz9.png" alt="Select OpenSearch sink from the list of connectors" width="880" height="220"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;On the configuration page you can either enter the properties manually, or speed things up by using a JSON object for the connector configuration. Open the JSON editor by clicking on the pencil icon next to the connector configuration.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jZbBWu90--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/6yzp1bvzt0t6lx5mbn97.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jZbBWu90--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/6yzp1bvzt0t6lx5mbn97.png" alt="Screenshot showing location of the button to edit connector configuration" width="880" height="345"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Below is an example of the configuration properties needed for the connection. Replace &lt;code&gt;YOUR_OPENSEARCH_HOST&lt;/code&gt;, &lt;code&gt;PORT&lt;/code&gt;, &lt;code&gt;YOUR_SERVICE_USER&lt;/code&gt; and &lt;code&gt;YOUR_SERVICE_PASSWORD&lt;/code&gt; with values taken from your OpenSearch service. These are the connection properties you saw when you created the Aiven for OpenSearch service.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sink_mastodon_json"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://YOUR_OPENSEARCH_HOST:PORT"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.ignore"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.aiven.kafka.connect.opensearch.OpensearchSinkConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"YOUR_SERVICE_USER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"schema.ignore"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connection.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"YOUR_SERVICE_PASSWORD"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.storage.StringConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.json.JsonConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"topics"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"mastodon"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schemas.enable"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"false"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you want to know more about the available options, check &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/opensearch-sink.html"&gt;the documentation for the OpenSearch sink Kafka connector&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Once you have filled in the values, copy this configuration into the Apache Kafka connector configuration and press &lt;strong&gt;Apply&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Click to create the connector and wait for the status to change to &lt;strong&gt;RUNNING&lt;/strong&gt;; at this point the data is flowing to OpenSearch. If there are any issues with the connection, you'll see an error message, and more information will be available in the logs.&lt;/p&gt;
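&lt;p&gt;If your Apache Kafka Connect service's REST endpoint is reachable, you can also check on the connector from the terminal using the standard Kafka Connect REST API. This is just a sketch: the host, port and credentials are placeholders, and the connector name in the URL must match the &lt;code&gt;name&lt;/code&gt; field from your configuration.&lt;/p&gt;

```shell
# Query the standard Kafka Connect REST API for the connector's status.
# Replace the host, port and credentials with your Kafka Connect service
# values, and the connector name with the "name" from your configuration.
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  "https://YOUR_KAFKA_CONNECT_HOST:PORT/connectors/YOUR_CONNECTOR_NAME/status"
```

&lt;p&gt;A healthy connector reports &lt;code&gt;"state": "RUNNING"&lt;/code&gt; for both the connector itself and each of its tasks.&lt;/p&gt;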

&lt;p&gt;Now the data from your Apache Kafka topic is sinking into the OpenSearch index. The default name of this newly-created index in OpenSearch is the same as the Kafka topic name.&lt;/p&gt;
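&lt;p&gt;You can confirm this from the terminal with the OpenSearch REST API, using the same connection settings as before (the placeholders are the same as in the connector configuration):&lt;/p&gt;

```shell
# List the index the connector created (named after the Kafka topic)...
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  "https://YOUR_OPENSEARCH_HOST:PORT/_cat/indices/mastodon?v"

# ...and count the documents indexed so far.
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  "https://YOUR_OPENSEARCH_HOST:PORT/mastodon/_count"
```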

&lt;p&gt;Time to look at the data in OpenSearch with the help of &lt;a href="https://opensearch.org/docs/latest/dashboards/index/"&gt;OpenSearch Dashboards&lt;/a&gt;!&lt;/p&gt;

&lt;h2&gt;
  
  
  Log in to OpenSearch Dashboards
&lt;/h2&gt;

&lt;p&gt;To see the data in OpenSearch, open OpenSearch Dashboards using the &lt;strong&gt;Host&lt;/strong&gt;, &lt;strong&gt;User&lt;/strong&gt; and &lt;strong&gt;Password&lt;/strong&gt; details from the "OpenSearch Dashboards" tab in the web console.&lt;/p&gt;

&lt;p&gt;Once you're logged in, create an index pattern for Mastodon. An index pattern is a view over one or more indices that are used together for aggregation. Since we have just one index, you can set it to either &lt;code&gt;mastodon&lt;/code&gt; or &lt;code&gt;mastodon*&lt;/code&gt;. Use &lt;code&gt;createdAt&lt;/code&gt; as the time field and you'll be able to filter your data by time. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---NfPqA0G--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jx4ittrr8ied6qixnmdm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---NfPqA0G--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/jx4ittrr8ied6qixnmdm.png" alt="Screenshot of creating an index pattern in OpenSearch" width="880" height="401"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Explore data with the Discover panel
&lt;/h2&gt;

&lt;p&gt;When you don't yet know much about the data, &lt;a href="https://opensearch.org/docs/latest/dashboards/discover/index/"&gt;the Discover panel&lt;/a&gt; is a great place to start. Here you can either view complete data objects, or choose the specific properties you're interested in. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--bfNJsuF4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fevhuwemfzrzha9krnmd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--bfNJsuF4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/fevhuwemfzrzha9krnmd.png" alt="Screenshot of discover panel" width="880" height="483"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can also filter, search and even look at pre-created visualisations for each of the available fields.&lt;/p&gt;

&lt;p&gt;For example, if you are only interested in messages that include polls, add a filter to show only those records that have &lt;code&gt;poll.id&lt;/code&gt; defined.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--o42U-jl5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yopvsk8yi8xnhf4n7osd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--o42U-jl5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yopvsk8yi8xnhf4n7osd.png" alt="Screenshot of settings to see if a poll.id property exists" width="880" height="509"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once you apply the setting, you can see the latest polls.&lt;/p&gt;
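&lt;p&gt;The same filter can be expressed directly in the OpenSearch query DSL. Below is a sketch that returns the latest messages with a poll attached; it assumes the field names produced by the pipeline from the previous article (&lt;code&gt;poll.id&lt;/code&gt;, &lt;code&gt;createdAt&lt;/code&gt;) and that &lt;code&gt;createdAt&lt;/code&gt; is mapped as a date.&lt;/p&gt;

```shell
# Return the five most recent messages that have a poll attached.
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  -H "Content-Type: application/json" \
  "https://YOUR_OPENSEARCH_HOST:PORT/mastodon/_search" \
  -d '{
    "size": 5,
    "query": { "exists": { "field": "poll.id" } },
    "sort": [ { "createdAt": "desc" } ]
  }'
```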

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--fZYC_gdh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pzhpc9m8uwe7ijfn2rbf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--fZYC_gdh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pzhpc9m8uwe7ijfn2rbf.png" alt="Screenshot of different found polls" width="880" height="499"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;What is interesting, however, is that if you look at &lt;code&gt;poll.voterCount&lt;/code&gt;, the vast majority of polls don't get any voters at all. It seems that opinion polls are not always popular on Mastodon.&lt;/p&gt;
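&lt;p&gt;You can check this with a quick aggregation over &lt;code&gt;poll.voterCount&lt;/code&gt;; a sketch, using the same placeholder connection settings as before:&lt;/p&gt;

```shell
# Summary statistics (min, max, avg, sum) of voter counts across all polls.
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  -H "Content-Type: application/json" \
  "https://YOUR_OPENSEARCH_HOST:PORT/mastodon/_search" \
  -d '{
    "size": 0,
    "query": { "exists": { "field": "poll.id" } },
    "aggs": {
      "voters": { "stats": { "field": "poll.voterCount" } }
    }
  }'
```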

&lt;h2&gt;
  
  
  Create visualisations for aggregations
&lt;/h2&gt;

&lt;p&gt;The Discover panel is fun, but you might want to define a specific aggregation and visualise the result. OpenSearch Dashboards has a variety of options for this. Look at the list of available visualisations: each of them comes with a set of properties to shape the targeted aggregation. Here are a couple of examples:&lt;/p&gt;

&lt;h3&gt;
  
  
  Tag clouds
&lt;/h3&gt;

&lt;p&gt;To quickly see which tags are popular in the latest messages you can use &lt;strong&gt;a tag cloud&lt;/strong&gt;. Create a new bucket, set &lt;strong&gt;Aggregation&lt;/strong&gt; to &lt;code&gt;Terms&lt;/code&gt; and the &lt;strong&gt;Field&lt;/strong&gt; to &lt;code&gt;tags.name.keyword&lt;/code&gt;. To get more tags in your cloud, increase the &lt;strong&gt;Size&lt;/strong&gt; property.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--JkK2NXA7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ba6p4z1k9a9cqqmctal3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--JkK2NXA7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ba6p4z1k9a9cqqmctal3.png" alt="Screenshot of creating a tag cloud" width="880" height="1208"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Click &lt;code&gt;Update&lt;/code&gt; and you'll see a tag cloud of the most popular Mastodon hashtags.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--8WFT_TcY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kzto0lff6nm33po4s0cs.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--8WFT_TcY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kzto0lff6nm33po4s0cs.png" alt="Tag cloud visualisation" width="880" height="532"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If you're wondering which Mastodon tag is best to follow, search for some keywords and see which tags pop up. If you're a dog lover, apply the &lt;code&gt;content:dog&lt;/code&gt; filter to see which tags have dog-related content 🐕.&lt;/p&gt;
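&lt;p&gt;The same idea expressed as a query: a &lt;code&gt;terms&lt;/code&gt; aggregation over &lt;code&gt;tags.name.keyword&lt;/code&gt;, restricted to messages whose content matches a keyword. The filter value and the bucket size below are just examples:&lt;/p&gt;

```shell
# Most frequent hashtags among messages whose content mentions "dog".
curl -s -u YOUR_SERVICE_USER:YOUR_SERVICE_PASSWORD \
  -H "Content-Type: application/json" \
  "https://YOUR_OPENSEARCH_HOST:PORT/mastodon/_search" \
  -d '{
    "size": 0,
    "query": { "match": { "content": "dog" } },
    "aggs": {
      "top_tags": { "terms": { "field": "tags.name.keyword", "size": 25 } }
    }
  }'
```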

&lt;h3&gt;
  
  
  Bar visualisations
&lt;/h3&gt;

&lt;p&gt;Bar visualisations are useful to compare different values. To compare median values for the number of followers vs following users across accounts that posted the latest messages, create a horizontal bar visualisation.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--gVBKoe5a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xtzw7ujr1evor9ct6de0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--gVBKoe5a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xtzw7ujr1evor9ct6de0.png" alt="Screenshot showing that the median of followers is higher than the median of following people" width="880" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Table
&lt;/h3&gt;

&lt;p&gt;If you're curious to know which users have the most followers, create a table to show information about the accounts with the highest number of followers.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jmJVj7gj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/59r9mn029h1j710hk7zu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jmJVj7gj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/59r9mn029h1j710hk7zu.png" alt="Screenshot showing most popular users" width="880" height="304"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Organise visualisations in a dashboard
&lt;/h2&gt;

&lt;p&gt;Once you have multiple visualisations, you can combine them into a dashboard. A dashboard will allow you to apply time constraints and filters to multiple visualisations at once. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--h9OE77By--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wm4ul7meyucepbxmrwk7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--h9OE77By--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wm4ul7meyucepbxmrwk7.png" alt="Screenshot showing a dashboard with visualisations of data for the last 15 minutes" width="880" height="402"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The default time span is the last 15 minutes. If you can't see any data on your dashboard, make sure that you have recent data, or use the time field to apply a specific time span.&lt;/p&gt;

&lt;h2&gt;
  
  
  Find more uses for your data
&lt;/h2&gt;

&lt;p&gt;Apache Kafka Connect offers enormous power to connect multiple systems together by creating data pipelines. In this example you saw how you can bring the data from an Apache Kafka topic to OpenSearch with no code needed. Our goal was to aggregate and visualise the data, which is why we used OpenSearch. In your own scenario, you might want to collect data from an Apache Kafka topic to &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect.html#sink-connectors"&gt;sink it to a different database&lt;/a&gt;, or &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/s3-sink-connector-aiven.html"&gt;an object store such as S3&lt;/a&gt;, or &lt;a href="https://docs.aiven.io/docs/products/clickhouse/howto/integrate-kafka.html"&gt;put it into ClickHouse® for long term storage and analytics&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://twitter.com/aiven_io"&gt;Tell us&lt;/a&gt; what you are building, what connectors you use, and which ones we should add next!&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>mastodon</category>
      <category>opensearch</category>
      <category>data</category>
    </item>
    <item>
      <title>How to stream data from Mastodon public timelines to Apache Kafka with NodeJS and TypeScript</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Tue, 21 Mar 2023 16:59:40 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/how-to-stream-data-from-mastodon-public-timelines-to-apache-kafka-with-nodejs-and-typescript-6k4</link>
      <guid>https://forem.com/olena_kutsenko/how-to-stream-data-from-mastodon-public-timelines-to-apache-kafka-with-nodejs-and-typescript-6k4</guid>
      <description>&lt;p&gt;&lt;a href="https://joinmastodon.org/"&gt;Mastodon&lt;/a&gt; has been rising in popularity over recent months. If you're not yet familiar with this exotic online creature, Mastodon is an open-source social networking software for microblogging. Instead of being a single network, like Twitter, Mastodon is a federated platform that connects independent interconnected servers. This makes it a fully decentralised system. It relies on ActivityPub and uses the ActivityStreams 2.0 data format and JSON-LD. As for the functionality, it resembles closely Twitter - you can read the timeline, post messages and interact with other users. &lt;/p&gt;

&lt;p&gt;If you just recently joined Mastodon and are still exploring it, you might find that scrolling the timeline only gets you so far in understanding everything that is happening there. Applying some engineering skills will give you a better overview of the topics and discussions on the platform.&lt;/p&gt;

&lt;p&gt;Since Mastodon's timeline is nothing more than a stream of continuously arriving events, the data feed is well suited for Apache Kafka®. Adding Kafka connectors on top of that opens up multiple possibilities to use the data for aggregations and visualisations.&lt;/p&gt;

&lt;p&gt;Continue reading to learn how to bring data from Mastodon to Kafka using TypeScript and a couple of helpful libraries. &lt;/p&gt;

&lt;h2&gt;
  
  
  Prepare the Apache Kafka cluster
&lt;/h2&gt;

&lt;p&gt;To bring the data from the Mastodon timeline to a topic in Apache Kafka, you'll need an Apache Kafka cluster and some code to stream the data there. For the former, you can use either your own cluster, or a managed version that runs in the cloud, such as &lt;a href="https://aiven.io/kafka?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven for Apache Kafka&lt;/a&gt;. If you don't have an Aiven account yet, &lt;a href="https://console.aiven.io/signup/email?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;sign up for a free trial&lt;/a&gt; and &lt;a href="https://docs.aiven.io/docs/products/kafka/getting-started.html"&gt;create your cluster&lt;/a&gt;, the setup only takes a few minutes.&lt;/p&gt;

&lt;p&gt;Once your cluster is running, &lt;a href="https://docs.aiven.io/docs/products/kafka/howto/create-topic.html"&gt;add a topic&lt;/a&gt; with the name &lt;code&gt;mastodon&lt;/code&gt;. Alternatively, you can use any other name; just remember it, as you'll need it a bit later.&lt;/p&gt;

&lt;p&gt;To connect securely to the cluster we'll use SSL. Aiven takes care of the server-side configuration, but you'll need to download three files so that the client can establish the connection. Download these files from Aiven's console:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--bMuPf9dr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/x9pt3lx5g7opceaq7ykj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--bMuPf9dr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/x9pt3lx5g7opceaq7ykj.png" alt="Screenshot of the Aiven for Apache Kafka page in Aiven's console showing where to take certificates and keys" width="880" height="139"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You will need these files later, so put them somewhere safe for now.&lt;/p&gt;

&lt;h2&gt;
  
  
  Working with the Mastodon API
&lt;/h2&gt;

&lt;p&gt;The Mastodon API has excellent documentation that makes it straightforward to access the public data feeds. You don't need to be registered to retrieve a public feed, which makes it very convenient. Actually, just give it a try right now. Run the line below in your terminal to start retrieving a stream of data from &lt;code&gt;mastodon.social&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl https://mastodon.social/api/v1/streaming/public
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In response, you should see an endless flow of incoming events:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--5FfK7Mh9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/v1/images/curl-mastodon.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--5FfK7Mh9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/v1/images/curl-mastodon.png" alt="Running  raw ``curl https://mastodon.social/api/v1/streaming/public`` endraw  in your terminal shows a response with event data" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Parsing the response from the server manually is a monotonous and tedious operation. Rather than reinvent the wheel, you can use one of the &lt;a href="https://docs.joinmastodon.org/client/libraries/"&gt;available libraries for Mastodon&lt;/a&gt;. For this example we'll be using &lt;a href="https://github.com/neet/masto.js"&gt;masto.js&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Jump into the code
&lt;/h2&gt;

&lt;p&gt;For a quick start bringing Mastodon data into an Apache Kafka cluster, clone &lt;a href="https://github.com/aiven/mastodon-to-kafka"&gt;this repository&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/aiven/mastodon-to-kafka
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once you have the contents of the repo locally, follow these steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Create a folder &lt;code&gt;certificates/&lt;/code&gt; and add the SSL certificates you downloaded earlier into this folder. We will need these to connect securely to Apache Kafka.&lt;/li&gt;
&lt;li&gt;Copy the file &lt;code&gt;.env.example&lt;/code&gt; and rename it to &lt;code&gt;.env&lt;/code&gt;; this file will hold the environment variables.&lt;/li&gt;
&lt;li&gt;Set &lt;code&gt;kafka.uri&lt;/code&gt; in &lt;code&gt;.env&lt;/code&gt; to the address of your cluster. You can take it from the connection information of your Aiven for Apache Kafka service.&lt;/li&gt;
&lt;li&gt;Run &lt;code&gt;npm install&lt;/code&gt; to install all dependencies (if you don't have npm or NodeJS yet, follow &lt;a href="https://docs.npmjs.com/downloading-and-installing-node-js-and-npm"&gt;the installation instructions&lt;/a&gt;).&lt;/li&gt;
&lt;/ol&gt;
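&lt;p&gt;The steps above can be sketched as terminal commands. The certificate file names below are the defaults Aiven uses for downloads; adjust the paths to wherever you saved your files:&lt;/p&gt;

```shell
cd mastodon-to-kafka

# 1. Put the SSL files downloaded from the Aiven console into certificates/
mkdir certificates
cp ~/Downloads/service.cert ~/Downloads/service.key ~/Downloads/ca.pem certificates/

# 2. Create the environment file...
cp .env.example .env
# 3. ...then edit .env and set kafka.uri to your cluster's address.

# 4. Install the dependencies.
npm install
```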

&lt;p&gt;Finally, run &lt;code&gt;npm run start&lt;/code&gt; and you should see a flow of delivery reports for every new message coming from the Mastodon public feed that is defined in the code (in the next section you'll see how to change it to whichever Mastodon feed you like!).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--zcbU4lR0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kykyx7pm873qt920pps1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--zcbU4lR0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kykyx7pm873qt920pps1.png" alt="Screenshot showing running code to send data to the Kafka topic" width="880" height="644"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;If things don't work first time, check for error messages printed in the terminal. They will help you navigate the problem. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;You can verify that the data is flowing and see what messages you get by enabling the Apache Kafka REST API.&lt;/p&gt;

&lt;p&gt;In the contextual menu for the topic, select &lt;strong&gt;Apache Kafka REST&lt;/strong&gt;:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9z_9VEp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/temwi2r9q5y5rhs5un4n.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9z_9VEp---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/temwi2r9q5y5rhs5un4n.png" alt="Screenshot showing Apache Kafka REST menu option for a topic" width="880" height="282"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We can step back to see exactly how the code works to send the data from Mastodon to Apache Kafka. This can be divided into two logical steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Streaming messages from a public Mastodon timeline.&lt;/li&gt;
&lt;li&gt;Sending these messages to Apache Kafka.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In the sections below you can see these steps in detail.&lt;/p&gt;
&lt;h2&gt;
  
  
  Streaming messages from a public Mastodon timeline
&lt;/h2&gt;

&lt;p&gt;Open the file &lt;code&gt;mastostream.ts&lt;/code&gt;. It contains a small module to stream the Mastodon data. &lt;/p&gt;

&lt;p&gt;To initialise the Mastodon client, call &lt;code&gt;login()&lt;/code&gt; from the &lt;code&gt;masto.js&lt;/code&gt; client library and provide the required configuration. This is also the place to provide authentication information; however, since we are only interested in public feeds, the URL property is enough. For the URL, &lt;a href="https://joinmastodon.org/servers"&gt;use your favourite Mastodon server&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;masto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;login&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;https://mastodon.social/&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// choose your favourite mastodon server&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the initialised Mastodon client you connect to the public stream API by calling the asynchronous function &lt;code&gt;masto.stream.streamPublicTimeline()&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;masto&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamPublicTimeline&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, you're ready to subscribe to the updates from the public stream provided by the API.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;update&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;// next - send status data to Apache Kafka topic&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now it's time to put these building blocks together.&lt;/p&gt;

&lt;p&gt;For the sake of encapsulation, you wouldn't want the &lt;code&gt;mastostream&lt;/code&gt; module to know directly about the Apache Kafka producer. That's why, when putting all the above ingredients together, we give the &lt;code&gt;mastostream&lt;/code&gt; module a more generic callback argument.&lt;br&gt;
This callback function receives each Mastodon status message converted to a string, so whoever triggered &lt;code&gt;mastostream&lt;/code&gt; gets the data and can act on it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;callback&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;masto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;login&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;https://fosstodon.org/&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;

        &lt;span class="c1"&gt;// Connect to the streaming api&lt;/span&gt;
        &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;masto&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamPublicTimeline&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="c1"&gt;// Subscribe to updates&lt;/span&gt;
        &lt;span class="nx"&gt;stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;update&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="nx"&gt;callback&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;Callback failed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is all you need to stream the data from Mastodon! Time to send these messages to an Apache Kafka topic.&lt;/p&gt;
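&lt;p&gt;To see the decoupling idea in isolation, here is a minimal, self-contained sketch where a fake event source stands in for the Mastodon stream (all names in this sketch are illustrative, not part of the project code):&lt;/p&gt;

```typescript
// Sketch of the callback-based decoupling used by mastostream.
// A fake event source stands in for the Mastodon timeline.
type StatusCallback = (status: string) => void;

function fakeTimeline(onUpdate: (status: object) => void): void {
    // Pretend two statuses arrive from the public timeline.
    onUpdate({ id: "1", content: "hello" });
    onUpdate({ id: "2", content: "world" });
}

function streamToCallback(callback: StatusCallback): void {
    fakeTimeline((status) => {
        try {
            // Serialise the status before handing it over,
            // just as mastostream does with JSON.stringify.
            callback(JSON.stringify(status));
        } catch (err) {
            console.log("Callback failed", err);
        }
    });
}

// The party that triggered the stream decides what to do with the data.
const received: string[] = [];
streamToCallback((s) => received.push(s));
console.log(received.length); // 2
```

&lt;p&gt;The caller never imports anything Kafka-specific; it only supplies a function, which is exactly what lets the producer code live elsewhere.&lt;/p&gt;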

&lt;h2&gt;
  
  
  Sending messages to Apache Kafka using &lt;code&gt;node-rdkafka&lt;/code&gt;
&lt;/h2&gt;

&lt;p&gt;Open &lt;code&gt;producer.ts&lt;/code&gt; to see the code you need to send the data to an Apache Kafka topic. To work with Apache Kafka you can use one of several existing client libraries. This project uses &lt;code&gt;node-rdkafka&lt;/code&gt;, a Node.js wrapper for the Kafka C/C++ library. Check &lt;a href="https://github.com/Blizzard/node-rdkafka"&gt;its GitHub repository README&lt;/a&gt; for installation steps.&lt;/p&gt;

&lt;p&gt;With &lt;code&gt;node-rdkafka&lt;/code&gt; you can create a producer to send data to the cluster. This is where you'll use the Apache Kafka configuration settings defined earlier in &lt;code&gt;.env&lt;/code&gt;, together with the certificates you downloaded, to establish a secure connection.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;
&lt;span class="c1"&gt;//create a producer&lt;/span&gt;
&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nx"&gt;Kafka&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;Producer&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;metadata.broker.list&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;process&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;kafka.uri&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;security.protocol&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ssl&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ssl.key.location&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;process&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ssl.key.location&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ssl.certificate.location&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;process&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ssl.certificate.location&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ssl.ca.location&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;process&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;env&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ssl.ca.location&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;dr_cb&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
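&lt;p&gt;For reference, the &lt;code&gt;.env&lt;/code&gt; file that these &lt;code&gt;process.env&lt;/code&gt; lookups rely on would contain entries along these lines (the values below are placeholders, not real settings):&lt;/p&gt;

```ini
kafka.uri=YOUR-SERVICE-NAME.aivencloud.com:PORT
ssl.key.location=service.key
ssl.certificate.location=service.cert
ssl.ca.location=ca.pem
```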



&lt;p&gt;The producer emits events as things happen, so to understand what is going on and to catch any errors, we subscribe to a number of them, including delivery reports.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;
&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;event.log&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;

&lt;span class="c1"&gt;//logging all errors&lt;/span&gt;
&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;event.error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;

&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;connection.failure&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;

&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;delivery-report&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;report&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;Message was delivered&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;report&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;

&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;disconnected&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;arg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;producer disconnected. &lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;arg&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One last event, and an especially important one, is &lt;code&gt;ready&lt;/code&gt;. It fires when the producer is ready to dispatch messages to the topic. Its handler relies on the callback provided by the &lt;code&gt;mastostream&lt;/code&gt; module that we implemented in the previous section:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ready&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;mastostream&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; 
        &lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;mastodon&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;// topic to send the message to&lt;/span&gt;
            &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;// partition, null for librdkafka default partitioner&lt;/span&gt;
            &lt;span class="nx"&gt;Buffer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;from&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;status&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="c1"&gt;// value&lt;/span&gt;
            &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;// optional key&lt;/span&gt;
            &lt;span class="nb"&gt;Date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;// optional timestamp&lt;/span&gt;
        &lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2000&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}).&lt;/span&gt;&lt;span class="k"&gt;catch&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;None of the above will work, though, until you call the &lt;code&gt;connect()&lt;/code&gt; method. Add the snippet below, run your code, and watch the data start to flow!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;({},&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This method takes an optional second parameter: a callback that informs you about any errors during the connection.&lt;/p&gt;

&lt;p&gt;We've now seen all the code and examined how it all works together. By separating the concerns of collecting data from Mastodon, and passing it to Apache Kafka, we have a system that can also be adapted to handle different data sources as needed.&lt;/p&gt;
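&lt;p&gt;For example, because &lt;code&gt;mastostream&lt;/code&gt; only knows about a generic callback, the Kafka producer could be swapped for any other sink. Here's a hypothetical alternative that appends statuses to a local file instead (the file name and helper below are made up for illustration):&lt;/p&gt;

```typescript
import * as fs from "fs";

// Hypothetical alternative sink: append each status to a newline-delimited
// JSON file. mastostream itself would stay unchanged; only the callback differs.
function fileSink(path: string): (status: string) => void {
    return (status) => fs.appendFileSync(path, status + "\n");
}

// Usage sketch: instead of wiring up the Kafka producer, you'd call
// mastostream(fileSink("statuses.ndjson"));
const sink = fileSink("statuses.ndjson");
sink(JSON.stringify({ id: "1", content: "hello" }));
console.log(fs.readFileSync("statuses.ndjson", "utf8").trim());
```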

&lt;h2&gt;
  
  
  What's next
&lt;/h2&gt;

&lt;p&gt;With the data constantly flowing into the topic, you can now use it as an input for other tools and databases, such as OpenSearch®, ClickHouse®, or PostgreSQL®. Apache Kafka® Connect connectors help you bring the data into other systems with no code required. Learn more about &lt;a href="https://aiven.io/blog/what-is-apache-kafka#apache-kafka-connect?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Apache Kafka and Kafka Connect&lt;/a&gt; and check the full list of &lt;a href="https://docs.aiven.io/docs/products/kafka/kafka-connect.html#sink-connectors"&gt;sink connectors&lt;/a&gt; available on the Aiven platform to see where you can send the data for further storage and analysis.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>mastodon</category>
      <category>data</category>
    </item>
    <item>
      <title>First steps with the Apache Kafka® Java client library</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Tue, 21 Mar 2023 16:35:39 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/first-steps-with-the-apache-kafkar-java-client-library-2ea</link>
      <guid>https://forem.com/olena_kutsenko/first-steps-with-the-apache-kafkar-java-client-library-2ea</guid>
      <description>&lt;p&gt;It's difficult to imagine the development of mission-critical software without relying on an event streaming platform such as Apache Kafka®. But perhaps you're new to Apache Kafka® and want to go deeper. You're in the right place! This article will guide your first steps using Apache Kafka® with Java.&lt;/p&gt;

&lt;p&gt;If you can't wait to see the final result, &lt;a href="https://github.com/anelook/apache-kafka-first-steps-java"&gt;this GitHub repository has the producer and consumer&lt;/a&gt; we'll write step by step in this article. &lt;/p&gt;

&lt;h2&gt;
  
  
  Get equipped with what you need
&lt;/h2&gt;

&lt;p&gt;In this blog post you'll learn how to create an Apache Kafka® producer and a consumer in Java. You'll prepare configuration files needed for a secure connection and write some Java to send messages to the cluster and poll them back.&lt;/p&gt;

&lt;p&gt;Before you start writing the code, there are several things you'll need to prepare. &lt;/p&gt;

&lt;h3&gt;
  
  
  Apache Kafka cluster
&lt;/h3&gt;

&lt;p&gt;First, you'll need an Apache Kafka cluster itself. To simplify the setup, you can use &lt;a href="https://aiven.io/kafka?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven for Apache Kafka®&lt;/a&gt;, a fully managed solution that builds a cluster with the correct configuration in just minutes and takes care of secure authentication and other essentials. If you don't have an Aiven account yet, register for &lt;a href="https://console.aiven.io/signup?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;a free trial&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Once you're in the console, create a new service: in the &lt;strong&gt;Create service&lt;/strong&gt; dialog select &lt;strong&gt;Apache Kafka&lt;/strong&gt;, the &lt;strong&gt;cloud&lt;/strong&gt; of your choice, and the &lt;strong&gt;cloud region&lt;/strong&gt; nearest to you. The &lt;strong&gt;Startup&lt;/strong&gt; service plan is sufficient for today. Set a name for your service, for example &lt;strong&gt;apache-kafka-playground&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;While the service is deploying, you can proceed with other tasks.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9k4Ox09O--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/1zjel9uz5igox5uj8x3o.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9k4Ox09O--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/1zjel9uz5igox5uj8x3o.png" alt="Screenshot showing newly created Aiven for Apache Kafka service, service is still rebuilding" width="880" height="445"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Java project with dependencies
&lt;/h3&gt;

&lt;p&gt;The second thing you'll need is a JDK installed on your computer and a basic Java project. This article assumes you have basic knowledge of Java. I used the Java 11 JDK when running this code, but Apache Kafka® supports Java 17, so you have plenty of choice.&lt;/p&gt;

&lt;p&gt;You'll also need an official low-level Apache Kafka® client library for Java, a &lt;em&gt;reference client&lt;/em&gt;, to create a producer and a consumer. Note that if you plan to work with Java APIs for Kafka Streams or Kafka Connect, you'll need an additional set of libraries. &lt;/p&gt;

&lt;p&gt;The most convenient way of including &lt;code&gt;kafka-clients&lt;/code&gt; in your Java project is to use either &lt;a href="https://maven.apache.org/"&gt;Maven&lt;/a&gt; or &lt;a href="https://gradle.org/"&gt;Gradle&lt;/a&gt;. Select the latest version of the &lt;a href="https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients"&gt;kafka-clients artifact from mvnrepository&lt;/a&gt;, choose the build tool you use, copy the dependency, and add it to your project. &lt;/p&gt;
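&lt;p&gt;With Gradle, for instance, the dependency block would look roughly like this (the version number is illustrative; pick the latest one from mvnrepository):&lt;/p&gt;

```groovy
dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.4.0'
}
```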

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Jxc0pQfi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ibi45q693t3lqkspdr7v.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Jxc0pQfi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ibi45q693t3lqkspdr7v.png" alt="Screenshot showing selecting gradle dependency for apache kafka client from mvnrepository" width="880" height="629"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I used Gradle: I pasted the dependency into the &lt;em&gt;build.gradle&lt;/em&gt; file and let IntelliJ IDEA load the necessary files by selecting &lt;strong&gt;Reload All Gradle Projects&lt;/strong&gt;. &lt;/p&gt;

&lt;p&gt;In addition to the Apache Kafka® client, you'll also need several other libraries:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://mvnrepository.com/artifact/org.slf4j/slf4j-simple"&gt;slf4j-simple&lt;/a&gt; for logging&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://mvnrepository.com/artifact/org.json/json"&gt;JSON&lt;/a&gt; to create and parse JSON objects&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Set up configuration and authentication for Apache Kafka®
&lt;/h2&gt;

&lt;p&gt;Before creating the producers and consumers, you need to specify several configuration properties. These ensure that the information exchanged between Kafka brokers and clients is kept complete, secure, and confidential. &lt;/p&gt;

&lt;p&gt;Aiven offers two authentication approaches: &lt;strong&gt;TLS&lt;/strong&gt; and &lt;strong&gt;SASL&lt;/strong&gt;. In this article we'll use TLS for both authentication and encryption. If you want to use SASL, check out &lt;a href="https://docs.aiven.io/docs/products/kafka/howto/kafka-sasl-auth.html"&gt;the SASL instructions in Aiven's documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Usually, to perform a TLS handshake, you need to configure both Apache Kafka® brokers and the clients. To simplify things Aiven takes care of TLS configuration for the brokers, so you only need to configure the clients. And, as we'll see, even with the clients Aiven does most of the work for you. &lt;/p&gt;

&lt;p&gt;To establish a TLS connection between the client and the server three things need to happen:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The client needs to verify the identity of the server.&lt;/li&gt;
&lt;li&gt;The server needs to verify the identity of the client.&lt;/li&gt;
&lt;li&gt;All messages in transit between the client and server must be encrypted.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;To do this, we'll use a Java truststore and keystore.&lt;/p&gt;

&lt;p&gt;A truststore in Java is a place where you store the certificates of external systems that you trust. These certificates don't contain sensitive information, but they are important to identify and connect to a third-party system. On the other hand, the keystore contains the private access key and its corresponding access certificate, which are needed to authenticate the client. You shouldn't share keystore data with anyone.&lt;/p&gt;

&lt;p&gt;If you're adventurous, you can create these files manually (here is &lt;a href="https://docs.aiven.io/docs/products/kafka/howto/keystore-truststore.html"&gt;the guide&lt;/a&gt; on how to do this). However, you can also take a convenient shortcut and let the Aiven platform do all the work for you. &lt;/p&gt;

&lt;p&gt;Run &lt;a href="https://docs.aiven.io/docs/tools/cli/service/user.html#avn-service-user-kafka-java-creds"&gt;&lt;code&gt;avn service user-kafka-java-creds&lt;/code&gt;&lt;/a&gt; using the &lt;a href="https://docs.aiven.io/docs/tools/cli.html"&gt;Aiven CLI&lt;/a&gt; with the information about the service and the user:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;YOUR-SERVICE-NAME&lt;/strong&gt; - the name of your Apache Kafka service as you defined it during creation&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;YOUR-USER-NAME&lt;/strong&gt; - the name of the user who performs the operation (if you're in doubt, run &lt;code&gt;avn service user-list --format '{username}' --project YOUR-PROJECT-NAME YOUR-SERVICE-NAME&lt;/code&gt; to see users)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;PASSWORD&lt;/strong&gt; - select a secure password for your keystore and truststore&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now apply those values to the command below and run it:&lt;br&gt;
&lt;/p&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;avn&lt;/span&gt; &lt;span class="n"&gt;service&lt;/span&gt; &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;java&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;creds&lt;/span&gt; &lt;span class="no"&gt;YOUR&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="no"&gt;SERVICE&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="no"&gt;NAME&lt;/span&gt; &lt;span class="o"&gt;--&lt;/span&gt;&lt;span class="n"&gt;username&lt;/span&gt; &lt;span class="no"&gt;YOUR&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="no"&gt;USER&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="no"&gt;NAME&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt; &lt;span class="n"&gt;src&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;resources&lt;/span&gt; &lt;span class="o"&gt;--&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt; &lt;span class="no"&gt;PASSWORD&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If all goes well, you will see six new files in the &lt;strong&gt;resources&lt;/strong&gt; folder. Aiven downloads the necessary certificates, creates both the keystore and the truststore, and puts all the references into a single file, &lt;strong&gt;client.properties&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--OMg-eYHQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/64f5kncoctjlyh4u7sjo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--OMg-eYHQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/64f5kncoctjlyh4u7sjo.png" alt="6 new files that were added after running  raw ``avn service user-kafka-java-creds`` endraw " width="880" height="345"&gt;&lt;/a&gt;&lt;/p&gt;
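&lt;p&gt;For reference, the generated &lt;strong&gt;client.properties&lt;/strong&gt; is roughly of this shape (the host, port, and passwords below are placeholders; the exact contents depend on your service):&lt;/p&gt;

```
bootstrap.servers=YOUR-SERVICE-NAME-YOUR-PROJECT.aivencloud.com:PORT
security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=PASSWORD
ssl.key.password=PASSWORD
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=PASSWORD
```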

&lt;p&gt;To make it easier to read the settings stored in &lt;strong&gt;client.properties&lt;/strong&gt;, add a small static method &lt;code&gt;loadProperties&lt;/code&gt; to a new class &lt;code&gt;Utils&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;org.example&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.IOException&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.InputStream&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.common.serialization.StringSerializer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.Logger&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Utils&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="nf"&gt;loadProperties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InputStream&lt;/span&gt; &lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ProducerOneMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getClassLoader&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getResourceAsStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"client-ssl.properties"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Sorry, unable to find config.properties"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;load&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;printStackTrace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Congratulations! You're done with the configuration settings.&lt;/p&gt;

&lt;h2&gt;
  
  
  Dispatch events to Apache Kafka® cluster
&lt;/h2&gt;

&lt;p&gt;Time to send the data to the Apache Kafka® cluster. For this you need a producer.&lt;br&gt;
In your project create a new Java class called &lt;code&gt;Producer&lt;/code&gt; and add the main method there. &lt;/p&gt;

&lt;p&gt;To send a message you'll need to do these four steps:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Producer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 1: create a producer and connect to the cluster&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 2: define the topic name&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 3: create a message record&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 4: send the record to the cluster&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For each of these steps you can rely on the functionality provided by the official Apache Kafka® client library for Java, which you added as a dependency previously.&lt;/p&gt;

&lt;p&gt;Here is what you have to import for the Producer class to work:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.producer.KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.producer.ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.common.serialization.StringSerializer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.json.JSONObject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.Logger&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It's also a good idea to use an instance of &lt;code&gt;Logger&lt;/code&gt; to log events later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 1: create a producer and connect to the cluster
&lt;/h3&gt;

&lt;p&gt;The constructor of &lt;code&gt;KafkaProducer&lt;/code&gt; from &lt;em&gt;Apache Kafka client library&lt;/em&gt; expects a list of properties to establish a connection. You already did most of the heavy lifting to define a connection configuration in the previous section. Now, just reference those entries with the helpful utility method &lt;code&gt;Utils.loadProperties()&lt;/code&gt; that you added above.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// step # 1: create a producer and connect to the cluster&lt;/span&gt;
&lt;span class="c1"&gt;// get connection data from the configuration file&lt;/span&gt;

&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Utils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;loadProperties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
        &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One more thing you need to define is the format used to serialize the data. In this example we'll send JSON objects as strings using &lt;code&gt;StringSerializer&lt;/code&gt;. You should also add a serializer for the key: even though you won't use keys explicitly in the first example, specifying &lt;code&gt;key.serializer&lt;/code&gt; is mandatory.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now that you have a set of properties to establish a connection, you can create an instance of &lt;code&gt;KafkaProducer&lt;/code&gt; and pass the properties into its constructor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At this point you aren't sending any data to the cluster yet. However, it's useful to run &lt;code&gt;Producer&lt;/code&gt; to see that the connection to the server is established and check for any errors:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--QQNzqQyJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pac1h5zv1tqk5qx5b0g8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--QQNzqQyJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pac1h5zv1tqk5qx5b0g8.png" alt="establishing-connection" width="880" height="642"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 2: define the topic name
&lt;/h3&gt;

&lt;p&gt;When sending data to the cluster, you need to specify the topic the message goes to.&lt;/p&gt;

&lt;p&gt;I created a topic named &lt;em&gt;customer-activity&lt;/em&gt; which records activity of customers in an online shop. You can be more creative and choose a different theme for your messages!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topicName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"customer-activity"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that once you select the name of your topic, you need to create it in your Aiven for Apache Kafka® cluster. Even though you can configure Apache Kafka® to create topics automatically on message arrival, it's best to keep that option disabled to avoid accidentally creating a bunch of unnecessary topics. You can create a topic in Aiven for Apache Kafka® using the handy CLI command &lt;a href="https://docs.aiven.io/docs/tools/cli/service/topic.html#avn-cli-service-topic-create"&gt;&lt;code&gt;avn service topic-create&lt;/code&gt;&lt;/a&gt; or follow &lt;a href="https://docs.aiven.io/docs/products/kafka/howto/create-topic.html"&gt;these steps to create a topic&lt;/a&gt; through the Aiven console.&lt;/p&gt;

&lt;p&gt;Here is the configuration of the topic I used; it has three partitions and a replication factor of three:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ca3oPF-R--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h44nb4ho119atjscdjyt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ca3oPF-R--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h44nb4ho119atjscdjyt.png" alt="Screenshot that shows adding a new topic through Aiven's console" width="880" height="405"&gt;&lt;/a&gt;&lt;/p&gt;
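&lt;p&gt;If you prefer the CLI over the console, a topic with the same settings can be created with a single command (shown here as a sketch; check &lt;code&gt;avn service topic-create --help&lt;/code&gt; for the exact flags available in your CLI version):&lt;/p&gt;

```
avn service topic-create YOUR-SERVICE-NAME customer-activity \
  --partitions 3 \
  --replication 3
```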

&lt;h3&gt;
  
  
  Step # 3: create a message record
&lt;/h3&gt;

&lt;p&gt;Messages can be sent in a variety of formats: String, JSON, Avro, protobuf, etc. In fact, Kafka doesn't have any opinion on the structure of data you want to send, which makes the platform very flexible. At times this gets messy, but &lt;a href="https://aiven.io/blog/what-is-karapace?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Karapace, Aiven's open source schema registry&lt;/a&gt;, can help you organize your data better if needed.&lt;/p&gt;

&lt;p&gt;For simplicity, use JSON for this example and define an object with three properties: a customer name, an operation that was performed and a product that was affected.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;JSONObject&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;JSONObject&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Judy Hopps🐰"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"product"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Carrot 🥕"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"operation"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"ordered"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a new &lt;code&gt;ProducerRecord&lt;/code&gt; instance by passing the topic name and the message to the constructor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// package the message in the record&lt;/span&gt;
&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Record created: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that using &lt;code&gt;&amp;lt;String, String&amp;gt;&lt;/code&gt; indicates that the producer expects both the key and the value to be of type &lt;code&gt;String&lt;/code&gt;. &lt;/p&gt;

&lt;h3&gt;
  
  
  Step # 4: send the record to the cluster
&lt;/h3&gt;

&lt;p&gt;Finally, to send the message to a topic in the Apache Kafka® cluster, call the &lt;code&gt;send()&lt;/code&gt; method of the producer instance and pass it the record:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To run the producer, call the &lt;code&gt;main()&lt;/code&gt; method of the &lt;code&gt;Producer&lt;/code&gt; class from your IDE. Alternatively, you can use Gradle and set up a task to run the producer, as is done &lt;a href="https://github.com/anelook/apache-kafka-first-steps-java/blob/main/build.gradle"&gt;in the accompanying repository&lt;/a&gt;. You should see output similar to this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--57V4WCXJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/m59f55nflr74kux5hv2w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--57V4WCXJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/m59f55nflr74kux5hv2w.png" alt="Screenshot showing running producer that sent a single message to the cluster" width="880" height="323"&gt;&lt;/a&gt;&lt;/p&gt;
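&lt;p&gt;If you'd rather not rely on the IDE, a hypothetical Gradle task for this could look like the following (assuming the &lt;code&gt;java&lt;/code&gt; plugin and the &lt;code&gt;org.example&lt;/code&gt; package used in this article; the accompanying repository is the authoritative version):&lt;/p&gt;

```
// build.gradle - sketch of a task that runs the producer
task runProducer(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    mainClass = 'org.example.Producer'
}
```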

&lt;p&gt;The &lt;code&gt;send()&lt;/code&gt; method of the producer also accepts a callback, which gives you metadata about the sent record and information about any exceptions. You can introduce it with the following changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Callback&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onCompletion&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RecordMetadata&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;exception&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Sent successfully. Metadata: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;printStackTrace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;});&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;RecordMetadata&lt;/code&gt; and &lt;code&gt;Callback&lt;/code&gt; will need extra imports:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.producer.RecordMetadata&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.producer.Callback&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Send multiple messages
&lt;/h2&gt;

&lt;p&gt;Great, you successfully sent a single message to the cluster! However, sending messages one by one is tedious. Before moving to the consumer, transform the code to imitate a continuous (even if overly simplified) flow of data.&lt;/p&gt;

&lt;p&gt;To do this, extract message generation into a separate method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;
&lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;operations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"searched"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"bought"&lt;/span&gt;&lt;span class="o"&gt;};&lt;/span&gt;
&lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;customers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"Judy Hopps🐰"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Nick Wilde🦊"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Chief Bogo🐃"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Officer Clawhauser😼"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Mayor Lionheart 🦁"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Mr. Big 🪑"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Fru Fru💐"&lt;/span&gt;&lt;span class="o"&gt;};&lt;/span&gt;
&lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"Donut 🍩"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Carrot 🥕"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Tie 👔"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Glasses 👓️️"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Phone ☎️"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Ice cream 🍨"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Dress 👗"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Pineapple pizza 🍕"&lt;/span&gt;&lt;span class="o"&gt;};&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;JSONObject&lt;/span&gt; &lt;span class="nf"&gt;generateMessage&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;JSONObject&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;JSONObject&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="c1"&gt;// randomly assign values&lt;/span&gt;
    &lt;span class="nc"&gt;Random&lt;/span&gt; &lt;span class="n"&gt;randomizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Random&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customers&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;randomizer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="o"&gt;)]);&lt;/span&gt;
    &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"product"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;randomizer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="o"&gt;)]);&lt;/span&gt;
    &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"operation"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;operations&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;randomizer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;]);&lt;/span&gt; &lt;span class="c1"&gt;// prefer 'search' over 'bought'&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And now combine the steps to generate and send data within an endless while loop. Note that using &lt;code&gt;while(true)&lt;/code&gt; and &lt;code&gt;Thread.sleep&lt;/code&gt; aren't things you want to do in a production environment, but for our purposes they work well:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// step # 2: define the topic name&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topicName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"customer-activity"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="c1"&gt;// step # 3: generate and send message data&lt;/span&gt;
    &lt;span class="k"&gt;while&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// generate a new message&lt;/span&gt;
        &lt;span class="nc"&gt;JSONObject&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;generateMessage&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

        &lt;span class="c1"&gt;// package the message in a record&lt;/span&gt;
        &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
                &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Record created: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="c1"&gt;// send data&lt;/span&gt;
        &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Callback&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nd"&gt;@Override&lt;/span&gt;
            &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onCompletion&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RecordMetadata&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;exception&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Sent successfully. Metadata: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;printStackTrace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;});&lt;/span&gt;
        &lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sleep&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
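&lt;p&gt;As an aside, if you wanted to avoid the busy loop in a longer-lived application, a scheduler is the usual replacement. Here's a minimal, self-contained sketch of the idea, where a plain &lt;code&gt;Runnable&lt;/code&gt; stands in for the generate-and-send step (it is not part of the article's producer code):&lt;/p&gt;

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeTicks = new CountDownLatch(3);

        // stand-in for generateMessage() + producer.send()
        Runnable sendTask = () -> {
            System.out.println("would generate and send a message here");
            threeTicks.countDown();
        };

        // run the task once every 100 ms instead of sleeping inside a loop
        scheduler.scheduleAtFixedRate(sendTask, 0, 100, TimeUnit.MILLISECONDS);

        threeTicks.await(); // wait for a few runs, then stop cleanly
        scheduler.shutdown();
    }
}
```

&lt;p&gt;A scheduler also gives you a clean way to stop the producer, which a bare &lt;code&gt;while(true)&lt;/code&gt; does not.&lt;/p&gt;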



&lt;p&gt;Now, while the producer is running, it continuously sends records into the cluster:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RLTQSuRw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tbsf1q71z37rvi05e2d1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RLTQSuRw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tbsf1q71z37rvi05e2d1.png" alt="Screenshot showing the producer sending multiple messages to the Kafka topic" width="880" height="452"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Consume the data from the Apache Kafka topic
&lt;/h2&gt;

&lt;p&gt;Now that the messages are generated and sent by the producer into the cluster, you can create a consumer to poll and process those messages.&lt;/p&gt;

&lt;p&gt;Creation of a simple consumer can be divided into three steps:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 1: create a consumer and connect to the cluster&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 2: subscribe consumer to the topics&lt;/span&gt;
        &lt;span class="c1"&gt;// Step # 3: poll and process new data&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here are the imports for the code below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.Logger&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.slf4j.LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.Duration&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Collections&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 1: create a consumer and connect to the cluster
&lt;/h3&gt;

&lt;p&gt;Similar to how you configured the producer's properties, you need to specify connection information for the consumer.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// step # 1: create a consumer and connect to the cluster&lt;/span&gt;
&lt;span class="c1"&gt;// get connection data from the configuration file&lt;/span&gt;
&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Utils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;loadProperties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key.deserializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value.deserializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"group.id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"first"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"auto.offset.reset"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;//choose from earliest/latest/none&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition to the properties that you used for the producer, the consumer has a couple of new ones. First, the consumer needs to be able to deserialize the data that it reads from the cluster, so instead of serialization properties you define deserialization ones:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key.deserializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value.deserializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You also need to assign the consumer to a consumer group. Do this by specifying a &lt;code&gt;group.id&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"group.id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"first"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The last thing you should define is the point from which the consumer should start reading the data when it first connects to a topic. You can define a specific offset, or, alternatively, point to either the earliest or the latest message currently present in the topic. Set &lt;code&gt;auto.offset.reset&lt;/code&gt; to &lt;code&gt;earliest&lt;/code&gt; to read the messages from the start.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"auto.offset.reset"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Using the connection properties that you defined, you're ready to create the consumer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;KafkaConsumer&amp;lt;String,String&amp;gt; consumer = new KafkaConsumer&amp;lt;String, String&amp;gt;(properties);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 2: subscribe the consumer to the topic
&lt;/h3&gt;

&lt;p&gt;Subscribe the consumer to one or more topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topicName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"customer-activity"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 3: poll and process new data
&lt;/h3&gt;

&lt;p&gt;The last step is to poll data on a regular basis from the Apache Kafka® topic. For this, use the &lt;code&gt;poll()&lt;/code&gt; method and specify how long the consumer should wait for new messages to arrive.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// step # 3 poll andprocess new data&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// poll new data&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="c1"&gt;// process new data&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"message: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, it's time to start the consumer to read all the data sent by the producer. Again, you can either use the help of your IDE to run the &lt;code&gt;main()&lt;/code&gt; method, or use the powers of &lt;strong&gt;Gradle&lt;/strong&gt; – see &lt;a href="https://github.com/anelook/apache-kafka-first-steps-java"&gt;how it's done&lt;/a&gt; in the accompanying repository.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4LomcSHf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/c2iao7td6elxnfpux7b3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4LomcSHf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/c2iao7td6elxnfpux7b3.png" alt="Screenshot showing consumer polling and printing out data" width="880" height="476"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Maintain the ordering of messages for every customer
&lt;/h2&gt;

&lt;p&gt;With the producer and consumer created, you can now send and read data from the Kafka cluster.&lt;br&gt;
However, if you look closely at the records, you might notice that the order in which the consumer reads them differs from the order in which the producer sent them.&lt;/p&gt;

&lt;p&gt;Even though it's a natural side effect of a distributed system, you often want to maintain the order across the messages. This challenge is discussed in detail in a separate blog post, &lt;a href="https://aiven.io/blog/balance-data-across-kafka-partitions?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;ways to balance your data across Apache Kafka® partitions&lt;/a&gt;. In this post, we'll use one of the strategies suggested in that article: preserving the order of messages with the help of a key.&lt;/p&gt;

&lt;p&gt;In an online shop, the order of operations performed by the customers is important. A customer first adds a product to the basket, then pays for it, and only then do you dispatch the item. To maintain the sequence of messages related to each individual customer when balancing data across partitions, you can use the customer's &lt;code&gt;id&lt;/code&gt; as the key.&lt;/p&gt;

&lt;p&gt;To do this, specify the record's key on the producer side when creating the record:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// create a producer record&lt;/span&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customer"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To see the effect of this change on the consumer side, print out the partition and offset of each record as it arrives from the brokers:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partition "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
            &lt;span class="s"&gt;"| offset "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
            &lt;span class="s"&gt;"| "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now you can run the updated producer and consumer. In the consumer output, notice that the data for each individual customer is always added to the same partition. With this, even though messages about different customers can be interleaved, messages related to the same customer maintain their original order.&lt;/p&gt;
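&lt;p&gt;Why does the same key always land in the same partition? The producer chooses a partition by hashing the record's key. Kafka's default partitioner actually uses a murmur2 hash of the serialized key; the sketch below uses &lt;code&gt;hashCode()&lt;/code&gt; as a simplified stand-in (so the exact partition numbers differ from Kafka's), but it shows the principle: identical keys always map to the same partition.&lt;/p&gt;

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // hypothetical customer keys, not taken from the article's data
        String[] keys = {"customer-1", "customer-2", "customer-1"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
        // both records keyed "customer-1" report the same partition
    }
}
```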

&lt;p&gt;You can further improve this setup by using each separate shopping trip performed by the customer as the key. Customers perform multiple shopping trips, but each trip is unique and contains a sequence of events that must stay in exactly the same order when consumed. A shopping trip also contains fewer records than the overall activity of a customer, and is therefore less likely to lead to unbalanced partitions.&lt;/p&gt;
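&lt;p&gt;The trip-based key can be sketched as a composite of the customer and a trip identifier. Note that &lt;code&gt;tripId&lt;/code&gt; here is a hypothetical field that the messages generated earlier in this article don't include:&lt;/p&gt;

```java
public class TripKeySketch {
    // Composite key: all events of one shopping trip share the same key,
    // so they keep their order, while different trips can spread across partitions.
    static String tripKey(String customerId, String tripId) {
        return customerId + ":" + tripId;
    }

    public static void main(String[] args) {
        System.out.println(tripKey("customer-1", "trip-42")); // customer-1:trip-42
    }
}
```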

&lt;h2&gt;
  
  
  Final thoughts and next steps
&lt;/h2&gt;

&lt;p&gt;In this article we covered the first steps to start using Apache Kafka with the official Java client library. You can find the code used for this article in &lt;a href="https://github.com/anelook/apache-kafka-first-steps-java"&gt;a GitHub repository&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;There is still a lot to uncover when using Apache Kafka, so if you'd like to learn more, check out these articles:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aiven.io/blog/kafka-simply-explained?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Apache Kafka® simply explained&lt;/a&gt;&lt;br&gt;
&lt;a href="https://aiven.io/blog/apache-kafka-key-concepts?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Apache Kafka® key concepts, A glossary of terms related to Apache Kafka&lt;/a&gt;&lt;br&gt;
&lt;a href="https://aiven.io/blog/balance-data-across-kafka-partitions?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Ways to balance your data across Apache Kafka® partitions&lt;/a&gt;&lt;br&gt;
&lt;a href="https://aiven.io/blog/what-is-karapace?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;What is Karapace? Find out more about the magic that is the schema registry!&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Or poke around our &lt;a href="https://docs.aiven.io/docs/products/kafka"&gt;Apache Kafka documentation&lt;/a&gt; and try out &lt;a href="https://console.aiven.io/signup?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven for Apache Kafka&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>java</category>
      <category>data</category>
    </item>
    <item>
      <title>From a data stream to the data warehouse</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Wed, 09 Nov 2022 07:16:10 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/from-a-data-stream-to-the-data-warehouse-3jh7</link>
      <guid>https://forem.com/olena_kutsenko/from-a-data-stream-to-the-data-warehouse-3jh7</guid>
<description>&lt;p&gt;Apache Kafka® and ClickHouse® are quite different, but also have a lot in common. They are both open source, highly scalable, work best with immutable data, and let us process large volumes of data, yet they do all of this in quite different ways. That’s why, instead of competing, these technologies actually complement each other quite well.&lt;/p&gt;

&lt;p&gt;Apache Kafka is amazing at handling real-time data feeds. However, in certain cases we need to come back to older records to analyse and process data at later times. This is challenging because Apache Kafka, a streaming platform, is not optimised to access large chunks of data and act as an OLAP (online analytical processing) engine.&lt;/p&gt;

&lt;p&gt;ClickHouse, on the other hand, is a scalable and reliable storage solution designed to handle petabytes of data and, at the same time, a powerful tool for fast online analytical processing, used by many companies for their data analytics.&lt;/p&gt;

&lt;p&gt;By combining both technologies we get a performant data warehouse in ClickHouse that stays up to date by constantly receiving fresh data from Apache Kafka.&lt;/p&gt;

&lt;p&gt;You can think of Apache Kafka topics as rivers where real-time data flows. ClickHouse, on the other hand, is the sea where all data eventually goes.&lt;/p&gt;

&lt;p&gt;With that, it's time to roll up our sleeves and try integrating these two data solutions in practice. Below, we'll create the services step by step, integrate them, and run some query experiments.&lt;/p&gt;

&lt;h3&gt;
  
  
  Create services
&lt;/h3&gt;

&lt;p&gt;To simplify the setup we'll be using managed versions of Apache Kafka and ClickHouse, both run by Aiven. If you don't have an Aiven account yet, no worries, &lt;a href="https://console.aiven.io/signup/email?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;registration&lt;/a&gt; is just a step away, and you can use a free trial for this experiment.&lt;/p&gt;

&lt;p&gt;You can create Aiven for ClickHouse and Aiven for Apache Kafka services directly from &lt;a href="https://console.aiven.io/?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven's console&lt;/a&gt;. In the examples below I'm using &lt;code&gt;apache-kafka-service&lt;/code&gt; and &lt;code&gt;clickhouse-service&lt;/code&gt; as names for these services, but you can be more creative ;) &lt;/p&gt;

&lt;p&gt;Note that Aiven for ClickHouse needs at least a Startup plan to allow adding integrations.&lt;/p&gt;

&lt;p&gt;Once you've created the services, wait until they are completely deployed and are in &lt;code&gt;RUNNING&lt;/code&gt; state. Now you're ready for action! &lt;/p&gt;

&lt;h3&gt;
  
  
  Prepare Apache Kafka
&lt;/h3&gt;

&lt;p&gt;In order to move data from Apache Kafka to ClickHouse, we need to have some data in Apache Kafka in the first place. So we start by creating a topic in Apache Kafka. You can do this &lt;a href="https://developer.aiven.io/docs/products/kafka/howto/create-topic.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;directly from the Aiven console&lt;/a&gt;. Name it &lt;code&gt;measurements&lt;/code&gt;. Here we'll send continuous measurements for our imaginary set of devices.&lt;/p&gt;

&lt;p&gt;To imitate a continuous flow of new data we'll use a short bash script. In this script we create a JSON object with three properties: the timestamp of the event, the id of the device and a value. Then we send this object into the topic &lt;code&gt;measurements&lt;/code&gt; using &lt;code&gt;kcat&lt;/code&gt;. To understand how to set up &lt;code&gt;kcat&lt;/code&gt;, &lt;a href="https://developer.aiven.io/docs/products/kafka/howto/kcat.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;check this article&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;

&lt;span class="k"&gt;while&lt;/span&gt; :
&lt;span class="k"&gt;do
    &lt;/span&gt;&lt;span class="nv"&gt;stamp&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
    &lt;span class="nb"&gt;id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;$((&lt;/span&gt;RANDOM%100&lt;span class="k"&gt;))&lt;/span&gt;
    &lt;span class="nv"&gt;val&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;$((&lt;/span&gt;RANDOM%1000&lt;span class="k"&gt;))&lt;/span&gt;
    &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"{&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;timestamp&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="nv"&gt;$stamp&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;device_id&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="nv"&gt;$id&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="nv"&gt;$val&lt;/span&gt;&lt;span class="s2"&gt;}"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        | kcat &lt;span class="nt"&gt;-F&lt;/span&gt; kcat.config &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-t&lt;/span&gt; measurements
&lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start the script and leave it running; it will continuously create and send messages to the topic.&lt;/p&gt;
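If you want to sanity-check the JSON that the script produces without touching Kafka or kcat, the same message-building logic can be run on its own:

```shell
# Sanity check (no Kafka or kcat needed): build one sample measurement
# using the same logic as the producer script above, and print it.
stamp=$(date +%s)
device_id=$((RANDOM % 100))
value=$((RANDOM % 1000))
msg="{\"timestamp\":$stamp,\"device_id\":$device_id,\"value\":$value}"
echo "$msg"
```

The printed line should look like `{"timestamp":1679416457,"device_id":42,"value":873}`, matching the three columns we'll define on the ClickHouse side.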

&lt;p&gt;Our work on the Apache Kafka side is done. Now let's move to ClickHouse.&lt;/p&gt;

&lt;h3&gt;
  
  
  Connect Aiven for ClickHouse to Apache Kafka
&lt;/h3&gt;

&lt;p&gt;You can actually integrate your Aiven for ClickHouse service with any Apache Kafka service, but for us, having two services within the same Aiven project makes the integration straightforward.&lt;/p&gt;

&lt;p&gt;To integrate Aiven for ClickHouse with Apache Kafka we need to complete two steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Establish a connection.&lt;/li&gt;
&lt;li&gt;Specify the structure and origin of the integrated data. &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We'll do these steps with help from the &lt;a href="https://developer.aiven.io/docs/tools/cli.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven CLI&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;First, establish the connection by creating an integration of type &lt;code&gt;clickhouse_kafka&lt;/code&gt;, specifying the names of your services with Apache Kafka as the source and ClickHouse as the destination:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;avn service integration-create            \
    --integration-type clickhouse_kafka   \
    --source-service apache-kafka-service \
    --dest-service clickhouse-service
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Running this command won't return anything (unless there is a problem). But if you now check the list of available databases in your Aiven for ClickHouse service (with the help of &lt;a href="https://console.aiven.io/?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven's console&lt;/a&gt;, for example), you'll notice a new one: &lt;code&gt;service_apache-kafka-service&lt;/code&gt;. The name of the created database is the combination of &lt;code&gt;service_&lt;/code&gt; and your Apache Kafka service name.&lt;/p&gt;

&lt;p&gt;The database is still empty, because we haven't yet specified what data we want to bring from our Apache Kafka service. We can define the data source in a JSON payload, but first we need to find the id of our integration. You can get it by running this command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;avn service integration-list clickhouse-service | grep apache-kafka-service
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--PesUJLXS--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gapj73kf0t8usvwhxfwh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--PesUJLXS--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gapj73kf0t8usvwhxfwh.png" alt="Image description" width="880" height="67"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In my case, the integration id was &lt;code&gt;88546a37-5a8a-4c0c-8bd7-80960e3adab0&lt;/code&gt;. Yours will be a different &lt;a href="https://en.wikipedia.org/wiki/Universally_unique_identifier"&gt;UUID&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Knowing the integration id, we can set the proper configuration for our connection, where we specify that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;we want to bring data from the topic named &lt;code&gt;measurements&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;we expect the data to be in JSON format (in particular, &lt;code&gt;JSONEachRow&lt;/code&gt;)&lt;/li&gt;
&lt;li&gt;the data will be transformed into a table with three columns: &lt;code&gt;timestamp&lt;/code&gt;, &lt;code&gt;device_id&lt;/code&gt; and &lt;code&gt;value&lt;/code&gt;:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;    avn service integration-update 88546a37-5a8a-4c0c-8bd7-80960e3adab0 &lt;span class="se"&gt;\&lt;/span&gt;
      &lt;span class="nt"&gt;--user-config-json&lt;/span&gt; &lt;span class="s1"&gt;'{
          "tables": [
              {
                  "name": "measurements_from_kafka",
                  "columns": [
                      {"name": "timestamp", "type": "DateTime"},
                      {"name": "device_id", "type": "Int8"},
                      {"name": "value", "type": "Int16"}
                  ],
                  "topics": [{"name": "measurements"}],
                  "data_format": "JSONEachRow",
                  "group_name": "measurements_from_kafka_consumer"
              }
          ]
      }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;ClickHouse tracks which messages from the topic have been consumed using the consumer group that you specify in the &lt;code&gt;group_name&lt;/code&gt; field; no extra effort is needed on your side. Each entry is read once per consumer group. If you want to consume your data twice, you can create a copy of the table with another group name.&lt;/p&gt;
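As a sketch of that second-consumer setup (the copy's table and group names below are made up for illustration), the `tables` array in the integration config would simply gain a second entry pointing at the same topic:

```json
{
    "tables": [
        {
            "name": "measurements_from_kafka",
            "columns": [
                {"name": "timestamp", "type": "DateTime"},
                {"name": "device_id", "type": "Int8"},
                {"name": "value", "type": "Int16"}
            ],
            "topics": [{"name": "measurements"}],
            "data_format": "JSONEachRow",
            "group_name": "measurements_from_kafka_consumer"
        },
        {
            "name": "measurements_from_kafka_copy",
            "columns": [
                {"name": "timestamp", "type": "DateTime"},
                {"name": "device_id", "type": "Int8"},
                {"name": "value", "type": "Int16"}
            ],
            "topics": [{"name": "measurements"}],
            "data_format": "JSONEachRow",
            "group_name": "measurements_copy_consumer"
        }
    ]
}
```

Because each table has its own consumer group, each receives every message from the topic independently.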

&lt;h3&gt;
  
  
  Consume Kafka messages on the fly from ClickHouse
&lt;/h3&gt;

&lt;p&gt;The setup we've done is already sufficient to start reading data from the Apache Kafka topic from within ClickHouse.&lt;br&gt;
The most convenient way to run ClickHouse SQL commands is with &lt;strong&gt;clickhouse-client&lt;/strong&gt;. If you're unsure how to run it, check &lt;a href="https://developer.aiven.io/docs/products/clickhouse/howto/use-cli.html"&gt;this article explaining how to use the CLI&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;For example, I used Docker and ran the client with the command below. Just replace USERNAME, PASSWORD, HOST and PORT with your values.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--rm&lt;/span&gt; clickhouse/clickhouse-client &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--user&lt;/span&gt; USERNAME &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--password&lt;/span&gt; PASSWORD &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--host&lt;/span&gt; HOST &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--port&lt;/span&gt; PORT &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--secure&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once in the client, you can check the list of databases&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SHOW&lt;/span&gt; &lt;span class="n"&gt;DATABASES&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hE_4N8-6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d4gqt7z33ghydv0v8vth.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hE_4N8-6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d4gqt7z33ghydv0v8vth.png" alt=" raw `show databases` endraw  terminal output" width="858" height="502"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You'll see the database we created by establishing the integration, &lt;code&gt;service_apache-kafka-service&lt;/code&gt; (yours will be named differently if you chose another name for your Apache Kafka service).&lt;/p&gt;

&lt;p&gt;If you get the list of tables from this database, you'll see the name of the table that you specified in the integration settings.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SHOW&lt;/span&gt; &lt;span class="n"&gt;TABLES&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;`service_apache-kafka-service`&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--sVeJ8JJP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ai6hb7ngjfxw6srnz6w7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--sVeJ8JJP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ai6hb7ngjfxw6srnz6w7.png" alt="show tables terminal output" width="866" height="269"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can also double-check its structure with&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;DESCRIBE&lt;/span&gt; &lt;span class="nv"&gt;`service_apache-kafka-service`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;measurements_from_kafka&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1_VzW_VC--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/czk2xl64g982xnorekvo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1_VzW_VC--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/czk2xl64g982xnorekvo.png" alt="describe tables terminal output" width="880" height="198"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now, you might want to read from this table directly, and it will work. However, remember that we can consume messages only once! So once you read the items, they will be gone. Still, nothing stops you from running the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;`service_apache-kafka-service`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;measurements_from_kafka&lt;/span&gt; &lt;span class="k"&gt;LIMIT&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;`service_apache-kafka-service`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;measurements_from_kafka&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--baymjKkq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ytaji9nlv87hd28e27uc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--baymjKkq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ytaji9nlv87hd28e27uc.png" alt="selecting items directly from the connecting table terminal output" width="880" height="252"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, this is not the most convenient way of consuming data from Apache Kafka, and apart from debugging it won't be used much. Most probably you want to copy the data into ClickHouse and keep it for later. And this is exactly what we'll do in the next section.&lt;/p&gt;

&lt;h3&gt;
  
  
  Persist Kafka messages in a ClickHouse table
&lt;/h3&gt;

&lt;p&gt;To store the data coming from Apache Kafka in ClickHouse we need two pieces:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A destination table, where all data will be stored permanently.&lt;/li&gt;
&lt;li&gt;A materialised view that acts as a bridge between our connector table (&lt;code&gt;measurements_from_kafka&lt;/code&gt;) and our destination table.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;You can create them with these two queries:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE device_measurements (timestamp DateTime, device_id Int8, value Int16)
ENGINE = ReplicatedMergeTree()
ORDER BY timestamp;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE MATERIALIZED VIEW materialised_view TO device_measurements AS
SELECT * FROM `service_apache-kafka-service`.measurements_from_kafka;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we create a materialised view, a trigger is actually added behind the scenes. This trigger reacts to any new data items added to our table &lt;code&gt;measurements_from_kafka&lt;/code&gt;. Once triggered, the data goes through the materialised view (where you can also transform it if you want) into the table &lt;code&gt;device_measurements&lt;/code&gt;.&lt;/p&gt;
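As an illustrative sketch of such an in-flight transformation (the view name here is made up), a materialised view could bucket each reading to the start of its minute on the way into the destination table:

```sql
-- Illustrative only: transform rows as they pass through the materialised view,
-- rounding each timestamp down to the minute before it lands in device_measurements.
CREATE MATERIALIZED VIEW materialised_view_by_minute TO device_measurements AS
SELECT
    toStartOfMinute(timestamp) AS timestamp,
    device_id,
    value
FROM `service_apache-kafka-service`.measurements_from_kafka;
```

Any expression valid in a `SELECT` (casts, arithmetic, string functions) can be applied here, as long as the result matches the destination table's columns.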

&lt;p&gt;You can check that the data is flowing by running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;device_measurements&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can run a query to count all readings from the devices and see which devices have higher values on average. Here we use a nice and simple visualisation mechanism with the &lt;code&gt;bar&lt;/code&gt; function.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;readings_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;bar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;average_measurement_value&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;device_measurements&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt; &lt;span class="k"&gt;ASC&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--UfsFU6M4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9okugfayuu505sm7ufys.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--UfsFU6M4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9okugfayuu505sm7ufys.png" alt="visualising data with bar terminal output" width="880" height="395"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;Now you are equipped with the skills to bring data into Aiven for ClickHouse and use materialised views to store it. Here are some more materials you might be interested in reading:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://aiven.io/blog/what-is-clickhouse?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;What is ClickHouse and how it achieves high performance&lt;/a&gt; &lt;/li&gt;
&lt;li&gt;&lt;a href="https://developer.aiven.io/docs/products/clickhouse/getting-started.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Getting started with Aiven for ClickHouse&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;Documentation on how to &lt;a href="https://developer.aiven.io/docs/products/clickhouse/howto/integrate-kafka.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;integrate Aiven for ClickHouse with Apache Kafka&lt;/a&gt; (with some extra details that are omitted here)&lt;/li&gt;
&lt;li&gt;Information on &lt;a href="https://developer.aiven.io/docs/products/clickhouse/howto/list-integrations.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;other integrations available with Aiven for ClickHouse&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>clickhouse</category>
      <category>apachekafka</category>
      <category>data</category>
      <category>streams</category>
    </item>
    <item>
      <title>Introduction to ClickHouse</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Tue, 08 Nov 2022 11:36:21 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/introduction-to-clickhouse-8em</link>
      <guid>https://forem.com/olena_kutsenko/introduction-to-clickhouse-8em</guid>
      <description>&lt;h2&gt;
  
  
  What is ClickHouse?
&lt;/h2&gt;

&lt;p&gt;ClickHouse is a highly scalable open source database management system (DBMS) that uses a column-oriented structure. It's designed for online analytical processing (OLAP) and is highly performant. ClickHouse can return processed results in real time in a fraction of a second. This makes it ideal for applications working with massive structured data sets: data analytics, complex data reports, data science computations...&lt;/p&gt;

&lt;p&gt;ClickHouse is most praised for its exceptionally high performance. That performance comes from a sum of many factors:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Column-oriented data storage&lt;/li&gt;
&lt;li&gt;Data compression&lt;/li&gt;
&lt;li&gt;The vector computation engine&lt;/li&gt;
&lt;li&gt;Approximated calculations&lt;/li&gt;
&lt;li&gt;The use of physical sparse indices&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;But performance isn't the only benefit of ClickHouse. ClickHouse is more than a database, it's a sophisticated database management system that supports distributed query processing, partitioning, data replication and sharding. It's a highly scalable and reliable system capable of handling terabytes of data.&lt;/p&gt;

&lt;p&gt;In fact, ClickHouse is designed to write huge amounts of data and simultaneously process a large number of reading requests. And you can conveniently use a declarative SQL-like query language.&lt;/p&gt;

&lt;h2&gt;
  
  
  Main features of ClickHouse
&lt;/h2&gt;

&lt;p&gt;ClickHouse has a booming development community and continues to be actively developed and improved. You can look at &lt;a href="https://clickhouse.com/docs/en/whats-new/changelog/"&gt;the changelog&lt;/a&gt; and &lt;a href="https://clickhouse.com/docs/en/whats-new/roadmap/"&gt;the roadmap&lt;/a&gt; to see the latest features and future plans. Even as the system grows quickly, every new feature is evaluated performance-wise to make sure it doesn't slow the system down. And many of ClickHouse's biggest existing features are aimed specifically at enhancing its performance and efficiency.&lt;/p&gt;

&lt;h3&gt;
  
  
  Column-oriented DBMS
&lt;/h3&gt;

&lt;p&gt;As a truly columnar database, ClickHouse stores the values of the same column physically next to each other with no extra data attached to each value. This matters because even an insignificant amount of extra data (such as the length of a string) attached to hundreds of millions of items in a column substantially affects the speed of compression, decompression and reads.&lt;/p&gt;

&lt;h3&gt;
  
  
  Data compression
&lt;/h3&gt;

&lt;p&gt;To achieve the desired performance, ClickHouse uses data compression. This includes general-purpose compression, as well as a number of specialised codecs targeting different types of data stored in separate columns.&lt;/p&gt;

&lt;h3&gt;
  
  
  Query processing across multiple servers
&lt;/h3&gt;

&lt;p&gt;ClickHouse supports distributed query processing with data stored across different shards. Large queries are parallelized across multiple cores and use the resources they need.&lt;/p&gt;

&lt;h3&gt;
  
  
  SQL query syntax
&lt;/h3&gt;

&lt;p&gt;ClickHouse supports SQL syntax similar to ANSI SQL. However, it is not identical, so a migration from another SQL-compatible system might require translations.&lt;/p&gt;

&lt;h3&gt;
  
  
  Vector computation engine
&lt;/h3&gt;

&lt;p&gt;During data processing, ClickHouse works with chunks of columns (so-called vectors) and operations are performed on the arrays of items, rather than on individual values.&lt;/p&gt;

&lt;h3&gt;
  
  
  No database locks
&lt;/h3&gt;

&lt;p&gt;ClickHouse updates tables continually without relying on locks when adding new data.&lt;/p&gt;

&lt;h3&gt;
  
  
  Primary and data skipping indices
&lt;/h3&gt;

&lt;p&gt;ClickHouse keeps data physically sorted by primary key. Secondary indices (also called "data skipping indices") indicate in advance which data won't match the filtering criteria and should be skipped (hence the name).&lt;/p&gt;
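As a sketch (the table, column and index names here are hypothetical), a min/max skipping index records the value range of each block of rows, so blocks whose range cannot satisfy a filter are never read:

```sql
-- Hypothetical example: a min/max data skipping index on the value column.
ALTER TABLE readings ADD INDEX value_range value TYPE minmax GRANULARITY 4;

-- A filter like this can now skip whole blocks whose [min, max] range
-- cannot contain matching rows:
SELECT count() FROM readings WHERE value > 900;
```

How much is skipped depends on how well the filtered column correlates with the table's physical sort order.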

&lt;h3&gt;
  
  
  Approximated calculations
&lt;/h3&gt;

&lt;p&gt;To gain a further performance boost for complex queries, you can perform calculations on a sample of the data, finding a trade-off between accuracy and performance. This is relevant, for example, for complex data science calculations.&lt;/p&gt;
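For example (the table and column names below are hypothetical), ClickHouse ships approximate aggregate functions, and tables created with a SAMPLE BY clause can be queried over a fraction of their rows:

```sql
-- Hypothetical examples of trading accuracy for speed:
-- uniq() estimates the number of distinct values,
-- quantile() estimates a percentile,
-- and SAMPLE 0.1 reads roughly 10% of the data
-- (the table must have been created with a SAMPLE BY clause).
SELECT
    uniq(user_id)                    AS approx_unique_users,
    quantile(0.95)(response_time_ms) AS approx_p95_latency
FROM events SAMPLE 0.1;
```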

&lt;p&gt;While ClickHouse can be an excellent choice for many scenarios, it's important to keep in mind its architectural characteristics. Because ClickHouse is pretty unique, it's easy to make mistakes that lead to sub-optimal performance. That's why it is important to understand what stands behind this DBMS and how it functions.&lt;/p&gt;

&lt;p&gt;Let's start by looking at its most distinguishable feature - column-oriented structure of the storage.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why a column-oriented database management system?
&lt;/h2&gt;

&lt;p&gt;To understand better where the need for the column-oriented approach is coming from and why ClickHouse uses it, let's take a closer look at two different types of systems: Online Transaction Processing (OLTP) and Online Analytical Processing (OLAP). In particular, pay attention to granularity with which they manipulate the data and to the types of operations that are prevalent in these systems.&lt;/p&gt;

&lt;h3&gt;
  
  
  OLTP: Online Transaction Processing
&lt;/h3&gt;

&lt;p&gt;OLTP applications perform small but very frequent operations to insert, update and select a modest number of rows. In this type of application we traditionally use the row-oriented approach, as it is the most effective way to work with entire individual rows.&lt;/p&gt;

&lt;h3&gt;
  
  
  OLAP: Online Analytical Processing
&lt;/h3&gt;

&lt;p&gt;OLAP systems are a completely different thing: operations do not target single rows; instead, we work with hundreds of thousands and even millions of records at a time, relying on grouping and aggregation mechanisms. Data in OLAP systems is represented by events and rarely needs to be updated. Importantly, usually only a fraction of the fields needs to be retrieved and processed at a time. This makes it very inefficient to read complete rows, like row-oriented systems do.&lt;/p&gt;

&lt;p&gt;The bottom line is, in OLTP applications records are stored for easy updates of individual rows, while in OLAP systems data is stored primarily for fast reads and analysis of massive chunks of data.&lt;/p&gt;

&lt;p&gt;Therefore, a row-oriented DBMS cannot effectively manage the analytical processing of data volumes typical of OLAP applications.&lt;/p&gt;

&lt;h3&gt;
  
  
  OLAP and column-oriented systems
&lt;/h3&gt;

&lt;p&gt;Column-oriented systems were designed to solve OLAP challenges. In truly columnar databases, the data is physically grouped and stored by columns. This minimizes disk access and improves performance, because processing a specific query only requires reading a fraction of the data. Since each column contains data of the same type, it can use effective compression mechanisms.&lt;/p&gt;

&lt;p&gt;Additionally, the columnar approach allows adding or removing new columns with no performance overhead, since it means simply creating or deleting files. In contrast, adding a new column in a row-oriented database would require updating the data in every row.&lt;/p&gt;
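For illustration (the table and column names are hypothetical), adding a column in ClickHouse is a lightweight metadata change:

```sql
-- Hypothetical example: adding a column does not rewrite existing rows;
-- for old data parts the DEFAULT expression is computed on read.
ALTER TABLE readings ADD COLUMN unit String DEFAULT 'celsius';
```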

&lt;p&gt;Understanding the difference between OLAP and OLTP systems, and the distinction between row and columnar approaches, is key when deciding whether to use ClickHouse or not. In the next section we'll look into how this relates to specific system requirements, and what you should pay attention to when making a decision to adopt ClickHouse.&lt;/p&gt;

&lt;h2&gt;
  
  
  When to use ClickHouse
&lt;/h2&gt;

&lt;p&gt;If used correctly and in suitable scenarios, ClickHouse is a powerful, scalable and fast solution that outperforms its competitors. ClickHouse is made for OLAP applications, and includes a number of optimizations to read data and process complex requests at high speeds.&lt;/p&gt;

&lt;p&gt;You'll get the most out of ClickHouse if&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;you work with enormous volumes of data (measured in terabytes) continuously written and read;&lt;/li&gt;
&lt;li&gt;you have tables with a massive number of columns (ClickHouse loves large numbers of columns!), but column values are reasonably short;&lt;/li&gt;
&lt;li&gt;your data is well-structured and not yet aggregated;&lt;/li&gt;
&lt;li&gt;you insert data in large batches of thousands of rows (a million is a good number);&lt;/li&gt;
&lt;li&gt;the vast majority of operations are reads with aggregations;&lt;/li&gt;
&lt;li&gt;for reads, you process a large number of rows, but a fairly low number of columns;&lt;/li&gt;
&lt;li&gt;you don't need to modify data later;&lt;/li&gt;
&lt;li&gt;you don't need to retrieve specific rows;&lt;/li&gt;
&lt;li&gt;you don't need transactions.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For example, Yandex uses over 500 servers with 25 million records coming in each day. Another company that uses ClickHouse, Bloomberg, has over a hundred servers and accepts approximately a trillion new records each day (as of data from 2018).&lt;/p&gt;

&lt;h2&gt;
  
  
  When not to use ClickHouse
&lt;/h2&gt;

&lt;p&gt;ClickHouse is designed to be fast. However, the optimisations that make ClickHouse the perfect solution for OLAP applications make it suboptimal for other types of projects.&lt;/p&gt;

&lt;p&gt;Do not use ClickHouse for OLTP. ClickHouse expects data to remain immutable. Even though it is technically possible to remove big chunks of data from the ClickHouse database, it is not fast. ClickHouse simply isn't designed for data modifications. It's also inefficient at finding and retrieving single rows by keys, due to sparse indexing. Lastly, ClickHouse does not fully support ACID transactions.&lt;/p&gt;

&lt;p&gt;ClickHouse is not a key-value DBMS. It is also not designed to be a file storage.&lt;/p&gt;

&lt;p&gt;It's not a document-oriented database, either. ClickHouse uses a pre-defined schema that needs to be specified during table creation. The better the schema, the more effective and performant the queries.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to get started
&lt;/h2&gt;

&lt;p&gt;I hope this has got you intrigued by ClickHouse and its superpowers. And maybe you're wondering how you can start using it on your own. ClickHouse is an open source project, and you can follow its documentation to build it yourself.&lt;/p&gt;

&lt;p&gt;However, we know that setting up and maintaining a ClickHouse cluster can be quite a challenge. Ensuring proper data replication, fault tolerance and stability takes plenty of time and energy. That's why Aiven has decided to offer Aiven for ClickHouse, which provides you with the benefits of ClickHouse without the headache overload.&lt;/p&gt;

&lt;p&gt;With Aiven for ClickHouse, you can focus on the product you are building, and we'll keep the underlying infrastructure running so smoothly that you can totally forget about it.&lt;/p&gt;

&lt;p&gt;So, how can you create Aiven for ClickHouse? Select Aiven for ClickHouse in the Aiven Console when creating a new service. Read the detailed instructions &lt;a href="https://docs.aiven.io/docs/products/clickhouse/getting-started.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;over here&lt;/a&gt;. Once the server is up and running (which happens in just a couple of minutes), &lt;a href="https://docs.aiven.io/docs/products/clickhouse/sample-dataset.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;add some test data&lt;/a&gt; and see how you can work &lt;a href="https://docs.aiven.io/docs/products/clickhouse/howto/add-service-users.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;with users&lt;/a&gt;, &lt;a href="https://docs.aiven.io/docs/products/clickhouse/concepts/databases-and-tables.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;tables and databases&lt;/a&gt;. To dive deeper and understand how indexing and data processing work in ClickHouse, &lt;a href="https://docs.aiven.io/docs/products/clickhouse/concepts/indexing.html?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;check this article&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>clickhouse</category>
      <category>datawarehouse</category>
      <category>columnar</category>
      <category>database</category>
    </item>
    <item>
      <title>Ways to balance your data across Apache Kafka partitions</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Tue, 08 Nov 2022 10:57:32 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/ways-to-balance-your-data-across-apache-kafka-partitions-5f13</link>
      <guid>https://forem.com/olena_kutsenko/ways-to-balance-your-data-across-apache-kafka-partitions-5f13</guid>
      <description>&lt;p&gt;Apache Kafka® is a distributed system. At its heart is a &lt;strong&gt;set of brokers&lt;/strong&gt; that stores records persistently inside &lt;strong&gt;topics&lt;/strong&gt;. Topics, in turn, are split into &lt;strong&gt;partitions&lt;/strong&gt;. Dividing topics into such pieces allows storing and reading data in parallel. In this way producers and consumers can work with data simultaneously, achieving higher throughput and scalability.&lt;/p&gt;

&lt;p&gt;This makes partitions crucial for a performant cluster. Reading data from distributed locations comes with two big challenges:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Message order:&lt;/strong&gt; Distributed systems split load-intensive tasks into multiple pieces that can be independently processed in parallel. In this way, we get results faster compared to the linear model. Unlike the linear approach, however, distributed systems &lt;strong&gt;by design do not guarantee the order of processed data&lt;/strong&gt;. That’s why for such systems to work successfully, we need to make sure that the data is properly divided into independent chunks and that we understand the effect of this division on the data ordering.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Uneven record distribution:&lt;/strong&gt; Dividing data across partitions means there's a risk that partition records are distributed unevenly. To prevent this, our system needs to partition records intelligently, so that the data is proportionately balanced across available servers and across their local filesystems. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Below we look deeper into these challenges and mechanisms to balance load over partitions to make the best use of the cluster.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenge of message order
&lt;/h2&gt;

&lt;p&gt;To understand what is happening with record ordering, take a look at the example visualized below. There you can see the data flow for a topic that is divided into three partitions. Messages are pushed by a producer and later retrieved by a consuming application one by one. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1iyCFgNE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/zzq03wid21363dno60i4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1iyCFgNE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/zzq03wid21363dno60i4.png" alt="New records A-I being assigned to partitions 1-3 randomly. Partitions get ADG, BEH and CFI. The consumer receives IFCHEGBDA. The order within partitions is preserved." width="800" height="248"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;When consuming data from distributed partitions, we cannot guarantee the order in which consumers go through the list of partitions. That's why the sequence of the messages read by a consumer ends up different from the original order sent by the producer.&lt;/p&gt;

&lt;p&gt;Reshuffling records can be totally fine for some scenarios, but for other cases you might want to read the messages in the same order as they were pushed by the producer.&lt;/p&gt;

&lt;p&gt;The solution to this challenge is to rely on the order of the records within a single partition, where the data is guaranteed to maintain the original sequence.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--YJSobWOd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vp8kbw83n7e58huas2sp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--YJSobWOd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vp8kbw83n7e58huas2sp.png" alt="Sequence order is guaranteed per partition (ADG, BEH and CFI), but not across partitions (the consumer gets ADBFEHCFI)" width="800" height="426"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And that's why, when building the product architecture, we should carefully weigh up the partitioning logic and mechanisms used to ensure that the sequence of the messages remains correct when consumers read the data.&lt;/p&gt;

&lt;h3&gt;
  
  
  Ways to partition data based on different scenarios
&lt;/h3&gt;

&lt;p&gt;The way messages are divided across partitions is always defined in the logic of the client, meaning that it is not the &lt;strong&gt;topic&lt;/strong&gt; which specifies this logic, but the &lt;strong&gt;producers&lt;/strong&gt;, who push the data into the cluster. In fact, if needed, different producers can have separate partitioning approaches.&lt;/p&gt;

&lt;p&gt;There are a variety of tools you can use to distribute data across partitions. To understand these alternatives we'll look at several scenarios.&lt;/p&gt;

&lt;h3&gt;
  
  
  Scenario # 1: the order of messages is not important
&lt;/h3&gt;

&lt;p&gt;It's possible that, in your system, it is not necessary to preserve the order of messages. Lucky you! You can rely on the default partitioning mechanism provided by Apache Kafka and no additional logic is needed for the producers.&lt;/p&gt;

&lt;p&gt;As an example of this scenario, imagine a service to send SMS messages. Your organization uses SMS to notify customers, and the messages are divided across multiple partitions so that they can be consumed by different processing applications in parallel. We want to distribute the work and process the messages as fast as possible. However, the &lt;strong&gt;order&lt;/strong&gt; in which the SMS messages reach the recipients is not important. &lt;/p&gt;

&lt;p&gt;In such cases, Apache Kafka uses a &lt;strong&gt;sticky partitioning&lt;/strong&gt; approach (introduced as the default partitioner in version 2.4.0). This default method batches records together before sending them to the cluster. Once a batch is full or the linger time (&lt;code&gt;linger.ms&lt;/code&gt;) has elapsed, the batch is sent and a new one is created for a different partition. This approach helps decrease latency when producing messages.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--b7rdjXAL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/lhwjl8ssdb0lepi662y6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--b7rdjXAL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/lhwjl8ssdb0lepi662y6.png" alt="New records A, B, C, D and are sent to partition 1 as a batch" width="800" height="397"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here's a code snippet written in Java which sends a single message into a randomly assigned partition. This is the default behavior and doesn't require any additional logic on your side.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;
   &lt;span class="c1"&gt;// add necessary properties to connect &lt;/span&gt;
   &lt;span class="c1"&gt;// to the cluster and set up security protocols&lt;/span&gt;
   &lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

   &lt;span class="c1"&gt;// create a producer&lt;/span&gt;
   &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
          &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topicName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"topic-name"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

   &lt;span class="c1"&gt;// generate new message&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"A message"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

   &lt;span class="c1"&gt;// create a producer record&lt;/span&gt;
   &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
           &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

   &lt;span class="c1"&gt;// send data&lt;/span&gt;
   &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
   &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Sent: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Scenario # 2: the order is important for groups of records defined with a key
&lt;/h3&gt;

&lt;p&gt;Even though some scenarios do not require maintaining message sequence, the majority of cases do. Imagine, for example, that you run an online shop where customers trigger different events through your applications, and information about their activity is stored in a topic in an Apache Kafka cluster. In this scenario, the order of events for every single customer is important, while the order of events across the customers is irrelevant.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--LRFxWVKJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qyd1039p82bzbw68khu0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--LRFxWVKJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qyd1039p82bzbw68khu0.png" alt="The order of actions taken by individual users are preserved, but don't need to stay adjacent." width="800" height="242"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;That's why our goal is to preserve the correct sequence of the messages related to every individual customer. We can achieve this if we store the records for every individual customer consistently in a dedicated partition.&lt;/p&gt;

&lt;p&gt;The default partitioner can already do it for you, if you define a proper key for each of the messages.&lt;/p&gt;

&lt;p&gt;Every record in an Apache Kafka topic consists of two parts - the &lt;strong&gt;value&lt;/strong&gt; of the record and an &lt;strong&gt;optional key&lt;/strong&gt;. The key plays a decisive role in how messages are distributed across the partitions - all messages with the same key are added to the same partition.&lt;/p&gt;

&lt;p&gt;For our example, the most obvious choice for a key is the id of a customer, which we can use to partition the data. This is visualized below where, for simplicity, we assume that we have three customers (&lt;code&gt;John&lt;/code&gt;, &lt;code&gt;Claire&lt;/code&gt; and &lt;code&gt;Burt&lt;/code&gt;) and three partitions.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--0TWXd5Nw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/djdhxt3j7trtqprxl04q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--0TWXd5Nw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/djdhxt3j7trtqprxl04q.png" alt="A key can be used to decide which partition an event goes to. Here, the user is the key. Partition 1 gets events for John, partition 2 those for Claire, and partition 3 those for Burt." width="800" height="357"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Because the partition is derived deterministically from the key, once data with the key &lt;code&gt;John&lt;/code&gt; is stored in a partition, Apache Kafka sends all future messages with an identical key to the same partition.&lt;/p&gt;

&lt;p&gt;This visualization includes just three customers, one for each partition. In real life you might need to store data for multiple customers (or devices, or vehicles, etc.) in a single partition.&lt;/p&gt;

&lt;p&gt;The code snippet below shows how to use a key when creating a record:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;   &lt;span class="c1"&gt;// create a producer record&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customerId"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What's important to note is that Apache Kafka doesn't use the string representation of the key. Instead it converts the key into a &lt;strong&gt;hash value&lt;/strong&gt;, which means that there is a probability of a hash collision, when two different keys produce the same hash, resulting in their data being assigned to the same partition. Is this something you need to avoid? Scroll down to read about the custom partitioner!&lt;/p&gt;
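&lt;p&gt;To make this behavior concrete, here is a minimal, dependency-free sketch of hash-based partition assignment. Note that it is an illustration only: Kafka's real default partitioner hashes the serialized key bytes with the murmur2 algorithm, while this sketch uses a simple array hash as a stand-in. The class and method names are invented for the example.&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified sketch of key-based partition assignment: the same key always
// maps to the same partition, and two different keys can collide.
public class KeyPartitioningSketch {

    // map a key to one of numPartitions partitions
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // mask off the sign bit so the modulo result is never negative
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always lands in the same partition
        System.out.println(
            partitionFor("John", 3) == partitionFor("John", 3)); // true
    }
}
```

&lt;p&gt;Running the same key through the function twice always yields the same partition number, which is exactly why keyed records keep their per-key ordering.&lt;/p&gt;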

&lt;h3&gt;
  
  
  Scenario # 3: partition numbers are known in advance
&lt;/h3&gt;

&lt;p&gt;Sometimes you want to control which message goes to which partition. For example, maybe the target partition depends on the day of the week when the data is generated. Assuming your system has seven partitions:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;   &lt;span class="c1"&gt;// create a producer record&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customer"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;LocalDate&lt;/span&gt; &lt;span class="n"&gt;today&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;Integer&lt;/span&gt; &lt;span class="n"&gt;partitionNumber&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;today&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDayOfWeek&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getValue&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
   &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topicName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partitionNumber&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Scenario # 4: achieve maximum flexibility
&lt;/h3&gt;

&lt;p&gt;The tools we've looked at above will help in many use cases. In some situations, however, you might need more flexibility and want to customize the partitioning logic even further. For this, Apache Kafka provides a mechanism to plug in a &lt;strong&gt;custom partitioner&lt;/strong&gt;, which divides the records across partitions based on the content of a message or some other conditions.&lt;/p&gt;

&lt;p&gt;You can use this approach if you want to group the data within a partition according to a custom logic. For example, if you know that some sources of data bring more records than others, you can group them so that no single partition is significantly bigger or smaller than others. Alternatively, you might want to use this approach if you want to base partitioning on a group of fields, but prefer to keep the key untouched.&lt;/p&gt;

&lt;p&gt;In a custom partitioner you have access to both the key and the value of the record before deciding which partition the message goes into. To create a custom partitioner you'll need to implement the &lt;code&gt;Partitioner&lt;/code&gt; interface and define the logic of its methods. Here is an example of a custom partitioner written in Java:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;customPartitioner&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Partitioner&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;configs&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;keyBytes&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;valueBytes&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Cluster&lt;/span&gt; &lt;span class="n"&gt;cluster&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// get the list of available partitions&lt;/span&gt;
        &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;PartitionInfo&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;partitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;cluster&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partitionsForTopic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;numPartitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;partitions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

        &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;...;&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once you've defined the custom partitioner, reference it in your producer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitioner.class"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"customPartitioner"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now every arriving record is analyzed by a custom partitioner before it is put into a designated partition.&lt;/p&gt;
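&lt;p&gt;As one illustration of the kind of logic a custom &lt;code&gt;partition()&lt;/code&gt; method might contain, here is a dependency-free sketch that routes records by their content: it assumes (purely for the example) a convention where values starting with &lt;code&gt;URGENT&lt;/code&gt; go to a dedicated partition, while everything else is spread over the remaining ones.&lt;/p&gt;

```java
// Sketch of content-based routing logic for a custom partitioner.
// The "URGENT" prefix is an invented convention for illustration;
// assumes numPartitions > 1.
public class RoutingSketch {

    public static int choosePartition(String value, int numPartitions) {
        if (value.startsWith("URGENT")) {
            return 0; // reserved partition for urgent records
        }
        // spread all other records over partitions 1..numPartitions-1
        int hash = value.hashCode() & 0x7fffffff;
        return 1 + hash % (numPartitions - 1);
    }
}
```

&lt;p&gt;Inside a real &lt;code&gt;Partitioner&lt;/code&gt; implementation, the same decision would be made in &lt;code&gt;partition()&lt;/code&gt;, using the deserialized value and the partition count obtained from the &lt;code&gt;Cluster&lt;/code&gt; object, as in the skeleton above.&lt;/p&gt;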

&lt;h3&gt;
  
  
  Scenario # 5: round robin and uniform sticky partitioning
&lt;/h3&gt;

&lt;p&gt;There are two more built-in partitioners that you can consider. The first is &lt;code&gt;RoundRobinPartitioner&lt;/code&gt;, which acts according to its name - iterating over the partitions and distributing records one by one, ignoring any provided key values. Round robin, unfortunately, is known to cause uneven distribution of records across partitions. Furthermore, it is less performant than the default sticky mechanism, where records are combined into batches to speed up producing. &lt;/p&gt;

&lt;p&gt;Another built-in partitioner is &lt;code&gt;UniformStickyPartitioner&lt;/code&gt;, which acts similarly to &lt;code&gt;DefaultPartitioner&lt;/code&gt; but ignores the key value. &lt;/p&gt;

&lt;h2&gt;
  
  
  Challenge of uneven record distribution
&lt;/h2&gt;

&lt;p&gt;When defining partitioning logic, carefully evaluate how your partitions will grow over time, and understand whether the selected mechanism risks distributing messages unevenly. &lt;/p&gt;

&lt;p&gt;There are a variety of scenarios when uneven distribution can happen.&lt;/p&gt;

&lt;p&gt;One example is when the default partitioner sends a huge batch of data to a single partition. When using the default partitioner, choose linger time (&lt;code&gt;linger.ms&lt;/code&gt;) and maximum batch size (&lt;code&gt;batch.size&lt;/code&gt;) settings that fit your particular scenario. For example, if your product is frequently used during the day, but almost no records come in at night, it is common to set the linger time low and the batch size high. With these settings, however, an unexpected surge of data can end up in a single batch and be sent to a single partition, leading to uneven message distribution.&lt;/p&gt;
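&lt;p&gt;These two settings are ordinary producer properties. The sketch below shows where they go; the specific values are illustrative examples for the trade-off just described, not recommendations (the class and method names are invented for the example):&lt;/p&gt;

```java
import java.util.Properties;

// Illustrative producer settings for the batching trade-off: a low linger
// time flushes batches quickly, while a large batch size lets a burst of
// records pile up into one batch (and thus one partition under sticky
// partitioning).
public class BatchingConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("linger.ms", "5");      // wait at most 5 ms before sending a batch
        props.put("batch.size", "65536"); // up to 64 KiB of records per batch
        return props;
    }
}
```

&lt;p&gt;These properties would be merged into the same &lt;code&gt;Properties&lt;/code&gt; object used to construct the &lt;code&gt;KafkaProducer&lt;/code&gt;, alongside the connection and security settings.&lt;/p&gt;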

&lt;p&gt;Another case of uneven message distribution can happen when you distribute records by key, but the amount of data related to some keys is significantly bigger than for others. For instance, imagine that you run an image gallery service and divide data across partitions by user id. If some of your users use the service much more frequently than others, they produce far more records, inflating the size of some partitions.&lt;/p&gt;

&lt;p&gt;Similar to the scenario above, if you rely on days and times to distribute the data, some dates - such as Black Friday or Christmastime - can generate considerably more records.&lt;/p&gt;

&lt;p&gt;Additionally, uneven distribution can happen when you move data from other data sources with the help of Kafka Connect. Make sure that the data is not heavily written to a single partition, but distributed evenly.&lt;/p&gt;

&lt;p&gt;Overall, uneven message distribution is a complex problem that's easier to prevent than to solve later. Rebalancing messages across partitions is a challenging task because in many scenarios partitions preserve the necessary order of the messages, and rebalancing can destroy the correct sequence.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusions
&lt;/h2&gt;

&lt;p&gt;Apache Kafka provides a set of tools to distribute records across multiple partitions. However, the responsibility for a durable architecture, and for selecting the right message distribution strategy, lies with the engineers building the system.  &lt;/p&gt;

&lt;p&gt;If you'd like to learn more about Apache Kafka, check out these articles:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://aiven.io/blog/what-is-apache-kafka?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;What is Apache Kafka®?&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://aiven.io/blog/db-technology-migration-with-apache-kafka-and-kafka-connect?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Database migration with Apache Kafka® and Apache Kafka® Connect&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://aiven.io/blog/using-kafka-connect-jdbc-source-a-postgresql-example?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Using Kafka Connect JDBC Source: a PostgreSQL® example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://aiven.io/blog/manage-apache-kafka-connect-connectors-with-kcctl?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Manage Apache Kafka® Connect connectors with kcctl&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Or poke around &lt;a href="https://developer.aiven.io/docs/products/kafka?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;our Apache Kafka® documentation&lt;/a&gt; and try out &lt;a href="https://console.aiven.io/signup?utm_source=devto&amp;amp;utm_medium=organic&amp;amp;utm_campaign=blog_art"&gt;Aiven for Apache Kafka&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>partitioning</category>
      <category>data</category>
    </item>
    <item>
      <title>Apache Kafka Simply Explained</title>
      <dc:creator>Olena Kutsenko</dc:creator>
      <pubDate>Wed, 06 Jul 2022 12:22:37 +0000</pubDate>
      <link>https://forem.com/olena_kutsenko/apache-kafka-simply-explained-18d4</link>
      <guid>https://forem.com/olena_kutsenko/apache-kafka-simply-explained-18d4</guid>
      <description>&lt;p&gt;Apache Kafka is a de facto standard for data streaming. There is a lot of interest and usage of Apache Kafka, but the learning curve can be steep. So here is my attempt to explain it using simple and friendly vocabulary:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aiven.io/blog/kafka-simply-explained"&gt;https://aiven.io/blog/kafka-simply-explained&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The article covers the following topics:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What is Apache Kafka?&lt;/li&gt;
&lt;li&gt;Where Apache Kafka is used&lt;/li&gt;
&lt;li&gt;Apache Kafka’s way of thinking&lt;/li&gt;
&lt;li&gt;How Apache Kafka coordinates events&lt;/li&gt;
&lt;li&gt;Topics and messages&lt;/li&gt;
&lt;li&gt;Brokers and partitions&lt;/li&gt;
&lt;li&gt;Apache Kafka connectors&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I hope you’ll find it useful!&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>data</category>
      <category>streaming</category>
      <category>beginners</category>
    </item>
  </channel>
</rss>
