exanubes

Api Gateway Websockets #1 Websocket API on AWS Api Gateway with Pulumi

This article is part of a series

  1. Websocket API on AWS Api Gateway with Pulumi

More and more applications rely on real-time data, and the traditional HTTP request-response cycle doesn’t cut it anymore. That’s where an open WebSocket connection comes in: it enables two-way communication, from client to server and from server to client.

In this article we’re gonna see how to use an Api Gateway WebSocket API for a two-way data flow between multiple clients.


What’s a WebSocket

If you’re looking into using an Api Gateway WebSocket API then you’re probably already aware of what websockets are and want to use them to solve a problem you have, but just so we’re on the same page let’s go over it real quick.

WebSocket is a protocol that provides full-duplex communication channels over a single, long-lived TCP connection. It is designed for scenarios where real-time communication between a client and server is required. In contrast, HTTP protocol is suitable for typical web interactions where the request-response model and stateless nature are sufficient.

Your first reaction might be “Full duplex what?” This is a complicated way of saying that both client and server can send messages to each other at the same time. It’s similar to a phone call, where both parties can talk at once, and unlike a walkie-talkie, where only one party can talk at a time.

Overall, the key differences are that the WebSocket protocol maintains a persistent connection, supports a two-way communication model and has lower overhead after the initial handshake, because it does not need to re-establish the connection every time data is sent in either direction. HTTP, on the other hand, is request-driven: the server cannot send data to the client unless the client initiates a request, and every request carries its own headers and, when the connection is not reused, its own connection setup.

Creating a WebSocket API

To create a WebSocket API we need to, at the very least, specify the protocolType and routeSelectionExpression.

const api = new aws.apigatewayv2.Api('websocket-api', {
	name: 'exanubes-websocket-api',
	protocolType: 'WEBSOCKET',
	routeSelectionExpression: '$request.body.type'
});

The former is self-explanatory, but routeSelectionExpression is less obvious. Unlike RESTful APIs, we do not have different URL paths and HTTP methods for different actions on resources. Instead, we send every event to the same URL with a JSON body. routeSelectionExpression simulates routes based on a property inside the message. In this case that is $request.body.type, so a message with "type": "message" goes to the route whose key is message.
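To make the routing idea more concrete, here is a toy model of what the route selection does with an incoming message. This is only a sketch of the behaviour described above, not how API Gateway is implemented; selectRoute and its routeKeys parameter are names I made up for illustration.

```javascript
// Toy model of route selection for the expression '$request.body.type'.
// NOT the API Gateway implementation; selectRoute is a hypothetical helper.
// routeKeys: the custom route keys defined on the API.
function selectRoute(rawBody, routeKeys) {
	let parsed;
	try {
		parsed = JSON.parse(rawBody);
	} catch {
		// A non-JSON payload cannot be evaluated, so it falls through to $default
		return '$default';
	}
	// '$request.body.type' reads the top-level "type" property of the JSON body
	const key = parsed !== null && typeof parsed === 'object' ? parsed.type : undefined;
	// Anything that does not match a defined route key also ends up on $default
	return typeof key === 'string' && routeKeys.includes(key) ? key : '$default';
}

console.log(selectRoute('{"type": "message", "data": "Hello, world!"}', ['message'])); // message
console.log(selectRoute('Marko?', ['message'])); // $default
console.log(selectRoute('{"type": "unknown"}', ['message'])); // $default
```

The two fallback cases are the reason a $default route is worth having: plain-text messages and unknown types both need somewhere to go.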

Saving connections

In order to send data to a client we need access to the connection id that’s assigned when a new connection is established. I’m gonna use a DynamoDB table for that.

// assigned to a variable because the lambdas below reference table.arn
const table = new aws.dynamodb.Table('ws-connections-table', {
	name: 'WS_CONNECTIONS',
	// only key attributes are declared; everything else is schemaless
	attributes: [
		{
			name: 'connectionId',
			type: 'S'
		}
	],
	// partition (hash) key
	hashKey: 'connectionId',
	billingMode: 'PAY_PER_REQUEST'
});

A quick rundown of DynamoDB. It’s a NoSQL database, meaning it does not enforce a fixed schema: every item (row) in the table can have different attributes. It does, however, require a primary key. In this example we’re using a simple primary key made up of just a partition key (or hash key), which is the connectionId attribute. There are also composite primary keys, made up of two attributes: a partition key and a sort key. As in SQL databases, a primary key must be unique.

DynamoDB is part of the serverless offering in AWS, so we can use a pay-as-you-go model, but we could also provision read and write capacity units for predictable workloads.

Lambdas and Integrations

To integrate an API with a lambda function we need two things – a lambda function and a lambda integration. An Integration is an API Gateway Component for managing the full lifecycle of request processing between the client-facing API and backend services, e.g., lambda functions.

Integrations are not limited to lambdas though. You can also integrate API Gateway with HTTP endpoints, a VPC Link or certain other AWS services.

Handler

// src/functions/ws-connect (connect lambda)
const { DynamoDBClient, PutItemCommand } = require('@aws-sdk/client-dynamodb');

const client = new DynamoDBClient();

const table = process.env.CONNECTIONS_TABLE;
exports.handler = async (event) => {
	const input = {
		TableName: table,
		Item: {
			connectionId: {
				S: event.requestContext.connectionId
			}
		}
	};

	const command = new PutItemCommand(input);
	const response = await client.send(command);

	return {
		statusCode: 201,
		body: JSON.stringify({
			message: 'Connected.'
		})
	};
};

// src/functions/ws-disconnect (disconnect lambda)
const { DeleteItemCommand, DynamoDBClient } = require('@aws-sdk/client-dynamodb');

const client = new DynamoDBClient();
const table = process.env.CONNECTIONS_TABLE;

exports.handler = async function handler(event) {
	const input = {
		TableName: table,
		Key: {
			connectionId: { S: event.requestContext.connectionId }
		}
	};

	const command = new DeleteItemCommand(input);
	await client.send(command);

	return {
		statusCode: 200,
		body: JSON.stringify({
			message: 'disconnected.'
		})
	};
};

// src/functions/ws-default (default lambda)
exports.handler = async (event) => {
	let response = '';
	if (event.body === 'Marko?') {
		response = 'Polo!';
	} else {
		response = 'Marko?';
	}

	return {
		statusCode: 200,
		body: response
	};
};

// src/functions/ws-message (message lambda)
const { DeleteItemCommand, DynamoDBClient, ScanCommand } = require('@aws-sdk/client-dynamodb');
const {
	ApiGatewayManagementApiClient,
	PostToConnectionCommand
} = require('@aws-sdk/client-apigatewaymanagementapi');

const dbClient = new DynamoDBClient();
const table = process.env.CONNECTIONS_TABLE;

exports.handler = async function handler(event) {
	const body = JSON.parse(event.body);
	// https://{api-id}.execute-api.{region}.amazonaws.com/{stage}
	const endpoint = `https://${event.requestContext.domainName}/${event.requestContext.stage}`;

	const connections = await getConnections(table);

	const apiGw = new ApiGatewayManagementApiClient({
		apiVersion: '2018-11-29',
		endpoint
	});

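	await Promise.all(
		connections.map(async (connection) => {
			try {
				const input = {
					ConnectionId: connection.connectionId.S,
					Data: body.data
				};
				const command = new PostToConnectionCommand(input);
				// await inside the try block, otherwise a rejected send skips the catch
				await apiGw.send(command);
			} catch (error) {
				// AWS SDK v3 exposes the HTTP status under $metadata; 410 means the connection is gone
				if (error.$metadata?.httpStatusCode === 410) {
					await handleStaleConnection(table, connection.connectionId.S);
				}
			}
		})
	);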

	return {
		statusCode: 200,
		body: JSON.stringify({
			message: 'emit lambda function'
		})
	};
};

Let’s quickly get through the code of each of the lambdas:

  • connect - saves the connection id to the DynamoDB table
  • disconnect - deletes the connection id from the DynamoDB table
  • default - a fallback lambda for the $default route; you can implement anything here. This is also the only route/lambda that can handle messages that are not in JSON format
  • message - scans the DynamoDB table and sends a message to each connection id

Note: scanning a DynamoDB table is not recommended for anything beyond a demo, because a Scan reads every item in the table and gets slower and more expensive as the table grows.

I've trimmed the implementation down to the essentials (helpers such as getConnections and handleStaleConnection are not shown), but you can find the full code on GitHub.
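Since the message lambda depends on reading every stored connection, here is a rough sketch of what a paginated scan could look like. It is not the code from the repository: scanAllConnections and the injected sendScan function are names I made up, and the only things taken from the DynamoDB API are the Scan request and response fields (TableName, ProjectionExpression, ExclusiveStartKey, Items, LastEvaluatedKey).

```javascript
// Sketch only. `sendScan` is a hypothetical injected function, for example:
//   (input) => dbClient.send(new ScanCommand(input))
// Injecting it keeps the pagination logic free of SDK imports.
async function scanAllConnections(sendScan, tableName) {
	const items = [];
	let exclusiveStartKey;
	do {
		const page = await sendScan({
			TableName: tableName,
			// Only fetch the attribute we actually need
			ProjectionExpression: 'connectionId',
			ExclusiveStartKey: exclusiveStartKey
		});
		items.push(...(page.Items ?? []));
		// LastEvaluatedKey is only present when there are more pages to read
		exclusiveStartKey = page.LastEvaluatedKey;
	} while (exclusiveStartKey);
	return items;
}
```

A single Scan call returns at most 1 MB of data, so without the loop a large table would be read only partially.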

Integration

Now, for each of the lambdas we need to create an integration which is what connects the lambda to the API Gateway.

In the following code I'm using a NodejsFunction component that creates a Node.js lambda function with less boilerplate. It comes from another article I wrote. You can find the code on GitHub or simply install the npm module.

// connect lambda and integration
const connectLambda = new NodejsFunction('WebSocket_Connect', {
	code: new pulumi.asset.FileArchive('src/functions/ws-connect'),
	handler: 'index.handler',
	environment: {
		variables: {
			CONNECTIONS_TABLE: table.arn
		}
	},
	policy: {
		policy: table.arn.apply((tableArn) =>
			JSON.stringify({
				Version: '2012-10-17',
				Statement: [
					{
						Action: ['dynamodb:PutItem'],
						Effect: 'Allow',
						Resource: tableArn
					}
				]
			})
		)
	}
});

const connectIntegration = new aws.apigatewayv2.Integration('connect-integration', {
	apiId: api.id,
	integrationType: 'AWS_PROXY',
	integrationUri: connectLambda.handler.invokeArn
});

connectLambda.grantInvoke('apigateway.amazonaws.com', api.arn);

// disconnect lambda and integration
const disconnectLambda = new NodejsFunction('WebSocket_Disconnect', {
	code: new pulumi.asset.FileArchive('src/functions/ws-disconnect'),
	handler: 'index.handler',
	policy: {
		policy: table.arn.apply((tableArn) =>
			JSON.stringify({
				Version: '2012-10-17',
				Statement: [
					{
						Action: ['dynamodb:DeleteItem'],
						Effect: 'Allow',
						Resource: tableArn
					}
				]
			})
		)
	},
	environment: {
		variables: {
			CONNECTIONS_TABLE: table.arn
		}
	}
});

const disconnectIntegration = new aws.apigatewayv2.Integration('disconnect-integration', {
	apiId: api.id,
	integrationType: 'AWS_PROXY',
	integrationUri: disconnectLambda.handler.invokeArn
});

disconnectLambda.grantInvoke('apigateway.amazonaws.com', api.arn);

// default lambda and integration
const defaultLambda = new NodejsFunction('WebSocket_Default', {
	code: new pulumi.asset.FileArchive('src/functions/ws-default'),
	handler: 'index.handler'
});

const defaultIntegration = new aws.apigatewayv2.Integration('default-integration', {
	apiId: api.id,
	integrationType: 'AWS_PROXY',
	integrationUri: defaultLambda.handler.invokeArn
});

defaultLambda.grantInvoke('apigateway.amazonaws.com', api.arn);

// message lambda and integration
const messageLambda = new NodejsFunction('WebSocket_Message', {
	code: new pulumi.asset.FileArchive('src/functions/ws-message'),
	handler: 'index.handler',
	policy: {
		policy: pulumi.all([table.arn, api.executionArn]).apply(([tableArn, executionArn]) =>
			JSON.stringify({
				Version: '2012-10-17',
				Statement: [
					{
						Action: ['dynamodb:DeleteItem', 'dynamodb:Scan'],
						Effect: 'Allow',
						Resource: tableArn
					},
					{
						Action: ['execute-api:ManageConnections', 'execute-api:Invoke'],
						Effect: 'Allow',
						// arn:aws:execute-api:{region}:{accountId}:{apiId}/{stage}/POST/@connections/{connectionId}
						// the stage segment must match the deployed stage name ('dev' in this article)
						Resource: [`${executionArn}/dev/POST/@connections/*`]
					}
				]
			})
		)
	},
	environment: {
		variables: {
			CONNECTIONS_TABLE: table.arn
		}
	}
});

const messageIntegration = new aws.apigatewayv2.Integration('message-integration', {
	apiId: api.id,
	integrationType: 'AWS_PROXY',
	integrationUri: messageLambda.handler.invokeArn
});

messageLambda.grantInvoke('apigateway.amazonaws.com', api.arn);

Most of the integrations are the same. We create a lambda function, grant the necessary permissions following the least privilege principle and pass the DynamoDB table ARN as an environment variable. Then we use the lambda to create an AWS_PROXY integration, also known as a Lambda Proxy integration. Last but not least, we add a resource-based policy to the lambda to allow our API Gateway to invoke it by specifying the principal and the API ARN.

One thing worth mentioning is that the message lambda has an additional policy statement that allows it to manage connections and invoke the API Gateway to post messages to connections.
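The policy documents above repeat the same shape a few times, and the @connections resource has a stage baked into it that has to match the stage you actually deploy. One way to keep that in check is to build the pieces with small helpers. This is just a sketch: allow, policyDocument and connectionsArn are hypothetical helpers of mine (not part of Pulumi or the AWS SDK), and the ARNs in the example are placeholders.

```javascript
// Hypothetical helpers for illustration only.

// One "Allow" statement for a list of actions on a single resource
function allow(actions, resource) {
	return { Effect: 'Allow', Action: actions, Resource: resource };
}

// Serialize statements into an IAM policy document
function policyDocument(statements) {
	return JSON.stringify({ Version: '2012-10-17', Statement: statements });
}

// arn:aws:execute-api:{region}:{accountId}:{apiId}/{stage}/POST/@connections/{connectionId}
function connectionsArn(executionArn, stage) {
	return `${executionArn}/${stage}/POST/@connections/*`;
}

// Example with placeholder ARNs
const doc = policyDocument([
	allow(
		['dynamodb:DeleteItem', 'dynamodb:Scan'],
		'arn:aws:dynamodb:eu-central-1:123456789012:table/WS_CONNECTIONS'
	),
	allow(
		['execute-api:ManageConnections'],
		connectionsArn('arn:aws:execute-api:eu-central-1:123456789012:abc123', 'dev')
	)
]);
console.log(doc);
```

In a Pulumi program these would be called inside the pulumi.all([...]).apply(...) callback, the same way the policy is built in the listing above, with the stage name coming from a single shared constant.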

Read more about different integration types and their use-cases in AWS Documentation

Routes

Routes in API Gateway are what allow us to separate our logic into multiple integrations. There are three predefined routes – $connect, $disconnect, $default – but you can also define your own custom routes.

Obviously, the $connect and $disconnect routes facilitate opening and closing a WebSocket connection. The $default route is a fallback in case no other route was matched, and message is our own custom route for sending messages to clients. A custom route is the closest equivalent of an endpoint in a REST API.

new aws.apigatewayv2.Route(`Connect_Route`, {
	apiId: api.id,
	routeKey: '$connect',
	target: pulumi.interpolate`integrations/${connectIntegration.id}`
});
new aws.apigatewayv2.Route(`Disconnect_Route`, {
	apiId: api.id,
	routeKey: '$disconnect',
	target: pulumi.interpolate`integrations/${disconnectIntegration.id}`
});
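// assigned to a variable because the RouteResponse below references defaultRoute.id
const defaultRoute = new aws.apigatewayv2.Route(`Default_Route`, {
	apiId: api.id,
	routeKey: '$default',
	// lets this route pass the integration's result back to the caller
	routeResponseSelectionExpression: '$default',
	target: pulumi.interpolate`integrations/${defaultIntegration.id}`
});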
new aws.apigatewayv2.Route(`Message_Route`, {
	apiId: api.id,
	routeKey: 'message',
	target: pulumi.interpolate`integrations/${messageIntegration.id}`
});

As you can see, they’re all pretty much the same. We create a new route and specify the route key, the API and the target. The target is a path to the integration we want to use for this route, and the format is determined by AWS, not Pulumi.

If you use the AWS SDK, it will also require you to pass a target property with the format integrations/integrationId.

$default route

In this example the $default route is also configured with a Route Response, which is what lets API Gateway pass the lambda's return value back to the client. This is useful when, for example, a client sends a message that does not match any of the defined routes and we want to let them know. The $default route can also be used for error handling, dynamic routing, protocol versioning, non-JSON payloads and more.

new aws.apigatewayv2.RouteResponse('route-response', {
	apiId: api.id,
	routeId: defaultRoute.id,
	routeResponseKey: '$default'
});

Note: according to the AWS docs, $default is the only supported route response key for WebSocket APIs. Read more in the AWS docs.

Deployment

To deploy the Api we need to create a stage and a deployment. A deployment is a snapshot of the API that is made available for clients to call. Because of this, every time we make a change, we need to create a new deployment.

// assigned to a variable because the exported URL below uses stage.name
const stage = new aws.apigatewayv2.Stage(`api-dev-stage`, {
	name: 'dev',
	apiId: api.id,
	autoDeploy: true
});

export const ws_api_url = pulumi.interpolate`${api.apiEndpoint}/${stage.name}`;

This is the minimum we need to be able to deploy a WebSocket API. The exported value will be displayed as an output when we run pulumi up.

pulumi up

Now, by using wscat or any other WebSocket client, you can connect to the WebSocket API and start sending messages.

wscat -c wss://<api-id>.execute-api.<region>.amazonaws.com/<stage>

This should open up a WebSocket connection and allow you to post messages. To actually test that it works, open at least two separate connections and post a message from one of them. It should show up in both terminal windows.

{"type": "message", "data": "Hello, world!"}

This should result in all clients receiving the message. None of this is standardized though. I am using type for routing because that’s what I’ve used in the route selection expression, and data is what I used in the message lambda implementation.

Marko?

This is a message that cannot be matched with any route, simply because it doesn’t have a type property in the payload. So it will be routed to the $default route, which sends a response back to the client because we’ve configured a Route Response earlier. As you can see, I didn’t have to use a JSON format for this message, but it would work the same if the message was JSON and type did not match any route.

Conclusion

Leveraging an API Gateway WebSocket API provides an efficient solution for applications requiring real-time, bidirectional communication. By utilizing WebSockets, developers can overcome the limitations of traditional HTTP request-response cycles, enabling persistent connections and reducing overhead. This article has outlined the essential steps for creating and deploying a WebSocket API, integrating it with Lambda functions, and managing connections with DynamoDB. With these tools, you can build robust real-time communication systems that enhance the interactivity and responsiveness of your applications. The practical examples and code snippets provided serve as a solid foundation for implementing WebSockets in your own projects, allowing you to harness the power of real-time data exchange effectively.
