<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Scott Wang</title>
    <description>The latest articles on Forem by Scott Wang (@cosmostail).</description>
    <link>https://forem.com/cosmostail</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F522403%2Fb37e78ae-4985-4d12-a2bf-6494ac97510e.jpeg</url>
      <title>Forem: Scott Wang</title>
      <link>https://forem.com/cosmostail</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/cosmostail"/>
    <language>en</language>
    <item>
      <title>MySQL 8 Kafka Connect Tutorial on Docker</title>
      <dc:creator>Scott Wang</dc:creator>
      <pubDate>Sat, 28 Nov 2020 17:58:27 +0000</pubDate>
      <link>https://forem.com/cosmostail/mysql-8-kafka-connect-tutorial-on-docker-479p</link>
      <guid>https://forem.com/cosmostail/mysql-8-kafka-connect-tutorial-on-docker-479p</guid>
      <description>&lt;p&gt;In this tutorial, we will use docker-compose, MySQL 8 as examples to demonstrate Kafka Connector by using MySQL as the data source.&lt;/p&gt;

&lt;p&gt;This tutorial is mainly based on the tutorial written on &lt;a href="https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html" rel="noopener noreferrer"&gt;Kafka Connect Tutorial on Docker&lt;/a&gt;. However, the original tutorial is out-dated that it just won’t work if you followed it step by step. This is a refreshment of that tutorial, also to simplify things, we will get rid of the Avro set up as it serves no purpose to demonstrate Kafka Connector. Enough said, let’s begins.&lt;/p&gt;

&lt;h3&gt;
  
  
  Preparation
&lt;/h3&gt;

&lt;p&gt;First, you will need to download MYSQL Connector Driver, this can be found &lt;br&gt;
&lt;a href="http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz" rel="noopener noreferrer"&gt;MySQL Connector Driver&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You will also need to download the JDBC plugins at &lt;br&gt;
&lt;a href="https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc" rel="noopener noreferrer"&gt;Confluent JDBC plugins&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Unzip both mysql-connector-java-8.0.22.tar.gz and confluentinc-kafka-connect-jdbc-10.0–2.1.zip. Create a jars directory, move mysql-connector-java-8.0.22.jar and all the .jar files in onfluentinc-kafka-connect-jdbc-10.0–2.1/lib/ directory to the jars directory.&lt;/p&gt;
&lt;h3&gt;
  
  
  docker-compose file
&lt;/h3&gt;

&lt;p&gt;Here is the docker-compose file that contains everything you need to run this tutorial&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '2'
services:
  mysql:
    privileged: true
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: test
    image: mysql:8.0

  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    privileged: true
    ports:
      - 32181:32181
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-connector-mysql:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 28083:28083
    links:
      - kafka
      - zookeeper
      - mysql
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 28083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Before you run the docker-compose, make sure your file structure looks like&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F1gpm9m4kdbvaspn92gdt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F1gpm9m4kdbvaspn92gdt.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Let the fun begins
&lt;/h3&gt;

&lt;p&gt;Let the fun begins with starting mysql service&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d mysql
docker exec -it kafka-connector_mysql_1 bash
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Execute the following queries by using MySQL cli &lt;code&gt;mysql -uroot -ptest&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE DATABASE IF NOT EXISTS connect_test;
USE connect_test;

CREATE TABLE IF NOT EXISTS test (
  id serial NOT NULL PRIMARY KEY,
  name varchar(100),
  email varchar(200),
  department varchar(200),
  modified timestamp default CURRENT_TIMESTAMP NOT NULL,
  INDEX `modified_index` (`modified`)
);

INSERT INTO test (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
exit;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let’s start zookeeper and kafka&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d zookeeper kafka 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;wait for few seconds, to make sure that all services are up and running.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker ps
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the zookeeper, Kafka and mysql are all up and running, let’s prepare our final course, &lt;em&gt;confluent kafka-connect&lt;/em&gt;&lt;br&gt;
First, let’s create the topics that will be used by the connector&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-offsets --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact

docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-config --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact

docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-status --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, let’s start the &lt;em&gt;kafka-connect&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d kafka-connector-mysql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Give it few seconds, and let’s create the JDBC Source connector by making a REST API call to the kafka connector service.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST \
  -H "Content-Type: application/json" \
  --data '{ "name": "quickstart-jdbc-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://mysql:3306/connect_test", "connection.user": "root", "connection.password": "test", "mode": "incrementing", "incrementing.column.name": "id", "timestamp.column.name": "modified", "topic.prefix": "quickstart-jdbc-", "poll.interval.ms": 1000 } }' \
  http://localhost:28083/connectors
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;wait for few seconds, and check the status to make sure it is &lt;em&gt;RUNNING&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -s -X GET http://localhost:28083/connectors/quickstart-jdbc-source/status
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see something similar to&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"name":"quickstart-jdbc-source","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:28083"}],"type":"source"}%
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That’s yet, let’s verify our work by running a Kafka consumer,&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose run --rm kafka kafka-console-consumer --bootstrap-server kafka:29092  --topic quickstart-jdbc-test --from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and you should see the following logs show up&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":18,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":19,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":20,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
^CProcessed a total of 10 messages
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That’s yet. To tear it all down, simply run&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Enjoy.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>mysql</category>
      <category>kafkaconnect</category>
      <category>docker</category>
    </item>
  </channel>
</rss>
