ryo ariyama

Building a Scalable Event-Driven System on AWS with DynamoDB Streams, SQS, and ECS

Introduction

I previously built an event-driven system on AWS and would like to introduce it here. By "event-driven" I mean an architecture that reacts to events such as DynamoDB table updates.
As a prerequisite, we're building a multi-tenant system where servers are shared but each tenant has its own DynamoDB table.
This blog can be read in about 5-10 minutes and should be helpful for those who want to learn system architecture on AWS or understand how to auto-scale ECS based on SQS metrics.

System Architecture Overview

Originally, we were operating the following system:
[Architecture diagram: DynamoDB Streams events triggering Lambda]

Lambda functions were triggered by DynamoDB Streams events to perform processing. While this met the requirement of processing events as they occurred, it had the following problems:

  • When events are large, processing may not finish within Lambda's execution time limit, causing timeouts. Lambda's limited CPU and memory could also lead to OOM (Out of Memory) errors.
  • Events could be lost in the following scenarios:
    • When there are too many events to process within DynamoDB Streams' record retention period (24 hours)
    • When Lambda cannot operate normally due to AWS outages or other issues

To address these issues, we made the following improvements:

  • Replaced Lambda with ECS
  • Added SQS between DynamoDB and ECS

These changes provide the following benefits:

  • Running on a platform without an execution time limit allows processing to continue until completion, even with large inputs
  • Larger machine resources prevent CPU and memory-related issues that Lambda couldn't handle
  • When event processing fails, events are stored in the queue for retry once the system returns to normal

The updated architecture looks like this:
[Architecture diagram: DynamoDB Streams → EventBridge Pipes → SQS → ECS]

The pipeline between DynamoDB and SQS uses a service called EventBridge Pipes. This eliminates the need to write a Lambda function just to forward events, and the filtering and error handling previously done in code can now be configured without code, reducing maintenance costs.
Here's sample Terraform code using the aws_pipes_pipe resource:

###########################
# Event
###########################
resource "aws_pipes_pipe" "event_bulk" {
  depends_on = [aws_iam_role_policy.event_pipe_role]
  name       = "event-sqs"
  role_arn   = aws_iam_role.event_pipe_role.arn
  source     = aws_dynamodb_table.events.stream_arn
  target     = var.sqs_event_queue_arn

  source_parameters {
    dynamodb_stream_parameters {
      batch_size                    = var.batch_size
      starting_position             = var.starting_position
      maximum_record_age_in_seconds = 86400
    }
    filter_criteria {
      filter {
        pattern = jsonencode({
          eventName : ["INSERT"]
          dynamodb = {
            "NewImage" : {
              "status" : { "S" : ["pending"] }
            }
          }
        })
      }
    }
  }
}

###########################
# SQS
###########################
resource "aws_sqs_queue" "event" {
  name                       = "${var.batch_container_name}-sqs"
  delay_seconds              = var.sqs_delay_seconds
  max_message_size           = var.sqs_max_message_size
  message_retention_seconds  = var.sqs_message_retention_seconds
  receive_wait_time_seconds  = var.sqs_receive_wait_time_seconds
  visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
  sqs_managed_sse_enabled    = true
}

###########################
# DynamoDB
###########################
resource "aws_dynamodb_table" "events" {
  name             = "events"
  billing_mode     = "PAY_PER_REQUEST"
  hash_key         = "_global_id"
  range_key        = "_event_type"
  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"
}

###########################
# IAM
###########################
resource "aws_iam_role" "event_pipe_role" {
  name = "event-pipe-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "pipes.amazonaws.com"
        }
        Action = "sts:AssumeRole"
        Condition = {
          StringEquals = {
            "aws:SourceAccount" = var.aws_account_id
          }
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "event_pipe_role" {
  name = "event-pipe-role-policy"
  role = aws_iam_role.event_pipe_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:ListStreams"
        ]
        Resource = aws_dynamodb_table.events.stream_arn
      },
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage"
        ]
        Resource = aws_sqs_queue.event.arn
      }
    ]
  })
}



A simplified version of the application code running in the ECS task looks like this:

import json
import logging
import os
import time

import boto3


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get the SQS queue URL from environment variables
QUEUE_URL = os.environ.get("SQS_QUEUE_URL")

# Initialize the SQS client
sqs = boto3.client("sqs", region_name=os.environ.get("AWS_REGION", "ap-northeast-1"))

def process_event(event: dict):
    """
    Business logic to process a single event.
    Replace this with your own implementation.
    """
    logger.info(f"Processing event: {event}")
    # Simulate some processing time
    time.sleep(1)
    # You can raise an exception to simulate a failure
    # raise Exception("Simulated failure")

def main():
    """
    Poll messages from the SQS queue and process them one by one.
    """
    while True:
        try:
            response = sqs.receive_message(
                QueueUrl=QUEUE_URL,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,        # Enable long polling
                VisibilityTimeout=60       # Time for processing before making the message visible again
            )

            messages = response.get("Messages", [])
            if not messages:
                logger.info("No messages received. Waiting...")
                continue

            for message in messages:
                try:
                    # Parse the message body
                    body = json.loads(message["Body"])
                    logger.info(f"Received message: {body}")

                    # If EventBridge Pipes is used, the actual DynamoDB record is nested inside
                    # Uncomment and adjust based on actual structure
                    # dynamodb_record = json.loads(body["detail"])["dynamodb"]

                    process_event(body)

                    # Delete the message only if processing succeeds
                    sqs.delete_message(
                        QueueUrl=QUEUE_URL,
                        ReceiptHandle=message["ReceiptHandle"]
                    )
                    logger.info("Message processed and deleted successfully.")

                except Exception as e:
                    logger.error(f"Error processing message: {e}", exc_info=True)
                    # Do not delete the message to allow for retry

        except Exception as e:
            logger.error(f"Error polling messages: {e}", exc_info=True)
            # Wait a bit before retrying on polling failure
            time.sleep(10)


if __name__ == "__main__":
    main()
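
A few notes on the message body: with EventBridge Pipes and an SQS target, the body is (by default, without an input transformer) the JSON-encoded DynamoDB Streams record, so item attributes arrive in DynamoDB's AttributeValue format (e.g. {"S": "pending"}). As a minimal sketch under that assumption, boto3's TypeDeserializer can turn the NewImage into plain Python values inside process_event (the extract_new_image helper below is illustrative, not part of the code above):

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

def extract_new_image(body: dict) -> dict:
    """
    Convert the NewImage of a DynamoDB Streams record from AttributeValue format
    into plain Python values, e.g. {"status": {"S": "pending"}} -> {"status": "pending"}.
    Assumes the Pipes target forwards the stream record as-is (no input transformer).
    """
    new_image = body.get("dynamodb", {}).get("NewImage", {})
    return {name: deserializer.deserialize(value) for name, value in new_image.items()}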

There are several operational considerations for this system; one of the main ones is deciding how many ECS tasks to run. The next section explains this in detail.

ECS Task Auto Scaling

For cost and performance optimization, we want to dynamically adjust the number of ECS tasks based on the number of messages to process. Since ECS services can auto-scale on CloudWatch metrics, we configured auto scaling based on the SQS message count.

Step scaling

We use Step scaling policies for auto-scaling. Step scaling scales in or out based on specified thresholds. You can configure scaling settings like:

  • 1~100 messages -> set total tasks to 1
  • 101~200 messages -> set total tasks to 2
  • 201~300 messages -> set total tasks to 3

Here's the Terraform code for scale-out:

resource "aws_appautoscaling_target" "ecs_target" {
  max_capacity       = 3 # Maximum number of tasks that can be run in the ECS service.
  min_capacity       = 0
  resource_id        = "service/${aws_ecs_cluster.batch.name}/${aws_ecs_service.batch.name}"
  scalable_dimension = "ecs:service:DesiredCount"
  service_namespace  = "ecs"
}

resource "aws_appautoscaling_policy" "scale_out" {
  name               = "ecs-batch-scale-out"
  policy_type        = "StepScaling"
  resource_id        = aws_appautoscaling_target.ecs_target.resource_id
  scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
  service_namespace  = aws_appautoscaling_target.ecs_target.service_namespace

  step_scaling_policy_configuration {
    adjustment_type         = "ExactCapacity"
    cooldown                = 60
    metric_aggregation_type = "Average"

    # Bounds are offsets added to the alarm threshold (1); the lower bound is
    # inclusive, the upper bound is exclusive, and intervals must not overlap or leave gaps.

    # set desired count to 1 task: threshold + 0 <= metric < threshold + 100
    step_adjustment {
      metric_interval_lower_bound = 0
      metric_interval_upper_bound = 100
      scaling_adjustment          = 1
    }

    # set desired count to 2 tasks: threshold + 100 <= metric < threshold + 200
    step_adjustment {
      metric_interval_lower_bound = 100
      metric_interval_upper_bound = 200
      scaling_adjustment          = 2
    }

    # set desired count to 3 tasks: metric >= threshold + 200
    step_adjustment {
      metric_interval_lower_bound = 200
      scaling_adjustment          = 3
    }
  }
}

resource "aws_cloudwatch_metric_alarm" "message_alarm_high" {
  alarm_name = "ecs_batch_FiveSecondsApproximateNumberOfMessages_high"

  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 10
  statistic           = "Maximum"

  threshold = 1

  dimensions = {
    QueueName = var.sqs_queue_name
  }

  alarm_actions = [aws_appautoscaling_policy.scale_out.arn]
}

This code works as follows:

  • Monitors the ApproximateNumberOfMessagesVisible metric, which represents the number of pending messages, and sends alerts to the autoscaling policy when thresholds are exceeded
  • The aws_appautoscaling_policy's step_adjustment entries determine the exact number of ECS tasks to set (adjustment_type is ExactCapacity). Note that metric_interval_lower_bound and metric_interval_upper_bound are offsets added to the alarm threshold; the lower bound is inclusive, the upper bound is exclusive, and the intervals must not overlap or leave gaps (see the sketch below)
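
To make the bound arithmetic concrete, here's a small, hypothetical Python sketch (not part of the system itself) that mimics how an ExactCapacity step scaling policy picks the desired task count. The threshold and step values mirror the Terraform above:

# Hypothetical illustration of how step scaling evaluates intervals (ExactCapacity).
ALARM_THRESHOLD = 1  # threshold of the CloudWatch alarm above

# (lower_bound, upper_bound, capacity); None means unbounded on that side.
STEPS = [
    (0, 100, 1),     # threshold + 0 <= metric < threshold + 100 -> 1 task
    (100, 200, 2),   # threshold + 100 <= metric < threshold + 200 -> 2 tasks
    (200, None, 3),  # metric >= threshold + 200 -> 3 tasks
]

def desired_task_count(metric_value):
    """Return the exact capacity the policy would set, or None if no step matches."""
    for lower, upper, capacity in STEPS:
        above_lower = lower is None or metric_value >= ALARM_THRESHOLD + lower
        below_upper = upper is None or metric_value < ALARM_THRESHOLD + upper
        if above_lower and below_upper:
            return capacity
    return None

print(desired_task_count(50))   # -> 1
print(desired_task_count(150))  # -> 2
print(desired_task_count(500))  # -> 3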

Scale-in can be configured similarly.

resource "aws_appautoscaling_policy" "scale_in" {
  name               = "ecs-batch-${aws_ecs_service.batch.name}-scale-in"
  policy_type        = "StepScaling"
  resource_id        = aws_appautoscaling_target.ecs_target.resource_id
  scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
  service_namespace  = aws_appautoscaling_target.ecs_target.service_namespace

  step_scaling_policy_configuration {
    adjustment_type         = "ExactCapacity"
    cooldown                = 1200
    metric_aggregation_type = "Average"

    # set desired count to 2 tasks
    step_adjustment {
      metric_interval_lower_bound = 200
      metric_interval_upper_bound = 250
      scaling_adjustment          = 2
    }

    # set desired count to 1 task
    step_adjustment {
      metric_interval_lower_bound = 100
      metric_interval_upper_bound = 200
      scaling_adjustment          = 1
    }
  }
}

resource "aws_appautoscaling_policy" "scale_in_to_zero" {
  name               = "ecs-batch-${aws_ecs_service.batch.name}-scale-in-to-zero"
  policy_type        = "StepScaling"
  resource_id        = aws_appautoscaling_target.ecs_target.resource_id
  scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
  service_namespace  = aws_appautoscaling_target.ecs_target.service_namespace

  step_scaling_policy_configuration {
    adjustment_type         = "ExactCapacity"
    cooldown                = 1200
    metric_aggregation_type = "Average"

    # single unbounded step: set desired count to 0 tasks
    step_adjustment {
      metric_interval_lower_bound = null
      metric_interval_upper_bound = null
      scaling_adjustment          = 0
    }
  }
}

resource "aws_cloudwatch_metric_alarm" "message_alarm_low" {
  alarm_name          = "ecs_batch_messages_low"
  comparison_operator = "LessThanOrEqualToThreshold"
  evaluation_periods  = 3
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Average"
  treat_missing_data  = "notBreaching"
  threshold           = 100

  dimensions = {
    QueueName = var.sqs_queue_name
  }

  alarm_actions = [aws_appautoscaling_policy.scale_in.arn]
}

resource "aws_cloudwatch_metric_alarm" "messages_completely_empty" {
  alarm_name          = "ecs_batch_completely_empty"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = 3
  threshold           = 1
  treat_missing_data  = "notBreaching"

  metric_query {
    id          = "e1"
    expression  = "IF(m1 <= 0 AND m2 <= 0, 1, 0)"
    label       = "Empty Messages Condition"
    return_data = "true"
  }

  metric_query {
    id = "m1"
    metric {
      namespace   = "AWS/SQS"
      metric_name = "ApproximateNumberOfMessagesVisible"
      dimensions = {
        QueueName = var.sqs_queue_name
      }
      stat   = "Average"
      period = 60
    }
  }

  metric_query {
    id = "m2"
    metric {
      namespace   = "AWS/SQS"
      metric_name = "ApproximateNumberOfMessagesNotVisible"
      dimensions = {
        QueueName = var.sqs_queue_name
      }
      stat   = "Average"
      period = 60
    }
  }
  alarm_actions = [aws_appautoscaling_policy.scale_in_to_zero.arn]
}

This basically reverses the scale-out settings to gradually scale in as the message count drops. When messages reach 0, we want to scale tasks to 0, but we combine the ApproximateNumberOfMessagesVisible metric with ApproximateNumberOfMessagesNotVisible (which represents messages currently being processed) so that tasks don't scale to 0 while messages are still in flight.
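
The same condition can be checked ad hoc with boto3, since SQS exposes both counts as queue attributes (note the attribute is named ApproximateNumberOfMessages, while the corresponding CloudWatch metric is ApproximateNumberOfMessagesVisible). A minimal sketch, assuming the queue URL is known:

import boto3

sqs = boto3.client("sqs")

def queue_is_drained(queue_url: str) -> bool:
    """Return True only when there are neither visible nor in-flight messages."""
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",           # visible (pending) messages
            "ApproximateNumberOfMessagesNotVisible", # in-flight (currently being processed)
        ],
    )["Attributes"]
    return (
        int(attrs["ApproximateNumberOfMessages"]) == 0
        and int(attrs["ApproximateNumberOfMessagesNotVisible"]) == 0
    )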

System monitoring

Next, let's consider operational monitoring. For this system (and generally), monitoring items should be considered from the perspective of whether the system is operating according to requirements. For this system, the following items come to mind:

  1. Tasks not starting

    • Pending message count ≥ 1 and no ECS tasks running
  2. Error rate monitoring

    • ApproximateNumberOfMessagesNotVisible backlog in SQS
    • Dead Letter Queue (DLQ) transfer rate
  3. Resource utilization

    • ECS task CPU/Memory usage
    • Task OOM detection

Due to space constraints, I can't cover everything in this blog, but as a sample, here's how we implemented the first item using CloudWatch composite alarms:

resource "aws_cloudwatch_metric_alarm" "sqs_has_messages" {
  alarm_name          = "event-ecs-sqs-has-messages"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = "1"
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = "60"
  statistic           = "Average"
  threshold           = "1"
  alarm_description   = "This alarm monitors if SQS queue has messages waiting to be processed"
  dimensions = {
    QueueName = var.aws_sqs_queue_name
  }
}

resource "aws_cloudwatch_metric_alarm" "ecs_no_tasks" {
  alarm_name          = "event-ecs-no-tasks"
  comparison_operator = "LessThanOrEqualToThreshold"
  evaluation_periods  = "1"
  metric_name         = "RunningTaskCount"
  namespace           = "AWS/ECS"
  period              = "60"
  statistic           = "Average"
  threshold           = "0"
  alarm_description   = "This alarm monitors if ECS has no running tasks"
  dimensions = {
    ClusterName = aws_ecs_cluster.batch.name
    ServiceName = aws_ecs_service.batch.name
  }
}

resource "aws_cloudwatch_composite_alarm" "sqs_messages_but_no_ecs" {
  alarm_name        = "sqs-messages-but-no-ecs"
  alarm_description = "Alarm when SQS has messages but no ECS tasks are running"

  alarm_rule = "ALARM(${aws_cloudwatch_metric_alarm.sqs_has_messages.alarm_name}) AND ALARM(${aws_cloudwatch_metric_alarm.ecs_no_tasks.alarm_name})"

  alarm_actions = [var.monitoring_sqs_arn]
  ok_actions    = [var.monitoring_sqs_arn]
}

Alternative Approaches

While we've introduced the complete system, there are still cases where problems can occur: when one tenant has significantly more data than the others, processing that tenant's data takes longer and affects the other tenants. This is the "noisy neighbor" problem, well known in multi-tenant systems.
To solve it, you need to provide each tenant with its own queue and servers, as illustrated below.

[Architecture diagram: dedicated SQS queue and ECS service per tenant]
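
In this architecture the separation would come from creating a pipe, queue, and ECS service per tenant rather than from application code, but purely as a hypothetical illustration of the routing idea (the tenant_id field, the TENANT_QUEUE_URLS mapping, and the queue URLs below are invented for this sketch), a producer-side dispatcher could look like this:

import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical mapping from tenant ID to that tenant's dedicated queue URL.
TENANT_QUEUE_URLS = {
    "tenant-a": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/tenant-a-events",
    "tenant-b": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/tenant-b-events",
}

def route_event(event: dict) -> None:
    """Send an event to the queue dedicated to its tenant (illustrative only)."""
    tenant_id = event["tenant_id"]  # assumes each event carries a tenant identifier
    sqs.send_message(
        QueueUrl=TENANT_QUEUE_URLS[tenant_id],
        MessageBody=json.dumps(event),
    )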

However, compared to a tenant-shared system, this approach requires more servers and increases costs in various ways. Deciding which approach to take is difficult, but it's probably best to start with a simple implementation and make the call while monitoring performance. If you have good ways to make this decision, or other approaches to the multi-tenant problem, I'd appreciate it if you shared them in the comments.

Conclusion

That's it. We were able to create a simple yet scalable event-driven system using AWS. I hope this article helps with your development efforts.
