This article describes my experience setting up an AWS ECS Fargate service that processes messages from an SQS queue. Fargate handles the messages instead of Lambda because each message represents a long-running job that usually takes more than 15 minutes, which exceeds Lambda's maximum execution time.
Setting up ECS Fargate infrastructure
Key components:
Cluster: A Fargate cluster is initialized to house the message processing tasks.
Task Definition: Specifies the computing resources required and points to the Dockerfile used to build the container image that holds the application code.
Fargate Service: Runs and maintains the desired number of tasks from the task definition and is the target of the autoscaling rules.
SQS Queue: An SQS queue is set up to hold the messages to be processed.
I. Import all needed libraries
import {
Cluster,
ContainerImage,
FargateService,
FargateTaskDefinition,
LogDrivers,
PropagatedTagSource,
} from 'aws-cdk-lib/aws-ecs'
import * as sqs from 'aws-cdk-lib/aws-sqs'
import { aws_logs, Duration, Stack } from 'aws-cdk-lib'
import {
AdjustmentType,
MetricAggregationType,
ScalableTarget,
StepScalingAction,
} from 'aws-cdk-lib/aws-applicationautoscaling'
import { Alarm, ComparisonOperator, MathExpression } from 'aws-cdk-lib/aws-cloudwatch'
import { ApplicationScalingAction } from 'aws-cdk-lib/aws-cloudwatch-actions'
II. Create cluster
// Create fargate cluster
const cluster = new Cluster(stack, 'SQS_PROCESSING_CLUSTER', {
// vpc: define your VPC here if needed,
clusterName: 'SQS_PROCESSING_CLUSTER',
})
III. Create task definition
// Create a Task Definition for the container to start
const taskDefinition = new FargateTaskDefinition(stack, 'SQS_PROCESSING_CONTAINER', {
memoryLimitMiB: 4096,
cpu: 1024,
})
taskDefinition.addContainer('SQS_PROCESSING_CONTAINER', {
// Path to docker file in your local repository
image: ContainerImage.fromAsset(`./src/`, {
file: 'Dockerfile',
}),
environment: {
// put your environment
},
logging: LogDrivers.awsLogs({
streamPrefix: '/fargate/sqs-processing/log-group',
logRetention: aws_logs.RetentionDays.THREE_DAYS,
}),
})
IV. Create a Fargate service
// Create a Fargate service with a public IP (a queue consumer needs no load balancer)
const service = new FargateService(stack, 'sqs-processing-service', {
cluster: cluster, // Cluster from step II
taskDefinition: taskDefinition, // Task definition from step III
serviceName: 'sqs-processing-service',
assignPublicIp: true,
minHealthyPercent: 100,
maxHealthyPercent: 400,
propagateTags: PropagatedTagSource.SERVICE,
enableExecuteCommand: false,
})
V. Create SQS
const queue = new sqs.Queue(stack, 'sqs-id', {
queueName: 'sqs-name',
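// Should exceed the longest expected processing time; otherwise the message
// becomes visible again mid-job and may be processed twice (SQS allows up to 12 hours)
visibilityTimeout: Duration.seconds(300),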
})
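Because the jobs described here can run longer than the 300-second visibility timeout, a message could become visible again while it is still being processed. One common way to handle this is to extend the timeout periodically while the job runs. The sketch below shows the idea with an injected `extendVisibility` callback; in a real worker that callback would presumably wrap the SDK's `ChangeMessageVisibilityCommand`. The helper name `withVisibilityHeartbeat` and its parameters are assumptions made for illustration, not part of the original setup.

```typescript
// Runs `job` and calls `extendVisibility` every `intervalMs` until the job settles.
// `extendVisibility` is a hypothetical callback; it is injected so the sketch
// stays independent of the AWS SDK.
async function withVisibilityHeartbeat<T>(
  job: () => Promise<T>,
  extendVisibility: () => Promise<void>,
  intervalMs: number,
): Promise<T> {
  const timer = setInterval(() => {
    // A failed extension is logged rather than thrown, so it does not abort the job
    extendVisibility().catch((e) => console.error('Failed to extend visibility', e))
  }, intervalMs)
  try {
    return await job()
  } finally {
    // Stop the heartbeat whether the job succeeded or failed
    clearInterval(timer)
  }
}
```

The interval should be comfortably shorter than the visibility timeout so that an extension always lands before the message reappears in the queue.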
VI. Allow Fargate to consume SQS message
queue.grantConsumeMessages(service.taskDefinition.taskRole)
Fargate autoscaling
To optimize resource utilization and costs, the setup includes autoscaling configurations. These ensure that the Fargate tasks scale up when messages are pending in the SQS queue and scale down once all messages have been processed or when no messages are present.
Scaling up
const scalingTarget = service.autoScaleTaskCount({
maxCapacity: 10,
minCapacity: 0,
})
// Scale up when SQS has visible messages
const numberOfMessagesVisibleMetric = queue.metricApproximateNumberOfMessagesVisible({
period: Duration.seconds(30),
})
// Scale up if queue has messages
scalingTarget.scaleOnMetric('QueueMessagesVisibleScaling', {
metric: numberOfMessagesVisibleMetric,
evaluationPeriods: 1,
scalingSteps: [
{ upper: 0, change: 0 }, // Do not tear down tasks when there are no visible messages in SQS
{
lower: 1,
upper: 5,
change: +1,
},
{
lower: 5,
upper: 10,
change: +5,
},
{
lower: 10,
change: +5,
},
],
cooldown: Duration.seconds(90),
})
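To make the step configuration above easier to reason about, here is a small, simplified model of how a backlog size maps to a capacity change. It is only a sketch: it assumes each step covers the range from `lower` (inclusive) to `upper` (exclusive), that a value falling in a gap between steps causes no change, and that the result is clamped to the `minCapacity` and `maxCapacity` bounds. The `Step` type and the `changeFor` and `nextCapacity` helpers are hypothetical names, not part of the CDK API, and cooldowns and alarm evaluation are not modeled.

```typescript
// Simplified, illustrative model of the scaling steps configured above.
interface Step {
  lower?: number // inclusive lower bound; undefined means no lower bound
  upper?: number // exclusive upper bound; undefined means no upper bound
  change: number // number of tasks to add when the metric falls in this range
}

const steps: Step[] = [
  { upper: 0, change: 0 },
  { lower: 1, upper: 5, change: +1 },
  { lower: 5, upper: 10, change: +5 },
  { lower: 10, change: +5 },
]

// Returns the capacity change for a given number of visible messages.
// Values that match no step (a gap) result in no change.
function changeFor(visibleMessages: number, table: Step[]): number {
  for (const step of table) {
    const lower = step.lower ?? -Infinity
    const upper = step.upper ?? Infinity
    if (visibleMessages >= lower && visibleMessages < upper) {
      return step.change
    }
  }
  return 0
}

// Applies the change and clamps to the capacity bounds (0 and 10 in this article).
function nextCapacity(current: number, visibleMessages: number, min = 0, max = 10): number {
  return Math.min(max, Math.max(min, current + changeFor(visibleMessages, steps)))
}
```

Under these assumptions, three visible messages add one task, twelve visible messages add five, and the task count never exceeds ten regardless of the backlog.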
Scaling down
Combine three metrics so that the service does not scale down while a message is still being processed
// `queue` is the SQS queue created in step V
const messagesSentMetric = queue.metricNumberOfMessagesSent({
period: Duration.seconds(30),
})
const messagesVisibleMetric = queue.metricApproximateNumberOfMessagesVisible({
period: Duration.seconds(30),
})
const messagesNotVisibleMetric = queue.metricApproximateNumberOfMessagesNotVisible({
period: Duration.seconds(30),
})
// Combine all metrics to create a custom metric using a math expression
// The expression adds the 3 metrics, and we expect the sum to be zero to scale down.
const customMetric = new MathExpression({
expression: 'messagesSent + messagesVisible + messagesNotVisible',
usingMetrics: {
messagesSent: messagesSentMetric,
messagesVisible: messagesVisibleMetric,
messagesNotVisible: messagesNotVisibleMetric,
},
period: Duration.seconds(30),
})
Create alarm and scalable target
// Add an alarm that fires when the combined queue metric stays at zero
const scaleInInit = new Alarm(stack, 'ScaleDownAlarm', {
alarmDescription: 'When there are no messages in SQS, scale down to 0',
metric: customMetric,
evaluationPeriods: 10,
datapointsToAlarm: 10,
threshold: 0,
comparisonOperator: ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
})
// Define our auto scaling target for our Fargate service
const scalableTarget = ScalableTarget.fromScalableTargetId(
stack,
'ZeroSQSMessage-ScalableTarget',
`service/${service.cluster.clusterName}/${service.serviceName}|ecs:service:DesiredCount|ecs`
)
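The alarm settings above can be read as "the queue has been completely idle for ten consecutive 30-second periods", which is five minutes. The sketch below models that rule in plain TypeScript. The `QueueDatapoint` type and the `queueActivity` and `shouldScaleToZero` functions are hypothetical names used for illustration, and CloudWatch's treatment of missing data is not modeled.

```typescript
// One datapoint per 30-second period, mirroring the three SQS metrics above.
interface QueueDatapoint {
  messagesSent: number
  messagesVisible: number
  messagesNotVisible: number
}

// Same sum as the math expression: zero only when nothing was sent,
// nothing is waiting, and nothing is in flight.
function queueActivity(point: QueueDatapoint): number {
  return point.messagesSent + point.messagesVisible + point.messagesNotVisible
}

// True when the most recent `evaluationPeriods` datapoints are all at or below
// the threshold (evaluationPeriods = datapointsToAlarm = 10 in the alarm above).
function shouldScaleToZero(
  history: QueueDatapoint[],
  evaluationPeriods = 10,
  threshold = 0,
): boolean {
  if (history.length < evaluationPeriods) {
    return false
  }
  return history.slice(-evaluationPeriods).every((point) => queueActivity(point) <= threshold)
}

const idle: QueueDatapoint = { messagesSent: 0, messagesVisible: 0, messagesNotVisible: 0 }
// A message that is being processed is "not visible" and keeps the sum above zero
const busy: QueueDatapoint = { messagesSent: 0, messagesVisible: 0, messagesNotVisible: 1 }
```

In this model a single in-flight message within the last ten periods is enough to keep the tasks running, which is the reason for including the not-visible metric in the sum.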
Create step scaling action and alarm action
// Define the action taken on our scaling target
const scalingAction = new StepScalingAction(stack, 'scaleToZero', {
scalingTarget: scalableTarget,
adjustmentType: AdjustmentType.EXACT_CAPACITY,
metricAggregationType: MetricAggregationType.MAXIMUM,
cooldown: Duration.minutes(5),
})
// Create the adjustment made by our action
scalingAction.addAdjustment({
adjustment: 0,
upperBound: 0,
})
// Finally add our alarm action to our Fargate alarm
scaleInInit.addAlarmAction(new ApplicationScalingAction(scalingAction))
Business code implementation
Now that the infrastructure is in place, it is time to write the business code that processes the messages.
Docker file setup
The Dockerfile prepares the Node environment, installs the dependencies, builds the application, and starts a process that runs indefinitely, polling SQS for messages to process.
FROM --platform=linux/amd64 node:18 AS appbuild
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY ./ /app/
RUN npm run build
FROM --platform=linux/amd64 node:18
WORKDIR /app
COPY --from=appbuild /app/ .
RUN npm prune --omit=dev
ENV NODE_ENV=production
CMD ["node", "/app/fargate/index.js" ]
Main business index.ts
A TypeScript file (index.ts) defines the core logic for message reception, processing, and deletion. This involves interacting with the SQS service through AWS SDK commands, specifically focusing on receiving and deleting messages.
import 'dotenv/config'
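import { DeleteMessageCommand, Message, ReceiveMessageCommand, SQSClient } from '@aws-sdk/client-sqs'
// Region and credentials are resolved from the environment (e.g. the task role on Fargate)
const sqsClient = new SQSClient({})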
export async function runApp() {
while (true) { // poll forever; a delay between iterations can be added here
await processMessages()
}
}
export async function processMessages() {
let messages: Message[] | undefined
try {
const command = new ReceiveMessageCommand({
QueueUrl: 'sqs-url',
WaitTimeSeconds: 20, // long polling: wait up to 20 seconds for a message to arrive
MaxNumberOfMessages: 1,
})
const response = await sqsClient.send(command)
messages = response.Messages
if (!messages || messages.length === 0) {
console.warn('No messages received!')
return
}
console.log('Received messages:', { messages })
for (const msg of messages) {
try {
if (!msg.Body) {
throw new Error('No body in message')
}
// Handle your SQS message here
} catch (e) {
console.error('Error processing message', { error: e, message: msg })
// Optionally, send the failed message to a dead-letter queue here
}
// Processing has finished (successfully or not), so delete the message from the queue
const deleteCommand = new DeleteMessageCommand({
QueueUrl: 'sqs-url',
ReceiptHandle: msg.ReceiptHandle,
})
await sqsClient.send(deleteCommand)
}
} catch (e) {
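// Unexpected error (e.g. receiving or deleting failed): log the cause before rethrowing
console.error('Error processing messages', { error: e })
throw new Error('Error processing messages')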
}
}
void runApp()
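The loop above deletes every message, including ones whose handler threw. If failed messages should instead return to the queue (and eventually reach a dead-letter queue through a redrive policy), one option is to delete a message only after its handler succeeds. The sketch below illustrates that variation against a hypothetical `QueueClient` interface so that it can be exercised without AWS; `QueueClient`, `QueueMessage`, and `handleBatch` are illustrative names, not SDK types.

```typescript
// Minimal, hypothetical abstraction over the queue so the logic is testable.
interface QueueMessage {
  body?: string
  receiptHandle: string
}

interface QueueClient {
  receive(): Promise<QueueMessage[]>
  delete(receiptHandle: string): Promise<void>
}

// Processes one batch and returns the number of messages handled successfully.
// A message is deleted only after its handler resolves; failed messages are
// left in the queue and become visible again after the visibility timeout.
async function handleBatch(
  client: QueueClient,
  handler: (body: string) => Promise<void>,
): Promise<number> {
  const messages = await client.receive()
  let processed = 0
  for (const msg of messages) {
    try {
      if (!msg.body) {
        throw new Error('No body in message')
      }
      await handler(msg.body)
      await client.delete(msg.receiptHandle)
      processed += 1
    } catch (e) {
      console.error('Error processing message', { error: e, receiptHandle: msg.receiptHandle })
    }
  }
  return processed
}
```

With this variation, a message that keeps failing is retried until the queue's redrive policy (if one is configured) moves it to the dead-letter queue, so a retry limit on the queue becomes important.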
Summary
This article outlines how to set up an AWS ECS Fargate container to process long-running jobs using messages from an SQS queue, an approach especially useful when tasks exceed the 15-minute execution limit of AWS Lambda. It covers the initial steps of setting up the ECS Fargate infrastructure, including creating a Fargate cluster, task definition, service, and SQS queue, followed by enabling autoscaling based on the queue’s workload. Additionally, it describes implementing business logic within a Docker container to continuously process and manage SQS messages. This setup leverages the scalability and flexibility of AWS services to efficiently handle long-running tasks while optimizing resource usage and costs.
Additional Considerations
When implementing this solution, consider monitoring and alerting mechanisms to keep track of the system’s health and performance. Amazon CloudWatch can provide insights into task execution metrics and trigger alarms in case of anomalies. Additionally, integrating dead-letter queues (DLQs) could make the system more resilient by capturing messages that cannot be processed successfully.