onData runs inside a serializable transaction. Use batchForInsert to split data arrays into chunks that fit within PostgreSQL’s 32,767-parameter limit — chunk size is calculated automatically from the number of columns:
```ts
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'

onData: async ({ tx, data }) => {
  for (const batch of batchForInsert(data.transfers)) {
    await tx.insert(transfersTable).values(
      batch.map((d) => ({
        blockNumber: d.block.number,
        logIndex: d.rawEvent.logIndex,
        from: d.event.from,
        to: d.event.to,
        value: d.event.value,
      })),
    )
  }
}
```
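The automatic sizing follows from the per-row parameter cost: each row consumes one bind parameter per column. A minimal sketch of that arithmetic, assuming plain floor division (maxRowsPerInsert is a hypothetical name, not a library export):

```ts
// One bind parameter per column per row, capped by PostgreSQL's
// 32,767-parameter limit (hypothetical illustration of the sizing).
const PG_MAX_PARAMS = 32767

function maxRowsPerInsert(columnCount: number): number {
  return Math.floor(PG_MAX_PARAMS / columnCount)
}

// The 5-column transfers row above yields batches of up to 6,553 rows.
maxRowsPerInsert(5) // 6553
```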
Pass an explicit second argument to batchForInsert to cap chunk size:
```ts
for (const batch of batchForInsert(data.transfers, 100)) { ... }
```
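A lower cap can be useful when rows are wide or when you want to keep individual INSERT statements small; since the second argument is a cap, the automatically computed parameter-limit bound still applies.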
Every table written to in onData must be listed in tables. At startup, the target installs PostgreSQL trigger functions on these tables to track row-level changes for automatic fork handling. Inserting into an unlisted table throws at runtime.
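For instance, a target whose onData writes only to transfersTable registers exactly that table. A minimal sketch, where toTransferRow stands in for the row mapping from the first example (a hypothetical helper, not part of the library):

```ts
const target = drizzleTarget({
  db,
  // every table written to in onData must appear here; the target
  // installs its change-tracking trigger functions on these tables
  tables: [transfersTable],
  onData: async ({ tx, data }) => {
    // writes to transfersTable are tracked for fork handling;
    // inserting into any table not listed above throws at runtime
    for (const batch of batchForInsert(data.transfers)) {
      await tx.insert(transfersTable).values(batch.map(toTransferRow))
    }
  },
})
```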
Fork handling is fully automatic. Each batch runs inside a transaction that snapshots row-level changes. When the stream detects a fork, the target replays those snapshots in reverse to restore the pre-fork state.

Use onBeforeRollback and onAfterRollback to run custom logic around a rollback. Both callbacks receive the Drizzle transaction and the cursor (BlockCursor) to which state was rolled back:
```ts
drizzleTarget({
  db,
  tables: [...],
  onBeforeRollback: async ({ tx, cursor }) => {
    /* e.g. log or acquire an external lock */
  },
  onAfterRollback: async ({ tx, cursor }) => {
    /* e.g. invalidate a cache */
  },
  onData: async ({ tx, data }) => { ... },
})
```
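For example, onAfterRollback could evict anything cached from blocks above the rollback point. A sketch, assuming BlockCursor exposes the block height as cursor.number and that cache.invalidateAbove is your own helper:

```ts
onAfterRollback: async ({ tx, cursor }) => {
  // database state now reflects the chain at `cursor`; anything derived
  // from later blocks is stale (cache.invalidateAbove is hypothetical)
  await cache.invalidateAbove(cursor.number)
},
```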