In a previous previous I covered the AppSync Events WebSocket protocol from scratch: the subprotocol handshake, connection init, and heartbeat mechanics. Doing all of this manually gets tedious fast, especially when you need to handle multiple authorization modes or multiple subscriptions on the same connection.
AWS does not provide an official SDK for this, so I extracted it into a library: exanubes/appsync , a Go client for the AWS AppSync Events WebSocket API.
AppSync Events is AWS’s managed pub/sub service. Clients connect over WebSocket, subscribe to named channels, and receive messages in real time. Unlike the AppSync GraphQL API, it has no official Go SDK.
Prerequisites
You need an active AppSync Event API with at least one channel namespace configured. The library covers the client side only. Provisioning the API is outside its scope, but if you’d like to see an example using terraform you can see my terraform config , which is configured with all supported auth providers.
Installation
go get github.com/exanubes/appsync
import (
"github.com/exanubes/appsync"
"github.com/exanubes/appsync/authorizer"
)
Public API is fairly small and simple, and I’d like to keep it that way.
Connecting
The library exposes two core interfaces: Client and Subscription.
type Client interface {
Publish(context.Context, PublishCommandInput) error
Subscribe(context.Context, SubscribeCommandInput) (Subscription, error)
Close(context.Context) error
}
type Subscription interface {
Close(context.Context) error
Next(context.Context) (*NextMessageOutput, error)
DecodeNext(context.Context, any) error
}
A Client represents one active WebSocket connection. You can create multiple subscriptions from the same client. They all share the underlying connection.
To connect, use appsync.Connect. The simplest case uses an API key authorizer:
httpEndpoint := "https://xxxxxxxxxxxxxxxxxxxx.appsync-api.eu-central-1.amazonaws.com/event"
wsEndpoint := "wss://xxxxxxxxxxxxxxxxxxxx.appsync-realtime-api.eu-central-1.amazonaws.com/event/realtime"
authz, err := authorizer.ApiKey(authorizer.ApiKeyAuthorizerConfig{
ApiKey: "your-api-key",
Endpoint: httpEndpoint,
})
if err != nil {
return err
}
client, err := appsync.Connect(ctx, appsync.ConnectionOptions{
Endpoint: wsEndpoint,
Subprotocols: []string{appsync.ProtocolEvents},
Authorizer: authz,
})
if err != nil {
return err
}
defer client.Close(context.Background())
AppSync requires two separate URLs. The HTTP endpoint is used only by authorizers to produce correctly signed headers. No application data flows through this endpoint. The WebSocket endpoint is where the connection is established. Both are available in the AppSync console under your API’s settings.
To successfully establish a connection each Authorizer produces a Signature which is then used to generate a custom
subprotocol based on those headers. The subprotocol is then used for establishing the WebSocket connection. That’s what the WebSocket
endpoint is for. What headers are used depends on the Authorizer, but all of them need a host header which is the HTTP endpoint.
You can read my previous article on this topic.
appsync.ProtocolEvents is the aws-appsync-event-ws subprotocol string required by AppSync.
You don't have to use the exported constant, using a string literal will have the same effect.Subscribing and receiving events
sub, err := client.Subscribe(ctx, appsync.SubscribeCommandInput{
Channel: "default/notifications",
})
if err != nil {
return err
}
defer sub.Close(context.Background())
AppSync channels follow a namespace/channel pattern. The namespace must match one configured in your AppSync Event API settings. For example, default/notifications uses the built-in default namespace with a channel named notifications.
Read events with DecodeNext for JSON payloads:
type Notification struct {
Message string `json:"message"`
}
var notification Notification
if err := sub.DecodeNext(ctx, ¬ification); err != nil {
return err
}
Or use Next when you want the raw bytes:
message, err := sub.Next(ctx)
if err != nil {
return err
}
fmt.Printf("received: %s\n", message.Data)
Both Next and DecodeNext block until a message arrives, the context is cancelled, or the subscription is closed.
Put a deadline on the context if the caller can’t block indefinitely.
Publishing
Publishing sends a payload to a channel over the same WebSocket connection:
type Notification struct {
Message string `json:"message"`
}
payload, err := json.Marshal(Notification{Message: "hello"})
if err != nil {
return err
}
err = client.Publish(ctx, appsync.PublishCommandInput{
Channel: "default/notifications",
Payload: payload,
})
The library accepts raw bytes for the payload. Marshaling is the caller’s responsibility.
Authorization modes
The authorizer package ships three built-in implementations. All three follow the same pattern: create the authorizer,
pass it to Connect.
IAM
Use IAM when your AppSync API is configured for AWS_IAM authorization. The authorizer uses the AWS SDK credential
chain, so it picks up environment variables, shared config profiles, SSO sessions, Lambda execution roles, and ECS task
roles automatically. No extra configuration is needed for most deployments.
authz, err := authorizer.IAM(authorizer.IAMAuthorizerConfig{
Region: "eu-central-1",
Endpoint: httpEndpoint,
})
This is the only authorizer that has any level of complexity. The way it works is that it needs to create a
canonical request
that is signed using AWS’s SigV4 signing algorithm. The algorithm uses the request and some other “stuff” to sign the
request. As a result, it assigns headers to the canonical request that was used. I named those headers a Signature in my
implementation.
This is a common element for all authorizers, a Signature is basically just a map[string]string type. It’s used in two
ways, it is attached to every message as authorization key, and it’s base64 URL encoded to generate a custom subprotocol
that’s used when establishing a WebSocket connection.
authorization key.Token-based
Use token authorization for Cognito User Pool tokens, OIDC tokens, or Lambda authorizer tokens. Pass the token string directly:
// Cognito ID token, OIDC token, or custom Lambda token
authz, err := authorizer.Token(authorizer.TokenAuthorizerConfig{
AuthToken: token,
Endpoint: httpEndpoint,
})
All three of these authorization types require you to first authenticate with the Identity Provider you’re using, get a token and then use it here. So Appsync doesn’t handle requesting/generating a valid token, it can only take the provided token and validate it against the Authorization type you configured for your Appsync Server. So, if you’re using a Lambda Authorizer, it will call that lambda and pass it that token. Same with Cognito User Pool and OIDC tokens.
Custom authorizer
For token refresh logic, per-message signing, or non-standard auth schemes, implement the Authorizer
interface:
type Authorizer interface {
Authorize(context.Context, AuthorizeCommandInput) (*AuthorizeCommandOutput, error)
}
The library calls Authorize for every operation. The input tells you which operation is happening:
| Operation | Channel | Payload |
|---|---|---|
| Connect | empty | nil |
| Subscribe | subscription channel | nil |
| Publish | destination channel | publish payload |
| Unsubscribe | empty | nil |
Return a Signature map with the headers AppSync expects:
func (a *MyAuthorizer) Authorize(
ctx context.Context,
input authorizer.AuthorizeCommandInput,
) (*authorizer.AuthorizeCommandOutput, error) {
token, err := a.fetchToken(ctx)
if err != nil {
return nil, err
}
return &authorizer.AuthorizeCommandOutput{
Signature: map[string]string{
"Authorization": token,
"host": a.host,
},
}, nil
}
Error handling
The package exports sentinel errors for use with errors.Is:
message, err := sub.Next(ctx)
if err != nil {
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return err
case errors.Is(err, appsync.ErrSubscriptionClosed):
return nil
default:
return err
}
}
The ones you’ll encounter most often:
| Error | When it happens |
|---|---|
ErrHandshakeTimeout | AppSync didn’t acknowledge the connection. Likely an endpoint or auth misconfiguration. |
ErrHeartbeatTimeout | Keep-alive messages stopped. The connection dropped silently. |
ErrSubscriptionInboxFull | The subscriber isn’t consuming fast enough for its buffer |
ErrSubscriptionClosed | A read was attempted on an already-closed subscription |
Backpressure
By default, the library uses internal channel buffers of 100 messages each. For production use with high-throughput channels or slow consumers, these can be tuned:
client, err := appsync.Connect(ctx, appsync.ConnectionOptions{
Endpoint: wsEndpoint,
Subprotocols: []string{appsync.ProtocolEvents},
Authorizer: authz,
Backpressure: appsync.Backpressure{
ConnectionInbound: 200,
ConnectionOutbound: 200,
SubscriptionEvents: 500,
},
})
Larger buffers reduce the risk of ErrSubscriptionInboxFull under burst traffic but increase memory use per connection.
There is no global cap: every active connection allocates its own buffer independently.
Limitations
The library is at v0. It works, but the API may change if missing features require it. What’s not supported yet:
- Per-request authorizers - Appsync allows using different Authorizers for connecting, publishing and subscribing. The library currently uses the same Authorizer for all of these actions
- HTTP publish - Appsync makes it possible to Publish events without establishing a WebSocket connection, right now the library only supports it via the WebSocket
- Batch publish - Appsync supports publishing up to 5 events at once, library currently does one event per publish message
The library is at github.com/exanubes/appsync