<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Tehila F.</title>
    <description>The latest articles on Forem by Tehila F. (@__d3ec2d533ada03).</description>
    <link>https://forem.com/__d3ec2d533ada03</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3655442%2F9c8c1f73-d3d8-43f0-9193-211210ab4028.jpg</url>
      <title>Forem: Tehila F.</title>
      <link>https://forem.com/__d3ec2d533ada03</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/__d3ec2d533ada03"/>
    <language>en</language>
    <item>
      <title>Stop Writing a Kafka Consumer for Every Topic – Make It Generic in .NET</title>
      <dc:creator>Tehila F.</dc:creator>
      <pubDate>Wed, 10 Dec 2025 12:41:12 +0000</pubDate>
      <link>https://forem.com/__d3ec2d533ada03/stop-writing-a-kafka-consumer-for-every-topic-make-it-generic-in-net-3i26</link>
      <guid>https://forem.com/__d3ec2d533ada03/stop-writing-a-kafka-consumer-for-every-topic-make-it-generic-in-net-3i26</guid>
      <description>&lt;p&gt;How I Built a Generic Kafka Consumer in .NET with Strategy Pattern&lt;/p&gt;

&lt;p&gt;Ever found yourself writing a separate Kafka consumer for every single topic? Yeah… me too. 😅&lt;/p&gt;

&lt;p&gt;When I started working on our project, we had this pattern: &lt;strong&gt;one consumer per topic&lt;/strong&gt;. Sounds simple? Well, it quickly became a nightmare:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Code duplication everywhere&lt;/li&gt;
&lt;li&gt;Hard to maintain&lt;/li&gt;
&lt;li&gt;Adding a new topic meant touching multiple files&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;So I decided to tackle this problem and make &lt;strong&gt;one consumer to rule them all&lt;/strong&gt; – generic, modular, and scalable. Here’s how I did it using &lt;strong&gt;Strategy Pattern&lt;/strong&gt; in .NET.&lt;/p&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Problem&lt;/strong&gt;&lt;br&gt;
Imagine having a consumer that only handles "createEntityType" topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;consumer.Subscribe(new[] { "createEntityType" });
var createEntityRequest = JsonSerializer.Deserialize&amp;lt;CreateEntityTypeRequest&amp;gt;(value);
var schemaId = await entityService.createNewEntityType(createEntityRequest);
await dynamicService.CreateDynamicTable(createEntityRequest);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This works… until you add the next topic. Or the one after that. Suddenly, every new topic is &lt;strong&gt;more boilerplate, more headaches&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution: Generic Kafka Consumer&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Instead of writing a consumer per topic, I refactored the code so that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The consumer &lt;strong&gt;discovers handlers dynamically&lt;/strong&gt; using KafkaTopicAttribute&lt;/li&gt;
&lt;li&gt;Each handler implements the ICustomKafkaHandler interface&lt;/li&gt;
&lt;li&gt;Adding a new topic = add a handler class + attribute → nothing else changes&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;ConsumerWorker Example&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var topics = _topicHandlers.Keys.ToArray();
    using var consumer = new ConsumerBuilder&amp;lt;string, string&amp;gt;(consumerConfig).Build();
    consumer.Subscribe(topics);

    while (!stoppingToken.IsCancellationRequested)
    {
        var cr = consumer.Consume(stoppingToken);
        using var scope = _scopeFactory.CreateScope();
        var provider = scope.ServiceProvider;

        if (_topicHandlers.TryGetValue(cr.Topic, out var handlerType))
        {
            var handler = (ICustomKafkaHandler)provider.GetRequiredService(handlerType);
            var response = await handler.HandleAsync(cr.Message.Value, cr.Message.Headers, provider);
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Handler Example: AddPropertyHandler&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[KafkaTopic("addProperty")]
public class AddPropertyHandler : ICustomKafkaHandler
{
    public async Task&amp;lt;object&amp;gt; HandleAsync(string message, Headers headers, IServiceProvider provider)
    {
        var request = JsonSerializer.Deserialize&amp;lt;AddFieldRequest&amp;gt;(message);
        var service = provider.GetRequiredService&amp;lt;AddFieldService&amp;gt;();
        return await service.AddFieldToSchema(request);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;KafkaTopicAttribute &amp;amp; Interface&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[AttributeUsage(AttributeTargets.Class)]
public class KafkaTopicAttribute : Attribute
{
    public string Topic { get; }
    public KafkaTopicAttribute(string topic) =&amp;gt; Topic = topic;
}

public interface ICustomKafkaHandler
{
    Task&amp;lt;object&amp;gt; HandleAsync(string message, Headers headers, IServiceProvider provider);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;How It Works&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;ConsumerWorker &lt;strong&gt;builds a map&lt;/strong&gt;: Topic → Handler using KafkaTopicAttribute.&lt;/li&gt;
&lt;li&gt;Each handler implements ICustomKafkaHandler and handles messages for its topic.&lt;/li&gt;
&lt;li&gt;Dependency Injection (DI) injects required services directly into the handler.&lt;/li&gt;
&lt;li&gt;Want to add a new topic? Just add a new handler class with the attribute. &lt;strong&gt;No changes to ConsumerWorker&lt;/strong&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Why This Approach Rocks&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;Maintainable&lt;/strong&gt;– Add new topics without touching the core consumer&lt;br&gt;
✅ &lt;strong&gt;Modular&lt;/strong&gt;– Each handler is responsible only for its topic&lt;br&gt;
✅ &lt;strong&gt;Scalable&lt;/strong&gt;– Works with dozens of topics with minimal effort&lt;br&gt;
✅ &lt;strong&gt;Error handling&lt;/strong&gt; – Each handler manages its own errors&lt;br&gt;
✅ &lt;strong&gt;DI-friendly&lt;/strong&gt; – Services are injected automatically&lt;/p&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lessons Learned&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Strategy Pattern + DI is a perfect combo for generic Kafka consumers&lt;/li&gt;
&lt;li&gt;Reflection + attributes = dynamic discovery without hardcoding&lt;/li&gt;
&lt;li&gt;Logging and consistent error handling per handler is crucial&lt;/li&gt;
&lt;li&gt;Separation of Consumer + Handlers = &lt;strong&gt;modularity &amp;amp; extensibility&lt;/strong&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt; &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;TL;DR&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Generic Kafka consumers in .NET save you from boilerplate, make your code modular, and make it &lt;strong&gt;easy to extend and maintain&lt;/strong&gt;.&lt;br&gt;
 &lt;/p&gt;

&lt;p&gt;💡 Tip: This approach works best when you have multiple topics with similar patterns and want to centralize error handling, logging, and service injection.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>dotnet</category>
      <category>csharp</category>
      <category>programming</category>
    </item>
  </channel>
</rss>
