DEV Community

milktea02


Consuming and deserializing Avro messages with Apache kafka-console-consumer and AWS Glue as the Schema Registry

I had to do this recently and found little to no documentation on what is required to consume Avro messages from Kafka and deserialize them with the AWS Glue Schema Registry, so I'm documenting my findings here. For comparison, Confluent ships a console consumer for this (kafka-avro-console-consumer) that just works out of the box.

Requirements, prerequisites and assumptions:

  • Apache Kafka
    • I think I was using 3.3.1 with Scala 2.13
  • A Kafka cluster with topics that contain Avro messages
    • We are using an Amazon MSK cluster running Kafka 2.6.x
    • This also assumes the messages were serialized with the AWS Glue Schema Registry serializer, which adds its own header bytes
  • An AWS Glue Schema Registry containing the Avro schema(s) you're using
    • See the AWS Glue documentation for how to set one up with schemas
  • The jars listed below, from Maven Central (https://repo1.maven.org/maven2/org/apache/)
  • IAM role permissions/policies already configured for your client
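For context on those header bytes: based on the awslabs/aws-glue-schema-registry source, the Glue serializer appears to prepend an 18-byte header to each record value. The header consists of a header-version byte (3), a compression byte (0 for none, 5 for zlib), and the 16-byte schema version ID (a UUID) that the deserializer looks up in Glue. That layout is an assumption drawn from the library rather than something this post verified. The sketch below decodes the header from a hex dump of a raw value; glue_header_info is a hypothetical helper name.

```shell
# Hypothetical helper: decode the (assumed) Glue Schema Registry header from a
# hex dump of a raw record value, e.g. the output of `od -An -tx1 value.bin`.
# Assumed layout: [version: 1 byte][compression: 1 byte][schema version UUID: 16 bytes][payload]
glue_header_info() {
  # Normalise: drop whitespace and lowercase the hex digits.
  hex=$(printf '%s' "$1" | tr -d ' \n\t' | tr 'A-F' 'a-f')
  if [ "${#hex}" -lt 36 ]; then
    echo "too short for a Glue header" >&2
    return 1
  fi
  version=$(printf '%s' "$hex" | cut -c1-2)
  compression=$(printf '%s' "$hex" | cut -c3-4)
  uuid_hex=$(printf '%s' "$hex" | cut -c5-36)
  # Re-insert dashes to get the usual 8-4-4-4-12 UUID form.
  uuid=$(printf '%s' "$uuid_hex" | sed 's/^\(.\{8\}\)\(.\{4\}\)\(.\{4\}\)\(.\{4\}\)\(.\{12\}\)$/\1-\2-\3-\4-\5/')
  case "$compression" in
    00) comp=none ;;
    05) comp=zlib ;;
    *)  comp="unknown ($compression)" ;;
  esac
  echo "header_version=$version"
  echo "compression=$comp"
  echo "schema_version_id=$uuid"
}

# Example, assuming a raw value saved to a hypothetical file value.bin:
#   glue_header_info "$(od -An -tx1 value.bin)"
```

If the decoded schema version ID doesn't exist in your registry, the deserializer can't resolve the schema. That makes this a quick sanity check before digging into jars or IAM.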

Caveat: This is what worked with our setup and the jars we needed. I don't have any examples or screenshots.

Download the required jars into <location of kafka>/libs/:

List of jars:

Group ID                 Artifact ID              Version
software.amazon.glue     schema-registry-common   1.1.14
software.amazon.glue     schema-registry-serde    1.1.14
software.amazon.awssdk   glue                     2.17.122
software.amazon.awssdk   arns                     2.17.122
software.amazon.awssdk   url-connection-client    2.17.122
org.apache.avro          avro                     1.11.0
com.google.guava         guava                    30.0-jre
com.google.guava         failureaccess            1.0.1
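If you'd rather not fetch each jar by hand, you can script the downloads. This is a sketch that assumes curl is installed. jar_url and download_jars are hypothetical helper names, and KAFKA_HOME in the example is an assumption. The script relies only on Maven Central's standard directory layout, where the group ID's dots become path segments:

```shell
# Hypothetical helpers (names are made up) for fetching jars from Maven Central.
# Maven repository layout:
#   <group/path>/<artifact>/<version>/<artifact>-<version>.jar
MAVEN_CENTRAL="https://repo1.maven.org/maven2"

# Print the download URL for one group/artifact/version triple.
jar_url() {
  group_path=$(printf '%s' "$1" | tr '.' '/')
  echo "$MAVEN_CENTRAL/$group_path/$2/$3/$2-$3.jar"
}

# Read "group artifact version" lines on stdin and download each jar into $1.
download_jars() {
  libs_dir=$1
  while read -r group artifact version; do
    [ -n "$group" ] || continue  # skip blank lines
    echo "fetching $artifact-$version.jar" >&2
    # -f: fail on HTTP errors, -sS: quiet but still show errors, -L: follow redirects
    curl -fsSL -o "$libs_dir/$artifact-$version.jar" \
      "$(jar_url "$group" "$artifact" "$version")" || return 1
  done
}

# Example (add the remaining rows from the jar table the same way):
#   download_jars "$KAFKA_HOME/libs" <<'EOF'
#   software.amazon.glue schema-registry-common 1.1.14
#   software.amazon.glue schema-registry-serde 1.1.14
#   org.apache.avro avro 1.11.0
#   EOF
```

Keeping the list as plain "group artifact version" lines means the same rows can be pasted straight from the table.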

Configuring kafka-console-consumer.sh:

You'll need to pass a few properties to the Glue deserializer. I've used a wrapper script, kafka-glue-avro-console-consumer.sh:

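#!/bin/sh
# Run kafka-console-consumer.sh with the AWS Glue Schema Registry deserializer
# for record values. All other arguments (--bootstrap-server, --topic, ...)
# are passed through unchanged via "$@".
exec kafka-console-consumer.sh --value-deserializer com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer \
        --property value.deserializer.region=<hardcoded-aws-region> \
        --property value.deserializer.dataFormat=AVRO \
        --property value.deserializer.avroRecordType=GENERIC_RECORD "$@"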

You'll still need to supply the usual options, such as --bootstrap-server <list of servers:port>, --topic <topic-name>, and --consumer.config <config-file>.

Example:

$ ./kafka-glue-avro-console-consumer.sh --bootstrap-server broker-01:9098 --topic my-topic --consumer.config my-config --max-messages 1 --from-beginning
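The contents of the --consumer.config file aren't shown here. Port 9098 is the port Amazon MSK uses for IAM-authenticated clients inside AWS. Assuming your cluster uses IAM access control, a client config along these lines should work. This is an assumption, not a copy of the config used in this setup. It relies on the aws-msk-iam-auth library being in Kafka's libs/ directory, and that library is not in the jar list above. my-config is simply the file name from the example command.

```shell
#!/bin/sh
# Sketch: write a client config for an MSK cluster that uses IAM access control.
# Assumes the aws-msk-iam-auth jar is in <location of kafka>/libs/ and that the
# IAM role the client runs as is allowed to read the topic.
cat > my-config <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
```

If your cluster uses TLS or SASL/SCRAM instead, the security settings will differ. Only the --consumer.config file changes; the Glue deserializer properties stay the same.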

If you want to specify the Glue registry and schema, you can do that with:

--property value.deserializer.registry.name=<registryName>
--property value.deserializer.schemaName=<schemaName>
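If you set the registry and schema often, you can fold them into the wrapper. This is a hypothetical variant, not part of the setup described here. glue_avro_consume, GLUE_REGISTRY_NAME, GLUE_SCHEMA_NAME and KAFKA_CONSOLE_CONSUMER are made-up names, and the region is read from AWS_REGION instead of being hardcoded:

```shell
# Hypothetical wrapper function: same properties as kafka-glue-avro-console-consumer.sh,
# plus optional registry/schema pinning via (made-up) environment variables.
glue_avro_consume() {
  if [ -z "${AWS_REGION:-}" ]; then
    echo "AWS_REGION is not set" >&2
    return 1
  fi
  # Prepend the optional properties to the caller's arguments.
  if [ -n "${GLUE_SCHEMA_NAME:-}" ]; then
    set -- --property "value.deserializer.schemaName=$GLUE_SCHEMA_NAME" "$@"
  fi
  if [ -n "${GLUE_REGISTRY_NAME:-}" ]; then
    set -- --property "value.deserializer.registry.name=$GLUE_REGISTRY_NAME" "$@"
  fi
  # KAFKA_CONSOLE_CONSUMER (hypothetical) defaults to the script on PATH.
  "${KAFKA_CONSOLE_CONSUMER:-kafka-console-consumer.sh}" \
    --value-deserializer com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer \
    --property "value.deserializer.region=$AWS_REGION" \
    --property value.deserializer.dataFormat=AVRO \
    --property value.deserializer.avroRecordType=GENERIC_RECORD \
    "$@"
}

# Example:
#   AWS_REGION=<aws-region> GLUE_REGISTRY_NAME=<registryName> GLUE_SCHEMA_NAME=<schemaName> \
#     glue_avro_consume --bootstrap-server broker-01:9098 --topic my-topic --consumer.config my-config
```

Using `set --` keeps each property as a single argument, so names containing spaces or shell metacharacters are passed through intact.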

You can check the source code for additional properties you can use: https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.14/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java

Here is a snippet of Ansible code to download all the jars:

- name: install artifacts from maven to consume AVRO records
  # On Ansible >= 2.10 this module lives in the community.general collection
  # (community.general.maven_artifact).
  maven_artifact:
    group_id: "{{ item.group_id }}"
    artifact_id: "{{ item.artifact_id }}"
    version: "{{ item.version }}"
    dest: /home/ubuntu/kafka_{{ kafka_client_version }}/libs/{{ item.artifact_id }}-{{ item.version }}.jar
    group: ubuntu
    owner: ubuntu
    mode: '0664'  # quoted so YAML doesn't parse it as an integer
  loop:
    - { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-common', version: '1.1.14' }
    - { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-serde', version: '1.1.14' }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'glue', version: '2.17.122' }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'arns', version: '2.17.122' }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'url-connection-client', version: '2.17.122' }
    - { group_id: 'org.apache.avro', artifact_id: 'avro', version: '1.11.0' }
    - { group_id: 'com.google.guava', artifact_id: 'guava', version: '30.0-jre' }
    - { group_id: 'com.google.guava', artifact_id: 'failureaccess', version: '1.0.1' }

(We're still testing some things, which is why everything is in the user's home directory!)


Top comments (1)

Aitor Mars

Hi, I got the error below. I tried adding the software.amazon.awssdk.utils package to libs, but it didn't solve the issue.
Exception in thread "main" java.lang.NoClassDefFoundError: software/amazon/awssdk/utils/internal/EnumUtils
    at software.amazon.awssdk.services.glue.model.Compatibility.<clinit>(Compatibility.java:42)
    at com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.<clinit>(AWSSchemaRegistryConstants.java:114)
    at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetCompatibility(GlueSchemaRegistryConfiguration.java:168)
    at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:93)
    at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82)
    at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.<init>(GlueSchemaRegistryConfiguration.java:74)
    at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer.configure(GlueSchemaRegistryKafkaDeserializer.java:89)
    at kafka.tools.DefaultMessageFormatter.getDeserializerProperty(ConsoleConsumer.scala:586)
    at kafka.tools.DefaultMessageFormatter.$anonfun$configure$22(ConsoleConsumer.scala:492)
    at kafka.tools.DefaultMessageFormatter.configure(ConsoleConsumer.scala:592)
    at kafka.tools.ConsoleConsumer$ConsumerConfig.<init>(ConsoleConsumer.scala:317)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.utils.internal.EnumUtils
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 13 more
