WebSockets

www/docs/server/websockets.md

You can use WebSockets for all or some of the communication with your server. See wsLink for how to set it up on the client.

:::tip
This document covers the specifics of using WebSockets. For general usage of subscriptions, see our subscriptions guide.
:::

Creating a WebSocket server

bash
yarn add ws

ts
// @filename: trpc.ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';
export const createContext = (opts: CreateWSSContextFnOptions) => ({});

// @filename: routers/app.ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
export const appRouter = t.router({});

// @filename: wsServer.ts
// @types: node
// ---cut---
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new WebSocketServer({
  port: 3001,
});
const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // Enable heartbeat messages to keep connection open (disabled by default)
  keepAlive: {
    enabled: true,
    // server ping message interval in milliseconds
    pingMs: 30000,
    // connection is terminated if pong message is not received in this many milliseconds
    pongWaitMs: 5000,
  },
});

wss.on('connection', (ws) => {
  console.log(`++ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`-- Connection (${wss.clients.size})`);
  });
});
console.log('WebSocket Server listening on ws://localhost:3001');

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});
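
The keepAlive options above describe a ping/pong heartbeat. As a rough mental model only, the decision made on each tick could be sketched as follows. This is not tRPC's internal implementation, and `HeartbeatState` and `nextKeepAliveAction` are hypothetical names introduced for illustration.

```typescript
// Hypothetical model of a ping/pong heartbeat; not tRPC's internal code.
interface KeepAliveOptions {
  pingMs: number; // interval between server pings, in milliseconds
  pongWaitMs: number; // how long to wait for a pong after a ping
}

interface HeartbeatState {
  lastPingAt: number; // timestamp (ms) of the most recent ping
  pongReceived: boolean; // whether that ping has been answered
}

type KeepAliveAction = 'ping' | 'terminate' | 'wait';

function nextKeepAliveAction(
  state: HeartbeatState,
  now: number,
  options: KeepAliveOptions,
): KeepAliveAction {
  const sincePing = now - state.lastPingAt;
  // An unanswered ping older than pongWaitMs means the peer is treated as gone.
  if (!state.pongReceived && sincePing >= options.pongWaitMs) return 'terminate';
  // Otherwise, send the next ping once pingMs has elapsed.
  if (sincePing >= options.pingMs) return 'ping';
  return 'wait';
}

const keepAlive: KeepAliveOptions = { pingMs: 30000, pongWaitMs: 5000 };
console.log(nextKeepAliveAction({ lastPingAt: 0, pongReceived: false }, 5000, keepAlive)); // terminate
console.log(nextKeepAliveAction({ lastPingAt: 0, pongReceived: true }, 30000, keepAlive)); // ping
```

With the values from the server example, a connection that never answers a ping would be dropped about 5 seconds after that ping, while a healthy one is pinged again every 30 seconds.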

Setting TRPCClient to use WebSockets

:::tip
You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.
:::

tsx
// @filename: server.ts
import { initTRPC } from '@trpc/server';
const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
// ---cut---
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
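
The routing mentioned in the tip above comes down to a predicate on the operation type. A dependency-free sketch of that decision, where `pickTransport` is a hypothetical helper and not part of tRPC:

```typescript
// Operation types a tRPC client can send.
type OperationType = 'query' | 'mutation' | 'subscription';

// Hypothetical helper: decides which transport an operation should use.
function pickTransport(op: { type: OperationType }): 'ws' | 'http' {
  // Subscriptions need a long-lived connection; everything else can go over HTTP.
  return op.type === 'subscription' ? 'ws' : 'http';
}

console.log(pickTransport({ type: 'subscription' })); // ws
console.log(pickTransport({ type: 'query' })); // http
```

In a real client, the same check would typically be passed as the `condition` of `splitLink` from `@trpc/client`, with `wsLink` on the `true` branch and an HTTP link such as `httpBatchLink` on the `false` branch.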

Authentication / connection params {#connectionParams}

:::tip
If you're building a web application, you can ignore this section, as cookies are sent as part of the request.
:::

To authenticate over WebSockets, you can pass connectionParams to createWSClient. They are sent as the first message when the client establishes a WebSocket connection.

ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';

export const createContext = async (opts: CreateWSSContextFnOptions) => {
  const token = opts.info.connectionParams?.token;
  //    ^?

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;

ts
// @filename: server.ts
import { initTRPC } from '@trpc/server';
import superjson from 'superjson';
const t = initTRPC.create({ transformer: superjson });
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
// ---cut---
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';

const wsClient = createWSClient({
  url: `ws://localhost:3000`,

  connectionParams: async () => {
    return {
      token: 'supersecret',
    };
  },
});
export const trpc = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient, transformer: superjson })],
});
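
The `[... authenticate]` step in createContext is left open above. One way it might look, assuming a simple token lookup, is sketched below. The `User` type, the `usersByToken` table, and the `authenticate` helper are all hypothetical stand-ins for a real auth backend.

```typescript
// Loosely follows the connection params shape: string values keyed by name, or null.
type ConnectionParams = Record<string, string | undefined> | null;

// Hypothetical user record and token table, standing in for a real auth backend.
interface User {
  id: string;
  name: string;
}

const usersByToken = new Map<string, User>([
  ['supersecret', { id: '1', name: 'alice' }],
]);

// Resolve the user for a connection, or null when the token is missing or unknown.
function authenticate(connectionParams: ConnectionParams): User | null {
  const token = connectionParams?.token;
  if (!token) return null;
  return usersByToken.get(token) ?? null;
}

console.log(authenticate({ token: 'supersecret' })?.name); // alice
console.log(authenticate(null)); // null
```

Inside createContext, the result could then be returned as part of the context, for example `return { user: authenticate(opts.info.connectionParams) }`, so that procedures can reject unauthenticated callers.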

Automatic tracking of id using tracked() (recommended)

If you yield an event using our tracked() helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known id as part of the lastEventId input.

You can send an initial lastEventId when initializing the subscription, and it will be updated automatically as the client receives data.

:::info
If you're fetching data based on the lastEventId, and capturing all events is critical, you may want to use a ReadableStream or a similar pattern as an intermediary, as is done in our full-stack SSE example. This prevents newly emitted events from being ignored while the original batch based on lastEventId is still being yielded.
:::

ts
// @types: node
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
      }
      // listen for new events
      for await (const [data] of on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
        signal: opts.signal,
      })) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
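
The replay step marked `[...]` in the subscription above depends on where posts are stored. A minimal sketch, assuming a hypothetical in-memory array ordered oldest to newest (`posts` and `postsSince` are illustrative names, not part of tRPC):

```typescript
type Post = { id: string; title: string };

// Hypothetical in-memory store, ordered oldest to newest.
const posts: Post[] = [
  { id: '1', title: 'first' },
  { id: '2', title: 'second' },
  { id: '3', title: 'third' },
];

// Return every post that comes after the one identified by lastEventId.
// If the id is unknown, this sketch replays everything; a real app may choose differently.
function postsSince(all: Post[], lastEventId: string): Post[] {
  const index = all.findIndex((post) => post.id === lastEventId);
  return all.slice(index + 1);
}

console.log(postsSince(posts, '1').map((post) => post.id)); // [ '2', '3' ]
```

Inside the subscription, the replay could then be written as `for (const post of postsSince(posts, opts.input.lastEventId)) yield tracked(post.id, post);` before entering the `for await` loop.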

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions.

query / mutation

Request

ts
interface RequestMessage {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

Response

The response has the shape below, or is an error.

ts
type TOutput = any;
// ---cut---
interface ResponseMessage {
  id: number | string;
  jsonrpc?: '2.0';
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  };
}
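
To make the two shapes above concrete, here is a sketch of building a request and pairing a response with it by id. The `buildRequest` and `matchesRequest` helpers and the `post.byId` path are hypothetical, and no transformer is assumed, so the input is plain JSON.

```typescript
interface RequestMessage {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
}

interface ResponseMessage<TOutput> {
  id: number | string;
  jsonrpc?: '2.0';
  result: { type: 'data'; data: TOutput };
}

let nextRequestId = 0;

// Hypothetical helper: assigns a fresh id and omits `input` when none is given.
function buildRequest(
  method: RequestMessage['method'],
  path: string,
  input?: unknown,
): RequestMessage {
  nextRequestId += 1;
  const params = input === undefined ? { path } : { path, input };
  return { id: nextRequestId, jsonrpc: '2.0', method, params };
}

// Responses are paired with requests through the shared id.
function matchesRequest(req: RequestMessage, res: ResponseMessage<unknown>): boolean {
  return res.id === req.id;
}

const request = buildRequest('query', 'post.byId', { id: '1' });
console.log(JSON.stringify(request));
// {"id":1,"jsonrpc":"2.0","method":"query","params":{"path":"post.byId","input":{"id":"1"}}}
```

Because several requests can be in flight on one socket, the id is what lets a client route each response back to the caller that issued it.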

subscription / subscription.stop

Start a subscription

ts
interface SubscriptionRequest {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, send a subscription.stop message with the id of that subscription.

ts
interface SubscriptionStopRequest {
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}

Subscription response shape

Each response has the shape below, or is an error.

ts
type TData = any;
// ---cut---
interface SubscriptionResponse {
  id: number | string;
  jsonrpc?: '2.0';
  result:
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      };
}
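
A client has to react differently to each of the three result types above. One possible way to fold a stream of results into local state is sketched here; `SubscriptionState` and `applyResult` are hypothetical names, not part of tRPC.

```typescript
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

// Hypothetical client-side view of one subscription.
interface SubscriptionState<TData> {
  status: 'idle' | 'active' | 'stopped';
  received: TData[];
}

// Pure reducer: returns the next state for a single incoming result.
function applyResult<TData>(
  state: SubscriptionState<TData>,
  result: SubscriptionResult<TData>,
): SubscriptionState<TData> {
  switch (result.type) {
    case 'started':
      return { ...state, status: 'active' };
    case 'data':
      return { ...state, received: [...state.received, result.data] };
    case 'stopped':
      return { ...state, status: 'stopped' };
  }
}

const incoming: SubscriptionResult<string>[] = [
  { type: 'started' },
  { type: 'data', data: 'a' },
  { type: 'data', data: 'b' },
  { type: 'stopped' },
];
const initialState: SubscriptionState<string> = { status: 'idle', received: [] };
const finalState = incoming.reduce(applyResult, initialState);
console.log(finalState); // { status: 'stopped', received: [ 'a', 'b' ] }
```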

Connection params

If the connection URL includes ?connectionParams=1, the first message sent by the client must be the connection params message.

ts
interface ConnectionParamsMessage {
  data: Record<string, string> | null;
  method: 'connectionParams';
}
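
For a client written without `createWSClient`, the handshake described above could be sketched as follows. Both helpers are hypothetical, and actually opening the socket is left out so the example stays dependency-free.

```typescript
interface ConnectionParamsMessage {
  data: Record<string, string> | null;
  method: 'connectionParams';
}

// Hypothetical helper: append the flag that announces a connection params message.
function withConnectionParamsFlag(rawUrl: string): string {
  const separator = rawUrl.includes('?') ? '&' : '?';
  return `${rawUrl}${separator}connectionParams=1`;
}

// Hypothetical helper: serialize the message that must be sent first.
function buildConnectionParamsMessage(data: Record<string, string> | null): string {
  const message: ConnectionParamsMessage = { method: 'connectionParams', data };
  return JSON.stringify(message);
}

console.log(withConnectionParamsFlag('ws://localhost:3001'));
// ws://localhost:3001?connectionParams=1
console.log(buildConnectionParamsMessage({ token: 'supersecret' }));
// {"method":"connectionParams","data":{"token":"supersecret"}}
```

The client would open the socket with the flagged URL and send the serialized message before any query, mutation, or subscription request.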

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.
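
As an illustration of the JSON-RPC error object referenced above, a client could separate error responses from results with a type guard like the one below. `ErrorResponse` and `isErrorResponse` are hypothetical names, and the exact contents of `data` depend on your error formatter.

```typescript
// Error object as defined by the JSON-RPC 2.0 specification.
interface JsonRpcError {
  code: number;
  message: string;
  data?: unknown;
}

// Assumed envelope: the error takes the place of `result` in a response.
interface ErrorResponse {
  id: number | string | null;
  jsonrpc?: '2.0';
  error: JsonRpcError;
}

function isErrorResponse(message: unknown): message is ErrorResponse {
  if (typeof message !== 'object' || message === null) return false;
  const error = (message as { error?: unknown }).error;
  if (typeof error !== 'object' || error === null) return false;
  const { code, message: text } = error as { code?: unknown; message?: unknown };
  return typeof code === 'number' && typeof text === 'string';
}

console.log(isErrorResponse({ id: 1, error: { code: -32600, message: 'Invalid Request' } })); // true
console.log(isErrorResponse({ id: 1, result: { type: 'data', data: 42 } })); // false
```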

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before the server shuts down. Sent by calling broadcastReconnectNotification() on the handler returned by applyWSSHandler.