<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Christoffer Persson</title>
    <description>The latest articles on Forem by Christoffer Persson (@pwgn).</description>
    <link>https://forem.com/pwgn</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F421798%2F8bba841b-0910-4669-84f6-f75668c20fad.jpeg</url>
      <title>Forem: Christoffer Persson</title>
      <link>https://forem.com/pwgn</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/pwgn"/>
    <language>en</language>
    <item>
      <title>One stop shop: Kubernetes + Kafka + Flink</title>
      <dc:creator>Christoffer Persson</dc:creator>
      <pubDate>Fri, 03 Jul 2020 07:40:41 +0000</pubDate>
      <link>https://forem.com/pwgn/one-stop-shop-kubernetes-kafka-flink-5h0e</link>
      <guid>https://forem.com/pwgn/one-stop-shop-kubernetes-kafka-flink-5h0e</guid>
      <description>&lt;p&gt;This is a hands-on tutorial on how to set up Apache Flink with Apache Kafka connector in Kubernetes. The goal with this tutorial is to push an event to Kafka, process it in Flink, and push the processed event back to Kafka on a separate topic. This guide will not dig deep into any of the tools as there exists a lot of great resources about those topics. Focus here is just to get it up and running!&lt;/p&gt;

&lt;p&gt;You can follow along by cloning this &lt;a href="https://github.com/pwgn/k8s-kafka-flink"&gt;git repo&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is what we are going to do:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Deploy Kafka and Flink to Kubernetes&lt;/li&gt;
&lt;li&gt;Deploy job to Flink&lt;/li&gt;
&lt;li&gt;Generate some data&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;MicroK8s&lt;/h2&gt;

&lt;p&gt;In these examples, &lt;a href="https://microk8s.io/"&gt;MicroK8s&lt;/a&gt; has been used. Follow the &lt;a href="https://microk8s.io/docs"&gt;docs&lt;/a&gt; to set it up.&lt;/p&gt;

&lt;p&gt;Do not forget to enable the required add-ons:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;microk8s enable dns storage
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;When Kubernetes is set up locally, you are good to go!&lt;/p&gt;

&lt;h2&gt;Setup Apache Kafka&lt;/h2&gt;

&lt;p&gt;To run Kafka on Kubernetes, &lt;a href="https://strimzi.io"&gt;Strimzi&lt;/a&gt; is used in this setup. Strimzi simplifies the overall management of the Kafka cluster by providing operators that manage Kafka and related components. For the purposes of this guide the details are not too relevant, but if you are interested you can read more about Strimzi here:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://strimzi.io/docs/operators/latest/overview.html"&gt;Overview&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://strimzi.io/docs/operators/latest/quickstart.html"&gt;Quick start guide&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;Deploy Kafka to Kubernetes&lt;/h3&gt;

&lt;p&gt;Deployment is done in two steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Install Strimzi&lt;/li&gt;
&lt;li&gt;Provision the Kafka cluster&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;First, move into the &lt;code&gt;k8s&lt;/code&gt; directory:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cd k8s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Easy!&lt;/p&gt;

&lt;h4&gt;Install Strimzi&lt;/h4&gt;

&lt;p&gt;Create the Kafka namespace:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl create namespace kafka
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Create Strimzi cluster operator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl apply -f strimzi.yml --namespace kafka
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Wait for the &lt;code&gt;strimzi-cluster-operator&lt;/code&gt; to start (&lt;code&gt;STATUS: Running&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get pods --namespace kafka -w
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now Strimzi should be installed onto the cluster. Next we will provision the Kafka cluster.&lt;/p&gt;

&lt;h4&gt;Provision the Kafka cluster&lt;/h4&gt;

&lt;p&gt;Apply the &lt;code&gt;kafka-persistent-single.yml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl apply -f kafka-persistent-single.yml --namespace kafka
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Wait for everything to start up; it might take a few minutes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get pods --namespace kafka -w 
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h4&gt;Verify the Kafka setup&lt;/h4&gt;

&lt;p&gt;For this particular experiment, I wanted to explore how to connect to the Kafka cluster from the outside. To do this, a &lt;code&gt;NodePort&lt;/code&gt; was set up in the &lt;code&gt;kafka-persistent-single.yml&lt;/code&gt;. Strimzi has a good blog post about &lt;a href="https://strimzi.io/blog/2019/04/17/accessing-kafka-part-1/"&gt;Accessing Kafka&lt;/a&gt; if you are interested.&lt;/p&gt;
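For reference, the piece of `kafka-persistent-single.yml` that enables this is an external listener of type `nodeport`. Strimzi's listener syntax has changed between versions, so treat the sketch below as an approximation of the 2020-era format rather than the repo's exact contents:

```yaml
# Approximate sketch of the external listener in kafka-persistent-single.yml
spec:
  kafka:
    listeners:
      plain: {}            # in-cluster access on 9092
      external:
        type: nodeport     # exposes the brokers plus a bootstrap service on NodePorts
        tls: false
```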

&lt;p&gt;First, get your Kubernetes node &lt;code&gt;Name&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get nodes
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Next, get your node &lt;code&gt;InternalIP&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Replace &amp;lt;NodeName&amp;gt; with your node name
kubectl get node &amp;lt;NodeName&amp;gt; -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}{end}'
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
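If you want to script this step instead of eyeballing the output, the `InternalIP` row can be filtered out with awk. The `printf` below is a stand-in for the real kubectl output, so the snippet runs without a cluster:

```shell
# Keep only the address from the "type<TAB>address" line whose type is
# InternalIP. The printf mimics the jsonpath output shown above.
printf 'Hostname\tmy-node\nInternalIP\t10.1.2.3\n' \
  | awk '$1 == "InternalIP" { print $2 }'   # → 10.1.2.3
```

Against a live cluster you would pipe the kubectl command itself into the same awk filter.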



&lt;p&gt;Fetch the port of your Kafka external bootstrap service:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get service my-cluster-kafka-external-bootstrap --namespace kafka -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;By now you should have:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Your Kubernetes node IP address&lt;/li&gt;
&lt;li&gt;The port of the Kafka bootstrap service&lt;/li&gt;
&lt;/ul&gt;
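To cut down on copy-paste errors in the commands that follow, the two values can be stashed in shell variables and combined into a single bootstrap address. The IP and port below are hypothetical placeholders; substitute the values you just fetched:

```shell
NODE_IP="10.1.2.3"        # hypothetical: your node's InternalIP
BOOTSTRAP_PORT="31234"    # hypothetical: your external bootstrap NodePort
BOOTSTRAP="$NODE_IP:$BOOTSTRAP_PORT"
echo "$BOOTSTRAP"         # → 10.1.2.3:31234
```

The consumer and producer commands can then use `$BOOTSTRAP` in place of `<node-ip>:<bootstrap-port>`.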

&lt;p&gt;If you don't already have the Kafka CLI available, &lt;a href="https://kafka.apache.org/quickstart#quickstart_download"&gt;download it&lt;/a&gt;; following the download step alone is sufficient.&lt;/p&gt;

&lt;p&gt;Finally, we can do the actual validation by producing/consuming some messages. Open two terminal windows, and browse to your Kafka installation folder.&lt;/p&gt;

&lt;p&gt;In terminal 1, we will consume messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# set the &amp;lt;node-ip&amp;gt; and &amp;lt;bootstrap-port&amp;gt;
bin/kafka-console-consumer.sh --bootstrap-server &amp;lt;node-ip&amp;gt;:&amp;lt;bootstrap-port&amp;gt; --topic my-topic --from-beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In terminal 2, we produce the messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# set the &amp;lt;node-ip&amp;gt; and &amp;lt;bootstrap-port&amp;gt;
bin/kafka-console-producer.sh --broker-list &amp;lt;node-ip&amp;gt;:&amp;lt;bootstrap-port&amp;gt; --topic my-topic
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Post some messages in terminal 2, and they should pop up in terminal 1. Very smooth.&lt;/p&gt;

&lt;h2&gt;Deploy Apache Flink to Kubernetes&lt;/h2&gt;

&lt;p&gt;No fancy operator is used to manage Flink. Instead, we are just deploying a simple Flink yml. You can read more about Flink at the &lt;a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/"&gt;Apache Flink homepage&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Again, browse to the &lt;code&gt;k8s&lt;/code&gt; directory of the repo.&lt;/p&gt;

&lt;p&gt;Create the Flink namespace:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl create namespace flink
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Deploy the &lt;code&gt;flink.yml&lt;/code&gt; to the Kubernetes cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl apply -f flink.yml -n flink
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Wait until Flink boots properly:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get pods --namespace flink -w
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now Flink should be running.&lt;/p&gt;

&lt;h3&gt;Verify the Flink setup&lt;/h3&gt;

&lt;p&gt;A &lt;code&gt;NodePort&lt;/code&gt; is again used to expose the Flink UI. To get the port, call:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get service flink-jobmanager-rest --namespace flink -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Using this port, you should be able to reach the Flink UI. Head to your browser and enter &lt;code&gt;&amp;lt;node-ip&amp;gt;:&amp;lt;flink-port&amp;gt;&lt;/code&gt; in the address bar.&lt;/p&gt;

&lt;h2&gt;Deploy a job to Flink&lt;/h2&gt;

&lt;p&gt;The job that will be deployed to Flink is a simple example application: all it does is add a prefix to each event it consumes.&lt;/p&gt;

&lt;p&gt;Flink provides a &lt;a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/scala_api_quickstart.html"&gt;templating tool&lt;/a&gt; to get started with new jobs. I had to make some minor modifications to comply with my local SBT and Scala setup. You will need to install both SBT and Scala. These are the versions used in this project:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;SBT version 1.3.12&lt;/li&gt;
&lt;li&gt;Scala version 2.12.11&lt;/li&gt;
&lt;li&gt;OpenJDK 13&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Head over to the &lt;code&gt;flink-job&lt;/code&gt; directory in one of your terminals.&lt;br&gt;
Then, to build a JAR file, simply run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sbt assembly
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;If you are lucky it will just work. If not, you might have to do some troubleshooting; start by making sure you are using the same versions as listed above.&lt;/p&gt;

&lt;p&gt;When the assembly is complete you should have a fresh &lt;code&gt;jar&lt;/code&gt; in &lt;code&gt;target/scala-2.12/flink-job-assembly-0.1-SNAPSHOT.jar&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The next step is to submit the job to Flink. You can do this through the Flink UI using the "Submit New Job" menu option, but I will show how to use the Flink CLI, since that is more useful in the long run.&lt;br&gt;
For this tutorial, download "Apache Flink 1.10.1 for Scala 2.12" from &lt;a href="https://flink.apache.org/downloads.html"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Unzip the package:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;tar xzf flink-1.10.1-bin-scala_2.12.tgz
cd flink-1.10.1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Get the Flink Kubernetes &lt;code&gt;NodePort&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kubectl get service flink-jobmanager-rest --namespace flink -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Upload the flink-job jar:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# set the &amp;lt;node-ip&amp;gt; and &amp;lt;flink-port&amp;gt;
# set &amp;lt;path-to-repo&amp;gt; to the k8s-kafka-flink repo
bin/flink run -m &amp;lt;node-ip&amp;gt;:&amp;lt;flink-port&amp;gt; \
    --class dev.chrisp.Job \
    &amp;lt;path-to-repo&amp;gt;/k8s-kafka-flink/flink-job/target/scala-2.12/flink-job-assembly-0.1-SNAPSHOT.jar \
    --input-topic input \
    --output-topic output \
    --bootstrap.servers my-cluster-kafka-bootstrap.kafka:9092 \
    --zookeeper.connect my-cluster-zookeeper-client.kafka:2181 \
    --group.id flink
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The arguments to the Flink job are pretty self-descriptive: the two topics name the input and output streams, &lt;code&gt;bootstrap.servers&lt;/code&gt; and &lt;code&gt;zookeeper.connect&lt;/code&gt; point at the in-cluster Kafka and ZooKeeper services, and &lt;code&gt;group.id&lt;/code&gt; sets the consumer group the job uses.&lt;/p&gt;

&lt;p&gt;Head over to the Flink UI and list "Running Jobs". You should see a task in "Running" state. If you got this far you should be ready to process data!&lt;/p&gt;

&lt;h2&gt;Generate some data&lt;/h2&gt;

&lt;p&gt;Just as with the Kafka validation, open two terminal windows and browse to your Kafka install directory.&lt;br&gt;
&lt;strong&gt;Note&lt;/strong&gt; that the topic names have changed: &lt;code&gt;input&lt;/code&gt; is now used for producing, and &lt;code&gt;output&lt;/code&gt; for consuming.&lt;/p&gt;

&lt;p&gt;In terminal 1, we will consume messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# set the &amp;lt;node-ip&amp;gt; and &amp;lt;bootstrap-port&amp;gt;
bin/kafka-console-consumer.sh --bootstrap-server &amp;lt;node-ip&amp;gt;:&amp;lt;bootstrap-port&amp;gt; --topic output --from-beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In terminal 2, we produce messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# set the &amp;lt;node-ip&amp;gt; and &amp;lt;bootstrap-port&amp;gt;
bin/kafka-console-producer.sh --broker-list &amp;lt;node-ip&amp;gt;:&amp;lt;bootstrap-port&amp;gt; --topic input
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;When you produce a message (just type anything into the Kafka producer prompt), you will see the event pushed to the output topic with an additional prefix.&lt;/p&gt;
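Conceptually, the running job is doing nothing more exotic than this shell pipeline. The actual prefix string is whatever Job.scala defines; "processed: " here is just an illustrative stand-in:

```shell
# Stand-in for the Flink job's transformation: prepend a prefix to each event.
# "processed: " is a hypothetical prefix, not necessarily the one in Job.scala.
echo "hello" | sed 's/^/processed: /'   # → processed: hello
```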

&lt;h2&gt;Troubleshooting&lt;/h2&gt;

&lt;p&gt;In Kubernetes you can look at the logs for any pod:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# get the pod's name (use namespace kafka or flink)
kubectl get pods --namespace kafka

# get logs
kubectl logs &amp;lt;pod-name&amp;gt; --namespace kafka
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Flink logs are also available through the UI. Browse to the "Task Managers" or "Job Manager" and click the "Logs" tab.&lt;/p&gt;

&lt;h2&gt;Done!&lt;/h2&gt;

&lt;p&gt;Now you have a nice stream processing baseline. It is up to you to do something with it; you could start by making some changes to the Flink job. Just go nuts in &lt;a href="https://github.com/pwgn/k8s-kafka-flink/blob/master/flink-job/src/main/scala/dev/chrisp/Job.scala"&gt;flink-job/src/main/scala/dev/chrisp/Job.scala&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kubernetes</category>
      <category>kafka</category>
      <category>flink</category>
      <category>tutorial</category>
    </item>
  </channel>
</rss>
