subscriptions
core
Set up real-time event streams with async generator subscriptions using .subscription(async function*() { yield }). SSE via httpSubscriptionLink is recommended over WebSocket. Use tracked(id, data) from @trpc/server for reconnection recovery with lastEventId. For WebSocket, use wsLink and createWSClient from @trpc/client and applyWSSHandler from @trpc/server/adapters/ws. Configure SSE ping with initTRPC.create({ sse: { ping: { enabled, intervalMs } } }). Use the AbortSignal in opts.signal for cleanup, and splitLink to route subscriptions.
tRPC — Subscriptions
Setup
Server-Sent Events (SSE) are recommended for most subscription use cases: they are simpler to set up and do not require a WebSocket server.
Server
// server.ts
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { createHTTPServer } from '@trpc/server/adapters/standalone';
import { z } from 'zod';

const t = initTRPC.create({
  sse: {
    ping: {
      enabled: true,
      intervalMs: 2000,
    },
    client: {
      reconnectAfterInactivityMs: 5000,
    },
  },
});

type Post = { id: string; title: string };

const ee = new EventEmitter();

const appRouter = t.router({
  onPostAdd: t.procedure
    .input(z.object({ lastEventId: z.string().nullish() }).optional())
    .subscription(async function* (opts) {
      for await (const [data] of on(ee, 'add', { signal: opts.signal })) {
        const post = data as Post;
        yield tracked(post.id, post);
      }
    }),
});

export type AppRouter = typeof appRouter;

createHTTPServer({
  router: appRouter,
  createContext() {
    return {};
  },
}).listen(3000);
Client (SSE)
// client.ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server';

const trpc = createTRPCClient<AppRouter>({
  links: [
    // Route subscriptions over SSE and everything else over batched HTTP.
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({ url: 'http://localhost:3000' }),
      false: httpBatchLink({ url: 'http://localhost:3000' }),
    }),
  ],
});

const subscription = trpc.onPostAdd.subscribe(
  { lastEventId: null },
  {
    onData(post) {
      console.log('New post:', post);
    },
    onError(err) {
      console.error('Subscription error:', err);
    },
  },
);

// To stop:
// subscription.unsubscribe();
Core Patterns
tracked() for reconnection recovery
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

const t = initTRPC.create();
const ee = new EventEmitter();

const appRouter = t.router({
  onPostAdd: t.procedure
    .input(z.object({ lastEventId: z.string().nullish() }).optional())
    .subscription(async function* (opts) {
      // Start listening before replaying so events emitted during the replay are not lost.
      const iterable = on(ee, 'add', { signal: opts.signal });

      if (opts.input?.lastEventId) {
        // Fetch and yield events since lastEventId from your database
        // const missed = await db.post.findMany({ where: { id: { gt: opts.input.lastEventId } } });
        // for (const post of missed) { yield tracked(post.id, post); }
      }

      for await (const [data] of iterable) {
        yield tracked(data.id, data);
      }
    }),
});
When using tracked(id, data), the client automatically sends lastEventId on reconnection. For SSE this is part of the EventSource spec; for WebSocket, wsLink handles it.
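To make the replay step concrete, here is a minimal sketch of resuming from lastEventId against an in-memory log. The StoredEvent type and the eventsSince helper are hypothetical names used only for illustration and are not part of tRPC; a real server would query its database, as in the commented-out lines of the example above. Replaying the whole log for an unknown ID is an assumption of this sketch, not documented tRPC behaviour.

```typescript
// Hypothetical in-memory event log; a real server would query its database.
type StoredEvent = { id: string; title: string };

// Returns the events recorded after `lastEventId`, in their original order.
// Assumption: a missing or unknown ID replays the full log rather than nothing.
function eventsSince(
  log: readonly StoredEvent[],
  lastEventId: string | null | undefined,
): StoredEvent[] {
  if (!lastEventId) return [...log];
  const index = log.findIndex((event) => event.id === lastEventId);
  return index === -1 ? [...log] : log.slice(index + 1);
}

const log: StoredEvent[] = [
  { id: '1', title: 'first' },
  { id: '2', title: 'second' },
  { id: '3', title: 'third' },
];

// A client that last saw event '1' reconnects and receives only what it missed.
const missed = eventsSince(log, '1');
console.log(missed.map((event) => event.id)); // [ '2', '3' ]
```

Inside a subscription, each replayed event would be yielded with tracked(event.id, event) before the live loop starts.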
Polling loop subscription
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

// `db` stands for your database client; it is not defined in this snippet.
const t = initTRPC.create();

const appRouter = t.router({
  onNewItems: t.procedure
    .input(z.object({ lastEventId: z.coerce.date().nullish() }))
    .subscription(async function* (opts) {
      // Resume from the last timestamp the client saw, if any.
      let cursor = opts.input?.lastEventId ?? null;

      while (!opts.signal?.aborted) {
        const items = await db.item.findMany({
          where: cursor ? { createdAt: { gt: cursor } } : undefined,
          orderBy: { createdAt: 'asc' },
        });

        for (const item of items) {
          yield tracked(item.createdAt.toJSON(), item);
          cursor = item.createdAt;
        }

        // Wait one second before polling again.
        await new Promise((r) => setTimeout(r, 1000));
      }
    }),
});
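One detail of the loop above: the setTimeout promise does not observe opts.signal, so after the client disconnects the generator can stay suspended for up to the full interval before the while condition is checked again. A minimal sketch of an abort-aware wait follows; the sleep helper is a hypothetical name, not a tRPC API.

```typescript
// Hypothetical helper: resolves after `ms`, or as soon as `signal` aborts.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    // On abort, cancel the pending timer and wake up immediately.
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    // On timeout, drop the abort listener so it does not leak.
    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// Usage: the 5-second sleep ends shortly after the abort fires.
const controller = new AbortController();
setTimeout(() => controller.abort(), 10);
const startedAt = Date.now();
sleep(5000, controller.signal).then(() => {
  console.log(`woke after ${Date.now() - startedAt} ms`);
});
```

In the polling loop, await sleep(1000, opts.signal) would take the place of the setTimeout promise.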
WebSocket setup (when bidirectional communication is required)
// server
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './router';

const wss = new WebSocketServer({ port: 3001 });

const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext() {
    return {};
  },
  keepAlive: {
    enabled: true,
    pingMs: 30000,
    pongWaitMs: 5000,
  },
});

process.on('SIGTERM', () => {
  handler.broadcastReconnectNotification();
  wss.close();
});
// client
import {
  createTRPCClient,
  createWSClient,
  httpBatchLink,
  splitLink,
  wsLink,
} from '@trpc/client';
import type { AppRouter } from './server';

const wsClient = createWSClient({ url: 'ws://localhost:3001' });

const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: wsLink({ client: wsClient }),
      false: httpBatchLink({ url: 'http://localhost:3000' }),
    }),
  ],
});
Cleanup with try...finally
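import { on } from 'node:events';

// `t` and `ee` are the tRPC instance and EventEmitter from the earlier examples.
// `registerListener` is a placeholder for any setup step that returns a cleanup function.
const appRouter = t.router({
  events: t.procedure.subscription(async function* (opts) {
    const cleanup = registerListener();
    try {
      for await (const [data] of on(ee, 'event', { signal: opts.signal })) {
        yield data;
      }
    } finally {
      // Runs when the subscription stops.
      cleanup();
    }
  }),
});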
tRPC invokes .return() on the generator when the subscription stops, triggering the finally block.
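The mechanism is plain JavaScript generator behaviour and can be observed without tRPC. The sketch below uses a stand-in generator (counter and demo are illustrative names) and calls .return() by hand, the way a consumer does when it stops iterating early.

```typescript
// Stand-in for a subscription generator: acquires a resource, streams values,
// and releases the resource in `finally`.
async function* counter(log: string[]): AsyncGenerator<number, void, unknown> {
  log.push('registered');
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    log.push('cleaned up');
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const iterator = counter(log);
  await iterator.next(); // start the generator; it pauses at the first `yield`
  await iterator.return(undefined); // stop early, which runs the `finally` block
  return log;
}

demo().then((log) => console.log(log)); // [ 'registered', 'cleaned up' ]
```

If .return() is called before the first .next(), the generator body never starts, so neither the try nor the finally block runs. Setup placed inside the generator is therefore always paired with its cleanup.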
Common Mistakes
HIGH Using Observable instead of async generator
Wrong:
import { observable } from '@trpc/server/observable';

t.procedure.subscription(({ input }) => {
  return observable((emit) => {
    emit.next(data);
  });
});
Correct:
t.procedure.subscription(async function* ({ input, signal }) {
  for await (const [data] of on(ee, 'event', { signal })) {
    yield data;
  }
});
Observable subscriptions are deprecated and will be removed in v12. Use async generator syntax (async function*).
Source: packages/server/src/unstable-core-do-not-import/procedureBuilder.ts
MEDIUM Empty string as tracked event ID
Wrong:
yield tracked('', data);
Correct:
yield tracked(event.id.toString(), data);
tracked() throws if the ID is an empty string because it conflicts with SSE "no id" semantics.
Source: packages/server/src/unstable-core-do-not-import/stream/tracked.ts
HIGH Fetching history before setting up event listener
Wrong:
t.procedure.subscription(async function* (opts) {
  const history = await db.getEvents(); // events may fire here and be lost
  yield* history;
  for await (const event of listener) {
    yield event;
  }
});
Correct:
t.procedure.subscription(async function* (opts) {
  const iterable = on(ee, 'event', { signal: opts.signal }); // listen first
  const history = await db.getEvents();
  for (const item of history) {
    yield tracked(item.id, item);
  }
  for await (const [event] of iterable) {
    yield tracked(event.id, event);
  }
});
If you fetch historical data before setting up the event listener, events emitted between the fetch and listener setup are lost.
Source: www/docs/server/subscriptions.md
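The reason the listen-first order works is that on() from node:events attaches its listener as soon as it is called and buffers events until iteration begins. A small sketch without tRPC illustrates this; the demo function and the event payloads are illustrative only.

```typescript
import { EventEmitter, on } from 'node:events';

async function demo(): Promise<{ deliveredEarly: boolean; buffered: unknown }> {
  const ee = new EventEmitter();

  // Before `on()` is called there is no listener, so `emit` returns false
  // and the event is dropped.
  const deliveredEarly = ee.emit('event', 'lost');

  // `on()` attaches its listener immediately, before any `for await` starts.
  const iterable = on(ee, 'event');

  // Simulates an event firing while a history query is still in flight.
  ee.emit('event', 'kept');

  // The event was buffered and is available once iteration begins.
  const { value } = await iterable.next();
  await iterable.return?.(); // detach the listener

  return { deliveredEarly, buffered: value };
}

demo().then((result) => console.log(result));
// { deliveredEarly: false, buffered: [ 'kept' ] }
```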
MEDIUM SSE ping interval >= client reconnect interval
Wrong:
initTRPC.create({
  sse: {
    ping: { enabled: true, intervalMs: 10000 },
    client: { reconnectAfterInactivityMs: 5000 },
  },
});
Correct:
initTRPC.create({
  sse: {
    ping: { enabled: true, intervalMs: 2000 },
    client: { reconnectAfterInactivityMs: 5000 },
  },
});
If the server ping interval is >= the client reconnect timeout, the client's inactivity timer expires before the next ping arrives, so the client treats a healthy connection as dead and reconnects.
Source: packages/server/src/unstable-core-do-not-import/stream/sse.ts
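The rule can be checked mechanically before the config is passed to initTRPC.create(). The sketch below is a hypothetical guard, not a tRPC API: the SseTimingConfig type only mirrors the sse options shown in the two examples above, and pingIntervalProblem is an illustrative name.

```typescript
// Mirrors the `sse` options used in this document; the helper is hypothetical.
type SseTimingConfig = {
  ping: { enabled: boolean; intervalMs: number };
  client: { reconnectAfterInactivityMs: number };
};

// Returns a description of the problem, or null if the timings are compatible.
function pingIntervalProblem(config: SseTimingConfig): string | null {
  const { ping, client } = config;
  if (!ping.enabled) return null; // nothing to compare when pings are off
  if (ping.intervalMs >= client.reconnectAfterInactivityMs) {
    return (
      `ping.intervalMs (${ping.intervalMs}) must be below ` +
      `client.reconnectAfterInactivityMs (${client.reconnectAfterInactivityMs})`
    );
  }
  return null;
}

const risky: SseTimingConfig = {
  ping: { enabled: true, intervalMs: 10000 },
  client: { reconnectAfterInactivityMs: 5000 },
};
const safe: SseTimingConfig = {
  ping: { enabled: true, intervalMs: 2000 },
  client: { reconnectAfterInactivityMs: 5000 },
};

console.log(pingIntervalProblem(risky)); // describes the 10000 vs 5000 conflict
console.log(pingIntervalProblem(safe)); // null
```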
HIGH Sending custom headers with SSE without EventSource polyfill
Wrong:
httpSubscriptionLink({
  url: 'http://localhost:3000',
  // Native EventSource does not support custom headers
});
Correct:
import { EventSourcePolyfill } from 'event-source-polyfill';

httpSubscriptionLink({
  url: 'http://localhost:3000',
  EventSource: EventSourcePolyfill,
  eventSourceOptions: async () => ({
    headers: { authorization: 'Bearer token' },
  }),
});
The native EventSource API does not support custom headers. Use an EventSource polyfill and pass it via the EventSource option on httpSubscriptionLink.
Source: www/docs/client/links/httpSubscriptionLink.md
MEDIUM Choosing WebSocket when SSE would suffice
SSE (httpSubscriptionLink) is recommended for most subscription use cases. WebSockets add complexity (connection management, reconnection, keepalive, separate server process). Only use wsLink when bidirectional communication or WebSocket-specific features are required.
Source: maintainer interview
MEDIUM WebSocket subscription stale inputs on reconnect
When a WebSocket reconnects, subscriptions re-send the original input parameters. There is no hook to re-evaluate inputs on reconnect, which can cause stale data. Consider using tracked() with lastEventId to mitigate this.
Source: https://github.com/trpc/trpc/issues/4122
See Also
- links -- splitLink, httpSubscriptionLink, wsLink, httpBatchLink
- auth -- authenticating subscription connections (connectionParams, cookies, EventSource polyfill headers)
- server-setup -- initTRPC.create() SSE configuration options
- adapter-fastify -- WebSocket subscriptions via @fastify/websocket and useWSS