Short recipes for the common things you wire around TanStack Workflow.
Local development:
pnpm add @tanstack/workflow-core @tanstack/workflow-runtime zod
Postgres deployment:
pnpm add @tanstack/workflow-core @tanstack/workflow-runtime \
@tanstack/workflow-store-drizzle-postgres drizzle-orm pg zod
Netlify:
pnpm add @tanstack/workflow-netlify
Vercel:
pnpm add @tanstack/workflow-vercel
import { createWorkflow } from '@tanstack/workflow-core'
import { z } from 'zod'
export const chargeWorkflow = createWorkflow({
id: 'charge',
input: z.object({ userId: z.string(), amount: z.number() }),
}).handler(async (ctx) => {
const charge = await ctx.step('charge-card', (stepCtx) =>
stripe.charges.create(
{ customer: ctx.input.userId, amount: ctx.input.amount },
{ idempotencyKey: stepCtx.id },
),
)
return { chargeId: charge.id }
})import { createWorkflow } from '@tanstack/workflow-core'
import { z } from 'zod'
export const chargeWorkflow = createWorkflow({
id: 'charge',
input: z.object({ userId: z.string(), amount: z.number() }),
}).handler(async (ctx) => {
const charge = await ctx.step('charge-card', (stepCtx) =>
stripe.charges.create(
{ customer: ctx.input.userId, amount: ctx.input.amount },
{ idempotencyKey: stepCtx.id },
),
)
return { chargeId: charge.id }
})import {
defineWorkflowRuntime,
inMemoryWorkflowExecutionStore,
} from '@tanstack/workflow-runtime'
import { chargeWorkflow } from './charge'
export const workflowRuntime = defineWorkflowRuntime({
store: inMemoryWorkflowExecutionStore(),
workflows: {
charge: {
load: async () => chargeWorkflow,
},
},
})import {
defineWorkflowRuntime,
inMemoryWorkflowExecutionStore,
} from '@tanstack/workflow-runtime'
import { chargeWorkflow } from './charge'
export const workflowRuntime = defineWorkflowRuntime({
store: inMemoryWorkflowExecutionStore(),
workflows: {
charge: {
load: async () => chargeWorkflow,
},
},
})import { drizzle } from 'drizzle-orm/node-postgres'
import { Pool } from 'pg'
import { defineWorkflowRuntime } from '@tanstack/workflow-runtime'
import { createDrizzlePostgresWorkflowStore } from '@tanstack/workflow-store-drizzle-postgres'
const db = drizzle(new Pool({ connectionString: process.env.DATABASE_URL }))
const store = createDrizzlePostgresWorkflowStore({ db })
export const workflowRuntime = defineWorkflowRuntime({
store,
workflows: {
charge: {
load: () => import('./charge').then((mod) => mod.chargeWorkflow),
},
},
})import { drizzle } from 'drizzle-orm/node-postgres'
import { Pool } from 'pg'
import { defineWorkflowRuntime } from '@tanstack/workflow-runtime'
import { createDrizzlePostgresWorkflowStore } from '@tanstack/workflow-store-drizzle-postgres'
const db = drizzle(new Pool({ connectionString: process.env.DATABASE_URL }))
const store = createDrizzlePostgresWorkflowStore({ db })
export const workflowRuntime = defineWorkflowRuntime({
store,
workflows: {
charge: {
load: () => import('./charge').then((mod) => mod.chargeWorkflow),
},
},
})
Workflow owns its durable store schema. Apply the package-owned SQL migration during setup/deploy instead of copying workflow_* tables into your app's Drizzle schema.
psql "$DATABASE_URL" -f node_modules/@tanstack/workflow-store-drizzle-postgres/migrations/0000_workflow_store.sql
If your deploy system wants a package script:
{
"scripts": {
"workflow:migrate": "psql \"$DATABASE_URL\" -f node_modules/@tanstack/workflow-store-drizzle-postgres/migrations/0000_workflow_store.sql"
}
}{
"scripts": {
"workflow:migrate": "psql \"$DATABASE_URL\" -f node_modules/@tanstack/workflow-store-drizzle-postgres/migrations/0000_workflow_store.sql"
}
}
Run this against the same DATABASE_URL your deployed functions use. Keep store.ensureSchema() for tests, local demos, and explicit admin bootstrap scripts.
The migration records itself in workflow_schema_migrations. Future Workflow store schema changes will ship as additional numbered SQL files in the adapter package.
export async function POST(request: Request) {
const input = await request.json()
const runId = `charge:${input.orderId}`
const result = await workflowRuntime.startRun({
workflowId: 'charge',
runId,
input,
includeEvents: false,
})
return Response.json({
runId,
kind: result.kind,
})
}export async function POST(request: Request) {
const input = await request.json()
const runId = `charge:${input.orderId}`
const result = await workflowRuntime.startRun({
workflowId: 'charge',
runId,
input,
includeEvents: false,
})
return Response.json({
runId,
kind: result.kind,
})
}
Workflow:
const payment = await ctx.waitForEvent<{ paymentId: string }>(
'payment-received',
)const payment = await ctx.waitForEvent<{ paymentId: string }>(
'payment-received',
)
Webhook:
export async function POST(request: Request) {
const event = await request.json()
const result = await workflowRuntime.deliverSignal({
runId: `checkout:${event.orderId}`,
signalId: event.id,
name: 'payment-received',
payload: { paymentId: event.paymentId },
includeEvents: false,
})
return Response.json({ kind: result.kind })
}export async function POST(request: Request) {
const event = await request.json()
const result = await workflowRuntime.deliverSignal({
runId: `checkout:${event.orderId}`,
signalId: event.id,
name: 'payment-received',
payload: { paymentId: event.paymentId },
includeEvents: false,
})
return Response.json({ kind: result.kind })
}
Workflow:
const decision = await ctx.approve({
title: 'Approve refund?',
description: `Refund ${ctx.input.amount}`,
})
if (!decision.approved) {
return { status: 'rejected' as const }
}const decision = await ctx.approve({
title: 'Approve refund?',
description: `Refund ${ctx.input.amount}`,
})
if (!decision.approved) {
return { status: 'rejected' as const }
}
Approval handler:
await workflowRuntime.deliverApproval({
runId,
approval: {
approvalId,
approved: true,
feedback: 'Approved in admin',
},
})await workflowRuntime.deliverApproval({
runId,
approval: {
approvalId,
approved: true,
feedback: 'Approved in admin',
},
})
Workflow:
const now = await ctx.now()
await ctx.sleepUntil(now + 30 * 60_000)const now = await ctx.now()
await ctx.sleepUntil(now + 30 * 60_000)
Sweep:
await workflowRuntime.sweep({
maxTimers: 25,
maxDurationMs: 55_000,
includeEvents: false,
})await workflowRuntime.sweep({
maxTimers: 25,
maxDurationMs: 55_000,
includeEvents: false,
})import { defineWorkflowRuntime, every } from '@tanstack/workflow-runtime'
export const workflowRuntime = defineWorkflowRuntime({
store,
workflows: {
digest: {
load: async () => digestWorkflow,
schedules: [
{
id: 'digest-every-15m',
schedule: every.minutes(15),
overlapPolicy: 'skip',
input: { batchSize: 100 },
},
],
},
},
})import { defineWorkflowRuntime, every } from '@tanstack/workflow-runtime'
export const workflowRuntime = defineWorkflowRuntime({
store,
workflows: {
digest: {
load: async () => digestWorkflow,
schedules: [
{
id: 'digest-every-15m',
schedule: every.minutes(15),
overlapPolicy: 'skip',
input: { batchSize: 100 },
},
],
},
},
})
The host sweep materializes due schedules and starts deterministic runs.
import { cron } from '@tanstack/workflow-runtime'
{
id: 'weekly-report',
schedule: cron('0 9 * * 1', { timezone: 'UTC' }),
overlapPolicy: 'skip',
}import { cron } from '@tanstack/workflow-runtime'
{
id: 'weekly-report',
schedule: cron('0 9 * * 1', { timezone: 'UTC' }),
overlapPolicy: 'skip',
}
Current materialization supports numeric five-field UTC cron schedules.
const result = await workflowRuntime.sweep({
maxScheduledRuns: 10,
maxTimers: 50,
maxDurationMs: 25_000,
includeEvents: false,
})
console.log(result.summary)
if (result.remainingMayExist) {
// Let the next cron tick continue, or enqueue another sweep.
}const result = await workflowRuntime.sweep({
maxScheduledRuns: 10,
maxTimers: 50,
maxDurationMs: 25_000,
includeEvents: false,
})
console.log(result.summary)
if (result.remainingMayExist) {
// Let the next cron tick continue, or enqueue another sweep.
}const result = await workflowRuntime.sweep({
includeEvents: true,
maxEvents: 100,
})
console.log(result.scheduled[0]?.events)const result = await workflowRuntime.sweep({
includeEvents: true,
maxEvents: 100,
})
console.log(result.scheduled[0]?.events)
Do this in development or admin tooling, not on every production cron response.
import { createCloudflareWorkflowScheduledHandler } from '@tanstack/workflow-cloudflare'
export default {
scheduled: createCloudflareWorkflowScheduledHandler({
runtime: ({ env }) => createWorkflowRuntime(env),
maxScheduledRuns: 25,
maxTimers: 25,
maxDurationMs: 25_000,
}),
}import { createCloudflareWorkflowScheduledHandler } from '@tanstack/workflow-cloudflare'
export default {
scheduled: createCloudflareWorkflowScheduledHandler({
runtime: ({ env }) => createWorkflowRuntime(env),
maxScheduledRuns: 25,
maxTimers: 25,
maxDurationMs: 25_000,
}),
}// scripts/workflow-sweep.ts
import { createRailwayWorkflowCronCommand } from '@tanstack/workflow-railway'
import { workflowRuntime } from '../src/workflows/runtime.server'
const sweep = createRailwayWorkflowCronCommand({
runtime: workflowRuntime,
maxScheduledRuns: 25,
maxTimers: 25,
maxDurationMs: 55_000,
logSummary: true,
})
await sweep()// scripts/workflow-sweep.ts
import { createRailwayWorkflowCronCommand } from '@tanstack/workflow-railway'
import { workflowRuntime } from '../src/workflows/runtime.server'
const sweep = createRailwayWorkflowCronCommand({
runtime: workflowRuntime,
maxScheduledRuns: 25,
maxTimers: 25,
maxDurationMs: 55_000,
logSummary: true,
})
await sweep()
Configure Railway Cron Jobs with config-as-code:
# railway.toml
[deploy]
startCommand = "pnpm workflow:sweep"
cronSchedule = "*/5 * * * *"
restartPolicyType = "NEVER"# railway.toml
[deploy]
startCommand = "pnpm workflow:sweep"
cronSchedule = "*/5 * * * *"
restartPolicyType = "NEVER"// netlify/functions/workflow-sweep-background.ts
import {
createNetlifyWorkflowSweepHandler,
} from '@tanstack/workflow-netlify'
import { workflowRuntime } from '../../src/workflows/runtime.server'
export default createNetlifyWorkflowSweepHandler({
runtime: workflowRuntime,
maxDurationMs: 25_000,
})
export const config = {
schedule: '*/5 * * * *',
}// netlify/functions/workflow-sweep-background.ts
import {
createNetlifyWorkflowSweepHandler,
} from '@tanstack/workflow-netlify'
import { workflowRuntime } from '../../src/workflows/runtime.server'
export default createNetlifyWorkflowSweepHandler({
runtime: workflowRuntime,
maxDurationMs: 25_000,
})
export const config = {
schedule: '*/5 * * * *',
}// app/api/workflow/sweep/route.ts
import { createVercelWorkflowSweepHandler } from '@tanstack/workflow-vercel'
import { workflowRuntime } from '@/workflows/runtime.server'
export const runtime = 'nodejs'
export const maxDuration = 60
export const GET = createVercelWorkflowSweepHandler({
runtime: workflowRuntime,
cronSecret: process.env.CRON_SECRET,
maxDurationMs: 55_000,
})// app/api/workflow/sweep/route.ts
import { createVercelWorkflowSweepHandler } from '@tanstack/workflow-vercel'
import { workflowRuntime } from '@/workflows/runtime.server'
export const runtime = 'nodejs'
export const maxDuration = 60
export const GET = createVercelWorkflowSweepHandler({
runtime: workflowRuntime,
cronSecret: process.env.CRON_SECRET,
maxDurationMs: 55_000,
}){
"$schema": "https://openapi.vercel.sh/vercel.json",
"crons": [{ "path": "/api/workflow/sweep", "schedule": "*/5 * * * *" }]
}{
"$schema": "https://openapi.vercel.sh/vercel.json",
"crons": [{ "path": "/api/workflow/sweep", "schedule": "*/5 * * * *" }]
}workflows: {
fulfillment: {
load: () =>
import('./fulfillment').then((mod) => mod.fulfillmentWorkflow),
},
}workflows: {
fulfillment: {
load: () =>
import('./fulfillment').then((mod) => mod.fulfillmentWorkflow),
},
}
Lazy loaders keep adapters from importing every workflow up front and make old versions explicit.
workflows: {
fulfillment: {
version: 'v2',
load: () => import('./fulfillment.v2').then((mod) => mod.workflow),
previousVersions: {
v1: () => import('./fulfillment.v1').then((mod) => mod.workflow),
},
},
}workflows: {
fulfillment: {
version: 'v2',
load: () => import('./fulfillment.v2').then((mod) => mod.workflow),
previousVersions: {
v1: () => import('./fulfillment.v1').then((mod) => mod.workflow),
},
},
}
Remove v1 only after every v1 run has finished or errored.
import { materializeWorkflowSchedules } from '@tanstack/workflow-runtime'
await materializeWorkflowSchedules(workflowRuntime, {
now: Date.now(),
cronLookbackMs: 24 * 60 * 60 * 1000,
})import { materializeWorkflowSchedules } from '@tanstack/workflow-runtime'
await materializeWorkflowSchedules(workflowRuntime, {
now: Date.now(),
cronLookbackMs: 24 * 60 * 60 * 1000,
})
Most users should let the host adapter call this automatically.
Host adapters default to compact responses:
{
"ok": true,
"summary": {
"materialized": 1,
"scheduled": { "completed": 1 },
"timers": {},
"eventCount": 8,
"returnedEventCount": 0
},
"deadlineReached": false,
"remainingMayExist": false
}{
"ok": true,
"summary": {
"materialized": 1,
"scheduled": { "completed": 1 },
"timers": {},
"eventCount": 8,
"returnedEventCount": 0
},
"deadlineReached": false,
"remainingMayExist": false
}
The full sweep result is optional:
createVercelWorkflowSweepHandler({
runtime,
includeSweepResult: true,
includeEvents: true,
})createVercelWorkflowSweepHandler({
runtime,
includeSweepResult: true,
includeEvents: true,
})