Dennis Tretyakov

GraphQL subscriptions with HotChocolate

HotChocolate is awesome; however, the documentation on the Subscriptions API is quite poor, which creates the impression that the API itself is either limited or too complicated. In truth, it is very simple and provides very rich functionality.

In this article I’ll provide two usage examples.

  • Basic - the example emphasized in the HotChocolate docs, but presented a bit more clearly.
  • Custom - an example that shows how to implement custom subscriptions, which IMHO is much closer to real-world scenarios.

I’ll also try to summarize which components HotChocolate provides, what each is for, and how to combine them efficiently.

If something doesn’t work, you can refer to the example GitHub repo, which also has branches for the different stages of the article.

The example app #

The example app will consist of a single mutation sendMessage(body: String!) and a single subscription onMessage, which fires every time someone sends a message using the mutation. The event payload will consist of two fields:

  • Body - representing message body as string
  • Timestamp - the time the message arrived at the server.

In the custom example we’ll take it further, allowing the client to subscribe only to messages whose body contains certain text. It is a silly example, but IMHO it clearly demonstrates how messages can be filtered on the server side.

Understanding real time messaging #

Just to be sure we are on the same page, let’s clarify the basic architecture requirements for an application backend with a real-time notification API.

The following is not required to play around in a local environment, but understanding it is required both for a production setup and for understanding the APIs HotChocolate provides.

This is not specific to the HotChocolate framework or to GraphQL. If we want a server running on multiple nodes to send messages to clients, the server instances must be connected to something we call PubSub. That PubSub is responsible for distributing messages among the nodes. The best-known PubSub systems are Kafka, Redis, and RabbitMQ.

Here is an example of communication flow:

  • There are two server instances running. Node1 and Node2.
  • We have UserA connected to Node1.
  • UserB is connected to Node2
  • UserA sends message to Node1 (The one he’s connected to)
  • Node1 sends message to PubSub
  • PubSub sends message to both nodes (Node1, Node2)
  • Node1 sends message to UserA
  • Node2 sends message to UserB
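
The flow above can be sketched in plain C#. This is only an in-process simulation: the PubSub, Node and FlowDemo classes are hypothetical stand-ins invented for illustration, not HotChocolate or broker APIs, and a real broker would deliver messages over the network.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a broker such as Redis or RabbitMQ.
public sealed class PubSub
{
    private readonly List<Action<string>> _subscribers = new();

    public void Subscribe(Action<string> handler) => _subscribers.Add(handler);

    // Fan the message out to every subscribed node.
    public void Publish(string message)
    {
        foreach (var handler in _subscribers)
            handler(message);
    }
}

// Hypothetical server instance that keeps track of its own connected users.
public sealed class Node
{
    private readonly PubSub _pubSub;

    public string Name { get; }
    public List<string> ConnectedUsers { get; } = new();
    public List<string> Delivered { get; } = new();

    public Node(string name, PubSub pubSub)
    {
        Name = name;
        _pubSub = pubSub;

        // Each node listens to the PubSub and forwards to its own users only.
        pubSub.Subscribe(message =>
        {
            foreach (var user in ConnectedUsers)
                Delivered.Add($"{Name} -> {user}: {message}");
        });
    }

    // A user sends a message to this node; the node hands it to the PubSub.
    public void Receive(string message) => _pubSub.Publish(message);
}

public static class FlowDemo
{
    public static List<string> Run()
    {
        var pubSub = new PubSub();
        var node1 = new Node("Node1", pubSub);
        var node2 = new Node("Node2", pubSub);
        node1.ConnectedUsers.Add("UserA");
        node2.ConnectedUsers.Add("UserB");

        node1.Receive("hello"); // UserA sends through Node1

        var all = new List<string>(node1.Delivered);
        all.AddRange(node2.Delivered);
        return all;
    }
}
```

The point of the sketch is that Node1 never talks to Node2 directly: both users receive the message only because every node is subscribed to the same PubSub.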

PubSub abstraction #

HotChocolate provides a great and simple abstraction over PubSub, meaning you can change the PubSub you are using without affecting the business code.

  • ITopicEventSender — interface for sending messages to PubSub
  • ITopicEventReceiver — to subscribe for messages from PubSub for each active GraphQL subscriber.

Official PubSub integrations: #

As it is easy to swap providers, it is common practice to use the InMemory provider for local work, and we’ll use this option in the examples.

Basic Example #

The example is in the basic branch of the GitHub repo.

For the sake of simplicity I’ll implement everything within root objects (eg. RootQuery, RootMutation, RootSubscription).

// Program.cs

using HcSubscriptionExample.GraphQL;

var builder = WebApplication.CreateBuilder(args);
builder.AddGraphQL()
    .AddInMemorySubscriptions()  // registers InMemory PubSub provider
    .AddQueryType<RootQuery>()
    .AddMutationType<RootMutation>()
    .AddSubscriptionType<RootSubscription>();

var app = builder.Build();
app.MapGraphQLWebSocket();
app.MapGraphQL();
app.Run();

AddInMemorySubscriptions registers the InMemory PubSub provider. As mentioned before, the InMemory provider does not support the multi-node setups typically used in production, but it is very convenient for a local environment.

MapGraphQLWebSocket is required to use subscriptions over WebSocket, which is the typical transport for subscriptions. If required, HotChocolate also supports SSE.

GraphQL by design also requires at least one query to be present. As it is irrelevant to the example, let’s just implement a dummy query.

public class RootQuery
{
    public string Hello() => "Hello";
}

PubSub message entity #

Before moving to Mutation and Subscription implementation, let’s define the PubSub message contract.

public class PubSubMessage
{
    public const string TOPIC_NAME = "Message";
    public required string Body { get; init; }
    public required DateTime Timestamp { get; init; }
}

The TOPIC_NAME constant doesn’t have to live here, but I found it a convenient place to put it. In most PubSub systems, messages have a topic name. When you send a message you provide a topic name and a payload; when you receive one you get a topic name and a payload, and knowing the topic name tells you how to process the payload. The HotChocolate abstraction is no exception to that pattern.

SendMessage mutation #

A basic mutation called SendMessage that takes body as input.

public class SendMessageResult
{
    public required string Body { get; init; }
    public required DateTime Timestamp { get; init; }
}

public class RootMutation
{
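    public async Task<SendMessageResult> SendMessage(
        string body,
        [Service] ITopicEventSender topicEventSender)
    {
        DateTime timestamp = DateTime.UtcNow;

        // Await the send so that a failure to publish surfaces as an error
        // instead of being silently dropped.
        await topicEventSender.SendAsync(PubSubMessage.TOPIC_NAME, new PubSubMessage
        {
            Body = body,
            Timestamp = timestamp
        });
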
        return new SendMessageResult
        {
            Body = body,
            Timestamp = timestamp
        };
    }
}

ITopicEventSender is HotChocolate’s abstraction for sending a message to a PubSub. To send a message we provide a topic name and a payload, which in the case of ITopicEventSender is an object. The concrete implementation serializes it into whatever format it requires.

SendMessageResult: a GraphQL mutation is required to return a result. In this example its structure exactly matches the message payload sent to PubSub, but it doesn’t have to, which is why it has a separate type.

OnMessage subscription #

This is the basic definition of a subscription API.

public class MessageEvent
{
    public required string Body { get; init; }
    public required DateTime Timestamp { get; init; }
}

public class RootSubscription
{
    [Subscribe]
    [Topic(PubSubMessage.TOPIC_NAME)]
    public MessageEvent OnMessage(
        [EventMessage] PubSubMessage pubSubMessage)
    {
        return new MessageEvent
        {
            Body = pubSubMessage.Body,
            Timestamp = pubSubMessage.Timestamp
        };
    }
}

[Subscribe] instructs HotChocolate to expose the OnMessage method as a GraphQL subscription.

[Topic(PubSubMessage.TOPIC_NAME)] instructs HotChocolate to invoke OnMessage for every message coming from PubSub on that topic, once for every active client subscription. This scenario doesn’t provide any way to filter messages, only to transform them as in the given example.

[EventMessage] tells HotChocolate which parameter receives the message payload, as well as the type of the payload.

OnMessage with channel as a parameter #

I won’t go deep into this and there is no example in the linked repository, but I believe it is fair to mention that the basic scenario also allows the channel name to be given as a parameter.

[Subscribe]
[Topic($"channel/{{{nameof(channelName)}}}")]
public MessageEvent OnChatChannelMessage(string channelName, [EventMessage] PubSubMessage pubSubMessage) { /* ... omitted */ }

In the given example the topic name is determined by the input parameter channelName.

Security #

As you can see, when we bind a subscription to a PubSub topic there is no place for custom logic. However, you can use the [Authorize] attribute and custom policies to verify user access to a particular subscription or topic.

Make sure to use AuthorizeAttribute from HotChocolate.Authorization namespace.

Custom example #

In this section we’ll continue the basic example created before, but introduce custom logic on PubSub messages. The mutation will remain as is; all the changes will be in the RootSubscription class.

A room for custom logic #

The example is in the room branch of the GitHub repo.

As a first exercise, let’s rewrite RootSubscription to behave the same, but in a way that will allow us to write custom logic.

* Mind two new namespace imports

using HotChocolate.Execution;
using HotChocolate.Subscriptions;

public class RootSubscription
{
    async IAsyncEnumerable<PubSubMessage> SubscribeToMessage(
        [Service] ITopicEventReceiver receiver)
    {
        await using ISourceStream<PubSubMessage> stream = await receiver.SubscribeAsync<PubSubMessage>(PubSubMessage.TOPIC_NAME);

        await foreach (var pubSubMessage in stream.ReadEventsAsync())
            yield return pubSubMessage;
    }

    [Subscribe(With = nameof(SubscribeToMessage))]
    public MessageEvent OnMessage([EventMessage] PubSubMessage pubSubMessage)
    {
        return new MessageEvent
        {
            Body = pubSubMessage.Body,
            Timestamp = pubSubMessage.Timestamp
        };
    }
}

In the basic example there was a direct binding from PubSub to the OnMessage function, which can only transform a message and pass it to the subscriber.

In this version we introduced a managed middleman, the SubscribeToMessage function, which subscribes to messages in PubSub and yields the messages that should be passed to the subscriber. Even though at this point we don’t do any filtering or transformation, we obviously have room for it.

What has been done:

  1. We removed the [Topic] attribute from the OnMessage function, which was binding the subscription directly.

  2. Instead, in the [Subscribe] attribute we used the With property to specify the name of the private function SubscribeToMessage (obviously you can use any name you want).
    The SubscribeToMessage function returns IAsyncEnumerable<PubSubMessage>, and the OnMessage function is triggered every time SubscribeToMessage yields a result. The generic parameter of IAsyncEnumerable must match the type of the OnMessage parameter marked with the [EventMessage] attribute.

  3. In SubscribeToMessage we used ITopicEventReceiver, injected as a service (here you can inject any service you registered in the service provider), to subscribe to PubSub using a topic name, and implemented an IAsyncEnumerable to yield messages from PubSub to OnMessage. At this stage we can filter or transform messages any way we need.

IMPORTANT: You can inject services registered in the service provider into a function returning IAsyncEnumerable, but keep in mind that the method runs continuously for the whole lifetime of the subscription. Think twice about what you are injecting.

One more thing we can see from this example: you are not even forced to use HotChocolate’s abstraction over PubSub, as your IAsyncEnumerable function can use any other mechanism. Still, the provided abstraction seems quite reasonable.
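
As a rough illustration of "any other mechanism", here is a minimal sketch of an event source built on System.Threading.Channels from the .NET base library. The ChannelMessageSource class and its member names are hypothetical and are not part of HotChocolate or the linked repository; the only thing it shares with the example above is that it exposes an IAsyncEnumerable, which is the shape a method referenced by Subscribe's With property returns.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-process event source; not a HotChocolate type.
public sealed class ChannelMessageSource
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

    // Producer side: push a message body into the channel.
    public ValueTask PublishAsync(string body) => _channel.Writer.WriteAsync(body);

    // Signal that no more messages will be written, which ends the stream.
    public void Complete() => _channel.Writer.Complete();

    // Consumer side: yields messages until the writer completes or the
    // token is cancelled.
    // Caveat: a single Channel hands each item to ONE reader only, so this
    // sketch suits a single consumer; fan-out to many subscribers would need
    // one channel per subscriber or a real PubSub.
    public async IAsyncEnumerable<string> ReadAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var body in _channel.Reader.ReadAllAsync(cancellationToken))
            yield return body;
    }
}
```

The single-reader caveat in the comments is also a good reminder of why the provided abstraction is reasonable: it already handles delivering one message to every active subscriber.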

Filtering example #

The example is in the filter branch of the GitHub repo.

From here we can take it further and implement filtering. As a silly example, the user will be able to filter messages to receive only the ones whose body contains specific text.

// user input DTO
public class MessageFilter
{
    public string? BodyContains { get; init; }
}

public class RootSubscription
{
    async IAsyncEnumerable<PubSubMessage> SubscribeToMessage(
        MessageFilter? filter,
        [Service] ITopicEventReceiver receiver)
    {
        await using ISourceStream<PubSubMessage> stream = await receiver.SubscribeAsync<PubSubMessage>(PubSubMessage.TOPIC_NAME);

        await foreach (var pubSubMessage in stream.ReadEventsAsync())
        {
            if (filter?.BodyContains is null || pubSubMessage.Body.Contains(filter.BodyContains))
                yield return pubSubMessage;
        }
    }

    [Subscribe(With = nameof(SubscribeToMessage))]
    public MessageEvent OnMessage(
        MessageFilter? filter,
        [EventMessage] PubSubMessage pubSubMessage)
    {
        return new MessageEvent
        {
            Body = pubSubMessage.Body,
            Timestamp = pubSubMessage.Timestamp
        };
    }
}

In the OnMessage function we added an optional (marked by ?) input parameter, filter. By convention we can add the same parameter to the SubscribeToMessage function; the type and name must match. HotChocolate will pass it in.

Inside SubscribeToMessage function we implemented our silly filtering, which I don’t think requires any explanation.
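
One detail that may still be worth a sketch: string.Contains(string) performs an ordinal, case-sensitive comparison, so a filter of "hello" does not match the body "Hello world". The helper below is hypothetical (MessageMatching and its ignoreCase flag are not in the linked repository); it extracts the same rule into a small function and shows one way a case-insensitive variant could look.

```csharp
#nullable enable
using System;

// Hypothetical helper, not part of the article's repository.
public static class MessageMatching
{
    // Mirrors the rule used in SubscribeToMessage: when no filter text is
    // given, every message passes; otherwise the body must contain the text.
    public static bool Matches(string body, string? bodyContains, bool ignoreCase = false)
    {
        if (bodyContains is null)
            return true;

        var comparison = ignoreCase
            ? StringComparison.OrdinalIgnoreCase
            : StringComparison.Ordinal; // same behavior as Contains(string)

        return body.Contains(bodyContains, comparison);
    }
}
```

Keeping the predicate in a plain function like this also makes the filtering rule easy to unit test without starting a GraphQL server.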

Summary #

On GraphQL API #

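HotChocolate provides a simple API to expose a subscription endpoint, and the framework takes care of managing subscriptions over WebSocket or SSE. As shown above, you can bind a subscription directly to a PubSub topic, derive the topic name from an input parameter, or supply your own IAsyncEnumerable source for filtering and other custom logic.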

On PubSub Integrations #

  • There is wide PubSub support in the official packages

  • The abstraction is very simple, and other PubSub can be implemented if needed

  • Using the IAsyncEnumerable approach, you can even bypass HotChocolate’s abstraction and use your own mechanisms (even though it’s hard to imagine a use case where that would be useful)

© 2020 - 2024, Dennis Tretyakov