When it comes to building serverless WebSocket APIs on AWS, there’s no shortage of options: API Gateway, IoT Core, AppSync GraphQL subscriptions, and now AppSync Events. Each option comes with its own level of control and complexity. I’ve found AppSync Events to be the simplest to work with.
One of the interesting features of AppSync Events is its data sources capability. It lets you integrate directly with resources like DynamoDB, OpenSearch, Bedrock, and Lambda. You can interact with these data sources using AppSync JS (AppSync’s own flavor of JavaScript). That said, I lean toward direct Lambda integration, as it gives more control and makes the development and testing workflow more familiar, standard, and manageable.
Currently, the Bedrock data source supports only the InvokeModel and Converse APIs. So, if you want to integrate with Knowledge Bases, the viable approach is to create a custom data source using Lambda.
And that’s exactly what this blog post is about: we’ll walk through how to build a RAG-based chat application with AppSync Events and Bedrock Knowledge Bases using Node.js, TypeScript, and Terraform.
Solution overview
Let’s take a look at how the whole setup fits together:
The Knowledge Base is configured to use PostgreSQL as its vector store where we store the embeddings as well as the associated metadata of the documents we want to index. Using Postgres gives us control over the schema, the indexing strategy, and embedding format, all of which come in handy when fine-tuning a vector-based RAG setup.
We’ve got the handleAppSyncEvents function directly integrated as a data source for the AppSync Events API. Its role is to process incoming events from AppSync and to invoke retrieveAndGenerate on the Knowledge Base. This function is configured to be asynchronous (with the EVENT invocation type), which means AppSync doesn't wait for the function to complete before returning a response to the client. Once we receive a result from Bedrock, this function publishes a response back to the client’s response channel.
AppSync Events supports multiple authorization methods to secure Event APIs, including API keys, Lambda authorizers, IAM, OpenID Connect, and Amazon Cognito user pools. In this setup, I’m using both Cognito user pools and IAM:
- Web clients use Cognito for authentication.
- For publishing events from the handleAppSyncEvents function to AppSync, I chose IAM over an API key, as it offers a better security posture.
One thing I appreciate in this setup: AppSync Events supports Web ACLs. That means you can easily layer in protections like rate limiting and IP filtering. It’s a nice edge over API Gateway WebSocket APIs, which still don’t offer native WAF support.
And tying it all together, the browser connects via WebSocket to AppSync, giving us a real-time, bidirectional channel, ideal for sending the model’s responses back to users in conversational interfaces.
Let’s dive into the details of the solution; but if you’d like to jump straight to the complete implementation, you can find it here 👇
https://github.com/ziedbentahar/rag-chat-with-appsync-events-and-bedrock-knowledge-bases
Setting up the knowledge base
Let’s first take a look at how we can use Aurora PostgreSQL as a vector store.
Creating the vector store on Postgres
When using PostgreSQL as a vector store, Knowledge Base requires an Aurora Serverless cluster with the Data API enabled. The database must include a vector table with specific columns:
- An embedding column to store the vector representation of the content,
- A chunk column for the actual text tied to each embedding,
- And a metadata column that holds references, which are useful for pointing back to the original source during retrieval.
The Knowledge Base keeps this table up to date automatically whenever content is synced from the source bucket.
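Syncing normally happens from the console (as we’ll do further below), but it can also be kicked off programmatically, which is handy for pipelines. Here’s a minimal sketch using the Bedrock Agent SDK, where KB_ID and DATA_SOURCE_ID are hypothetical environment variables:

import { BedrockAgentClient, StartIngestionJobCommand } from "@aws-sdk/client-bedrock-agent";

const client = new BedrockAgentClient({});

// Kicks off an ingestion job that (re)indexes the documents from the S3 data source
// KB_ID and DATA_SOURCE_ID are assumed to be provided as environment variables
await client.send(
  new StartIngestionJobCommand({
    knowledgeBaseId: process.env.KB_ID,
    dataSourceId: process.env.DATA_SOURCE_ID,
  })
);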
Since I like to keep everything automated, I trigger the db init script right after the database cluster is created. This script sets up everything we need: a role, schema, table, and indexes, all in one go, wrapped in a single transaction. It’s run by a function once the cluster is deployed:
export const handler = async (_: unknown): Promise<void> => {
  // Credentials for the dedicated database user the Knowledge Base will use
  const bedrockKnowledgeBaseCreds = {
    username: "bedrock_user",
    password: generatePostgresPassword(),
  };

  const schema = "knowledge_base";
  const vectorTable = "bedrock_kb";

  const queries = [
    `CREATE EXTENSION IF NOT EXISTS vector`,
    `CREATE SCHEMA IF NOT EXISTS ${schema}`,
    `CREATE ROLE ${bedrockKnowledgeBaseCreds.username} WITH PASSWORD '${bedrockKnowledgeBaseCreds.password}' LOGIN`,
    `GRANT ALL ON SCHEMA ${schema} to ${bedrockKnowledgeBaseCreds.username}`,
    `CREATE TABLE IF NOT EXISTS ${schema}.${vectorTable} (id uuid PRIMARY KEY, embedding vector(1024), chunks text, metadata json, custom_metadata jsonb)`,
    `CREATE INDEX IF NOT EXISTS bedrock_kb_embedding_idx ON ${schema}.${vectorTable} USING hnsw (embedding vector_cosine_ops) WITH (ef_construction=256)`,
    `CREATE INDEX IF NOT EXISTS bedrock_kb_chunks_fts_idx ON ${schema}.${vectorTable} USING gin (to_tsvector('simple', chunks))`,
    `CREATE INDEX IF NOT EXISTS bedrock_kb_custom_metadata_idx ON ${schema}.${vectorTable} USING gin (custom_metadata)`,
    `GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA ${schema} TO ${bedrockKnowledgeBaseCreds.username}`,
  ];

  // databaseArn, databaseSecretArn and databaseName are read from environment variables (see the repo)
  await executeTransaction(databaseArn, databaseSecretArn, databaseName, queries, bedrockKnowledgeBaseCreds);
};
executeTransaction leverages Aurora’s Data API to execute these statements. In this function, I also set the secret containing the dedicated database user and password, which will later be used when setting up the data source for the Knowledge Base.
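The actual helper lives in the repo; a minimal sketch of what executeTransaction could look like with the RDS Data API (the extra credentials argument and the Secrets Manager update are omitted here):

import {
  RDSDataClient,
  BeginTransactionCommand,
  ExecuteStatementCommand,
  CommitTransactionCommand,
  RollbackTransactionCommand,
} from "@aws-sdk/client-rds-data";

const client = new RDSDataClient({});

export const executeTransaction = async (
  resourceArn: string,
  secretArn: string,
  database: string,
  queries: string[]
) => {
  // Open a transaction so the role, schema, table and indexes are created atomically
  const { transactionId } = await client.send(
    new BeginTransactionCommand({ resourceArn, secretArn, database })
  );
  try {
    for (const sql of queries) {
      await client.send(
        new ExecuteStatementCommand({ resourceArn, secretArn, database, sql, transactionId })
      );
    }
    await client.send(new CommitTransactionCommand({ resourceArn, secretArn, transactionId }));
  } catch (error) {
    await client.send(new RollbackTransactionCommand({ resourceArn, secretArn, transactionId }));
    throw error;
  }
};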
Creating the knowledge base
Quite straightforward in Terraform:
resource "aws_bedrockagent_knowledge_base" "this" {
name = "${var.application}-${var.environment}-kb"
role_arn = aws_iam_role.kb_role.arn
knowledge_base_configuration {
vector_knowledge_base_configuration {
embedding_model_arn = local.embedding_model_arn
}
type = "VECTOR"
}
storage_configuration {
type = "RDS"
rds_configuration {
credentials_secret_arn = aws_secretsmanager_secret.kb_creds.arn
database_name = aws_rds_cluster.rds_cluster.database_name
resource_arn = aws_rds_cluster.rds_cluster.arn
table_name = "${local.db_schema}.${local.vector_table}"
field_mapping {
primary_key_field = "id"
metadata_field = "metadata"
text_field = "chunks"
vector_field = "embedding"
}
}
}
depends_on = [
aws_rds_cluster_instance.rds_cluster_instance,
aws_lambda_invocation.seed_db_function,
aws_secretsmanager_secret.kb_creds
]
}
resource "aws_bedrockagent_data_source" "this" {
knowledge_base_id = aws_bedrockagent_knowledge_base.this.id
name = "kb_datasource"
vector_ingestion_configuration {
chunking_configuration {
chunking_strategy = "FIXED_SIZE"
fixed_size_chunking_configuration {
max_tokens = 300
overlap_percentage = 20
}
}
}
data_source_configuration {
type = "S3"
s3_configuration {
bucket_arn = aws_s3_bucket.kb_bucket.arn
inclusion_prefixes = ["${local.kb_folder}"]
}
}
}
When configuring RDS as a data source, there are a few key parameters you’ll need to provide:
- The vector table and the field mappings that define which columns in the table should be used by the Knowledge Base.
- The IAM role associated with the Knowledge Base, which must have the following permissions on the RDS cluster: rds-data:ExecuteStatement, rds-data:BatchExecuteStatement, and rds:DescribeDBClusters.
- The ARN of the secret that holds the database credentials we created in the previous step.
Once the knowledge base is deployed, here’s what it looks like in the console:
We can already start testing it right from the console. In this example, I synced the knowledge base with a dataset containing texts about the Roman Empire.
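Besides the console, you can also sanity-check retrieval programmatically with the Retrieve API. A quick sketch, where KB_ID and the sample query are assumptions:

import { BedrockAgentRuntimeClient, RetrieveCommand } from "@aws-sdk/client-bedrock-agent-runtime";

const client = new BedrockAgentRuntimeClient({});

// Returns the top matching chunks along with their similarity scores
const { retrievalResults } = await client.send(
  new RetrieveCommand({
    knowledgeBaseId: process.env.KB_ID,
    retrievalQuery: { text: "Who was the first Roman emperor?" },
    retrievalConfiguration: { vectorSearchConfiguration: { numberOfResults: 3 } },
  })
);
console.log(retrievalResults?.map((r) => ({ text: r.content?.text, score: r.score })));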
Alright, let’s see how to set up the AppSync Events integration.
Setting up AppSync integration
As mentioned earlier, I’ll be using Cognito User Pools as the default auth mode, alongside IAM:
- AppSync will handle validating both subscribe and publish requests from clients, as long as they provide a valid Cognito token.
- IAM auth will be used by the handleAppSyncEvents function, as it needs to publish responses back to the clients.
resource "awscc_appsync_api" "this" {
name = "${var.application}-${var.environment}-events-api"
owner_contact = "${var.application}-${var.environment}"
event_config = {
auth_providers = [
{
auth_type = "AMAZON_COGNITO_USER_POOLS"
cognito_config = {
aws_region = data.aws_region.current.name,
user_pool_id = var.user_pool_id,
}
},
{
auth_type = "AWS_IAM"
}
]
connection_auth_modes = [
{
auth_type = "AMAZON_COGNITO_USER_POOLS"
}
]
default_publish_auth_modes = [
{
auth_type = "AMAZON_COGNITO_USER_POOLS"
}
]
default_subscribe_auth_modes = [
{
auth_type = "AMAZON_COGNITO_USER_POOLS"
}
]
}
}
Next up, I’ll need to create a data source and associate it with the handleAppSyncEvents function. Unfortunately, this part isn’t supported in the Terraform provider yet, so for now, I’m using the SDK to create these resources in a function that runs once right after the AppSync Events API resource is created:
import {
  AppSyncClient,
  CreateDataSourceCommand,
  CreateChannelNamespaceCommand,
} from "@aws-sdk/client-appsync";

export const handler = async (event: {
  tf: { action: string };
  apiId: string;
  dataSourceName: string;
  lambdaFunctionArn: string;
  serviceRoleArn: string;
  channelName: string;
}) => {
  if (
    event.apiId == null ||
    event.dataSourceName == null ||
    event.lambdaFunctionArn == null ||
    event.serviceRoleArn == null ||
    event.channelName == null
  ) {
    throw new Error("apiId, dataSourceName, lambdaFunctionArn, serviceRoleArn and channelName are required");
  }

  if (event.tf.action === "create") {
    const client = new AppSyncClient({ region: process.env.AWS_REGION });

    // Register the Lambda function as a data source on the Events API
    const createDataSourceCommand = new CreateDataSourceCommand({
      apiId: event.apiId,
      name: event.dataSourceName,
      type: "AWS_LAMBDA",
      serviceRoleArn: event.serviceRoleArn,
      lambdaConfig: {
        lambdaFunctionArn: event.lambdaFunctionArn,
      },
    });
    await client.send(createDataSourceCommand);

    // Create the channel namespace and wire publish/subscribe directly to the data source
    const createChannelCommand = new CreateChannelNamespaceCommand({
      apiId: event.apiId,
      name: event.channelName,
      subscribeAuthModes: [
        {
          authType: "AMAZON_COGNITO_USER_POOLS",
        },
      ],
      publishAuthModes: [
        {
          authType: "AMAZON_COGNITO_USER_POOLS",
        },
        {
          authType: "AWS_IAM",
        },
      ],
      handlerConfigs: {
        onPublish: {
          behavior: "DIRECT",
          integration: {
            dataSourceName: event.dataSourceName,
            lambdaConfig: {
              invokeType: "EVENT",
            },
          },
        },
        onSubscribe: {
          behavior: "DIRECT",
          integration: {
            dataSourceName: event.dataSourceName,
            lambdaConfig: {
              invokeType: "EVENT",
            },
          },
        },
      },
    });
    await client.send(createChannelCommand);
    return;
  }

  // ... handle resource update and deletion
};
Note the direct integration of the default channel with the function. The handleAppSyncEvents function is invoked directly in EVENT invocation mode.
Here’s how it looks in the console once the AppSync resources are created:
With the Lambda function defined as a data source in this screen
Using Lambda Powertools to handle AppSync realtime events
Now let’s get to the interesting part: handling events from AppSync and putting the knowledge base to work. Lambda Powertools offers a handy utility that makes it easier to integrate Lambda functions with AppSync Events. It lets you define clear, dedicated handler methods for publish/subscribe interactions, so fewer messy if-else blocks. Routing is handled automatically based on namespaces and channel patterns, keeping the code clean and easy to maintain.
Here’s how it works in practice:
Setting things up and handling subscriptions
We start by initializing the resolver using the Lambda Powertools Event Handler utility, then define a handler for new subscriptions with app.onSubscribe:
import { AppSyncEventsResolver, UnauthorizedException } from "@aws-lambda-powertools/event-handler/appsync-events";
// ...

const app = new AppSyncEventsResolver();

app.onSubscribe("/chat/responses/*", (payload) => {
  const identity = payload.identity ? (payload.identity as { sub: string; username: string }) : null;
  const sub = identity?.sub;

  // Only allow a user to subscribe to their own /chat/responses/{sub} channel
  if (!sub || payload.info.channel.segments.length !== 3 || !payload.info.channel.path.endsWith(`/${sub}`)) {
    throw new UnauthorizedException("You cannot subscribe to this channel");
  }
});
Here, I ensure that users can only subscribe to their own chat responses channel. This check prevents accidental or unauthorized access to other users’ channels. Since we’re using a Cognito User Pool, the subscription payload contains decoded user information, including the sub (user ID) and username.
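To make the check above concrete, here’s roughly what the handler receives for a subscription to /chat/responses/{sub} (an illustrative shape, trimmed to the fields used by the handler; the exact payload depends on the auth mode):

// Illustrative example only, not the full AppSync Events payload
const exampleSubscribeEvent = {
  identity: {
    sub: "3fa85f64-user-id", // Cognito user id
    username: "jane",
    // ...other decoded Cognito claims
  },
  info: {
    channel: {
      path: "/chat/responses/3fa85f64-user-id",
      segments: ["chat", "responses", "3fa85f64-user-id"],
    },
  },
};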
Validating and handling messages
Let’s now see how we handle messages from users:
// z (zod), bedrockClient (a BedrockAgentRuntimeClient) and signRequest are imported / initialized at module level
const messageSchema = z.object({
  id: z.number(),
  content: z.string(),
  type: z.enum(["chat"]).optional(),
  sessionId: z.string().optional(),
});

app.onPublish("/chat/request/*", async (payload, event) => {
  const identity = event.identity ? (event.identity as { sub: string; username: string }) : null;
  const sub = identity?.sub;

  // Only allow a user to publish to their own /chat/request/{sub} channel
  if (!sub || event.info.channel.segments.length !== 3 || !event.info.channel.path.endsWith(`/${sub}`)) {
    throw new UnauthorizedException("You cannot publish to this channel");
  }

  // Validate the incoming message payload
  const message = messageSchema.safeParse(payload);
  if (!message.success) {
    return {
      result: { text: "I don't understand what you mean, your message format seems invalid" },
      error: "Invalid message payload",
    };
  }

  const { content, sessionId } = message.data;

  // Ask the Knowledge Base to retrieve relevant chunks and generate an answer
  const result = await bedrockClient.send(
    new RetrieveAndGenerateCommand({
      input: {
        text: content,
      },
      retrieveAndGenerateConfiguration: {
        type: "KNOWLEDGE_BASE",
        knowledgeBaseConfiguration: {
          knowledgeBaseId: process.env.KB_ID,
          modelArn: process.env.KB_MODEL_ARN,
        },
      },
      sessionId,
    })
  );

  // Publish the answer back to the caller's response channel, signing the request with SigV4
  const signedRequest = await signRequest(`https://${process.env.EVENTS_API_DNS}/event`, "POST", {
    channel: `/chat/responses/${sub}`,
    events: [
      JSON.stringify({
        result: result.output,
        sessionId: result.sessionId,
      }),
    ],
  });

  await fetch(`https://${process.env.EVENTS_API_DNS}/event`, {
    method: signedRequest.method,
    headers: signedRequest.headers,
    body: signedRequest.body,
  });

  return {
    processed: true,
  };
});
As with the subscribe handler, when a message is published to the /chat/request/{userId} channel, app.onPublish gets invoked. I extract the user’s identity from the event and enforce that they can only publish to their own channel. We then validate the payload using zod to ensure it has the expected structure. If validation succeeds, we call Bedrock’s RetrieveAndGenerate endpoint.
As I’m using IAM auth to let the function publish the response back to the client via the /chat/responses/{userId} channel, I need to sign the request with SigV4 and pass the signed request headers when calling the AppSync Events endpoint. This function needs the appsync:EventPublish permission on the API channels.
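The signRequest helper isn’t shown in the snippet above; here’s a minimal sketch of what it could look like using the Smithy SigV4 signer (this exact implementation is an assumption, the real helper lives in the repo):

import { SignatureV4 } from "@smithy/signature-v4";
import { HttpRequest } from "@smithy/protocol-http";
import { Sha256 } from "@aws-crypto/sha256-js";
import { defaultProvider } from "@aws-sdk/credential-provider-node";

export const signRequest = async (url: string, method: string, body: unknown) => {
  const { hostname, pathname } = new URL(url);
  const request = new HttpRequest({
    hostname,
    path: pathname,
    method,
    headers: {
      "content-type": "application/json",
      host: hostname,
    },
    body: JSON.stringify(body),
  });

  // Sign with the function's execution role credentials, targeting the AppSync service
  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region: process.env.AWS_REGION!,
    service: "appsync",
    sha256: Sha256,
  });

  return signer.sign(request);
};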
☝️ Some notes:
- In this solution we wait for the full response from the model; we’re not using Bedrock’s response streaming feature. I could have used the RetrieveAndGenerateStream API and published each chunk incrementally to the client. However, depending on the model’s response, this could lead to multiple calls to the Events API, potentially increasing costs (since each call counts as a separate operation). One possible solution would be to buffer the response chunks and send them in batches, striking a better balance between responsiveness and cost (see the sketch below these notes). Handling Lambda response streaming natively is a feature I’d love to see supported by AppSync in the future.
- The RetrieveAndGenerate endpoint returns a sessionId that we’ll need to reuse in messages within the same conversation. This sessionId is what allows Amazon Bedrock to maintain context. We simply return it along with the result, and include it in every subsequent chat message to keep the conversation context-aware.
- You can check out the Cognito user pool Terraform resource definition at the following link.
You can find the complete function code following this link.
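If you do want to experiment with the buffered streaming idea mentioned in the notes, a rough sketch could look like this (the exact shape of the stream events and the batch size are assumptions):

import {
  BedrockAgentRuntimeClient,
  RetrieveAndGenerateStreamCommand,
} from "@aws-sdk/client-bedrock-agent-runtime";

const bedrockClient = new BedrockAgentRuntimeClient({});
const CHUNKS_PER_PUBLISH = 10; // arbitrary batch size, tune for responsiveness vs cost

// publish is a stand-in for the SigV4-signed call to the Events API shown earlier
const streamAnswer = async (text: string, publish: (chunk: string) => Promise<void>) => {
  const response = await bedrockClient.send(
    new RetrieveAndGenerateStreamCommand({
      input: { text },
      retrieveAndGenerateConfiguration: {
        type: "KNOWLEDGE_BASE",
        knowledgeBaseConfiguration: {
          knowledgeBaseId: process.env.KB_ID,
          modelArn: process.env.KB_MODEL_ARN,
        },
      },
    })
  );

  if (!response.stream) return;

  let buffer: string[] = [];
  for await (const event of response.stream) {
    // assumption: generated text chunks arrive as `output.text` events
    if (event.output?.text) buffer.push(event.output.text);
    if (buffer.length >= CHUNKS_PER_PUBLISH) {
      await publish(buffer.join(""));
      buffer = [];
    }
  }
  if (buffer.length > 0) await publish(buffer.join(""));
};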
Creating a Client
Here, I’m building a small React client to chat with the Knowledge Base via AppSync. I’m using Amplify because it makes it easy to connect to the AppSync API and handle authentication through the user pool to retrieve an access token.
First, we need to configure Amplify by providing the AppSync Events API endpoint along with the Cognito User Pool and Identity Pool ids:
import { Amplify } from "aws-amplify";

Amplify.configure({
  API: {
    Events: {
      endpoint: "https://<api-id>.appsync-api.eu-west-1.amazonaws.com/event",
      region: "eu-west-1",
      defaultAuthMode: "userPool",
    },
  },
  Auth: {
    Cognito: {
      userPoolId: "<user-pool-id>",
      userPoolClientId: "<user-pool-client-id>",
      identityPoolId: "<identity-pool-id>",
    },
  },
});
To handle real-time chat with AppSync Events, the client is connected to two endpoints:
- events.connect('/chat/responses/${userId}') to subscribe to responses
- events.post('/chat/request/${userId}') to send user messages
// messages, input, sessionId and their setters are regular useState hooks in the component;
// events comes from "aws-amplify/data"
useEffect(() => {
  let cleanup: (() => void) | undefined;

  const connectToChannel = async () => {
    try {
      if (!userId) {
        return;
      }
      const channel = await events.connect(`/chat/responses/${userId}`);
      const subscription = channel.subscribe({
        next: (data: any) => {
          if (data.type !== "data") return;
          const botMessage: Message = {
            id: Date.now(),
            sender: "bot",
            content: data.event.result.text,
          };
          setMessages((prev) => [...prev, botMessage]);
          setSessionId(data.event.sessionId);
        },
        error: (err: any) => console.error("subscription error", err),
      });
      cleanup = () => subscription.unsubscribe?.();
    } catch (error) {
      console.error("connection error", error);
    }
  };

  connectToChannel();
  // Tear down the subscription when the component unmounts or userId changes
  return () => cleanup?.();
}, [userId]);

const sendMessage = async ({ userId }: { userId: string }) => {
  if (input.trim() === "") return;
  setInput("");
  const newMessage: Message = {
    id: Date.now(),
    sender: "user",
    content: input,
    sessionId: sessionId,
  };
  setMessages([...messages, newMessage]);
  await events.post(`/chat/request/${userId}`, newMessage);
};
When the React component mounts, we use useEffect to set up a connection to the /chat/responses/{userId} channel. This subscribes to responses coming from AppSync Events. Once responses start coming in, we add each message to the chat and update the sessionId to maintain context.
To send a message, the sendMessage function posts to the /chat/request/{userId} channel.
The userId in this example is provided by Amplify’s Authenticator component, which handles user authentication and exposes the signed-in user's details.
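For reference, the user id can be read from the Authenticator context; a small sketch (assuming Amplify v6, where userId maps to the Cognito sub):

import { useAuthenticator } from "@aws-amplify/ui-react";

// Inside the chat component: the signed-in user's id drives both channel paths
const { user } = useAuthenticator((context) => [context.user]);
const userId = user?.userId;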
Which gives this chat interface:
If you look at the browser’s WebSocket connection, you’ll see real-time responses coming through as AppSync Events sends data.
Wrapping up
In this post, we walked through building a serverless WebSocket API using AppSync Events, Lambda, and Bedrock Knowledge Bases. We explored how to handle real-time communication securely with Cognito and IAM. Lambda Powertools makes working with AppSync Events an absolute breeze.
As usual you can find the complete repo with the solution ready to be adapted and deployed here 👉 https://github.com/ziedbentahar/rag-chat-with-appsync-events-and-bedrock-knowledge-bases
I will make sure to update the repo once AppSync gets better terraform support 👌
Thanks for making it all the way here!
Top comments (3)
Nice post for AI developers!
Which is better to use as a vector database, FAISS or Pinecone?
I haven’t tried FAISS yet, but here’s how I think about it:
Postgres (with pgvector) is a very capable option, especially if you’re already using it in your ecosystem. Keeping everything in one place can simplify ops and reduce overhead.
Pinecone is a great serverless alternative, fully managed, easy to scale, and optimized for vector search use cases.
IMO, if you're starting fresh or want a fully managed experience, Pinecone is appealing. If you're already invested in Postgres, it's worth considering extending it with vector capabilities.
Thanks