<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Volodymyr Koval</title>
    <description>The latest articles on Forem by Volodymyr Koval (@volodymyr-koval).</description>
    <link>https://forem.com/volodymyr-koval</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3853780%2F878abb11-4083-45aa-bbc8-610226dcd923.jpg</url>
      <title>Forem: Volodymyr Koval</title>
      <link>https://forem.com/volodymyr-koval</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/volodymyr-koval"/>
    <language>en</language>
    <item>
      <title>Two-level concurrency in Node.js - worker threads and async pools for data integration pipelines</title>
      <dc:creator>Volodymyr Koval</dc:creator>
      <pubDate>Tue, 31 Mar 2026 16:15:33 +0000</pubDate>
      <link>https://forem.com/volodymyr-koval/two-level-concurrency-in-nodejs-worker-threads-and-async-pools-for-data-integration-pipelines-52ac</link>
      <guid>https://forem.com/volodymyr-koval/two-level-concurrency-in-nodejs-worker-threads-and-async-pools-for-data-integration-pipelines-52ac</guid>
      <description>&lt;h2&gt;
  
  
  The problem
&lt;/h2&gt;

&lt;p&gt;Imagine we develop a SaaS platform that analyzes users' activity data - think fraud detection, behavioral analytics, compliance reporting. Our task is to create the integration pipeline that ingests and normalizes raw client data before it reaches those systems.&lt;br&gt;
Our clients collect data, store it in CSV files, and upload them to their S3 buckets. Then they create a mapping for their data in our application - select a datatype for each field, some of them can be serialized JSON objects, others parsed with complex regular expressions or base64 decoded, etc. Our task is to download these files, parse each row, apply custom mapping, find corrupted records, and store the converted data on our side.&lt;/p&gt;

&lt;p&gt;From a bird's-eye view the whole integration task is nothing else but:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9n2gnww58vzk7adkd7cz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9n2gnww58vzk7adkd7cz.png" alt="Diagram: Simple linear pipeline - download file, apply mapping per row, save result" width="800" height="204"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In more detail it works like this:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Client stores CSV files in the S3 bucket. Each row represents one item&lt;/li&gt;
&lt;li&gt;Client gives us credentials for accessing these files&lt;/li&gt;
&lt;li&gt;Client creates mapping for rows&lt;/li&gt;
&lt;li&gt;Client schedules integration runs&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqkfkswr4rua08sfiyq1x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqkfkswr4rua08sfiyq1x.png" alt="Diagram: Client owns S3 bucket with CSV files, gives credentials, describes integration mapping rules, and schedules the integration pipeline" width="800" height="512"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;When scheduled time comes we execute the integration performing the next steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;push task to a queue&lt;/li&gt;
&lt;li&gt;receive task by the integration pipeline&lt;/li&gt;
&lt;li&gt;download file from client's S3&lt;/li&gt;
&lt;li&gt;get mapping&lt;/li&gt;
&lt;li&gt;apply mapping per row&lt;/li&gt;
&lt;li&gt;store new file to our S3 for further processing (uploading to the database, used by analytics tools, etc.)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9rxbwhw5brekd8lplkcl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9rxbwhw5brekd8lplkcl.png" alt="Diagram: Integration pipeline downloads CSV from client's S3 using credentials, gets mapping rules, creates mapping result, and stores it to our S3 bucket" width="800" height="442"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Code is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;rawData&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;downloadFile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;processed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;applyMapping&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;rawData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;clientMapping&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;saveToS3&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;processed&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Introducing a message queue (SQS, RabbitMQ, any) helps with a few things:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;pure asynchronous execution&lt;/li&gt;
&lt;li&gt;queue's messages can be dispatched between any number of consumers so scaling is flexible&lt;/li&gt;
&lt;li&gt;acknowledgment mechanism of queue can be utilized for managing retries&lt;/li&gt;
&lt;li&gt;queues are great for managing backpressure &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Message in queue can be something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"clientConnection"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...S&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="err"&gt;Credentials&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;bucketName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;etc.&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"mappingId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"pathToCSV"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At this point we observe two issues:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;if file size is relatively big (&amp;gt; 100 MB) we are starting to abuse RAM;&lt;/li&gt;
&lt;li&gt;we split the processing into distinct phases (download, map, upload) which are sequential, and that increases execution time.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;But both issues are quite easy to fix - we just need to introduce streams and process data chunk by chunk, row by row. &lt;br&gt;
Streams are good for two reasons:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;they allow us to keep memory usage flat by processing data chunk-by-chunk. All blob storages have streaming APIs as well as many HTTP-based services;&lt;/li&gt;
&lt;li&gt;they handle backpressure automatically and data flows from source to destination through mapping without any issues even if there are some network delays or CPU-intensive processing.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Code is nothing more than:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;pipeline&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;node:stream/promises&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createDownloadStream&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./download&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createMappingTransform&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./mapping&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createS3UploadStream&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./upload&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nf"&gt;createDownloadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nf"&gt;createMappingTransform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;clientMapping&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nf"&gt;createS3UploadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And that's it. Many production systems work fine with this simple architecture. For sure, a few more things should be added like failure handling and reporting results, but overall it is a valid way to build a typical integration pipeline.&lt;/p&gt;

&lt;p&gt;We delivered the integration code packaged as AWS Lambda and started monitoring the system.&lt;/p&gt;

&lt;p&gt;Problems started to show up. &lt;/p&gt;

&lt;p&gt;The first issue arose when the integration started to fail because some client's CSV files were really big and it took a lot of time just to transfer gigabytes of data over the network. Lambdas had limited execution time and we just didn't fit within this timeframe.&lt;/p&gt;

&lt;p&gt;We made a decision to ship our code as a regular service to good old EC2 instances.&lt;/p&gt;

&lt;p&gt;It worked fine until...&lt;/p&gt;

&lt;p&gt;The second issue arose when one client requested batch processing. Their CSV files were huge and they had split them into multiple. They wanted the integration to download all of them, but treat the whole bunch as one logical unit.&lt;br&gt;
Our intuition told us that we could just process all files in parallel and combine the execution results:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;results&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nf"&gt;createDownloadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createMappingTransform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;clientMapping&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createS3UploadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;fileId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;status&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ok&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;};&lt;/span&gt;
  &lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It worked great because for each file most of the time V8 engine was in the &lt;code&gt;idle&lt;/code&gt; state - waiting for network responses - so we could process multiple files in parallel without increasing processing time.&lt;/p&gt;

&lt;p&gt;But then we had clients with a huge number of fields that required some parsing: JSON strings, base64 encoded text, complex regular expressions. The monitoring showed that processing time was increasing and Node.js process started to use noticeable CPU time.&lt;br&gt;
The worst thing was that we were paying for a multicore EC2 instance but Node.js process was struggling using only one core.&lt;/p&gt;
&lt;h2&gt;
  
  
  Where Node.js gets stuck
&lt;/h2&gt;

&lt;p&gt;To understand what's going on we have to set our business logic aside for a bit and think how Node.js handles CPU-intensive tasks.&lt;/p&gt;

&lt;p&gt;We all know that Node.js is single-threaded but it's only partially true.&lt;br&gt;
First of all let's recall that Node.js consists of three parts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;V8 - JavaScript engine responsible for memory management (Heap, Garbage Collector) and code execution (Call Stack)&lt;/li&gt;
&lt;li&gt;libuv - asynchronous operations handler (Event loop, thread pool, async I/O)&lt;/li&gt;
&lt;li&gt;Node.js API - bridge between V8 and libuv&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Our application's JS code (business logic) is executed by V8 which is single-threaded and shines for I/O operations when all the heavy lifting is done by libuv and its C/C++ modules. &lt;/p&gt;

&lt;p&gt;Libuv maintains a thread pool (configurable via &lt;code&gt;UV_THREADPOOL_SIZE&lt;/code&gt;). Many standard Node.js modules - &lt;code&gt;zlib&lt;/code&gt;, &lt;code&gt;crypto&lt;/code&gt;, file system operations - delegate work to this pool via C/C++ libraries. So, even if we perform some heavy encryption or compression, V8's event loop stays free.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F57bhr75rtoe8hp0sx4du.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F57bhr75rtoe8hp0sx4du.png" alt="Timeline: libuv thread pool handles I/O while the main thread event loop stays free for other work" width="800" height="236"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If a task requires only I/O operations without loading the CPU, Node.js already handles it well. Downloading and uploading multiple files in parallel is done by libuv and doesn't make the event loop busy, so for regular cases wrapping pipelines into &lt;code&gt;Promise.all()&lt;/code&gt; would be enough.&lt;br&gt;
However, our issue is rooted in the fact that we are going to perform CPU-intensive mapping on the V8 side and it blocks the single thread. Libuv can't help us with this because there are no native addons for our business logic.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhc8uukd5tyqkjdvmd9ap.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhc8uukd5tyqkjdvmd9ap.png" alt="Timeline: libuv pool sits idle while CPU-intensive mapping blocks the main thread" width="800" height="253"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And here we have the core question - &lt;strong&gt;how to optimize our pipeline for CPU-intensive operations and free up the main thread&lt;/strong&gt;?&lt;/p&gt;
&lt;h2&gt;
  
  
  The obvious fix that doesn't help
&lt;/h2&gt;

&lt;p&gt;Node.js provides a few built-in ways to execute CPU-intensive tasks outside the main thread:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Worker Threads&lt;/li&gt;
&lt;li&gt;Clusters&lt;/li&gt;
&lt;li&gt;Child Processes (the low-level tool used by both above)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We will consider using Worker Threads as the best choice for our goals. &lt;/p&gt;

&lt;p&gt;Worker Thread is a mechanism that allows creating a separate V8 isolate with its own event loop in a separate OS thread. The great thing about it is that the main process can pass the data to each Worker Thread, and on top of that, Worker Threads can send the data back to the main thread as well as notify it about errors and finishing of execution.&lt;/p&gt;

&lt;p&gt;The simplest example of the integration pipeline execution can be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;isMainThread&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;parentPort&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;node:worker_threads&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;pipeline&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;node:stream/promises&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createDownloadStream&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./download&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createMappingTransform&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./mapping&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createS3UploadStream&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./upload&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;// This file runs twice: once as the main thread, once inside the worker.&lt;/span&gt;
&lt;span class="c1"&gt;// isMainThread tells us which context we're in.&lt;/span&gt;
&lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;isMainThread&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="nf"&gt;runWorker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;File&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;Mapping&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="c1"&gt;// Spawns a new worker thread running this same file.&lt;/span&gt;
      &lt;span class="c1"&gt;// workerData is how we pass data into the worker at startup.&lt;/span&gt;
      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;worker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;__filename&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="na"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="p"&gt;});&lt;/span&gt;

      &lt;span class="c1"&gt;// Workers communicate back via messages and error events&lt;/span&gt;
      &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;runWorker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;clientMapping&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// Inside the worker thread: workerData contains what we passed in above&lt;/span&gt;
  &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nf"&gt;createDownloadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createMappingTransform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createS3UploadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="c1"&gt;// Signal the main thread that we're done&lt;/span&gt;
    &lt;span class="nx"&gt;parentPort&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;postMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;done&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For processing multiple files the naive solution is to create a worker per file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Worker&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;node:worker_threads&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;worker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./pipeline.ts&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="na"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;clientMapping&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;

    &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}))&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and &lt;code&gt;pipeline.ts&lt;/code&gt; is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;parentPort&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;node:worker_threads&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;workerData&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="nf"&gt;download&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;      &lt;span class="c1"&gt;// readable stream&lt;/span&gt;
    &lt;span class="nf"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;      &lt;span class="c1"&gt;// transform stream  &lt;/span&gt;
    &lt;span class="nf"&gt;upload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;      &lt;span class="c1"&gt;// writable stream&lt;/span&gt;
  &lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="nx"&gt;parentPort&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;postMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;done&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, this solution isn't optimal if the number of open worker threads is higher than the number of CPU cores. For 8 files we start 8 workers but EC2 has only 4 cores - worker threads start to compete for cores and we get context switching overhead. However, it is not the worst part. The worst part is that each Worker Thread spins up a full V8 Isolate with its own heap and its own event loop. It might not be an issue if the number of files is low, but with high numbers the overhead becomes real.&lt;/p&gt;

&lt;h2&gt;
  
  
  Two-level concurrency
&lt;/h2&gt;

&lt;p&gt;And here we are for building a final solution. Let's first illustrate it with the next diagram:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqttkwyjmpjul9wyvqxkl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqttkwyjmpjul9wyvqxkl.png" alt="Architecture diagram: Queue feeds Main Thread which distributes work to N worker threads (Level 1), each running M concurrent pipelines (Level 2)" width="800" height="674"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As we discussed before, there is no need to create more Worker Threads than the number of CPU cores, so we can create exactly the same number &lt;code&gt;N = num(CPU cores)&lt;/code&gt;. It is our concurrency Level 1.&lt;br&gt;
All &lt;code&gt;N&lt;/code&gt; workers can be created during the service boot and wait for a task from the Main Thread. &lt;/p&gt;

&lt;p&gt;If the number of files is higher than the number of workers &lt;code&gt;N&lt;/code&gt; we can round-robin them between workers equally. Each worker then processes its assigned files through an async pool of M concurrent pipelines - this is our concurrency Level 2. &lt;strong&gt;While Level 1 spreads work across CPU cores, Level 2 keeps each core busy by overlapping I/O wait across multiple files within a single worker&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;We can't just &lt;code&gt;Promise.all()&lt;/code&gt; the files - if a worker gets 25 files, that's 25 pipelines fighting for memory and file descriptors at once. An async pool caps concurrency at M simultaneous pipelines per worker, starting the next one only when a slot frees up. When one pipeline waits for a network ACK, another executes CPU-intensive mapping and vice versa.&lt;/p&gt;

&lt;p&gt;Let's illustrate this process with a diagram for 6 files and 2 Worker Threads:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxlsk5yleooay49csvrss.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxlsk5yleooay49csvrss.png" alt="Timeline: two worker threads processing 6 files with interleaved network reads, CPU mapping, and network writes across concurrent pipelines" width="800" height="310"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Notice that when one pipeline uses the CPU power for performing mapping, we still can download or upload data over HTTP - it is handled by &lt;code&gt;libuv&lt;/code&gt; threads outside of the Worker Thread's event loop.&lt;/p&gt;

&lt;p&gt;As a result we can:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;process CPU-heavy pipelines for multiple files with real CPU cores parallelization&lt;/li&gt;
&lt;li&gt;utilize multicore server resources effectively&lt;/li&gt;
&lt;li&gt;as files are processed in parallel, we can drastically reduce execution time&lt;/li&gt;
&lt;li&gt;as parallel execution happens inside one Node.js process, orchestration and error handling would be much easier than if it were distributed between multiple processes or even machines &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Keep in mind that each worker runs its own V8 isolate - that's roughly 10–30MB of baseline memory per worker, regardless of workload. With &lt;code&gt;N&lt;/code&gt; workers, that cost is part of the process's total RSS. On a 4-core instance, you're looking at 40–120MB just for the isolates before any actual data flows through.&lt;/p&gt;

&lt;p&gt;When Main Thread pulls a scheduled integration run message from a queue it fetches a mapping, S3 credentials and spreads the work between Worker Threads. It waits until all workers report that the job succeeded and sends the ACK to the queue. &lt;br&gt;
Files in a batch represent the same dataset - think of a single day's activity split across multiple files for size reasons. If we ACK after processing only half of them, the queue considers the job done, but the downstream system gets an incomplete picture. For some clients, partial data is worse than no data at all. So the rule is simple: ACK fires only after every worker reports success. If anything fails, the message stays in the queue for a retry or lands in a dead-letter queue for investigation.&lt;/p&gt;

&lt;p&gt;This is also why we keep a single batch inside a single process. Splitting one batch across multiple EC2 instances adds distributed coordination - every instance must agree on success, handle partial failures, and clean up after crashes. A single multi-core machine already handles the load. If multiple batches arrive at the same time, they queue up - each instance processes one batch at a time and picks up the next one when it's done.&lt;/p&gt;

&lt;p&gt;Since &lt;code&gt;N&lt;/code&gt; always equals the number of CPU cores, you can size the EC2 instance based on how many files a client's batch typically contains - a 2-core instance for small batches, a 16-core instance for large ones. No code changes required, and it could help optimize infrastructure costs.&lt;/p&gt;
&lt;h2&gt;
  
  
  Putting it together
&lt;/h2&gt;

&lt;p&gt;The Main Thread will look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// (Main Thread)&lt;/span&gt;

&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;N&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;availableParallelism&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

&lt;span class="c1"&gt;// Boot N workers once at process start&lt;/span&gt;
&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;workers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;Array&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;from&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;length&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;N&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./worker.ts&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;

&lt;span class="c1"&gt;// Listen for queue messages&lt;/span&gt;
&lt;span class="nx"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;async &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mappingId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;credentials&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="c1"&gt;// Fetch mapping once - workers don't do this themselves&lt;/span&gt;
  &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;mappingService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;mappingId&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Distribute files across workers via round-robin&lt;/span&gt;
  &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;slices&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;roundRobin&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;N&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Dispatch each slice and wait for all workers to finish&lt;/span&gt;
  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="nx"&gt;workers&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;i&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt;
      &lt;span class="nf"&gt;dispatch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;files&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;slices&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;i&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;credentials&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// All workers done - safe to ACK&lt;/span&gt;
  &lt;span class="nx"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ack&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;

&lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="nf"&gt;dispatch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;postMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;once&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;once&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And Worker Thread will be updated with listening to a message from the Main Thread:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// (Worker Thread)&lt;/span&gt;

&lt;span class="nx"&gt;parentPort&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;async &lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;credentials&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// Process assigned files as a pool of M concurrent pipelines&lt;/span&gt;
  &lt;span class="c1"&gt;// asyncPool(concurrency, items, fn) — runs at most M pipelines simultaneously &lt;/span&gt;
  &lt;span class="c1"&gt;// (e.g. tiny-async-pool, or a custom implementation)&lt;/span&gt;
  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;asyncPool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;M&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;files&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt;
    &lt;span class="nf"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nf"&gt;createDownloadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;url&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createMappingTransform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;mapping&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="nf"&gt;createS3UploadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;s3Key&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;);&lt;/span&gt;

  &lt;span class="nx"&gt;parentPort&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;postMessage&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;status&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;done&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This gives an idea how to start building CPU-bound integration pipelines avoiding computation bottleneck.&lt;/p&gt;

&lt;h2&gt;
  
  
  What this architecture does not solve (and why that's fine)
&lt;/h2&gt;

&lt;p&gt;I intentionally omit a few important parts that will be crucial for a production-ready implementation - they are out of scope of the article and highly depend on your current setup, business needs and preferences. However, let's briefly discuss some of them and mention possible solutions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;dealing with corrupted rows (could be a client's business choice):

&lt;ul&gt;
&lt;li&gt;logging them and reporting to client after integration is executed&lt;/li&gt;
&lt;li&gt;failing the whole integration and then reporting&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;clean up if integration failed:

&lt;ul&gt;
&lt;li&gt;removing corrupted S3 files or using &lt;code&gt;abortMultipartUpload&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;sending ACK to the queue but reporting failure or retrying the whole process&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;storing credentials to the client's storage:

&lt;ul&gt;
&lt;li&gt;specialized service&lt;/li&gt;
&lt;li&gt;secrets vault&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;sending scheduled integration to a queue:

&lt;ul&gt;
&lt;li&gt;Amazon EventBridge Scheduler&lt;/li&gt;
&lt;li&gt;Apache Airflow&lt;/li&gt;
&lt;li&gt;Temporal&lt;/li&gt;
&lt;li&gt;and many other schedulers.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;h2&gt;
  
  
  Where else this pattern applies
&lt;/h2&gt;

&lt;p&gt;As a conclusion I'd like to mention when this pattern can be applied and when it doesn't make a lot of sense to implement it.&lt;/p&gt;

&lt;p&gt;The main benefit of this solution is parallelization of CPU-bound tasks and it can be useful for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Complex business rules — scoring, pricing engines, eligibility checks, fraud detection running across hundreds of thousands of rows&lt;/li&gt;
&lt;li&gt;Statistical computation — rolling windows, percentile calculations, custom aggregations over large in-memory datasets.&lt;/li&gt;
&lt;li&gt;Text processing — heavy regex work, tokenization, parsing of custom or proprietary formats.&lt;/li&gt;
&lt;li&gt;Large JSON processing — &lt;code&gt;JSON.parse&lt;/code&gt; and &lt;code&gt;JSON.stringify&lt;/code&gt; run synchronously in V8. For small payloads this is negligible, but deserializing a 50MB blob will block noticeably.&lt;/li&gt;
&lt;li&gt;Templates rendering — generating thousands of HTML emails or reports in a tight loop.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you suspect your pipeline is doing heavy computations - profile first, and if CPU load is constant and high, consider implementing something similar to the solution above. If you just use Node.js built-in modules like &lt;code&gt;crypto&lt;/code&gt; or &lt;code&gt;zlib&lt;/code&gt; - they already solved the issue by delegating work to libuv's C/C++ addons and don't block the event loop. Always worth checking before implementing.&lt;/p&gt;

&lt;p&gt;Even if you don't find the described implementation useful, I hope you've got at least some intuition how to move big files over the network, use Streams, Worker Threads and how some parts of Node.js work.&lt;/p&gt;

</description>
      <category>node</category>
      <category>javascript</category>
      <category>architecture</category>
      <category>performance</category>
    </item>
  </channel>
</rss>
