<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Rob Walters</title>
    <description>The latest articles on Forem by Rob Walters (@rwaltersma).</description>
    <link>https://forem.com/rwaltersma</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1158629%2Fab6a62b3-211a-4844-8053-a28dca465e67.png</url>
      <title>Forem: Rob Walters</title>
      <link>https://forem.com/rwaltersma</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/rwaltersma"/>
    <language>en</language>
    <item>
      <title>REST APIs vs GraphQL: is it one or the other?</title>
      <dc:creator>Rob Walters</dc:creator>
      <pubDate>Wed, 19 Mar 2025 19:24:09 +0000</pubDate>
      <link>https://forem.com/rwaltersma/rest-apis-vs-graphql-is-it-one-or-the-other-5e8f</link>
      <guid>https://forem.com/rwaltersma/rest-apis-vs-graphql-is-it-one-or-the-other-5e8f</guid>
      <description>&lt;p&gt;GraphQL and REST are popular methods for building APIs that facilitate data transfer across networks. Both approaches emphasize key principles such as statelessness and the clear separation of client and server. They rely on HTTP as the main communication protocol. Both are flexible methods for developing APIs and can easily integrate with various programming languages and applications. As developers began to use APIs more, the need arose to filter and modify responses. Some APIs support synchronous calls, where one output serves as the input for another, while others can operate in parallel by design. This process, called API orchestration, highlights the advantages of GraphQL over REST APIs.&lt;br&gt;
But what if you’ve invested considerable engineering hours and money into developing REST APIs for your organization? Is it worth rewriting all of these to implement GraphQL?&lt;/p&gt;

&lt;p&gt;The good news is that &lt;strong&gt;you don’t have to rewrite any of your REST APIs to leverage GraphQL and utilize API Orchestration!&lt;/strong&gt; &lt;a href="https://www.apollographql.com/graphos/apollo-connectors?utm_campaign=2025-03-19_apollo-connectors-march2025awareness&amp;amp;utm_medium=blog&amp;amp;utm_source=devto" rel="noopener noreferrer"&gt;Apollo Connectors&lt;/a&gt; for REST APIs make it easy and quick to integrate REST APIs into your GraphQL with no complex code to write or compile!&lt;/p&gt;

&lt;p&gt;To illustrate, consider a REST API endpoint &lt;code&gt;/getOrders/:orderId&lt;/code&gt;, where orderId represents an order ID like “12345”. The response to a GET request on &lt;code&gt;/getOrders/12345&lt;/code&gt; would be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  {
      "order_id": "12345",
      "customer": {
        "id": "C001",
        "name": "John Doe",
        "email": "johndoe@example.com"
      },
      "items": [
        {
          "product_id": "P1001",
          "name": "Wireless Mouse",
          "quantity": 2,
          "price": 25.99
        },
        {
          "product_id": "P1002",
          "name": "Mechanical Keyboard",
          "quantity": 1,
          "price": 89.99
        }
      ],
      "total_price": 141.97,
      "status": "Shipped",
      "order_date": "2025-02-25T14:30:00Z"
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To integrate this REST API with GraphQL, you only need to define the &lt;code&gt;@source&lt;/code&gt; directive in your GraphQL schema like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;extend schema
  @link(
    url: "https://specs.apollo.dev/federation/v2.10"
    import: ["@key", "@requires"]
  )
  @link(
    url: "https://specs.apollo.dev/connect/v0.1"
    import: ["@source", "@connect"]
  )
  @source(
    name: "orders"
    http: {
      baseURL: "&amp;lt;&amp;lt;REST API URL&amp;gt;&amp;gt;"
  })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then add your Types and Query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;type Order {
  orderId: ID!
  customer: String!
  totalItems: Float!
  totalPrice: Float!
  status: String!
  orderDate: String!
}

type Query {
  getOrder(order_id: ID!): Order
    @connect(
      source: "orders"
      http: { GET: "/getOrders/{$args.order_id}" }
      selection: """
          orderId: order_id
          customer:customer.name
          totalItems: items-&amp;gt;size
          totalPrice: total_price
          status
          orderDate: order_date
      """
    )
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;@connect&lt;/code&gt; directive in the Query instructs GraphQL to use the “orders” connector. You can also specify a resource path that will be added to the baseUrl defined in the &lt;code&gt;@source&lt;/code&gt; declarative statement. The selection field indicates how you want the data to be returned to the client. Here, you can optionally transform and filter the data. To help you build the selection mapping, Apollo provides a free &lt;a href="https://www.apollographql.com/connectors-mapping-playground?utm_campaign=2025-03-19_connectors-mapping-playground-march2025awareness&amp;amp;utm_medium=blog&amp;amp;utm_source=devto" rel="noopener noreferrer"&gt;Connectors Mapping Playground&lt;/a&gt;, as shown in the figure below.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fthv4nrpgmwhqu272iuzn.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fthv4nrpgmwhqu272iuzn.jpg" alt="Image description" width="800" height="340"&gt;&lt;/a&gt;&lt;br&gt;
The connection mapper takes an input and can suggest a mapping based on that input. In this example, we are renaming fields and even transforming a field called totalItems to indicate how many order items it contains instead of listing the items themselves (e.g., “totalItems: items-&amp;gt;size”). Using the Apollo Sandbox included with the rover CLI, you can run ad-hoc GraphQL queries. In the figure below, we executed the getOrder query by passing “12345”, and the returned data shown is from our REST API.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpz0x5mky4qmgjon5derg.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpz0x5mky4qmgjon5derg.jpg" alt="Image description" width="800" height="435"&gt;&lt;/a&gt;&lt;br&gt;
All of this configuration was accomplished without any procedural code, saving time and making it easy to integrate your existing REST APIs into the graph!&lt;/p&gt;

&lt;p&gt;For more information on Apollo Connectors check out the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Hear from Apollo GraphQL founder Matt DeBergalis: &lt;a href="https://www.youtube.com/shorts/TrmtBGKBroQ" rel="noopener noreferrer"&gt;Apollo Connectors "It's the biggest thing we've ever shipped" - YouTube&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Learn about all the latest innovations in GraphQL: &lt;a href="https://www.youtube.com/watch?v=5j85mPdgkP4" rel="noopener noreferrer"&gt;Livestream Launch: New innovations from Apollo&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Take a free Apollo Odyssey course on Apollo Connectors: &lt;a href="https://www.apollographql.com/tutorials/connectors-intro-rest/01-overview?utm_campaign=2025-03-19_odyssey-apollo-connectors-tutorial-march2025awareness&amp;amp;utm_medium=blog&amp;amp;utm_source=devto" rel="noopener noreferrer"&gt;Introducing Apollo Connectors - GraphQL Tutorials&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Documentation: &lt;a href="https://www.apollographql.com/docs/graphos/get-started/guides/rest?utm_campaign=2025-03-19_getting-started-apollo-connectors-doc-march2025awareness&amp;amp;utm_medium=blog&amp;amp;utm_source=devto" rel="noopener noreferrer"&gt;Get Started with Apollo Connectors and GraphOS&lt;/a&gt; &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create your Apollo GraphQL Studio for free &lt;a href="https://studio.apollographql.com/signup?utm_campaign=2025-03-19_studio-signup-march2025awareness&amp;amp;utm_medium=blog&amp;amp;utm_source=devto" rel="noopener noreferrer"&gt;https://studio.apollographql.com/signup&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>graphql</category>
      <category>restapi</category>
      <category>microservices</category>
      <category>programming</category>
    </item>
    <item>
      <title>Aggregate streaming data within windows of time using Atlas Stream Processing</title>
      <dc:creator>Rob Walters</dc:creator>
      <pubDate>Mon, 25 Sep 2023 16:32:42 +0000</pubDate>
      <link>https://forem.com/mongodb/aggregate-streaming-data-within-windows-of-time-using-atlas-stream-processing-1lm1</link>
      <guid>https://forem.com/mongodb/aggregate-streaming-data-within-windows-of-time-using-atlas-stream-processing-1lm1</guid>
      <description>&lt;p&gt;Unlike traditional batch processing — where data is collected, stored, and then processed in chunks — streaming data is processed as it is produced, allowing for immediate analysis, response, and decision-making. Window operators within MongoDB &lt;a href="https://www.mongodb.com/products/platform/atlas-stream-processing" rel="noopener noreferrer"&gt;Atlas Stream Processing&lt;/a&gt; pipelines allow developers to analyze and process specific fixed-sized “windows” of data within a continuous data stream. This bucketing of the data makes it easy to discover patterns and trends. Without window operators, developers have to process every single data point in the stream, and depending on the volume of data, this can be very resource-intensive. &lt;/p&gt;

&lt;p&gt;For example, consider the solar farm use case where thousands of sensors across the farm are capturing input wattage every second. Moving every data point across tens of thousands of sensor readings is time-consuming and costly. A better solution is to capture the trend of the data by using a window operator and calculating the average watts over an interval of time. This adds value to the business while conserving storage and network resources. &lt;/p&gt;

&lt;p&gt;Currently, Atlas Stream Processing supports two methods for windowing: &lt;a href="https://www.mongodb.com/docs/atlas/atlas-sp/stream-aggregation/#-tumblingwindow" rel="noopener noreferrer"&gt;tumbling windows&lt;/a&gt; and &lt;a href="https://www.mongodb.com/docs/atlas/atlas-sp/stream-aggregation/#mongodb-pipeline-pipe.-hoppingWindow" rel="noopener noreferrer"&gt;hopping windows&lt;/a&gt;. Each of these variations treats the actions within the time window slightly differently. Let’s explore these window operators in more detail.&lt;/p&gt;

&lt;h2&gt;
  
  
  Tumbling windows
&lt;/h2&gt;

&lt;p&gt;Tumbling windows segment the data into non-overlapping, fixed-size windows. This can be helpful for batch-style analysis on discrete chunks of data.  For example, if we want to calculate the average price per minute, we can define a tumbling window with an interval of one minute, as shown in the following figure.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9a6rv2t2ixtdymuu0roj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9a6rv2t2ixtdymuu0roj.png" alt="Graphic showing non-overlapping, fixed-size tumbling windows." width="800" height="152"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For every minute, the average price for the LEAF stock will be calculated, and that value will be moved on to the next stage in the Atlas Stream Processing aggregation pipeline.  &lt;/p&gt;

&lt;p&gt;Using Atlas Stream Processing, we use the &lt;a href="https://www.mongodb.com/docs/atlas/atlas-sp/stream-aggregation/#-tumblingwindow" rel="noopener noreferrer"&gt;$tumblingWindow&lt;/a&gt; pipeline operator to define the interval, and aggregate operations we seek to perform within the time window.  For example, consider a stream of stock data in JSON format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; {
    company_symbol: 'SSC',
    company_name: 'SUPERIOR SAFFRON CORPORATION',
    exchange: 'NASDAQ',
    price: 70.14,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
  {
    company_symbol: 'GTH',
    company_name: 'GRACEFUL TRAINER HOLDINGS',
    exchange: 'NYSE',
    price: 66.78,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
  {
    company_symbol: 'FIL',
    company_name: 'FRUSTRATING INK LLC',
    exchange: 'NASDAQ',
    price: 83.92,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
…
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here the data from multiple securities is being streamed once per second. To configure Atlas Stream Processing to perform an average price value over one-minute grouping by the company symbol, you configure the $tumblingWindow pipeline operator as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$tumblingWindow: 
{
            interval: {size: NumberInt(1), unit: "minute"},
            pipeline: [
                {
                    $group: {
                         _id: "$fullDocument.company_symbol",
                        max: {$max: "$fullDocument.price"},
                        avg: {$avg: "$fullDocument.price"}
                    }
                }
            ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: The use of &lt;code&gt;$fullDocument&lt;/code&gt; is needed here since Atlas Stream Processing is reading from a MongoDB change stream on the collection. The event that comes from the change stream contains metadata about the event and a field called “fullDocument” that includes the data we are interested in.  For more information on the change stream event format, check out the &lt;a href="https://www.mongodb.com/docs/manual/reference/change-events/#change-events" rel="noopener noreferrer"&gt;Change Events documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The interval units can be: "year", "month", "day", "hour", "minute", “second”, and “ms”. Inside the pipeline, you define aggregation operations on the time interval. In this case, we want to use &lt;code&gt;$group&lt;/code&gt; to group the data by company symbol and return the maximum value and the average of the price value during the one-minute interval.  &lt;/p&gt;

&lt;p&gt;When this pipeline is run, the following results are obtained:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  _id: 'IST',
  max: 89.77,
  avg: 89.50254545454546,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}
{
  _id: 'DPP',
  max: 51.38,
  avg: 51.23148148148148,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}
{
  _id: 'RAC',
  max: 60.63,
  avg: 60.47611111111111,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice there is an extra field, “_stream_meta,” included as part of the result. This data describes the time interval for the aggregation. This output is explained in the section “Window output” later in this post since it applies to the other supported window in Atlas Stream Processing, a hopping window.&lt;/p&gt;

&lt;h2&gt;
  
  
  Hopping windows
&lt;/h2&gt;

&lt;p&gt;A hopping window, sometimes referred to as a sliding window, continuously moves over the data stream in a fixed size as new data arrives. This is useful for ongoing analysis and determining trends — for example, if we want to calculate the average price over the past hour in 30-minute increments as shown in the following figure. Stated another way, at time one hour, you get an average over the past hour. Then at time one hour and 30 minutes, you get an average over the past one hour (which is the calculation of time between 30 minutes and the current time of 90 minutes). &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5plv5hl8fzaivybd2hjv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5plv5hl8fzaivybd2hjv.png" alt="Graphic showing overlapping, fixed-size hopping windows." width="800" height="252"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Given our stock example, we can create a hopping window pipeline operator that averages over the past minute every 30 seconds as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;hoppingwindow=
    { $hoppingWindow: {
      interval: {size: 1, unit: "minute"}, 
      hopSize: {size: 30, unit: "second"},
      pipeline: 
      [
        { $group: {
            _id: "$fullDocument.company_symbol",
            max: { $max: "$fullDocument.price" },
            min: { $min: "$fullDocument.price" },
            avg: { $avg: "$fullDocument.price" }
        }},
        { $sort: { _id: 1 }}
      ]
    }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As with the tumbling window, we start by specifying an interval. But unique to the hopping window, we also specify “hopSize” to define the time segment the pipeline will be evaluated over. An example output of the hopping window defined above is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  _id: 'IST',
  max: 89.69,
  min: 89.44,
  avg: 89.59533333333334,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:43:00.000Z,
    windowEndTimestamp: 2023-08-25T10:44:00.000Z
  }
}
{
  _id: 'IST',
  max: 89.69,
  min: 89.27,
  avg: 89.53133333333334,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:43:30.000Z,
    windowEndTimestamp: 2023-08-25T10:44:30.000Z
  }
}
{
  _id: 'IST',
  max: 89.8,
  min: 89.27,
  avg: 89.53566666666667,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:44:00.000Z,
    windowEndTimestamp: 2023-08-25T10:45:00.000Z
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above result set was filtered to only show one of the stock symbols, “IST”, so you can observe the data as it is returned per the “hopSize” defined in the query. The first result was from the interval 43:00 to 44:00, then the minute 43:30 to 44:30, then the minute from 44:00 to 45:00. Note these computations are in 30-second “hops.”&lt;/p&gt;

&lt;h2&gt;
  
  
  Window output
&lt;/h2&gt;

&lt;p&gt;For every message being emitted from a window, some implicit projections are made. This allows the developer to understand the bounds of the window being emitted. Output is automatically projected into a "_stream_meta" field for the message.&lt;/p&gt;

&lt;p&gt;For example, the output of a single tumblingWindow from the earlier example is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  _id: 'TRF',
  max: 29.64,
  avg: 29.541632653061225,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}
{
  _id: 'DPP',
  max: 51.28,
  avg: 51.13448979591837,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}
{
  _id: 'GCC',
  max: 60.41,
  avg: 60.30142857142857,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The windowStartTimestamp is the first timestamp of the window, and its data is inclusive in the calculation. The windowEndTimestamp is the last timestamp of the window, and its data is exclusive in the calculation.  &lt;/p&gt;

&lt;h2&gt;
  
  
  Blocking versus non-blocking stages
&lt;/h2&gt;

&lt;p&gt;Keep in mind that the window operations mentioned previously are used within the context of a stream processing query. This query is a MongoDB Aggregation pipeline that can contain other operators. For example, a typical stream processing query includes a &lt;code&gt;$source&lt;/code&gt;, indicating the source of the data stream, and an &lt;code&gt;$emit&lt;/code&gt; or &lt;code&gt;$merge&lt;/code&gt; stage that describes where to write the stream data. To make it easy to build pipelines, we can define variables for these stages. When ready to process the query, we simply pass the stages in an array. To illustrate, consider the following variables:&lt;/p&gt;

&lt;p&gt;A source:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sourceStocks={$source: { 
connectionName: "stockdb",
db: "StockData",
coll:"Stocks", 
allowedLateness: { unit: 'minute', size: 1 },
timeField: { $dateFromString:{"dateString":"fullDocument.$tx_time"}}}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our hopping window from earlier:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;hoppingwindow=
    { $hoppingWindow: {
      interval: {size: 1, unit: "minute"}, 
      hopSize: {size: 30, unit: "second"},
      pipeline: 
      [
        { $group: {
            _id: "$fullDocument.company_symbol",
            max: { $max: "$fullDocument.price" },
            min: { $min: "$fullDocument.price" },
            avg: { $avg: "$fullDocument.price" }
        }},
        { $sort: { _id: 1 }}
      ]
    }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And a $merge stage:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mergeStocks={$merge: {      
            into: {
                connectionName: "stockdb",
                db: "StockData",
                coll: "stocksummary"            }
            }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, when we create the stream processor, we simply issue the:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sp.createStreamProcessor("StockSummary",[sourceStocks,hoppingwindow,mergeStocks])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When building pipelines for Atlas Stream Processing, there are certain operators— such as &lt;code&gt;$group&lt;/code&gt;, &lt;code&gt;$sort&lt;/code&gt;, &lt;code&gt;$count&lt;/code&gt;, and &lt;code&gt;$limit&lt;/code&gt; — that are considered blocking stages of a pipeline. This means that the process waits for all of the input data set to arrive and accumulate before processing the data together. In the context of a data stream, these blocking operations do not make sense to be executed on individual data points that arrive in the stream, since data is flowing continuously and value is obtained from more than one data point. &lt;/p&gt;

&lt;p&gt;Other aggregation pipeline operators are not blocking, such as &lt;code&gt;$addFields&lt;/code&gt;, &lt;code&gt;$match&lt;/code&gt;, &lt;code&gt;$project&lt;/code&gt;, &lt;code&gt;$redact&lt;/code&gt;, &lt;code&gt;$replaceRoot&lt;/code&gt;, &lt;code&gt;$replaceWith&lt;/code&gt;, &lt;code&gt;$set&lt;/code&gt;, &lt;code&gt;$unset&lt;/code&gt;, and &lt;code&gt;$unwind&lt;/code&gt;, to name a few. These non-blocking operators can be used anywhere within the stream processing pipeline. But blocking stages, such as &lt;code&gt;$avg&lt;/code&gt;, must be used within the tumbling or hopping window pipeline operators. &lt;/p&gt;

&lt;p&gt;Defining a window gives the stream processor the bounded context it needs (the &lt;a href="https://www.mongodb.com/docs/atlas/atlas-sp/stream-aggregation/#fields-3" rel="noopener noreferrer"&gt;interval&lt;/a&gt; and &lt;a href="https://www.mongodb.com/docs/atlas/atlas-sp/stream-aggregation/#fields-2" rel="noopener noreferrer"&gt;hop size&lt;/a&gt;, if applicable) to appropriately process your data. Thus, the following &lt;code&gt;$group&lt;/code&gt; would not be valid if it was outside of a tumbling or hopping window operator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Var g={$group: {
            _id: "$fullDocument.company_symbol",
            max: { $max: "$fullDocument.price" },
            min: { $min: "$fullDocument.price" },
            avg: { $avg: "$fullDocument.price" }
        }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also, note that Atlas Stream Processing does not support the $setWindowFields operator. Behind the scenes, &lt;code&gt;$setWindowFields&lt;/code&gt; produces a different output document schema and uses different window boundary semantics. The window operators &lt;code&gt;$tumblingWindow&lt;/code&gt; and &lt;code&gt;$hoppingWindow&lt;/code&gt; used within Atlas Stream Processing are purpose-built to handle streams of data and common issues such as out-of-order and late-arriving data.&lt;/p&gt;

&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;Window operations in Atlas Stream Processing provide developers with an easy way to aggregate streaming data through both tumbling and hopping windows. MongoDB is investing heavily in data streams and supporting event-driven architectures with Atlas Stream Processing. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Explore Atlas Stream Processing content on the MongoDB Developer Center&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Now that you’ve learned about window operators in Atlas Stream Processing, check out the windowing tutorial and other content on the &lt;a href="https://www.mongodb.com/developer/products/atlas/stream-processing/" rel="noopener noreferrer"&gt;Developer Center&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Try Atlas Stream Processing today&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://cloud.mongodb.com/go?l=https%3A%2F%2Fcloud.mongodb.com%2Fv2%2F%3Cproject%3E%23%2FstreamProcessing" rel="noopener noreferrer"&gt;Login to MongoDB Atlas to get started&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mongodb</category>
      <category>streamprocessing</category>
      <category>kafka</category>
    </item>
  </channel>
</rss>
