Install Drizzle ORM and the PostgreSQL driver:
npm install drizzle-orm pg
npm install -D drizzle-kit @types/pg
At a glance, the pipeline looks like this:

Schema

Define your tables with Drizzle ORM. Every table needs a primary key, and every table written to in onData must appear in tables.
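
As an illustration of the primary-key requirement, a minimal table definition might look like the sketch below. The table name, column names, and the `transfersTable` identifier are hypothetical and chosen only for this example; `pgTable`, `text`, `integer`, and `numeric` come from Drizzle's `drizzle-orm/pg-core` module.

```typescript
import { pgTable, text, integer, numeric } from 'drizzle-orm/pg-core'

// Hypothetical table for illustration; adapt names and columns to your data.
export const transfersTable = pgTable('transfers', {
  // Single-column primary key, e.g. a string built from block number and log index.
  id: text('id').primaryKey(),
  blockNumber: integer('block_number').notNull(),
  from: text('from').notNull(),
  to: text('to').notNull(),
  // numeric avoids overflow for large token amounts.
  amount: numeric('amount').notNull(),
})
```

A table defined this way would then be passed in tables and written to from onData.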

onData and batchForInsert

onData runs inside a serializable transaction. Use batchForInsert to split data arrays into chunks that fit within PostgreSQL’s 32,767-parameter limit. By default, chunk size is calculated automatically from the number of columns. Pass an explicit second argument to batchForInsert to cap chunk size:
for (const batch of batchForInsert(data.transfers, 100)) { ... }
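
To show why the column count determines chunk size, here is a rough standalone sketch of the arithmetic. It is not the library's source: `maxRowsPerInsert` and `chunkRows` are hypothetical helpers, and the choice to treat an explicit size as an upper bound that never exceeds the automatic one is an assumption made for this example. Each inserted row binds one parameter per column, so the number of rows per statement is the parameter limit divided by the column count, rounded down.

```typescript
// Hypothetical helpers for illustration only; not the batchForInsert implementation.
const MAX_PARAMS = 32767 // the parameter limit quoted in the text above

// One bound parameter per column per row, so rows per statement = floor(limit / columns).
function maxRowsPerInsert(columnCount: number, maxParams: number = MAX_PARAMS): number {
  if (!Number.isInteger(columnCount) || columnCount < 1) {
    throw new RangeError('columnCount must be a positive integer')
  }
  return Math.max(1, Math.floor(maxParams / columnCount))
}

// Splits rows into consecutive chunks. The column count is taken from the first row.
// Assumption: an explicit size only lowers the automatic size, never raises it.
function chunkRows<T extends object>(rows: T[], explicitSize?: number): T[][] {
  const chunks: T[][] = []
  if (rows.length === 0) return chunks
  const auto = maxRowsPerInsert(Object.keys(rows[0]).length)
  const size =
    explicitSize === undefined ? auto : Math.min(auto, Math.max(1, Math.floor(explicitSize)))
  for (let i = 0; i < rows.length; i += size) {
    chunks.push(rows.slice(i, i + size))
  }
  return chunks
}
```

Under these assumptions a three-column table allows 10,922 rows per statement, and 250 rows with an explicit size of 100 are split into chunks of 100, 100, and 50.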

tables

Every table written to in onData must be listed in tables. At startup, the target installs PostgreSQL trigger functions on these tables to track row-level changes for automatic fork handling. Inserting into an unlisted table throws at runtime.

Schema migrations

Use Drizzle Kit to generate and apply migrations:
npx drizzle-kit generate
npx drizzle-kit migrate
Alternatively, run migrations automatically on startup via onStart:
import { migrate } from 'drizzle-orm/node-postgres/migrator'

drizzleTarget({
  db,
  tables: [...],
  onStart: async ({ db }) => {
    await migrate(db, { migrationsFolder: './drizzle' })
  },
  onData: async ({ tx, data }) => { ... },
})

Rollback handling

Fork handling is fully automatic. Each batch runs inside a transaction that snapshots row-level changes. When the stream detects a fork, the target replays those snapshots in reverse to restore the pre-fork state. Use onBeforeRollback and onAfterRollback to run custom logic around a rollback. Both callbacks receive the Drizzle transaction and the cursor (BlockCursor) to which state was rolled back:
drizzleTarget({
  db,
  tables: [...],
  onBeforeRollback: async ({ tx, cursor }) => { /* e.g. log or acquire an external lock */ },
  onAfterRollback:  async ({ tx, cursor }) => { /* e.g. invalidate a cache */ },
  onData: async ({ tx, data }) => { ... },
})
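
The reverse-replay idea described in this section can be pictured with a small in-memory model. This is a conceptual sketch only: the real target tracks changes with PostgreSQL triggers, whereas `TrackedStore` and its methods are hypothetical names invented for this example, and the sketch assumes writes arrive in ascending block order.

```typescript
// Conceptual model only; not how the target is implemented internally.
type Change<V> = { block: number; key: string; before: V | undefined }

class TrackedStore<V> {
  private rows = new Map<string, V>()
  private log: Change<V>[] = []

  // Record the previous value before overwriting, so the write can be undone later.
  set(block: number, key: string, value: V): void {
    this.log.push({ block, key, before: this.rows.get(key) })
    this.rows.set(key, value)
  }

  get(key: string): V | undefined {
    return this.rows.get(key)
  }

  // Undo every change recorded after `block`, newest first.
  rollbackTo(block: number): void {
    while (this.log.length > 0 && this.log[this.log.length - 1].block > block) {
      const change = this.log.pop()!
      if (change.before === undefined) {
        this.rows.delete(change.key) // the row did not exist before this change
      } else {
        this.rows.set(change.key, change.before)
      }
    }
  }
}
```

Replaying newest-first matters because a key may be written several times after the fork point; undoing in reverse order leaves the value it had at the target block rather than an intermediate one.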

Complete example

See the drizzleTarget reference for the full API.