In order to establish a connection with Appsync Websocket Server in a pub/sub model you need:
- Active Appsync Websocket Server
- Websocket Client
- Appsync subprotocol handshake
- Publish/Subscribe
Websocket server
Creating a websocket server in Appsync is banal, you can use any IaC solution, an AWS CLI or AWS Console to do it.
All you need is a name, and an authorization type for authenticating, publishing and subscribing. That’s it.
resource "aws_appsync_api" "dev" {
name = "appsync-dev"
event_config {
auth_provider {
auth_type = "AWS_IAM"
}
connection_auth_mode {
auth_type = "AWS_IAM"
}
default_publish_auth_mode {
auth_type = "AWS_IAM"
}
default_subscribe_auth_mode {
auth_type = "AWS_IAM"
}
}
}
Even though this is enough, as each websocket server comes with a default channel, you will probably want to add your own channels to the server
resource "aws_appsync_channel_namespace" "dev" {
name = "appsync-dev"
api_id = aws_appsync_api.dev.api_id
subscribe_auth_mode {
auth_type = "AWS_IAM"
}
publish_auth_mode {
auth_type = "AWS_IAM"
}
}
In this example, I’m using AWS_IAM authorization type meaning that the environment needs to have assume a role or use API Keys that has permission to access the Server.
Websocket Client
Creating a websocket client is very simple, it works the same way as with any other Websocket server. It will work with any Websocket library in the language of your choice, but, there’s one gotcha.
conn, _, err := websocket.Dial(ctx, options.Url.String(), &websocket.DialOptions{
Host: options.Url.Hostname(),
Subprotocols: []string{
"aws-appsync-event-ws",
calculated_subprotocol,
},
})
Aside from the endpoint we also need to provide two Subprotocols, one is a static value: aws-appsync-event-ws, but the other
has to be computed using the authorizer of your choice. You need to stringify a request object, url base64 encode it without padding,
and add header- prefix.
signature, err := authorizer.Authorize(ctx, Input{})
if err != nil {
return "", err
}
data, err := json.Marshal(signature)
if err != nil {
return "", err
}
encoded := base64.RawURLEncoding.EncodeToString(data)
calculated_subprotocol := "header-" + encoded
Once you have both subprotocols, you’ll have a usable websocket connection.
Handshake
Performing a handshake is very straight forward. All you gotta do is send a Connection Init message:
{"type":"connection_init"}
Now you wait for a Connection Acknowledged message, which will return with a time to live parameter in milliseconds. This is for implementing a hearbeat mechanism. If no request is send within that timeout, you should consider the connection dropped. To avoid dropping quiet connections, AWS sends periodic Keep Alive messages every minute, but every message comming in from the Appsync server is a confirmation that the connection is healthy.
var connection_init_msg = []byte(`{"type":"connection_init"}`)
func handshake() (time.Duration, error){
err := connection.Write(ctx, connection_init_msg)
if err != nil {
session.logger.Debug("Failed to send connection init message")
return 0, err
}
timeout := time.After(10 * time.Second)
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-timeout:
return 0, app.ErrHandshakeTimeout
default:
}
event, err := connection.Read(ctx)
if err != nil {
return 0, err
}
msg, err := session.codec.Decode(event)
if err != nil {
return 0, err
}
if msg, ok := msg.(protocol.ConnectionAckMessage); ok {
connection_timeout_ms := time.Duration(msg.TimeoutMs) * time.Millisecond
return connection_timeout_ms, nil
}
if msg, ok := msg.(protocol.ErrorMessage); ok {
return 0, fmt.Errorf("Handshake returned with error: %v", msg.Errors)
}
// Skip
}
}
Aside from the golang specific concurrency handling, what this code does is:
- Sends a Connection Initialization message to the Websocket server
- Reads/Waits for a response message from Websocket Connection
- Decodes message bytes into an object/struct
- Checks if it’s the Connection Acknowledged message
- Checks if it’s the Error message
- Skips all other messages
After this is done, you can start Publishing messages and Subscribing to channels using the Appsync Websocket Server.