<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Madhur Ahuja</title>
    <description>The latest articles on Forem by Madhur Ahuja (@madhur).</description>
    <link>https://forem.com/madhur</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2859%2F509431.jpeg</url>
      <title>Forem: Madhur Ahuja</title>
      <link>https://forem.com/madhur</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/madhur"/>
    <language>en</language>
    <item>
      <title>Parallelism and ordering</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Mon, 08 Jul 2019 10:29:39 +0000</pubDate>
      <link>https://forem.com/madhur/parallelism-and-ordering-3fa7</link>
      <guid>https://forem.com/madhur/parallelism-and-ordering-3fa7</guid>
      <description>&lt;p&gt;One of the disadvantages of microservice event based architectures is that there is lot of parallel processing of a single entity across multiple modules.&lt;br&gt;
There are many cases we want the parallelism but at the same time, want the messages to be processed in ordered manner.&lt;/p&gt;

&lt;p&gt;There are several design pattern to solve this problem:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Kafka solves this through &lt;a href="http://kafka.apache.org/090/documentation.html"&gt;partitions&lt;/a&gt;. A topic can be split into partitions. Messages are gauranteed to be in order within a single partition. You can have multiple consumers consuming from each partition separately providing for scalability and ordering. This design assumes that each consumer is single threaded and processing the messages one after other. In practical, scenarios this is usually not the case.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;One of the solution is to use SEDA queue. &lt;a href="https://camel.apache.org/"&gt;Camel&lt;/a&gt; has a dedicated page on &lt;a href="https://camel.apache.org/parallel-processing-and-ordering.html"&gt;Parallelism and ordering&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;There is whitepaper titles &lt;a href="https://people.csail.mit.edu/sanchez/papers/2015.swarm.micro.pdf"&gt;A Scalable Architecture for Ordered Parallelism&lt;/a&gt;&lt;br&gt;
I havn't gone thorugh it yet.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Are there other methods to handle the ordering while processing the messages in parallel? Let me know in the comments...&lt;/p&gt;

</description>
      <category>parallel</category>
    </item>
    <item>
      <title>Setting up Java project for monitoring</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Wed, 03 Jul 2019 06:20:11 +0000</pubDate>
      <link>https://forem.com/madhur/setting-up-java-project-for-monitoring-3j12</link>
      <guid>https://forem.com/madhur/setting-up-java-project-for-monitoring-3j12</guid>
      <description>&lt;p&gt;In any Java application deployed to monitoring, it is important to have proper monitoring / alerting infrastructure setup. &lt;/p&gt;

&lt;p&gt;Apart from the infrastructure monitoring, it is essential to setup JVM metrics as well as application monitoring.&lt;/p&gt;

&lt;p&gt;When I say application monitoring, it means monitoring the business transactions such as no. of orders / transactions, successful orders, failed orders , user signups, email triggered etc.&lt;/p&gt;

&lt;p&gt;For this, we have to emit events from our application. These are usually called business metrics.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://metrics.dropwizard.io/4.0.0/"&gt;Dropwizard metrics&lt;/a&gt; is one of the standard libraries out there which allows you to emit events.&lt;/p&gt;

&lt;p&gt;The events can be sent to various data stores, most popular being &lt;a href="https://metrics.dropwizard.io/4.0.0/manual/graphite.html"&gt;Graphite&lt;/a&gt; and &lt;a href="https://github.com/kickstarter/dropwizard-influxdb-reporter"&gt;Influxdb&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The most popular types of metrics are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Meters - A meter measures the rate of events over time. Also called as requests per second or TPS(transactions per second). Meters also track 1-, 5- and 15- minute moving averages, also called &lt;code&gt;m1_rate&lt;/code&gt; , &lt;code&gt;m5_rate&lt;/code&gt; and &lt;code&gt;m15_rate&lt;/code&gt;. Meters have &lt;code&gt;mark&lt;/code&gt; method to indicate the event.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Counters - A counter is used to keep track of counts. For example pending jobs and total requests. Total requests etc is usually a useless parameter but tools like graphite / influxdb can provide derivate function over the counters which can give us the additional functionality of meters using counters.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Timer - A timer measures both the rate a particular piece of code is called and as well as the distribution of its duration. Usually if you are using timer, you don't need meter or counter, since those functionalities are implemented by timer as well. A good practice is to have timer for your normal execution and counter/meter for exception scenarios, so that you can visualize no. of errors or rate of errors. A timer also provides, percentile times using variables &lt;code&gt;p50&lt;/code&gt;, &lt;code&gt;p75&lt;/code&gt;, &lt;code&gt;p95&lt;/code&gt;, &lt;code&gt;p98&lt;/code&gt;, &lt;code&gt;p99&lt;/code&gt; and standard deviation.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When using &lt;a href="https://spring.io/"&gt;Spring framework&lt;/a&gt;, &lt;a href="https://github.com/ryantenney/metrics-spring"&gt;Spring metrics&lt;/a&gt; provides tight integration with &lt;a href=""&gt;Dropwizard metrics&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Its important to note that &lt;a href="http://metrics.ryantenney.com/"&gt;metrics-spring&lt;/a&gt;  is a different library from &lt;a href="https://docs.spring.io/spring-metrics/docs/current/public/prometheus"&gt;Spring metrics&lt;/a&gt; , which is part of Spring framework itself.&lt;/p&gt;

&lt;p&gt;The below code demonstrates a simple way to bootstrap spring application for monitoring using &lt;a href="https://metrics.dropwizard.io/4.0.0/"&gt;Dropwizard metrics&lt;/a&gt; and &lt;a href="http://metrics.ryantenney.com/"&gt;metrics-spring&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight groovy"&gt;&lt;code&gt;&lt;span class="n"&gt;buildscript&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ext&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;repositories&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;mavenCentral&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;dependencies&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;classpath&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot:spring-boot-gradle-plugin:1.5.16.RELEASE'&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;group&lt;/span&gt; &lt;span class="s1"&gt;'metrics-example'&lt;/span&gt;
&lt;span class="n"&gt;version&lt;/span&gt; &lt;span class="s1"&gt;'1.0-SNAPSHOT'&lt;/span&gt;

&lt;span class="n"&gt;apply&lt;/span&gt; &lt;span class="nl"&gt;plugin:&lt;/span&gt; &lt;span class="s1"&gt;'java'&lt;/span&gt;
&lt;span class="n"&gt;apply&lt;/span&gt; &lt;span class="nl"&gt;plugin:&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot'&lt;/span&gt;

&lt;span class="n"&gt;sourceCompatibility&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mf"&gt;1.8&lt;/span&gt;

&lt;span class="n"&gt;repositories&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;mavenCentral&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;dependencies&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"org.springframework.boot:spring-boot-starter-web"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt; &lt;span class="s1"&gt;'org.projectlombok:lombok:1.16.12'&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt; &lt;span class="nl"&gt;group:&lt;/span&gt; &lt;span class="s1"&gt;'io.dropwizard.metrics'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;name:&lt;/span&gt; &lt;span class="s1"&gt;'metrics-core'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;version:&lt;/span&gt; &lt;span class="s1"&gt;'4.1.0'&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt; &lt;span class="nl"&gt;group:&lt;/span&gt; &lt;span class="s1"&gt;'io.dropwizard.metrics'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;name:&lt;/span&gt; &lt;span class="s1"&gt;'metrics-graphite'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;version:&lt;/span&gt; &lt;span class="s1"&gt;'4.1.0'&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt; &lt;span class="nl"&gt;group:&lt;/span&gt; &lt;span class="s1"&gt;'com.ryantenney.metrics'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;name:&lt;/span&gt; &lt;span class="s1"&gt;'metrics-spring'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;version:&lt;/span&gt; &lt;span class="s1"&gt;'3.1.3'&lt;/span&gt;
    &lt;span class="n"&gt;compile&lt;/span&gt; &lt;span class="nl"&gt;group:&lt;/span&gt; &lt;span class="s1"&gt;'com.codahale.metrics'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;name:&lt;/span&gt; &lt;span class="s1"&gt;'metrics-jvm'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;version:&lt;/span&gt; &lt;span class="s1"&gt;'3.0.2'&lt;/span&gt;
    &lt;span class="n"&gt;testCompile&lt;/span&gt; &lt;span class="nl"&gt;group:&lt;/span&gt; &lt;span class="s1"&gt;'junit'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;name:&lt;/span&gt; &lt;span class="s1"&gt;'junit'&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nl"&gt;version:&lt;/span&gt; &lt;span class="s1"&gt;'4.12'&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;app&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.ConsoleReporter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.Counter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.JmxReporter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.Meter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.MetricFilter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.MetricRegistry&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.graphite.Graphite&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.graphite.GraphiteReporter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.jvm.GarbageCollectorMetricSet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.jvm.MemoryUsageGaugeSet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.codahale.metrics.jvm.ThreadStatesGaugeSet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.ryantenney.metrics.spring.config.annotation.MetricsConfigurerAdapter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.stereotype.Component&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.annotation.PostConstruct&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.net.InetSocketAddress&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.concurrent.TimeUnit&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;abstract&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MetricsConfig&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;MetricsConfigurerAdapter&lt;/span&gt;&lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;MetricRegistry&lt;/span&gt; &lt;span class="n"&gt;metrics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;MetricRegistry&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="nc"&gt;Meter&lt;/span&gt; &lt;span class="n"&gt;publishMeter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;metrics&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;meter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publish.meter"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;Counter&lt;/span&gt; &lt;span class="n"&gt;publishCounter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;metrics&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;counter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publish.counter"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${graphite.host}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;graphiteHost&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${graphite.port}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;graphitePort&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${graphite.amount.of.time.between.polls}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;graphiteAmountOfTimeBetweenPolls&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;graphitePrefix&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@PostConstruct&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;ConsoleReporter&lt;/span&gt; &lt;span class="n"&gt;reporter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ConsoleReporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metrics&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convertRatesTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECONDS&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convertDurationsTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MILLISECONDS&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;reporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECONDS&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;abstract&lt;/span&gt; &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;graphitePrefix&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;graphitePrefix&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;graphitePrefix&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metrics&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;MetricRegistry&lt;/span&gt; &lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;registerReporter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JmxReporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;GraphiteReporter&lt;/span&gt; &lt;span class="n"&gt;graphiteReporter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
                &lt;span class="n"&gt;getGraphiteReporterBuilder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;getGraphite&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;registerReporter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;graphiteReporter&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;graphiteReporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;graphiteAmountOfTimeBetweenPolls&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MILLISECONDS&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;GraphiteReporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Builder&lt;/span&gt; &lt;span class="nf"&gt;getGraphiteReporterBuilder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;MetricRegistry&lt;/span&gt;
                                                       &lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;register&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"gc"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;GarbageCollectorMetricSet&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;register&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"memory"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;MemoryUsageGaugeSet&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;register&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"threads"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ThreadStatesGaugeSet&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;GraphiteReporter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metricRegistry&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convertRatesTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECONDS&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convertDurationsTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MILLISECONDS&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;MetricFilter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ALL&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;prefixedWith&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;graphitePrefix&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Graphite&lt;/span&gt; &lt;span class="nf"&gt;getGraphite&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;Graphite&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;InetSocketAddress&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;graphiteHost&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;graphitePort&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;app&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.stereotype.Component&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.annotation.PostConstruct&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ProdMetricsConfig&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;MetricsConfig&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;GRAPHITE_PREFIX&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
            &lt;span class="s"&gt;"collectd/graphite-monitoring-example/production"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;


    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;GRAPHITE_PREFIX&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@PostConstruct&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;init&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;configureReporters&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Here is an example of dashboard visualized in &lt;a href="https://grafana.com/"&gt;Grafana&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ePx9EOdq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/monitoring-example.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ePx9EOdq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/monitoring-example.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The above complete project is available on my &lt;a href="https://github.com/madhur/jvm-monitoring-example"&gt;github repository&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The repo contains a dockerized spring boot application, grafana and a graphite instance.&lt;/p&gt;

&lt;p&gt;Assuming you have docker installed, just execute below commands to get the project up and running.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;./gradlew build
docker build &lt;span class="nt"&gt;-t&lt;/span&gt; metrics-example &lt;span class="nb"&gt;.&lt;/span&gt;
docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Use &lt;code&gt;http://localhost:3000&lt;/code&gt; to browse the grafana portal and setup your dashboards.&lt;/p&gt;

&lt;p&gt;Do let me know if you have any suggestions / feedback&lt;/p&gt;

</description>
      <category>java</category>
    </item>
    <item>
      <title>Consuming from Kafka</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Sat, 22 Jun 2019 14:01:31 +0000</pubDate>
      <link>https://forem.com/madhur/consuming-from-kafka-47kj</link>
      <guid>https://forem.com/madhur/consuming-from-kafka-47kj</guid>
      <description>&lt;p&gt;I have been exploring on the best ways to consume from Kafka topic in Java. There are several ways:&lt;/p&gt;

&lt;p&gt;1 The simplest way is using &lt;code&gt;KafkaListener&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Slf4j&lt;/span&gt;
&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ExampleConsumer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"fooGroup"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Topic2"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;listen&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Received: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;startsWith&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"foo"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;RuntimeException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;2 The second way is to use &lt;a href="https://camel.apache.org/"&gt;Apache Camel&lt;/a&gt; . Using Apache camel is useful if you have lot of filtering logic to be applied on incoming messages and also output the processed messages onto another topic or stream.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CamelListener&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;RouteBuilder&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Autowired&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumerProperties&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;configure&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;from&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafkaConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;kafkaUri&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;exchange&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

            &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;exchange&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getIn&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getBody&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Camel consumer: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}).&lt;/span&gt;&lt;span class="na"&gt;end&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;3 The final and my preferred way is to use &lt;a href="https://kafka.apache.org/documentation/streams/"&gt;Kafka Streams&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Service&lt;/span&gt;
&lt;span class="nd"&gt;@Slf4j&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;KafkaStreamConsumer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Autowired&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumerProperties&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Autowired&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;KafkaOrderFeedProcessor&lt;/span&gt; &lt;span class="n"&gt;kafkaOrderFeedProcessor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@PostConstruct&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;processKafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;KafkaStreams&lt;/span&gt; &lt;span class="n"&gt;kafkaStreams&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;kStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;kStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafkaOrderFeedProcessor&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;]);&lt;/span&gt;
            &lt;span class="n"&gt;kafkaStreams&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaStreams&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;kafkaStreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"op={}, status=OK, desc={}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"KafkaConsumer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka consumer stream  started successfully"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;var9&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"op={}, status=KO, desc={} and exception={}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[]{&lt;/span&gt;&lt;span class="s"&gt;"KafkaConsumer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"exception while starting kafka consumer stream"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;var9&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMessage&lt;/span&gt;&lt;span class="o"&gt;()});&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafkaStreams&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;kafkaStreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;There are various advantages of using Kafka's Streams API.&lt;/p&gt;

&lt;p&gt;Kafka's Streams API (&lt;a href="https://kafka.apache.org/documentation/streams/"&gt;https://kafka.apache.org/documentation/streams/&lt;/a&gt;) is built on top of Kafka's producer and consumer clients. It's significantly more powerful and also more expressive than the Kafka consumer client. Here are some of the features of the Kafka Streams API:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;supports exactly-once processing semantics (Kafka versions 0.11+)&lt;/li&gt;
&lt;li&gt;supports fault-tolerant stateful processing including streaming joins, aggregations, and windowing&lt;/li&gt;
&lt;li&gt;supports event-time processing as well as processing based on processing-time and ingestion-time
has first-class support for both streams and tables, which is where stream processing meets databases; in practice, most stream processing applications need both streams AND tables for implementing their respective use cases, so if a stream processing technology lacks either of the two abstractions (say, no support for tables) you are either stuck or must manually implement this functionality yourself (good luck with that...)&lt;/li&gt;
&lt;li&gt;supports interactive queries to expose the latest processing results to other applications and services)&lt;/li&gt;
&lt;li&gt;more expressive: it ships with (1) a functional programming style DSL with operations such as map, filter, reduce as well as (2) an imperative style Processor API for e.g. doing complex event processing (CEP), and (3) you can even combine the DSL and the Processor API.&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>java</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Considerations for high throughput kafka producer</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Sat, 22 Jun 2019 13:59:07 +0000</pubDate>
      <link>https://forem.com/madhur/considerations-for-high-throughput-kafka-producer-3gjm</link>
      <guid>https://forem.com/madhur/considerations-for-high-throughput-kafka-producer-3gjm</guid>
      <description>&lt;p&gt;I have been recently working on lot of high throughput kafka producers. Our application publishes close to 3 million kafka publishes per day. (which is still low compared to what kafka can handle)&lt;/p&gt;

&lt;p&gt;There are some of the learnings along the way in maintaining such kafka producers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Choose the number of partitions wisely: The number of partitions determine how much consumers can scale. Number of partitions is degree of parallelism in kafka. Kafka gives a single partition's data to single thread.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our general thumb rule is to have partitions equal to number of consumer servers. For example, if we have cluster of 20 servers consuming from kafka topic, each server will be consuming from single partition so 20 partitions.&lt;/p&gt;

&lt;p&gt;There are many other factors to be considered &lt;a href="https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster"&gt;as explained here&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Decide a consistent key while publishing - Messages published with the same key will be published to a single partition. A partition is logic unit of ordering of messages. So if ordering of messages is important to you, you should choose a consistent key for those messages. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Use power of asynchronous - Kafka producer is by default asynchronous unless you use a blocking call explicitly. That means that kafka publish can fail and your code would have moved past the publish method already. Kafka producer provides a callback once the server has executed the publish instruction. In this callback, the user can check for failure and retry the option or send to a dead letter queue etc. Kafka producer itself retries for 3 times but I believe that is too less and not enough for data critical applications.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Below is the sample snippet of such producer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Autowired&lt;/span&gt;
&lt;span class="nd"&gt;@Qualifier&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"createKafkaSslProducerOrder"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nc"&gt;Producer&lt;/span&gt; &lt;span class="n"&gt;kafkaSslProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;messageKey&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

        &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageKey&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="n"&gt;kafkaSslProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofNullable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;isPresent&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"op={}, status=KO, desc={} and exception={}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                        &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="s"&gt;"KafkaProducer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                &lt;span class="s"&gt;"Error posting message to kafka topic: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMessage&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;});&lt;/span&gt;
                &lt;span class="c1"&gt;// Send for re-processing&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;

        &lt;span class="o"&gt;});&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"op={}, status=KO, desc=Error posting message to SSL kafka: {}, stackTrace={} "&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;LOG_OP_INFO&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMessage&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="c1"&gt;// Re-throw the exception so that status can be recorded in the database.&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;In case of kafka messages, it is useful to provide a complete publish timestamp and origial modify timestamp of the message (such as db record). Using these timestamps, client can determine if the incoming message is stale or a new upate.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Initially, during development, it is very useful to store the partition and offset of the consumed messages. This can be stored in the consumer data store or application logs. Using this information, the message can be directly looked up in kafka to see the original message. &lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>java</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Tuning Linux servers for scalability</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Tue, 07 Feb 2017 22:00:57 +0000</pubDate>
      <link>https://forem.com/madhur/tuning-linux-servers-for-scalability</link>
      <guid>https://forem.com/madhur/tuning-linux-servers-for-scalability</guid>
      <description>&lt;p&gt;Focus on performance and scalablity is one of my primary personal and professional goal when working with tech products.&lt;/p&gt;

&lt;p&gt;The server can be any Linux based server such as &lt;a href="https://www.centos.org/"&gt;CentOS&lt;/a&gt; or &lt;a href="https://www.debian.org/"&gt;Debian&lt;/a&gt; derivative such as &lt;a href="https://www.ubuntu.com/"&gt;Ubuntu&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In this post, I will outline my learnings on scaling the Linux server. Scaling here implies many things such as&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Being able to open many files at once. Here files can be generally applied to concept such as open ports, threads etc and not necessary physical files&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Being able to handle many concurrent network connections&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Knowing Linux OS and related concepts such as &lt;a href="https://en.wikipedia.org/wiki/Iptables"&gt;Iptables&lt;/a&gt; is a pre-requisite.&lt;/p&gt;

&lt;h2&gt;
  
  
  Open files
&lt;/h2&gt;

&lt;p&gt;We need to keep our file limit high for any linux production server. Check the current value using &lt;code&gt;ulimit -a&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;We can configure this limit using &lt;code&gt;/etc/security/limits.conf&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight powershell"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;hard&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;nofile&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;300000&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;soft&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;nofile&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;300000&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="n"&gt;tomcat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;hard&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;nofile&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;300000&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="n"&gt;tomcat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;soft&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;nofile&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;300000&lt;/span&gt;&lt;span class="w"&gt;

&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that we can also specify per user limit as shown above (special limits for tomcat user)&lt;/p&gt;

&lt;p&gt;The file descriptor limit for a running process can be seen in the following file under Max open files.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight powershell"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;cat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;/proc/&lt;/span&gt;&lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;pid&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;&lt;span class="nx"&gt;/limits&lt;/span&gt;&lt;span class="w"&gt;

    &lt;/span&gt;&lt;span class="n"&gt;Max&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;open&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;30000&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Ephemeral Ports
&lt;/h2&gt;

&lt;p&gt;Increase the number of ephemeal ports availabl to your application. The default value is &lt;code&gt;32768 - 61000&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  TIME_WAIT state
&lt;/h2&gt;

&lt;p&gt;TCP connections go through lot of states, last of them is &lt;code&gt;TIME_WAIT&lt;/code&gt; state. The default &lt;code&gt;TIME_WAIT&lt;/code&gt; timeout is for 2 minutes, Which means you'll run out of available ports if you receive more than about 400 requests a second, or if we look back to how nginx does proxies, this actually translates to 200 requests per second.&lt;/p&gt;

&lt;p&gt;These parameters can be tuned using these settings in &lt;code&gt;/etc/sysctl.conf&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight powershell"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="n"&gt;net.ipv4.ip_local_port_range&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;18000&lt;/span&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="mi"&gt;65535&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="n"&gt;net.ipv4.netfilter.ip_conntrack_tcp_timeout_time_wait&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Connection Tracking
&lt;/h2&gt;

&lt;p&gt;The next parameter we looked at was Connection Tracking. This is a side effect of using &lt;code&gt;iptables&lt;/code&gt;. Since &lt;code&gt;iptables&lt;/code&gt; needs to allow two-way communication between established HTTP and ssh connections, it needs to keep track of which connections are established, and it puts these into a connection tracking table. This table grows. And grows. And grows.&lt;/p&gt;

&lt;p&gt;You can see the current size of this table using &lt;code&gt;sysctl net.netfilter.nf_conntrack_count&lt;/code&gt; and its limit using &lt;code&gt;sysctl net.nf_conntrack_max&lt;/code&gt;. If count crosses max, your linux system will stop accepting new TCP connections and you'll never know about this. The only indication that this has happened is a single line hidden somewhere in &lt;code&gt;/var/log/syslog&lt;/code&gt; saying that you're out of connection tracking entries. One line, once, when it first happens.&lt;/p&gt;

&lt;p&gt;A better indication is if count is always very close to max. You might think, “Hey, we've set max exactly right.”, but you'd be wrong.&lt;/p&gt;

&lt;p&gt;What you need to do (or at least that's what you first think) is to increase max.&lt;/p&gt;

&lt;p&gt;Keep in mind though, that the larger this value, the more RAM the kernel will use to keep track of these entries. RAM that could be used by your application.&lt;/p&gt;

&lt;p&gt;We started down this path, increasing net.nf_conntrack_max, but soon we were just pushing it up every day. Connections that were getting in there were never getting out.&lt;/p&gt;

&lt;h2&gt;
  
  
  Maximum number of pending connections on a socket
&lt;/h2&gt;

&lt;p&gt;During some of our initial load testing, we ran into a strange problem where we were unable to open more than approximately 128 concurrent connections at once.&lt;/p&gt;

&lt;p&gt;After some investigation, we learned about the following kernel parameter.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight powershell"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="n"&gt;net.core.somaxconn&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This kernel parameter is the size of the backlog of TCP connections waiting to be accepted by the application. If a connection indication arrives when the queue is full, the connection is refused. The default value for this parameters is 128 on most modern operating systems.&lt;/p&gt;

&lt;p&gt;Bumping up this limit in &lt;code&gt;/etc/sysctl.conf&lt;/code&gt; helped us get rid of the “connection refused issues on our Linux machines.&lt;/p&gt;

&lt;h2&gt;
  
  
  JVM thread count
&lt;/h2&gt;

&lt;p&gt;A few hours after we allowed a significant percentage of production traffic to hit our server for the first time, we were alerted to the fact that the load balancer was unable to connect to a few of our machines. On further investigation, we saw the following all over our server logs.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight powershell"&gt;&lt;code&gt;&lt;span class="w"&gt;    &lt;/span&gt;&lt;span class="n"&gt;java.lang.OutOfMemoryError:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;unable&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;to&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;create&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;new&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;native&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nx"&gt;thread&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you hit the JVM thread limit, chances are that there is a thread leak in your code that needs to be fixed. However, if you find that all your threads are actually doing useful work, is there a way to tweak the system to let you create more threads and accept more connections?&lt;/p&gt;

&lt;p&gt;The answer, as always, is fun. It's interesting to discuss how available memory limits the number of threads that can be created on a JVM. The stack size of a thread determines the memory available for static memory allocation. Thus, the absolute theoretical maximum number of threads is a process's user address space divided by the thread stack size. However, the reality is that the JVM also uses memory for dynamic allocation on the heap. With a few quick tests with a small Java process, we could verify that as more memory is allocated for the heap, less is available for the stack. Thus, the limit on the number of threads decreases with increasing heap size.&lt;/p&gt;

&lt;p&gt;To summarize, you can increase the thread count limit by decreasing the stack size per thread &lt;code&gt;(-Xss)&lt;/code&gt; or by decreasing the memory allocated to the heap &lt;code&gt;(-Xms, -Xmx).&lt;/code&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Hi, I'm Madhur Ahuja</title>
      <dc:creator>Madhur Ahuja</dc:creator>
      <pubDate>Tue, 07 Feb 2017 16:56:14 +0000</pubDate>
      <link>https://forem.com/madhur/hi-im-madhur-ahuja</link>
      <guid>https://forem.com/madhur/hi-im-madhur-ahuja</guid>
      <description>&lt;p&gt;I am a programmer/technology guy. I like building web applications using all the standard technologies like HTML, CSS, JavaScript, JQuery, ASP.NET, and SQL Server. &lt;br&gt;
I also know a good variety of programming languages too. .NET C# is my favourite, I know Java very well, and I know Javascript pretty well too.&lt;br&gt;
For operating systems I have experience in both Linux and Windows, Linux being my current operating system. &lt;br&gt;
I also have experience developing applications for Android.&lt;/p&gt;

&lt;p&gt;Before entering the professional world, I spent some of the most fruitful years of my life studying Information Technology at &lt;a href="http://www.bharatividyapeeth.edu/default.aspx" rel="noopener noreferrer"&gt;Bharati Vidyapeeth's&lt;/a&gt; &lt;a href="http://www.bvucoepune.edu.in/" rel="noopener noreferrer"&gt;College of Engineering&lt;/a&gt;, Pune. &lt;br&gt;
My interest in computers, internet and Linux dates back to 1997, when I was first introduced to a computer. That computer (Intel Pentium 120 MHz, 1.2GB HDD, 8MB RAM) was one of the most awesome things I had ever possessed.&lt;/p&gt;

</description>
      <category>introduction</category>
    </item>
  </channel>
</rss>
