exanubes

Establishing a secure websocket connection in AWS Appsync Events API

In order to establish a connection with Appsync Websocket Server in a pub/sub model you need:

  • Active Appsync Websocket Server
  • Websocket Client
  • Appsync subprotocol handshake
  • Publish/Subscribe

Websocket server

Creating a websocket server in Appsync is banal, you can use any IaC solution, an AWS CLI or AWS Console to do it.

All you need is a name, and an authorization type for authenticating, publishing and subscribing. That’s it.

resource "aws_appsync_api" "dev" {
  name = "appsync-dev"

  event_config {
    auth_provider {
      auth_type = "AWS_IAM"
    }

    connection_auth_mode {
      auth_type = "AWS_IAM"
    }

    default_publish_auth_mode {
      auth_type = "AWS_IAM"
    }

    default_subscribe_auth_mode {
      auth_type = "AWS_IAM"
    }
  }
}

Even though this is enough, as each websocket server comes with a default channel, you will probably want to add your own channels to the server

resource "aws_appsync_channel_namespace" "dev" {
  name   = "appsync-dev"
  api_id = aws_appsync_api.dev.api_id

  subscribe_auth_mode {
    auth_type = "AWS_IAM"
  }

  publish_auth_mode {
    auth_type = "AWS_IAM"
  }
}

In this example, I’m using AWS_IAM authorization type meaning that the environment needs to have assume a role or use API Keys that has permission to access the Server.

AWS AppSync Events offers the following authorization types to secure Event APIs: API keys, Lambda, IAM, OpenID Connect, and Amazon Cognito user pools

Websocket Client

Creating a websocket client is very simple, it works the same way as with any other Websocket server. It will work with any Websocket library in the language of your choice, but, there’s one gotcha.

conn, _, err := websocket.Dial(ctx, options.Url.String(), &websocket.DialOptions{
		Host:         options.Url.Hostname(),
		Subprotocols: []string{
            "aws-appsync-event-ws",
            calculated_subprotocol,
            },
})

Aside from the endpoint we also need to provide two Subprotocols, one is a static value: aws-appsync-event-ws, but the other has to be computed using the authorizer of your choice. You need to stringify a request object, url base64 encode it without padding, and add header- prefix.

In my example I have to generate the subprotocol using IAM Authorizer but its implementation details are outside of the scope of this article
signature, err := authorizer.Authorize(ctx, Input{})
if err != nil {
    return "", err
}

data, err := json.Marshal(signature)

if err != nil {
    return "", err
}

encoded := base64.RawURLEncoding.EncodeToString(data)

calculated_subprotocol := "header-" + encoded

Once you have both subprotocols, you’ll have a usable websocket connection.

The connection will also get established without the computed subprotocol, but you won't be able to use it

Handshake

Performing a handshake is very straight forward. All you gotta do is send a Connection Init message:

{"type":"connection_init"}

Now you wait for a Connection Acknowledged message, which will return with a time to live parameter in milliseconds. This is for implementing a hearbeat mechanism. If no request is send within that timeout, you should consider the connection dropped. To avoid dropping quiet connections, AWS sends periodic Keep Alive messages every minute, but every message comming in from the Appsync server is a confirmation that the connection is healthy.

var connection_init_msg = []byte(`{"type":"connection_init"}`)
func handshake() (time.Duration, error){
	err := connection.Write(ctx, connection_init_msg)

	if err != nil {
		session.logger.Debug("Failed to send connection init message")
		return 0, err
	}

	timeout := time.After(10 * time.Second)
	for {
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-timeout:
			return 0, app.ErrHandshakeTimeout
		default:
		}

		event, err := connection.Read(ctx)
		if err != nil {
			return 0, err
		}

		msg, err := session.codec.Decode(event)
		if err != nil {
			return 0, err
		}

		if msg, ok := msg.(protocol.ConnectionAckMessage); ok {
			connection_timeout_ms := time.Duration(msg.TimeoutMs) * time.Millisecond

			return connection_timeout_ms, nil
		}

		if msg, ok := msg.(protocol.ErrorMessage); ok {
			return 0, fmt.Errorf("Handshake returned with error: %v", msg.Errors)
		}

		// Skip 
	}
}

Aside from the golang specific concurrency handling, what this code does is:

  • Sends a Connection Initialization message to the Websocket server
  • Reads/Waits for a response message from Websocket Connection
  • Decodes message bytes into an object/struct
  • Checks if it’s the Connection Acknowledged message
  • Checks if it’s the Error message
  • Skips all other messages
A full list of different messages and their specification is available in the official documentation

After this is done, you can start Publishing messages and Subscribing to channels using the Appsync Websocket Server.