<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Damian Czaja</title>
    <description>The latest articles on Forem by Damian Czaja (@trojan295).</description>
    <link>https://forem.com/trojan295</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1112421%2F55c7753c-4953-4dcb-bbd9-1c8db6a22702.jpeg</url>
      <title>Forem: Damian Czaja</title>
      <link>https://forem.com/trojan295</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/trojan295"/>
    <language>en</language>
    <item>
      <title>Benchmark Kubernetes CNI</title>
      <dc:creator>Damian Czaja</dc:creator>
      <pubDate>Thu, 13 Jul 2023 09:44:00 +0000</pubDate>
      <link>https://forem.com/trojan295/benchmark-kubernetes-cni-55ie</link>
      <guid>https://forem.com/trojan295/benchmark-kubernetes-cni-55ie</guid>
      <description>&lt;p&gt;I was configuring recently Calico on a RKE2 cluster. I wanted to test the network performance of the CNI and check, if it can achieve the 100Gbps bandwidth of the NIC.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://iperf.fr/"&gt;iperf3&lt;/a&gt; is a great tool for this, but unfortunately, it is single threaded and the test was CPU bound.&lt;/p&gt;

&lt;p&gt;To overcome this, I wrote a simple script, which runs multiple iperf3 tests on a Kubernetes cluster. The tool is available on &lt;a href="https://github.com/Trojan295/k8s-utils/tree/main/cmd/k8siperf3"&gt;my Github&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;You have to provide the node for the iperf3 servers and clients and the number of parallel tests. The scripts then starts iperf3 servers, gets their pod IPs and starts a client for each server. At the end of the test, it sums the bitrate of all the tests.&lt;/p&gt;

&lt;p&gt;The results might be a bit off, as the client pods might not start and end at the same time. That's why the scripts discards the first and last 15 seconds of measurements.&lt;/p&gt;

&lt;p&gt;When I compared the results from the script to the numbers in &lt;code&gt;nload vxlan.calico&lt;/code&gt; they were pretty close.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;❯ go run ./cmd/k8siperf3 &lt;span class="nt"&gt;--parallel-count&lt;/span&gt; 8 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--client-node-name&lt;/span&gt; node-1 &lt;span class="nt"&gt;--server-node-name&lt;/span&gt; node-2
2023/07/13 11:35:40 Creating 8 iperf3 server pods
2023/07/13 11:35:41 Waiting &lt;span class="k"&gt;for &lt;/span&gt;iperf3 server pods to start
2023/07/13 11:35:45 iperf3 server pod 0 IP address: 10.42.113.216
2023/07/13 11:35:45 iperf3 server pod 1 IP address: 10.42.113.218
2023/07/13 11:35:46 iperf3 server pod 2 IP address: 10.42.113.211
2023/07/13 11:35:46 iperf3 server pod 3 IP address: 10.42.113.212
2023/07/13 11:35:47 iperf3 server pod 4 IP address: 10.42.113.213
2023/07/13 11:35:47 iperf3 server pod 5 IP address: 10.42.113.214
2023/07/13 11:35:47 iperf3 server pod 6 IP address: 10.42.113.195
2023/07/13 11:35:47 iperf3 server pod 7 IP address: 10.42.113.196
2023/07/13 11:35:47 Creating 8 iperf3 client pods
2023/07/13 11:35:47 Waiting &lt;span class="k"&gt;for &lt;/span&gt;iperf3 client pods to finish
2023/07/13 11:36:53 iperf3 client pod 0 finished
2023/07/13 11:36:53 iperf3 client pod 1 finished
2023/07/13 11:36:53 iperf3 client pod 2 finished
2023/07/13 11:36:54 iperf3 client pod 3 finished
2023/07/13 11:36:54 iperf3 client pod 4 finished
2023/07/13 11:36:55 iperf3 client pod 5 finished
2023/07/13 11:36:55 iperf3 client pod 6 finished
2023/07/13 11:36:55 iperf3 client pod 7 finished
Bitrate: 86.589250 Gbits/sec
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
    </item>
    <item>
      <title>Transactional outbox pattern example in Golang and MongoDB</title>
      <dc:creator>Damian Czaja</dc:creator>
      <pubDate>Mon, 03 Jul 2023 08:34:40 +0000</pubDate>
      <link>https://forem.com/trojan295/transactional-outbox-pattern-example-in-golang-and-mongodb-36mb</link>
      <guid>https://forem.com/trojan295/transactional-outbox-pattern-example-in-golang-and-mongodb-36mb</guid>
      <description>&lt;p&gt;For a personal project, I was experimenting with DDD. I was sending domain events through RabbitMQ to run choreography-based sagas.&lt;br&gt;
One problem I had was ensuring that the domain event got sent out after modifying aggregates. It's not possible to run an atomic transaction through MongoDB and RabbitMQ, so there can be a situation where the aggregate is modified successfully in the database, but we won't send an event because RabbitMQ is not available. You could use retries, but this won't withstand, for example, an application crash.&lt;/p&gt;

&lt;p&gt;To solve this, I used the &lt;a href="https://microservices.io/patterns/data/transactional-outbox.html"&gt;transactional outbox pattern&lt;/a&gt;. The idea is to have an additional table in the database to save events that have to be sent. It must be in the same database where the aggregate is persisted, so you can run it in the same DB transaction. Then we have some other process that polls documents from this collection and sends them to the event bus or queue.&lt;/p&gt;

&lt;p&gt;With this, we can modify the aggregate and dispatch the event in the same MongoDB transaction. The event will then be sent to the RabbitMQ when it's available. This pattern ensures that the event will be sent out at least once, but it could also be sent multiple times. So on the consumer side, you have to either be idempotent or check for duplicate events. One idea here is to add GUIDs to your events and use the inbox pattern, where you check for duplicate events.&lt;/p&gt;
&lt;h2&gt;
  
  
  Golang implementation
&lt;/h2&gt;

&lt;p&gt;I was able to find a lot of examples in C# and Java, but just a few for the Outbox pattern in Golang. My example here is not something you can copy-paste, but you should get an idea of how to implement it yourself.&lt;/p&gt;

&lt;p&gt;I defined the following interfaces:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;application&lt;/span&gt;

&lt;span class="c"&gt;// EventPublisher publishes events to an event bus or queue.&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;EventPublisher&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;PublishEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;...*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// EventOutbox dispatches events to the transactional outbox.&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;EventOutbox&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;DispatchEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;...*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// UnitOfWork provides an interface for running operations on the persistance layer in a single transaction.&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;UnitOfWork&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;OrderRepository&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrderRepository&lt;/span&gt;
    &lt;span class="n"&gt;EventOutbox&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventOutbox&lt;/span&gt;

    &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I have all the interactions with MongoDB in a single Go struct.&lt;br&gt;
My &lt;code&gt;MongoDBStore&lt;/code&gt; implements the &lt;code&gt;EventOutbox&lt;/code&gt; and &lt;code&gt;UnitOfWork&lt;/code&gt; interfaces. It also has a method &lt;code&gt;RunOutbox&lt;/code&gt; to run the process, which sends events from the outbox to the event bus.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="c"&gt;// Event is MongoDB event representation&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Event&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ID&lt;/span&gt;   &lt;span class="n"&gt;primitive&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ObjectID&lt;/span&gt; &lt;span class="s"&gt;`bson:"_id,omitempty"`&lt;/span&gt;
    &lt;span class="n"&gt;Data&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Raw&lt;/span&gt;           &lt;span class="s"&gt;`bson:"data"`&lt;/span&gt;

    &lt;span class="n"&gt;Published&lt;/span&gt; &lt;span class="kt"&gt;bool&lt;/span&gt; &lt;span class="s"&gt;`bson:"published"`&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// ToModel is used convert from MongoDB to domain event&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;ToModel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="c"&gt;// ...&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// FromModel is used to convert domain event to MongoDB representation&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FromModel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="c"&gt;// ...&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// Run runs f in a single MongoDB transaction.&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StartSession&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EndSession&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WithTransaction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sessCtx&lt;/span&gt; &lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SessionContext&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sessCtx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// DispatchEvent inserts an event in the outbox collection to send out.&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;DispatchEvent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Database&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;database&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxCollection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;FromModel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to convert from model: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InsertOne&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to insert event: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// RunOutbox runs an infinite loop, which polls and sends events&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;RunOutbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventPublisher&lt;/span&gt; &lt;span class="n"&gt;application&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventPublisher&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ticker&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewTicker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Done&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;ticker&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;runOutbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventPublisher&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;runOutbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventPublisher&lt;/span&gt; &lt;span class="n"&gt;application&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventPublisher&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;getUnpublishedEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to get unpublished events: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ToModel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to convert event to model: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;eventPublisher&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PublishEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to publish event: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;setEventAsPublished&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to set event as published: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;getUnpublishedEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;([]&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Database&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;database&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxCollection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Find&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"published"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to get unpublished events: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;All&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to decode unpublished events: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;

&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoDBStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;setEventAsPublished&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventID&lt;/span&gt; &lt;span class="n"&gt;primitive&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ObjectID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Database&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;database&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxCollection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;FindOneAndUpdate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"_id"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;eventID&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"$set"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"published"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;}})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to set event as published: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In main, you have to run the &lt;code&gt;RunOutbox&lt;/code&gt; method in a goroutine. You can then use this in your handlers to modify aggregates and send domain events:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;application&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;CreateOrderHandler&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;uow&lt;/span&gt;         &lt;span class="n"&gt;UnitOfWork&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;h&lt;/span&gt; &lt;span class="n"&gt;CreateOrderHandler&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cmd&lt;/span&gt; &lt;span class="n"&gt;CreateOrder&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Order&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="c"&gt;// this generates an OrderCreated event&lt;/span&gt;
    &lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CreateOrder&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;orderIface&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;h&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;h&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrderRepository&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CreateOrder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to create order: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;h&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventOutbox&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DispatchEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Events&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to dispatch events: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; 
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;orderIface&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;domain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Order&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You could also use &lt;a href="https://www.mongodb.com/docs/manual/changeStreams/"&gt;MongoDB ChangeStreams&lt;/a&gt; instead of polling to get information when a new event is in the outbox.&lt;/p&gt;

</description>
      <category>go</category>
      <category>mongodb</category>
      <category>softwareengineering</category>
      <category>architecture</category>
    </item>
  </channel>
</rss>
