www/docs/client/links/httpSubscriptionLink.md
httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.
SSE is a good option for real-time updates because it's somewhat easier to set up than a WebSocket server.
:::info
If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native-specific instructions, please refer to the compatibility section.
:::
To use httpSubscriptionLink, you need to use a splitLink to make it explicit that we want to use SSE for subscriptions.
```ts twoslash
// @filename: server.ts
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
// ---cut---
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  loggerLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server';

const trpcClient = createTRPCClient<AppRouter>({
  /**
   * @see https://trpc.io/docs/v11/client/links
   */
  links: [
    // adds pretty logs to your console in development and logs errors in production
    loggerLink(),
    splitLink({
      // uses the httpSubscriptionLink for subscriptions
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: `/api/trpc`,
      }),
      false: httpBatchLink({
        url: `/api/trpc`,
      }),
    }),
  ],
});
```
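To make the routing concrete, here is a toy model of what the `condition` callback decides. It is only a sketch: `Op`, `splitByType`, and the string branch labels are hypothetical stand-ins and not tRPC's actual `splitLink` implementation.

```ts
// Toy model of splitLink-style routing (hypothetical names, not tRPC internals).
type OpType = 'query' | 'mutation' | 'subscription';

interface Op {
  type: OpType;
  path: string;
}

// Returns a router that picks one of two branches based on a predicate.
function splitByType<TBranch>(opts: {
  condition: (op: Op) => boolean;
  true: TBranch;
  false: TBranch;
}): (op: Op) => TBranch {
  return (op) => (opts.condition(op) ? opts.true : opts.false);
}

const route = splitByType<string>({
  condition: (op) => op.type === 'subscription',
  true: 'sse',
  false: 'batch',
});
```

In this model, `route({ type: 'subscription', path: 'onMessage' })` selects the SSE branch, while queries and mutations fall through to the batch branch.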
:::tip
The document here outlines the specific details of using httpSubscriptionLink. For general usage of subscriptions, see our subscriptions guide.
:::
If you're building a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.
If the client and server are not on the same domain, you can use withCredentials: true (read more on MDN here).
Example:
```ts twoslash
// @filename: server.ts
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
import { httpSubscriptionLink } from '@trpc/client';
// ---cut---
// [...]
httpSubscriptionLink({
  url: 'https://example.com/api/trpc',
  eventSourceOptions() {
    return {
      withCredentials: true, // <---
    };
  },
});
```
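If the same client code runs against both same-origin and cross-origin servers, the option can be derived from the URLs. This is a minimal sketch, assuming a hypothetical helper `eventSourceInitFor` and a page origin that is passed in explicitly (in a browser it would typically come from `window.location.origin`).

```ts
// Hypothetical helper: decide whether withCredentials is needed for a server URL.
function eventSourceInitFor(
  serverUrl: string,
  pageOrigin: string,
): { withCredentials: boolean } {
  // Normalize the page origin, e.g. strip a trailing slash.
  const normalizedPageOrigin = new URL(pageOrigin).origin;
  // Resolve relative URLs such as '/api/trpc' against the page origin.
  const serverOrigin = new URL(serverUrl, normalizedPageOrigin).origin;
  return { withCredentials: serverOrigin !== normalizedPageOrigin };
}
```

The result could be returned from `eventSourceOptions()`. Keep in mind that the server must also allow credentialed cross-origin requests through its CORS headers, otherwise the browser rejects the response.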
Recommended for non-web environments
You can ponyfill EventSource and use the eventSourceOptions callback to populate headers.
```ts twoslash
// @filename: server.ts
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
declare function getSignature(op: any): Promise<string>;
// ---cut---
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        // ponyfill EventSource
        EventSource: EventSourcePolyfill,
        // options to pass to the EventSourcePolyfill constructor
        eventSourceOptions: async ({ op }) => {
          //                         ^ Includes the operation that's being executed
          // you can use this to generate a signature for the operation
          const signature = await getSignature(op);
          return {
            headers: {
              authorization: 'Bearer supersecret',
              'x-signature': signature,
            },
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
```
httpSubscriptionLink leverages SSE through EventSource, ensuring that connections encountering errors like network failures or bad response codes are automatically retried. However, EventSource does not allow re-execution of the eventSourceOptions() or url() options to update its configuration, which is particularly important in scenarios where authentication has expired since the last connection.
To address this limitation, you can use a retryLink in conjunction with httpSubscriptionLink. This approach ensures that the connection is re-established with the latest configuration, including any updated authentication details.
:::caution
Please note that restarting the connection will result in the EventSource being recreated from scratch, which means any previously tracked events will be lost.
:::
```ts twoslash
// @filename: server.ts
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
declare function getAuthenticatedUri(): string;
declare const auth: { getOrRenewToken(): Promise<string> };
// ---cut---
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  retryLink,
  splitLink,
} from '@trpc/client';
import {
  EventSourcePolyfill,
  EventSourcePolyfillInit,
} from 'event-source-polyfill';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
      true: [
        retryLink({
          retry: (opts) => {
            opts.op.type;
            //      ^? will always be 'subscription' since we're in a splitLink

            const code = opts.error.data?.code;
            if (!code) {
              // This shouldn't happen, as httpSubscriptionLink retries
              // internally when the response can't be parsed
              console.error('No error code found, retrying', opts);
              return true;
            }
            if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
              console.log('Retrying due to 401/403 error');
              return true;
            }
            return false;
          },
        }),
        httpSubscriptionLink({
          url: async () => {
            // calculate the latest URL if needed...
            return getAuthenticatedUri();
          },
          // ponyfill EventSource
          EventSource: EventSourcePolyfill,
          eventSourceOptions: async () => {
            // ...or maybe renew an access token
            const token = await auth.getOrRenewToken();

            return {
              headers: {
                authorization: `Bearer ${token}`,
              },
            };
          },
        }),
      ],
    }),
  ],
});
```
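The example above only declares `auth.getOrRenewToken()` without showing an implementation. One possible shape is sketched below under stated assumptions: `createTokenCache`, `fetchToken`, the `Token` fields, and the 5-second skew are all hypothetical and stand in for whatever your auth provider exposes.

```ts
// Hypothetical token cache that could back `auth.getOrRenewToken()`.
interface Token {
  value: string;
  expiresAt: number; // epoch milliseconds
}

function createTokenCache(
  fetchToken: () => Promise<Token>,
  now: () => number = Date.now,
  skewMs: number = 5_000, // renew slightly before the token actually expires
) {
  let cached: Token | null = null;
  let inflight: Promise<Token> | null = null;

  return {
    async getOrRenewToken(): Promise<string> {
      // Reuse the cached token while it is still comfortably valid.
      if (cached && cached.expiresAt - skewMs > now()) {
        return cached.value;
      }
      // Share a single renewal request between concurrent callers.
      if (!inflight) {
        inflight = fetchToken();
      }
      const pending = inflight;
      try {
        const token = await pending;
        cached = token;
        return token.value;
      } finally {
        inflight = null;
      }
    },
  };
}
```

Because `retryLink` recreates the connection, `eventSourceOptions` runs again on each retry, so a cache like this would hand out a fresh token once the old one is close to expiry.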
To authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. These are sent as part of the URL, which is why other methods are preferred.
```ts twoslash
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';

export const createContext = async (opts: CreateHTTPContextOptions) => {
  const token = opts.info.connectionParams?.token;
  //    ^?

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
```
```ts twoslash
// @filename: server.ts
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();
export const appRouter = t.router({});
export type AppRouter = typeof appRouter;

// @filename: client.ts
// ---cut---
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000',
        connectionParams: async () => {
          // Will be serialized as part of the URL
          return {
            token: 'supersecret',
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
```
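To see why a token passed this way is exposed, consider how such params end up in the request URL. The sketch below is an illustration rather than tRPC's code: `withConnectionParams` and the `post.onAdd` path are hypothetical, and JSON-encoding the value is an assumption. The only detail taken from the option's documentation is the `connectionParams` query parameter name.

```ts
// Hypothetical illustration of params being appended to the request URL.
function withConnectionParams(
  url: string,
  params: Record<string, string> | null,
): string {
  if (!params) return url;
  const separator = url.indexOf('?') === -1 ? '?' : '&';
  // Assumption: the value is JSON-encoded and then URI-encoded.
  const encoded = encodeURIComponent(JSON.stringify(params));
  return url + separator + 'connectionParams=' + encoded;
}

const exampleUrl = withConnectionParams('http://localhost:3000/post.onAdd', {
  token: 'supersecret',
});
```

The token remains readable in `exampleUrl` after decoding, so it can end up in server or proxy access logs. This is the practical reason header-based approaches are preferred where they are available.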
The httpSubscriptionLink supports configuring a timeout for inactivity through the reconnectAfterInactivityMs option. If no messages (including ping messages) are received within the specified timeout period, the connection will be marked as "connecting" and automatically attempt to reconnect.
The timeout configuration is set on the server side when initializing tRPC:
```ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    client: {
      reconnectAfterInactivityMs: 3_000,
    },
  },
});
```
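Conceptually, the client tracks when it last heard from the server and reconnects once the gap reaches the timeout. The following is a simplified model of that idea, not tRPC's implementation: `InactivityTracker` and its methods are hypothetical, and time is passed in explicitly to keep the sketch deterministic.

```ts
// Simplified, hypothetical model of inactivity detection on the client.
class InactivityTracker {
  private readonly reconnectAfterInactivityMs: number;
  private lastMessageAt: number;

  constructor(reconnectAfterInactivityMs: number, startedAt: number) {
    this.reconnectAfterInactivityMs = reconnectAfterInactivityMs;
    this.lastMessageAt = startedAt;
  }

  // Call for every received message, including pings.
  onMessage(receivedAt: number): void {
    this.lastMessageAt = receivedAt;
  }

  // True once no message has arrived within the timeout window.
  shouldReconnect(now: number): boolean {
    return now - this.lastMessageAt >= this.reconnectAfterInactivityMs;
  }
}

const tracker = new InactivityTracker(3_000, 0);
tracker.onMessage(2_000); // a message (or ping) arrives after 2s
```

With a 3-second timeout and the last message at 2s, the model stays connected at 4s and asks for a reconnect at 5s.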
The server can be configured to send periodic ping messages to keep the connection alive and prevent timeout disconnections. This is particularly useful when combined with the reconnectAfterInactivityMs option.
```ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    // Maximum duration of a single SSE connection in milliseconds
    // maxDurationMs: 60_000,
    ping: {
      // Enable periodic ping messages to keep connection alive
      enabled: true,
      // Send ping message every 2s
      intervalMs: 2_000,
    },
    // client: {
    //   reconnectAfterInactivityMs: 3_000
    // }
  },
});
```
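When both options are used, the ping interval should be shorter than the inactivity timeout; otherwise an idle but healthy connection would look inactive between pings. Below is a small sanity check, sketched with hypothetical names (`SseTimingConfig`, `pingKeepsConnectionAlive`) that mirror the option names used on this page.

```ts
// Hypothetical subset of the `sse` options relevant to timing.
interface SseTimingConfig {
  ping?: { enabled: boolean; intervalMs?: number };
  client?: { reconnectAfterInactivityMs?: number };
}

// Returns true when pings arrive often enough to prevent inactivity reconnects.
function pingKeepsConnectionAlive(config: SseTimingConfig): boolean {
  const timeoutMs = config.client
    ? config.client.reconnectAfterInactivityMs
    : undefined;
  if (timeoutMs === undefined) return true; // no inactivity timeout configured
  const ping = config.ping;
  if (!ping || !ping.enabled) return false; // idle streams would time out
  // 1000 ms is the documented default for `intervalMs`.
  const intervalMs = ping.intervalMs === undefined ? 1_000 : ping.intervalMs;
  return intervalMs < timeoutMs;
}
```

For the values used on this page (a 2-second ping and a 3-second timeout) the check passes; swapping them would cause a reconnect on every idle stretch.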
The httpSubscriptionLink makes use of the EventSource API, Streams API, and AsyncIterators. These are not natively supported by React Native and will have to be ponyfilled.
To ponyfill EventSource, we recommend using a polyfill that utilizes the networking library exposed by React Native rather than one that uses the XMLHttpRequest API. Libraries that polyfill EventSource using XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.
The Streams API can be ponyfilled using the web-streams-polyfill package.
AsyncIterators can be polyfilled using the @azure/core-asynciterator-polyfill package.
Install the required polyfills:
import { InstallSnippet } from '@site/src/components/InstallSnippet';

<InstallSnippet pkgs="rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill" />

Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):
```ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';

globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;
```
Once the ponyfills are added, you can continue setting up the httpSubscriptionLink as described in the setup section.
httpSubscriptionLink Options

```ts twoslash
type AnyClientTypes = any;
type DataTransformerOptions = any;
type Operation = any;
namespace EventSourceLike { export type AnyConstructor = any; export type InitDictOf<T> = any; }
// ---cut---
type HTTPSubscriptionLinkOptions<
  TRoot extends AnyClientTypes,
  TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
  /**
   * The URL to connect to (can be a function that returns a URL)
   */
  url: string | (() => string | Promise<string>);
  /**
   * Connection params that are available in `createContext()`
   * Serialized as part of the URL under the `connectionParams` query parameter
   */
  connectionParams?:
    | Record<string, string>
    | null
    | (() =>
        | Record<string, string>
        | null
        | Promise<Record<string, string> | null>);
  /**
   * Data transformer
   * @see https://trpc.io/docs/v11/data-transformers
   */
  transformer?: DataTransformerOptions;
  /**
   * EventSource ponyfill
   */
  EventSource?: TEventSource;
  /**
   * EventSource options or a callback that returns them
   */
  eventSourceOptions?:
    | EventSourceLike.InitDictOf<TEventSource>
    | ((opts: {
        op: Operation;
      }) =>
        | EventSourceLike.InitDictOf<TEventSource>
        | Promise<EventSourceLike.InitDictOf<TEventSource>>);
};
```
```ts
export interface SSEStreamProducerOptions<TValue = unknown> {
  ping?: {
    /**
     * Enable ping comments sent from the server
     * @default false
     */
    enabled: boolean;
    /**
     * Interval in milliseconds
     * @default 1000
     */
    intervalMs?: number;
  };
  /**
   * Maximum duration in milliseconds for the request before ending the stream
   * @default undefined
   */
  maxDurationMs?: number;
  /**
   * End the request immediately after data is sent
   * Only useful for serverless runtimes that do not support streaming responses
   * @default false
   */
  emitAndEndImmediately?: boolean;
  /**
   * Client-specific options - these will be sent to the client as part of the first message
   * @default {}
   */
  client?: {
    /**
     * Timeout and reconnect after inactivity in milliseconds
     * @default undefined
     */
    reconnectAfterInactivityMs?: number;
  };
}
```