<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: štef</title>
    <description>The latest articles on Forem by štef (@d4be4st).</description>
    <link>https://forem.com/d4be4st</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F28058%2F6ca7f183-eeb3-4edc-b9d3-f1a0d3e7ea61.png</url>
      <title>Forem: štef</title>
      <link>https://forem.com/d4be4st</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/d4be4st"/>
    <language>en</language>
    <item>
      <title>Publish/Subscribe with Sidekiq</title>
      <dc:creator>štef</dc:creator>
      <pubDate>Wed, 21 Feb 2024 08:00:00 +0000</pubDate>
      <link>https://forem.com/productive/publishsubscribe-with-sidekiq-3n92</link>
      <guid>https://forem.com/productive/publishsubscribe-with-sidekiq-3n92</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Our Rails application is an old monolith that relies heavily on before/after callbacks to trigger code with side effects. This usually means that saving one model can change records of another model. I suspect we have all come across this "callback hell" at least once in our professional lives.&lt;/p&gt;

&lt;p&gt;We needed to introduce a new service for search. As we settled on using &lt;a href="https://www.meilisearch.com/"&gt;meilisearch&lt;/a&gt;, we needed a way to sync updates on our models with the records in meilisearch. We could've continued to use callbacks but we needed something better.&lt;/p&gt;

&lt;p&gt;We settled on a Publish/Subscribe (Pub/Sub) pattern. &lt;br&gt;
Pub/Sub is a design pattern where publishers broadcast messages to topics without specifying recipients, and subscribers listen for specific topics without knowing the source, promoting loose coupling.&lt;/p&gt;
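&lt;p&gt;To make the pattern concrete, here is a minimal in-memory sketch in plain Ruby. The &lt;code&gt;TinyBus&lt;/code&gt; class and the topic names are made up for illustration and are not part of our code base. Delivery in this sketch is synchronous, which is the part the rest of this post moves away from.&lt;/p&gt;

```ruby
# Hypothetical in-memory bus, for illustration only.
class TinyBus
  def initialize
    # topic => list of callables
    @subscribers = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # The subscriber registers interest in a topic, not in a publisher.
  def subscribe(topic, callable)
    @subscribers[topic].push(callable)
  end

  # The publisher names a topic, not a recipient.
  def publish(topic, payload)
    @subscribers.fetch(topic, []).each { |callable| callable.call(payload) }
  end
end

bus = TinyBus.new
received = []
bus.subscribe('task.upsert', ->(payload) { received.push(payload[:id]) })
bus.publish('task.upsert', { id: 1 })
bus.publish('task.delete', { id: 2 }) # no subscribers, nothing happens
```

&lt;p&gt;Neither side references the other: the publisher only knows the topic string, and the subscriber only knows which topic it cares about.&lt;/p&gt;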
&lt;h2&gt;
  
  
  Exploring Pub/Sub Solutions
&lt;/h2&gt;

&lt;p&gt;While searching for a solution we came across these:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://edgeguides.rubyonrails.org/active_support_instrumentation.html"&gt;ActiveSupport::Notification&lt;/a&gt;: A Rails component for defining and subscribing to events within an application​.&lt;br&gt;
-&amp;gt; Active::Support is synchronous by default. It does not handle errors and retries out of the box. We would need to use a background job handler (like Sidekiq) to make it asynchronous.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://github.com/krisleech/wisper"&gt;Wisper&lt;/a&gt;: A Ruby gem providing a decoupled communication layer between different parts of an application​&lt;br&gt;
-&amp;gt; I personally dislike wisper. I used it in the past and dislike the way of defining subscribers in a global way. I wanted topics to be arbitrary and each class to define what to subscribe for itself.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://dry-rb.org/gems/dry-events/1.0/"&gt;Dry-events&lt;/a&gt;: A component of the dry-rb ecosystem focused on event publishing and subscription​&lt;br&gt;
-&amp;gt; This gem looks like a framework on top of which I would need to create my own Pub/Sub. It also does not do asynchronous execution out of the box.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We finally settled on Sidekiq!&lt;br&gt;
Sidekiq is a background job processing tool that we already use in our application, and we figured out a way to use it for processing Pub/Sub messages.&lt;/p&gt;
&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

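&lt;p&gt;The implementation is quite straightforward: a singleton registry maps each topic to its subscribers, and publishing a message enqueues one job per subscriber.&lt;br&gt;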
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;PubSub&lt;/span&gt;
  &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SubscriberJob&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="no"&gt;ApplicationJob&lt;/span&gt;
    &lt;span class="n"&gt;queue_as&lt;/span&gt; &lt;span class="ss"&gt;:pub_sub&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;:,&lt;/span&gt; &lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;:,&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;:)&lt;/span&gt;
      &lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;constantize&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;public_send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Singleton&lt;/span&gt;

  &lt;span class="c1"&gt;# Publish a message to topic&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @param topic [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @param message [Hash]&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
   &lt;span class="n"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribers&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;each&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;subscriber&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
      &lt;span class="no"&gt;PubSub&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;SubscriberJob&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;perform_later&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;message: &lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;subscriber&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="c1"&gt;# Subscribe a class + handler to a topic&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @param topic [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @param class_name [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @param handler [String]&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;async: &lt;/span&gt;&lt;span class="kp"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;initialize&lt;/span&gt;
    &lt;span class="vi"&gt;@subscribers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="c1"&gt;# return subscribers for the topic&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @param topic [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @return [Array&amp;lt;Hash&amp;gt;] { class_name: String, handler: String}&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;subscribers&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="vi"&gt;@subscribers&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;fetch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Set&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="c1"&gt;# Subscribe a class + handler to a topic&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @param topic [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @param class_name [String]&lt;/span&gt;
  &lt;span class="c1"&gt;# @param handler [String]&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;async: &lt;/span&gt;&lt;span class="kp"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="vi"&gt;@subscribers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;||=&lt;/span&gt; &lt;span class="no"&gt;Set&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;
      &lt;span class="vi"&gt;@subscribers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="ss"&gt;class_name: &lt;/span&gt;&lt;span class="n"&gt;class_name&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_s&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;handler: &lt;/span&gt;&lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
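&lt;p&gt;The registry stores a class name and a handler name rather than live objects, so each enqueued job carries only simple values and the class is resolved when the job runs. Here is a rough plain-Ruby sketch of that flow, assuming an array as a stand-in for the Sidekiq queue and &lt;code&gt;Object.const_get&lt;/code&gt; in place of &lt;code&gt;constantize&lt;/code&gt;. &lt;code&gt;AuditLog&lt;/code&gt;, &lt;code&gt;QUEUE&lt;/code&gt; and &lt;code&gt;SUBSCRIBERS&lt;/code&gt; are hypothetical names used only here.&lt;/p&gt;

```ruby
require 'set'

# Stand-in for the Sidekiq queue: a plain array of job arguments.
QUEUE = []

# Hypothetical subscriber; any class with a public class method works.
class AuditLog
  def self.entries
    @entries ||= []
  end

  def self.on_upsert(message)
    entries.push(message[:id])
  end
end

# Registry entries hold a class name and a handler name, not live objects.
SUBSCRIBERS = { 'task.upsert' => Set[{ class_name: 'AuditLog', handler: :on_upsert }] }

# Publishing enqueues one job per subscriber.
def publish(topic, **message)
  SUBSCRIBERS.fetch(topic, Set.new).each do |subscriber|
    QUEUE.push({ message: message, **subscriber })
  end
end

# What a worker does later: resolve the class by name and call the handler.
def perform(message:, class_name:, handler:)
  Object.const_get(class_name).public_send(handler, message)
end

publish('task.upsert', id: 42)
perform(**QUEUE.shift) until QUEUE.empty?
```

&lt;p&gt;After the queue is drained, &lt;code&gt;AuditLog.entries&lt;/code&gt; contains the published id. In the real class, Active Job and Sidekiq take over the queueing and the worker loop.&lt;/p&gt;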



&lt;h2&gt;
  
  
  Usage
&lt;/h2&gt;

&lt;p&gt;Usage is also quite straightforward:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Search::Tasks::Indexer&lt;/span&gt;
   &lt;span class="nc"&gt;PubSub&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'task.upsert'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:on_upsert&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="p"&gt;)&lt;/span&gt;

   &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on_upsert&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
     &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:item&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

     &lt;span class="no"&gt;MeilisearchClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;reindex&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_meilisearch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
   &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TaskForm&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="no"&gt;ApplicationForm&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;save&lt;/span&gt;
    &lt;span class="k"&gt;super&lt;/span&gt;
    &lt;span class="no"&gt;PubSub&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'task.upsert'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;item: &lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Experience
&lt;/h2&gt;

&lt;p&gt;After 6 months of using this method, I can safely say that it works as intended.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It handles unexpected errors, such as network failures, gracefully, because a failed job is retried by Sidekiq&lt;/li&gt;
&lt;li&gt;It was easy to use where we needed it&lt;/li&gt;
&lt;li&gt;Classes that handle the incoming messages are separated from the rest of the models and are easy to unit test&lt;/li&gt;
&lt;/ul&gt;
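&lt;p&gt;The graceful error handling follows from each subscriber running in its own job: a failure in one handler is retried without touching the others. The loop below is a simplified sketch of that idea, not how Sidekiq works internally (Sidekiq persists jobs and waits between retries). &lt;code&gt;FlakyIndexer&lt;/code&gt;, &lt;code&gt;SteadyLogger&lt;/code&gt; and &lt;code&gt;MAX_ATTEMPTS&lt;/code&gt; are hypothetical.&lt;/p&gt;

```ruby
# Simulated flaky subscriber: fails twice, then succeeds.
class FlakyIndexer
  def self.attempts
    @attempts ||= 0
  end

  def self.indexed
    @indexed ||= []
  end

  def self.on_upsert(message)
    @attempts = attempts + 1
    raise IOError, 'simulated network error' unless attempts >= 3

    indexed.push(message[:id])
  end
end

# Subscriber that never fails.
class SteadyLogger
  def self.seen
    @seen ||= []
  end

  def self.on_upsert(message)
    seen.push(message[:id])
  end
end

MAX_ATTEMPTS = 5 # hypothetical limit for this sketch

# One job per subscriber, as in the PubSub class above.
queue = [FlakyIndexer, SteadyLogger].map do |klass|
  { klass: klass, message: { id: 7 }, attempt: 1 }
end

until queue.empty?
  job = queue.shift
  begin
    job[:klass].on_upsert(job[:message])
  rescue IOError
    # Re-enqueue only the failed job; other subscribers are unaffected.
    next if job[:attempt] >= MAX_ATTEMPTS

    queue.push(job.merge(attempt: job[:attempt] + 1))
  end
end
```

&lt;p&gt;&lt;code&gt;SteadyLogger&lt;/code&gt; handles the message on the first pass, while &lt;code&gt;FlakyIndexer&lt;/code&gt; succeeds on its third attempt.&lt;/p&gt;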

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;📖 Sidestory: Rethinking Pub-Sub&lt;/strong&gt;&lt;br&gt;
Recently &lt;a href="https://dev.to/buha"&gt;Buha&lt;/a&gt; wrote &lt;a href="https://dev.to/productive/a-close-call-with-real-time-how-rethinking-pub-sub-saved-the-day-52kp"&gt;a blog post&lt;/a&gt; how he gave up on the Pub/Sub approach. It is a good read to see the downsides of this approach.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;We solved the problem of Pub/Sub messaging in our product with the help of Sidekiq. It has proved to be reliable, and the implementation itself is not complicated.&lt;/p&gt;

&lt;p&gt;What are your experiences with Sidekiq? Are you using something else?&lt;/p&gt;

</description>
      <category>api</category>
      <category>rails</category>
      <category>ruby</category>
      <category>development</category>
    </item>
  </channel>
</rss>
