stream-db
coreStream-backed reactive database with @durable-streams/state. createStreamDB() with schema and stream options, db.preload() lazy initialization, db.collections for TanStack DB collections, optimistic actions with onMutate and mutationFn, db.utils.awaitTxId() for transaction confirmation, control events (snapshot-start, snapshot-end, reset), db.close() cleanup, re-exported TanStack DB operators (eq, gt, and, or, count, sum, avg, min, max).
This skill builds on durable-streams/state-schema. Read it first for schema definition and event types.
Durable Streams — StreamDB
Create a stream-backed reactive database that syncs structured state from a durable stream into TanStack DB collections. Provides reactive queries, optimistic actions, and transaction confirmation.
Setup
import { createStreamDB, createStateSchema } from "@durable-streams/state"
import { DurableStream } from "@durable-streams/client"
import { z } from "zod"
const schema = createStateSchema({
users: {
schema: z.object({ id: z.string(), name: z.string(), email: z.string() }),
type: "user",
primaryKey: "id",
},
messages: {
schema: z.object({ id: z.string(), text: z.string(), userId: z.string() }),
type: "message",
primaryKey: "id",
},
})
const db = createStreamDB({
streamOptions: {
url: "https://your-server.com/v1/stream/my-app",
contentType: "application/json",
},
state: schema,
})
// The stream must already exist on the server before preload().
// Use DurableStream.connect() to attach to an existing stream,
// or create it first if it doesn't exist yet:
try {
await DurableStream.create({
url: "https://your-server.com/v1/stream/my-app",
contentType: "application/json",
})
} catch (e) {
if (e.code !== "CONFLICT_EXISTS") throw e // Already exists is fine
}
// Connect and load initial data
await db.preload()
// Access TanStack DB collections
const users = db.collections.users
const messages = db.collections.messages
Core Patterns
Reactive queries with TanStack DB
StreamDB collections are TanStack DB collections. Use framework adapters for reactive queries:
import { useLiveQuery } from "@tanstack/react-db"
import { eq } from "@durable-streams/state"
function UserProfile({ userId }: { userId: string }) {
const userQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, userId))
.findOne()
)
if (!userQuery.data) return null
return <div>{userQuery.data.name}</div>
}
Optimistic actions with server confirmation
import { createStreamDB, createStateSchema } from "@durable-streams/state"
import { z } from "zod"
const schema = createStateSchema({
users: {
schema: z.object({ id: z.string(), name: z.string() }),
type: "user",
primaryKey: "id",
},
})
const db = createStreamDB({
streamOptions: {
url: "https://your-server.com/v1/stream/my-app",
contentType: "application/json",
},
state: schema,
actions: ({ db, stream }) => ({
addUser: {
onMutate: (user) => {
db.collections.users.insert(user) // Optimistic — shows immediately
},
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
JSON.stringify(
schema.users.insert({ value: user, headers: { txid } })
)
)
await db.utils.awaitTxId(txid, 10000) // Wait for confirmation
},
},
}),
})
await db.preload()
await db.actions.addUser({ id: "1", name: "Kyle" })
Cleanup on unmount
import { useEffect, useState } from "react"
function App() {
const [db, setDb] = useState(null)
useEffect(() => {
const database = createStreamDB({ streamOptions, state: schema })
database.preload().then(() => setDb(database))
return () => database.close() // Clean up connections and timers
}, [])
if (!db) return <div>Loading...</div>
return <Dashboard db={db} />
}
SSR: StreamDB is client-only
StreamDB holds open HTTP connections and relies on browser/Node.js runtime features. In meta-frameworks (TanStack Start, Next.js, Remix), ensure StreamDB only runs on the client:
// TanStack Start / React Router — mark the route as client-only
export const Route = createFileRoute("/dashboard")({
ssr: false,
component: Dashboard,
})
Without ssr: false, the server-side render will attempt to create StreamDB and fail or produce instanceof mismatches between server and client bundles.
Common Mistakes
CRITICAL Forgetting to call preload() before accessing data
Wrong:
const db = createStreamDB({ streamOptions, state: schema })
const users = db.collections.users // Collections are empty!
Correct:
const db = createStreamDB({ streamOptions, state: schema })
await db.preload() // Connect and load initial data
const users = db.collections.users
StreamDB creates the stream lazily. Without preload(), no connection is established and collections remain empty.
Source: packages/state/src/stream-db.ts
HIGH Not calling close() on unmount/cleanup
Wrong:
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
db.preload()
setDb(db)
}, [])
Correct:
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
db.preload()
setDb(db)
return () => db.close()
}, [])
StreamDB holds open HTTP connections and a 15-second health check interval. Forgetting close() leaks connections and timers.
Source: packages/state/README.md best practices
HIGH Not using awaitTxId for critical writes
Wrong:
mutationFn: async (user) => {
await stream.append(JSON.stringify(schema.users.insert({ value: user })))
// No confirmation — optimistic state may diverge from server
}
Correct:
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
JSON.stringify(schema.users.insert({ value: user, headers: { txid } }))
)
await db.utils.awaitTxId(txid, 10000) // Wait up to 10 seconds
}
Without awaitTxId, the client has no confirmation that the write was persisted. Optimistic state may diverge if the write fails silently.
Source: packages/state/README.md transaction IDs section
HIGH Tension: Catch-up completeness vs. live latency
This skill's patterns conflict with reading-streams. preload() waits for all existing data before resolving, which may take time for large streams. Agents may forget that after preload(), the StreamDB is already in live-tailing mode — no additional subscription setup is needed.
See also: durable-streams/reading-streams/SKILL.md
See also
- state-schema — Define schemas before creating a StreamDB
- reading-streams — Understanding live modes and offset management
Version
Targets @durable-streams/state v0.2.1.