Subscriptions / WebSockets

www/versioned_docs/version-9.x/further/subscriptions.md

Using Subscriptions

Adding a subscription procedure

tsx
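import { EventEmitter } from 'events';
import * as trpc from '@trpc/server';
import { z } from 'zod';

// shape of the emitted payload (assumed here to mirror the `add` input below)
type Post = { id?: string; text: string };

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();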

export const appRouter = trpc
  .router()
  .subscription('onAdd', {
    resolve({ ctx }) {
      // `resolve()` is triggered for each client when they start subscribing to `onAdd`

      // return a `Subscription` with a callback which is triggered immediately
      return new trpc.Subscription<Post>((emit) => {
        const onAdd = (data: Post) => {
          // emit data to client
          emit.data(data);
        };

        // trigger `onAdd()` when `add` is triggered in our event emitter
        ee.on('add', onAdd);

        // unsubscribe function when client disconnects or stops subscribing
        return () => {
          ee.off('add', onAdd);
        };
      });
    },
  })
  .mutation('add', {
    input: z.object({
      id: z.string().uuid().optional(),
      text: z.string().min(1),
    }),
    async resolve({ ctx, input }) {
      const post = { ...input }; /* [..] add to db */

      ee.emit('add', post);
      return post;
    },
  });
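
The subscribe-then-teardown pattern used inside `resolve()` above can be tried in isolation. The following is a minimal, dependency-free sketch: `TinyEmitter` and `subscribeToAdds` are hypothetical stand-ins (for Node's `EventEmitter` and for the callback passed to `new trpc.Subscription(...)`), not part of tRPC.

```typescript
type Listener<T> = (data: T) => void;

// Hypothetical stand-in for Node's EventEmitter, limited to a single event.
class TinyEmitter<T> {
  private listeners: Listener<T>[] = [];
  on(fn: Listener<T>): void {
    this.listeners.push(fn);
  }
  off(fn: Listener<T>): void {
    this.listeners = this.listeners.filter((l) => l !== fn);
  }
  emit(data: T): void {
    // copy first so a listener may unsubscribe while we iterate
    this.listeners.slice().forEach((l) => l(data));
  }
  get size(): number {
    return this.listeners.length;
  }
}

type Post = { id?: string; text: string };

// Hypothetical helper mirroring the subscription callback:
// register a listener and return a function that removes it again.
function subscribeToAdds(
  ee: TinyEmitter<Post>,
  onData: (post: Post) => void,
): () => void {
  const onAdd = (post: Post) => onData(post);
  ee.on(onAdd);
  return () => ee.off(onAdd);
}

const ee = new TinyEmitter<Post>();
const received: Post[] = [];
const unsubscribe = subscribeToAdds(ee, (post) => {
  received.push(post);
});

ee.emit({ text: 'first' }); // delivered to the subscriber
unsubscribe(); // what happens when the client disconnects
ee.emit({ text: 'second' }); // no listener left, so nothing is delivered
```

Returning the teardown function is what keeps listeners from accumulating on the shared emitter as clients come and go.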

Creating a WebSocket-server

bash
yarn add ws

ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

:::tip

You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.

:::

tsx
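import { createTRPCClient } from '@trpc/client';
import { httpBatchLink } from '@trpc/client/links/httpBatchLink';
import { createWSClient, wsLink } from '@trpc/client/links/wsLink';
// assumes the server module also exports `type AppRouter = typeof appRouter`
import type { AppRouter } from './routers/app';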

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
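
As a sketch of the routing mentioned in the tip, the client below sends subscriptions over the WebSocket connection and everything else over HTTP. It assumes the v9 `splitLink` helper with its `condition` / `true` / `false` options; the HTTP URL and the `AppRouter` import path are placeholders, so check the names against your installed version before relying on it.

```typescript
import { createTRPCClient } from '@trpc/client';
import { httpBatchLink } from '@trpc/client/links/httpBatchLink';
import { splitLink } from '@trpc/client/links/splitLink';
import { createWSClient, wsLink } from '@trpc/client/links/wsLink';
// Assumption: the server module exports `type AppRouter = typeof appRouter`.
import type { AppRouter } from './routers/app';

const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

const client = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      // route by operation type
      condition(op) {
        return op.type === 'subscription';
      },
      // subscriptions use the persistent WebSocket connection
      true: wsLink({ client: wsClient }),
      // queries and mutations use HTTP (placeholder URL)
      false: httpBatchLink({ url: 'http://localhost:3000/api/trpc' }),
    }),
  ],
});
```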

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions.

query / mutation

Request

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

Response

The response has the shape below, or is an error.

ts
{
  id: number | string;
  jsonrpc: '2.0';
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
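
To make the request/response pairing concrete, here is a small self-contained sketch that builds a request envelope and matches a reply to it by `id`. `buildRequest` and `matchResponse` are hypothetical helpers, the reply is simulated rather than coming from a server, and no transformer is assumed (so `input` is passed through as plain JSON).

```typescript
type RequestId = number | string;

// Envelope shapes as given in the specification above.
interface ProcedureRequest {
  id: RequestId;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
}

interface ResultResponse<TOutput> {
  id: RequestId;
  jsonrpc: '2.0';
  result: { type: 'data'; data: TOutput };
}

// Hypothetical helper: hands out incrementing ids so replies can be matched.
let nextId = 0;
function buildRequest(
  method: 'query' | 'mutation',
  path: string,
  input?: unknown,
): ProcedureRequest {
  nextId += 1;
  return { id: nextId, jsonrpc: '2.0', method, params: { path, input } };
}

// Hypothetical helper: returns the output of the reply that carries the request's id.
function matchResponse<TOutput>(
  req: ProcedureRequest,
  responses: ResultResponse<TOutput>[],
): TOutput | undefined {
  for (const res of responses) {
    if (res.id === req.id) return res.result.data;
  }
  return undefined;
}

const request = buildRequest('mutation', 'add', { text: 'hello' });
const wire = JSON.stringify(request); // the string that would be sent over the socket

// Simulated reply; a real server would produce this.
const reply: ResultResponse<{ text: string }> = {
  id: request.id,
  jsonrpc: '2.0',
  result: { type: 'data', data: { text: 'hello' } },
};
const output = matchResponse(request, [reply]);
```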

subscription / subscription.stop

Start a subscription

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, send subscription.stop with the id of the subscription you started:

ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
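
The two messages are tied together only by `id`. The sketch below shows one way to derive the stop message from the start message so the ids cannot drift apart; `startSubscription` and `stopSubscription` are hypothetical helper names, not tRPC exports.

```typescript
type RequestId = number | string;

// Message shapes as given in the specification above.
interface SubscriptionStart {
  id: RequestId;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
}

interface SubscriptionStop {
  id: RequestId;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}

// Hypothetical helper: build the message that starts a subscription.
function startSubscription(
  id: RequestId,
  path: string,
  input?: unknown,
): SubscriptionStart {
  return { id, jsonrpc: '2.0', method: 'subscription', params: { path, input } };
}

// Hypothetical helper: build the matching stop message by reusing the start id.
function stopSubscription(start: SubscriptionStart): SubscriptionStop {
  return { id: start.id, jsonrpc: '2.0', method: 'subscription.stop' };
}

const start = startSubscription('sub-1', 'onAdd');
const stop = stopSubscription(start);
```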

Subscription response shape

The response has the shape below, or is an error.

ts
{
  id: number | string;
  jsonrpc: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // sub started
      }
    | {
        type: 'stopped'; // sub stopped
      }
  )
}
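
A client has to branch on `result.type` for each incoming message. The following sketch folds a sequence of subscription responses into a small state object; `SubscriptionState` and `applyMessage` are hypothetical names used for illustration, and error responses are left out.

```typescript
type RequestId = number | string;

// Result variants as given in the response shape above.
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

interface SubscriptionResponse<TData> {
  id: RequestId;
  jsonrpc: '2.0';
  result: SubscriptionResult<TData>;
}

// Hypothetical client-side view of one subscription.
interface SubscriptionState<TData> {
  active: boolean;
  items: TData[];
}

// Hypothetical reducer: returns the next state for one incoming message.
function applyMessage<TData>(
  state: SubscriptionState<TData>,
  msg: SubscriptionResponse<TData>,
): SubscriptionState<TData> {
  const result = msg.result;
  if (result.type === 'started') return { active: true, items: state.items };
  if (result.type === 'stopped') return { active: false, items: state.items };
  // remaining variant is 'data'
  return { active: state.active, items: state.items.concat([result.data]) };
}

// Simulated message sequence for a subscription with id 1.
const messages: SubscriptionResponse<string>[] = [
  { id: 1, jsonrpc: '2.0', result: { type: 'started' } },
  { id: 1, jsonrpc: '2.0', result: { type: 'data', data: 'a' } },
  { id: 1, jsonrpc: '2.0', result: { type: 'data', data: 'b' } },
  { id: 1, jsonrpc: '2.0', result: { type: 'stopped' } },
];

let state: SubscriptionState<string> = { active: false, items: [] };
for (const msg of messages) {
  state = applyMessage(state, msg);
}
```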

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before the server shuts down. Sent by calling broadcastReconnectNotification() on the handler returned from applyWSSHandler (named handler in the server example above).