<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Abdelrahman Elsayed</title>
    <description>The latest articles on Forem by Abdelrahman Elsayed (@d3cypherd).</description>
    <link>https://forem.com/d3cypherd</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3516367%2F016202fb-5d97-4a6b-aef2-d87441b6ac5a.jpeg</url>
      <title>Forem: Abdelrahman Elsayed</title>
      <link>https://forem.com/d3cypherd</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/d3cypherd"/>
    <language>en</language>
    <item>
      <title>How to Use Kubernetes Metadata for Dynamic Kafka Topics in Fluent Bit</title>
      <dc:creator>Abdelrahman Elsayed</dc:creator>
      <pubDate>Fri, 19 Sep 2025 16:06:01 +0000</pubDate>
      <link>https://forem.com/d3cypherd/how-to-use-kubernetes-metadata-for-dynamic-kafka-topics-in-fluent-bit-33fk</link>
      <guid>https://forem.com/d3cypherd/how-to-use-kubernetes-metadata-for-dynamic-kafka-topics-in-fluent-bit-33fk</guid>
      <description>&lt;p&gt;Managing logs at scale can be painful, especially when all of them end up in the same Kafka topic. What if you could route logs into &lt;strong&gt;pod-specific topics&lt;/strong&gt; (or any custom topic) directly from Fluent Bit? This makes filtering and querying much easier, especially in multi-tenant or large Kubernetes environments. In this post, we’ll explore how to set up &lt;strong&gt;dynamic Kafka topics in Fluent Bit using Kubernetes metadata&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  The problem
&lt;/h2&gt;

&lt;p&gt;Fluent Bit’s Kafka output plugin supports dynamic topic assignment using &lt;code&gt;Dynamic_topic on&lt;/code&gt; and &lt;code&gt;Topic_key&lt;/code&gt;. However, the documentation is vague about using &lt;strong&gt;nested fields&lt;/strong&gt;, which is the case for kubernetes metadata fields, as the topic key.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Example:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2025-05-13T19:27:31.954980134Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"log"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Server is listening on port 5000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="nl"&gt;"kubernetes"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"namespace_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"default"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"pod_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"de70b4a7-2803-485b-9015-3d17791969ae"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"labels"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="w"&gt;
         &lt;/span&gt;&lt;span class="nl"&gt;"app"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"dy-7d8eeb3d4fb6da81d88da56280de7d42"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
         &lt;/span&gt;&lt;span class="nl"&gt;"pod-template-hash"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"5744dfc46c"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"container_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"dy-7d8eeb3d4fb6da81d88da56280de7d42"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"annotations"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="w"&gt;
         &lt;/span&gt;&lt;span class="nl"&gt;"kubectl.kubernetes.io/restartedAt"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2025-05-13T22:27:29+03:00"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"container_image"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"docker.io/library/dy-7d8eeb3d4fb6da81d88da56280de7d42:latest"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"container_hash"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"sha256:d110c044a3e9fed6c3b3c9222a1b4d5cf4bc5f128a1d5f47980abee7ff98f745"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"host"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"kind-control-plane"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"docker_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"da27432083a32dada6ccac359d72233a4e93d98d1a139e6b2d4bee7d167c49cc"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"pod_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf"&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At first glance, it looks like we should be able to use &lt;code&gt;$kubernetes['pod_name']&lt;/code&gt; as Topic_key, since the docs show this syntax in &lt;a href="https://docs.fluentbit.io/manual/data-pipeline/filters/grep#nested-fields-example" rel="noopener noreferrer"&gt; filter examples &lt;/a&gt;. But in practice, this doesn’t work for outputs — only for filters.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;❌ Configuration that doesn't work:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;    &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;OUTPUT&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
        &lt;span class="s"&gt;Name          kafka&lt;/span&gt;
        &lt;span class="s"&gt;Match         *&lt;/span&gt;
        &lt;span class="s"&gt;Brokers       kafka:9093&lt;/span&gt;
        &lt;span class="s"&gt;Topics        kube&lt;/span&gt;
        &lt;span class="s"&gt;Topic_key     $kubernetes['pod_name']&lt;/span&gt;    &lt;span class="c1"&gt;# This syntax doesn't work&lt;/span&gt;
        &lt;span class="s"&gt;Dynamic_topic on&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  The solution
&lt;/h2&gt;

&lt;p&gt;After some trial and error, I discovered why the config in the docs doesn’t work:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Fluent Bit’s Kafka output plugin only supports top-level keys in Topic_key&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;That means if your field is nested (like &lt;code&gt;kubernetes.pod_name&lt;/code&gt;), Fluent Bit won’t resolve it directly.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The trick is to &lt;strong&gt;promote the nested field to the top level&lt;/strong&gt; before it reaches the output plugin.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;We can do this using a &lt;strong&gt;Lua filter&lt;/strong&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 1: Original log (nested field inside kubernetes)
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2025-05-13T19:27:31.954980134Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"log"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Server is listening on port 5000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"kubernetes"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"namespace_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"default"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"pod_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 2: Lua filter to promote nested field
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight lua"&gt;&lt;code&gt;&lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="nf"&gt;promote_pod_name&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;local&lt;/span&gt; &lt;span class="n"&gt;k8s&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"kubernetes"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;k8s&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;k8s&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"pod_name"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;then&lt;/span&gt;
        &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"topic_name"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;k8s&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"pod_name"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;
        &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"topic_name"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"default-topic"&lt;/span&gt; &lt;span class="c1"&gt;-- fallback&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Quick Note:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;1 means "keep the record"&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;timestamp&lt;/code&gt; is passed through filter unchanged&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;record&lt;/code&gt; is the updated log with &lt;em&gt;topic_name&lt;/em&gt; at top level&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Step 3: Log after Lua filter
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"2025-05-13T19:27:31.954980134Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"log"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Server is listening on port 5000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"topic_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"kubernetes"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now Fluent Bit can safely use &lt;strong&gt;topic_name&lt;/strong&gt; as the Topic_key.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 4: Fluent Bit config
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;FILTER&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="s"&gt;Name     lua&lt;/span&gt;
    &lt;span class="s"&gt;Match    kube.*&lt;/span&gt;
    &lt;span class="s"&gt;script   /fluent-bit/scripts/promote_pod_name.lua&lt;/span&gt;
    &lt;span class="s"&gt;call     promote_pod_name&lt;/span&gt;

&lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;OUTPUT&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="s"&gt;Name          kafka&lt;/span&gt;
    &lt;span class="s"&gt;Match         *&lt;/span&gt;
    &lt;span class="s"&gt;Brokers       kafka:9093&lt;/span&gt;
    &lt;span class="s"&gt;Topics        kube&lt;/span&gt;
    &lt;span class="s"&gt;Topic_key     topic_name&lt;/span&gt;
    &lt;span class="s"&gt;Dynamic_topic on&lt;/span&gt;
    &lt;span class="s"&gt;Format        json&lt;/span&gt;

&lt;span class="na"&gt;promote_pod_name.lua&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;|&lt;/span&gt;
  &lt;span class="s"&gt;function promote_pod_name(tag, timestamp, record)&lt;/span&gt;
      &lt;span class="s"&gt;local k8s = record["kubernetes"]&lt;/span&gt;
      &lt;span class="s"&gt;if k8s and k8s["pod_name"] then&lt;/span&gt;
          &lt;span class="s"&gt;record["topic_name"] = k8s["pod_name"]&lt;/span&gt;
      &lt;span class="s"&gt;else&lt;/span&gt;
          &lt;span class="s"&gt;record["topic_name"] = "default-topic" -- fallback&lt;/span&gt;
      &lt;span class="s"&gt;end&lt;/span&gt;
      &lt;span class="s"&gt;return 1, timestamp, record&lt;/span&gt;
  &lt;span class="s"&gt;end&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here's what a typical &lt;strong&gt;ConfigMap configuration for Fluent Bit&lt;/strong&gt; for processing logs coming from kubernetes pods would look like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;apiVersion&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;v1&lt;/span&gt;
&lt;span class="na"&gt;kind&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ConfigMap&lt;/span&gt;
&lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;fluent-bit-config&lt;/span&gt;
  &lt;span class="na"&gt;namespace&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;logging&lt;/span&gt;
&lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;fluent-bit.conf&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;|&lt;/span&gt;
    &lt;span class="s"&gt;[INPUT]&lt;/span&gt;
        &lt;span class="s"&gt;Name              tail&lt;/span&gt;
        &lt;span class="s"&gt;Path              /var/log/containers/*.log&lt;/span&gt;
        &lt;span class="s"&gt;multiline.parser  docker, cri&lt;/span&gt;
        &lt;span class="s"&gt;Tag               kube.*&lt;/span&gt;
        &lt;span class="s"&gt;Mem_Buf_Limit     5MB&lt;/span&gt;
        &lt;span class="s"&gt;Skip_Long_Lines   On&lt;/span&gt;

    &lt;span class="s"&gt;[FILTER]&lt;/span&gt;
        &lt;span class="s"&gt;Name              kubernetes&lt;/span&gt;
        &lt;span class="s"&gt;Match             kube.*&lt;/span&gt;
        &lt;span class="s"&gt;Merge_Log         On&lt;/span&gt;
        &lt;span class="s"&gt;Keep_Log          Off&lt;/span&gt;
        &lt;span class="s"&gt;Kube_Tag_Prefix   kube.var.log.containers.&lt;/span&gt;
        &lt;span class="s"&gt;Kube_URL          https://kubernetes.default.svc:443&lt;/span&gt;
        &lt;span class="s"&gt;Kube_CA_File      /var/run/secrets/kubernetes.io/serviceaccount/ca.crt&lt;/span&gt;
        &lt;span class="s"&gt;Kube_Token_File   /var/run/secrets/kubernetes.io/serviceaccount/token&lt;/span&gt;
        &lt;span class="s"&gt;K8S-Logging.Exclude  On&lt;/span&gt;

    &lt;span class="s"&gt;[FILTER]&lt;/span&gt;
        &lt;span class="s"&gt;Name     lua&lt;/span&gt;
        &lt;span class="s"&gt;Match    kube.*&lt;/span&gt;
        &lt;span class="s"&gt;script   /fluent-bit/scripts/promote_pod_name.lua&lt;/span&gt;
        &lt;span class="s"&gt;call     promote_pod_name&lt;/span&gt;

    &lt;span class="s"&gt;[OUTPUT]&lt;/span&gt;
        &lt;span class="s"&gt;Name          kafka&lt;/span&gt;
        &lt;span class="s"&gt;Match         *&lt;/span&gt;
        &lt;span class="s"&gt;Brokers       kafka:9093&lt;/span&gt;
        &lt;span class="s"&gt;Topics        kube&lt;/span&gt;
        &lt;span class="s"&gt;Topic_key     topic_name&lt;/span&gt;
        &lt;span class="s"&gt;Dynamic_topic on&lt;/span&gt;
        &lt;span class="s"&gt;Format        json&lt;/span&gt;

  &lt;span class="na"&gt;promote_pod_name.lua&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;|&lt;/span&gt;
    &lt;span class="s"&gt;function promote_pod_name(tag, timestamp, record)&lt;/span&gt;
        &lt;span class="s"&gt;local k8s = record["kubernetes"]&lt;/span&gt;
        &lt;span class="s"&gt;if k8s and k8s["pod_name"] then&lt;/span&gt;
            &lt;span class="s"&gt;record["topic_name"] = k8s["pod_name"]&lt;/span&gt;
        &lt;span class="s"&gt;else&lt;/span&gt;
            &lt;span class="s"&gt;record["topic_name"] = "default-topic" -- fallback&lt;/span&gt;
        &lt;span class="s"&gt;end&lt;/span&gt;
        &lt;span class="s"&gt;return 1, timestamp, record&lt;/span&gt;
    &lt;span class="s"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This approach isn’t limited to &lt;code&gt;pod_name&lt;/code&gt; — you can promote &lt;strong&gt;any nested Kubernetes field or label&lt;/strong&gt; to a top-level field and use it as your Kafka topic key. This is useful for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  Splitting logs by namespace&lt;/li&gt;
&lt;li&gt;  Creating per-application Kafka topics&lt;/li&gt;
&lt;li&gt;  Enforcing multi-tenant log isolation&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you try this in your environment or run into edge cases, I’d love to hear from you! Feel free to connect with me on LinkedIn or drop me a message.&lt;/p&gt;

</description>
      <category>kubernetes</category>
      <category>kafka</category>
      <category>devops</category>
    </item>
  </channel>
</rss>
