<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Aaron O'L</title>
    <description>The latest articles on Forem by Aaron O'L (@aoloughlin).</description>
    <link>https://forem.com/aoloughlin</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1174059%2Fbe9685a1-ff1b-4b6f-af19-f0e89ecbe3ad.png</url>
      <title>Forem: Aaron O'L</title>
      <link>https://forem.com/aoloughlin</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/aoloughlin"/>
    <language>en</language>
    <item>
      <title>Integration testing EventBridge events</title>
      <dc:creator>Aaron O'L</dc:creator>
      <pubDate>Mon, 06 Nov 2023 19:04:32 +0000</pubDate>
      <link>https://forem.com/aoloughlin/integration-testing-eventbridge-events-1pa5</link>
      <guid>https://forem.com/aoloughlin/integration-testing-eventbridge-events-1pa5</guid>
      <description>&lt;h2&gt;
  
  
  Feature to be tested
&lt;/h2&gt;

&lt;p&gt;I have a Lambda that will consume DynamoDB Stream events, transform them, then send them to EventBridge.&lt;/p&gt;

&lt;h2&gt;
  
  
  Testing approach
&lt;/h2&gt;

&lt;p&gt;Many articles mentioned using an SQS queue to consume the events and validate the messages as a solution. Other solutions mentioned using a Pub/Sub, or the &lt;a href="https://github.com/erezrokah/aws-testing-library/blob/main/src/jest/README.md#tohavelog"&gt;aws-testing-library&lt;/a&gt; to check for the presence of CloudWatch log statements.&lt;/p&gt;

&lt;p&gt;I chose the SQS queue option because it lets me assert on the events themselves, which makes the tests thorough and reliable. Also, since I might swap out the Lambda for &lt;a href="https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html"&gt;EventBridge Pipes&lt;/a&gt; in the future, there should be no need to change any of these tests, which would not be the case if I watched for CloudWatch logs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Testing EventBridge Events with SQS
&lt;/h3&gt;

&lt;p&gt;The full test code appears at the end of this post, with comments describing each part in more detail.&lt;/p&gt;

&lt;h4&gt;
  
  
  The test setup
&lt;/h4&gt;

&lt;p&gt;The test requires three resources to be created:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;An SQS Queue to consume the EventBridge Events&lt;/li&gt;
&lt;li&gt;An EventBridge Rule that targets specific EventBridge events. It matches on a record's ID, so it ignores any non-test EB events.&lt;/li&gt;
&lt;li&gt;An EventBridge Target that sets the SQS queue as the target of the Rule, so the queue only receives events matching the EventBridge Rule.&lt;/li&gt;
&lt;/ol&gt;
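
&lt;p&gt;To illustrate the second resource, here is a minimal sketch of how a pattern keyed on a record ID separates test events from everything else on the bus. The &lt;code&gt;buildTestEventPattern&lt;/code&gt; and &lt;code&gt;matchesPattern&lt;/code&gt; helpers are hypothetical, and the matcher only covers exact-value matching on nested fields; real EventBridge patterns support many more operators.&lt;/p&gt;

```typescript
type Json = { [key: string]: unknown }

// Hypothetical helper: builds the object that would be passed to PutRule
// as JSON.stringify(pattern).
const buildTestEventPattern = (source: string, recordId: string): Json => ({
  source: [source],
  detail: { data: { recordId: [recordId] } },
})

// Simplified matcher, for illustration only: an array in the pattern lists the
// allowed values, a nested object recurses into the same field of the event.
const matchesPattern = (pattern: Json, event: Json): boolean =>
  Object.keys(pattern).every((key) => {
    const expected = pattern[key]
    const actual = event[key]
    if (Array.isArray(expected)) return expected.includes(actual)
    if (typeof actual !== 'object' || actual === null) return false
    return matchesPattern(expected as Json, actual as Json)
  })

const pattern = buildTestEventPattern(
  'event-bridge-source-service',
  'integration-test-emit-ddb-stream-change-event',
)

const testEvent: Json = {
  source: 'event-bridge-source-service',
  detail: { data: { recordId: 'integration-test-emit-ddb-stream-change-event' } },
}
const otherEvent: Json = {
  source: 'event-bridge-source-service',
  detail: { data: { recordId: 'some-production-record' } },
}

console.log(matchesPattern(pattern, testEvent)) // true
console.log(matchesPattern(pattern, otherEvent)) // false
```

&lt;p&gt;Because only the test record's ID matches, production traffic on the same bus never reaches the test queue.&lt;/p&gt;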

&lt;p&gt;Once these are created, each test will insert, modify, or delete a record in a DynamoDB table. This will trigger the Lambda which handles DynamoDB Stream Events and sends them to EventBridge.&lt;/p&gt;

&lt;h4&gt;
  
  
  Verifying SQS messages
&lt;/h4&gt;

&lt;p&gt;Once a DynamoDB record has been updated, I use long polling to check the SQS queue for any messages. When messages are received, I verify the message body.&lt;/p&gt;

&lt;p&gt;Since the EventBridge rule is specific to the test case, I can trust that the message received is only meant for the test SQS queue.&lt;/p&gt;
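
&lt;p&gt;When a rule targets an SQS queue without an input transformer, the message body is the whole EventBridge event serialised as JSON, so the payload has to be unwrapped from &lt;code&gt;detail&lt;/code&gt;. A minimal sketch of that step follows; &lt;code&gt;extractDetail&lt;/code&gt; is a hypothetical helper, and the &lt;code&gt;data&lt;/code&gt; shape and sample values are assumptions based on the rule used in this post.&lt;/p&gt;

```typescript
// Only the envelope fields used in this sketch are typed.
type EventEnvelope = {
  source: string
  'detail-type': string
  detail: { data: { recordId: string; status: string } }
}

// Hypothetical helper: parse the SQS body and fail loudly if it is missing.
const extractDetail = (body: string | undefined): EventEnvelope['detail'] => {
  if (body === undefined) throw new Error('SQS message has no body')
  const envelope = JSON.parse(body) as EventEnvelope
  return envelope.detail
}

// Sample body with assumed values, shaped like an EventBridge event.
const sampleBody = JSON.stringify({
  version: '0',
  source: 'event-bridge-source-service',
  'detail-type': 'X',
  detail: {
    data: { recordId: 'integration-test-emit-ddb-stream-change-event', status: 'Z' },
  },
})

console.log(extractDetail(sampleBody).data.status) // Z
```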

&lt;h4&gt;
  
  
  Cleanup after each test
&lt;/h4&gt;

&lt;p&gt;The test scenario will contain several tests. &lt;br&gt;
As part of the setup for each test, a record will be inserted so that it can be modified. Then, the test will delete the same record as part of its cleanup. This creates a lot of additional events that I'm not interested in testing, so I make sure these messages are cleared before the intended SQS message is asserted.&lt;/p&gt;

&lt;p&gt;Once all tests have been executed, I delete the test resources and destroy the clients.&lt;/p&gt;
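
&lt;p&gt;The cleanup in this post deletes messages one at a time. If a lot of setup and teardown events pile up, one possible alternative is SQS's &lt;code&gt;DeleteMessageBatch&lt;/code&gt;, which accepts up to 10 entries per request. The sketch below only groups receipt handles into batches; &lt;code&gt;toDeleteBatches&lt;/code&gt; is a hypothetical helper, and each batch would still need to be sent as the &lt;code&gt;Entries&lt;/code&gt; of a &lt;code&gt;DeleteMessageBatchCommand&lt;/code&gt;.&lt;/p&gt;

```typescript
type DeleteEntry = { Id: string; ReceiptHandle: string }

// SQS accepts at most 10 entries per DeleteMessageBatch request.
const MAX_BATCH_SIZE = 10

// Hypothetical helper: split receipt handles into batches of at most 10 entries.
const toDeleteBatches = (receiptHandles: string[]): DeleteEntry[][] => {
  const batches: DeleteEntry[][] = []
  let start = 0
  while (receiptHandles.length > start) {
    const chunk = receiptHandles.slice(start, start + MAX_BATCH_SIZE)
    // Entry Ids only need to be unique within a single request.
    batches.push(chunk.map((ReceiptHandle, index) => ({ Id: `msg-${index}`, ReceiptHandle })))
    start += MAX_BATCH_SIZE
  }
  return batches
}

const handles = Array.from({ length: 12 }, (_, i) => `handle-${i}`)
const batches = toDeleteBatches(handles)

console.log(batches.map((batch) => batch.length)) // [ 10, 2 ]
```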

&lt;h4&gt;
  
  
  Final notes before viewing the code
&lt;/h4&gt;

&lt;p&gt;In the code below, I create the test resources before each test run. If you are using infrastructure as code, you might prefer to create and destroy the resources via Terraform, for example.&lt;/p&gt;

&lt;h4&gt;
  
  
  The code
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
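&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Assumed import sources (AWS SDK for JavaScript v3)
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DeleteCommand, PutCommand } from '@aws-sdk/lib-dynamodb'
import {
  DeleteRuleCommand,
  EventBridgeClient,
  PutRuleCommand,
  PutTargetsCommand,
  RemoveTargetsCommand,
} from '@aws-sdk/client-eventbridge'
import {
  CreateQueueCommand,
  DeleteMessageCommand,
  DeleteQueueCommand,
  GetQueueAttributesCommand,
  ReceiveMessageCommand,
  ReceiveMessageCommandOutput,
  SQSClient,
} from '@aws-sdk/client-sqs'
// `DDBRecord` and `EventBridgeEventType` are project-specific types that are not shown here.

const dynamoDBTableName = 'dynamo-db-table'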
const mockRecordId = 'integration-test-emit-ddb-stream-change-event'
const sqsQueueName = 'integration-test-queue'
const eventBridgeRuleName = `${mockRecordId}-eb-rule-name`
const eventBridgeTargetID = `${mockRecordId}-eb-target-id`
const eventBridgeName = 'event-bridge-events'
const eventSource = 'event-bridge-source-service'

let eventBridgeClient: EventBridgeClient
let sqsClient: SQSClient
let dynamoDbClient: DynamoDBClient
let queueURL: string | undefined

const testRecord: DDBRecord = {
  recordType: 'someType',
  user: 'developer',
  created: '2024-01-01T00:00:00.000Z',
  recordId: mockRecordId,
  recordName: `${mockRecordId}-record-name`,
  version: '0',
  status: 'initialStatus'
}

const setupSQSQueue = async () =&amp;gt; {
  // Create the test queue for consuming EventBridge events
  const createQueueResponse = await sqsClient.send(
    new CreateQueueCommand({
      QueueName: sqsQueueName,
    }),
  )

  // Set the queue URL so that it can be used throughout the tests
  // Alternatively, if the queue is created via IaC, you can use `GetQueueUrlCommand` instead.
  queueURL = createQueueResponse.QueueUrl

  // Create an EB rule for the `testRecord` defined above
  await eventBridgeClient.send(
    new PutRuleCommand({
      Name: eventBridgeRuleName,
      EventPattern: JSON.stringify({
        source: [eventSource],
        detail: {
          data: {
            recordId: [mockRecordId],
          },
        },
      }),
      State: 'ENABLED',
      EventBusName: eventBridgeName,
    }),
  )

  // Get the newly created queue's ARN
  const queueAttributes = await sqsClient.send(
    new GetQueueAttributesCommand({
      QueueUrl: queueURL,
      AttributeNames: ['QueueArn'],
    }),
  )

  // Set the queue as a target for EventBridge events
  await eventBridgeClient.send(
    new PutTargetsCommand({
      Rule: eventBridgeRuleName,
      EventBusName: eventBridgeName,
      Targets: [
        {
          Arn: queueAttributes.Attributes?.QueueArn,
          Id: eventBridgeTargetID,
        },
      ],
    }),
  )
}

// Utility function to delete a single message on the queue
const deleteMessageOnQueue = async (receiptHandle: string) =&amp;gt; {
  await sqsClient.send(
    new DeleteMessageCommand({
      QueueUrl: queueURL,
      ReceiptHandle: receiptHandle,
    }),
  )
}

// Utility function to clear a queue of all messages
const clearMessagesOnQueue = async () =&amp;gt; {
  const sqsMessages = await getMessageFromQueue()
  const messagesToDeleteOnCleanup = getSQSMessageReceiptHandles(sqsMessages)
  for (const receiptHandle of messagesToDeleteOnCleanup) {
    await deleteMessageOnQueue(receiptHandle)
  }

  // Rather than using the above method, `PurgeQueueCommand` might work for you.
  // I didn't use it because it could take up to 60 seconds to delete the messages on the queue.
}

const deleteTestRecordFromTable = async () =&amp;gt; {
  await dynamoDbClient.send(
    new DeleteCommand({
      TableName: dynamoDBTableName,
      Key: {
        recordId: mockRecordId,
      },
    }),
  )
}

// Delete the SQS resource
const deleteQueue = async () =&amp;gt; {
  await sqsClient.send(new DeleteQueueCommand({ QueueUrl: queueURL }))
}

// Delete EB Target resource
const deleteEventBridgeTarget = async () =&amp;gt; {
  await eventBridgeClient.send(
    new RemoveTargetsCommand({
      Ids: [eventBridgeTargetID],
      Rule: eventBridgeRuleName,
      Force: true,
      EventBusName: eventBridgeName,
    }),
  )
}

// Delete EB Rule resource
const deleteEventBridgeRule = async () =&amp;gt; {
  await eventBridgeClient.send(
    new DeleteRuleCommand({
      Name: eventBridgeRuleName,
      Force: true,
      EventBusName: eventBridgeName,
    }),
  )
}

const getMessageFromQueue = async () =&amp;gt;
  await sqsClient.send(
    new ReceiveMessageCommand({
      MaxNumberOfMessages: 10,
      QueueUrl: queueURL,
      AttributeNames: ['All'],
      WaitTimeSeconds: 30,
      VisibilityTimeout: 0,
    }),
  )

const getSQSMessageReceiptHandles = (
  sqsMessageCommandOutput: ReceiveMessageCommandOutput,
): string[] =&amp;gt; {
  if (!sqsMessageCommandOutput.Messages) return []

  return sqsMessageCommandOutput.Messages.map((message) =&amp;gt; message.ReceiptHandle).filter(
    (message): message is string =&amp;gt; message !== undefined,
  )
}

// The test block
describe('Events are sent to EventBridge WHEN records are modified in a DynamoDB table', () =&amp;gt; {

  beforeAll(async () =&amp;gt; {
    // Create AWS SDK Clients
    const awsConfig = {} // Any AWS config
    eventBridgeClient = new EventBridgeClient(awsConfig)
    sqsClient = new SQSClient(awsConfig)
    dynamoDbClient = new DynamoDBClient(awsConfig)

    // Setup test resources
    await setupSQSQueue()
  })

  afterEach(async () =&amp;gt; {
    await deleteTestRecordFromTable()
    await clearMessagesOnQueue()
  })

  afterAll(async () =&amp;gt; {
    await deleteQueue()
    await deleteEventBridgeTarget()
    await deleteEventBridgeRule()
    dynamoDbClient.destroy()
    eventBridgeClient.destroy()
    sqsClient.destroy()
  })

  it(`SHOULD create an event with the recordType 'X' WHEN a record is inserted into the DynamoDB table`, async () =&amp;gt; {
    // Keeping track of messages on the queue so they can be deleted later
    const messagesToDeleteOnCleanup: string[] = []

    // Insert a new record into the table
    await dynamoDbClient.send(
      new PutCommand({
        TableName: dynamoDBTableName,
        Item: testRecord,
      }),
    )

    const sqsMessages = await getMessageFromQueue()

    expect(sqsMessages.Messages).toBeDefined()
    expect(sqsMessages.Messages).toHaveLength(1)

    // Add the messages to the list for deletion on cleanup
    messagesToDeleteOnCleanup.push(...getSQSMessageReceiptHandles(sqsMessages))

    const parsedMessageBody: EventBridgeEventType&amp;lt;'X', DDBRecord&amp;gt; = JSON.parse(
      sqsMessages.Messages?.[0].Body as string,
    )

    expect(parsedMessageBody['record-type']).toEqual('X')
    expect(parsedMessageBody.detail.data).toEqual({
      ...testRecord,
      status: 'ANewStatus'
    })

    // Clean up messages created as part of this test
    for (const receiptHandle of messagesToDeleteOnCleanup) {
      await deleteMessageOnQueue(receiptHandle)
    }
  })

  describe('AND a record already exists in the database', () =&amp;gt; {
    beforeEach(async () =&amp;gt; {
      // Create the record in DynamoDB
      await dynamoDbClient.send(
        new PutCommand({
          TableName: dynamoDBTableName,
          Item: testRecord,
        }),
      )
      await clearMessagesOnQueue()
    })

    it(`SHOULD create an event with the recordType 'Y' WHEN a record is modified and the status is set to 'Z'`, async () =&amp;gt; {
      const messagesToDeleteOnCleanup: string[] = []

      // Update the record in the table so that its status is 'Z'
      await dynamoDbClient.send(
        new PutCommand({
          TableName: dynamoDBTableName,
          Item: {
            ...testRecord,
            status: 'Z',
          } as DDBRecord,
        }),
      )

      const sqsMessages = await getMessageFromQueue()

      expect(sqsMessages.Messages).toBeDefined()
      expect(sqsMessages.Messages).toHaveLength(1)

      // Add the messages to the list for deletion on cleanup
      messagesToDeleteOnCleanup.push(...getSQSMessageReceiptHandles(sqsMessages))

      const parsedMessageBody: EventBridgeEventType&amp;lt;'Y', DDBRecord&amp;gt; = JSON.parse(
        sqsMessages.Messages?.[0].Body as string,
      )

      expect(parsedMessageBody['record-type']).toEqual('Y')
      expect(parsedMessageBody.detail.data).toEqual({
        ...testRecord,
        status: 'Z'
      })

      // Clean up messages created as part of this test
      for (const receiptHandle of messagesToDeleteOnCleanup) {
        await deleteMessageOnQueue(receiptHandle)
      }
    })

    // ... more tests using the same approach as above
  })
})

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>aws</category>
      <category>javascript</category>
      <category>testing</category>
      <category>eventbridge</category>
    </item>
  </channel>
</rss>
