
Documentation Index

Fetch the complete documentation index at: https://beta.docs.sqd.dev/llms.txt

Use this file to discover all available pages before exploring further.

Installation

Install Drizzle ORM and the PostgreSQL driver:
npm install drizzle-orm pg
npm install -D drizzle-kit @types/pg
At a glance, the pipeline looks like this:
await evmPortalSource({ ... }).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://...'),
    tables: [transfersTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.transfers)) {
        await tx.insert(transfersTable).values(batch.map(...))
      }
    },
  }),
)

Schema

Define your tables with Drizzle ORM. Every table needs a primary key, and every table written to in onData must appear in tables.
import { integer, numeric, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const transfersTable = pgTable('transfers', {
  blockNumber: integer().notNull(),
  logIndex:    integer().notNull(),
  from:        varchar({ length: 42 }).notNull(),
  to:          varchar({ length: 42 }).notNull(),
  value:       numeric({ mode: 'bigint' }).notNull(),
}, (t) => [primaryKey({ columns: [t.blockNumber, t.logIndex] })])

onData and batchForInsert

onData runs inside a serializable transaction. Use batchForInsert to split data arrays into chunks that fit within PostgreSQL’s 32,767-parameter limit — chunk size is calculated automatically from the number of columns:
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'

onData: async ({ tx, data }) => {
  for (const batch of batchForInsert(data.transfers)) {
    await tx.insert(transfersTable).values(
      batch.map((d) => ({
        blockNumber: d.block.number,
        logIndex:    d.rawEvent.logIndex,
        from:        d.event.from,
        to:          d.event.to,
        value:       d.event.value,
      })),
    )
  }
}
Pass an explicit second argument to batchForInsert to cap chunk size:
for (const batch of batchForInsert(data.transfers, 100)) { ... }
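The arithmetic behind the automatic chunk size is simple: each row consumes one bind parameter per column, so the largest safe chunk is floor(32767 / columns). The following self-contained sketch illustrates that calculation only; batchForInsert's real implementation may differ, and chunkForInsert is a hypothetical name:

```typescript
// PostgreSQL's wire protocol encodes the bind-parameter count as a 16-bit
// integer, so a single statement can carry at most 32,767 parameters.
const PG_MAX_PARAMS = 32_767

// Split rows into chunks that stay under the parameter limit for a table
// with `columns` columns, optionally capped by the caller.
function* chunkForInsert<T>(rows: T[], columns: number, cap = Infinity): Generator<T[]> {
  const size = Math.min(Math.floor(PG_MAX_PARAMS / columns), cap)
  for (let i = 0; i < rows.length; i += size) {
    yield rows.slice(i, i + size)
  }
}
```

For the five-column transfers table above, this works out to chunks of at most 6,553 rows (6,553 × 5 = 32,765 parameters).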

tables

Every table written to in onData must be listed in tables. At startup, the target installs PostgreSQL trigger functions on these tables to track row-level changes for automatic fork handling. Inserting into an unlisted table throws at runtime.
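For instance, if onData also maintained a second table, both would need to be registered. A sketch, assuming a hypothetical balancesTable defined alongside transfersTable:

```typescript
drizzleTarget({
  db,
  // Every table written to in onData must be listed here;
  // inserting into an unlisted table throws at runtime.
  tables: [transfersTable, balancesTable],
  onData: async ({ tx, data }) => {
    // ... writes to both transfersTable and balancesTable
  },
})
```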

Schema migrations

Use Drizzle Kit to generate and apply migrations:
npx drizzle-kit generate
npx drizzle-kit migrate
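Drizzle Kit reads its settings from a drizzle.config.ts file at the project root. A minimal sketch, assuming the table definitions live in ./schema.ts and the connection string is supplied via a DATABASE_URL environment variable; adjust paths to your project:

```typescript
import { defineConfig } from 'drizzle-kit'

export default defineConfig({
  dialect: 'postgresql',
  schema: './schema.ts',  // file exporting transfersTable
  out: './drizzle',       // migrations folder, matching migrationsFolder below
  dbCredentials: { url: process.env.DATABASE_URL! },
})
```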
Alternatively, run migrations automatically on startup via onStart:
import { migrate } from 'drizzle-orm/node-postgres/migrator'

drizzleTarget({
  db,
  tables: [...],
  onStart: async ({ db }) => {
    await migrate(db, { migrationsFolder: './drizzle' })
  },
  onData: async ({ tx, data }) => { ... },
})

Rollback handling

Fork handling is fully automatic. Each batch runs inside a transaction that snapshots row-level changes. When the stream detects a fork, the target replays those snapshots in reverse to restore the pre-fork state. Use onBeforeRollback and onAfterRollback to run custom logic around a rollback. Both callbacks receive the Drizzle transaction and the cursor (BlockCursor) to which state was rolled back:
drizzleTarget({
  db,
  tables: [...],
  onBeforeRollback: async ({ tx, cursor }) => { /* e.g. log or acquire an external lock */ },
  onAfterRollback:  async ({ tx, cursor }) => { /* e.g. invalidate a cache */ },
  onData: async ({ tx, data }) => { ... },
})

Complete example

import { commonAbis, evmDecoder, evmPortalSource } from '@subsquid/pipes/evm'
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'
import { drizzle } from 'drizzle-orm/node-postgres'
import { integer, numeric, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const transfersTable = pgTable('transfers', {
  blockNumber: integer().notNull(),
  logIndex:    integer().notNull(),
  from:        varchar({ length: 42 }).notNull(),
  to:          varchar({ length: 42 }).notNull(),
  value:       numeric({ mode: 'bigint' }).notNull(),
}, (t) => [primaryKey({ columns: [t.blockNumber, t.logIndex] })])

await evmPortalSource({
  portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  outputs: evmDecoder({
    range: { from: '0' },
    events: { transfers: commonAbis.erc20.events.Transfer },
  }),
}).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://postgres:postgres@localhost:5432/postgres'),
    tables: [transfersTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.transfers)) {
        await tx.insert(transfersTable).values(
          batch.map((d) => ({
            blockNumber: d.block.number,
            logIndex:    d.rawEvent.logIndex,
            from:        d.event.from,
            to:          d.event.to,
            value:       d.event.value,
          })),
        )
      }
    },
  }),
)
See the drizzleTarget reference for the full API.