DEV Community

Oleksandr Hanhaliuk for AWS Community Builders

Posted on • Originally published at Medium


AWS Fargate and SQS for Long-Running Jobs with TypeScript

This article describes my experience setting up an AWS ECS Fargate container that processes messages from SQS. The reason for having Fargate handle the messages instead of Lambda is that each message represents a long-running job, typically longer than Lambda's 15-minute execution limit.

Setting up ECS Fargate infrastructure

Key Components Creation:

Cluster: A Fargate cluster is initialized to house the message processing tasks.
Task Definition: This definition specifies the CPU and memory the task requires and points to the Dockerfile used to build the application image.
Fargate Service: Represents the service running on Fargate, facilitating public access and defining service scalability.
SQS Queue: An SQS queue is set up to hold the messages to be processed.

I. Import all needed libraries

import {
  Cluster,
  ContainerImage,
  FargateService,
  FargateTaskDefinition,
  LogDrivers,
  PropagatedTagSource,
} from 'aws-cdk-lib/aws-ecs'

import * as sqs from 'aws-cdk-lib/aws-sqs'

import { Duration, Stack } from 'aws-cdk-lib'
import {
  AdjustmentType,
  MetricAggregationType,
  ScalableTarget,
  StepScalingAction,
} from 'aws-cdk-lib/aws-applicationautoscaling'
import { Alarm, ComparisonOperator, MathExpression } from 'aws-cdk-lib/aws-cloudwatch'
import { ApplicationScalingAction } from 'aws-cdk-lib/aws-cloudwatch-actions'
import * as aws_logs from 'aws-cdk-lib/aws-logs'

II. Create cluster

// Create fargate cluster
  const cluster = new Cluster(stack, 'SQS_PROCESSING_CLUSTER', {
    // vpc: define your VPC here if needed,
    clusterName: 'SQS_PROCESSING_CLUSTER',
  })

III. Create task definition

  // Create a Task Definition for the container to start
  const taskDefinition = new FargateTaskDefinition(stack, 'SQS_PROCESSING_CONTAINER', {
    memoryLimitMiB: 4096,
    cpu: 1024,
  })
  taskDefinition.addContainer('SQS_PROCESSING_CONTAINER', {
    // Path to the Docker build context (the directory containing the Dockerfile)
    image: ContainerImage.fromAsset(`./src/`, { 
      file: 'Dockerfile',
    }),
    environment: {
       // put your environment
    },
    logging: LogDrivers.awsLogs({
      streamPrefix: '/fargate/sqs-processing/log-group',
      logRetention: aws_logs.RetentionDays.THREE_DAYS,
    }),
  })

IV. Create a Fargate service

  // Create a Fargate service with a public IP (no load balancer is needed for queue processing)
  const service = new FargateService(stack, 'sqs-processing-service', {
    cluster: cluster, // Cluster from step II
    taskDefinition: taskDefinition, // Task definition from step III
    serviceName: 'sqs-processing-service',
    assignPublicIp: true,
    minHealthyPercent: 100,
    maxHealthyPercent: 400,
    propagateTags: PropagatedTagSource.SERVICE,
    enableExecuteCommand: false,
  })

V. Create SQS

  const queue = new sqs.Queue(stack, 'sqs-id', {
    queueName: 'sqs-name',
    // Should exceed the longest expected processing time,
    // otherwise in-flight messages will be redelivered to another task
    visibilityTimeout: Duration.seconds(300),
  })

VI. Allow Fargate to consume SQS messages

queue.grantConsumeMessages(service.taskDefinition.taskRole)

Fargate autoscaling

To optimize resource utilization and costs, the setup includes autoscaling configurations. These ensure that the Fargate tasks scale up when messages are pending in the SQS queue and scale down once all messages have been processed or when no messages are present.

Scaling up

  const scalingTarget = service.autoScaleTaskCount({
    maxCapacity: 10,
    minCapacity: 0,
  })

  // Scale up when SQS has visible messages
  const numberOfMessagesVisibleMetric = queue.metricApproximateNumberOfMessagesVisible({
    period: Duration.seconds(30),
  })

  // Scale up if queue has messages
  scalingTarget.scaleOnMetric('QueueMessagesVisibleScaling', {
    metric: numberOfMessagesVisibleMetric,
    evaluationPeriods: 1,
    scalingSteps: [
      { upper: 0, change: 0 }, // Do not change capacity when there are no visible messages in SQS
      {
        lower: 1,
        upper: 5,
        change: +1,
      },
      {
        lower: 5,
        upper: 10,
        change: +5,
      },
      {
        lower: 10,
        change: +5,
      },
    ],
    cooldown: Duration.seconds(90),
  })

Scaling down

Combine three metrics to avoid scaling down while messages are still being processed:


  const messagesSentMetric = queue.metricNumberOfMessagesSent({
    period: Duration.seconds(30),
  })

  const messagesVisibleMetric = queue.metricApproximateNumberOfMessagesVisible({
    period: Duration.seconds(30),
  })

  const messagesNotVisibleMetric = queue.metricApproximateNumberOfMessagesNotVisible({
    period: Duration.seconds(30),
  })

  // Combine all metrics to create a custom metric using a math expression
  // The expression adds the 3 metrics, and we expect the sum to be zero to scale down.
  const customMetric = new MathExpression({
    expression: 'messagesSent + messagesVisible + messagesNotVisible',
    usingMetrics: {
      messagesSent: messagesSentMetric,
      messagesVisible: messagesVisibleMetric,
      messagesNotVisible: messagesNotVisibleMetric,
    },
    period: Duration.seconds(30),
  })

Create alarm and scalable target


  // Add an alarm on the combined SQS metric staying at zero
  const scaleInInit = new Alarm(stack, 'ScaleDownAlarm', {
    alarmDescription: 'When there are no messages in SQS, scale down to 0',
    metric: customMetric,
    evaluationPeriods: 10,
    datapointsToAlarm: 10,
    threshold: 0,
    comparisonOperator: ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
  })

  // Define our auto scaling target for our Fargate service
  const scalableTarget = ScalableTarget.fromScalableTargetId(
    stack,
    'ZeroSQSMessage-ScalableTarget',
    `service/${service.cluster.clusterName}/${service.serviceName}|ecs:service:DesiredCount|ecs`
  )

Create step scaling action and alarm action


  // Define the action taken on our scaling target
  const scalingAction = new StepScalingAction(stack, 'scaleToZero', {
    scalingTarget: scalableTarget,
    adjustmentType: AdjustmentType.EXACT_CAPACITY,
    metricAggregationType: MetricAggregationType.MAXIMUM,
    cooldown: Duration.minutes(5),
  })

  // Create the adjustment made by our action
  scalingAction.addAdjustment({
    adjustment: 0,
    upperBound: 0,
  })

  // Finally add our alarm action to our Fargate alarm
  scaleInInit.addAlarmAction(new ApplicationScalingAction(scalingAction))

Business code implementation

Now that our infrastructure is set up, it is time to write the business code that processes messages.

Dockerfile setup

The Dockerfile is carefully crafted to prepare the Node environment, execute the necessary installations, and set the application to run indefinitely, constantly listening for SQS messages to process.

FROM --platform=linux/amd64 node:18 AS appbuild
WORKDIR /app

COPY package*.json ./
RUN npm ci

COPY ./ /app/

RUN npm run build

FROM node:18

WORKDIR /app
COPY --from=appbuild /app/ .
RUN npm prune --production
ENV NODE_ENV=production


CMD ["node", "/app/fargate/index.js"]

Main business index.ts

A TypeScript file (index.ts) defines the core logic for message reception, processing, and deletion. This involves interacting with the SQS service through AWS SDK commands, specifically focusing on receiving and deleting messages.

import 'dotenv/config'
import {
  DeleteMessageCommand,
  Message,
  ReceiveMessageCommand,
  SQSClient,
} from '@aws-sdk/client-sqs'

const sqsClient = new SQSClient({}) // region and credentials come from the task role and environment

export async function runApp() {
  while (true) { // here we can also add delay
    await processMessages()
  }
}

export async function processMessages() {
  let messages: Message[] | undefined
  try {

    const command = new ReceiveMessageCommand({
      QueueUrl: 'sqs-url',
      WaitTimeSeconds: 20, // long-polling wait time for receiving messages
      MaxNumberOfMessages: 1,
    })

    const response = await sqsClient.send(command)

    messages = response.Messages
    if (!messages || messages.length === 0) {
      console.warn('No messages received!')
      return
    }

    console.log('Received messages:', { messages })
    for (const msg of messages) {
      try {
        if (!msg.Body) {
          throw new Error('No body in message')
        }
        // here you need to handle your SQS message
      } catch (e) {
        console.error('Error processing message', { error: e, message: msg })
        // Optionally you can send message to Dead-letter SQS
      }

      // Now the message is processed, delete it from the queue
      const deleteCommand = new DeleteMessageCommand({
        QueueUrl: 'sqs-url',
        ReceiptHandle: msg.ReceiptHandle,
      })

      await sqsClient.send(deleteCommand)
    }
  } catch (e) {
    // Unexpected error: log the context, then rethrow so the container can restart
    console.error('Error processing messages', { error: e })
    throw e
  }
}

void runApp()
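The inline comment in runApp notes that a delay can be added between polling iterations. A minimal sketch of such a helper; the 5-second interval is an illustrative value, not a recommendation:

```typescript
// Minimal sleep helper to throttle the polling loop between iterations.
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// Usage inside runApp():
// while (true) {
//   await processMessages()
//   await sleep(5000) // back off 5 seconds between polls (illustrative value)
// }
```

Since ReceiveMessageCommand already long-polls for up to WaitTimeSeconds, the extra delay mainly matters when the queue is empty and you want to reduce API call volume further.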

Summary

This article outlines how to set up an AWS ECS Fargate container to process long-running jobs using messages from an SQS queue, an approach especially useful when tasks exceed the 15-minute execution limit of AWS Lambda. It covers the initial steps of setting up the ECS Fargate infrastructure, including creating a Fargate cluster, task definition, service, and SQS queue, followed by enabling autoscaling based on the queue’s workload. Additionally, it describes implementing business logic within a Docker container to continuously process and manage SQS messages. This setup leverages the scalability and flexibility of AWS services to efficiently handle long-running tasks while optimizing resource usage and costs.


Additional Considerations

When implementing this solution, consider monitoring and alerting mechanisms to keep track of the system’s health and performance. Employing AWS CloudWatch can provide insights into task execution metrics and trigger alarms in case of anomalies. Additionally, exploring the integration of dead-letter queues (DLQs) could further bolster the system’s resilience by handling messages that cannot be processed successfully.
