Install Drizzle ORM and the PostgreSQL driver:
npm install drizzle-orm pg
npm install -D drizzle-kit @types/pg
At a glance, the pipeline looks like this:
await solanaPortalSource({ ... }).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://...'),
    tables: [swapsTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.swap)) {
        await tx.insert(swapsTable).values(batch.map(...))
      }
    },
  }),
)

Schema

Define your tables with Drizzle ORM. Every table needs a primary key, and every table written to in onData must appear in tables.
import { bigint, integer, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const swapsTable = pgTable('swaps', {
  slot:             bigint({ mode: 'bigint' }).notNull(),
  instructionIndex: integer().notNull(),
  programId:        varchar().notNull(),
}, (t) => [primaryKey({ columns: [t.slot, t.instructionIndex] })])

onData and batchForInsert

onData runs inside a serializable transaction. Use batchForInsert to split data arrays into chunks that fit within PostgreSQL’s 32,767-parameter limit — chunk size is calculated automatically from the number of columns:
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'

onData: async ({ tx, data }) => {
  for (const batch of batchForInsert(data.swap)) {
    await tx.insert(swapsTable).values(
      batch.map((d) => ({
        slot:             BigInt(d.block.slot),
        instructionIndex: d.instruction.instructionIndex,
        programId:        d.programId,
      })),
    )
  }
}
Pass an explicit second argument to batchForInsert to cap chunk size:
for (const batch of batchForInsert(data.transfers, 100)) { ... }
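The parameter math behind the automatic chunk size can be sketched as follows. This is an illustration of the idea, not the actual batchForInsert source, and the helper name is invented for the example:

```typescript
// Illustrative sketch of the chunk-size math (not the actual batchForInsert
// implementation). PostgreSQL's extended query protocol allows at most
// 32,767 bind parameters per statement.
const PG_MAX_PARAMS = 32767

function maxRowsPerInsert(columnCount: number, cap = Infinity): number {
  // Each row consumes one bind parameter per column, so the row budget
  // is the parameter budget divided by the column count, optionally
  // capped by an explicit limit (like batchForInsert's second argument).
  return Math.min(Math.floor(PG_MAX_PARAMS / columnCount), cap)
}

// swapsTable above has 3 columns:
maxRowsPerInsert(3)      // → 10922 rows per INSERT
maxRowsPerInsert(3, 100) // → capped at 100
```

With a 3-column table this works out to 10,922 rows per INSERT statement; an explicit cap simply lowers that ceiling.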

tables

Every table written to in onData must be listed in tables. At startup, the target installs PostgreSQL trigger functions on these tables to track row-level changes for automatic fork handling. Inserting into an unlisted table throws at runtime.

Schema migrations

Use Drizzle Kit to generate and apply migrations:
npx drizzle-kit generate
npx drizzle-kit migrate
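Drizzle Kit reads its settings from a drizzle.config.ts at the project root. A minimal config might look like the sketch below; the schema path, output folder, and DATABASE_URL environment variable are assumptions about your project layout, so adjust them to match yours:

```typescript
// drizzle.config.ts — minimal sketch; paths and env var are project-specific
import { defineConfig } from 'drizzle-kit'

export default defineConfig({
  dialect: 'postgresql',
  schema: './src/schema.ts',          // where your pgTable definitions live
  out: './drizzle',                   // generated migrations folder
  dbCredentials: {
    url: process.env.DATABASE_URL!,   // e.g. postgresql://user:pass@host:5432/db
  },
})
```

The out folder here is the same one passed as migrationsFolder when running migrations from onStart.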
Alternatively, run migrations automatically on startup via onStart:
import { migrate } from 'drizzle-orm/node-postgres/migrator'

drizzleTarget({
  db,
  tables: [...],
  onStart: async ({ db }) => {
    await migrate(db, { migrationsFolder: './drizzle' })
  },
  onData: async ({ tx, data }) => { ... },
})

Rollback handling

Fork handling is fully automatic. Each batch runs inside a transaction that snapshots row-level changes. When the stream detects a fork, the target replays those snapshots in reverse to restore the pre-fork state. Use onBeforeRollback and onAfterRollback to run custom logic around a rollback. Both callbacks receive the Drizzle transaction and the cursor (BlockCursor) to which state was rolled back:
drizzleTarget({
  db,
  tables: [...],
  onBeforeRollback: async ({ tx, cursor }) => { /* e.g. log or acquire an external lock */ },
  onAfterRollback:  async ({ tx, cursor }) => { /* e.g. invalidate a cache */ },
  onData: async ({ tx, data }) => { ... },
})

Complete example

import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { batchForInsert, drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'
import { drizzle } from 'drizzle-orm/node-postgres'
import { bigint, integer, pgTable, primaryKey, varchar } from 'drizzle-orm/pg-core'

const swapsTable = pgTable('swaps', {
  slot:             bigint({ mode: 'bigint' }).notNull(),
  instructionIndex: integer().notNull(),
  programId:        varchar().notNull(),
}, (t) => [primaryKey({ columns: [t.slot, t.instructionIndex] })])

await solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: solanaInstructionDecoder({
    range: { from: '340000000' },
    programId: '...',
    instructions: { swap: swapInstruction },
  }),
}).pipeTo(
  drizzleTarget({
    db: drizzle('postgresql://postgres:postgres@localhost:5432/postgres'),
    tables: [swapsTable],
    onData: async ({ tx, data }) => {
      for (const batch of batchForInsert(data.swap)) {
        await tx.insert(swapsTable).values(
          batch.map((d) => ({
            slot:             BigInt(d.block.slot),
            instructionIndex: d.instruction.instructionIndex,
            programId:        d.programId,
          })),
        )
      }
    },
  }),
)
See the drizzleTarget reference for the full API.