More and more applications rely on real-time data, and the traditional HTTP request-response cycle does not cut it anymore. That’s where an open WebSocket connection comes in: it enables two-way communication, from client to server and from server to client.
In this article we’re gonna see how to use an API Gateway WebSocket API for a two-way data flow between multiple clients.
TOC
- What’s a WebSocket
- Creating a WebSocket API
- Saving connections
- Lambdas and Integrations
- Routes
- Deployment
- Conclusion
What’s a WebSocket
If you’re looking into using an API Gateway WebSocket API then you’re probably already aware of what WebSockets are and want to use them to solve a problem you have, but just so we’re on the same page let’s go over it real quick.
WebSocket is a protocol that provides full-duplex communication channels over a single, long-lived TCP connection. It is designed for scenarios where real-time communication between a client and server is required. In contrast, the HTTP protocol is suitable for typical web interactions where the request-response model and stateless nature are sufficient.
Your first reaction might be “Full duplex what?”. This is a complicated way of saying that both client and server can send messages to each other at the same time. It’s similar to a phone call, where both parties can talk at the same time, and unlike a walkie-talkie, where only one party can talk at once.
Overall, the key differences between these protocols are that WebSocket maintains a persistent connection, supports a two-way communication model, and has lower overhead after the initial handshake because it does not need to re-establish the connection every time data is sent in either direction. HTTP, on the other hand, is unidirectional, meaning the server cannot send data to the client unless the client initiates a request, and every request carries its own overhead: a full set of headers and, when the connection isn’t kept alive, a new connection as well.
Creating a WebSocket API
To create a WebSocket API we need to, at the very least, specify the protocolType and routeSelectionExpression.
const api = new aws.apigatewayv2.Api('websocket-api', {
  name: 'exanubes-websocket-api',
  protocolType: 'WEBSOCKET',
  routeSelectionExpression: '$request.body.type'
});
The former is self-explanatory, but routeSelectionExpression is less obvious. Unlike RESTful APIs, we do not have different URL paths and HTTP methods for different actions on resources. Instead, we send every event to the same URL with a JSON body. What routeSelectionExpression does is simulate routes based on a property inside the request; in this case that’s $request.body.type, so a message whose type is "message" ends up on the message route.
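To make the routing rule more concrete, here is a rough sketch of how a $request.body.type expression can be thought of. selectRoute and routeKeys are hypothetical names used only for illustration; API Gateway performs this evaluation internally, and this is not its actual implementation.

```javascript
// Hypothetical helper: mimics how a `$request.body.type` route selection
// expression picks a route. Illustration only, not API Gateway's real code.
function selectRoute(rawBody, routeKeys) {
  let type;
  try {
    const parsed = JSON.parse(rawBody);
    // Guard against JSON values such as `null` or numbers, which have no `type`.
    type = parsed !== null && typeof parsed === 'object' ? parsed.type : undefined;
  } catch {
    // Non-JSON payloads cannot be evaluated, so they fall through to $default.
    type = undefined;
  }
  return typeof type === 'string' && routeKeys.includes(type) ? type : '$default';
}

const routes = ['message'];
console.log(selectRoute('{"type": "message", "data": "Hello World!"}', routes)); // message
console.log(selectRoute('Marko?', routes)); // $default
console.log(selectRoute('{"type": "unknown"}', routes)); // $default
```

Anything that is not JSON, or that carries a type with no matching route, lands on $default.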
Saving connections
In order to send data to a client we need to have access to the connection id that’s assigned when establishing a new connection. I’m gonna use a DynamoDB table for that.
// Assigned to a variable because the lambdas below reference table.arn
const table = new aws.dynamodb.Table('connections', {
  name: 'WS_CONNECTIONS',
  attributes: [{ name: 'connectionId', type: 'S' }],
  billingMode: 'PAY_PER_REQUEST',
  hashKey: 'connectionId'
});
A quick rundown of DynamoDB. It’s a NoSQL database, meaning that it does not enforce a fixed schema: every item in the table can have different attributes. However, it operates on a concept of primary keys, which are required. In this example, we’re using a simple primary key made up of a partition key (or hash key), which is the connectionId attribute. There are also composite primary keys that are made up of two attributes, a partition key and a sort key, instead of just one.
As in SQL databases, a primary key must be unique.
DynamoDB is part of the serverless offering in AWS, so we can use a pay-as-you-go model (the PAY_PER_REQUEST billing mode above), but we could also provision read and write capacity units for predictable workloads.
Lambdas and Integrations
To integrate an API with a lambda function we need two things – a lambda function and a lambda integration. An Integration is an API Gateway Component for managing the full lifecycle of request processing between the client-facing API and backend services, e.g., lambda functions.
Handler
// connect lambda (functions/ws-connect): stores the new connection id
const { DynamoDBClient, PutItemCommand } = require('@aws-sdk/client-dynamodb');
const client = new DynamoDBClient();
const table = process.env.CONNECTIONS_TABLE;

exports.handler = async (event) => {
  const input = {
    TableName: table,
    Item: {
      connectionId: {
        S: event.requestContext.connectionId
      }
    }
  };
  const command = new PutItemCommand(input);
  await client.send(command);
  return {
    statusCode: 201,
    body: JSON.stringify({
      message: 'Connected.'
    })
  };
};
// disconnect lambda (functions/ws-disconnect): removes the connection id
const { DeleteItemCommand, DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const client = new DynamoDBClient();
const table = process.env.CONNECTIONS_TABLE;

exports.handler = async function handler(event) {
  const input = {
    TableName: table,
    Key: {
      connectionId: { S: event.requestContext.connectionId }
    }
  };
  const command = new DeleteItemCommand(input);
  await client.send(command);
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'disconnected.'
    })
  };
};
// default lambda (functions/ws-default): fallback for unmatched messages
exports.handler = async (event) => {
  let response = '';
  if (event.body === 'Marko?') {
    response = 'Polo!';
  } else {
    response = 'Marko?';
  }
  return {
    statusCode: 200,
    body: response
  };
};
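// message lambda (functions/ws-message): sends body.data to every stored connection
const { DeleteItemCommand, DynamoDBClient, ScanCommand } = require('@aws-sdk/client-dynamodb');
const {
  ApiGatewayManagementApiClient,
  PostToConnectionCommand
} = require('@aws-sdk/client-apigatewaymanagementapi');
const dbClient = new DynamoDBClient();
const table = process.env.CONNECTIONS_TABLE;

exports.handler = async function handler(event) {
  const body = JSON.parse(event.body);
  // https://{api-id}.execute-api.{region}.amazonaws.com/{stage}
  const endpoint = `https://${event.requestContext.domainName}/${event.requestContext.stage}`;
  const connections = await getConnections(table);
  const apiGw = new ApiGatewayManagementApiClient({
    apiVersion: '2018-11-29',
    endpoint
  });
  await Promise.all(
    connections.map(async (connection) => {
      try {
        const input = {
          ConnectionId: connection.connectionId.S,
          Data: body.data
        };
        const command = new PostToConnectionCommand(input);
        return await apiGw.send(command);
      } catch (error) {
        // 410 Gone: the client is no longer connected, so drop its id
        if (error.$metadata?.httpStatusCode === 410) {
          await handleStaleConnection(table, connection.connectionId.S);
        } else {
          console.error('Failed to post to connection', error);
        }
      }
    })
  );
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'emit lambda function'
    })
  };
};

// The two helpers below are not shown in the original listing. They are a
// minimal sketch of what the handler expects, based on the imports above.
async function getConnections(tableName) {
  const items = [];
  let ExclusiveStartKey;
  do {
    // Scan is paginated, so keep going until there is no LastEvaluatedKey
    const page = await dbClient.send(new ScanCommand({ TableName: tableName, ExclusiveStartKey }));
    items.push(...(page.Items ?? []));
    ExclusiveStartKey = page.LastEvaluatedKey;
  } while (ExclusiveStartKey);
  return items;
}

async function handleStaleConnection(tableName, connectionId) {
  await dbClient.send(
    new DeleteItemCommand({
      TableName: tableName,
      Key: { connectionId: { S: connectionId } }
    })
  );
}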
Let’s quickly go through the code of each of the lambdas:
- connect: saves the connection id to the DynamoDB table
- disconnect: deletes the connection id from the DynamoDB table
- default: this is a fallback lambda for the $default route, you can implement anything here. This is also the only route/lambda that can handle messages that are not in JSON format
- message: scans the DynamoDB table and sends a message to each connection id
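The fan-out done by the message lambda can also be sketched without the AWS SDK, which makes it easy to try locally. In the sketch below, broadcast, send and onStale are hypothetical names: send stands in for posting to a connection and onStale for deleting a stale connection id. It assumes, like the handler above, that a closed connection surfaces as an error with $metadata.httpStatusCode equal to 410.

```javascript
// Hypothetical, SDK-free sketch of the fan-out used by the message lambda.
// `send(connectionId, data)` stands in for PostToConnectionCommand and
// `onStale(connectionId)` for deleting the stale id from the table.
async function broadcast(connectionIds, data, send, onStale) {
  // allSettled lets one failing connection not abort the other deliveries.
  const results = await Promise.allSettled(
    connectionIds.map(async (connectionId) => send(connectionId, data))
  );
  const stale = [];
  const failed = [];
  results.forEach((result, index) => {
    if (result.status === 'fulfilled') return;
    // 410 Gone means the client went away without being cleaned up.
    if (result.reason?.$metadata?.httpStatusCode === 410) {
      stale.push(connectionIds[index]);
    } else {
      failed.push(connectionIds[index]);
    }
  });
  await Promise.all(stale.map((connectionId) => onStale(connectionId)));
  return { delivered: connectionIds.length - stale.length - failed.length, stale, failed };
}

// Usage with a fake sender: "b" simulates a connection that is already gone.
const fakeSend = async (id) => {
  if (id === 'b') {
    throw Object.assign(new Error('Gone'), { $metadata: { httpStatusCode: 410 } });
  }
};
// Resolves with delivered = 1, stale = ['b'], failed = []
broadcast(['a', 'b'], 'hi', fakeSend, async () => {}).then((summary) => console.log(summary));
```

Returning a summary instead of swallowing errors is a design choice of this sketch; it makes failures other than 410 visible to the caller.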
Integration
Now, for each of the lambdas we need to create an integration, which is what connects the lambda to the API Gateway.
The snippets below use a NodejsFunction component that creates a Node.js lambda function with less boilerplate. This is a result of another article I wrote. You can find the code on GitHub or simply install the npm module.
const connectLambda = new NodejsFunction('WebSocket_Connect', {
  code: new pulumi.asset.FileArchive('functions/ws-connect'),
  handler: 'index.handler',
  environment: {
    variables: {
      CONNECTIONS_TABLE: table.arn
    }
  },
  policy: {
    policy: table.arn.apply((tableArn) =>
      JSON.stringify({
        Version: '2012-10-17',
        Statement: [
          {
            Action: ['dynamodb:PutItem'],
            Effect: 'Allow',
            Resource: tableArn
          }
        ]
      })
    )
  }
});
const connectIntegration = new aws.apigatewayv2.Integration('connect-integration', {
  apiId: api.id,
  integrationType: 'AWS_PROXY',
  integrationUri: connectLambda.handler.invokeArn
});
connectLambda.grantInvoke(
  'apigw-connect-grant-invoke',
  'apigateway.amazonaws.com',
  pulumi.interpolate`${api.executionArn}/${stage.name}/$connect`
);
const disconnectLambda = new NodejsFunction('WebSocket_Disconnect', {
  code: new pulumi.asset.FileArchive('functions/ws-disconnect'),
  handler: 'index.handler',
  policy: {
    policy: table.arn.apply((tableArn) =>
      JSON.stringify({
        Version: '2012-10-17',
        Statement: [
          {
            Action: ['dynamodb:DeleteItem'],
            Effect: 'Allow',
            Resource: tableArn
          }
        ]
      })
    )
  },
  environment: {
    variables: {
      CONNECTIONS_TABLE: table.arn
    }
  }
});
const disconnectIntegration = new aws.apigatewayv2.Integration('disconnect-integration', {
  apiId: api.id,
  integrationType: 'AWS_PROXY',
  integrationUri: disconnectLambda.handler.invokeArn
});
disconnectLambda.grantInvoke(
  'apigw-disconnect-grant-invoke',
  'apigateway.amazonaws.com',
  pulumi.interpolate`${api.executionArn}/${stage.name}/$disconnect`
);
const defaultLambda = new NodejsFunction('WebSocket_Default', {
  code: new pulumi.asset.FileArchive('functions/ws-default'),
  handler: 'index.handler'
});
const defaultIntegration = new aws.apigatewayv2.Integration('default-integration', {
  apiId: api.id,
  integrationType: 'AWS_PROXY',
  integrationUri: defaultLambda.handler.invokeArn
});
defaultLambda.grantInvoke(
  'apigw-default-grant-invoke',
  'apigateway.amazonaws.com',
  pulumi.interpolate`${api.executionArn}/${stage.name}/$default`
);
const messageLambda = new NodejsFunction('WebSocket_Message', {
  code: new pulumi.asset.FileArchive('functions/ws-message'),
  handler: 'index.handler',
  policy: {
    policy: pulumi.all([table.arn, api.executionArn, stage.name]).apply(([tableArn, executionArn, stageName]) =>
      JSON.stringify({
        Version: '2012-10-17',
        Statement: [
          {
            Action: ['dynamodb:DeleteItem', 'dynamodb:Scan'],
            Effect: 'Allow',
            Resource: tableArn
          },
          {
            Action: ['execute-api:ManageConnections', 'execute-api:Invoke'],
            Effect: 'Allow',
            // arn:aws:execute-api:{region}:{accountId}:{apiId}/{stage}/POST/@connections/{connectionId}
            Resource: [`${executionArn}/${stageName}/POST/@connections/*`]
          }
        ]
      })
    )
  },
  environment: {
    variables: {
      CONNECTIONS_TABLE: table.arn
    }
  }
});
const messageIntegration = new aws.apigatewayv2.Integration('message-integration', {
  apiId: api.id,
  integrationType: 'AWS_PROXY',
  integrationUri: messageLambda.handler.invokeArn
});
messageLambda.grantInvoke(
  'apigw-message-grant-invoke',
  'apigateway.amazonaws.com',
  pulumi.interpolate`${api.executionArn}/${stage.name}/message`
);
Most of the integrations are the same. We create a lambda function, grant the necessary permissions following the principle of least privilege and pass the DynamoDB table ARN as an environment variable. Then we use the lambda to create an AWS_PROXY integration, also known as a Lambda Proxy integration. Last but not least, we add a resource-based policy to the lambda to allow our API Gateway to invoke it by specifying the principal and execution ARN down to the route name. This way we ensure that the lambda will be invoked only by the specific route of this API Gateway. We don’t have the routes and stage created yet, so let’s move on to that now.
One thing worth mentioning is that the message lambda has an additional policy that allows it to manage connections and invoke the API Gateway to post messages to connections.
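The two ARN shapes used in this section are easy to mix up, so here is a small sketch that spells them out. routeInvokeArn and manageConnectionsArn are hypothetical helper names, and the region, account id and API id in the example are made up.

```javascript
// Hypothetical helpers that spell out the two ARN shapes used in this section.
// executionArn looks like arn:aws:execute-api:{region}:{accountId}:{apiId}

// Source ARN for the lambda's resource-based policy: limits invocation to one route.
function routeInvokeArn(executionArn, stage, routeKey) {
  return `${executionArn}/${stage}/${routeKey}`;
}

// Resource for the IAM policy that lets a lambda post to any connection of the API.
function manageConnectionsArn(executionArn, stage) {
  return `${executionArn}/${stage}/POST/@connections/*`;
}

// Made-up values, for illustration only.
const executionArn = 'arn:aws:execute-api:eu-central-1:123456789012:abc123';
console.log(routeInvokeArn(executionArn, 'dev', '$connect'));
// arn:aws:execute-api:eu-central-1:123456789012:abc123/dev/$connect
console.log(manageConnectionsArn(executionArn, 'dev'));
// arn:aws:execute-api:eu-central-1:123456789012:abc123/dev/POST/@connections/*
```

The first shape answers "who may invoke this lambda", the second "what may this lambda call on the API".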
Routes
Routes in API Gateway are what allow us to separate our logic into multiple integrations. There are three predefined routes ($connect, $disconnect and $default), but you can also define your own custom routes.
The $connect and $disconnect routes are invoked when a WebSocket connection is opened and closed. The $default route is a fallback in case no other route was matched, and message is our own custom route for sending messages to clients. A custom route is the equivalent of an endpoint in a REST API.
new aws.apigatewayv2.Route(`Connect_Route`, {
  apiId: api.id,
  routeKey: '$connect',
  target: pulumi.interpolate`integrations/${connectIntegration.id}`
});
new aws.apigatewayv2.Route(`Disconnect_Route`, {
  apiId: api.id,
  routeKey: '$disconnect',
  target: pulumi.interpolate`integrations/${disconnectIntegration.id}`
});
// Kept in a variable because the Route Response below needs defaultRoute.id
const defaultRoute = new aws.apigatewayv2.Route(`Default_Route`, {
  apiId: api.id,
  routeKey: '$default',
  target: pulumi.interpolate`integrations/${defaultIntegration.id}`
});
new aws.apigatewayv2.Route(`Message_Route`, {
  apiId: api.id,
  routeKey: 'message',
  target: pulumi.interpolate`integrations/${messageIntegration.id}`
});
As you can see, they’re all pretty much the same. We create a new route and specify the API, the route key and the target. The target is a path to the integration we want to use for this route, and its format, integrations/{integrationId}, is determined by AWS, not Pulumi.
$default route
The $default route is the one we configure with a Route Response here, so that whatever its lambda returns is sent back to the client. This is useful when, for example, a client sent a message that did not match any of the defined routes and we want to let them know.
The $default route can also be used for error handling, dynamic routing, protocol versioning, non-JSON payloads and more.
new aws.apigatewayv2.RouteResponse('route-response', {
  apiId: api.id,
  routeId: defaultRoute.id,
  routeResponseKey: '$default'
});
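As a sketch of the "let the client know" case, a $default handler could report which type it failed to route. This is an alternative to the Marko/Polo handler shown earlier, not the handler used in this article, and the response shape (error, receivedType) is an assumption made for illustration rather than anything API Gateway prescribes.

```javascript
// Hypothetical $default handler: reports back that the message matched no route.
// With a Route Response configured, the returned body is sent to the caller.
// In a lambda this would be exported as `exports.handler`.
const handler = async (event) => {
  let receivedType = null;
  try {
    const parsed = JSON.parse(event.body ?? '');
    if (parsed !== null && typeof parsed === 'object' && typeof parsed.type === 'string') {
      receivedType = parsed.type;
    }
  } catch {
    // Body was not JSON; receivedType stays null.
  }
  return {
    statusCode: 200,
    body: JSON.stringify({ error: 'No matching route', receivedType })
  };
};

// Local check with fake events:
handler({ body: '{"type": "nope"}' }).then((res) => console.log(res.body));
// {"error":"No matching route","receivedType":"nope"}
```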
Deployment
To deploy the API we need to create a stage and a deployment. A deployment is a snapshot of the API that is made available for clients to call. Because of this, every change needs a new deployment; setting autoDeploy: true on the stage tells API Gateway to create one automatically whenever the API changes.
const stage = new aws.apigatewayv2.Stage(`api-dev-stage`, {
  name: 'dev',
  apiId: api.id,
  autoDeploy: true
});
export const ws_api_url = pulumi.interpolate`${api.apiEndpoint}/${stage.name}/`;
This is the minimum we need to be able to deploy a WebSocket API. The exported value will be displayed as an output when we run pulumi up.
pulumi up
Now, by using wscat or any other WebSocket client, you can connect to the WebSocket API and start sending messages.
wscat -c wss://<api-id>.execute-api.<region>.amazonaws.com/<stage>
This should open up a WebSocket connection and allow you to post messages. To actually test that it works, open at least two separate connections and post a message; it should automatically show up in both terminal windows.
{"type": "message", "data": "Hello World!"}
This should result in all clients receiving the message. None of this is standardized though. I am using type for routing because that’s what I’ve used in the route selection expression, and data is what I used in the message lambda implementation.
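On the client side, the same envelope can be wrapped in a small helper. This is only a sketch: createChatClient is a hypothetical name, and it assumes a standard WebSocket constructor is available globally (browsers have one; in other environments you can pass one in through the WebSocketImpl parameter).

```javascript
// Hypothetical client helper built on the standard WebSocket interface.
// WebSocketImpl can be swapped for another implementation or a test double.
function createChatClient(url, onMessage, WebSocketImpl = globalThis.WebSocket) {
  const socket = new WebSocketImpl(url);
  socket.addEventListener('message', (event) => onMessage(event.data));
  return {
    // `type` must match the route key; `data` is what the message lambda forwards.
    // Call this only after the socket's "open" event has fired.
    send(data) {
      socket.send(JSON.stringify({ type: 'message', data }));
    },
    close() {
      socket.close();
    }
  };
}

// Example (not executed here, it needs a deployed API):
// const client = createChatClient('wss://<api-id>.execute-api.<region>.amazonaws.com/<stage>', console.log);
// client.send('Hello World!');
```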
Marko?
This is a message that cannot be matched with any route, simply because it doesn’t have a type property in the payload. So it will be routed to the $default route, which sends a response to the client because we configured a Route Response earlier. As you can see, I didn’t have to use a JSON format for this message, but it would work the same if the format was JSON and type did not match any route.
Conclusion
Leveraging an API Gateway WebSocket API provides an efficient solution for applications requiring real-time, bidirectional communication. By utilizing WebSockets, developers can overcome the limitations of traditional HTTP request-response cycles, enabling persistent connections and reducing overhead. This article has outlined the essential steps for creating and deploying a WebSocket API, integrating it with Lambda functions, and managing connections with DynamoDB. With these tools, you can build robust real-time communication systems that enhance the interactivity and responsiveness of your applications. The practical examples and code snippets provided serve as a solid foundation for implementing WebSockets in your own projects, allowing you to harness the power of real-time data exchange effectively.