db-core/custom-adapter
sub-skill
Building custom collection adapters for new backends. SyncConfig interface: sync function receiving begin, write, commit, markReady, truncate primitives. ChangeMessage format (insert, update, delete). loadSubset for on-demand sync. LoadSubsetOptions (where, orderBy, limit, cursor). Expression parsing: parseWhereExpression, parseOrderByExpression, extractSimpleComparisons, parseLoadSubsetOptions. Collection options creator pattern. rowUpdateMode (partial vs full). Subscription lifecycle and cleanup functions.
This skill builds on db-core and db-core/collection-setup. Read those first.
Custom Adapter Authoring
Setup
import { createCollection } from '@tanstack/db'
import type { SyncConfig, CollectionConfig } from '@tanstack/db'
interface MyItem {
id: string
name: string
}
function myBackendCollectionOptions<T>(config: {
endpoint: string
getKey: (item: T) => string
}): CollectionConfig<T, string, {}> {
return {
getKey: config.getKey,
sync: {
sync: ({ begin, write, commit, markReady, collection }) => {
let isInitialSyncComplete = false
const bufferedEvents: Array<any> = []
// 1. Subscribe to real-time events FIRST
const unsubscribe = myWebSocket.subscribe(config.endpoint, (event) => {
if (!isInitialSyncComplete) {
bufferedEvents.push(event)
return
}
begin()
write({ type: event.type, key: event.id, value: event.data })
commit()
})
// 2. Fetch initial data
fetch(config.endpoint).then(async (res) => {
const items = await res.json()
begin()
for (const item of items) {
write({ type: 'insert', value: item })
}
commit()
// 3. Process buffered events
isInitialSyncComplete = true
for (const event of bufferedEvents) {
begin()
write({ type: event.type, key: event.id, value: event.data })
commit()
}
// 4. Signal readiness
markReady()
})
// 5. Return cleanup function
return () => {
unsubscribe()
}
},
rowUpdateMode: 'partial',
},
onInsert: async ({ transaction }) => {
await fetch(config.endpoint, {
method: 'POST',
body: JSON.stringify(transaction.mutations[0].modified),
})
},
onUpdate: async ({ transaction }) => {
const mut = transaction.mutations[0]
await fetch(`${config.endpoint}/${mut.key}`, {
method: 'PATCH',
body: JSON.stringify(mut.changes),
})
},
onDelete: async ({ transaction }) => {
await fetch(`${config.endpoint}/${transaction.mutations[0].key}`, {
method: 'DELETE',
})
},
}
}
Core Patterns
ChangeMessage format
// Insert
write({ type: 'insert', value: item })
// Update (partial — only changed fields)
write({ type: 'update', key: itemId, value: partialItem })
// Update (full row replacement; requires rowUpdateMode: 'full' in the sync config)
write({ type: 'update', key: itemId, value: fullItem })
// Delete
write({ type: 'delete', key: itemId, value: item })
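To make the difference between the two update modes concrete, here is a small self-contained model of how a change message might be applied to a row store. It is a sketch rather than the library's implementation: `StoredRow`, `RowChange`, and `applyChange` are hypothetical names, and the assumption is that 'partial' merges the incoming fields into the existing row while 'full' replaces the row outright.

```typescript
// Hypothetical model of rowUpdateMode; not TanStack DB's actual code.
type StoredRow = Record<string, unknown>

type RowChange =
  | { type: 'insert'; key: string; value: StoredRow }
  | { type: 'update'; key: string; value: StoredRow }
  | { type: 'delete'; key: string }

type RowUpdateMode = 'partial' | 'full'

function applyChange(
  rows: Map<string, StoredRow>,
  change: RowChange,
  mode: RowUpdateMode,
): void {
  switch (change.type) {
    case 'insert':
      rows.set(change.key, change.value)
      break
    case 'update': {
      const existing = rows.get(change.key) ?? {}
      // 'partial' keeps fields the message did not mention; 'full' drops them.
      rows.set(
        change.key,
        mode === 'partial' ? { ...existing, ...change.value } : change.value,
      )
      break
    }
    case 'delete':
      rows.delete(change.key)
      break
  }
}

// The same two messages applied under each mode.
const seed: RowChange = {
  type: 'insert',
  key: '1',
  value: { id: '1', name: 'A', done: false },
}
const patch: RowChange = { type: 'update', key: '1', value: { done: true } }

const partialRows = new Map<string, StoredRow>()
applyChange(partialRows, seed, 'partial')
applyChange(partialRows, patch, 'partial')

const fullRows = new Map<string, StoredRow>()
applyChange(fullRows, seed, 'full')
applyChange(fullRows, patch, 'full')
```

Under this model the partial store still holds `name` after the update, while the full store holds only `done`. That is why a backend that sends field-level patches should not be paired with 'full' mode.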
On-demand sync with loadSubset
import { parseLoadSubsetOptions } from "@tanstack/db"
sync: {
sync: ({ begin, write, commit, markReady }) => {
// Initial sync...
markReady()
return () => {}
},
loadSubset: async (options) => {
const { filters, sorts, limit, offset } = parseLoadSubsetOptions(options)
// filters: [{ field: ['category'], operator: 'eq', value: 'electronics' }]
// sorts: [{ field: ['price'], direction: 'asc', nulls: 'last' }]
const params = new URLSearchParams()
for (const f of filters) {
params.set(f.field.join("."), `${f.operator}:${f.value}`)
}
const res = await fetch(`/api/items?${params}`)
return res.json()
},
}
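The loop above forwards only the filters. A fuller translation might also carry the sorts and the limit, as in the sketch below. The shapes of `ParsedFilter` and `ParsedSort` are copied from the comments above; `buildQueryString` and the `sort` and `limit` parameter names are assumptions about a hypothetical REST backend, not part of @tanstack/db.

```typescript
// Shapes mirror the comments in the loadSubset example; declared locally so
// the sketch runs without the library.
interface ParsedFilter {
  field: Array<string>
  operator: string
  value: unknown
}

interface ParsedSort {
  field: Array<string>
  direction: 'asc' | 'desc'
  nulls?: 'first' | 'last'
}

interface ParsedSubset {
  filters: Array<ParsedFilter>
  sorts: Array<ParsedSort>
  limit?: number
}

// Hypothetical convention: <field>=<operator>:<value>, sort=<field>:<dir>,...
function buildQueryString(parsed: ParsedSubset): string {
  const pairs: Array<[string, string]> = []

  for (const f of parsed.filters) {
    pairs.push([f.field.join('.'), `${f.operator}:${String(f.value)}`])
  }

  if (parsed.sorts.length > 0) {
    const sortValue = parsed.sorts
      .map((s) => `${s.field.join('.')}:${s.direction}`)
      .join(',')
    pairs.push(['sort', sortValue])
  }

  if (parsed.limit !== undefined) {
    pairs.push(['limit', String(parsed.limit)])
  }

  // encodeURIComponent escapes ':' and ',' so values survive transport intact.
  return pairs
    .map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`)
    .join('&')
}
```

Keeping the translation in one pure function makes it easy to unit-test separately from the network call.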
Managing optimistic state duration
Mutation handlers must not resolve until server changes have synced back to the collection. Five strategies:
- Refetch (simplest): await collection.utils.refetch()
- Transaction ID: return { txid } and track via sync stream
- ID-based tracking: await specific record ID appearing in sync stream
- Version/timestamp: wait until sync stream catches up to mutation time
- Provider method: await backend.waitForPendingWrites()
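As an illustration of the transaction ID strategy, the sketch below shows one way a mutation handler could wait until the sync stream reports a given txid. `TxidTracker` and its methods are hypothetical names introduced here; it assumes the backend returns a numeric txid from each write and attaches the same txid to the change it later streams back.

```typescript
// Hypothetical helper; not part of @tanstack/db.
class TxidTracker {
  private seen = new Set<number>()
  private waiters = new Map<number, Array<() => void>>()

  // Call from the sync stream whenever a committed change carries a txid.
  markSeen(txid: number): void {
    this.seen.add(txid)
    const pending = this.waiters.get(txid)
    if (pending) {
      this.waiters.delete(txid)
      for (const resolve of pending) resolve()
    }
  }

  // Await this in a mutation handler once the server has returned the txid.
  awaitTxid(txid: number): Promise<void> {
    // The change may already have arrived before the handler asks for it.
    if (this.seen.has(txid)) return Promise.resolve()
    return new Promise<void>((resolve) => {
      const pending = this.waiters.get(txid) ?? []
      pending.push(resolve)
      this.waiters.set(txid, pending)
    })
  }

  // Number of handlers still waiting; useful for tests and debugging.
  pendingCount(): number {
    let count = 0
    this.waiters.forEach((list) => {
      count += list.length
    })
    return count
  }
}
```

A production version would probably add a timeout so a handler cannot wait forever on a txid that never arrives, and would prune `seen` to bound memory.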
Expression parsing for predicate push-down
import {
parseWhereExpression,
parseOrderByExpression,
extractSimpleComparisons,
} from '@tanstack/db'
// In loadSubset or queryFn:
const comparisons = extractSimpleComparisons(options.where)
// Returns: [{ field: ['name'], operator: 'eq', value: 'John' }]
const orderBy = parseOrderByExpression(options.orderBy)
// Returns: [{ field: ['created_at'], direction: 'desc', nulls: 'last' }]
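Once the comparisons and ordering are available as plain objects, pushing them down to a SQL backend is mostly string assembly. The sketch below is one possible translation, under these assumptions: the comparisons are combined with AND, the operator names other than 'eq' ('gt', 'gte', 'lt', 'lte') are guesses, nested field paths are treated as qualified SQL names, and `toSelectQuery` and `quoteIdent` are hypothetical helpers.

```typescript
// Shapes mirror the "Returns:" comments above; declared locally so the
// sketch runs without the library.
interface SimpleComparison {
  field: Array<string>
  operator: string
  value: unknown
}

interface ParsedOrderBy {
  field: Array<string>
  direction: 'asc' | 'desc'
  nulls?: 'first' | 'last'
}

// Assumed operator names; only 'eq' is confirmed by the text above.
const SQL_OPERATORS: Record<string, string> = {
  eq: '=',
  gt: '>',
  gte: '>=',
  lt: '<',
  lte: '<=',
}

// Quote each path segment so field names cannot inject SQL.
function quoteIdent(path: Array<string>): string {
  return path.map((part) => `"${part.replace(/"/g, '""')}"`).join('.')
}

function toSelectQuery(
  table: string,
  comparisons: Array<SimpleComparison>,
  orderBy: Array<ParsedOrderBy>,
): { text: string; values: Array<unknown> } {
  const values: Array<unknown> = []

  const conditions = comparisons.map((c) => {
    const op = SQL_OPERATORS[c.operator]
    if (op === undefined) {
      throw new Error(`Unsupported operator: ${c.operator}`)
    }
    values.push(c.value)
    // Values travel as positional parameters ($1, $2, ...), never inline.
    return `${quoteIdent(c.field)} ${op} $${values.length}`
  })

  let text = `SELECT * FROM ${quoteIdent([table])}`
  if (conditions.length > 0) {
    text += ` WHERE ${conditions.join(' AND ')}`
  }
  if (orderBy.length > 0) {
    const clauses = orderBy.map((o) => {
      const nulls = o.nulls ? ` NULLS ${o.nulls.toUpperCase()}` : ''
      return `${quoteIdent(o.field)} ${o.direction.toUpperCase()}${nulls}`
    })
    text += ` ORDER BY ${clauses.join(', ')}`
  }
  return { text, values }
}
```

Throwing on an unknown operator is deliberate: silently dropping a filter would return more rows than the query asked for.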
Common Mistakes
CRITICAL Not calling markReady() in sync implementation
Wrong:
sync: ({ begin, write, commit }) => {
fetchData().then((items) => {
begin()
items.forEach((item) => write({ type: 'insert', value: item }))
commit()
// forgot markReady()!
})
}
Correct:
sync: ({ begin, write, commit, markReady }) => {
fetchData().then((items) => {
begin()
items.forEach((item) => write({ type: 'insert', value: item }))
commit()
markReady()
})
}
markReady() transitions the collection to "ready" status. Without it, live queries never resolve and useLiveSuspenseQuery hangs forever in Suspense.
Source: docs/guides/collection-options-creator.md
HIGH Race condition: subscribing after initial fetch
Wrong:
sync: ({ begin, write, commit, markReady }) => {
fetchAll().then((data) => {
writeAll(data)
subscribe(onChange) // changes during fetch are LOST
markReady()
})
}
Correct:
sync: ({ begin, write, commit, markReady }) => {
const buffer = []
let ready = false // flipped once the initial fetch has been written
subscribe((event) => {
if (!ready) {
buffer.push(event)
return
}
begin()
write(event)
commit()
})
fetchAll().then((data) => {
writeAll(data)
ready = true
buffer.forEach((e) => {
begin()
write(e)
commit()
})
markReady()
})
}
Subscribe to real-time events before fetching initial data. Buffer events during the fetch, then replay them after the initial sync completes.
Source: docs/guides/collection-options-creator.md
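The buffer-then-replay logic can be pulled into a small helper so each adapter does not reimplement it. The following is a minimal sketch with locally declared types; `createBufferedWriter`, `BufferedEvent`, and `BufferPrimitives` are hypothetical names, and the primitives are assumed to behave like the begin, write, commit, and markReady functions passed to sync.

```typescript
// Local stand-ins for the sync primitives; not imported from @tanstack/db.
interface BufferedEvent<T> {
  type: 'insert' | 'update' | 'delete'
  value: T
}

interface BufferPrimitives<T> {
  begin(): void
  write(message: BufferedEvent<T>): void
  commit(): void
  markReady(): void
}

function createBufferedWriter<T>(p: BufferPrimitives<T>) {
  let initialSyncComplete = false
  const buffer: Array<BufferedEvent<T>> = []

  // Each live event gets its own transaction.
  const writeOne = (event: BufferedEvent<T>): void => {
    p.begin()
    p.write(event)
    p.commit()
  }

  return {
    // Pass this to the subscription BEFORE starting the initial fetch.
    onEvent(event: BufferedEvent<T>): void {
      if (!initialSyncComplete) {
        buffer.push(event)
        return
      }
      writeOne(event)
    },

    // Call once with the fetched snapshot.
    completeInitialSync(items: Array<T>): void {
      p.begin()
      for (const item of items) {
        p.write({ type: 'insert', value: item })
      }
      p.commit()

      initialSyncComplete = true
      // Replay in arrival order, then release the buffer.
      for (const event of buffer) writeOne(event)
      buffer.length = 0

      p.markReady()
    },
  }
}
```

A buffered event may describe a change the snapshot already contains. Whether that needs deduplication depends on the backend, so this sketch replays everything as-is.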
HIGH write() called without begin()
Wrong:
onMessage((event) => {
write({ type: event.type, key: event.id, value: event.data })
commit()
})
Correct:
onMessage((event) => {
begin()
write({ type: event.type, key: event.id, value: event.data })
commit()
})
Sync data must be written within a transaction (begin → write → commit). Calling write() without begin() throws NoPendingSyncTransactionWriteError.
Source: packages/db/src/collection/sync.ts:110
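One way to make the begin → write → commit sequence hard to get wrong is to route every write through a single helper. The sketch below assumes nothing about the library beyond the three primitives; `writeBatch`, `BatchChange`, and `createStrictPrimitives` are hypothetical, and the strict stand-in throws a plain Error where the real collection would throw NoPendingSyncTransactionWriteError.

```typescript
// Local stand-ins for the sync primitives; not imported from @tanstack/db.
interface BatchChange<T> {
  type: 'insert' | 'update' | 'delete'
  value: T
}

interface TxPrimitives<T> {
  begin(): void
  write(change: BatchChange<T>): void
  commit(): void
}

// Writes all changes inside exactly one transaction.
function writeBatch<T>(p: TxPrimitives<T>, changes: Array<BatchChange<T>>): void {
  if (changes.length === 0) return // nothing to commit
  p.begin()
  for (const change of changes) p.write(change)
  p.commit()
}

// Test double that enforces the same rule as the real collection:
// write() and commit() are only legal between begin() and commit().
function createStrictPrimitives<T>(): TxPrimitives<T> & {
  committed: Array<BatchChange<T>>
} {
  let pending: Array<BatchChange<T>> | null = null
  const committed: Array<BatchChange<T>> = []
  return {
    committed,
    begin() {
      pending = []
    },
    write(change) {
      if (pending === null) throw new Error('write() called without begin()')
      pending.push(change)
    },
    commit() {
      if (pending === null) throw new Error('commit() called without begin()')
      committed.push(...pending)
      pending = null
    },
  }
}
```

Running adapter code against a strict stand-in like this in unit tests surfaces a missing begin() before the adapter ever touches a real collection.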
Tension: Simplicity vs. Correctness in Sync
Getting-started simplicity (localOnly, eager mode) conflicts with production correctness (on-demand sync, race condition prevention, proper markReady handling). Agents optimizing for quick setup tend to skip buffering, markReady, and cleanup functions.
See also: db-core/collection-setup/SKILL.md for built-in adapter patterns to model a custom adapter on.