Introduction
I previously built an event-driven system on AWS and would like to introduce it here. By "event-driven" I mean an architecture that reacts to events, such as DynamoDB table updates, and processes them as they occur.
As a prerequisite, we're building a multi-tenant system where servers are shared but each tenant has its own DynamoDB table.
This blog can be read in about 5-10 minutes and should be helpful for those who want to learn system architecture on AWS or understand how to auto-scale consumers based on SQS metrics.
System Architecture Overview
Originally, we were operating the following system:
Lambda functions were triggered by DynamoDB Streams events to perform the processing (a minimal sketch of this style of handler follows the list below). While this met the requirement of processing events as they occurred, it had the following problems:
- When events are too large, processing may not complete within Lambda's execution time limit. Additionally, Lambda's limited CPU and memory can lead to OOM (Out of Memory) errors.
- Events could be lost in the following scenarios:
  - When there are too many events to process within DynamoDB Streams' record retention period (24 hours)
  - When the processing workload cannot run normally due to AWS outages or other issues
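For reference, here is a minimal sketch, not our actual code, of the style of handler the original setup used, with all the work done directly inside the DynamoDB Streams-triggered Lambda:
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    # Each invocation receives a batch of DynamoDB Streams records
    for record in event.get("Records", []):
        if record.get("eventName") != "INSERT":
            continue
        new_image = record["dynamodb"].get("NewImage", {})
        logger.info("Processing record: %s", new_image)
        # Heavy per-record work happened here, bounded by Lambda's
        # execution time limit and memory allocation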
To address these issues, we made the following improvements:
- Replaced Lambda with ECS
- Added SQS between DynamoDB and ECS
These changes provide the following benefits:
- Running on a compute platform without an execution time limit allows processing to continue until completion, even with large inputs
- Larger CPU and memory allocations prevent the resource-related failures that Lambda couldn't handle
- When event processing fails, events are stored in the queue for retry once the system returns to normal
The updated architecture looks like this:
The pipeline between DynamoDB and SQS uses a service called EventBridge Pipes. This eliminates the need to create Lambda functions for event sending and allows filtering and error handling (previously done in code) to be handled without code, reducing maintenance costs.
Here's sample Terraform code using the aws_pipes_pipe resource:
###########################
# Event
###########################
resource "aws_pipes_pipe" "event_bulk" {
depends_on = [aws_iam_role_policy.event_pipe_role]
name = "event-sqs"
role_arn = aws_iam_role.event_pipe_role.arn
source = aws_dynamodb_table.events.stream_arn
target = var.sqs_event_queue_arn
source_parameters {
dynamodb_stream_parameters {
batch_size = var.batch_size
starting_position = var.starting_position
maximum_record_age_in_seconds = 86400
}
filter_criteria {
filter {
pattern = jsonencode({
eventName : ["INSERT"]
dynamodb = {
"NewImage" : {
"status" : { "S" : ["pending"] }
}
}
})
}
}
}
}
###########################
# SQS
###########################
resource "aws_sqs_queue" "event" {
name = "${var.batch_container_name}-sqs"
delay_seconds = var.sqs_delay_seconds
max_message_size = var.sqs_max_message_size
message_retention_seconds = var.sqs_message_retention_seconds
receive_wait_time_seconds = var.sqs_receive_wait_time_seconds
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
sqs_managed_sse_enabled = true
}
###########################
# DynamoDB
###########################
resource "aws_dynamodb_table" "events" {
name = "events"
billing_mode = "PAY_PER_REQUEST"
hash_key = "_global_id"
range_key = "_event_type"
# Key attributes must be declared on the table; string types are assumed here
attribute {
  name = "_global_id"
  type = "S"
}
attribute {
  name = "_event_type"
  type = "S"
}
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
}
###########################
# IAM
###########################
resource "aws_iam_role" "event_pipe_role" {
name = "event-pipe-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = "pipes.amazonaws.com"
}
Action = "sts:AssumeRole"
Condition = {
StringEquals = {
"aws:SourceAccount" = var.aws_account_id
}
}
}
]
})
}
resource "aws_iam_role_policy" "event_pipe_role" {
name = "event-pipe-role-policy"
role = aws_iam_role.event_pipe_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:ListStreams"
]
Resource = aws_dynamodb_table.events.stream_arn
},
{
Effect = "Allow"
Action = [
"sqs:SendMessage"
]
Resource = aws_sqs_queue.event.arn
}
]
})
}
The application code running inside the ECS task, simplified, looks like this:
import json
import logging
import os
import time

import boto3

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get the SQS queue URL from environment variables
QUEUE_URL = os.environ.get("SQS_QUEUE_URL")

# Initialize the SQS client
sqs = boto3.client("sqs", region_name=os.environ.get("AWS_REGION", "ap-northeast-1"))


def process_event(event: dict):
    """
    Business logic to process a single event.
    Replace this with your own implementation.
    """
    logger.info(f"Processing event: {event}")
    # Simulate some processing time
    time.sleep(1)
    # You can raise an exception to simulate a failure
    # raise Exception("Simulated failure")


def main():
    """
    Poll messages from the SQS queue and process them one by one.
    """
    while True:
        try:
            response = sqs.receive_message(
                QueueUrl=QUEUE_URL,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,   # Enable long polling
                VisibilityTimeout=60, # Time for processing before the message becomes visible again
            )
            messages = response.get("Messages", [])
            if not messages:
                logger.info("No messages received. Waiting...")
                continue

            for message in messages:
                try:
                    # Parse the message body
                    body = json.loads(message["Body"])
                    logger.info(f"Received message: {body}")

                    # If EventBridge Pipes is used, the actual DynamoDB record is nested inside.
                    # Uncomment and adjust based on the actual structure:
                    # dynamodb_record = json.loads(body["detail"])["dynamodb"]

                    process_event(body)

                    # Delete the message only if processing succeeds
                    sqs.delete_message(
                        QueueUrl=QUEUE_URL,
                        ReceiptHandle=message["ReceiptHandle"],
                    )
                    logger.info("Message processed and deleted successfully.")
                except Exception as e:
                    logger.error(f"Error processing message: {e}", exc_info=True)
                    # Do not delete the message so it becomes visible again for retry
        except Exception as e:
            logger.error(f"Error polling messages: {e}", exc_info=True)
            # Wait a bit before retrying on polling failure
            time.sleep(10)


if __name__ == "__main__":
    main()
There are several operational considerations for this system, and deciding how many ECS tasks to launch is one of the main ones. The next section explains this in detail.
ECS Task Auto Scaling
For cost and performance optimization, we want to dynamically adjust the number of ECS tasks based on the number of messages to process. Since ECS can auto-scale based on specific events, we configured it to auto-scale based on SQS message count metrics.
Step scaling
We use Step scaling policies for auto-scaling. Step scaling scales in or out based on specified thresholds. You can configure scaling settings like:
- 1~100 messages -> Set total tasks to 1
- 101~200 messages -> Set total tasks to 2
- 201~300 messages -> Set total tasks to 3
Here's the Terraform code for scale-out:
resource "aws_appautoscaling_target" "ecs_target" {
max_capacity = 3 // Maximum number of tasks that can be run in the ECS service.
min_capacity = 0
resource_id = "service/${aws_ecs_cluster.batch.name}/${aws_ecs_service.batch.name}"
scalable_dimension = "ecs:service:DesiredCount"
service_namespace = "ecs"
}
resource "aws_appautoscaling_policy" "scale_out" {
name = "ecs-batch-scale-out"
policy_type = "StepScaling"
resource_id = aws_appautoscaling_target.ecs_target.resource_id
scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
service_namespace = aws_appautoscaling_target.ecs_target.service_namespace
step_scaling_policy_configuration {
adjustment_type = "ExactCapacity"
cooldown = 60
metric_aggregation_type = "Average"
# scale out to 1 task: 0 < metrics value <= 100
step_adjustment {
metric_interval_lower_bound = 0   # Offset from the alarm threshold; the lower bound is inclusive
metric_interval_upper_bound = 100 # Offset from the alarm threshold; the upper bound is exclusive
scaling_adjustment = 1
}
# scale out to 2 tasks: 100 < metrics value <= 200
step_adjustment {
metric_interval_lower_bound = 101
metric_interval_upper_bound = 200
scaling_adjustment = 2
}
# scale out to 3 tasks: 200 < metrics value
step_adjustment {
metric_interval_lower_bound = 201
scaling_adjustment = 3
}
}
}
resource "aws_cloudwatch_metric_alarm" "message_alarm_high" {
alarm_name = "ecs_batch_FiveSecondsApproximateNumberOfMessages_high"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 10
statistic = "Maximum"
threshold = 1
dimensions = {
QueueName = var.sqs_queue_name
}
alarm_actions = [aws_appautoscaling_policy.scale_out.arn]
}
This code works as follows:
- It monitors the ApproximateNumberOfMessagesVisible metric, which represents the number of pending messages, and triggers the auto-scaling policy when the threshold is exceeded.
- The aws_appautoscaling_policy's step_adjustment conditions determine the total number of ECS tasks to set. Pay attention to how lower_bound and upper_bound are evaluated: they are offsets from the alarm threshold, and one boundary is inclusive while the other is exclusive (see the sketch below).
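As a small illustration of how I understand the interval selection to work with adjustment_type = "ExactCapacity" (this is my own sketch, not AWS code; for metric values above the threshold the lower bound is inclusive and the upper bound is exclusive):
# Hypothetical helper that mimics step-adjustment selection for the scale-out policy above.
# Bounds are offsets from the alarm threshold; None means unbounded.
STEPS = [
    {"lower": 0, "upper": 100, "capacity": 1},
    {"lower": 101, "upper": 200, "capacity": 2},
    {"lower": 201, "upper": None, "capacity": 3},
]

def desired_capacity(metric_value, threshold=1):
    for step in STEPS:
        lower_ok = metric_value >= threshold + step["lower"]
        upper_ok = step["upper"] is None or metric_value < threshold + step["upper"]
        if lower_ok and upper_ok:
            return step["capacity"]
    return None  # no step matches (for example, a gap between intervals)

print(desired_capacity(50))   # -> 1
print(desired_capacity(150))  # -> 2
print(desired_capacity(500))  # -> 3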
Scale-in can be configured similarly.
resource "aws_appautoscaling_policy" "scale_in" {
name = "ecs-batch-${aws_ecs_service.batch.name}-scale-in"
policy_type = "StepScaling"
resource_id = aws_appautoscaling_target.ecs_target.resource_id
scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
service_namespace = aws_appautoscaling_target.ecs_target.service_namespace
step_scaling_policy_configuration {
adjustment_type = "ExactCapacity"
cooldown = 1200
metric_aggregation_type = "Average"
# 200 < messages <= 250: set total tasks to 2
step_adjustment {
metric_interval_lower_bound = 200
metric_interval_upper_bound = 250
scaling_adjustment = 2
}
# 100 < messages <= 200: set total tasks to 1
step_adjustment {
metric_interval_lower_bound = 100
metric_interval_upper_bound = 200
scaling_adjustment = 1
}
}
}
resource "aws_appautoscaling_policy" "scale_in_to_zero" {
name = "ecs-batch-${aws_ecs_service.batch.name}-scale-in-to-zero"
policy_type = "StepScaling"
resource_id = aws_appautoscaling_target.ecs_target.resource_id
scalable_dimension = aws_appautoscaling_target.ecs_target.scalable_dimension
service_namespace = aws_appautoscaling_target.ecs_target.service_namespace
step_scaling_policy_configuration {
adjustment_type = "ExactCapacity"
cooldown = 1200
metric_aggregation_type = "Average"
step_adjustment {
metric_interval_lower_bound = null
metric_interval_upper_bound = null
scaling_adjustment = 0
}
}
}
resource "aws_cloudwatch_metric_alarm" "message_alarm_low" {
alarm_name = "ecs_batch_messages_low"
comparison_operator = "LessThanOrEqualToThreshold"
evaluation_periods = 3
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Average"
treat_missing_data = "notBreaching"
threshold = 100
dimensions = {
QueueName = var.sqs_queue_name
}
alarm_actions = [aws_appautoscaling_policy.scale_in.arn]
}
resource "aws_cloudwatch_metric_alarm" "messages_completely_empty" {
alarm_name = "ecs_batch_completely_empty"
comparison_operator = "GreaterThanOrEqualToThreshold"
evaluation_periods = 3
threshold = 1
treat_missing_data = "notBreaching"
metric_query {
id = "e1"
expression = "IF(m1 <= 0 AND m2 <= 0, 1, 0)"
label = "Empty Messages Condition"
return_data = "true"
}
metric_query {
id = "m1"
metric {
namespace = "AWS/SQS"
metric_name = "ApproximateNumberOfMessagesVisible"
dimensions = {
QueueName = var.sqs_queue_name
}
stat = "Average"
period = 60
}
}
metric_query {
id = "m2"
metric {
namespace = "AWS/SQS"
metric_name = "ApproximateNumberOfMessagesNotVisible"
dimensions = {
QueueName = var.sqs_queue_name
}
stat = "Average"
period = 60
}
}
alarm_actions = [aws_appautoscaling_policy.scale_in_to_zero.arn]
}
This basically reverses the scale-out settings to gradually scale in as the message count drops. When there are no messages left we want to scale the tasks down to 0, but we combine the visible-message count with the ApproximateNumberOfMessagesNotVisible metric (which represents messages currently being processed) so that tasks don't scale to 0 while messages are still in flight. The scale_in_to_zero policy has a single step adjustment with no bounds, so once the metric-math alarm detects that both metrics are 0 it applies across the whole metric range and sets the desired count to 0.
System monitoring
Next, let's consider operational monitoring. For this system (and generally), monitoring items should be considered from the perspective of whether the system is operating according to requirements. For this system, the following items come to mind:
- Tasks not starting
  - Pending message count ≥ 1 while no ECS tasks are running
- Error rate monitoring
  - ApproximateNumberOfMessagesNotVisible backlog in SQS
  - Dead Letter Queue (DLQ) transfer rate
- Resource utilization
  - ECS task CPU/memory usage
  - Task OOM detection
Due to space constraints, I can't cover everything in this blog, but as a sample, here's how we implemented the first item using a CloudWatch composite alarm:
resource "aws_cloudwatch_metric_alarm" "sqs_has_messages" {
alarm_name = "event-ecs-sqs-has-messages"
comparison_operator = "GreaterThanOrEqualToThreshold"
evaluation_periods = "1"
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = "60"
statistic = "Average"
threshold = "1"
alarm_description = "This alarm monitors if SQS queue has messages waiting to be processed"
dimensions = {
QueueName = var.aws_sqs_queue_name
}
}
resource "aws_cloudwatch_metric_alarm" "ecs_no_tasks" {
alarm_name = "event-ecs-no-tasks"
comparison_operator = "LessThanOrEqualToThreshold"
evaluation_periods = "1"
metric_name = "RunningTaskCount"
namespace = "ECS/ContainerInsights" # RunningTaskCount is a Container Insights metric, so Container Insights must be enabled on the cluster
period = "60"
statistic = "Average"
threshold = "0"
alarm_description = "This alarm monitors if ECS has no running tasks"
dimensions = {
ClusterName = aws_ecs_cluster.batch.name
ServiceName = aws_ecs_service.batch.name
}
}
resource "aws_cloudwatch_composite_alarm" "sqs_messages_but_no_ecs" {
alarm_name = "sqs-messages-but-no-ecs"
alarm_description = "Alarm when SQS has messages but no ECS tasks are running"
alarm_rule = "ALARM(${aws_cloudwatch_metric_alarm.sqs_has_messages.alarm_name}) AND ALARM(${aws_cloudwatch_metric_alarm.ecs_no_tasks.alarm_name})"
alarm_actions = [var.monitoring_sqs_arn]
ok_actions = [var.monitoring_sqs_arn]
}
Alternative Approaches
While we've introduced the complete system, there are still cases where problems can occur: when one tenant has significantly more data than the others, that tenant's processing takes longer and delays everyone else. This is the well-known "noisy neighbor" problem in multi-tenant systems.
To solve this problem, you need to provide separate queues and workers for each tenant; a rough sketch of what that looks like on the consumer side follows.
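As an illustration only (the TENANT_ID environment variable and the queue-naming convention here are hypothetical, not from our actual system), a per-tenant worker could resolve its own queue like this, while the polling loop itself stays the same as in the shared-queue version:
import os

import boto3

# Hypothetical convention: one queue per tenant, named "<prefix>-<tenant_id>-events"
TENANT_ID = os.environ["TENANT_ID"]
QUEUE_PREFIX = os.environ.get("QUEUE_PREFIX", "event")

sqs = boto3.client("sqs")

def tenant_queue_url(tenant_id: str) -> str:
    """Resolve the SQS queue URL dedicated to a single tenant."""
    response = sqs.get_queue_url(QueueName=f"{QUEUE_PREFIX}-{tenant_id}-events")
    return response["QueueUrl"]

# Only the queue URL (and the ECS service the worker runs in) becomes tenant-specific.
queue_url = tenant_queue_url(TENANT_ID)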
However, compared to tenant-shared systems, this approach requires more servers and increases costs in various ways. Deciding which system approach to take is difficult, but it's probably best to start with a simple implementation and make comprehensive decisions while monitoring performance. If you have good decision-making methods for this or any other alternative ways to resolve this multi-tenant problem, I'd appreciate comments sharing them.
Conclusion
That's it. We were able to create a simple yet scalable event-driven system using AWS. I hope this article helps with your development efforts.