<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Anthony Accomazzo</title>
    <description>The latest articles on Forem by Anthony Accomazzo (@acco).</description>
    <link>https://forem.com/acco</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F541290%2F2cd6a15a-715e-4dcc-aae1-fe715bf013a8.jpeg</url>
      <title>Forem: Anthony Accomazzo</title>
      <link>https://forem.com/acco</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/acco"/>
    <language>en</language>
    <item>
      <title>All the ways to react to changes in Supabase</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Mon, 09 Sep 2024 22:01:09 +0000</pubDate>
      <link>https://forem.com/acco/all-the-ways-to-react-to-changes-in-supabase-ndb</link>
      <guid>https://forem.com/acco/all-the-ways-to-react-to-changes-in-supabase-ndb</guid>
      <description>&lt;p&gt;&lt;a href="https://supabase.com/" rel="noopener noreferrer"&gt;Supabase&lt;/a&gt; makes it easy for your frontend to react to changes in your database via its Realtime feature. But outside the frontend, there are lots of reasons your application might want to react to changes in your database. You might need to trigger side effects, like sending users an email, alerting admins about a change, or invalidating a cache. Or you might need to capture a log of changes for compliance or debugging purposes.&lt;/p&gt;

&lt;p&gt;Realtime is just one way to respond to changes in your Supabase project. In this post, we'll explore the options available. Hopefully I can help you choose the right solution for your needs.&lt;/p&gt;

&lt;h2&gt;
  
  
  Database triggers
&lt;/h2&gt;

&lt;p&gt;Postgres triggers execute a specified function in response to certain database events.&lt;/p&gt;

&lt;p&gt;Postgres triggers are a part of the lifecycle of rows. You can write functions in PL/pgSQL and have Postgres invoke them whenever a row is inserted, updated, or deleted. They're a powerful way to chain database changes together.&lt;/p&gt;

&lt;p&gt;For example, here's a trigger that maintains a search index (&lt;code&gt;search_index&lt;/code&gt;) whenever a row in &lt;code&gt;articles&lt;/code&gt; changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;update_search_index&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="err"&gt;$$&lt;/span&gt;
&lt;span class="k"&gt;begin&lt;/span&gt;
    &lt;span class="n"&gt;if&lt;/span&gt; &lt;span class="n"&gt;tg_op&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'INSERT'&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="n"&gt;tg_op&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'UPDATE'&lt;/span&gt; &lt;span class="k"&gt;then&lt;/span&gt;
        &lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;into&lt;/span&gt; &lt;span class="n"&gt;search_index&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;content&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;values&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;to_tsvector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'english'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;title&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="s1"&gt;' '&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;conflict&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt;
        &lt;span class="k"&gt;set&lt;/span&gt; &lt;span class="n"&gt;content&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;to_tsvector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'english'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;title&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="s1"&gt;' '&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;elsif&lt;/span&gt; &lt;span class="n"&gt;tg_op&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'DELETE'&lt;/span&gt; &lt;span class="k"&gt;then&lt;/span&gt;
        &lt;span class="k"&gt;delete&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;search_index&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;record_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;old&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt; &lt;span class="n"&gt;if&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="err"&gt;$$&lt;/span&gt; &lt;span class="k"&gt;language&lt;/span&gt; &lt;span class="n"&gt;plpgsql&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="n"&gt;maintain_search_index&lt;/span&gt;
&lt;span class="k"&gt;after&lt;/span&gt; &lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;delete&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;articles&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;each&lt;/span&gt; &lt;span class="k"&gt;row&lt;/span&gt; &lt;span class="k"&gt;execute&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;update_search_index&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What's really neat about triggers is that they run in the same transaction as the change they're triggering off of. So, if a trigger fails to execute, the whole transaction is rolled back. That can give your system some great guarantees. (Said another way, triggers give you "exactly-once processing".)&lt;/p&gt;

&lt;p&gt;This means triggers are great for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Maintaining derived tables (like search indexes)&lt;/li&gt;
&lt;li&gt;Populating column defaults (where &lt;code&gt;default&lt;/code&gt; doesn't cut it)&lt;/li&gt;
&lt;li&gt;Creating audit logs of changes&lt;/li&gt;
&lt;li&gt;Enforcing business rules&lt;/li&gt;
&lt;/ul&gt;
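
&lt;p&gt;As a rough illustration of the audit-log use case, here's a minimal sketch. The &lt;code&gt;article_audit&lt;/code&gt; table, its columns, and the function name are hypothetical, and it assumes &lt;code&gt;articles.id&lt;/code&gt; is a &lt;code&gt;bigint&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- hypothetical audit table: one row per change to articles
create table if not exists article_audit (
    id bigint generated always as identity primary key,
    article_id bigint not null,
    operation text not null,   -- 'INSERT', 'UPDATE', or 'DELETE'
    old_row jsonb,             -- null for inserts
    new_row jsonb,             -- null for deletes
    changed_at timestamptz not null default now()
);

create or replace function log_article_change() returns trigger as $$
begin
    -- only touch new/old in the branches where they are populated
    if tg_op = 'INSERT' then
        insert into article_audit (article_id, operation, new_row)
        values (new.id, tg_op, to_jsonb(new));
    elsif tg_op = 'UPDATE' then
        insert into article_audit (article_id, operation, old_row, new_row)
        values (new.id, tg_op, to_jsonb(old), to_jsonb(new));
    else
        insert into article_audit (article_id, operation, old_row)
        values (old.id, tg_op, to_jsonb(old));
    end if;
    return null; -- the return value is ignored for after-row triggers
end;
$$ language plpgsql;

create trigger audit_articles
after insert or update or delete on articles
for each row execute function log_article_change();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because the audit row is written in the same transaction as the change, the two commit or roll back together.&lt;/p&gt;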

&lt;h3&gt;
  
  
  When are triggers not a good fit?
&lt;/h3&gt;

&lt;p&gt;What makes triggers great also makes them weak for certain use cases.&lt;/p&gt;

&lt;p&gt;First, they can impact database performance. A great way to eke out more performance from Postgres is to batch operations. But row-level triggers (like the one above) are executed one-by-one, row-by-row, sometimes blunting the benefits of batching.&lt;/p&gt;

&lt;p&gt;If you're not careful, one insert can lead to a cascade of changes across your database. Naturally, the more tables Postgres has to visit to make your insert possible, the slower those inserts will become.&lt;/p&gt;
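
&lt;p&gt;One way to soften the row-by-row cost, where it applies, is a statement-level trigger with a transition table, which runs once per statement and sees all affected rows at once. The sketch below covers inserts only and would stand in for the insert branch of the row-level trigger rather than run alongside it. The function and trigger names are made up for illustration, and it assumes Postgres 10 or later:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;create or replace function index_inserted_articles() returns trigger as $$
begin
    -- new_rows is the transition table named in the trigger definition below;
    -- it holds every row inserted by the triggering statement
    insert into search_index (record_id, content)
    select id, to_tsvector('english', title || ' ' || body)
    from new_rows
    on conflict (record_id) do update
    set content = excluded.content;
    return null;
end;
$$ language plpgsql;

create trigger index_inserted_articles
after insert on articles
referencing new table as new_rows
for each statement execute function index_inserted_articles();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A bulk insert of 1,000 articles then produces one trigger invocation and one write to &lt;code&gt;search_index&lt;/code&gt;, instead of 1,000 of each.&lt;/p&gt;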

&lt;p&gt;Second, Postgres triggers are relatively easy to write thanks to tools like Claude Sonnet. But they're hard to test and debug. PL/pgSQL isn't the most ergonomic language, and triggers aren't the most ergonomic runtime. With some database clients, one of the only tools for debugging is sprinkling &lt;code&gt;raise exception 'here!'&lt;/code&gt; throughout your codebase. This can be a headache.&lt;/p&gt;
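
&lt;p&gt;A slightly gentler variant of that debugging technique is &lt;code&gt;raise notice&lt;/code&gt;, which logs a message without aborting the transaction. Here's a small sketch with hypothetical function and trigger names. Whether you actually see the notice depends on your client and on the &lt;code&gt;client_min_messages&lt;/code&gt; setting:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;create or replace function debug_article_change() returns trigger as $$
begin
    -- tg_name, tg_op, and tg_table_name are set by Postgres for every trigger call
    raise notice 'trigger % fired: op=%, table=%', tg_name, tg_op, tg_table_name;
    return null;
end;
$$ language plpgsql;

create trigger debug_articles
after insert or update or delete on articles
for each row execute function debug_article_change();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;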

&lt;p&gt;Third, and perhaps most obviously, Postgres triggers are limited to your database runtime. They can only interact with the tables in your database.&lt;/p&gt;

&lt;p&gt;Unless...&lt;/p&gt;

&lt;h2&gt;
  
  
  Database Webhooks
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://supabase.com/docs/guides/database/webhooks" rel="noopener noreferrer"&gt;Database Webhooks&lt;/a&gt; in Supabase allow your database to interface with the outside world. With the &lt;code&gt;pg_net&lt;/code&gt; extension, you can trigger HTTP requests to external services when database changes occur. The &lt;code&gt;pg_net&lt;/code&gt; extension is asynchronous, which means database changes will not be blocked during long-running requests.&lt;/p&gt;

&lt;p&gt;Here's an example of a Database Webhook that fires whenever a row is inserted into or updated in the &lt;code&gt;orders&lt;/code&gt; table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;post_inserted_order&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="k"&gt;language&lt;/span&gt; &lt;span class="n"&gt;plpgsql&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="err"&gt;$$&lt;/span&gt;
&lt;span class="k"&gt;begin&lt;/span&gt;
    &lt;span class="c1"&gt;-- calls net.http_post with named arguments, so the headers aren't&lt;/span&gt;
    &lt;span class="c1"&gt;-- mistaken for the function's third positional argument (params)&lt;/span&gt;
    &lt;span class="n"&gt;perform&lt;/span&gt; &lt;span class="n"&gt;net&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;http_post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="p"&gt;:=&lt;/span&gt; &lt;span class="s1"&gt;'https://api.example.com/my/webhook/handler'&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nb"&gt;text&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;body&lt;/span&gt; &lt;span class="p"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;jsonb_build_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="s1"&gt;'id'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s1"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s1"&gt;'user_id'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;headers&lt;/span&gt; &lt;span class="p"&gt;:=&lt;/span&gt; &lt;span class="s1"&gt;'{"Content-Type": "application/json", "Authorization": "Bearer my_secret"}'&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;jsonb&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;request_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="err"&gt;$$&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="n"&gt;inserted_order_webhook&lt;/span&gt;
&lt;span class="k"&gt;after&lt;/span&gt; &lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;orders&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;each&lt;/span&gt; &lt;span class="k"&gt;row&lt;/span&gt; &lt;span class="k"&gt;execute&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;post_inserted_order&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Database Webhooks make Postgres triggers a whole lot more powerful. You can send webhooks directly to workflow tools or to non-JS services in your stack. You can use them to trigger serverless functions, like Supabase Edge Functions.&lt;/p&gt;

&lt;p&gt;You can use Database Webhooks to move complex triggers from PL/pgSQL to your application code. For example, you could use a Database Webhook to notify your app of a recently placed order. Then your app could run the series of follow-up SQL queries to modify other tables as necessary. While you &lt;em&gt;could&lt;/em&gt; do this with a plain database trigger, this lets you write code in your domain language – where you can easily unit test, debug, etc.&lt;/p&gt;

&lt;h3&gt;
  
  
  When are Database Webhooks not a good fit?
&lt;/h3&gt;

&lt;p&gt;First, while Database Webhooks allow you to move more business logic into your application code, the setup process will still take some trial and error. I recommend getting your requests to work first by running them directly in Supabase's SQL Editor (e.g. run &lt;code&gt;select net.http_post(...)&lt;/code&gt;, since &lt;code&gt;perform&lt;/code&gt; only works inside PL/pgSQL). Then, once you're confident that you're shaping your requests the right way, you can embed the call into your Postgres trigger.&lt;/p&gt;

&lt;p&gt;Second, unlike Postgres triggers, &lt;code&gt;pg_net&lt;/code&gt; calls are async. This is good, because it means there's little performance overhead. But bad because &lt;code&gt;pg_net&lt;/code&gt; offers at-most-once delivery. That means if something goes wrong or your webhook endpoint is down, the notification might get lost for good. Supabase will store the error in a dedicated table for 6 hours, but won't automatically retry the webhook.&lt;/p&gt;
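
&lt;p&gt;If you want to check for failed deliveries before they age out, you can query the response table yourself. This is a sketch that assumes the table is &lt;code&gt;net._http_response&lt;/code&gt; with the columns shown, which matches the &lt;code&gt;pg_net&lt;/code&gt; documentation as I understand it. Verify the names against the version installed in your project:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- recent requests that errored, timed out, or got a 4xx/5xx response
select id, status_code, timed_out, error_msg, created
from net._http_response
where status_code &amp;gt;= 400
   or timed_out
   or error_msg is not null
order by created desc
limit 20;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This only tells you that something failed. Re-sending the request is still up to you.&lt;/p&gt;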

&lt;p&gt;Third, there are &lt;a href="https://github.com/supabase/pg_net/issues/86" rel="noopener noreferrer"&gt;some reports&lt;/a&gt; of &lt;code&gt;pg_net&lt;/code&gt; failing to make requests after your database transaction volume surpasses a certain threshold.&lt;/p&gt;

&lt;p&gt;So, while Database Webhooks expand the possibilities of triggers, they're not a replacement. You'll want to continue to use triggers for those critical in-database workflows where you 100% can't miss a change.&lt;/p&gt;

&lt;h2&gt;
  
  
  Realtime Subscriptions
&lt;/h2&gt;

&lt;p&gt;Realtime is Supabase's flagship feature for reacting to database changes. It allows both client and server applications to subscribe to changes in your database tables and receive updates in real-time.&lt;/p&gt;

&lt;p&gt;First, be sure to turn Realtime on for your table:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwg17mgi2qgqx7keei8ax.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwg17mgi2qgqx7keei8ax.png" alt="A dialog modal, turning on realtime for a table" width="400" height="267"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then, you can create subscriptions. Here, we specify a subscription for &lt;code&gt;INSERT&lt;/code&gt; operations on &lt;code&gt;orders&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// Listen to inserts&lt;/span&gt;
&lt;span class="nx"&gt;supabase&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;default&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;postgres_changes&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;event&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;INSERT&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;public&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;table&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;orders&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;Received change&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Realtime's easy to use and you can use the same client interface on both the frontend and the backend.&lt;/p&gt;

&lt;p&gt;Unlike Database Webhooks, Realtime is a pub/sub system. You can use it to broadcast changes to many clients, which is great for building reactive interfaces. And clients can even broadcast their own messages, making Realtime a powerful tool for building collaborative features.&lt;/p&gt;

&lt;p&gt;Compared to Database Webhooks, I find Realtime a bit easier to work with, in part because it's well supported in Supabase's console and JavaScript client.&lt;/p&gt;

&lt;h3&gt;
  
  
  When is Realtime not a good fit?
&lt;/h3&gt;

&lt;p&gt;Like Database Webhooks, Realtime messages have at-most-once delivery guarantees. So it's not a good fit when you absolutely need to react to a change. You need to be comfortable with the fact that some messages will be dropped (for example, if your Node app wasn't subscribed when the change happened).&lt;/p&gt;

&lt;p&gt;While you can use Database Webhooks to trigger side effects and async workers, that may not be the best use case for Realtime. With webhooks, you know your request was routed to at most one worker, and so only one worker will field your request. But with broadcast systems like Realtime, multiple workers might pick it up. So if you wanted to use Realtime to, say, send an email, that could result in some undesirable situations: multiple workers hear about the request and send out an email. (You can try to mitigate with private channels, but how do you mutex message handlers on deploys?)&lt;/p&gt;
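
&lt;p&gt;If you do go down this road, one common mitigation is to have each worker claim an event in a table before acting on it. The sketch below uses a hypothetical &lt;code&gt;processed_events&lt;/code&gt; table and a made-up event ID format. It doesn't help with dropped messages, and a worker that crashes after claiming an event leaves it unprocessed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- hypothetical dedupe table: one row per event that a worker has claimed
create table if not exists processed_events (
    event_id text primary key,
    processed_at timestamptz not null default now()
);

-- each worker runs this before performing the side effect.
-- the first worker gets one row back; every other worker gets
-- zero rows and should skip the side effect.
insert into processed_events (event_id)
values ('orders:123:INSERT')
on conflict (event_id) do nothing
returning event_id;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;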

&lt;h2&gt;
  
  
  Listen/Notify
&lt;/h2&gt;

&lt;p&gt;Postgres' built-in pub/sub mechanism, Listen/Notify, is a simple way to broadcast events:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- in one session&lt;/span&gt;
&lt;span class="k"&gt;listen&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- in another session&lt;/span&gt;
&lt;span class="k"&gt;notify&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'something happened!'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can call &lt;code&gt;NOTIFY&lt;/code&gt; within trigger functions to alert listeners of changes.&lt;/p&gt;
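
&lt;p&gt;For instance, a trigger function can call &lt;code&gt;pg_notify&lt;/code&gt;, the function form of &lt;code&gt;NOTIFY&lt;/code&gt;, to publish a small JSON payload. The channel name, payload shape, and function name below are illustrative, and the sketch assumes &lt;code&gt;orders&lt;/code&gt; has an &lt;code&gt;id&lt;/code&gt; column:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;create or replace function notify_order_change() returns trigger as $$
begin
    -- pg_notify(channel, payload) lets you build the payload dynamically,
    -- which the plain notify statement does not
    perform pg_notify(
        'order_changes',
        json_build_object('op', tg_op, 'id', new.id)::text
    );
    return new;
end;
$$ language plpgsql;

create trigger orders_notify
after insert or update on orders
for each row execute function notify_order_change();

-- in a listening session
listen order_changes;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Notifications are only sent when the transaction commits, and with the default configuration the payload must be shorter than 8000 bytes, so it's usually best to send IDs rather than whole rows.&lt;/p&gt;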

&lt;p&gt;However, I don't think it's the best fit for Supabase projects. First, Listen/Notify doesn't work with the Supabase JS client and doesn't work with Supabase cloud's connection pooler. But more important, everything that Listen/Notify can do, Realtime can do better.&lt;/p&gt;

&lt;h2&gt;
  
  
  Sequin
&lt;/h2&gt;

&lt;p&gt;We felt there was a gap in the option space for Supabase, which is why we built &lt;a href="https://github.com/sequinstream/sequin" rel="noopener noreferrer"&gt;Sequin&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Sequin is an open source tool that pairs nicely with Supabase. Unlike Database Webhooks or Realtime broadcasts, Sequin is designed to deliver messages with exactly-once processing guarantees. After connecting Sequin to your Supabase database, you select which table you want to monitor and filter down to which changes you care about:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhqc3f9w61z5dni55zicc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhqc3f9w61z5dni55zicc.png" alt="Sequin console, setting up consumer, indicating which table, operations, and WHERE filters you want on changes" width="800" height="1139"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You then tell Sequin where to send change event webhooks:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhnrx4cvbdr39evv541sg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhnrx4cvbdr39evv541sg.png" alt="Configuring an HTTP endpoint for Sequin to send events to, complete with base url, path, and headers" width="800" height="522"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Unlike Database Webhooks, if your servers are down or your functions return errors, Sequin will keep retrying the message (with backoffs). So you get retries, replays, and a great debugger experience.&lt;/p&gt;

&lt;p&gt;Sequin comes as a &lt;a href="https://sequinstream.com/" rel="noopener noreferrer"&gt;cloud offering&lt;/a&gt; so it's easy to get up and running.&lt;/p&gt;

&lt;h3&gt;
  
  
  When is Sequin not a good fit?
&lt;/h3&gt;

&lt;p&gt;For really simple use cases that a 5-line PL/pgSQL trigger can handle, Sequin is probably too heavyweight. Same if your Database Webhook is fire-and-forget – you won't need Sequin's delivery guarantees.&lt;/p&gt;

&lt;p&gt;Sequin's also not a good fit for pub/sub use cases like Realtime. Because Sequin offers exactly-once processing, it only delivers messages to a single worker at a time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Choosing the right approach
&lt;/h2&gt;

&lt;p&gt;To recap, here's when you might consider each approach:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Triggers&lt;/strong&gt; are great for maintaining order and consistency in your database. Ideally, your triggers are not too complicated and you don't have too many of them. If your table has high write volume, be mindful of them.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Database Webhooks&lt;/strong&gt; are good for quick fire-and-forget use cases. Things like POST'ing a Slack notification for your team or sending an analytic event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Realtime&lt;/strong&gt; can help you build a differentiating client experience. You can use it to build a reactive client that updates immediately when data changes in the database. Or power features like presence in collaborative editing tools. You can also use Realtime where you might otherwise reach for a pub/sub system like Simple Notification Service (SNS) to broadcast events to backend services, as long as you're OK with those services missing some events.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sequin&lt;/strong&gt; lets you write robust event-driven workflows with exactly-once processing guarantees. It's a more powerful, easier-to-work-with version of Database Webhooks. It's great for critical workflows like sending emails, updating user data in your CRM, invalidating caches, and syncing data. You can even use Sequin in place of a queuing system like SQS or Kafka.&lt;/p&gt;

</description>
      <category>postgres</category>
      <category>webdev</category>
      <category>supabase</category>
    </item>
    <item>
      <title>What was that commit? Searching GitHub with OpenAI embeddings</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Fri, 20 Oct 2023 22:52:47 +0000</pubDate>
      <link>https://forem.com/acco/what-was-that-commit-searching-github-with-openai-embeddings-5hnm</link>
      <guid>https://forem.com/acco/what-was-that-commit-searching-github-with-openai-embeddings-5hnm</guid>
      <description>&lt;p&gt;We ran into a situation the other day that was all too familiar: we needed to write some code that we &lt;em&gt;knew&lt;/em&gt; we’d written before. We wanted to serialize and deserialize an Elixir struct into a Postgres &lt;code&gt;jsonb&lt;/code&gt; column. Although we’d solved this before, the module had long been deleted, so it was lingering somewhere in our git history.&lt;/p&gt;

&lt;p&gt;We didn’t remember what the module was called, or any other identifying details about the implementation or the commit.&lt;/p&gt;

&lt;p&gt;After racking our brains and scraping through &lt;code&gt;git reflog&lt;/code&gt;, we eventually found it. But we realized that simple text search through our git history was too limiting.&lt;/p&gt;

&lt;p&gt;It dawned on us that we wanted to perform not a &lt;em&gt;literal string&lt;/em&gt; search but a &lt;em&gt;semantic&lt;/em&gt; search.&lt;/p&gt;

&lt;p&gt;This seemed like the kind of problem that embeddings were designed to solve. So, we set out to build the tool.&lt;/p&gt;

&lt;h3&gt;
  
  
  Embeddings
&lt;/h3&gt;

&lt;p&gt;An &lt;em&gt;&lt;a href="https://developers.google.com/machine-learning/crash-course/embeddings/video-lecture"&gt;embedding&lt;/a&gt;&lt;/em&gt; is a vector representation of data. A vector representation is a series of floats, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nt"&gt;-0&lt;/span&gt;.016741209, 0.019078454, 0.017176045, &lt;span class="nt"&gt;-0&lt;/span&gt;.028046958, ...]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Embeddings help capture the relatedness of text, images, video, or other data. With that relatedness, you can search, cluster, and classify.&lt;/p&gt;

&lt;p&gt;For example, you can generate embeddings for the two strings “I committed my changes to GitHub” and “I pushed the commit to remote.” A literal text comparison would find few substring matches between the two. But an embeddings-powered similarity comparison would score the pair very high – the two sentences are closely related, as they describe practically the same activity.&lt;/p&gt;

&lt;p&gt;In contrast, “I’m committed to remote” has many of the same words. But it would rank as not very related. The words “commit” and “remote” are referring to completely different things!&lt;/p&gt;

&lt;h3&gt;
  
  
  How to create embeddings?
&lt;/h3&gt;

&lt;p&gt;There are lots of ways to create embeddings. The easiest solution is to rely on a third-party vendor like &lt;a href="https://platform.openai.com/docs/guides/embeddings/what-are-embeddings"&gt;OpenAI&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl https://api.openai.com/v1/embeddings &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Authorization: Bearer &lt;/span&gt;&lt;span class="nv"&gt;$OPENAI_API_KEY&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
    "input": "Your text string goes here",
    "model": "text-embedding-ada-002"
  }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;OpenAI accepts batches of inputs too, where &lt;code&gt;input&lt;/code&gt; is set to an array of strings.&lt;/p&gt;

&lt;h3&gt;
  
  
  Workflow
&lt;/h3&gt;

&lt;p&gt;In order to power a GitHub search tool, first we needed embeddings for all our GitHub data. This means creating string representations of each object and retrieving embeddings via OpenAI’s API.&lt;/p&gt;

&lt;p&gt;For example, for Pull Requests, we just concatenated the &lt;code&gt;title&lt;/code&gt; and &lt;code&gt;body&lt;/code&gt; fields to make the string for embeddings. For commits, we only needed the &lt;code&gt;commit&lt;/code&gt; message.&lt;/p&gt;

&lt;p&gt;Then, to search across these embeddings, the user types in a search query. We convert &lt;em&gt;the search query&lt;/em&gt; into an embedding. Then, with both the search query and GitHub objects represented as embeddings, we can perform our similarity search.&lt;/p&gt;

&lt;h3&gt;
  
  
  Using Postgres
&lt;/h3&gt;

&lt;p&gt;When generating GitHub embeddings, we need to store them somewhere. This is what a vector database is designed to do: be a repository for your embedding vectors and allow you to perform efficient queries with them.&lt;/p&gt;

&lt;p&gt;Fortunately, Postgres has a vector extension, &lt;code&gt;pgvector&lt;/code&gt;. This is great because it means you don’t have to add an entirely new data store to your stack. With &lt;code&gt;pgvector&lt;/code&gt;, Postgres can work with vector data like embeddings, and it’s performant enough for plenty of workflows like ours.&lt;/p&gt;

&lt;p&gt;To add &lt;code&gt;pgvector&lt;/code&gt; to your database, you just need to run a single command &lt;sup id="fnref1"&gt;1&lt;/sup&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="n"&gt;extension&lt;/span&gt; &lt;span class="n"&gt;vector&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To build our solution, we knew that we needed to generate embeddings for all &lt;em&gt;current&lt;/em&gt; GitHub data as well as dynamically generate embeddings in the future for all &lt;em&gt;new&lt;/em&gt; GitHub data. That is, we’d need to run some kind of backfill to generate embeddings for all current Pull Requests and Issues, and then set up a process to monitor inserts and updates for these objects so the embeddings stay up-to-date.&lt;/p&gt;

&lt;p&gt;Using Sequin, we pulled all GitHub objects into Postgres. So Pull Requests in GitHub → &lt;code&gt;pull_request&lt;/code&gt; in our database and Issues → &lt;code&gt;issue&lt;/code&gt;. We could then run a one-off process that paginated through each table like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;title&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;github&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;pull_request&lt;/span&gt; &lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="k"&gt;asc&lt;/span&gt; &lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt; &lt;span class="k"&gt;offset&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="k"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
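
&lt;p&gt;The loop around that query can be sketched as a lazy stream of pages. This is only an outline under assumptions: &lt;code&gt;fetch_page&lt;/code&gt; is a hypothetical function you supply that runs the query above with a given limit and offset and returns a list of rows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule Backfill do
  @page_size 1000

  # Lazily yields pages of records until a page comes back empty.
  # `fetch_page` is a hypothetical 2-arity function: (limit, offset) -&amp;gt; list of rows.
  def pages(fetch_page, page_size \\ @page_size) do
    Stream.unfold(0, fn offset -&amp;gt;
      case fetch_page.(page_size, offset) do
        [] -&amp;gt; nil
        rows -&amp;gt; {rows, offset + page_size}
      end
    end)
  end
end
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Each page can then be sent to OpenAI as one batch. Note that large offsets make Postgres scan and discard all the skipped rows, so on big tables it may be worth paginating on &lt;code&gt;id&lt;/code&gt; instead.&lt;/p&gt;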



&lt;p&gt;Then, for each batch of records, we fetched embeddings with an API request to OpenAI. We decided to store embeddings in a separate table, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;create table github_embedding.commit (
  id text references github.commit(id) on delete cascade,
  embedding vector(1536) not null
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Batch jobs like this work fine for backfilling data. We knew we could get away with running the task once a day to generate embeddings for new or updated records.&lt;/p&gt;

&lt;p&gt;But we wanted our search tool to work with the freshest data possible. We didn’t want to have a big time delay between activity in GitHub and results in the search tool.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generating embeddings on insert or update
&lt;/h2&gt;

&lt;p&gt;In order to generate embeddings for GitHub objects whenever they were created or updated, we needed a way to find out about these events.&lt;/p&gt;

&lt;p&gt;In situations like this, developers often consider Postgres' &lt;a href="https://www.postgresql.org/docs/current/sql-notify.html"&gt;listen/notify protocol&lt;/a&gt;. It's fast to get started with and works great. But, &lt;a href="https://blog.sequin.io/all-the-ways-to-capture-changes-in-postgres/"&gt;notify events are ephemeral&lt;/a&gt;, so delivery is at-most-once. That means there's a risk of missing notifications, and therefore of there being holes in your data.&lt;/p&gt;

&lt;p&gt;The other option was to use Sequin’s &lt;a href="https://docs.sequin.io/events"&gt;events&lt;/a&gt;. Along with a sync to Postgres, Sequin provides an event stream: it publishes events like “GitHub Pull Request deleted” or “GitHub Commit upserted” to a serverless Kafka stream associated with your sync.&lt;/p&gt;

&lt;p&gt;You don’t have to use Kafka to interface with the event stream. There are options to use a simple HTTP interface or to have events POST’d to an endpoint you choose (webhooks).&lt;/p&gt;

&lt;p&gt;Events contain the ID and collection of the affected record, as well as the payload of the record itself:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"collection"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"pull_request"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"079013db-8b17-44cd-8528-f5e68fc61333"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"data"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"activity_date"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2023-09-12"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Add GitHub embeddings [ … ] "&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="err"&gt;//&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;…&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
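
&lt;p&gt;Because the payload travels with the event, a handler can pattern match on the decoded map to decide what to do. The sketch below assumes the event has already been decoded into a map with string keys, and that the elided part of &lt;code&gt;data&lt;/code&gt; includes a &lt;code&gt;body&lt;/code&gt; field. The module name and return tags are made up for illustration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule EventRouter do
  # Pull requests: build the embedding input straight from the payload.
  # Assumes "data" carries "title" and "body" (the latter is not shown above).
  def route(%{"collection" =&amp;gt; "pull_request", "id" =&amp;gt; id, "data" =&amp;gt; data}) do
    {:embed, :pull_request, id, Enum.join([data["title"], data["body"]], "; ")}
  end

  # Any other collection is skipped rather than crashing the consumer.
  def route(%{"collection" =&amp;gt; collection}), do: {:skip, collection}
end
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;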



&lt;p&gt;To make events work, we just needed to set up an event listener. That event listener implements a callback function. It derives a string value from the record by concatenating and stringifying fields. Then, it makes a request to OpenAI to get the embedding. Finally, it upserts the embedding into the database:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="nv"&gt;@impl&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;handle_message&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Jason&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;decode!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="s2"&gt;"id"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"collection"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;

  &lt;span class="n"&gt;body&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;%{&lt;/span&gt;
    &lt;span class="ss"&gt;input:&lt;/span&gt; &lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="ss"&gt;model:&lt;/span&gt; &lt;span class="s2"&gt;"text-embedding-ada-002"&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="n"&gt;req&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
    &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="ss"&gt;url:&lt;/span&gt; &lt;span class="s2"&gt;"https://api.openai.com/v1/embeddings"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;headers:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Content-Type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"application/json"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Authorization"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"Bearer &amp;lt;&amp;lt;secret&amp;gt;&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="p"&gt;],&lt;/span&gt;
      &lt;span class="ss"&gt;json:&lt;/span&gt; &lt;span class="n"&gt;body&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ok&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="s2"&gt;"data"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;[%{&lt;/span&gt;&lt;span class="s2"&gt;"embedding"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;}]}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;

  &lt;span class="n"&gt;upsert_embedding&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ack&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"pull_request"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;pull_request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;GitHub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;PullRequest&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;get!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;select:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:body&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

  &lt;span class="c1"&gt;# Join title and body in a fixed order (a map's key-value pairs can't be joined directly)&lt;/span&gt;
  &lt;span class="no"&gt;Enum&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;join&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="s2"&gt;"; "&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;upsert_embedding&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"pull_request"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="c1"&gt;# On conflict, replace only the embedding so other columns are left untouched&lt;/span&gt;
  &lt;span class="p"&gt;%&lt;/span&gt;&lt;span class="no"&gt;GitHub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;PullRequest&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;id:&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;GitHub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;PullRequest&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;changeset&lt;/span&gt;&lt;span class="p"&gt;(%{&lt;/span&gt;&lt;span class="ss"&gt;embedding:&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;on_conflict:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:replace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:embedding&lt;/span&gt;&lt;span class="p"&gt;]},&lt;/span&gt; &lt;span class="ss"&gt;conflict_target:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:id&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="c1"&gt;# handle other collection types here&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the backfill done and an event handler in place, we now had up-to-date database tables with GitHub embeddings. With that foundation in place, we were ready to build our tool!&lt;/p&gt;

&lt;h2&gt;
  
  
  A Postgres query for finding matches
&lt;/h2&gt;

&lt;p&gt;With your embeddings set up in Postgres, you’re ready to create a mechanism for querying them.&lt;/p&gt;

&lt;p&gt;Supabase has a &lt;a href="https://supabase.com/blog/openai-embeddings-postgres-vector"&gt;great post&lt;/a&gt; on embeddings in Postgres. I’ve adapted their similarity query below. You can use the cosine distance operator (&lt;code&gt;&amp;lt;=&amp;gt;&lt;/code&gt;) provided by &lt;code&gt;pgvector&lt;/code&gt; to determine similarity: cosine similarity is 1 minus the cosine distance. Here’s a query that grabs the pull requests whose similarity exceeds a match threshold, ordered from most similar to least similar:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt;
  &lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;embedding_pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt;&lt;span class="n"&gt;searchEmbedding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;}})&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;similarity&lt;/span&gt;
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;github_sequin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;pull_request&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pull_request&lt;/span&gt;
&lt;span class="k"&gt;join&lt;/span&gt; &lt;span class="n"&gt;github_embedding_sequin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;pull_request&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;embedding_pull_request&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;embedding_pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;
&lt;span class="c1"&gt;-- match threshold set to 0.75, you can change it&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;embedding_pull_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt;&lt;span class="n"&gt;searchEmbedding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;}})&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;75&lt;/span&gt;
&lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="n"&gt;similarity&lt;/span&gt; &lt;span class="k"&gt;desc&lt;/span&gt;
&lt;span class="c1"&gt;-- match count set to 5, you can change it&lt;/span&gt;
&lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
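
&lt;p&gt;The &lt;code&gt;{{searchEmbedding.value}}&lt;/code&gt; placeholder has to arrive in a form Postgres can read as a vector. &lt;code&gt;pgvector&lt;/code&gt; accepts a text form like &lt;code&gt;[1,2,3]&lt;/code&gt;, so one option is to format the list of floats yourself. A minimal sketch, with a hypothetical module name:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule VectorLiteral do
  # Formats a list of numbers as pgvector's text representation: "[n1,n2,...]".
  # Pass the result as a query parameter and cast it with ::vector in SQL.
  def encode(embedding) when is_list(embedding) do
    "[" &amp;lt;&amp;gt; Enum.map_join(embedding, ",", &amp;amp;to_string/1) &amp;lt;&amp;gt; "]"
  end
end
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Sending the value as a bound parameter, rather than interpolating it into the SQL string, keeps the query safe from injection.&lt;/p&gt;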



&lt;h2&gt;
  
  
  The search tool
&lt;/h2&gt;

&lt;p&gt;With our data model and search function squared away, we were ready to build our tool.&lt;/p&gt;

&lt;p&gt;When the user enters a query, we first convert their search query into an embedding using OpenAI. Then, we use the SQL query above to find the GitHub objects that are the closest match.&lt;/p&gt;

&lt;p&gt;Below is a simple example of this tool. Here’s a demonstration of a search for Pull Requests that mention “serialize and deserialize structs into jsonb ecto”:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--rJDbGKj7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/10/image1-1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--rJDbGKj7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/10/image1-1.png" alt="image1.png" width="800" height="380"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;On the left, we see the list of the top 5 PRs that matched, sorted by similarity descending. On the right is a preview of the PR that you selected.&lt;/p&gt;

&lt;p&gt;Note that this is not a literal string match. The search query says “serialize and deserialize,” but the PR contains “serializes/deserializes.” The PR also doesn’t mention &lt;code&gt;jsonb&lt;/code&gt;, just JSON.&lt;/p&gt;

&lt;p&gt;Because of embeddings, we found the exact PR we were looking for, and with only a vague idea of what we were looking for!&lt;/p&gt;

&lt;h3&gt;
  
  
  Weaknesses
&lt;/h3&gt;

&lt;p&gt;The tool is very effective when the search query has some substance to it (several words) and your PRs do as well. Naturally, if a PR or issue is very light on content, it’s harder to match.&lt;/p&gt;

&lt;p&gt;In fact, PRs or issues with very little text content can match too frequently for the wrong things. So, you may consider adding a clause that filters out GitHub objects that have fields that don’t meet some minimum required length.&lt;/p&gt;
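
&lt;p&gt;One way to apply that kind of filter is before you generate an embedding at all. Here’s a small sketch. The module name and the 40-character default are arbitrary assumptions you’d tune for your own data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule EmbeddingFilter do
  # Arbitrary default threshold, in characters.
  @min_length 40

  # True when the combined text of the given fields meets the minimum length.
  # Nil fields are ignored.
  def substantial?(fields, min_length \\ @min_length) do
    text = fields |&amp;gt; Enum.reject(&amp;amp;is_nil/1) |&amp;gt; Enum.join(" ") |&amp;gt; String.trim()
    String.length(text) &amp;gt;= min_length
  end
end
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;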

&lt;p&gt;Remember, you’re not describing what you’re looking for. You’re writing text that you think will be a match for a description found in a PR or an issue.&lt;/p&gt;

&lt;h2&gt;
  
  
  Further exploration
&lt;/h2&gt;

&lt;p&gt;Now that we have our first workflow around embeddings, we’re starting to think up other ideas.&lt;/p&gt;

&lt;p&gt;For example, how could we expand search over commit bodies/diffs? Will embeddings work well if we’re describing the code inside of a commit (vs matching descriptions on the PRs and issues around the code)?&lt;/p&gt;

&lt;p&gt;Can we power roll-ups off this data? For example, imagine a weekly summary that describes what got committed (vs just listing PRs). Or reports like a “cluster analysis” that tells the team how our time broke down between fixing bugs vs shipping new features.&lt;/p&gt;




&lt;ol&gt;

&lt;li id="fn1"&gt;
&lt;p&gt;&lt;code&gt;pgvector&lt;/code&gt; is included in most of the latest distributions of Postgres. ↩&lt;/p&gt;

&lt;p&gt;If you're on AWS RDS, be sure you upgrade to Postgres 15.2+ to get access to the &lt;code&gt;vector&lt;/code&gt; extension.&lt;/p&gt;
&lt;/li&gt;

&lt;/ol&gt;

</description>
      <category>webdev</category>
      <category>ai</category>
      <category>machinelearning</category>
    </item>
    <item>
      <title>Storing Salesforce embeddings with pgvector and OpenAI</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Tue, 10 Oct 2023 17:11:59 +0000</pubDate>
      <link>https://forem.com/acco/storing-salesforce-embeddings-with-pgvector-and-openai-16kc</link>
      <guid>https://forem.com/acco/storing-salesforce-embeddings-with-pgvector-and-openai-16kc</guid>
      <description>&lt;p&gt;We use Salesforce as the hub for all customer data. We pipe notes, call transcripts, and email conversations into Salesforce.&lt;/p&gt;

&lt;p&gt;We thought it would be cool to build tooling on top of Salesforce that helped us with product roadmap and direction. We receive feedback and great ideas all the time from our customers. How could we make it easy to see suggested features? To follow up with the right customers after shipping a request? And to spot recurring themes from our customer conversations?&lt;/p&gt;

&lt;p&gt;Just building a simple search tool isn’t enough. A standard search query matches on literal words in a string. So, if we wanted to pull up all the notes where a customer requested that Sequin support an integration, we’d have to brute force with a number of search strings, like “request,” “support,” “add integration.” Not only is this tedious, but it’s unlikely to work.&lt;/p&gt;

&lt;p&gt;This is where embeddings come in.&lt;/p&gt;

&lt;h3&gt;
  
  
  Embeddings
&lt;/h3&gt;

&lt;p&gt;An &lt;em&gt;&lt;a href="https://developers.google.com/machine-learning/crash-course/embeddings/video-lecture"&gt;embedding&lt;/a&gt;&lt;/em&gt; is a vector representation of data. A vector representation is a series of floats, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nt"&gt;-0&lt;/span&gt;.016741209, 0.019078454, 0.017176045, &lt;span class="nt"&gt;-0&lt;/span&gt;.028046958, ...]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Embeddings help capture the relatedness of text, images, video, or other data. With that relatedness, you can search, cluster, and classify.&lt;/p&gt;

&lt;p&gt;Embeddings enable an advanced search method across our customer notes. First, we can generate embeddings for all notes. Then, we can perform a query using another embedding. The user will type in a query, such as “want sequin to support ecommerce service.” We can take that query, turn &lt;em&gt;it&lt;/em&gt; into an embedding, and compare its relatedness to the embeddings of all the notes.&lt;/p&gt;

&lt;p&gt;To generate embeddings, you’ll want to rely on a third-party vendor. You can use APIs like &lt;a href="https://platform.openai.com/docs/guides/embeddings/what-are-embeddings"&gt;OpenAI’s&lt;/a&gt; to generate them.&lt;/p&gt;

&lt;h3&gt;
  
  
  Embeddings search tool
&lt;/h3&gt;

&lt;p&gt;In this post, I’ll show you how to build a tool that will let you do an embeddings search across your Salesforce data. This will let you search semantically instead of literally. You can use a tool like this to filter and find feedback or product suggestions.&lt;/p&gt;

&lt;p&gt;You can create embeddings on any Salesforce object, like Case, Task, Note or a custom object that your team uses. In the examples below, I’ll use popular Salesforce objects interchangeably.&lt;/p&gt;

&lt;p&gt;This post assumes you already have Salesforce &lt;a href="https://docs.sequin.io/integrations/salesforce/setup"&gt;set up with Sequin&lt;/a&gt; to sync Salesforce objects to your Postgres database. You should also have an OpenAI account set up.&lt;/p&gt;

&lt;h2&gt;
  
  
  Prepare your database
&lt;/h2&gt;

&lt;p&gt;To prepare your database, first add the &lt;code&gt;pgvector&lt;/code&gt; extension &lt;sup id="fnref1"&gt;1&lt;/sup&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="n"&gt;extension&lt;/span&gt; &lt;span class="n"&gt;vector&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a separate schema, &lt;code&gt;salesforce_embedding&lt;/code&gt;, for your embedding data &lt;sup id="fnref2"&gt;2&lt;/sup&gt;. In your queries, you’ll join your embedding tables to your Salesforce tables.&lt;/p&gt;

&lt;p&gt;Here’s an example of creating an embedding table for Salesforce tasks:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;salesforce_embedding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;text&lt;/span&gt; &lt;span class="k"&gt;references&lt;/span&gt; &lt;span class="n"&gt;salesforce&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="k"&gt;delete&lt;/span&gt; &lt;span class="k"&gt;cascade&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="n"&gt;vector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1536&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this post, I’ll show you how to use OpenAI’s &lt;code&gt;text-embedding-ada-002&lt;/code&gt; model. That model generates embeddings with 1536 dimensions, hence the &lt;code&gt;1536&lt;/code&gt; parameter above.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generate embeddings on insert or update
&lt;/h2&gt;

&lt;p&gt;You'll first set up your app to generate embeddings for Salesforce records as they're inserted or updated. Then, I'll show you how to backfill embeddings for your existing records.&lt;/p&gt;

&lt;p&gt;You have two options for finding out about new or updated Salesforce objects in your database.&lt;/p&gt;

&lt;p&gt;You can use Postgres' &lt;a href="https://www.postgresql.org/docs/current/sql-notify.html"&gt;listen/notify protocol&lt;/a&gt;. It's fast to get started with and works great. But, &lt;a href="https://blog.sequin.io/all-the-ways-to-capture-changes-in-postgres/"&gt;notify events are ephemeral&lt;/a&gt;, so delivery is at-most-once. That means there's a risk of missing notifications, and therefore of there being holes in your data.&lt;/p&gt;

&lt;p&gt;Along with a sync to Postgres, Sequin provisions an &lt;a href="https://docs.sequin.io/events"&gt;event stream for you&lt;/a&gt;. Sequin will publish events to a &lt;a href="https://nats.io"&gt;NATS&lt;/a&gt; stream associated with your sync, &lt;code&gt;sequin-[sync_id]&lt;/code&gt; (e.g. &lt;code&gt;sequin-sync_1a107d79&lt;/code&gt;). You can write a function in your app that listens for these events and updates the &lt;code&gt;embedding&lt;/code&gt; column for the Salesforce object that was updated or inserted. Notably, unlike listen/notify, the NATS stream is durable so you get at-least-once delivery guarantees.&lt;/p&gt;

&lt;p&gt;The NATS team maintains client libraries in over 40 languages. Here's the skeleton for a listener for Salesforce upsert events in Elixir:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="k"&gt;defmodule&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Sequin&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;SalesforceStreamConsumer&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="kn"&gt;use&lt;/span&gt; &lt;span class="no"&gt;Jetstream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;PullConsumer&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;start_link&lt;/span&gt;&lt;span class="p"&gt;([])&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="no"&gt;Jetstream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;PullConsumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;start_link&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;__MODULE__&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[])&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="nv"&gt;@impl&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;init&lt;/span&gt;&lt;span class="p"&gt;([])&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ok&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;connection_name:&lt;/span&gt; &lt;span class="ss"&gt;:gnat&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;stream_name:&lt;/span&gt; &lt;span class="s2"&gt;"sequin-sync_1a107d79"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;consumer_name:&lt;/span&gt; &lt;span class="s2"&gt;"my_app_sf_upserts"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="nv"&gt;@impl&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;handle_message&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="c1"&gt;# TODO&lt;/span&gt;
    &lt;span class="c1"&gt;# event handling code goes here&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In &lt;code&gt;init/1&lt;/code&gt;, you specify the stream name as well as a name for your &lt;a href="https://docs.nats.io/nats-concepts/jetstream/consumers"&gt;consumer&lt;/a&gt;. &lt;code&gt;handle_message/2&lt;/code&gt; is the function that handles each event on the stream. In this case, that means &lt;code&gt;handle_message/2&lt;/code&gt; will be invoked every time a Salesforce object is inserted or updated.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;consumer_name&lt;/code&gt; for this module is &lt;code&gt;my_app_sf_upserts&lt;/code&gt;. I’ll show you in a moment how to register this consumer with NATS.&lt;/p&gt;

&lt;p&gt;In &lt;code&gt;handle_message/2&lt;/code&gt;, you make an API request to OpenAI. The body specifies the input for the embedding and the model to use. The input should come from a different field or combination of fields for each collection, so you can implement a &lt;code&gt;get_embedding_input/2&lt;/code&gt; clause for each collection you care about. The following example handles one collection, &lt;code&gt;task&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="nv"&gt;@impl&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;handle_message&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Jason&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;decode!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="s2"&gt;"id"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"collection"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;

  &lt;span class="n"&gt;body&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;%{&lt;/span&gt;
    &lt;span class="ss"&gt;input:&lt;/span&gt; &lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="ss"&gt;model:&lt;/span&gt; &lt;span class="s2"&gt;"text-embedding-ada-002"&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="n"&gt;req&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
    &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="ss"&gt;url:&lt;/span&gt; &lt;span class="s2"&gt;"https://api.openai.com/v1/embeddings"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;headers:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Content-Type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"application/json"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Authorization"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"Bearer &amp;lt;&amp;lt;secret&amp;gt;&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="p"&gt;],&lt;/span&gt;
      &lt;span class="ss"&gt;json:&lt;/span&gt; &lt;span class="n"&gt;body&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ok&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="s2"&gt;"data"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;[%{&lt;/span&gt;&lt;span class="s2"&gt;"embedding"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;}]}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;

  &lt;span class="n"&gt;upsert_embedding&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ack&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"task"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;get!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;select:&lt;/span&gt; &lt;span class="ss"&gt;:description&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;fetch!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:description&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;upsert_embedding&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"task"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="p"&gt;%&lt;/span&gt;&lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;TaskEmbedding&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;id:&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;TaskEmbedding&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;changeset&lt;/span&gt;&lt;span class="p"&gt;(%{&lt;/span&gt;&lt;span class="ss"&gt;embedding:&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;on_conflict:&lt;/span&gt; &lt;span class="ss"&gt;:replace_all&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;conflict_target:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:id&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="c1"&gt;# handle other collection types here&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At the end of &lt;code&gt;handle_message/2&lt;/code&gt; is a call to &lt;code&gt;upsert_embedding/3&lt;/code&gt;, which upserts the record to the database. The example shows handler functions for the Task collection only. You can add handler functions for any other collections you want embeddings for.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;This example does not handle issues with the OpenAI API gracefully. A more robust solution would have some error handling around that call.&lt;/p&gt;
&lt;/blockquote&gt;
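
&lt;p&gt;As a rough sketch of what that error handling could look like, the hypothetical helper below (the module and function names are my own, not part of Req or Jetstream) turns the result of &lt;code&gt;Req.post/1&lt;/code&gt; into a tagged tuple and picks an acknowledgement action from it. It assumes the response exposes &lt;code&gt;status&lt;/code&gt; and &lt;code&gt;body&lt;/code&gt; fields and that your version of the consumer library accepts &lt;code&gt;:nack&lt;/code&gt; as a return action, so check both against the docs:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule EmbeddingResponse do
  # Hypothetical helper for illustration; not part of Req or Jetstream.

  # Successful call: pull the first embedding out of the decoded JSON body.
  def parse({:ok, %{status: 200, body: %{"data" =&amp;gt; [%{"embedding" =&amp;gt; embedding} | _]}}}) do
    {:ok, embedding}
  end

  # The request completed, but the status or body shape was not what we expected.
  def parse({:ok, %{status: status}}), do: {:error, {:unexpected_response, status}}

  # Transport-level failure (timeout, DNS, closed connection, ...).
  def parse({:error, reason}), do: {:error, reason}

  # Ack on success; nack on failure so the message can be redelivered.
  def ack_action({:ok, _embedding}), do: :ack
  def ack_action({:error, _reason}), do: :nack
end
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In &lt;code&gt;handle_message/2&lt;/code&gt;, you could pipe the result of &lt;code&gt;Req.post/1&lt;/code&gt; through &lt;code&gt;EmbeddingResponse.parse/1&lt;/code&gt;, call &lt;code&gt;upsert_embedding/3&lt;/code&gt; only on &lt;code&gt;{:ok, embedding}&lt;/code&gt;, and return the chosen action alongside the state.&lt;/p&gt;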

&lt;p&gt;Now, register this consumer you just wrote with your NATS event stream. You can filter on only upserted events (you don’t want your handler to be invoked for &lt;code&gt;deleted&lt;/code&gt; events):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;nats consumer add &lt;span class="nt"&gt;--pull&lt;/span&gt; &lt;span class="nt"&gt;--deliver&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;all &lt;span class="nt"&gt;--creds&lt;/span&gt; /path/to/your.creds sequin-sync_1a107d79 my_app_sf_upserts &lt;span class="nt"&gt;--filter&lt;/span&gt; &lt;span class="s1"&gt;'sequin.sync_1a107d79.salesforce.*.upserted'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;This example uses &lt;a href="https://docs.nats.io/using-nats/nats-tools/nats_cli"&gt;NATS cli&lt;/a&gt;, which is nice for one-off commands like this one.&lt;/p&gt;
&lt;/blockquote&gt;
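
&lt;p&gt;To make the filter concrete: in NATS subjects, &lt;code&gt;*&lt;/code&gt; matches exactly one dot-separated token, while &lt;code&gt;&amp;gt;&lt;/code&gt; matches one or more trailing tokens. The sketch below mimics that matching in plain Elixir so you can reason about which events reach your handler. The module is hypothetical, and the &lt;code&gt;task&lt;/code&gt; token in the sample subjects is an assumption about how collection names appear in the subject:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule SubjectFilter do
  # Illustrative re-implementation of NATS-style subject matching.
  def matches?(filter, subject) do
    do_match(String.split(filter, "."), String.split(subject, "."))
  end

  # "&amp;gt;" must be the last filter token and needs at least one subject token.
  defp do_match(["&amp;gt;"], [_ | _]), do: true
  # "*" consumes exactly one token.
  defp do_match(["*" | filter], [_ | subject]), do: do_match(filter, subject)
  # Literal tokens must be equal.
  defp do_match([token | filter], [token | subject]), do: do_match(filter, subject)
  # Both lists exhausted at the same time: full match.
  defp do_match([], []), do: true
  defp do_match(_, _), do: false
end

filter = "sequin.sync_1a107d79.salesforce.*.upserted"
SubjectFilter.matches?(filter, "sequin.sync_1a107d79.salesforce.task.upserted")
SubjectFilter.matches?(filter, "sequin.sync_1a107d79.salesforce.task.deleted")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The first call matches and the second does not, which is why &lt;code&gt;deleted&lt;/code&gt; events never reach the consumer.&lt;/p&gt;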

&lt;p&gt;With this listener deployed, when a record is inserted, your consumer will populate its &lt;code&gt;embedding&lt;/code&gt; column. And when a record is updated, your consumer will regenerate its &lt;code&gt;embedding&lt;/code&gt; column.&lt;/p&gt;

&lt;p&gt;The next step is to backfill all the records with &lt;code&gt;null&lt;/code&gt; values for &lt;code&gt;embedding&lt;/code&gt; in the database.&lt;/p&gt;

&lt;h2&gt;
  
  
  Backfill the &lt;code&gt;embedding&lt;/code&gt; column for existing records
&lt;/h2&gt;

&lt;p&gt;You have two primary options for backfilling the &lt;code&gt;embedding&lt;/code&gt; column:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Create a batch job&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You can write a one-off batch job that paginates through your table and kicks off API calls to fetch the embeddings for each record.&lt;/p&gt;

&lt;p&gt;You can paginate through each table like this &lt;sup id="fnref3"&gt;3&lt;/sup&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;description&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;salesforce&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="k"&gt;asc&lt;/span&gt; &lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt; &lt;span class="k"&gt;offset&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
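
&lt;p&gt;One way to drive that query from Elixir is to wrap the limit/offset loop in a lazy stream. This is only a sketch: &lt;code&gt;Backfill.Pager&lt;/code&gt; is a made-up module, and &lt;code&gt;fetch_page&lt;/code&gt; is a function you would supply yourself that runs the query above with the given limit and offset and returns a list of rows:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule Backfill.Pager do
  # fetch_page is a caller-supplied (hypothetical) function:
  # (limit, offset) -&amp;gt; list of rows
  def stream_pages(fetch_page, page_size \\ 1000) do
    Stream.unfold(0, fn offset -&amp;gt;
      case fetch_page.(page_size, offset) do
        # An empty page means we have walked past the end of the table.
        [] -&amp;gt; nil
        # Emit this page and advance the offset for the next query.
        rows -&amp;gt; {rows, offset + page_size}
      end
    end)
  end
end

# Stand-in for the database: 2,500 fake rows paged from memory.
rows = Enum.map(1..2500, fn id -&amp;gt; %{id: id} end)
fetch_page = fn limit, offset -&amp;gt; Enum.slice(rows, offset, limit) end

fetch_page
|&amp;gt; Backfill.Pager.stream_pages()
|&amp;gt; Enum.each(fn page -&amp;gt; IO.puts("page with #{length(page)} rows") end)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because the stream is lazy, only one page is held in memory at a time, and each page can be handed to whatever function fetches and upserts the embeddings.&lt;/p&gt;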



&lt;p&gt;You can send multiple strings at once to OpenAI’s embedding API. After grabbing a set of rows, here’s how you might fetch the embeddings for those records:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;fetch_and_upsert_rows&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;inputs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Enum&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="n"&gt;body&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;%{&lt;/span&gt;
    &lt;span class="ss"&gt;input:&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="ss"&gt;model:&lt;/span&gt; &lt;span class="s2"&gt;"text-embedding-ada-002"&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="n"&gt;req&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
    &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="ss"&gt;url:&lt;/span&gt; &lt;span class="s2"&gt;"https://api.openai.com/v1/embeddings"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;headers:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Content-Type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"application/json"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"Authorization"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"Bearer &amp;lt;&amp;lt;secret&amp;gt;&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="p"&gt;],&lt;/span&gt;
      &lt;span class="ss"&gt;json:&lt;/span&gt; &lt;span class="n"&gt;body&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ok&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Req&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;req&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="n"&gt;embeddings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"data"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Enum&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt; &lt;span class="nv"&gt;&amp;amp;1&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"embedding"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
  &lt;span class="n"&gt;upsert_embeddings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embeddings&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;get_embedding_input&lt;/span&gt;&lt;span class="p"&gt;(%&lt;/span&gt;&lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;description&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="c1"&gt;# … write other `get_embedding_input/1` clauses&lt;/span&gt;

&lt;span class="k"&gt;defp&lt;/span&gt; &lt;span class="n"&gt;upsert_embeddings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embeddings&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Enum&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;zip_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embeddings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
    &lt;span class="p"&gt;%{&lt;/span&gt;
      &lt;span class="ss"&gt;id:&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;embedding:&lt;/span&gt; &lt;span class="n"&gt;embedding&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert_all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;TaskEmbedding&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="ss"&gt;on_conflict:&lt;/span&gt; &lt;span class="ss"&gt;:replace_all&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="ss"&gt;conflict_target:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Use a Sequin job&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Alternatively, you can have Sequin do the record pagination and collection part for you. This will let you use your existing event handling code to backfill your table.&lt;/p&gt;

&lt;p&gt;You can kick off a backfill of your events stream via the &lt;a href="https://app.sequin.io"&gt;Sequin console&lt;/a&gt;. Sequin will paginate your Postgres tables and fill your stream with events that have the same shape as the update and insert events:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="s2"&gt;"id"&lt;/span&gt;: &lt;span class="s2"&gt;"note-8hUjsk2p"&lt;/span&gt;, &lt;span class="s2"&gt;"table_name"&lt;/span&gt;: &lt;span class="s2"&gt;"note"&lt;/span&gt;, &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="s2"&gt;"data"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt; … &lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Assuming you don’t have any other consumers listening to the &lt;code&gt;sequin.sync_1a107d79.salesforce.*.upserted&lt;/code&gt; topic, you can reuse this topic for the backfill &lt;sup id="fnref4"&gt;4&lt;/sup&gt;. You can backfill each of your collections, like &lt;code&gt;task&lt;/code&gt; and &lt;code&gt;account&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Create a Postgres query for finding matches
&lt;/h2&gt;

&lt;p&gt;With your embeddings set up in Postgres, you’re ready to create a mechanism for querying them.&lt;/p&gt;

&lt;p&gt;Supabase has a &lt;a href="https://supabase.com/blog/openai-embeddings-postgres-vector"&gt;great post&lt;/a&gt; on embeddings in Postgres. I’ve adapted their similarity query below. You can use the cosine distance operator (&lt;code&gt;&amp;lt;=&amp;gt;&lt;/code&gt;) provided by pgvector to determine similarity. Here’s a query that grabs a list of tasks over a &lt;code&gt;match_threshold&lt;/code&gt;, ordered by most similar to least similar:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;embedding_task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;query_embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;similarity&lt;/span&gt;
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;salesforce&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;
&lt;span class="k"&gt;join&lt;/span&gt; &lt;span class="n"&gt;salesforce_embedding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;embedding_task&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;embedding_task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;embedding_task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;query_embedding&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;match_threshold&lt;/span&gt;
&lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="n"&gt;similarity&lt;/span&gt; &lt;span class="k"&gt;desc&lt;/span&gt;
&lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="n"&gt;match_count&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
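
&lt;p&gt;If the &lt;code&gt;1 - (a &amp;lt;=&amp;gt; b)&lt;/code&gt; expression looks opaque, here is the same arithmetic in plain Elixir. The operator returns cosine distance, so subtracting it from 1 gives cosine similarity. This is a small illustrative module of my own, not how pgvector is implemented, and it assumes neither vector is all zeros:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule Cosine do
  # Cosine similarity: dot(a, b) / (|a| * |b|).
  # Raises ArithmeticError if either vector has zero length.
  def similarity(a, b) do
    dot = a |&amp;gt; Enum.zip_with(b, fn x, y -&amp;gt; x * y end) |&amp;gt; Enum.sum()
    dot / (norm(a) * norm(b))
  end

  # Cosine distance, the quantity a cosine distance operator returns.
  def distance(a, b), do: 1 - similarity(a, b)

  # Euclidean length of a vector.
  defp norm(v), do: v |&amp;gt; Enum.map(fn x -&amp;gt; x * x end) |&amp;gt; Enum.sum() |&amp;gt; :math.sqrt()
end

# Vectors pointing the same way have similarity 1.0 (distance 0.0).
Cosine.similarity([1.0, 0.0], [2.0, 0.0])
# Orthogonal vectors have similarity 0.0 (distance 1.0).
Cosine.similarity([1.0, 0.0], [0.0, 1.0])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A &lt;code&gt;match_threshold&lt;/code&gt; closer to 1 therefore keeps only rows whose embeddings point in nearly the same direction as the query embedding.&lt;/p&gt;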



&lt;h2&gt;
  
  
  Build the tool
&lt;/h2&gt;

&lt;p&gt;With your data model and search function squared away, you can build your tool.&lt;/p&gt;

&lt;p&gt;When the user enters a query, you’ll first convert their search query into an embedding using OpenAI. Then, you’ll use the SQL query above to find the Salesforce objects that are the closest match.&lt;/p&gt;

&lt;p&gt;Below is a simple example of this tool. Here’s a demonstration of a search for Notes that mention a SaaS platform that a customer or prospect is hoping we add to Sequin:&lt;/p&gt;

&lt;p&gt;Note that the word “integrated” didn’t appear at all in our filter query, yet we still found a match for “interest in seeing ServiceNow integrated into Sequin…”&lt;/p&gt;

&lt;p&gt;This strategy works great for shorter text fields. But it will break down with longer call notes, Intercom conversations, and extended email threads. In those situations, it’s often not enough to find the matching record. You also want to know where in that record the match occurred.&lt;/p&gt;

&lt;p&gt;To advance our tool in order to address this, we sliced the text fields of our Salesforce objects into smaller, overlapping “windows.” This meant we could compare each of these smaller embeddings to our query embedding to identify regions of high similarity.&lt;/p&gt;

&lt;p&gt;To achieve this, you can split your objects across multiple embedding records. Your table could look something like this, with an added &lt;code&gt;idx&lt;/code&gt; column:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;salesforce_embedding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;text&lt;/span&gt; &lt;span class="k"&gt;references&lt;/span&gt; &lt;span class="n"&gt;salesforce&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="k"&gt;delete&lt;/span&gt; &lt;span class="k"&gt;cascade&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;embedding&lt;/span&gt; &lt;span class="n"&gt;vector&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1536&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;primary&lt;/span&gt; &lt;span class="k"&gt;key&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;idx&lt;/code&gt; (or index) is the window index. One Salesforce object could be split over an arbitrary number of embedding records, according to whatever window size seems to work best for your application.&lt;/p&gt;
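
&lt;p&gt;Here is one hedged way to produce those windows. The sketch splits on whitespace and counts window size and overlap in words. Both the module name and the default sizes are placeholders to tune for your data, and you may prefer to window by sentences or tokens instead:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;defmodule Windows do
  # size: words per window; step: how far each window advances.
  # A step smaller than size makes consecutive windows overlap.
  def split(text, size \\ 200, step \\ 100) do
    text
    |&amp;gt; String.split()
    |&amp;gt; Enum.chunk_every(size, step)
    |&amp;gt; Enum.with_index()
    |&amp;gt; Enum.map(fn {words, idx} -&amp;gt; %{idx: idx, text: Enum.join(words, " ")} end)
  end
end

# Each map corresponds to one embedding record for the same Salesforce object.
Windows.split("a b c d e f", 3, 2)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Each returned &lt;code&gt;text&lt;/code&gt; would be sent to the embedding API, and the resulting vector stored under the object’s &lt;code&gt;id&lt;/code&gt; with the matching &lt;code&gt;idx&lt;/code&gt;.&lt;/p&gt;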

&lt;p&gt;In the application, you’ll display the relevant windows that scored highest in similarity. That will let the user easily see the sentences or paragraphs that are a match. Clicking on the window can bring them to the whole Note, but at the specific location that was a high match.&lt;/p&gt;

&lt;h3&gt;
  
  
  Writing back to Salesforce
&lt;/h3&gt;

&lt;p&gt;As we were filtering and reading through Salesforce Tasks and Notes, we realized that, in addition to search, we wanted two pieces of functionality:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The ability to rate objects on a scale of 1-5, according to how deep or insightful the conversation was.&lt;/li&gt;
&lt;li&gt;The ability to tag notes based on product themes, recurring problems, etc.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With Sequin’s &lt;a href="https://docs.sequin.io/writes"&gt;write support&lt;/a&gt;, this update is trivial. You can add custom fields to your objects (like &lt;code&gt;Rating__c&lt;/code&gt; and &lt;code&gt;Tags__c&lt;/code&gt;). Then, you can make write requests back to Salesforce like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;update_tags&lt;/span&gt;&lt;span class="p"&gt;(%&lt;/span&gt;&lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Salesforce&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;update_changeset&lt;/span&gt;&lt;span class="p"&gt;(%{&lt;/span&gt; &lt;span class="ss"&gt;tags__c:&lt;/span&gt; &lt;span class="n"&gt;tags&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;
  &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;update!&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Changes are applied synchronously to Salesforce’s API, so if there’s a validation error it will be raised in your code.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Consolidating customer feedback and call notes into one location is only half the battle. The next piece is creating tools and workflows that let you use this information to guide your product and keep customers in the loop while doing so.&lt;/p&gt;

&lt;p&gt;Embeddings are a powerful tool for achieving this. You can use a machine to help you find similar notes and cluster ideas. With a little effort, you can build your own tool, which gives you far more power and flexibility than you’d find using Salesforce AI.&lt;/p&gt;

&lt;p&gt;Your team will need to centralize their notes to make this work great, however! In a future post, I’ll detail the strategies we use for making data capture easy (e.g. drop a call note into Slack). Subscribe to get notified when we write that post.&lt;/p&gt;




&lt;ol&gt;

&lt;li id="fn1"&gt;
&lt;p&gt;&lt;code&gt;pgvector&lt;/code&gt; is included in most of the latest distributions of Postgres.&lt;br&gt;
If you're on AWS RDS, be sure you upgrade to Postgres 15.2+ to get access to the &lt;code&gt;vector&lt;/code&gt; extension. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn2"&gt;
&lt;p&gt;You can mix and match fields from different tables to generate embeddings. To start, you can keep it simple and generate embeddings that correspond to a single Salesforce object. For most objects, you’ll probably choose to create an embedding for just one field. For example, you don't need to create an embedding for the whole Note object, just the &lt;code&gt;body&lt;/code&gt; field. ↩&lt;/p&gt;

&lt;p&gt;A few tables might warrant creating a blended embedding with more than one field. For example, Tasks have both a &lt;code&gt;subject&lt;/code&gt; and a &lt;code&gt;description&lt;/code&gt;. You can concatenate the two fields together into a newline-separated string, and generate the embedding on that.&lt;/p&gt;

&lt;p&gt;In the future, you can blend more fields or objects together to let you build on your data in novel ways.&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn3"&gt;
&lt;p&gt;Normally a pagination strategy like this wouldn't be safe unless IDs were auto-incrementing. But this will work fine in all situations, because we don't care if we miss records that are inserted mid-pagination -- those are being handled by our event handler above! ↩&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn4"&gt;
&lt;p&gt;If you need to, you can use a different topic name for this populator, e.g. &lt;code&gt;jobs.backfill_sf_embeddings.[collection]&lt;/code&gt;. You’ll just need to register a different consumer, as each consumer can only be subscribed to one topic. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;/ol&gt;

</description>
      <category>webdev</category>
      <category>database</category>
      <category>ai</category>
      <category>machinelearning</category>
    </item>
    <item>
      <title>All the ways to capture changes in Postgres</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Tue, 19 Sep 2023 22:57:55 +0000</pubDate>
      <link>https://forem.com/acco/all-the-ways-to-capture-changes-in-postgres-5ekf</link>
      <guid>https://forem.com/acco/all-the-ways-to-capture-changes-in-postgres-5ekf</guid>
      <description>&lt;p&gt;Working with data at rest is where Postgres shines. But what about when you need data in motion? What about when you need to trigger a workflow based on changes to a table? Or you need to stream the data in Postgres to another data store, system, or service in real-time?&lt;/p&gt;

&lt;p&gt;Fortunately, Postgres comes with a lot of options to make this happen. In this post, I’ll lay them all out. I’ll also give you an idea of which are easy to do, which are more robust, and how to make the right choice for you.&lt;/p&gt;

&lt;h2&gt;
  
  
  Listen/Notify
&lt;/h2&gt;

&lt;p&gt;Perhaps the simplest approach is to use Postgres' interprocess communication feature, Listen/Notify. Listen/Notify is an implementation of the publish-subscribe pattern.&lt;/p&gt;

&lt;p&gt;With Listen/Notify, a Postgres session (or connection) can "listen" to a particular channel for notifications. Activity in the database or other sessions can "notify" that channel. Whenever a notification is sent to a channel, all sessions listening to that channel receive the notification instantly.&lt;/p&gt;

&lt;p&gt;You can see Listen/Notify for yourself by opening two &lt;code&gt;psql&lt;/code&gt; sessions.&lt;/p&gt;

&lt;p&gt;In session 1, you can set up your listener:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;listen&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;LISTEN&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And in session 2, you can publish to that channel with a message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;notify&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'hey there!'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;NOTIFY&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;notify&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'is this thing on?'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;NOTIFY&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;While the listener process received the message right away, &lt;code&gt;psql&lt;/code&gt; won't print the message automatically. To get it to print out the messages it's received so far, you just need to run any query. For example, you can just send an empty query like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;listen&lt;/span&gt; &lt;span class="n"&gt;my_channel&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;LISTEN&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="n"&gt;Asynchronous&lt;/span&gt; &lt;span class="n"&gt;notification&lt;/span&gt; &lt;span class="nv"&gt;"my_channel"&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="nv"&gt;"hey there!"&lt;/span&gt; &lt;span class="n"&gt;received&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="n"&gt;process&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;PID&lt;/span&gt; &lt;span class="mi"&gt;80019&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;
&lt;span class="n"&gt;Asynchronous&lt;/span&gt; &lt;span class="n"&gt;notification&lt;/span&gt; &lt;span class="nv"&gt;"my_channel"&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="nv"&gt;"is this thing on?"&lt;/span&gt; &lt;span class="n"&gt;received&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="n"&gt;process&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;PID&lt;/span&gt; &lt;span class="mi"&gt;80019&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(Naturally, this isn't how the Postgres client library in your preferred programming language will work. Libraries will deliver messages to your subscriber immediately without requiring a query.)&lt;/p&gt;

&lt;p&gt;To use Listen/Notify to capture changes, you can set up a trigger. For example, here's an &lt;code&gt;after&lt;/code&gt; trigger that sends the table name, the &lt;code&gt;id&lt;/code&gt; of the record that changed, and the action as JSON via Notify:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;notify_trigger&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="err"&gt;$$&lt;/span&gt;
&lt;span class="k"&gt;declare&lt;/span&gt;
  &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;begin&lt;/span&gt;
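  &lt;span class="c1"&gt;-- NEW is not set for deletes, so read the id from OLD instead&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;TG_OP&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'DELETE'&lt;/span&gt; &lt;span class="k"&gt;then&lt;/span&gt;
    &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json_build_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TG_TABLE_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'id'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;OLD&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'action'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TG_OP&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="k"&gt;else&lt;/span&gt;
    &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json_build_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TG_TABLE_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'id'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;NEW&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'action'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TG_OP&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;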
  &lt;span class="n"&gt;perform&lt;/span&gt; &lt;span class="n"&gt;pg_notify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'table_changes'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nb"&gt;text&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="err"&gt;$$&lt;/span&gt; &lt;span class="k"&gt;language&lt;/span&gt; &lt;span class="n"&gt;plpgsql&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;trigger&lt;/span&gt; &lt;span class="n"&gt;my_trigger&lt;/span&gt;
&lt;span class="k"&gt;after&lt;/span&gt; &lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;delete&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;my_table&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;each&lt;/span&gt; &lt;span class="k"&gt;row&lt;/span&gt; &lt;span class="k"&gt;execute&lt;/span&gt; &lt;span class="k"&gt;function&lt;/span&gt; &lt;span class="n"&gt;notify_trigger&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Downsides
&lt;/h3&gt;

&lt;p&gt;Listen/Notify is simple and powerful, but has some notable downsides.&lt;/p&gt;

&lt;p&gt;First, as a pub-sub mechanism, it has "at most once" delivery semantics. Notifications are transient; a listener needs to be listening to a channel when notifications are published. When a listener subscribes to a channel, it will only receive notifications from that moment forward. This also means that if network issues cause a listening session to disconnect, even briefly, it will miss any notifications published while it was disconnected.&lt;/p&gt;

&lt;p&gt;Second, in the default configuration the payload must be shorter than 8000 bytes. If the message reaches this size, the &lt;code&gt;notify&lt;/code&gt; command will fail. &lt;sup id="fnref1"&gt;1&lt;/sup&gt;&lt;/p&gt;

&lt;p&gt;As such, Listen/Notify is solid for basic change detection needs, but you'll probably find it does not serve more sophisticated needs well. However, it can complement other strategies (like "poll the table") nicely.&lt;/p&gt;

&lt;h2&gt;
  
  
  Poll the table
&lt;/h2&gt;

&lt;p&gt;The simplest &lt;em&gt;robust&lt;/em&gt; way to capture changes is to poll the table directly. Here, you need each table to have an &lt;code&gt;updated_at&lt;/code&gt; column or similar that is set whenever the row changes. (You can use a trigger for this.) A combination of &lt;code&gt;updated_at&lt;/code&gt; &lt;a href="https://blog.sequin.io/whats-changed-in-your-api/"&gt;and &lt;code&gt;id&lt;/code&gt;&lt;/a&gt; serves as your cursor. In this setup, your application logic that polls the table handles storing and maintaining the cursor.&lt;/p&gt;

&lt;p&gt;In addition to polling the table, you can use a Notify subscription to inform your application that a record has been inserted or modified. Postgres' notifications are ephemeral, so this should only serve as an optimization on top of polling.&lt;/p&gt;
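
&lt;p&gt;As a rough sketch of this setup, assume a hypothetical &lt;code&gt;contacts&lt;/code&gt; table with an &lt;code&gt;id&lt;/code&gt; primary key. A trigger keeps &lt;code&gt;updated_at&lt;/code&gt; current, and the polling query pages through rows using the &lt;code&gt;(updated_at, id)&lt;/code&gt; pair as its cursor. The table, function, and trigger names are illustrative, and &lt;code&gt;$1&lt;/code&gt; and &lt;code&gt;$2&lt;/code&gt; stand for the cursor values your application saved from the last row of the previous page:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Hypothetical table and names, for illustration only
alter table contacts
  add column if not exists updated_at timestamptz not null default now();

create or replace function set_updated_at() returns trigger as $$
begin
  new.updated_at := now();
  return new;
end;
$$ language plpgsql;

create trigger contacts_set_updated_at
before update on contacts
for each row execute function set_updated_at();

-- Fetch the next page of changes after the saved cursor ($1, $2)
select *
from contacts
where (updated_at, id) &amp;gt; ($1, $2)
order by updated_at, id
limit 100;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;One caveat with this sketch: &lt;code&gt;now()&lt;/code&gt; returns the transaction's start time, so a long-running transaction can commit a row whose &lt;code&gt;updated_at&lt;/code&gt; is older than your cursor. You may want to re-read a small window behind the cursor to account for that.&lt;/p&gt;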

&lt;h3&gt;
  
  
  Downsides
&lt;/h3&gt;

&lt;p&gt;This approach has two downsides.&lt;/p&gt;

&lt;p&gt;The first is that you can't detect when a row is deleted. There's no way to "see" the missing row in the table.&lt;/p&gt;

&lt;p&gt;One remediation is to have a Postgres trigger fire on deletes, and store the &lt;code&gt;id&lt;/code&gt; (and whatever other columns you want) in a separate table: e.g. &lt;code&gt;deleted_contacts&lt;/code&gt;. Then, your application can poll that table to discover deletes instead.&lt;/p&gt;
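
&lt;p&gt;Here's a minimal sketch of that remediation. It assumes a hypothetical &lt;code&gt;contacts&lt;/code&gt; table with a &lt;code&gt;bigint&lt;/code&gt; primary key called &lt;code&gt;id&lt;/code&gt;; the function and trigger names are made up for the example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Hypothetical tombstone table; store whichever columns you need
create table deleted_contacts (
  id bigint primary key,
  deleted_at timestamptz not null default now()
);

create or replace function record_contact_delete() returns trigger as $$
begin
  insert into deleted_contacts (id) values (old.id);
  return null; -- the return value is ignored for row-level after triggers
end;
$$ language plpgsql;

create trigger contacts_record_delete
after delete on contacts
for each row execute function record_contact_delete();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Your application can then poll &lt;code&gt;deleted_contacts&lt;/code&gt; with a cursor on &lt;code&gt;deleted_at&lt;/code&gt; and &lt;code&gt;id&lt;/code&gt;, the same way it polls the main table.&lt;/p&gt;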

&lt;p&gt;The second downside is that you don't get diffs. You know this record was updated since you last polled the table, but you don't know &lt;em&gt;what&lt;/em&gt; was updated on the record.&lt;/p&gt;

&lt;p&gt;Maybe deletes aren't a big deal for your use case or you don't care about diffs. If so, polling the table is a reasonable and simple solution for tracking changes.&lt;/p&gt;

&lt;h2&gt;
  
  
  Replication (WAL)
&lt;/h2&gt;

&lt;p&gt;Postgres supports streaming replication to other Postgres databases. In streaming replication, Postgres sends the WAL stream over a network connection from the primary to a replica. The standby servers pull these WAL records and replay them to keep their database in sync with the primary database. &lt;/p&gt;

&lt;p&gt;Streaming replication was built for streaming changes to other Postgres servers. But you can use it to capture changes for your application too.&lt;/p&gt;

&lt;p&gt;You first create a replication slot, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt;
&lt;span class="n"&gt;pg_create_logical_replication_slot&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'&amp;lt;your_slot_name&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'&amp;lt;output_plugin&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;output_plugin&lt;/code&gt; is a parameter which specifies which plugin Postgres should use to decode WAL changes. Postgres comes with a few built-in plugins. &lt;code&gt;pgoutput&lt;/code&gt; is the one Postgres' own logical replication uses. It emits the binary format that subscribers expect. &lt;code&gt;test_decoding&lt;/code&gt; is a simple output plugin that provides human-readable output of the changes to the WAL.&lt;/p&gt;

&lt;p&gt;The most popular output plugin not built into Postgres is &lt;code&gt;wal2json&lt;/code&gt;. It does what it says on the tin. JSON will be a lot easier for you to consume from an application than Postgres' binary format.&lt;/p&gt;
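
&lt;p&gt;If you want to see what an output plugin produces before wiring up a client, Postgres also exposes slots through SQL functions, which is handy for experimenting. The sketch below assumes &lt;code&gt;wal_level&lt;/code&gt; is set to &lt;code&gt;logical&lt;/code&gt;, that the &lt;code&gt;test_decoding&lt;/code&gt; plugin is installed, and that a table named &lt;code&gt;my_table&lt;/code&gt; with a &lt;code&gt;name&lt;/code&gt; column exists. The slot name is a placeholder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Slot and table names are placeholders
select * from pg_create_logical_replication_slot('demo_slot', 'test_decoding');

insert into my_table (name) values ('example');

-- Peeking leaves the changes in the slot;
-- pg_logical_slot_get_changes would consume them instead
select lsn, xid, data
from pg_logical_slot_peek_changes('demo_slot', null, null);

-- Drop the slot when you're done, otherwise Postgres keeps retaining WAL for it
select pg_drop_replication_slot('demo_slot');
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;data&lt;/code&gt; column holds the plugin's output: one row each for the transaction's begin, the insert, and the commit.&lt;/p&gt;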

&lt;p&gt;After creating your replication slot, you can start it and consume from it. Working with replication slots uses a different part of the Postgres protocol than standard queries. But many client libraries have functions that help you work with replication slots.&lt;/p&gt;

&lt;p&gt;For example, this is how you consume WAL messages in the &lt;code&gt;psycopg2&lt;/code&gt; library:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
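&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;psycopg2&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;psycopg2.extras&lt;/span&gt;

&lt;span class="c1"&gt;# Placeholder DSN; replication needs a LogicalReplicationConnection
&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'dbname=your_db'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;connection_factory&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;psycopg2&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;extras&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LogicalReplicationConnection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;cursor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;acknowledge_to_server&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Process the message (msg) here
&lt;/span&gt;    &lt;span class="c1"&gt;# ...
&lt;/span&gt;    &lt;span class="c1"&gt;# Acknowledge the message
&lt;/span&gt;    &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;send_feedback&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flush_lsn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;wal_end&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;start_replication&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;slot_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'your_slot_name'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;decode&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consume_stream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;acknowledge_to_server&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;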
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that the client is responsible for ack'ing WAL messages that it has received. So the replication slot behaves like a message queue such as SQS: Postgres holds on to changes until the client acknowledges them.&lt;/p&gt;

&lt;p&gt;Instead of consuming from the WAL directly, you can use tools like Debezium to do this for you. Debezium will consume the WAL from Postgres and stream those changes to a variety of sinks, including Kafka or NATS.&lt;/p&gt;

&lt;h3&gt;
  
  
  Downsides
&lt;/h3&gt;

&lt;p&gt;Using Postgres' replication facilities to capture changes is a robust solution. The biggest downside is complexity. Replication slots and the replication protocol are less familiar to most developers than the "standard" parts (i.e. tables and queries).&lt;/p&gt;

&lt;p&gt;Along with this complexity is a decrease in clarity. If something with replication breaks or if there's a lag or things aren't working as expected, it can be a bit trickier to debug than the other solutions outlined here.&lt;/p&gt;

&lt;p&gt;Another aspect worth mentioning is that replication slots may require tweaking &lt;code&gt;postgresql.conf&lt;/code&gt;. For example, you may need to tweak parameters like &lt;code&gt;max_wal_senders&lt;/code&gt; and &lt;code&gt;max_replication_slots&lt;/code&gt;. So you'll need access to the server's configuration to implement this solution.&lt;/p&gt;
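
&lt;p&gt;As a sketch of what that tweaking can look like, you can inspect and change these settings from SQL if you have superuser access. &lt;code&gt;alter system&lt;/code&gt; writes to &lt;code&gt;postgresql.auto.conf&lt;/code&gt;, which is an alternative to editing &lt;code&gt;postgresql.conf&lt;/code&gt; by hand. The numbers below are example values, not recommendations, and managed Postgres services often expose these settings through their own configuration tools instead:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Check the current values
show wal_level;
show max_wal_senders;
show max_replication_slots;

-- Example values only; logical decoding requires wal_level = logical
alter system set wal_level = 'logical';
alter system set max_wal_senders = 10;
alter system set max_replication_slots = 10;

-- All three settings only take effect after a server restart
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;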

&lt;h2&gt;
  
  
  Capture changes in an audit table
&lt;/h2&gt;

&lt;p&gt;In this approach, you set up a separate table for logging changes, e.g. &lt;code&gt;changelog&lt;/code&gt;. That table contains columns related to the record's modification, such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;action&lt;/code&gt;: Was this an &lt;code&gt;insert&lt;/code&gt;, &lt;code&gt;update&lt;/code&gt;, or &lt;code&gt;delete&lt;/code&gt;?&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;old&lt;/code&gt;: A jsonb of the record before the mutation. Blank for inserts.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;values&lt;/code&gt;: A jsonb of the change fields. Blank for deletes.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;inserted_at&lt;/code&gt;: Time the change occurred.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To set this up, you need to create a trigger function that inserts into this table every time a change occurs. Then, you need to create triggers on all the tables you care about to invoke that trigger function.&lt;/p&gt;

&lt;p&gt;Here's an example of what that trigger function might look like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;create or replace function changelog_trigger() returns trigger as $$
declare
  action text;
  table_name text;
  transaction_id bigint;
  timestamp timestamp;
  old_data jsonb;
  new_data jsonb;
begin
  action := lower(TG_OP::text);
  table_name := TG_TABLE_NAME::text;
  transaction_id := txid_current();
  timestamp := current_timestamp;

  if TG_OP = 'DELETE' then
    old_data := to_jsonb(OLD.*);
  elseif TG_OP = 'INSERT' then
    new_data := to_jsonb(NEW.*);
  elseif TG_OP = 'UPDATE' then
    old_data := to_jsonb(OLD.*);
    new_data := to_jsonb(NEW.*);
  end if;

  insert into changelog (action, table_name, transaction_id, timestamp, old_data, new_data) 
  values (action, table_name, transaction_id, timestamp, old_data, new_data);

  return null;
end;
$$ language plpgsql;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
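
&lt;p&gt;To round this out, here's a sketch of a &lt;code&gt;changelog&lt;/code&gt; table that matches the columns the trigger function above writes to, along with a trigger that invokes it. The &lt;code&gt;id&lt;/code&gt; column and the &lt;code&gt;contacts&lt;/code&gt; table are assumptions made for the example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Column names follow the trigger function above; the id column is an assumption
create table changelog (
  id bigserial primary key,
  action text not null,
  table_name text not null,
  transaction_id bigint not null,
  timestamp timestamp not null,
  old_data jsonb,
  new_data jsonb
);

-- Repeat for each table you want to track (contacts is a placeholder)
create trigger contacts_changelog
after insert or update or delete on contacts
for each row execute function changelog_trigger();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;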



&lt;p&gt;After setting up a way to capture changes, you need to figure out how to consume them.&lt;/p&gt;

&lt;p&gt;There are a lot of different ways you can do this. One way is to treat the &lt;code&gt;changelog&lt;/code&gt; as a queue. Your application workers can pull changes from this table. You'll probably want to ensure that changes are processed ~exactly once. You can use the &lt;code&gt;for update skip locked&lt;/code&gt; feature in Postgres to do this. For example, your workers can open a transaction and grab a chunk of &lt;code&gt;changelog&lt;/code&gt; entries:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;begin&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; 
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;changelog&lt;/span&gt; 
&lt;span class="k"&gt;order&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; 
&lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt; 
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt; &lt;span class="n"&gt;skip&lt;/span&gt; &lt;span class="n"&gt;locked&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, other workers running that query will not receive this "locked" block of rows. After your worker processes the records, it can delete them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;delete from changelog 
where id in (list_of_processed_record_ids);

commit;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
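
&lt;p&gt;If you'd rather claim and remove a batch in one statement, a common variation is to fold the locking select into a CTE and delete from it. This is a sketch that assumes the &lt;code&gt;changelog&lt;/code&gt; table has an &lt;code&gt;id&lt;/code&gt; primary key. The deleted rows come back through &lt;code&gt;returning&lt;/code&gt;, in no guaranteed order:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;begin;

-- Lock up to 100 unclaimed rows, skipping any that another worker holds
with batch as (
  select id
  from changelog
  order by timestamp
  limit 100
  for update skip locked
)
delete from changelog
using batch
where changelog.id = batch.id
returning changelog.*;

-- Process the returned rows here, then commit.
-- If processing fails, roll back and the rows become available again.
commit;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;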



&lt;h3&gt;
  
  
  Downsides
&lt;/h3&gt;

&lt;p&gt;This approach is similar to using a replication slot, but more manual. The trigger function and table design I've outlined might work to start. But you'd likely need to make tweaks before deploying at scale in production. &lt;sup id="fnref2"&gt;2&lt;/sup&gt;&lt;/p&gt;

&lt;p&gt;The advantage over replication slots is that it's all "standard" Postgres. Instead of an opaque replication slot, you have an easy-to-query Postgres table. And you don't need access to &lt;code&gt;postgresql.conf&lt;/code&gt; to make this work.&lt;/p&gt;

&lt;h2&gt;
  
  
  Foreign data wrappers
&lt;/h2&gt;

&lt;p&gt;Foreign data wrappers (FDWs) are a Postgres feature that allows you to both read from and write to external data sources from your Postgres database.&lt;/p&gt;

&lt;p&gt;The most notable and widely supported extension built on FDWs is &lt;code&gt;postgres_fdw&lt;/code&gt;. With &lt;code&gt;postgres_fdw&lt;/code&gt;, you can connect two Postgres databases and create something like a &lt;a href="https://www.postgresql.org/docs/current/sql-createview.html"&gt;view&lt;/a&gt; in one Postgres database that references a table in another Postgres database. Under the hood, you're turning one Postgres database into a client and the other into a server. When you make queries against foreign tables, the client database sends the queries to the server database via Postgres' &lt;a href="https://www.postgresql.org/docs/current/protocol-flow.html"&gt;wire protocol&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Using FDWs to capture changes is an unusual strategy. I wouldn't recommend it outside very specific situations.&lt;/p&gt;

&lt;p&gt;One situation where FDWs could make sense is if you're capturing changes in one Postgres database in order to write them to another Postgres database. Perhaps you use one database for accounting and another for your application. You can skip the intermediary change capture steps and use &lt;code&gt;postgres_fdw&lt;/code&gt; to go from database to database.&lt;/p&gt;

&lt;p&gt;Here's an example trigger function that ensures the status for a given account (identified by &lt;code&gt;email&lt;/code&gt;) is in sync across two databases. This assumes the foreign &lt;code&gt;account&lt;/code&gt; table has already been declared in a schema named &lt;code&gt;foreign_app_database&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;create or replace function cancel_subscription()
  returns trigger as $$
declare
  account_status text;
begin
  if (new.status = 'cancelled' or new.status = 'suspended') then
    account_status := 'cancelled';

    update foreign_app_database.account
    set status = account_status
    where email = new.email;
  end if;

  return new;
end;
$$ language plpgsql;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
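
&lt;p&gt;For completeness, here's a sketch of the setup that the function above takes for granted: declaring the foreign table with &lt;code&gt;postgres_fdw&lt;/code&gt; and attaching the trigger. The host, database name, credentials, server name, and the &lt;code&gt;subscriptions&lt;/code&gt; table are all placeholders for the example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Host, database, credentials, and server name are placeholders
create extension if not exists postgres_fdw;

create server app_db
  foreign data wrapper postgres_fdw
  options (host 'app-db.internal', port '5432', dbname 'app');

create user mapping for current_user
  server app_db
  options (user 'sync_user', password 'change-me');

-- Expose the remote account table under the schema the trigger function expects
create schema if not exists foreign_app_database;
import foreign schema public limit to (account)
  from server app_db into foreign_app_database;

-- Run the function whenever a row's status changes (subscriptions is hypothetical)
create trigger subscription_status_sync
after update of status on subscriptions
for each row execute function cancel_subscription();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;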



&lt;p&gt;In addition to &lt;code&gt;postgres_fdw&lt;/code&gt;, you can create and load your own foreign data wrappers into your Postgres database.&lt;/p&gt;

&lt;p&gt;That means you could create a foreign data wrapper that posts changes to an internal API. Unlike the other change detection strategies in this list, because you'd write to the API inside your commit, your API would have the ability to reject the change and roll back the commit.&lt;/p&gt;

&lt;h3&gt;
  
  
  Downsides
&lt;/h3&gt;

&lt;p&gt;Foreign data wrappers are a fun and powerful Postgres feature. But they'll rarely be your best option for capturing changes. You're probably not trying to replicate changes from one Postgres database to another. And while writing your own foreign data wrapper from scratch &lt;a href="https://github.com/supabase/wrappers"&gt;has gotten easier&lt;/a&gt;, writing your own FDW is probably the biggest lift in this list for capturing changes.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;There are lots of options for capturing changes in Postgres. Depending on your use case, some options are clearly better than others. In sum:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Listen/Notify is great for non-critical event capture, prototyping, or optimizing polling.&lt;/li&gt;
&lt;li&gt;Polling for changes is a fine, straightforward solution for simple use cases.&lt;/li&gt;
&lt;li&gt;Replication is probably your best bet for a robust solution. If that’s too difficult or opaque, then perhaps the audit table is a good middle-ground.&lt;/li&gt;
&lt;li&gt;Finally, foreign data wrappers solve a need you’re unlikely to have.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We examined all of these options for our own change capture requirements, and unfortunately none of them met our complex (and niche) needs. So, we ended up needing to build a Postgres proxy 😅 You can &lt;a href="https://blog.sequin.io/we-had-no-choice-but-to-build-a-postgres-proxy/"&gt;read more about that here&lt;/a&gt;.&lt;/p&gt;




&lt;ol&gt;

&lt;li id="fn1"&gt;
&lt;p&gt;Note the payload size includes the channel name, which like all Postgres identifiers can be up to 63 bytes in size. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn2"&gt;
&lt;p&gt;One example issue that comes to mind: should there be a timeout for how long workers can have changes checked out? ↩&lt;/p&gt;
&lt;/li&gt;

&lt;/ol&gt;

</description>
      <category>postgres</category>
      <category>webdev</category>
    </item>
    <item>
      <title>We had no choice but to build a Postgres proxy</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Tue, 19 Sep 2023 22:40:56 +0000</pubDate>
      <link>https://forem.com/acco/we-had-no-choice-but-to-build-a-postgres-proxy-528d</link>
      <guid>https://forem.com/acco/we-had-no-choice-but-to-build-a-postgres-proxy-528d</guid>
      <description>&lt;p&gt;We knew building a database proxy would be hard. We wanted to find &lt;em&gt;any&lt;/em&gt; other way to achieve our mission. But alas, after looking at all the options, only one solution remained standing.&lt;/p&gt;

&lt;p&gt;Below, I'll share our journey to the inescapable conclusion.&lt;/p&gt;




&lt;h2&gt;
  
  
  Perhaps async writes will be good enough?
&lt;/h2&gt;

&lt;p&gt;When we started Sequin, we had a one-way sync from third-party APIs to Postgres databases. Our hypothesis was that when working with third-party APIs, just reading all your data from a database is way easier than from an API. You can use SQL or your favorite ORM, don't have to worry about rate limits or latency, and don't have to worry about availability.&lt;/p&gt;

&lt;p&gt;Reading through the database worked so well that we wanted to see if we could make writes work through the database too.&lt;/p&gt;

&lt;p&gt;So we added database writes. We'd monitor your database for changes and send those changes to the API. The process ran &lt;em&gt;async&lt;/em&gt;, completing just a couple of seconds after you committed your write.&lt;/p&gt;

&lt;p&gt;After seeing customers adopt and scale with async writes, it was confirmed: writing back to the API via SQL is amazing. But the async part was causing a lot of problems.&lt;/p&gt;

&lt;p&gt;For example, you write to Postgres. Let’s say you’re updating the email on a Salesforce contact. The write succeeds. But you have no way of knowing if and when that change will make it to the API. Inserting, updating, or deleting a record in Postgres is like creating a job or a "promise" that you hope will resolve to a successful API write in the future.&lt;/p&gt;

&lt;p&gt;The API is ultimately the source of truth. You need the API to approve your mutation. Writes are where &lt;a href="https://acko.net/blog/apis-are-about-policy/"&gt;APIs enforce their policy&lt;/a&gt;. You want to know about validation errors when they happen – like if the contact you’re updating has an invalid email – so you can handle them in code.&lt;/p&gt;

&lt;p&gt;When developing on an API, an async experience like this is tough. You craft your mutation in your Postgres client and commit it. Then, you have to go check somewhere else to monitor the progress of your request – be it the audit table or a dashboard somewhere.&lt;/p&gt;

&lt;p&gt;Furthermore, this approach means changes can originate in two places and that you have two nodes that can drift apart.&lt;/p&gt;

&lt;p&gt;We were removing the HTTP API but replacing it with a classically hard distributed systems problem. For example: if a change fails API validation, do we roll it back? What if there were other changes stacked on top of this one? Or what if drift occurred between the time when the developer committed the Postgres change and we made the subsequent API request?&lt;/p&gt;

&lt;h2&gt;
  
  
  Synchronous, but at what cost?
&lt;/h2&gt;

&lt;p&gt;We grew weary of async writes. It felt like we were close, but hadn't found the winning solution yet.&lt;/p&gt;

&lt;p&gt;Ideally, we wanted the API to continue to act as the validation layer for commits and to remain the source of truth, with Postgres as a simple follower database.&lt;/p&gt;

&lt;p&gt;We wanted synchronous writes. But inside of Postgres' transactional model, we didn't see a way to make this happen.&lt;/p&gt;

&lt;p&gt;So, we began exploring.&lt;/p&gt;

&lt;h3&gt;
  
  
  Requirements
&lt;/h3&gt;

&lt;p&gt;Our requirements were driven by two goals. We wanted writes to be synchronous, so errors would be easy to see and handle. And we wanted to be compatible with all Postgres clients, including ORMs:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Support for &lt;code&gt;insert&lt;/code&gt;, &lt;code&gt;update&lt;/code&gt;, and &lt;code&gt;delete&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Support for &lt;code&gt;returning&lt;/code&gt; clauses. &lt;code&gt;returning&lt;/code&gt; clauses are often necessary for inserts, where you need to do something with the row you just inserted. And indeed several ORMs rely on these clauses to operate. &lt;sup id="fnref1"&gt;1&lt;/sup&gt;
&lt;/li&gt;
&lt;li&gt;A commit must translate to a single API request. This was the simplest way to avoid weird inconsistent state. &lt;sup id="fnref2"&gt;2&lt;/sup&gt;
&lt;/li&gt;
&lt;li&gt;Errors must happen during the commit. If the operation fails, the user should receive a Postgres error.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Notably, we decided that batch operations were a "nice to have." Many transactional workflows operate on only a single row at a time. Batch operations would be most common in one-off workflows. If we had to give them up for synchronous writes, we would.&lt;/p&gt;

&lt;h2&gt;
  
  
  Option: Synchronous replication
&lt;/h2&gt;

&lt;p&gt;Postgres supports streaming replication to other Postgres databases. In streaming replication, Postgres sends the WAL stream over a network connection from the primary to a replica.&lt;/p&gt;

&lt;p&gt;When streaming replication is set to synchronous mode, the primary will wait for any or all replicas to confirm they committed the data.&lt;/p&gt;
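
&lt;p&gt;For reference, here's roughly what turning on synchronous mode looks like on the primary. This is a sketch with placeholder standby names (&lt;code&gt;replica_a&lt;/code&gt; and &lt;code&gt;replica_b&lt;/code&gt;), not the configuration we ran:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Standby names are placeholders; they must match each standby's application_name
-- Wait for any one of the listed standbys to confirm each commit
alter system set synchronous_standby_names = 'ANY 1 (replica_a, replica_b)';

-- To wait for both standbys instead:
-- alter system set synchronous_standby_names = 'FIRST 2 (replica_a, replica_b)';

-- How far a standby must get before a commit returns (e.g. on, remote_write, remote_apply)
alter system set synchronous_commit = 'on';

-- Both settings can be applied with a reload rather than a restart
select pg_reload_conf();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;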

&lt;p&gt;Instead of streaming the WAL to another Postgres database, we could stream the WAL synchronously to our application server. Instead of committing the changes to a database, it would attempt to commit them to the API. If it failed to do so, it could raise an error, which would trickle up to and break the transaction.&lt;/p&gt;

&lt;p&gt;However, this wasn't going to meet our requirements.&lt;/p&gt;

&lt;p&gt;Let's start by considering a success case: the customer inserts a record into the database, the new record streams to us through the WAL, we commit the new record to the API, and the API accepts the insert.&lt;/p&gt;

&lt;p&gt;We now need to update the database with the record returned by the API. Importantly, the API response body includes the record’s API ID. It also may contain other fields not sent in our request, like calculated fields or timestamps.&lt;/p&gt;

&lt;p&gt;In synchronous replication, we can only update the row with the result from the API &lt;em&gt;after&lt;/em&gt; the commit has happened. That’s because another process is responsible for writing the changes back:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--8Ew5H_fZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.50.57%402x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--8Ew5H_fZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.50.57%402x.png" alt="Synchronous replication architecture" width="800" height="304"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So while we hear about the commit as it’s happening – and can interrupt the commit if it fails – we still can't fit our whole operation neatly into the commit. We have to wait for the commit to finish – and therefore for the row to become available/unlocked – before we can update it with the API response.&lt;/p&gt;

&lt;p&gt;This means we can’t meet two requirements.&lt;/p&gt;

&lt;p&gt;The first is that there is no way for us to support a &lt;code&gt;returning&lt;/code&gt; clause. The row needs to be modified &lt;em&gt;before&lt;/em&gt; it's committed if you want to reflect the updated row in the fields returned to the client. You can only do that in a "before each row" trigger or in a rewrite rule.&lt;/p&gt;

&lt;p&gt;The second issue is related: &lt;em&gt;when&lt;/em&gt; the record will be updated with the API response is really indeterminate! If the client can't rely on a &lt;code&gt;returning&lt;/code&gt; clause, they may opt to do a read-after-write: after writing the record, immediately read it. But again, because the update is not happening coincident with the commit, there's no telling if that subsequent read will "beat" whatever process we have writing back the API's response to the row.&lt;/p&gt;

&lt;p&gt;In addition, in the failure case where the API rejects the changes, we weren't confident we could craft the right Postgres errors to percolate up to the client. (Unconfirmed, as we'd already eliminated this option.)&lt;/p&gt;

&lt;h2&gt;
  
  
  Option: Foreign data wrappers
&lt;/h2&gt;

&lt;p&gt;Foreign data wrappers were another serious contender.&lt;/p&gt;

&lt;p&gt;Foreign data wrappers (FDWs) are a Postgres feature that allow you to both read from and write to external data sources. The architecture that they model felt very similar to what we were building: the data source you're writing to doesn't live in your database, it lives over the wire (the API). This was encouraging.&lt;/p&gt;

&lt;p&gt;While you can build your own FDWs, most cloud providers do not let you load arbitrary extensions into their managed databases. This was the first rub: in order to support our numerous customers on AWS' or GCP's managed Postgres databases, we couldn't create our own foreign data wrapper extension.&lt;/p&gt;

&lt;p&gt;Instead we’d need to use Postgres’ built-in FDW, &lt;a href="https://www.postgresql.org/docs/current/postgres-fdw.html"&gt;&lt;code&gt;postgres_fdw&lt;/code&gt;&lt;/a&gt;. With &lt;code&gt;postgres_fdw&lt;/code&gt;, you can connect two Postgres databases and create something like a &lt;a href="https://www.postgresql.org/docs/current/sql-createview.html"&gt;view&lt;/a&gt; in one Postgres database that references a table in another Postgres database.&lt;/p&gt;

&lt;p&gt;These foreign tables behave exactly like local tables. You can &lt;code&gt;select&lt;/code&gt;, &lt;code&gt;insert&lt;/code&gt;, &lt;code&gt;update&lt;/code&gt;, &lt;code&gt;delete&lt;/code&gt;, and &lt;code&gt;join&lt;/code&gt; all the same.&lt;/p&gt;

&lt;p&gt;When you set up &lt;code&gt;postgres_fdw&lt;/code&gt;, under the hood you're turning one Postgres database into a client and the other into a server. When you make queries against foreign tables, the client database sends the queries to the server database via Postgres' &lt;a href="https://www.postgresql.org/docs/current/protocol-flow.html"&gt;wire protocol&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;To make &lt;code&gt;postgres_fdw&lt;/code&gt; work, we'd set up a server that complies with the Postgres wire protocol. That server would act as a fake Postgres database. By fitting into the model of the standard &lt;code&gt;postgres_fdw&lt;/code&gt;, we'd have the widest compatibility:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xGdtLF9F--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.51.30%402x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xGdtLF9F--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.51.30%402x.png" alt="Foreign data wrappers architecture" width="800" height="258"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Like synchronous replication, we had to find a way to use &lt;code&gt;postgres_fdw&lt;/code&gt; inside of Postgres' standard commit flow to deliver the experience we wanted. And like synchronous replication, we ran into limitations.&lt;/p&gt;

&lt;p&gt;The most notable limitation was with &lt;code&gt;postgres_fdw&lt;/code&gt; itself.&lt;/p&gt;

&lt;p&gt;With &lt;code&gt;update&lt;/code&gt; and &lt;code&gt;delete&lt;/code&gt; queries, the client Postgres sends the query as-is to the server. This makes sense – the client Postgres doesn't store any of the rows. So when you run an &lt;code&gt;update&lt;/code&gt; or &lt;code&gt;delete&lt;/code&gt;, it has to fully delegate the operation to the server. This is exactly what we wanted: the client Postgres database proxies requests to our server, giving us full control over how they are executed.&lt;/p&gt;

&lt;p&gt;But &lt;code&gt;insert&lt;/code&gt; queries were a different story. In specific situations, &lt;code&gt;postgres_fdw&lt;/code&gt; does not support batch inserts. The biggest drag is that it does not support batch inserts when a &lt;a href="https://github.com/postgres/postgres/blob/master/contrib/postgres_fdw/postgres_fdw.c#L2010-L2017"&gt;&lt;code&gt;returning&lt;/code&gt; clause is specified&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In these situations, the query doesn't fail (which for our purposes would be preferable). Instead, &lt;code&gt;postgres_fdw&lt;/code&gt; will rewrite the batch insert, turning it into multiple single row inserts:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- original query sent by the client…&lt;/span&gt;
&lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;into&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="k"&gt;values&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Product A'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Product B'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- is split into two queries before being sent to the foreign Postgres server&lt;/span&gt;
&lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;into&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="k"&gt;values&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Product A'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;into&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="k"&gt;values&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Product B'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is unfortunate because both the user and foreign server are blind to the fact that their batch insert is actually being translated into a bunch of serial inserts. Likewise, on the server, when we receive an insert we have no way of knowing that another is about to follow as part of a batch.&lt;/p&gt;

&lt;p&gt;With &lt;code&gt;postgres_fdw&lt;/code&gt;, all operations happen inside of a transaction. So, for batch inserts, you might think: can't we just "ack" each inserted row as it's received, store it in memory, and then write all the rows to the API at the end of the transaction? But then we'd violate our requirement to fully support &lt;code&gt;returning&lt;/code&gt; clauses – because our &lt;em&gt;only&lt;/em&gt; opportunity to affect the row returned to the client is when we receive each individual insert query. We can't return dummy data for each insert query, then at the end of the transaction say: "never mind all those rows I just sent you – here are the &lt;em&gt;real&lt;/em&gt; rows you should return."&lt;/p&gt;

&lt;p&gt;Ideally, we'd be able to detect when a customer was attempting to make a batch insert with a &lt;code&gt;returning&lt;/code&gt; clause and return a helpful error. But that’s not possible.&lt;/p&gt;

&lt;p&gt;So, foreign data wrappers in the general sense wouldn't work for us because we can't install our own FDWs on managed databases. And using &lt;code&gt;postgres_fdw&lt;/code&gt; felt clever, but put us downstream of an extension that we had little control over.&lt;/p&gt;

&lt;p&gt;We briefly surveyed other options, including far-out projects like &lt;a href="https://github.com/pramsey/pgsql-http"&gt;pgsql-http&lt;/a&gt;. But no matter what we looked at, it was clear: we couldn't do what we needed to do behind the database (synchronous replication). And we couldn't do what we needed to do inside the database (FDWs).&lt;/p&gt;

&lt;p&gt;We'd need to get in front of it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Landing on the Postgres proxy
&lt;/h2&gt;

&lt;p&gt;To get in front of the database, we'd need to build a proxy:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jKin4BlD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.53.19%402x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jKin4BlD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://blog.sequin.io/content/images/2023/09/CleanShot-2023-09-18-at-09.53.19%402x.png" alt="Postgres proxy architecture" width="800" height="384"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;It felt like the biggest lift, but also came with the biggest guarantee that we could get the experience we wanted:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We’d be able to support &lt;code&gt;insert&lt;/code&gt;, &lt;code&gt;update&lt;/code&gt;, and &lt;code&gt;delete&lt;/code&gt;, including batches.&lt;/li&gt;
&lt;li&gt;We’d be able to fully support &lt;code&gt;returning&lt;/code&gt; clauses, returning the response that we received from the API after performing the mutation request.&lt;/li&gt;
&lt;li&gt;We’d have full control over the Postgres errors that we returned to the client.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To pull this off, we’d need to add an additional requirement, which was that the proxy would need to work seamlessly with all Postgres clients. That meant adhering to Postgres' wire protocol.&lt;/p&gt;

&lt;p&gt;A standard Postgres proxy like Pgbouncer doesn't need to know much about Postgres' wire protocol beyond authentication. After a client establishes a connection to Pgbouncer, Pgbouncer opens connections to the Postgres database. These connections reside in a pool. When a client sends a statement or transaction, Pgbouncer checks out a connection from the pool and uses that connection for the duration of the statement or transaction.&lt;/p&gt;

&lt;p&gt;But once a client's connection is paired with a database connection, Pgbouncer doesn't need to know much about what's going on. The proxy simply passes TCP packets back and forth between the two. All the while, it's looking for one particular message from the server to the client (the &lt;code&gt;ReadyForQuery&lt;/code&gt; message). When it sees that message, it knows whatever the client and server were working on is completed, and it's able to release the database connection back into the pool.&lt;/p&gt;
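
&lt;p&gt;To make that concrete, here is a minimal Python sketch of how a pooler might scan the server-to-client byte stream for &lt;code&gt;ReadyForQuery&lt;/code&gt;. It relies only on the protocol's message framing (a one-byte type followed by a four-byte big-endian length that counts itself). The function name and the sample bytes are illustrative assumptions, not Pgbouncer's actual code:&lt;/p&gt;

```python
import struct

READY_FOR_QUERY = b"Z"  # backend message type byte for ReadyForQuery


def scan_backend_messages(buf):
    """Return (ready_statuses, bytes_consumed) for the complete messages in buf.

    Each backend message is: 1-byte type, 4-byte big-endian length
    (the length includes the length field itself), then the payload.
    """
    statuses = []
    offset = 0
    while len(buf) - offset >= 5:
        msg_type = buf[offset:offset + 1]
        (length,) = struct.unpack("!I", buf[offset + 1:offset + 5])
        end = offset + 1 + length
        if end > len(buf):
            break  # incomplete message; wait for more bytes
        if msg_type == READY_FOR_QUERY:
            # Payload is one status byte: I (idle), T (in transaction), E (failed transaction)
            statuses.append(buf[offset + 5:end].decode("ascii"))
        offset = end
    return statuses, offset


# Sample stream: CommandComplete ("C") followed by ReadyForQuery with status "I"
tag = b"INSERT 0 1\x00"
stream = b"C" + struct.pack("!I", 4 + len(tag)) + tag + b"Z" + struct.pack("!I", 5) + b"I"
print(scan_backend_messages(stream))
```

&lt;p&gt;Everything other than the type byte and length is skipped without being parsed, which is why a pooler can stay largely ignorant of what the client and server are saying to each other.&lt;/p&gt;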

&lt;p&gt;We thought that, to achieve our goal, our proxy might not need to know too much about what was going on either.&lt;/p&gt;

&lt;p&gt;Boy, were we wrong.&lt;/p&gt;

&lt;p&gt;As we expanded our proxy to cover the surface area of Postgres operations, our proxy had to become more and more fluent in the Postgres protocol.&lt;/p&gt;

&lt;p&gt;Eventually, our proxy became a fluent Postgres server. The specification of the Postgres protocol is concise, leaving room for interpretation. In that room, client quirks have blossomed, and our proxy had to adapt to handle all of them. &lt;sup id="fnref3"&gt;3&lt;/sup&gt;&lt;/p&gt;

&lt;p&gt;The proxy also had to become a fluent Postgres client. We have to inject our own queries into the client/server dance to capture changes.&lt;/p&gt;

&lt;p&gt;Just like a Postgres server or client, our proxy keeps an internal state machine for all connections to ensure we know precisely where we are in a statement or transaction flow. We know the state of the client connection and of the server connection and what we need to do to safely progress to the next state with each.&lt;/p&gt;

&lt;p&gt;(More on our proxy's design in a future post!)&lt;/p&gt;

&lt;h3&gt;
  
  
  The experience we always wanted
&lt;/h3&gt;

&lt;p&gt;While it was a journey to decide to build the proxy and another journey to build it, we ended up with a solution that gives us much more control. That meant building the experience we'd been looking for.&lt;/p&gt;

&lt;p&gt;When you make an &lt;code&gt;insert&lt;/code&gt;, &lt;code&gt;update&lt;/code&gt;, or &lt;code&gt;delete&lt;/code&gt; to a Sequin-synced table, we're able to check the batch size of your query. If you're operating on more records than we can modify in a single API request, we'll return an error. Otherwise, we'll commit the changes to the API. If the API request succeeds, we'll commit your changes to the database and complete your query – including fulfilling your &lt;code&gt;returning&lt;/code&gt; clause if you had one. If the API request fails, we'll roll back the changes in your database and return a helpful Postgres error to your client.&lt;/p&gt;
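
&lt;p&gt;That decision flow can be sketched in a few lines of Python. This is a simplified illustration, not our proxy's code: &lt;code&gt;handle_mutation&lt;/code&gt;, &lt;code&gt;ProxyError&lt;/code&gt;, &lt;code&gt;send_to_api&lt;/code&gt;, and the batch limit of 10 are all hypothetical names and values, and the database commit and rollback are reduced to comments:&lt;/p&gt;

```python
class ProxyError(Exception):
    """Stands in for a Postgres error returned to the client."""


def handle_mutation(rows, send_to_api, max_batch_size=10):
    """Size check, then API call, then commit or roll back.

    send_to_api is a hypothetical callable that takes the rows and returns
    the records as the API stored them, or raises if the API rejects them.
    """
    if len(rows) > max_batch_size:
        raise ProxyError(
            "batch of %d rows exceeds the API limit of %d" % (len(rows), max_batch_size)
        )
    try:
        api_rows = send_to_api(rows)
    except Exception as exc:
        # A real proxy would roll back the database transaction here.
        raise ProxyError("API rejected the change: %s" % exc) from exc
    # A real proxy would commit here; the API's rows feed the returning clause.
    return api_rows


def fake_api(rows):
    # Pretend the API assigns an ID to every record it accepts.
    return [dict(row, id="rec_%d" % i) for i, row in enumerate(rows, 1)]


print(handle_mutation([{"name": "Product A"}], fake_api))
```

&lt;p&gt;The point of the ordering is that the client only ever sees one outcome: either the API-populated rows or a single error.&lt;/p&gt;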

&lt;p&gt;Some 80%+ of the operations we all perform on APIs are just standard CRUD. SQL or your favorite ORM is a great interface for CRUD, and far easier and faster to use than an HTTP API. It's such a cool experience, and we love seeing customers' reactions every time they get to play with it.&lt;/p&gt;

&lt;p&gt;If you’re curious to give it a whirl for yourself, &lt;a href="https://app.sequin.io"&gt;sign up for a free trial&lt;/a&gt;. Otherwise, be sure to subscribe to our blog to catch future posts where we detail how our Postgres proxy works.&lt;/p&gt;




&lt;ol&gt;

&lt;li id="fn1"&gt;
&lt;p&gt;The row the database returns must reflect the row we upsert &lt;em&gt;after&lt;/em&gt; getting a response from the API. That means it will be fully populated, with a canonical ID and the like. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn2"&gt;
&lt;p&gt;For example, imagine if it took us 5 API requests to perform all the mutations in a single commit. What happens if the third API request fails validation? The commit was only partially committed to the API, but in Postgres it's all-or-nothing. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;li id="fn3"&gt;
&lt;p&gt;One example: some clients send an empty &lt;code&gt;SimpleQuery&lt;/code&gt; message as a keep-alive. An empty &lt;code&gt;SimpleQuery&lt;/code&gt; is strange. A &lt;code&gt;Sync&lt;/code&gt; is better suited for this purpose. ↩&lt;/p&gt;
&lt;/li&gt;

&lt;/ol&gt;

</description>
      <category>webdev</category>
      <category>postgres</category>
    </item>
    <item>
      <title>How we built our sync on HubSpot's API</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Tue, 28 Mar 2023 16:58:39 +0000</pubDate>
      <link>https://forem.com/acco/how-we-built-our-sync-on-hubspots-api-3gpi</link>
      <guid>https://forem.com/acco/how-we-built-our-sync-on-hubspots-api-3gpi</guid>
      <description>&lt;p&gt;HubSpot is a leading platform for marketing, sales, and customer service. For companies that use HubSpot heavily, many operational workflows originate in the platform. Critical data about their customers pass in and out of HubSpot and their core services.&lt;/p&gt;

&lt;p&gt;Like other CRMs, HubSpot allows for a lot of customization. With &lt;em&gt;custom objects&lt;/em&gt;, you can create tables in HubSpot that match your domain. With &lt;em&gt;associations&lt;/em&gt;, you can connect related objects together in meaningful ways, such as linking contacts to their respective companies or deals.&lt;/p&gt;

&lt;p&gt;This means organizations often "blur the line" between their database and their HubSpot data. Records often need to move seamlessly from HubSpot to their application – and back again.&lt;/p&gt;

&lt;p&gt;HubSpot has an API to facilitate reading data from and writing changes to their platform.&lt;/p&gt;

&lt;p&gt;One of the most common use cases of an API is &lt;a href="https://dev.to__GHOST_URL__/whats-changed-in-your-api/"&gt;figuring out what's changed&lt;/a&gt;. As a developer, you need to find out what happened in an upstream API so that you can make updates to your local data or trigger side effects.&lt;/p&gt;

&lt;p&gt;Figuring out what's changed in HubSpot is challenging because:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;There is no &lt;a href="https://dev.to__GHOST_URL__/events-not-webhooks/"&gt;&lt;code&gt;/events&lt;/code&gt; endpoint&lt;/a&gt; that lists changes.&lt;/li&gt;
&lt;li&gt;There are no &lt;a href="https://developers.hubspot.com/docs/api/webhooks#webhook-subscriptions"&gt;webhooks&lt;/a&gt; for custom objects.&lt;/li&gt;
&lt;li&gt;There is limited queryability around associations.&lt;/li&gt;
&lt;li&gt;Query support for deleted objects is incomplete.&lt;/li&gt;
&lt;li&gt;The Search API is eventually consistent.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;HubSpot is one of the sources we sync at Sequin. Detecting changes (often called change data capture) is the backbone of any sync process. So, we had to figure out how to overcome these challenges to detect changes in HubSpot to power our sync.&lt;/p&gt;

&lt;p&gt;In this post, I'll break our strategy down. Our sync process consists of three primary parts, which I'll step through in order:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Schema introspection&lt;/strong&gt;, where we determine the list of syncable objects from HubSpot and their schemas.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Backfilling&lt;/strong&gt;, where we paginate over the entire HubSpot instance to grab historical data.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Syncing&lt;/strong&gt;, where we poll HubSpot for changes.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Introspecting the schema
&lt;/h2&gt;

&lt;p&gt;HubSpot has some reflection APIs you can use to &lt;a href="https://developers.hubspot.com/docs/api/crm/crm-custom-objects"&gt;list the objects and custom objects in a HubSpot instance&lt;/a&gt;. You can also describe the fields of those objects.&lt;/p&gt;

&lt;p&gt;To list the schemas for all the custom objects in your instance, call &lt;code&gt;GET /crm/v3/schemas&lt;/code&gt;. The response contains all the info you'll need to determine how to parse JSON objects for use in your code or database. It contains the full list of &lt;code&gt;properties&lt;/code&gt; for each object and the type of each property (e.g. &lt;code&gt;text&lt;/code&gt; or &lt;code&gt;datetime&lt;/code&gt;):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "results": [
      {
          "labels": { "singular": "Car", "plural": "Cars" },
          "requiredProperties": ["year", "vin", "model", "make"],
          "searchableProperties": ["year", "vin", "model", "make"],
          "primaryDisplayProperty": "model",
          "secondaryDisplayProperties": ["make"],
          "archived": false,
          "id": "7171718",
          "fullyQualifiedName": "p82918_car",
          "properties": [
            {
              "updatedAt": "2022-05-26T00:13:43.786Z",
              "createdAt": "2022-05-26T00:05:17.903Z",
              "name": "color",
              "label": "Color",
              "type": "string",
              "fieldType": "text",
              "description": "",
              "groupName": "car_information",
              "options": [],
              "updatedUserId": "44561081",
              "displayOrder": -1,
              "calculated": false,
              "externalOptions": false,
              "archived": false,
              "hasUniqueValue": false,
              "hidden": false,
              "modificationMetadata": {
                "archivable": true,
                "readOnlyDefinition": false,
                "readOnlyValue": false
              },
              ...
      }
  ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
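
&lt;p&gt;As a rough illustration of how you might consume that response, here is a small Python sketch that reduces it to a map of property names to types. The helper name &lt;code&gt;property_types&lt;/code&gt; is made up for this example, and the sample is a trimmed-down version of the response above:&lt;/p&gt;

```python
def property_types(schema_response):
    """Map each object's fullyQualifiedName to {property name: property type}."""
    result = {}
    for schema in schema_response.get("results", []):
        result[schema["fullyQualifiedName"]] = {
            prop["name"]: prop["type"] for prop in schema.get("properties", [])
        }
    return result


# Trimmed-down sample shaped like the schemas response
sample = {
    "results": [
        {
            "fullyQualifiedName": "p82918_car",
            "properties": [
                {"name": "color", "type": "string", "fieldType": "text"},
            ],
        }
    ]
}
print(property_types(sample))
```

&lt;p&gt;A map like this is a reasonable starting point for deciding how to parse each field or which column type to create for it.&lt;/p&gt;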

&lt;p&gt;Curiously, only custom objects are listed here, not core HubSpot objects. To get the schema for core objects like &lt;code&gt;Company&lt;/code&gt; or &lt;code&gt;Contact&lt;/code&gt;, you'll need to call the dedicated schema endpoints for those objects. Here's the list of those objects and endpoints:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;Company&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/companies&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Contact&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/contacts&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Deal&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/deals&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Line item&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/line_items&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Product&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/products&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Quote&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/quotes&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Ticket&lt;/code&gt; @ &lt;code&gt;/crm/v3/schemas/tickets&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Finally, some objects don't seem to be present anywhere. For example, there is no schema endpoint for a HubSpot Pipeline. For these objects, your best bet is to determine their schema definition manually.&lt;/p&gt;

&lt;h2&gt;
  
  
  Backfilling
&lt;/h2&gt;

&lt;p&gt;A very common workflow is &lt;strong&gt;backfilling&lt;/strong&gt; or paginating over an entire dataset. Backfilling is integral to setting up a sync on top of an API – you'll need to pull in the data that was created prior to your sync going live.&lt;/p&gt;

&lt;p&gt;We decided to use HubSpot's Search API for backfilling. This felt like the best way to paginate through the history in a stable way that would ensure we didn't miss anything.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Search API
&lt;/h3&gt;

&lt;p&gt;To use the Search API for stable pagination, you need three components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A query filter&lt;/li&gt;
&lt;li&gt;The list of properties you want returned&lt;/li&gt;
&lt;li&gt;A sort parameter&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;You'll send them along in the JSON body of your Search API request like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;POST /crm/v3/objects/{object_name}/search

{
  filterGroups: [ /* ... */ ],
  properties: [ /* ... */ ],
  sorts: [ /* ... */ ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;filterGroups&lt;/code&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;HubSpot's query filters match a specific format. At Sequin, we use a query filter that looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"filters": [
    {
      "propertyName": "createdate",
      "operator": "GTE",
      "value": {cursor}
    }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This essentially constructs a query that looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;createdate &amp;gt;= {cursor}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;propertyName&lt;/code&gt; is the field that you want to filter on. Here, we specify the &lt;code&gt;createdate&lt;/code&gt; field. Unfortunately, not every object has a &lt;code&gt;createdate&lt;/code&gt; field – for many non-custom objects (but not all), this field is &lt;code&gt;hs_createdate&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;operator&lt;/code&gt; specifies &lt;code&gt;GTE&lt;/code&gt; or "greater than or equal." And &lt;code&gt;value&lt;/code&gt; is the timestamp we're comparing to. We start with &lt;code&gt;0&lt;/code&gt;, then use the latest &lt;code&gt;createdate&lt;/code&gt; in the response to compute our next &lt;code&gt;cursor&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;properties&lt;/code&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;By default, the Search API returns only a few properties on the objects it returns. You'll need to specify which properties you want back in your response. There doesn't appear to be a limit on the size of this parameter, so you can just set &lt;code&gt;properties&lt;/code&gt; to a list of all properties on the object.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;sorts&lt;/code&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;For backfills, we use a &lt;code&gt;sorts&lt;/code&gt; parameter like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "propertyName": "createdate",
  "direction": "ASCENDING"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Again, &lt;code&gt;propertyName&lt;/code&gt; will vary based on the object – many objects instead call this property &lt;code&gt;hs_createdate&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Together, these three parameters in our Search API request ensure that we're able to paginate the full table of each HubSpot object without missing any records.&lt;/p&gt;
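
&lt;p&gt;Put together, the request body might be assembled like this in Python. Treat it as a sketch: the helper name is hypothetical, and the nesting of &lt;code&gt;filters&lt;/code&gt; inside each &lt;code&gt;filterGroups&lt;/code&gt; entry and the &lt;code&gt;limit&lt;/code&gt; field reflect our reading of HubSpot's Search API format, so check them against the current docs:&lt;/p&gt;

```python
import json


def build_search_body(cursor, properties, date_property="createdate", limit=100):
    """Assemble the JSON body for POST /crm/v3/objects/{object_name}/search."""
    return {
        "filterGroups": [
            {
                "filters": [
                    {"propertyName": date_property, "operator": "GTE", "value": cursor}
                ]
            }
        ],
        "properties": list(properties),
        "sorts": [{"propertyName": date_property, "direction": "ASCENDING"}],
        "limit": limit,  # assumed page size; adjust to what the API allows
    }


# Some objects use hs_createdate instead of createdate
body = build_search_body(0, ["color", "make"], date_property="hs_createdate")
print(json.dumps(body, indent=2))
```

&lt;p&gt;Passing the date property in as a parameter keeps the filter and the sort on the same field, which is what makes the cursor meaningful.&lt;/p&gt;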

&lt;blockquote&gt;
&lt;p&gt;This strategy alone suffers from one fatal flaw: &lt;strong&gt;you can get stuck&lt;/strong&gt;. If you get back a page of HubSpot records that all have the same &lt;code&gt;createdate&lt;/code&gt;, you can't move forward with a &lt;code&gt;GTE&lt;/code&gt; operator – you have nothing to increment your cursor with! So, we do some extra filtering and sorting that incorporates the ID of records as well to "get through" the deadlock.&lt;/p&gt;

&lt;p&gt;This is usually only a problem on the busiest HubSpot instances. If absolute consistency isn't an issue for you, you can also consider just using a &lt;code&gt;GT&lt;/code&gt; operator to be safe. While you might miss objects, that operator will never get you stuck in the stream.&lt;/p&gt;
&lt;/blockquote&gt;
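
&lt;p&gt;The general idea of folding the ID into the cursor can be shown with an in-memory Python sketch. It is not the exact filter we send to HubSpot; &lt;code&gt;fetch_page&lt;/code&gt; is a stand-in for the search call, and the cursor is a pair of &lt;code&gt;createdate&lt;/code&gt; and ID. A record qualifies if its &lt;code&gt;createdate&lt;/code&gt; is later than the cursor's, or equal to it with a larger ID:&lt;/p&gt;

```python
def fetch_page(records, cursor, page_size):
    """In-memory stand-in for a search call sorted by (createdate, id).

    cursor is a (createdate, id) pair; only records strictly after it are returned.
    """
    ordered = sorted(records, key=lambda r: (r["createdate"], r["id"]))
    after = [r for r in ordered if (r["createdate"], r["id"]) > cursor]
    return after[:page_size]


def backfill(records, page_size=2):
    """Page through every record, advancing a composite cursor each time."""
    cursor = (0, 0)
    seen = []
    while True:
        page = fetch_page(records, cursor, page_size)
        if not page:
            return seen
        seen.extend(r["id"] for r in page)
        last = page[-1]
        cursor = (last["createdate"], last["id"])


# Five records that all share one createdate: a plain GTE cursor would never advance
records = [{"id": i, "createdate": 1000} for i in range(1, 6)]
print(backfill(records))
```

&lt;p&gt;Because the ID breaks ties, the cursor always moves forward, even when an entire page shares a single timestamp.&lt;/p&gt;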

&lt;h3&gt;
  
  
  Data type issues in the Search API
&lt;/h3&gt;

&lt;p&gt;We've seen some strange values come back from HubSpot's Search API. Once in a while, we'll get back an integer in a &lt;code&gt;date&lt;/code&gt; field or a string in a &lt;code&gt;numeric&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;We're still not sure where these come from. We try to recover the original type if we can, but usually our system ends up needing to just throw the values out. They're rare enough that we haven't been able to get to the bottom of what causes them to appear.&lt;/p&gt;

&lt;h2&gt;
  
  
  Syncing changes
&lt;/h2&gt;

&lt;p&gt;Backfilling is a one-off process that we run when a sync is first established or when a re-sync is kicked off (for example, when a customer of ours adds a new column to their synced table).&lt;/p&gt;

&lt;p&gt;Syncing changes is far more critical – and tricky. While our sync &lt;em&gt;could&lt;/em&gt; just use the backfill process over and over again to keep HubSpot and Postgres in sync, this would be wildly inefficient. Large HubSpot instances can take many hours to backfill.&lt;/p&gt;

&lt;p&gt;Instead, we need to query HubSpot in such a way that we can efficiently see what's changed from one request to the next.&lt;/p&gt;

&lt;p&gt;As noted in the beginning of this post, this is uniquely challenging with HubSpot. Change detection is not standardized across the API. For example, webhooks are only supported on &lt;a href="https://developers.hubspot.com/docs/api/webhooks#webhook-subscriptions"&gt;six objects&lt;/a&gt;. And some changes are not possible to detect at all.&lt;/p&gt;

&lt;p&gt;We use the Search API to detect creates and updates. We use the standard list objects endpoint for deletes, where supported. And we use a combination of endpoints to create a synthetic change stream for associations.&lt;/p&gt;

&lt;h3&gt;
  
  
  Querying the Search API for changes
&lt;/h3&gt;

&lt;p&gt;The Search API is the primary API for finding out what's changed in HubSpot.&lt;/p&gt;

&lt;p&gt;As mentioned in "Backfilling," we use three parameters in our search requests: &lt;code&gt;properties&lt;/code&gt;, &lt;code&gt;filterGroups&lt;/code&gt;, and &lt;code&gt;sorts&lt;/code&gt;. Our pagination strategy for syncing changes is similar to backfilling: use a timestamp to page through the changes. Except we're filtering and sorting on a different property:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"filters": [
    {
      // See note below on this property
      "propertyName": "lastmodifieddate",
      "operator": "GTE",
      "value": {cursor}
    }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Again, the &lt;code&gt;propertyName&lt;/code&gt; to use here is not standard across objects – for some objects, it's &lt;code&gt;hs_lastmodifieddate&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;With this method, our system only needs to process records that were recently created or updated. We use &lt;code&gt;GTE&lt;/code&gt; so we don't accidentally skip records in situations where multiple records were created at the same time. (This means we'll always "see" at least one record twice – once in request &lt;code&gt;n&lt;/code&gt; and again in request &lt;code&gt;n+1&lt;/code&gt; – but the inefficiency is worth the consistency guarantee.)&lt;/p&gt;
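
&lt;p&gt;As a rough sketch of the cursoring described above, the snippet below builds a search request body and picks the next cursor. It makes no HTTP calls. The helper names, the &lt;code&gt;limit&lt;/code&gt; of 100, the epoch-millisecond cursor, and the flattened record shape are all assumptions for illustration, not our production code.&lt;/p&gt;

```python
import json


def build_search_body(property_name, cursor_ms, properties, limit=100):
    """Build a search request body that pages forward from cursor_ms."""
    return {
        "filterGroups": [
            {
                "filters": [
                    {
                        # GTE rather than GT, so records that share the
                        # cursor timestamp are not skipped
                        "propertyName": property_name,
                        "operator": "GTE",
                        "value": cursor_ms,
                    }
                ]
            }
        ],
        # Oldest change first, so the end of each page carries the next cursor
        "sorts": [{"propertyName": property_name, "direction": "ASCENDING"}],
        "properties": properties,
        "limit": limit,
    }


def next_cursor(records, property_name, current_cursor):
    """Return the largest timestamp seen, or the old cursor for an empty page."""
    return max([r[property_name] for r in records] + [current_cursor])


body = build_search_body(
    "lastmodifieddate", 1694296869000, ["email", "lastmodifieddate"]
)
print(json.dumps(body, indent=2))

# Hypothetical, pre-flattened records with millisecond timestamps
records = [
    {"id": "1", "lastmodifieddate": 1694296870000},
    {"id": "2", "lastmodifieddate": 1694296875000},
]
cursor = next_cursor(records, "lastmodifieddate", 1694296869000)
```

&lt;p&gt;Because the filter uses &lt;code&gt;GTE&lt;/code&gt;, the record that supplied the cursor comes back on the next page, so whatever consumes these pages should treat writes as upserts.&lt;/p&gt;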

&lt;p&gt;One caveat with this method is that it's subject to getting stuck in certain situations with highly active HubSpot accounts, similar to our backfill method. (See "Backfilling.")&lt;/p&gt;

&lt;p&gt;With this strategy, you can detect &lt;strong&gt;creates&lt;/strong&gt; and &lt;strong&gt;updates&lt;/strong&gt; for &lt;strong&gt;objects&lt;/strong&gt; and &lt;strong&gt;custom objects&lt;/strong&gt;. You can't detect deletes. And associations are not supported in the Search API at all.&lt;/p&gt;

&lt;h3&gt;
  
  
  Detecting deletes
&lt;/h3&gt;

&lt;p&gt;A common limitation of APIs is that they let you see changes to records that still exist but offer no change detection for records that have been deleted: those records are no longer present anywhere for you to "see."&lt;/p&gt;

&lt;p&gt;HubSpot has this limitation as well, but not for all objects. Fortunately, for standard objects, they provide a way to retrieve deletes.&lt;/p&gt;

&lt;p&gt;To detect deletes (or "archived objects"), you need to use the standard list objects endpoint:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET /crm/v3/objects/{object}?after={cursor}&amp;amp;archived=true
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We can paginate this endpoint with the &lt;code&gt;after&lt;/code&gt; cursor. We get this in the &lt;code&gt;paging.next.after&lt;/code&gt; property from the previous response. &lt;code&gt;archived=true&lt;/code&gt; is how we tell HubSpot to give us deleted objects.&lt;/p&gt;
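
&lt;p&gt;A minimal sketch of that pagination loop follows. The HTTP call is replaced by a hypothetical &lt;code&gt;fetch_page&lt;/code&gt; callable so the loop can run on its own. The &lt;code&gt;results&lt;/code&gt; key and the canned pages are assumptions for illustration.&lt;/p&gt;

```python
def iter_archived(fetch_page, object_type):
    """Yield every archived record, following paging.next.after until absent."""
    after = None
    while True:
        page = fetch_page(object_type, after)
        yield from page.get("results", [])
        after = page.get("paging", {}).get("next", {}).get("after")
        if after is None:
            break


# Stand-in for GET /crm/v3/objects/{object}?after={cursor}&archived=true,
# returning canned pages keyed by cursor
def fake_fetch_page(object_type, after):
    pages = {
        None: {
            "results": [{"id": "1"}, {"id": "2"}],
            "paging": {"next": {"after": "2"}},
        },
        "2": {"results": [{"id": "3"}]},
    }
    return pages[after]


archived_ids = [r["id"] for r in iter_archived(fake_fetch_page, "contacts")]
print(archived_ids)
```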

&lt;p&gt;You might ask: If we can use this endpoint for grabbing archived objects, why can't we use this to do regular change detection as well? Can't we just paginate &lt;em&gt;this&lt;/em&gt; endpoint to see which records have been created or updated, as opposed to the Search API?&lt;/p&gt;

&lt;p&gt;Our concern is stability. We can't confirm the sort or stability of this endpoint. When an endpoint is dynamic (things are being updated and created and deleted between requests), a limit/offset pagination strategy can make it easy for you to accidentally skip records.&lt;/p&gt;

&lt;p&gt;We can't use the Search API for archived objects, though. And archived objects are stable, so this strategy is safe.&lt;/p&gt;

&lt;p&gt;A huge bummer, however, is that &lt;a href="https://community.hubspot.com/t5/APIs-Integrations/Paging-through-deleted-objects-is-not-yet-supported-for-custom/td-p/627505"&gt;custom objects do not support the &lt;code&gt;archived&lt;/code&gt; flag&lt;/a&gt;. And as we'll see, archived associations don't have a way to be retrieved either – but nothing is standard about associations anyway.&lt;/p&gt;

&lt;h2&gt;
  
  
  Associations
&lt;/h2&gt;

&lt;p&gt;Associations are a key feature of HubSpot. On top of associating any object to another, you can add labels to your associations to create rich networks of objects.&lt;/p&gt;

&lt;p&gt;However, associations are the most challenging part of HubSpot's API to sync. They are not supported to the same degree as objects in the API. You can't use the Search API to find out which associations have been updated. And associations don't have &lt;code&gt;created_at&lt;/code&gt; or &lt;code&gt;updated_at&lt;/code&gt; timestamps on them.&lt;/p&gt;

&lt;p&gt;We're hopeful HubSpot will improve their support for accessing associations. In the meantime, in order to sync them, you're forced to do a full sweep: you have to paginate through every association on a regular interval, inserting new associations and updating existing ones as you go.&lt;/p&gt;

&lt;h3&gt;
  
  
  Using the list objects API to fetch associations
&lt;/h3&gt;

&lt;p&gt;There are two endpoints you can use to access associations.&lt;/p&gt;

&lt;p&gt;The first option is to make calls to the standard list objects API. You can include the &lt;code&gt;associations&lt;/code&gt; parameter, which will have HubSpot return the associations you list for each object returned:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET /crm/v3/objects/companies?associations=CONTACT
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The second option is to use the batch associations API. This endpoint will list up to 1,000 associations per object per call:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET /crm/v4/objects/{objectType}/{objectId}/associations/{toObjectType}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We ended up using both endpoints in combination for our sync.&lt;/p&gt;

&lt;p&gt;We use the standard list objects API to fetch a list of 100 objects and the associations for each of those objects. There's a catch, though: HubSpot only returns 100 associations per object. That means if you have an object with a lot of associations, you have to further paginate that object. If that object has thousands of associations, that could mean tens of API requests just to retrieve all the associations for that one object!&lt;/p&gt;

&lt;p&gt;So, we then follow up that request with zero or more batch association requests. For each object in our first request that had more than 100 associations, we make a batch association request to try to grab the rest. That will let us see 1,000 associations for that object at a time vs. just 100.&lt;/p&gt;

&lt;p&gt;As long as the count of associations per object follows a power law (only ~10% of objects have thousands of associations), this works reasonably well.&lt;/p&gt;

&lt;h3&gt;
  
  
  Mitigating inefficiency
&lt;/h3&gt;

&lt;p&gt;The full sweep strategy is very inefficient. Sync time increases linearly with the count of associations. And any given request contains mostly associations we've seen before, meaning we're spending a lot of network and CPU retrieving and parsing JSON that contains no new information.&lt;/p&gt;

&lt;p&gt;We want to contain this inefficiency as much as possible. The impact would be even worse if we blindly sent associations through the rest of our pipeline. If the database also had to process every association record we saw, we'd be paying the cost in multiple areas.&lt;/p&gt;

&lt;p&gt;So to mitigate and isolate the inefficiency, we use an in-memory cache. The cache is essentially an md5 fingerprint that tells us if we've seen an association before or not.&lt;/p&gt;
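
&lt;p&gt;Here is one way such a fingerprint cache could look. This is a sketch, not our implementation: the key layout (from ID, to ID, sorted labels) and the class name are assumptions, and a real cache would also need bounds on memory.&lt;/p&gt;

```python
import hashlib


def fingerprint(from_id, to_id, labels):
    """Return an MD5 hex digest identifying one association and its labels."""
    key = "|".join([str(from_id), str(to_id), ",".join(sorted(labels))])
    return hashlib.md5(key.encode("utf-8")).hexdigest()


class AssociationCache:
    """In-memory set of fingerprints for associations already processed."""

    def __init__(self):
        self._seen = set()

    def is_new(self, from_id, to_id, labels=()):
        """Record the association; return True only the first time it is seen."""
        digest = fingerprint(from_id, to_id, labels)
        if digest in self._seen:
            return False
        self._seen.add(digest)
        return True


cache = AssociationCache()
first = cache.is_new("company-1", "contact-9", ["Primary"])
second = cache.is_new("company-1", "contact-9", ["Primary"])
relabeled = cache.is_new("company-1", "contact-9", ["Billing"])
print(first, second, relabeled)
```

&lt;p&gt;Only associations for which &lt;code&gt;is_new&lt;/code&gt; returns true need to continue down the pipeline. Including the labels in the key means a relabeled association counts as a change.&lt;/p&gt;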

&lt;h3&gt;
  
  
  Deletes
&lt;/h3&gt;

&lt;p&gt;Our in-memory cache is helpful for another purpose: association deletes.&lt;/p&gt;

&lt;p&gt;HubSpot provides no way to detect if an association has been deleted. So, we use a "mark-and-sweep" strategy in our sync.&lt;/p&gt;

&lt;p&gt;As we're paging through associations, we keep a record of all the associations that we've seen in this sync cycle. When we reach the end, we know which associations we should keep – and which we should flush. We can then issue a &lt;code&gt;delete&lt;/code&gt; command to the database to drop the associations we saw in the last sync cycle but not in this one.&lt;/p&gt;
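
&lt;p&gt;The mark-and-sweep idea reduces to a set difference. The sketch below uses hypothetical &lt;code&gt;(from_id, to_id)&lt;/code&gt; tuples as association keys. In practice the sweep should only run after a cycle has paged through everything, or a partial cycle would delete live associations.&lt;/p&gt;

```python
def sweep(previous_cycle, current_cycle):
    """Return association keys to delete: seen last cycle, absent this cycle."""
    return previous_cycle - current_cycle


previous_cycle = {("company-1", "contact-1"), ("company-1", "contact-2")}
current_cycle = set()

# Mark: record every association encountered while paging through this cycle
for pair in [("company-1", "contact-1"), ("company-2", "contact-7")]:
    current_cycle.add(pair)

# Sweep: anything seen before but not now was deleted upstream
to_delete = sweep(previous_cycle, current_cycle)
print(to_delete)
```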

&lt;h2&gt;
  
  
  Eventual consistency
&lt;/h2&gt;

&lt;p&gt;While the Search API is powerful and does most of what we want for syncing objects and custom objects, it has one major flaw: it's eventually consistent.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://forem.com/acco/finding-and-fixing-eventual-consistency-with-stripe-events-a6i"&gt;As we've written about&lt;/a&gt;, eventual consistency can be the bane of any integration. When an API is eventually consistent, you can miss creates, updates, and deletes without knowing it. Eventual consistency issues are hard to debug and hard to mitigate.&lt;/p&gt;

&lt;p&gt;In HubSpot's Search API, objects can arrive out-of-order. To illustrate, I'll use a human-readable form of HubSpot's &lt;code&gt;filterGroups&lt;/code&gt; syntax. I'll also use human-readable timestamps without a date.&lt;/p&gt;

&lt;p&gt;Let's say you make a request to the search endpoint at exactly &lt;code&gt;12:00:10&lt;/code&gt; with a cursor from just 10 seconds before, &lt;code&gt;12:00:00&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"lastmodifieddate" ≥ 12:00:00
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This should return all records in the system that have been updated since &lt;code&gt;12:00:00&lt;/code&gt;. What we've observed is that HubSpot can return a response like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[
  {
    "id": "some-id-1",
    "lastmodifieddate": "12:00:07",
    // ...
  },
  {
    "id": "some-id-2",
    "lastmodifieddate": "12:00:08",
    // ...
  }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Seeing this response, we assume that just two records have been updated since &lt;code&gt;12:00:00&lt;/code&gt;. For our next cursor, we can use &lt;code&gt;12:00:08&lt;/code&gt;. We grab that cursor, and continue on our way.&lt;/p&gt;

&lt;p&gt;The problem: there was actually &lt;em&gt;another record&lt;/em&gt; that was created/updated between &lt;code&gt;12:00:00&lt;/code&gt; and &lt;code&gt;12:00:08&lt;/code&gt;. It's just not being returned by the Search API yet.&lt;/p&gt;

&lt;p&gt;We can confirm this a minute later by repeating our original request, again asking for records created/updated after &lt;code&gt;12:00:00&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// one minute later, run this again
"lastmodifieddate" ≥ 12:00:00
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;And then we'll see a record appear this time, with a &lt;code&gt;lastmodifieddate&lt;/code&gt; prior to &lt;code&gt;12:00:08&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[
  // New record appears!
  {
    "id": "some-id-0",
    "lastmodifieddate": "12:00:06",
    // ...
  },
  {
    "id": "some-id-1",
    "lastmodifieddate": "12:00:07",
    // ...
  },
  {
    "id": "some-id-2",
    "lastmodifieddate": "12:00:08",
    // ...
  }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because we advanced our cursor to &lt;code&gt;12:00:08&lt;/code&gt; after the initial request, &lt;strong&gt;we will never see this update.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Objects arriving out-of-order to a stream like this is no good. It means we can't increment our cursor with confidence, as we're not sure if a given page that we've retrieved has "settled" yet.&lt;/p&gt;

&lt;p&gt;We've reached out to HubSpot for comment on this and will update this post when we hear back. Our guess is that the eventual consistency emerges because they're streaming changes from their database to some second store for search querying (like Elastic), and must be doing so in a way that does not guarantee strict ordering of changes.&lt;/p&gt;

&lt;p&gt;We're still determining how long it can take the Search API to reach consistency. The safest option is to avoid cursoring up until the present. If you run your pagination a few minutes behind, you'll significantly reduce the chances you'll miss any changes.&lt;/p&gt;
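
&lt;p&gt;Running behind the present can be expressed as a cap on how far the cursor may advance. In this sketch, times are plain integers in seconds, and the 120-second settle window is an assumed value, since we have not pinned down how long the Search API takes to settle.&lt;/p&gt;

```python
def lagged_cursor(current_cursor, page_max_modified, now, settle_seconds):
    """Advance the cursor, but never closer to the present than settle_seconds."""
    candidate = min(page_max_modified, now - settle_seconds)
    # Never move backwards, even if the settle window reaches behind the cursor
    return max(current_cursor, candidate)


# Seconds since midnight: 12:00:00 is 43200, 12:00:08 is 43208
cursor = 43200

# At 12:00:10 the page's newest record is from 12:00:08. That is too
# recent to trust, so the cursor stays at 12:00:00.
held = lagged_cursor(cursor, 43208, now=43210, settle_seconds=120)

# A few minutes later the same page has had time to settle,
# so the cursor can move up to 12:00:08.
advanced = lagged_cursor(cursor, 43208, now=43400, settle_seconds=120)
print(held, advanced)
```

&lt;p&gt;While the cursor is held, each request re-reads the unsettled window, which is what gives a late-arriving record such as &lt;code&gt;some-id-0&lt;/code&gt; a chance to be picked up.&lt;/p&gt;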

&lt;p&gt;For our sync, we wanted to continue to propagate changes in as close to real-time as possible. But consistency is of utmost importance to us. So we run two sync processes for each object that we sync: one on the "bleeding edge" of changes and another several minutes behind to catch stragglers.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;This means changes can arrive out-of-order to our customers' databases, but this doesn't seem to be an issue. It's rare that it really matters that this contact was inserted before that one. What's most important is that both contacts make it into your database.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Rate limits
&lt;/h2&gt;

&lt;p&gt;One final consideration when architecting a sync on HubSpot: rate limits.&lt;/p&gt;

&lt;p&gt;The syncing strategies we've outlined can consume a ton of your rate limit. You'll need to evaluate how this consumption interacts with any other integrations you have set up for your HubSpot account.&lt;/p&gt;

&lt;p&gt;Fortunately, OAuth apps benefit from their own dedicated rate limit. This means that our solution can interact with our customers' HubSpot instances without interfering with any other integrations they may have configured to access their HubSpot API.&lt;/p&gt;

&lt;p&gt;If you encounter rate limit issues with your own syncing process, you can explore options such as the "API add-on," which substantially increases the number of requests available for your HubSpot account. Alternatively, you can configure your app as a private OAuth app, granting it a separate pool of requests to draw from.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;HubSpot's API poses some unique challenges for syncing. It's frustrating that their webhook support is so limited. If webhooks were supported comprehensively, we could use them to power the real-time nature of our sync. And then we could use polling the Search API as a "sweep" operation to ensure we didn't miss anything. We could run that poll way behind the present to avoid eventual consistency issues.&lt;/p&gt;

&lt;p&gt;Further, everyone would benefit if HubSpot better supported querying associations through their API. Associations are so vital to a CRM like HubSpot – they're the &lt;em&gt;R&lt;/em&gt; in CRM! Improved access to associations would make things far easier for the developer. It would also mean syncs like ours impose far less load on their system. Sending countless gigabytes of unnecessary JSON through both of our systems feels tragically inefficient.&lt;/p&gt;

&lt;p&gt;Finally, the differentiated treatment between objects and custom objects adds needless friction to adoption. Both in little areas, like not returning every object in the &lt;code&gt;/schemas&lt;/code&gt; endpoint. And in big areas, like not being able to detect deletes for custom objects.&lt;/p&gt;

&lt;p&gt;All that said, with a sync in place, having your HubSpot data in your local datastore provides a ton of benefits. You abstract API complexity away from the rest of your app. Remote calls become local ones, reducing cognitive overhead around guarding for failures or availability issues. You don't have to think about rate limits. Reads are lightning fast. And when your data is in a database, it's far more queryable – you can perform complex queries and analysis easily.&lt;/p&gt;

</description>
      <category>webdev</category>
      <category>api</category>
    </item>
    <item>
      <title>Finding and fixing eventual consistency with Stripe events</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Thu, 23 Mar 2023 16:40:02 +0000</pubDate>
      <link>https://forem.com/acco/finding-and-fixing-eventual-consistency-with-stripe-events-a6i</link>
      <guid>https://forem.com/acco/finding-and-fixing-eventual-consistency-with-stripe-events-a6i</guid>
      <description>&lt;p&gt;At &lt;a href="https://sequin.io"&gt;Sequin&lt;/a&gt;, the backbone of our syncing infrastructure is polling. This is because polling provides stronger consistency guarantees than webhooks.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://blog.sequin.io/events-not-webhooks/"&gt;As we've written about&lt;/a&gt;, when you use webhooks, you give up some control: webhooks are ephemeral. If your service is down or you mishandle a webhook you receive, you're out of luck. You're also at the whims of the webhook provider. They might drop a webhook altogether, meaning you'll never have a chance to process it.&lt;/p&gt;

&lt;p&gt;Polling is not without its challenges, however. Besides the complexity of maintaining polling infrastructure, the hardest part about polling is &lt;em&gt;cursoring&lt;/em&gt; or paging through a stream of events. When cursoring through API items, you need to traverse the list in such a way that you don't miss any items. (And, ideally, you don't &lt;em&gt;repeat&lt;/em&gt; items often either, as that's inefficient.)&lt;/p&gt;

&lt;p&gt;Cursoring is surprisingly hard, as most APIs &lt;a href="https://blog.sequin.io/whats-changed-in-your-api/"&gt;don't make it easy to see what's changed in them&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Cursoring becomes extra hard if the API you're querying is &lt;em&gt;eventually consistent&lt;/em&gt;. In an API, eventual consistency means that your result set is not stable – the results you get from a request can change the next time you make the same request. This adds a lot of complexity, as you have to write defensive code.&lt;/p&gt;

&lt;h2&gt;
  
  
  Stripe events
&lt;/h2&gt;

&lt;p&gt;Stripe is one of the rare API providers that has thoughtful solutions for change detection. They have a &lt;a href="https://stripe.com/docs/api/events"&gt;dedicated &lt;code&gt;/events&lt;/code&gt; endpoint&lt;/a&gt; where they publish most of the changes that happen in their system. Examples include an event for every time a customer is created, a subscription is updated, or a new payment goes through.&lt;/p&gt;

&lt;p&gt;We've been happy consumers of Stripe's &lt;code&gt;/events&lt;/code&gt; and want to see more endpoints like it across other APIs.&lt;/p&gt;

&lt;p&gt;However, due to the demanding nature of our real-time sync, we poll the &lt;code&gt;/events&lt;/code&gt; endpoint frequently – multiple times per second. This means we're susceptible to even the slightest eventual consistency issues. And, indeed, we found one such issue with the &lt;code&gt;/events&lt;/code&gt; endpoint.&lt;/p&gt;

&lt;p&gt;I'll give some background on the &lt;code&gt;/events&lt;/code&gt; endpoint, discuss the issue we encountered, then tell you how we're mitigating the issue.&lt;/p&gt;

&lt;h3&gt;
  
  
  Paginating events
&lt;/h3&gt;

&lt;p&gt;Most Stripe objects have a &lt;code&gt;created&lt;/code&gt; property. This property is a Unix timestamp in seconds.&lt;/p&gt;

&lt;p&gt;As a result, there are many Stripe events that will share the same &lt;code&gt;created&lt;/code&gt; timestamp in a given Stripe account. For example, certain Stripe operations cause many Stripe records to be created at the same time. When a customer signs up for your service and starts a new subscription, Stripe creates a bunch of objects like a &lt;code&gt;customer&lt;/code&gt; and &lt;code&gt;subscription&lt;/code&gt; for that customer.&lt;/p&gt;

&lt;p&gt;Normally, if we were cursoring Stripe's API with a &lt;code&gt;created&lt;/code&gt; timestamp, this could be a problem. For example, consider this simplified HTTP query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET api.stripe.com/v1/events?createdAfter=${cursor}&amp;amp;limit=100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Using &lt;code&gt;created &amp;gt; cursor&lt;/code&gt; would be a problem because we could easily skip any other events created in the same timestamp. Likewise, this could be a problem as well:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET api.stripe.com/v1/events?createdAtOrAfter=${cursor}&amp;amp;limit=100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, using &lt;code&gt;created &amp;gt;= cursor&lt;/code&gt; we'd have the potential of &lt;em&gt;getting stuck&lt;/em&gt; on a page where every event had the same &lt;code&gt;created&lt;/code&gt; timestamp – there would be no way for us to move forward.&lt;/p&gt;

&lt;p&gt;Fortunately, Stripe lets us cursor by the event's ID. We can make a request to get some stream of Stripe events, like this (for brevity, I'll include just the &lt;code&gt;id&lt;/code&gt; and &lt;code&gt;created&lt;/code&gt; properties of each event):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET api.stripe.com/v1/events?ending_before=evt_1MoCivKddDnm8ttlZ19ZW52C

{
  "data": [
    {
      "created": 1679422378,
      "id": "evt_1Mo9g6KddDnm8ttlWtWxdDBt"
      # ...
    },
    {
      "created": 1679422373,
      "id": "evt_1Mo9g1KddDnm8ttlitN7Jl38"
      # ...
    },
    {
      "created": 1679422371,
      "id": "evt_1Mo9fzKddDnm8ttlPDyDsUez"
      # ...
    },
    {
      "created": 1679422292,
      "id": "evt_1Mo9eiKddDnm8ttlip9sgaB5"
      # ...
    },
    {
      "created": 1679422292,
      "id": "evt_3MgXZgKddDnm8ttl09DxjuvM"
      # ...
    }
  ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The list of events is returned sorted by &lt;code&gt;created&lt;/code&gt; descending. So, the most recent event in the list is on top. Assuming we're paginating through the stream from oldest → newest, to continue pagination, we'd pluck the event ID at the top (&lt;code&gt;evt_1Mo9g6KddDnm8ttlWtWxdDBt&lt;/code&gt;) and send that along as our &lt;code&gt;ending_before&lt;/code&gt; to continue forward.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;One odd thing to note is that the event IDs themselves are &lt;em&gt;not&lt;/em&gt; strictly ordered. Note that the last event in the list begins with &lt;code&gt;evt_3&lt;/code&gt; which is "greater than" the event above it, &lt;code&gt;evt_1&lt;/code&gt;. We'll discuss this more in a bit.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Missing events
&lt;/h3&gt;

&lt;p&gt;We had some customers report missing items in their sync. This kicked off an investigation. We logged every request and response to and from Stripe. We then ran audits comparing the state of our synced database over time to the state of Stripe's API.&lt;/p&gt;

&lt;p&gt;When our audits caught a missing item in our database – say, a missing Stripe &lt;code&gt;subscription&lt;/code&gt; – we had the full trail of evidence to determine how we got there.&lt;/p&gt;

&lt;p&gt;Our investigation revealed: the &lt;code&gt;/events&lt;/code&gt; endpoint is eventually consistent!&lt;/p&gt;

&lt;h2&gt;
  
  
  Eventually consistent &lt;code&gt;/events&lt;/code&gt;
&lt;/h2&gt;

&lt;p&gt;Here's the behavior we observed: We make a request to Stripe with some event ID, say &lt;code&gt;evt_0&lt;/code&gt;. We get back a list of 3 events. For brevity, I'll just include the &lt;code&gt;id&lt;/code&gt; and &lt;code&gt;created&lt;/code&gt; properties of each event. To make the &lt;code&gt;created&lt;/code&gt; timestamps easier to read, I've formatted them into human-readable strings:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[
    {
      "created": "12:07:00",
      "id": "evt_3"
      # ...
    },
    {
      "created": "12:05:00",
      "id": "evt_2"
      # ...
    },
    {
      "created": "12:00:00",
      "id": "evt_1"
      # ...
    }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Given this response, our next cursor becomes &lt;code&gt;evt_3&lt;/code&gt;. So, we make that request and get back the following events:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[
    {
      "created": "12:07:01",
      "id": "evt_7"
      # ...
    },
    {
      "created": "12:07:01",
      "id": "evt_6"
      # ...
    }
]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Problem is, at the &lt;code&gt;12:07:00&lt;/code&gt; timestamp, &lt;code&gt;evt_3&lt;/code&gt; wasn't the only event that occurred. There are two other events, &lt;code&gt;evt_4&lt;/code&gt; and &lt;code&gt;evt_5&lt;/code&gt;, which were not present in the first response. For some reason, when we used &lt;code&gt;evt_3&lt;/code&gt; to get our second response, the stream started at &lt;code&gt;evt_6&lt;/code&gt; – which occurred at &lt;code&gt;12:07:01&lt;/code&gt;, the second after the batch of events took place.&lt;/p&gt;

&lt;p&gt;We can see this play out in our historic request/response logs. &lt;strong&gt;Yet, when we replay the request later with &lt;code&gt;evt_3&lt;/code&gt;, we &lt;em&gt;do&lt;/em&gt; get back &lt;code&gt;evt_4&lt;/code&gt; and &lt;code&gt;evt_5&lt;/code&gt; in the response!&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This suggests there's something eventually consistent about Stripe's &lt;code&gt;/events&lt;/code&gt; API. If we paginate through the endpoint using Event IDs, we're subject to skipping events. And because we query Stripe's &lt;code&gt;/events&lt;/code&gt; endpoint multiple times per second, we're especially vulnerable to this issue.&lt;/p&gt;

&lt;h3&gt;
  
  
  How is this happening?
&lt;/h3&gt;

&lt;p&gt;We're not sure why this is happening. We've confirmed it can happen when events are created in the same second, but haven't ruled out it happening in other situations.&lt;/p&gt;

&lt;p&gt;One theory we have: some Stripe event IDs are prefixed with &lt;code&gt;evt_3xxx&lt;/code&gt; and others with &lt;code&gt;evt_1xxx&lt;/code&gt;. These IDs seem to correspond to what object the event is enveloping. For example, events for &lt;code&gt;payment_intent&lt;/code&gt; and &lt;code&gt;charge&lt;/code&gt; always have an event ID with &lt;code&gt;evt_3xxx&lt;/code&gt;. It's possible that these objects are generated in a separate system that has its own ID generator. This could explain the objects potentially reaching the &lt;code&gt;/events&lt;/code&gt; endpoint out-of-order.&lt;/p&gt;

&lt;h2&gt;
  
  
  Solution
&lt;/h2&gt;

&lt;p&gt;To mitigate this issue, we're changing our cursoring logic. After receiving a response, to determine our cursor for the subsequent request, we follow a simple algorithm:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;If the &lt;code&gt;created&lt;/code&gt; value on the latest event is more than 5 seconds in the past, update to use that cursor.&lt;/li&gt;
&lt;li&gt;Otherwise, &lt;em&gt;do not update the cursor&lt;/em&gt;. Instead, use the same cursor we just used in our next request.&lt;/li&gt;
&lt;/ol&gt;
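
&lt;p&gt;The two rules above can be sketched as a small function. The function name and the handling of an empty response are assumptions. Events are taken to be the response list, newest first, each with an &lt;code&gt;id&lt;/code&gt; and a &lt;code&gt;created&lt;/code&gt; Unix timestamp in seconds.&lt;/p&gt;

```python
SETTLE_SECONDS = 5


def next_event_cursor(current_cursor, events, now):
    """Pick the event ID to send as the cursor on the next /events request."""
    if not events:
        # Nothing new came back, so there is nothing to advance to
        return current_cursor
    latest = events[0]
    if now - latest["created"] > SETTLE_SECONDS:
        # Rule 1: the newest event is old enough to be considered settled
        return latest["id"]
    # Rule 2: too recent, so reuse the same cursor on the next request
    return current_cursor


events = [
    {"id": "evt_3", "created": 1679422378},
    {"id": "evt_2", "created": 1679422373},
]

# Three seconds after evt_3 was created: hold the cursor
held = next_event_cursor("evt_0", events, now=1679422381)

# Six seconds after evt_3 was created: safe to advance
advanced = next_event_cursor("evt_0", events, now=1679422384)
print(held, advanced)
```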

&lt;p&gt;This means we'll "see" the same events over several requests. And for very busy Stripe &lt;code&gt;/events&lt;/code&gt; endpoints, it could mean we add a few seconds of latency, as we might always be running just a tad behind the present. But the improved consistency guarantee is worth it.&lt;/p&gt;

&lt;p&gt;Without knowing the root cause, we can't be sure how much mitigation we'll need to resolve this issue. We'll update this post after we've run this algorithm in production for a bit and had a chance to measure drift.&lt;/p&gt;

&lt;p&gt;In general, finding out what's changed in an API is &lt;a href="https://blog.sequin.io/whats-changed-in-your-api/"&gt;an extremely common&lt;/a&gt; requirement for engineering teams. Eventual consistency makes this task very difficult. When designing your API, consider how you can use strategies that will make your API consistent and predictable.&lt;/p&gt;

</description>
      <category>api</category>
      <category>stripe</category>
    </item>
    <item>
      <title>How we sync Stripe to Postgres</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Fri, 09 Jul 2021 00:12:43 +0000</pubDate>
      <link>https://forem.com/acco/how-we-sync-stripe-to-postgres-c15</link>
      <guid>https://forem.com/acco/how-we-sync-stripe-to-postgres-c15</guid>
      <description>&lt;p&gt;At Sync Inc, we replicate APIs to Postgres databases in real-time. We want to provide our customers with the experience of having direct, row-level access to their data from third-party platforms, like Stripe and Airtable.&lt;/p&gt;

&lt;p&gt;Stripe's API is great, so we knew our Stripe support would have to be very fast and reliable on day one. In order to replace API reads, our sync needs to create a database that is truly &lt;strong&gt;a second source of truth&lt;/strong&gt;. This means:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;We need to backfill all data&lt;/strong&gt;: On initial sync, we load in all historical data for a given Stripe account into the target Postgres database.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;We need to support virtual objects&lt;/strong&gt;: Stripe has a few "virtual" objects, like "upcoming invoices." These objects are in constant flux until they are created (e.g., an upcoming invoice becomes an invoice). You have to fetch these one-off as there's no place to paginate them. They don't even have primary keys.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;We need to provide a /wait endpoint&lt;/strong&gt;: As you'll see, customers can call the &lt;code&gt;/wait&lt;/code&gt; endpoint on Sync Inc after writing changes to Stripe. This endpoint will return a &lt;code&gt;200&lt;/code&gt; when we've confirmed the database is completely up-to-date. This means they can read from their database after writing to Stripe and know it's consistent.&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Two primary sync strategies
&lt;/h3&gt;

&lt;p&gt;Our Stripe sync orbits around Stripe's &lt;code&gt;events&lt;/code&gt; endpoint. This endpoint serves the same purpose as a replication slot on a database. It contains a list of all create/update/delete events that have happened for a given account on Stripe.&lt;/p&gt;

&lt;p&gt;Each event contains a full payload of the affected record. We can use this event stream to effectively play back all changes to a Stripe account.&lt;/p&gt;

&lt;p&gt;However, as you might expect, the endpoint does not contain an unbounded list of all events in a Stripe account ever. It contains data from the last 30 days.&lt;/p&gt;

&lt;p&gt;So, this means that when our customers start up a new replica Postgres database, we need to &lt;em&gt;backfill&lt;/em&gt; it with historical Stripe data first. Backfilling just means paginating each and every endpoint back to the beginning of the Stripe account.&lt;/p&gt;

&lt;p&gt;What we ended up with was two distinct sync processes: Our &lt;em&gt;backfill&lt;/em&gt; process and our &lt;em&gt;events polling&lt;/em&gt; process.&lt;/p&gt;

&lt;p&gt;We run the backfill process first to build up the initial database. Then we run the events polling process continuously over the lifetime of the database to keep it in sync.&lt;/p&gt;

&lt;h3&gt;
  
  
  Sync process: The backfill
&lt;/h3&gt;

&lt;p&gt;During the backfill, we need to paginate through the full history of each endpoint on Stripe.&lt;/p&gt;

&lt;p&gt;Given the breadth of the Stripe API, the backfill poses a few challenges:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We need to make requests to dozens of API endpoints.&lt;/li&gt;
&lt;li&gt;Then, for each endpoint, we have to convert the JSON response into a structure that's ready to insert into Postgres.&lt;/li&gt;
&lt;li&gt;Further, each response can contain several layers of nested children. Those children can be &lt;em&gt;lists&lt;/em&gt; of children which are, in turn, paginateable.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This was a great excuse to use Elixir's &lt;a href="https://github.com/dashbitco/broadway"&gt;Broadway&lt;/a&gt;. A Broadway pipeline consists of one &lt;em&gt;producer&lt;/em&gt; and one or more &lt;em&gt;workers&lt;/em&gt;. The producer is in charge of producing jobs. The workers consume and work those jobs, each working in parallel. Broadway gives us a few things out of the box:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A worker queue with back-pressure.&lt;/li&gt;
&lt;li&gt;We can dynamically scale the number of workers in the pipeline based on the amount of work to do.&lt;/li&gt;
&lt;li&gt;We can easily rate limit the volume of work we process per unit of time. We tuned this to stay well below Stripe's API quota limit.&lt;/li&gt;
&lt;li&gt;A "message" construct with acknowledge/fail behaviors. This made things like retry logic trivial.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In our case, the queue of work the producer maintains is a list of &lt;em&gt;pages&lt;/em&gt; to be processed. A page is the combination of an endpoint and the current cursor for that endpoint. Here's a small example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;queue = [
  {"/v1/customers", "cur9sjkxi1x"},
  {"/v1/invoices", "cur0pskoxiq1"},
  # ...
]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To configure throughput, we just instantiate Broadway with a few parameters:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;options = [
  producer: [
    module: BackfillProducer,
    rate_limiting: [
      allowed_messages: 50,
      interval: :timer.seconds(1)
    ]
  ],
  processors: [
    default: [
      concurrency: 50,
      max_demand: 1
    ]
  ]
]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That &lt;code&gt;rate_limiting&lt;/code&gt; setting is all we need to ensure we process no more than 50 pages per second. This leaves a comfy 50 requests per second in a customer's Stripe quota.&lt;/p&gt;

&lt;p&gt;Under &lt;code&gt;processors&lt;/code&gt;, we specify that we want up to 50 concurrent workers and that each may request one unit of work at a time (in our case, a page).&lt;/p&gt;

&lt;p&gt;So, to kick off the sync, the backfill producer's queue is seeded with all Stripe endpoints (and &lt;code&gt;nil&lt;/code&gt; cursors). Our workers check out a page to work and fetch it. Each page contains up to 100 objects. Each of those objects can contain a list of paginateable children. As such, the worker's first job is to populate all objects in the page completely.&lt;/p&gt;

&lt;p&gt;Once we have a list of "filled out" objects, we parse and insert them. We use a large JSON object which maps object types and their fields to tables and columns in Postgres. We benefit greatly from the fact that every Stripe object contains an &lt;code&gt;object&lt;/code&gt; field which identifies what the entity is.&lt;/p&gt;

&lt;h3&gt;
  
  
  Sync process: new events
&lt;/h3&gt;

&lt;p&gt;After the backfill completes, it's time to switch to processing events for the indefinite lifetime of the sync. But we need a smooth hand-off between the two, otherwise we risk missing a change.&lt;/p&gt;

&lt;p&gt;To facilitate the hand-off, before the backfill begins we make a request to &lt;code&gt;/events&lt;/code&gt; to grab the most recent cursor. After the backfill completes, we first catch up on all events that occurred while we were backfilling. After those are processed, the database is up-to-date. And it's time to poll &lt;code&gt;/events&lt;/code&gt; indefinitely.&lt;/p&gt;

&lt;p&gt;We poll the &lt;code&gt;/events&lt;/code&gt; endpoint continuously, every 500ms, to check for anything new to process. This is how we can promise "sub-second" lag.&lt;/p&gt;
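&lt;p&gt;To make the cursor hand-off concrete, here is a minimal sketch in Go rather than our production Elixir. The &lt;code&gt;Event&lt;/code&gt; type, &lt;code&gt;FetchFunc&lt;/code&gt;, and &lt;code&gt;pollOnce&lt;/code&gt; are hypothetical names, and the sketch assumes the fetch function already returns events oldest first, which the real &lt;code&gt;/events&lt;/code&gt; response would need paging and reordering to satisfy.&lt;/p&gt;

```go
package main

import "fmt"

// Event is a hypothetical, trimmed-down stand-in for a Stripe event.
type Event struct {
	ID string
}

// FetchFunc returns the events created after cursor, oldest first.
// Assumption: a real client would page and reorder the API response
// to satisfy this contract.
type FetchFunc func(cursor string) []Event

// pollOnce handles every event after cursor and returns the new cursor.
// If nothing is new, the cursor comes back unchanged.
func pollOnce(cursor string, fetch FetchFunc, handle func(Event)) string {
	for _, ev := range fetch(cursor) {
		handle(ev)
		cursor = ev.ID
	}
	return cursor
}

func main() {
	// Fake event feed standing in for the API.
	feed := []Event{{ID: "evt_1"}, {ID: "evt_2"}, {ID: "evt_3"}}
	fetch := func(cursor string) []Event {
		for i, ev := range feed {
			if ev.ID == cursor {
				return feed[i+1:]
			}
		}
		return feed // unknown cursor: start from the beginning
	}

	// Cursor captured before the backfill began.
	cursor := "evt_1"
	cursor = pollOnce(cursor, fetch, func(ev Event) {
		fmt.Println("processing", ev.ID)
	})
	fmt.Println("cursor is now", cursor)
}
```

&lt;p&gt;The same function covers both phases: the first call after the backfill catches up on everything that piled up, and later calls on a 500ms timer usually find nothing new or only a handful of events.&lt;/p&gt;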

&lt;p&gt;We log sync completions to a Postgres table. We use the "polymorphic embed" pattern, where each log entry contains a JSON &lt;code&gt;payload&lt;/code&gt; that can take one of several shapes. For example, our "Stripe backfill complete" log looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "kind": "stripe_backfill_complete",
  "row_count": 1830,
  "last_event_before_backfill": "evt_1J286oDXGuvRIWUJKfUqKpsJ"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we boot a Stripe sync process, it checks the sync logs table for the most recent completed sync for this database. Our sync manager then knows what kind of sync process we need to boot and the initial state of that process.&lt;/p&gt;

&lt;h3&gt;
  
  
  What about webhooks?
&lt;/h3&gt;

&lt;p&gt;When one hears about a "real-time" or "evented" API integration, the first API primitive that leaps to mind is "webhooks."&lt;/p&gt;

&lt;p&gt;But webhooks come with a few challenges:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;You can't go down&lt;/strong&gt;: Senders typically retry undelivered webhooks with some exponential back-off. But the guarantees are often loose or unclear. And the last thing your system probably needs after recovering from a disaster is a deluge of backed-up webhooks to handle.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;You're counting on sender deliverability&lt;/strong&gt;: When polling, the only real barrier between you and the latest data is a possible caching layer. With webhooks, senders will typically have some sort of queue or "outbox" that their workers work through. Queues like this are subject to back-pressure. This opens you up to your sync slowing down if your sender's queue backs up.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;A redundant system&lt;/strong&gt;: Webhooks are not something we can rely on exclusively for a syncing operation like this, so they'll always need to be complemented by a polling system. We have to poll to backfill the database after it initializes. And we may have to poll after recovering from downtime or after fixing a bug in our webhook handling logic.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In general, I have this suspicion that a system that relies purely on webhooks to stay in sync is bound to fail. All it takes is for one webhook to get dropped, on either the receiving end or sending end. With no other backup mechanisms in place, you risk a record in your database being out of sync indefinitely.&lt;/p&gt;

&lt;p&gt;Luckily, it turns out that with an &lt;code&gt;/events&lt;/code&gt; endpoint to poll, webhooks are not necessary. The trick is to just poll it frequently enough to get as close to real-time as possible! What's great is that you can use the same sync system to get a change made milliseconds ago or to catch up on all changes that happened during unexpected downtime.&lt;/p&gt;

&lt;h3&gt;
  
  
  The "wait" endpoint
&lt;/h3&gt;

&lt;p&gt;Our databases are read-only. Customers make writes to a platform's API, so that those writes can go through a platform's validation stack. Then, those changes flow down to their database. This is the &lt;a href="https://acco.io/read-from-dbs"&gt;&lt;em&gt;one-way data flow&lt;/em&gt; we advocate&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;To make our Stripe database a true second source of truth, we need a final pillar. We need to enable "read after writes," or the guarantee that if you make a write to Stripe's API, that write will be reflected in your database in a subsequent read. While our Stripe sync is &lt;em&gt;fast&lt;/em&gt;, the architecture at present leaves open a race condition: You can make a write to Stripe then query your database before the change has propagated.&lt;/p&gt;

&lt;p&gt;The simplest way to overcome this race condition is to sleep for one second before any subsequent reads. This should work almost all the time. But we wanted to provide something even more robust.&lt;/p&gt;

&lt;p&gt;Customers can instead call our "wait" endpoint:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GET https://api.syncinc.so/api/stripe/wait/:id
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This endpoint will hold open until we've confirmed that your Stripe database is up-to-date. When it is, the request returns with a &lt;code&gt;200&lt;/code&gt;. You can now make a subsequent read to your database with confidence.&lt;/p&gt;

&lt;h3&gt;
  
  
  Coming up
&lt;/h3&gt;

&lt;p&gt;With backfills, support for virtual objects like "upcoming invoices," and sub-second sync times, we provide a true Postgres replica with all your Stripe data.&lt;/p&gt;

&lt;p&gt;We still have a lot of work to do to make the developer experience around this database great. There are 82 tables to wrap one's head around (!!) and robust ORM support is a must for many developers. Now that the foundations of our sync are in place, stay tuned for updates to the overall experience.&lt;/p&gt;

</description>
      <category>postgres</category>
      <category>sync</category>
      <category>database</category>
      <category>api</category>
    </item>
    <item>
      <title>Our Airtable sync process, layer by layer</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Thu, 01 Jul 2021 17:22:51 +0000</pubDate>
      <link>https://forem.com/acco/our-airtable-sync-process-layer-by-layer-58l9</link>
      <guid>https://forem.com/acco/our-airtable-sync-process-layer-by-layer-58l9</guid>
      <description>&lt;p&gt;At &lt;a href="https://syncinc.so/"&gt;Sync Inc&lt;/a&gt;, we replicate APIs to Postgres databases in real-time. We want to provide our customers with the experience of having direct, row-level access to their data from third-party platforms, like Stripe and Airtable.&lt;/p&gt;

&lt;p&gt;After the concept of Sync Inc crystallized, we knew we had to get something to market quickly. But it turns out that most APIs are not designed to service real-time replication. The first API we supported – Airtable – is no exception.&lt;/p&gt;

&lt;p&gt;Airtable's API presents some unique challenges, which I'll touch on first. Then I'll describe how we built a minimum-viable sync process that was fast and reliable enough to get us our first customers. From there, you'll see how we iteratively built and improved our sync process layer-by-layer.&lt;/p&gt;

&lt;h3&gt;
  
  
  Challenges
&lt;/h3&gt;

&lt;p&gt;With Airtable's API:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;There's no way to figure out what's changed&lt;/li&gt;
&lt;li&gt;The schema can change at any time&lt;/li&gt;
&lt;li&gt;Throughput is low&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Breaking these challenges down:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;No way to figure out what's changed&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;There's no way around it: we have to sweep every table, all the time.&lt;/p&gt;

&lt;p&gt;For Airtable, it's difficult to see what's changed in a base. First, you have to make requests &lt;em&gt;against each individual table&lt;/em&gt;. Given that Airtable's API is limited to 5 requests/second, if you have more than a few tables, our dream of sub-second lag time becomes difficult to attain.&lt;/p&gt;

&lt;p&gt;Second, deletes are very common in Airtable. And yet, there's no way to easily tell from the API what's been deleted – you have to check every row.&lt;/p&gt;

&lt;p&gt;The last curveball is Airtable's inconsistent treatment of the &lt;code&gt;last_updated&lt;/code&gt; field for records. For example, changes to computed fields do not affect this timestamp. So, we can't poll for changes to them.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The schema can change at any time&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Airtable has a flexible schema. Users can add, drop, rename, and change the type of columns at any time. This means the shape of the data we receive from the API is constantly changing.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Throughput is low&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Airtable's responses will contain a maximum of 100 records. And we can only make 5 requests per second. This, of course, means we can only process a maximum of 500 records per second.&lt;/p&gt;
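&lt;p&gt;Since we have to sweep every table on every cycle, that ceiling translates directly into a floor on lag time. The back-of-the-envelope calculation below is a sketch only: &lt;code&gt;minSweepSeconds&lt;/code&gt; is a hypothetical helper, and it ignores network latency and assumes every page comes back full.&lt;/p&gt;

```go
package main

import "fmt"

const (
	pageSize          = 100 // max records per Airtable response
	requestsPerSecond = 5   // Airtable's rate limit
)

// minSweepSeconds is a lower bound on the time needed to read `records`
// rows once. It ignores latency and assumes every page comes back full.
func minSweepSeconds(records int) float64 {
	pages := (records + pageSize - 1) / pageSize // round up
	return float64(pages) / requestsPerSecond
}

func main() {
	for _, n := range []int{500, 10000, 100000} {
		fmt.Printf("%d records: at least %.1fs per sweep\n", n, minSweepSeconds(n))
	}
}
```

&lt;p&gt;A base with 10,000 records needs 100 requests, so a full sweep takes at least 20 seconds no matter how fast the rest of the pipeline is.&lt;/p&gt;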

&lt;h3&gt;
  
  
  Iteratively building up our sync
&lt;/h3&gt;

&lt;p&gt;We launched with the minimum viable version of our sync process. Once we had something that worked, we began to layer on sync optimizations that made the sync faster and more efficient.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lowest common denominator: the rebuild sync&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Because the Airtable schema can change at any time and we have to sweep the full Airtable API on each sync run, the lowest common denominator was what we call "the rebuild sync." In the rebuild sync, we perform every single operation necessary to instantiate a Postgres database and bring it to parity with the data in Airtable.&lt;/p&gt;

&lt;p&gt;To support this process, our database is split into two schemas:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;public&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;public_swap&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our customers read from the &lt;code&gt;public&lt;/code&gt; schema. The &lt;code&gt;public_swap&lt;/code&gt; or "swap schema" is where the sync takes place during a rebuild.&lt;/p&gt;

&lt;p&gt;At the top of each sync cycle, we initialize all the tables in &lt;code&gt;public_swap&lt;/code&gt; as defined by the Airtable base's current schema. We then pull all the records for each table from Airtable and insert them into the tables in the swap schema.&lt;/p&gt;

&lt;p&gt;At the end of the sync cycle, we open up a database transaction. Inside that transaction, we drop every table in &lt;code&gt;public&lt;/code&gt; and "promote" every table in &lt;code&gt;public_swap&lt;/code&gt; to &lt;code&gt;public&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;From our customers' point of view, it looks like their &lt;code&gt;public&lt;/code&gt; schema suddenly receives all updates from Airtable, all at once, at discrete intervals.&lt;/p&gt;

&lt;p&gt;We decided it made sense to start with doing this rebuild every sync cycle for a couple of reasons:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We can use the same process for syncing for the first time or the millionth time.&lt;/li&gt;
&lt;li&gt;Using migrations would be tricky. When the schema changes, we'd have to map out all the paths that were taken to get from schema A → schema B. We avoid this entirely by just rebuilding all the tables from scratch.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Sync to &lt;code&gt;public&lt;/code&gt;, only rebuild on schema changes&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;As you might imagine, there were a few immediate problems with running a full rebuild on every sync cycle:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;We were rebuilding the tables in the swap schema and "promoting" them to public on each sync run. Promoting is an expensive operation. We had to drop all the tables in &lt;code&gt;public&lt;/code&gt;. This meant we were constantly marking Postgres tuples for deletion, keeping the vacuuming functionality very busy.&lt;/li&gt;
&lt;li&gt;It increased the max lag time of a given row. This is because a row first had to get synced to the swap schema, and then wait for the rest of the rows to get synced to the swap schema &lt;em&gt;before&lt;/em&gt; being promoted to the public schema.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;So, for our database this meant high write IOPs and high network utilization. For our customers, it meant a suboptimal max lag time.&lt;/p&gt;

&lt;p&gt;The next level of our sync was to sync directly to the &lt;code&gt;public&lt;/code&gt; schema. To pull this off, we needed to incorporate a few changes.&lt;/p&gt;

&lt;p&gt;First, we needed a way to &lt;em&gt;upsert&lt;/em&gt; records from Airtable right to the &lt;code&gt;public&lt;/code&gt; schema. Here's an example of what the upsert statement looked like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;insert&lt;/span&gt; &lt;span class="k"&gt;into&lt;/span&gt; &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;created_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;values&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;conflict&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="k"&gt;update&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt;
      &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;created_time&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;created_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;color&lt;/span&gt;
      &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;created_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;distinct&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;created_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;size&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;excluded&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;color&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are a few key components of this upsert statement:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The &lt;code&gt;on conflict (id)&lt;/code&gt; is what switches Postgres from performing an insert to performing an update. When updating, the &lt;code&gt;set&lt;/code&gt; clause is used instead. The &lt;code&gt;set&lt;/code&gt; clause here is just a re-iteration of the mapping that the &lt;code&gt;insert&lt;/code&gt; clause makes between columns and their values.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;where ... is distinct from&lt;/code&gt; is a key clause. Without it, the upsert would perform a write operation for every single row in the upsert statement, regardless of whether anything had changed. This trades a write-heavy characteristic for a read-heavy characteristic, which is more efficient for Postgres. Furthermore, for customers using replication slots, this means we'll only generate a WAL entry when something has actually been created/updated.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Note that we're putting the database in charge of determining what's changed. We're constantly sending rows to Postgres, and Postgres is diffing those rows with what it has stored. If there are changes, then it performs a write.&lt;/p&gt;

&lt;p&gt;This upsert logic lets us write directly to &lt;code&gt;public&lt;/code&gt;. But, we still need to trigger a full rebuild if the base's schema is modified in some way (eg a table is added or a column is changed).&lt;/p&gt;

&lt;p&gt;So, at the beginning of each sync we need to check to see if the Airtable schema has changed since our last sync. We pull the schema, hash it, then compare that hash with the hash of the last build. If the hashes are the same, we sync right to &lt;code&gt;public&lt;/code&gt;. If the hashes are different, we kick off a full rebuild.&lt;/p&gt;
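&lt;p&gt;The check itself is small. Here's a rough sketch in Go of what it could look like. The &lt;code&gt;Schema&lt;/code&gt; shape, the function names, and the choice of SHA-256 are all assumptions for illustration; any stable hash over a canonical serialization of the schema would do.&lt;/p&gt;

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Schema maps table name to column name to column type.
// This shape is an assumption for illustration.
type Schema map[string]map[string]string

// hashSchema returns a stable hex digest of the schema. json.Marshal writes
// map keys in sorted order, so equal schemas always hash the same.
func hashSchema(s Schema) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

// needsRebuild reports whether the current schema differs from the one used
// for the last build, and returns the current hash so it can be stored.
func needsRebuild(lastHash string, current Schema) (bool, string, error) {
	h, err := hashSchema(current)
	if err != nil {
		return false, "", err
	}
	return h != lastHash, h, nil
}

func main() {
	v1 := Schema{"products": {"name": "text", "size": "text"}}
	v2 := Schema{"products": {"name": "text", "size": "number"}}

	last, _ := hashSchema(v1)
	rebuild, _, _ := needsRebuild(last, v1)
	fmt.Println("unchanged schema, rebuild:", rebuild)
	rebuild, _, _ = needsRebuild(last, v2)
	fmt.Println("column type changed, rebuild:", rebuild)
}
```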

&lt;p&gt;This sync process was a big step up from the naive first implementation. For our customers, this reduced the lag time of a given row significantly. For Postgres, we traded high write IOPs for high read IOPs, which meant our database could handle greater load. It also effectively eliminated the fraction of time a table is in a write lock, removing all kinds of weird, intermittent performance hiccups.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;In-memory fingerprinting&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Layer two of our sync process bought us considerably more room on our Postgres database. But there was one last major piece of low-hanging fruit.&lt;/p&gt;

&lt;p&gt;While more optimal than the constant rebuild, the upsert statement above still places a heavy burden on the Postgres database. Every sync cycle, Postgres is given the responsibility of diffing each batch of rows we pull from Airtable with what it has stored. Read IOPs and network were both still high.&lt;/p&gt;

&lt;p&gt;We knew it would be very beneficial to hoist this diffing work up to our workers.&lt;/p&gt;

&lt;p&gt;Traditionally, this is the moment I'd reach for Redis. But sharing memory between worker instances is precisely where Elixir/OTP shines. We can keep a map in-memory of &lt;code&gt;%{ "record_id" =&amp;gt; "record_hash" }&lt;/code&gt;, where &lt;code&gt;record_hash&lt;/code&gt; is an MD5 hash of all the key/value pairs of a given record. When we request 100 records from Airtable, we can compare their latest hashes with what we have in memory. Then, we only perform an upsert of the records that have new hashes. After Postgres confirms the write succeeded, we write the latest hashes to our in-memory cache.&lt;/p&gt;

&lt;p&gt;The algorithm to employ this in-memory hash is pretty straightforward:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Hash the incoming records we just pulled from Airtable&lt;/li&gt;
&lt;li&gt;Diff the incoming hashes with the hashes we have in-memory&lt;/li&gt;
&lt;li&gt;Upsert all records that have changed into Postgres&lt;/li&gt;
&lt;li&gt;Update the in-memory hash&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By keeping the upsert from our second layer intact, we're resilient to failures in this pipeline. If eg the cache update in step 4 fails, no big deal: those records will just be re-upserted on the next sync, meaning just a little more work for Postgres. If we're unable to upsert to Postgres (eg Postgres is down), we don't update our in-memory hash, so we try the upsert again on the next go-around.&lt;/p&gt;
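&lt;p&gt;The four steps can be sketched as follows. This is an illustration in Go, not our Elixir implementation: the &lt;code&gt;Record&lt;/code&gt; type, the function names, and the &lt;code&gt;upsert&lt;/code&gt; callback are hypothetical, the cache is a plain map rather than shared process state, and deletes are left out.&lt;/p&gt;

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Record is a hypothetical, simplified Airtable record.
type Record struct {
	ID     string
	Fields map[string]interface{}
}

// fingerprint returns an MD5 hex digest of the record's fields.
// json.Marshal sorts map keys, so the digest is stable across runs.
func fingerprint(r Record) (string, error) {
	b, err := json.Marshal(r.Fields)
	if err != nil {
		return "", err
	}
	sum := md5.Sum(b)
	return hex.EncodeToString(sum[:]), nil
}

// diff hashes the incoming records (step 1) and returns those whose hash is
// new or changed, plus the hashes to store after the upsert (step 2).
func diff(cache map[string]string, incoming []Record) ([]Record, map[string]string, error) {
	var changed []Record
	pending := map[string]string{}
	for _, r := range incoming {
		h, err := fingerprint(r)
		if err != nil {
			return nil, nil, err
		}
		if cache[r.ID] != h {
			changed = append(changed, r)
			pending[r.ID] = h
		}
	}
	return changed, pending, nil
}

// syncBatch upserts changed records (step 3), then updates the cache (step 4).
// If the upsert fails, the cache is left untouched so the next run retries.
func syncBatch(cache map[string]string, incoming []Record, upsert func([]Record) error) error {
	changed, pending, err := diff(cache, incoming)
	if err != nil {
		return err
	}
	if len(changed) == 0 {
		return nil
	}
	if err := upsert(changed); err != nil {
		return err
	}
	for id, h := range pending {
		cache[id] = h
	}
	return nil
}

func main() {
	cache := map[string]string{}
	upsert := func(rs []Record) error {
		fmt.Println("upserting", len(rs), "record(s)")
		return nil
	}
	batch := []Record{{ID: "rec1", Fields: map[string]interface{}{"name": "Shirt"}}}

	_ = syncBatch(cache, batch, upsert) // first sight: upserted
	_ = syncBatch(cache, batch, upsert) // unchanged: skipped
	batch[0].Fields["name"] = "Hat"
	_ = syncBatch(cache, batch, upsert) // changed: upserted again
}
```

&lt;p&gt;Because the cache is only written after the upsert returns successfully, a failure at any point leaves it stale rather than wrong, and a stale cache just means the record gets sent to Postgres again.&lt;/p&gt;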

&lt;p&gt;This particular improvement had a massive impact. Our read IOPs and network utilization each dropped by over 90%:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--chjCKrba--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d28zu5rt0yrs1wggfumu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--chjCKrba--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/d28zu5rt0yrs1wggfumu.png" alt="graph of db performance improvements"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;There was an uptick in load on our workers, as they were now doing all the diffing. But this was more than offset by the reduction in time they spent waiting for Postgres.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Support read-after-writes with a proxy&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;At this point, our sync is efficient and we're able to approach the theoretical replication limit given the limitations of the Airtable API.&lt;/p&gt;

&lt;p&gt;However, we felt ourselves missing an answer for &lt;em&gt;writes&lt;/em&gt;. We believe in a one-way data flow: reads are best made in SQL, but for writes you'll want to make an API request so you can properly handle eg validation errors. But, even if the replication lag is only a few seconds, any code that performs a &lt;em&gt;read-after-write&lt;/em&gt; will not work.&lt;/p&gt;

&lt;p&gt;For example, we have some customers that are powering their React web applications with an Airtable back-end. Let's say they have a workflow to let users update their email address. The user clicks an "Edit profile" button, and is prompted for their new email address. After updating, they're sent back to their profile page:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--2kVsBWkf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/87ktek14ztj5luqjp1vq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--2kVsBWkf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/87ktek14ztj5luqjp1vq.png" alt="flow of user profile edit flow"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here's the problem: when they save their new email address, that triggers an API PATCH request to Airtable on behalf of that user. But there's a race condition: when the user is redirected back to his or her profile page, the React app re-fetches their user record from Postgres. There's no guarantee, though, that the updated user record (with the new email) has propagated via Sync Inc to their Postgres database. So the React app grabs and displays the stale email, confusing the user.&lt;/p&gt;

&lt;p&gt;So we built a proxy that users can write through to Airtable. Functionally, it's a reverse proxy. You just prepend the proxy's hostname to whatever Airtable API request you want to make, eg:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PATCH https://proxy.syncinc.so/api.airtable.com/v0/appF0qbTS1QiA025N/Users
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The proxy attempts to write the change to Airtable first. If the Airtable write succeeds, the change is written to the database &lt;em&gt;and then&lt;/em&gt; the requestor receives a 200. This means that the change that was just written is guaranteed to be present in any subsequent database read.&lt;/p&gt;

&lt;h3&gt;
  
  
  Layer by layer
&lt;/h3&gt;

&lt;p&gt;Our wisest move was to build the initial sync to serve the lowest common denominator, even if it was slow and inefficient. As long as we were reliable, delivered on the replication time we estimated in our console, and were resilient to eg schema changes, our customers were happy.&lt;/p&gt;

&lt;p&gt;Reflecting back on our discussions before launch, a lot of our original assumptions about what our sync would need were wrong. Luckily, we time-boxed the initial build and got our MVP sync (the rebuild sync) to market as fast as possible. From there, servicing customers and learning about their needs helped inform the shape of each subsequent layer and its priority. Notice how we slowly layered on performance optimizations:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Layer 1&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;The rebuild sync: High write IOPs, high network, and high vacuuming.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Layer 2&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Sync to &lt;code&gt;public&lt;/code&gt;: We first reduced vacuuming by syncing directly to &lt;code&gt;public&lt;/code&gt; and keeping tables between syncs.&lt;/li&gt;
&lt;li&gt;Then we added &lt;code&gt;where ... is distinct from&lt;/code&gt;. This traded high write IOPs for high read IOPs and got rid of our vacuuming problem. We still had high network.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Layer 3&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Fingerprinting: By hoisting the diffing logic up to our workers, we solved both read IOPs and network on our database.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>startup</category>
      <category>airtable</category>
      <category>postgres</category>
      <category>syncing</category>
    </item>
    <item>
      <title>What is RDS Proxy? Exploring through benchmarks</title>
      <dc:creator>Anthony Accomazzo</dc:creator>
      <pubDate>Wed, 30 Dec 2020 03:13:27 +0000</pubDate>
      <link>https://forem.com/acco/testing-rds-proxy-with-benchmarks-57f0</link>
      <guid>https://forem.com/acco/testing-rds-proxy-with-benchmarks-57f0</guid>
      <description>&lt;p&gt;I was surprised when a customer wrote in to explain that he noticed - for simple queries - the Airtable API was about as performant as querying his &lt;a href="https://syncinc.so/?utm_source=blog&amp;amp;utm_medium=post&amp;amp;utm_campaign=rds"&gt;Sync Inc&lt;/a&gt;-provisioned Airtable database.&lt;/p&gt;

&lt;p&gt;We'd expect a database query to be at least an order of magnitude faster than an API query. So I had to investigate: What was the explanation for this?&lt;/p&gt;

&lt;p&gt;Airtable's API is pretty consistent and responsive, with a mean and median request time of 350ms:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jeWNTi2a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/airtable-api.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jeWNTi2a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/airtable-api.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is about what you'd expect from a standard list request against a third-party API with 100 items returned per request.&lt;/p&gt;

&lt;p&gt;So how could a database query get up to 350ms? A little digging, and we discovered that the customer was querying his database via a Lambda function. This meant opening a connection to his Sync Inc db before each query. A solid lead.&lt;/p&gt;

&lt;p&gt;I wanted to see how much overhead the connection times were adding. So I wrote some simple benchmark functions in Go.&lt;/p&gt;

&lt;p&gt;The timing for the benchmarking is handled by this simple function:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;timeTrack&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;elapsed&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Since&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"%s took %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;elapsed&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can log how long a function execution took in Go using this function in combination with &lt;code&gt;defer&lt;/code&gt; like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;benchmarkQuery&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;timeTrack&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"benchmarkQuery"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;funcUnderTest&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Above, the arguments to &lt;code&gt;timeTrack()&lt;/code&gt; are evaluated immediately, meaning the first argument is set to &lt;code&gt;time.Now()&lt;/code&gt; - the time &lt;code&gt;benchmarkQuery()&lt;/code&gt; was invoked.&lt;/p&gt;

&lt;p&gt;But &lt;code&gt;defer&lt;/code&gt; waits to &lt;em&gt;invoke&lt;/em&gt; &lt;code&gt;timeTrack()&lt;/code&gt; until right before the function &lt;code&gt;benchmarkQuery()&lt;/code&gt; returns. Which, in this case, will be after the for loop executing &lt;code&gt;funcUnderTest()&lt;/code&gt; &lt;code&gt;count&lt;/code&gt; times has completed. Neat trick.&lt;/p&gt;

&lt;p&gt;With a timing function in place, I wrote out some benchmarks. We want to benchmark two different kinds of behavior:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A function that opens a connection &lt;em&gt;on every loop&lt;/em&gt; and then makes a request&lt;/li&gt;
&lt;li&gt;A function that opens a connection &lt;em&gt;then&lt;/em&gt; loops and makes requests&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This should help us understand how much time is spent establishing the connection and how much time is spent executing the query.&lt;/p&gt;

&lt;p&gt;Here's what the function that opens a connection every loop looks like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;benchmarkOpen&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;timeTrack&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"benchmarkOpen"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;openDb&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"SELECT id from purchase_orders limit(100);"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c"&gt;// rows is nil if the query failed, so check err before closing&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Got error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And the one that opens a connection then makes a bunch of queries on that connection:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;benchmarkQuery&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;timeTrack&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"benchmarkQuery"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;openDb&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"SELECT id from purchase_orders limit(100);"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c"&gt;// rows is nil if the query failed, so check err before closing&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Got error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(Note: It's important to run &lt;code&gt;rows.Close()&lt;/code&gt;, otherwise the connection is not immediately released - which means Go's &lt;code&gt;database/sql&lt;/code&gt; will default to just opening a new connection for the next request, defeating this test.)&lt;/p&gt;

&lt;p&gt;Using these two functions, we're prepared to find out:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What kind of impact does re-creating a db connection on each run have on the wall time of a 1-off execution?&lt;/li&gt;
&lt;li&gt;Does using RDS Proxy have an effect on that wall time?&lt;/li&gt;
&lt;li&gt;What are the basic load characteristics of a db.r5.large getting hammered with new connection requests? How does RDS Proxy help it perform?&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This inquiry is all motivated by the nature of Lambda functions, which the customer uses. And indeed Amazon recently released RDS Proxy to address this precise architecture:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Many applications, including those built on modern serverless architectures, can have a large number of open connections to the database server, and may open and close database connections at a high rate, exhausting database memory and compute resources. Amazon RDS Proxy allows applications to pool and share connections established with the database, improving database efficiency and application scalability.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  RDS Proxy gotchas
&lt;/h3&gt;

&lt;p&gt;Before we get into the benchmarks, I want to touch on the gotchas that sucked up a lot of my time. (I guess after all these years I haven't learned my lesson: The AWS UX is just not going to help you use their products. You have to read the user guide from end-to-end.)&lt;/p&gt;

&lt;p&gt;Here are the two that tripped me up big time:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. RDS Proxy only supports specific database engine versions&lt;/strong&gt; – and the latest RDS Postgres is not one of them.&lt;/p&gt;

&lt;p&gt;The first time I went to set up a new RDS Proxy, I didn't see any available databases in the dropdown:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--vVURHWPW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/empty-dropdown.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--vVURHWPW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/empty-dropdown.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After much trial and error, I ended up spinning up a new Aurora db. At last, I saw &lt;em&gt;something&lt;/em&gt; populate the dropdown.&lt;/p&gt;

&lt;p&gt;It wasn't until much later I learned that Postgres 12 - the engine I use everywhere - just isn't supported yet.&lt;/p&gt;

&lt;p&gt;(What would have been nice: Showing me all my RDS databases in that drop-down, but just &lt;em&gt;greying out the ones that are not supported by Proxy&lt;/em&gt;.)&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. You can only connect to your RDS Proxy from inside &lt;em&gt;the same VPC&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This one was a time sink. There are few things I dread more than trying to connect to something on AWS and getting a timeout. There are about a half-dozen layers (security groups, IGWs, subnets) that all have to be lined up &lt;em&gt;just so&lt;/em&gt; to get a connection online. The bummer is that there's no easy way to debug &lt;em&gt;where&lt;/em&gt; a given connection has failed.&lt;/p&gt;

&lt;p&gt;So, the first wrench was when I discovered - after much trial and error - that I couldn't connect to my RDS Proxy from my laptop.&lt;/p&gt;

&lt;p&gt;(How about a small bone, right next to "Proxy endpoint," that lets me know the limitations associated with this endpoint?)&lt;/p&gt;

&lt;h3&gt;
  
  
  Now that we're up and connected
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;RDS vs RDS Proxy&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;With my RDS Proxy 101 hard-won, we're ready to run some benchmarks.&lt;/p&gt;

&lt;p&gt;I copied the benchmark binary up to an EC2 server co-located with the RDS database and RDS Proxy.&lt;/p&gt;

&lt;p&gt;Per above:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;benchmarkOpen&lt;/code&gt; opens a connection and then runs a query on each loop (sequential)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;benchmarkQuery&lt;/code&gt; opens a connection first, then each loop is a query on that connection (also sequential)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's what I got running this against the RDS database directly, 1000 times per, with a 10s pause between:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# RDS direct, same datacenter&lt;/span&gt;
&lt;span class="nv"&gt;$ &lt;/span&gt;./main
2020/12/25 00:08:14 Starting benchmarks...
2020/12/25 00:08:19 benchmarkOpen took 5.846447324s
2020/12/25 00:08:30 benchmarkQuery took 504.32703ms
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;benchmarkOpen&lt;/code&gt; - ~5.85ms per loop&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;benchmarkQuery&lt;/code&gt; - ~0.500ms per loop&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Fair to assume the connection handshake takes a little over 5ms (5.85ms - 0.5ms), which isn't bad.&lt;/p&gt;

&lt;p&gt;This benchmark runs too fast to register on any RDS graphs. But if I loop the benchmark to sustain it over a couple minutes, I can see a healthy connection spike for the first test:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--apz8Y0o3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/db-direct-graphs.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--apz8Y0o3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/db-direct-graphs.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I recognize that in &lt;code&gt;benchmarkOpen&lt;/code&gt;, instead of deferring &lt;code&gt;db.Close()&lt;/code&gt; I could be nicer to my database and close it immediately. But I kind of like the DB slow boil.&lt;/p&gt;

&lt;p&gt;Let's move on to the RDS Proxy. Remember, RDS Proxy has to be in the same VPC as the entity calling it as well as the database. So all three are co-located:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# RDS Proxy, same datacenter&lt;/span&gt;
&lt;span class="nv"&gt;$ &lt;/span&gt;./main
2020/12/25 00:15:21 Starting benchmarks...
2020/12/25 00:15:28 benchmarkOpen took 7.094809555s
2020/12/25 00:15:39 benchmarkQuery took 1.239892867s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Comparing to the first benchmark: It appears the Proxy adds an overhead to each request of 0.7ms (1.2ms - 0.5ms). As we'd expect, the loop that opens a connection takes a bit longer too, about 1.25ms longer per iteration (7.09ms - 5.85ms).&lt;/p&gt;

&lt;p&gt;And, sure enough, I can't get the graph to budge. It sustains about 100 connections, with no real CPU spike, with the benchmark sustained over several minutes:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--D2XrPIAg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/rds-with-proxy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--D2XrPIAg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/rds-with-proxy.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We'll see soon how all this performance translates to Lambda, but the motivation for RDS Proxy is clear: &lt;strong&gt;It's less about the client, more about the database&lt;/strong&gt;. RDS Proxy is just a layer of indirection for our database connections. It doesn't speed up establishing new connections - in fact, we pay a small tax on connection opening. It just saves our database from the thundering herd.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cross-region&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Because we're here, I'm curious: What happens if we run this benchmark cross-region, from US-East-1 to US-West-2?&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# RDS - different datacenter&lt;/span&gt;
&lt;span class="nv"&gt;$ &lt;/span&gt;./main
2020/12/25 00:11:12 Starting benchmarks...
2020/12/25 00:15:53 benchmarkOpen took 4m41.607514892s
2020/12/25 00:17:15 benchmarkQuery took 1m11.767803425s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Different story:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;benchmarkOpen&lt;/code&gt; - ~281.6ms per loop&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;benchmarkQuery&lt;/code&gt; - ~71.77ms per loop&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Connections take about 210ms (281.6ms - 71.8ms), presumably because they involve a lot of back and forth that requires coast-to-coast travel.&lt;/p&gt;

&lt;p&gt;The difference is over a full order of magnitude. At this point, opening a database connection and making a query starts to have performance characteristics similar to querying Airtable's API.&lt;/p&gt;

&lt;p&gt;Indeed, the customer that opened this inquiry operates out of US-East-1. Bingo.&lt;/p&gt;

&lt;p&gt;Opening connections is certainly a factor here. But RDS Proxy won't help, because as we've learned it's not intended to reduce client-side connection opening costs. And even if it did, we couldn't use it cross-region!&lt;/p&gt;

&lt;p&gt;So co-locating the database in the customer's data center is the move, and will give their team performance an API can only dream of.&lt;/p&gt;

&lt;h3&gt;
  
  
  Benchmarking Lambda
&lt;/h3&gt;

&lt;p&gt;Lambda got us into this mess, so let's do an end-to-end benchmark with it. Our direct-to-RDS test will route like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;API Gateway -&amp;gt; Lambda -&amp;gt; RDS
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And our RDS Proxy test will look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;API Gateway -&amp;gt; Lambda -&amp;gt; RDS Proxy -&amp;gt; RDS
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each Lambda function call will open a new database connection and issue a single query. I just adapted the function above to make one open/query as opposed to doing so inside a &lt;code&gt;for&lt;/code&gt; loop.&lt;/p&gt;

&lt;p&gt;I'll run a benchmark locally on my computer that will call the API Gateway endpoint. Because I'm running locally, I'm able to save a little work by using Go's native benchmarking facilities:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;BenchmarkRequest&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;testing&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;B&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"https://[REDACTED]/run"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"application/json"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="nb"&gt;panic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;resp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StatusCode&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="m"&gt;200&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Println&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Received non-200 response. Continuing"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The first test routes to RDS directly:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;go &lt;span class="nb"&gt;test&lt;/span&gt; &lt;span class="nt"&gt;-bench&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;.&lt;/span&gt; &lt;span class="nt"&gt;-benchtime&lt;/span&gt; 1000x
goos: darwin
goarch: amd64
pkg: github.com/acco/rds-proxy-bench
BenchmarkRequest-8          1000     128684974 ns/op
PASS
ok      github.com/acco/rds-proxy-bench 130.351s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So, about 129ms per request.&lt;/p&gt;

&lt;p&gt;Now using RDS Proxy:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;go &lt;span class="nb"&gt;test&lt;/span&gt; &lt;span class="nt"&gt;-bench&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;.&lt;/span&gt; &lt;span class="nt"&gt;-benchtime&lt;/span&gt; 1000x
goos: darwin
goarch: amd64
pkg: github.com/acco/rds-proxy-bench
BenchmarkRequest-8          1000     118576056 ns/op
PASS
ok      github.com/acco/rds-proxy-bench 120.529s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;About 119ms per request.&lt;/p&gt;

&lt;p&gt;We're not impressed until we look at the graphs:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--gXdt7cG9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/proxy-lambda.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--gXdt7cG9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://blog.syncinc.so/rds-proxy/proxy-lambda.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The first big spike is the first benchmark, hitting RDS directly. The second non-spike is the second benchmark, routed through the proxy.&lt;/p&gt;

&lt;p&gt;The end-to-end Lambda benchmark takes it all home: The purpose of RDS Proxy is foremost to tame connections (and related load) on the database. Any happy gains on the &lt;em&gt;client&lt;/em&gt; side will be a result of a relieved database.&lt;/p&gt;

</description>
      <category>database</category>
      <category>serverless</category>
      <category>aws</category>
      <category>lambda</category>
    </item>
  </channel>
</rss>
