DEV Community

Cover image for Serverless RAG Chat with AppSync Events and Bedrock Knowledge Bases
Zied Ben Tahar for AWS Community Builders

Posted on • Edited on • Originally published at levelup.gitconnected.com

1 1 1

Serverless RAG Chat with AppSync Events and Bedrock Knowledge Bases

When it comes to building serverless WebSocket APIs on AWS, there’s no shortage of options: API Gateway, IoT Core, AppSync GraphQL subscriptions, and now AppSync Events. Each option comes with its own level of control and complexity. I’ve found that AppSync Events to be simplest to work with.

One of the interesting features of AppSync Events is its data sources capability. It lets you directly integrate to resources like DynamoDB, OpenSearch, Bedrock and Lambda. You can interact with these data sources using AppSyncJS (appsync’s own flavor of javascript). But to be totally fair, I lean toward direct lambda integration as it gives more control and makes the development and testing workflow more familiar, standard and manageable.

Currently, Bedrock data source supports only the InvokeModel and Converse APIs. So, if you want to integrate with Knowledge Bases, the viable approach is to create a custom data source using Lambda.

And that’s exactly what this blog post is about, we’ll walk through how to build this RAG-based chat application with AppSync Events and bedrock knowledge based using nodejs, TypeScript and Terraform.

Solution overview

Let’s take a look at how the whole setup fits together:

Architecture overviewArchitecture overview

The Knowledge Base is configured to use PostgreSQL as its vector store where we store the embeddings as well as the associated metadata of the documents we want to index. Using Postgres gives us control over the schema, the indexing strategy, and embedding format, all of which come in handy when fine-tuning a vector-based RAG setup.

We’ve got the handleAppSyncEvents function directly integrated as a data source for the AppSync Events API. Its role is to process incoming events from AppSync and to invoke the retrieveAndGenerate from the Knowledge Base. This function is configured to be asynchronous (with the EVENT invocation type), which means AppSync doesn't wait for the function to complete before returning a response to the client. Once we receive a result from bedrock this function publishes a response back to the client’s response channel.

AppSync Events supports multiple authorization methods to secure Event APIs, including API keys, Lambda authorizers, IAM, OpenID Connect, and Amazon Cognito user pools. In this setup, I’m using both Cognito user pools and IAM:

  • Web clients use cognito for authentication
  • And I chose IAM over using API key for publishing events from the handleAppSyncEvents function to AppSync, as it offers better security posture.

One thing I appreciate in this setup: AppSync Events supports Web ACLs. That means you can easily layer in protections like rate limiting and IP filtering. It’s a nice edge over API Gateway WebSockets, which still doesn’t offer native WAF support.

And tying it all together, the browser connects via WebSocket to AppSync, giving us a real-time, bidirectional channel, ideal for sending the models responses back to users in conversational interfaces.

Let’s dive into the details of the solution; but if you’d like to jump straight to the complete implementation, you can find it here 👇

https://github.com/ziedbentahar/rag-chat-with-appsync-events-and-bedrock-knowledge-bases

Setting up the knowledge base

Let’s first take a look at how we can use Aurora PostgreSQL as a vector store.

Creating the vector store on Postgres

When using PostgreSQL as a vector store, Knowledge Base requires an Aurora Serverless cluster with the Data API enabled. The database must include a vector table with specific columns:

  • An embedding column to store the vector representation of the content,

  • A chunk column for the actual text tied to each embedding,

  • And a metadata column that holds references, which are useful for pointing back to the original source during retrieval.

The Knowledge Base keeps this table up to date automatically whenever content is synced from the source bucket.

Since I like to keep everything automated, I trigger the db init script right after the database cluster is created. This script sets up everything we need: a role, schema, table, and indexes; all in one go, wrapped in a single transaction. It’s run by a function once the cluster is deployed:

    export const handler = async (_: unknown): Promise<void> => {
        const bedrockKnowledgeBaseCreds = {
            username: "bedrock_user",
            password: generatePostgresPassword(),
        };

        let schema = "knowledge_base";
        let vectorTable = "bedrock_kb";

        const queries = [
            `CREATE EXTENSION IF NOT EXISTS vector`,
            `CREATE SCHEMA IF NOT EXISTS ${schema}`,
            `CREATE ROLE ${bedrockKnowledgeBaseCreds.username} WITH PASSWORD '${bedrockKnowledgeBaseCreds.password}' LOGIN`,
            `GRANT ALL ON SCHEMA ${schema} to ${bedrockKnowledgeBaseCreds.username}`,
            `CREATE TABLE IF NOT EXISTS ${schema}.${vectorTable} (id uuid PRIMARY KEY, embedding vector(1024), chunks text, metadata json, custom_metadata jsonb)`,
            `CREATE INDEX IF NOT EXISTS bedrock_kb_embedding_idx ON ${schema}.${vectorTable} USING hnsw (embedding vector_cosine_ops) WITH (ef_construction=256)`,
            `CREATE INDEX IF NOT EXISTS bedrock_kb_chunks_fts_idx ON ${schema}.${vectorTable} USING gin (to_tsvector('simple', chunks))`,
            `CREATE INDEX IF NOT EXISTS bedrock_kb_custom_metadata_idx ON ${schema}.${vectorTable} USING gin (custom_metadata)`,
            `GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA ${schema} TO ${bedrockKnowledgeBaseCreds.username}`

        ];

        await executeTransaction(databaseArn, databaseSecretArn, databaseName, queries, bedrockKnowledgeBaseCreds);
    };
Enter fullscreen mode Exit fullscreen mode

executeTransaction leverages Aurora’s data api in order to execute these statements. In this function, I also set the secret containing a dedicated database user and password, which will later be used when setting up the data source for the Knowledge Base.

Creating The knowledge base

Quite straightforward in terraform:

    resource "aws_bedrockagent_knowledge_base" "this" {

      name     = "${var.application}-${var.environment}-kb"
      role_arn = aws_iam_role.kb_role.arn

      knowledge_base_configuration {
        vector_knowledge_base_configuration {
          embedding_model_arn = local.embedding_model_arn
        }
        type = "VECTOR"
      }

      storage_configuration {
        type = "RDS"
        rds_configuration {
          credentials_secret_arn = aws_secretsmanager_secret.kb_creds.arn
          database_name          = aws_rds_cluster.rds_cluster.database_name
          resource_arn           = aws_rds_cluster.rds_cluster.arn
          table_name             = "${local.db_schema}.${local.vector_table}"
          field_mapping {
            primary_key_field = "id"
            metadata_field    = "metadata"
            text_field        = "chunks"
            vector_field      = "embedding"
          }
        }
      }

      depends_on = [
        aws_rds_cluster_instance.rds_cluster_instance,
        aws_lambda_invocation.seed_db_function,
        aws_secretsmanager_secret.kb_creds
      ]
    }

    resource "aws_bedrockagent_data_source" "this" {
      knowledge_base_id = aws_bedrockagent_knowledge_base.this.id
      name              = "kb_datasource"

      vector_ingestion_configuration {
        chunking_configuration {
          chunking_strategy = "FIXED_SIZE"
          fixed_size_chunking_configuration {
            max_tokens         = 300
            overlap_percentage = 20
          }
        }
      }

      data_source_configuration {

        type = "S3"
        s3_configuration {

          bucket_arn         = aws_s3_bucket.kb_bucket.arn
          inclusion_prefixes = ["${local.kb_folder}"]
        }
      }
    }
Enter fullscreen mode Exit fullscreen mode

When configuring RDS as a data source there are a few key parameters you’ll need to provide:

  • The vector table and the field mappings that define which columns in the table should be used by the Knowledge Base.

  • The IAM role associated with the Knowledge Base must have the following permissions on the RDS cluster: rds-data:ExecuteStatement, rds-data:BatchExecuteStatement and rds:DescribeDbClusters

  • The Secret arn that holds the database credentials that we created in the previous step.

Once the knowledge base is deployed, here’s what it looks like in the console:

Knowledge base overviewKnowledge base overview

We can already start testing it right from the console. In this example, I synced the knowledge base with a dataset containing texts about the Roman Empire.

Testing the knowledge baseTesting the knowledge base

Alights, let’s see how to setup the AppSync Events integration

Setting up AppSync integration

As mentioned earlier, I’ll be using Cognito User Pools as the default auth mode as well as IAM:

  • AppSync will handle validating both subscribe and publish requests from clients, as long as they provide a valid Cognito token.

  • IAM auth will be used handleAppSyncEvent function as needs to publish responses back to the clients

    resource "awscc_appsync_api" "this" {
      name          = "${var.application}-${var.environment}-events-api"
      owner_contact = "${var.application}-${var.environment}"
      event_config = {
        auth_providers = [
          {
            auth_type = "AMAZON_COGNITO_USER_POOLS"
            cognito_config = {
              aws_region   = data.aws_region.current.name,
              user_pool_id = var.user_pool_id,
            }
          },
          {
            auth_type = "AWS_IAM"
          }
        ]
        connection_auth_modes = [
          {
            auth_type = "AMAZON_COGNITO_USER_POOLS"
          }
        ]
        default_publish_auth_modes = [
          {
            auth_type = "AMAZON_COGNITO_USER_POOLS"
          }
        ]
        default_subscribe_auth_modes = [
          {
            auth_type = "AMAZON_COGNITO_USER_POOLS"
          }
        ]
      }
    }
Enter fullscreen mode Exit fullscreen mode

Next up, I’ll need to create a data source and associated to handleAppSyncEvents function. Unfortunately, this part isn’t supported in the terraform provider yet, so for now, I’m using the SDK to create these resources in a function hat runs once right after the AppSync Events API resource is created:

    export const handler = async (event: {
        tf: { action: string };
        apiId: string;
        dataSourceName: string;
        lambdaFunctionArn: string;
        serviceRoleArn: string;
        channelName: string;
    }) => {
        if (
            event.apiId == null ||
            event.dataSourceName == null ||
            event.lambdaFunctionArn == null ||
            event.serviceRoleArn == null ||
            event.channelName == null
        ) {
            throw new Error("SourceArn, TargetArn, RoleArn and channel name are required");
        }

        if (event.tf.action === "create") {
            const client = new AppSyncClient({ region: process.env.AWS_REGION });

            const createDataSourceCommand = new CreateDataSourceCommand({
                apiId: event.apiId,
                name: event.dataSourceName,
                type: "AWS_LAMBDA",
                serviceRoleArn: event.serviceRoleArn,
                lambdaConfig: {
                    lambdaFunctionArn: event.lambdaFunctionArn,
                },
            });

            await client.send(createDataSourceCommand);

            const createChannelCommand = new CreateChannelNamespaceCommand({
                apiId: event.apiId,
                name: event.channelName,
                subscribeAuthModes: [
                    {
                        authType: "AMAZON_COGNITO_USER_POOLS",
                    },
                ],
                publishAuthModes: [
                    {
                        authType: "AMAZON_COGNITO_USER_POOLS",
                    },
                    {
                        authType: "AWS_IAM",
                    },
                ],
                handlerConfigs: {
                    onPublish: {
                        behavior: "DIRECT",
                        integration: {
                            dataSourceName: event.dataSourceName,
                            lambdaConfig: {
                                invokeType: "EVENT",
                            },
                        },
                    },
                    onSubscribe: {
                        behavior: "DIRECT",
                        integration: {
                            dataSourceName: event.dataSourceName,
                            lambdaConfig: {
                                invokeType: "EVENT",
                            },
                        },
                    },
                },
            });

            await client.send(createChannelCommand);

            return;
        }

        // ... handle resource update and deletion
    };
Enter fullscreen mode Exit fullscreen mode

Note the direct integration of the default channel with the function. The handleAppSyncEvents function is invoked directly in EVENT invocation mode.

Here how it looks in the console once the AppSync resources are created:

Chat channel namespaceChat channel namespace

With the Lambda function as a data source defined in this screen

Lambda data sourceLambda data source

Using Lambda Powertools to handle AppSync realtime events

Now let’s get to the interesting part: Handling events from AppSync and putting the knowledge base to work. Lambda Powertools offers a handy utility that makes it easier to integrate Lambda functions with AppSync events. It allows defining clear, dedicated handler methods for publish/subscribe interactions, so less messy if-else blocks. Routing is handled automatically based on namespaces and channel patterns, keeping the code clean and easy to maintain.

Here’s how it works in practice:

Setting things up and handing subscription

We start by initializing the resolver using the Lambda Powertools Event Handler utility, then define a handler for new subscriptions with app.onSubscribe:

    import { AppSyncEventsResolver, UnauthorizedException } from "@aws-lambda-powertools/event-handler/appsync-events";
    // ...

    const app = new AppSyncEventsResolver();

    app.onSubscribe("/chat/responses/*", (payload) => {
        const identity = payload.identity ? (payload.identity as { sub: string; username: string }) : null;
        const sub = identity?.sub;

        if (!sub || (payload.info.channel.segments.length != 3 && !payload.info.channel.path.endsWith(`/${sub}`))) {
            throw new UnauthorizedException("You cannot subscribe to this channel");
        }
    });
Enter fullscreen mode Exit fullscreen mode

Here, I ensure that users can only subscribe to their own chat responses channel. This check prevents accidental or unauthorized access to other users’ channels. Since we’re using a Cognito User Pool, the subscription payload contains decoded user information, including the sub (user ID) and username.

Validating and handling messages

Let’s now see how we handle messages from users:

    const messageSchema = z.object({
        id: z.number(),
        content: z.string(),
        type: z.enum(["chat"]).optional(),
        sessionId: z.string().optional(),
    });

    app.onPublish("/chat/request/*", async (payload, event) => {
        const identity = event.identity ? (event.identity as { sub: string; username: string }) : null;
        const sub = identity?.sub;

        if (!sub || (event.info.channel.segments.length != 3 && !event.info.channel.path.endsWith(`/${sub}`))) {
            throw new UnauthorizedException("You cannot publish to this channel");
        }

        const message = messageSchema.safeParse(payload);

        if (!message.success) {
            return {
                result: { text: "I don't understand what you mean, your message format seems invalid" },
                error: "Invalid message payload",
            };
        }

        const { content, sessionId } = message.data;

        const result = await bedrockClient.send(
            new RetrieveAndGenerateCommand({
                input: {
                    text: content,
                },
                retrieveAndGenerateConfiguration: {
                    type: "KNOWLEDGE_BASE",
                    knowledgeBaseConfiguration: {
                        knowledgeBaseId: process.env.KB_ID,
                        modelArn: process.env.KB_MODEL_ARN,
                    },
                },
                sessionId,
            })
        );

        const signedRequest = await signRequest(`https://${process.env.EVENTS_API_DNS}/event`, "POST", {
            channel: `/chat/responses/${sub}`,
            events: [
                JSON.stringify({
                    result: result.output,
                    sessionId: result.sessionId,
                }),
            ],
        });

        await fetch(`https://${process.env.EVENTS_API_DNS}/event`, {
            method: signedRequest.method,
            headers: signedRequest.headers,
            body: signedRequest.body,
        });

        return {
            processed: true,
        };
    });
Enter fullscreen mode Exit fullscreen mode

As with the subscribe handler, when a message is published to the /chat/request/{userId} channel app.onPublish gets invoked, I extract the user’s identity from the event and enforce that they can only publish to their own channel. We then validate the payload using zod to ensure it has the expected structure. If validation succeeds we then call bedrock RetrieveAndGenerate endpoint.

As I am using IAM auth to let the function publish the response back to the client via chat/responses/{userId} channel. I need to sign the request with sigv4 and pass the signed request headers when calling AppSync event endpoint. This function needs to have appsync:EventPublish permission on the api channels.

☝️ Some notes:

  • In this solution we waits for the full response from the model, we’re not using bedrock’s response streaming feature. I could have used the RetrieveAndGenerateStream API and then send chunks by publishing incrementally each chunk to the client. However, depending on the model’s response, this could lead to multiple calls to the Events API potentially increasing costs (since each call counts as a separate operation). One possible solution would be to buffer the response chunks and send them in batches, striking a better balance between responsiveness and cost. Handling lambda response streaming natively is a feature I’d love to see supported by AppSync in the future.

  • The RetrieveAndGenerate endpoint returns a sessionId that we’ll need to reuse in messages within the same conversation. This sessionId is what allows Amazon Bedrock to maintain context. We simply return that sessionId along with the result, and include it in every subsequent chat message to keep the conversation context-aware.

  • You can check out the Cognito user pool terraform resource definition at the following link.

You can find the complete function code following this link.

Creating a Client

Here, I’m building a small React client to chat with the Knowledge Base via AppSync. I’m using Amplify because it makes it easy to connect to the AppSync API and handle authentication through the user pool to retrieve an access token.

First, we need to configure Amplify by providing the AppSync Events API endpoint along with the Cognito User Pool and Identity Pool ids:

    Amplify.configure({
      API: {
        Events: {
          "endpoint": "https://<api-id>.appsync-api.eu-west-1.amazonaws.com/event",
          "region": "eu-west-1",
          "defaultAuthMode": "userPool",
        },
      },
      Auth: {
        Cognito: {
          userPoolId: "<user-pool-id>",
          userPoolClientId: "<user-pool-client-id>",
          identityPoolId: "<identity-pool-id>",
        },
      },
    });
Enter fullscreen mode Exit fullscreen mode

To handle real-time chat with AppSync Events, the client is connected to two endpoints:

  • events.connect('/chat/responses/${userId}') to subscribe to responses

  • events.post('/chat/request/${userId}') to send user messages

      useEffect(() => {
        const connectToChannel = async () => {
          try {
            if(!userId) {
              return;
            }

            const channel = await events.connect(`/chat/responses/${userId}`);

            const subscription = channel.subscribe({
              next: (data: any) => {
                if(data.type !== "data") return;

                const botMessage: Message = {
                  id: Date.now(),
                  sender: "bot",
                  content: data.event.result.text,

                };
                setMessages((prev) => [...prev, botMessage]);
                setSessionId(data.event.sessionId);
              },
              error: (err: any) => console.error("subscription error", err),
            });

            return () => {
              subscription.unsubscribe?.();
            };
          } catch (error) {
            console.error("connection error", error);
          }
        };

        connectToChannel();
      }, [userId]);

      const sendMessage = async ({userId }:{userId : string}) => {
        if (input.trim() === "") return;
        setInput("");
        const newMessage: Message = {
          id: Date.now(),
          sender: "user",
          content: input,
          sessionId: sessionId,
        };

        setMessages([...messages, newMessage]);

        await events.post(`/chat/request/${userId}`, newMessage);
      };
Enter fullscreen mode Exit fullscreen mode

When the React component mounts, we use useEffect to set up a connection to the /chat/responses/{userId} channel. This subscribes to responses coming from AppSync Events. Once we start getting responses, we add that message to the chat and update the sessionId to maintain context.

To send a message, the sendMessage function posts to the same /chat/request/{userId} endpoint.

The userId in this example is provided by Amplify’s Authenticator component, which handles user authentication and exposes the signed-in user's details.

Which gives this chat interface:

If you look at the browser’s WebSocket connection, you’ll see real-time responses coming through as AppSync Events sends data

Wrapping up

In this post, we walked through building a serverless WebSocket API using AppSync Events, Lambda, and Bedrock Knowledge Bases. We explored how to handle real-time communication securely with Cognito and IAM. Lambda Powertools makes working with AppSync Events an absolute breeze.

As usual you can find the complete repo with the solution ready to be adapted and deployed here 👉 https://github.com/ziedbentahar/rag-chat-with-appsync-events-and-bedrock-knowledge-bases

I will make sure to update the repo once AppSync gets better terraform support 👌

Thanks for making it all the way here !

Resources

Sentry image

Make it make sense

Make sense of fixing your code with straight-forward application monitoring.

Start debugging →

Top comments (3)

Collapse
 
michael_liang_0208 profile image
Michael Liang

Nice post for AI developers!
Which is better to use as a vector database, FAISS or Pinecone?

Collapse
 
zied profile image
Zied Ben Tahar

I haven’t tried FAISS yet, but here’s how I think about it:

  • Postgres (with pgvector) is a very capable option, especially if you’re already using it in your ecosystem. Keeping everything in one place can simplify ops and reduce overhead.

  • Pinecone is a great serverless alternative, fully managed, easy to scale, and optimized for vector search use cases.

imo, If you're starting fresh or want a fully managed experience, Pinecone is appealing. If you're already invested in Postgres, it's worth considering extending it with vector capabilities.

Collapse
 
michael_liang_0208 profile image
Michael Liang

Thanks

Best Practices for Running  Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK cover image

Best Practices for Running Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK

This post discusses the process of migrating a growing WordPress eShop business to AWS using AWS CDK for an easily scalable, high availability architecture. The detailed structure encompasses several pillars: Compute, Storage, Database, Cache, CDN, DNS, Security, and Backup.

Read full post

👋 Kindness is contagious

Explore this insightful write-up, celebrated by our thriving DEV Community. Developers everywhere are invited to contribute and elevate our shared expertise.

A simple "thank you" can brighten someone’s day—leave your appreciation in the comments!

On DEV, knowledge-sharing fuels our progress and strengthens our community ties. Found this useful? A quick thank you to the author makes all the difference.

Okay