Integrate Drizzle ORM with Pipes SDK to store blockchain data in PostgreSQL with automatic cursor management and rollback support.

Overview

The Drizzle target provides:
  • Automatic cursor management: Tracks processing progress automatically
  • Rollback support: Handles blockchain forks with automatic data cleanup
  • Type-safe queries: Full TypeScript support with Drizzle ORM
  • Transaction management: All operations wrapped in database transactions
  • Schema migrations: Compatible with Drizzle Kit for schema management

Prerequisites

  • PostgreSQL database running (version 12+ recommended)
  • Node.js 22+ installed
  • Basic understanding of Drizzle ORM

Installation

npm install @subsquid/pipes drizzle-orm pg
npm install -D drizzle-kit @types/pg
If you plan to add a GraphQL API, also install: npm install @apollo/server drizzle-graphql

Basic Example

Index ERC20 Transfer events and store them in PostgreSQL.
Step 1: Define your schema

Create a Drizzle schema for your data:
schema.ts
import { pgTable, serial, integer, varchar, timestamp } from "drizzle-orm/pg-core";

export const transfersTable = pgTable("transfers", {
  id: serial("id").primaryKey(),
  blockNumber: integer("block_number").notNull(),
  blockHash: varchar("block_hash", { length: 66 }).notNull(),
  from: varchar("from_address", { length: 42 }).notNull(),
  to: varchar("to_address", { length: 42 }).notNull(),
  // Store BigInt values as strings to avoid precision loss
  value: varchar("value", { length: 78 }).notNull(),
  timestamp: timestamp("timestamp").notNull(),
});

export type NewTransfer = typeof transfersTable.$inferInsert;
Use varchar for storing large numbers (like token amounts) to avoid precision loss. Convert BigInt values with .toString() before inserting, as the example above does.
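To see why varchar is the safe choice: ERC20 amounts are uint256 (up to 78 decimal digits, hence the length of 78 in the schema), while JavaScript's Number type cannot represent integers above Number.MAX_SAFE_INTEGER exactly. A quick standalone check:

```typescript
// ERC20 amounts are uint256 and routinely exceed Number.MAX_SAFE_INTEGER (2^53 - 1).
const amount = 123456789012345678901234567890n;

// Converting through Number silently rounds the value.
const lossy = Number(amount);
console.log(BigInt(lossy) === amount); // false: precision was lost

// Converting to a string preserves every digit.
const exact = amount.toString();
console.log(BigInt(exact) === amount); // true
```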
Step 2: Create the indexer

Set up the pipeline with Drizzle target:
index.ts
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { drizzleTarget, chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";
import { drizzle } from "drizzle-orm/node-postgres";
import pg from "pg";
import { transfersTable, type NewTransfer } from "./schema";

const { Pool } = pg;

async function main() {
  // Create PostgreSQL connection pool
  const pool = new Pool({
    host: "localhost",
    port: 5432,
    database: "postgres",
    user: "postgres",
    password: "postgres",
  });

  // Create Drizzle instance
  const db = drizzle(pool);

  await evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  })
    .pipe(
      evmDecoder({
        range: { from: 20000000 },
        contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"], // USDC
        events: {
          transfer: commonAbis.erc20.events.Transfer,
        },
      })
    )
    .pipeTo(
      drizzleTarget({
        db,
        tables: [transfersTable],
        onData: async ({ tx, data, ctx }) => {
          ctx.logger.info(`Processing ${data.transfer.length} transfers`);

          // Map and validate data
          const transferData = data.transfer.map((t): NewTransfer => {
            // Ensure valid timestamp (fallback to current time if invalid)
            const timestamp = t.block.timestamp > 0
              ? new Date(t.block.timestamp * 1000)
              : new Date();

            return {
              blockNumber: t.block.number,
              blockHash: t.block.hash,
              from: t.event.from,
              to: t.event.to,
              value: t.event.value.toString(),
              timestamp,
            };
          });

          // Process in chunks for better performance (100 records per batch)
          for (const values of chunk(transferData, 100)) {
            await tx.insert(transfersTable).values(values);
          }
        },
      })
    );
}

void main();
Always validate timestamps before inserting into the database. Blockchain timestamps can occasionally be 0 or invalid, which will cause database errors.
Step 3: Run the indexer

Start indexing blockchain data:
npx tsx index.ts
The indexer will:
  1. Connect to PostgreSQL
  2. Start processing Transfer events
  3. Store decoded events in the database
  4. Automatically handle cursors and rollbacks
Make sure to set up your database schema first using one of the migration approaches described in the Schema Migrations section.

Configuration Options

drizzleTarget

drizzleTarget({
  db: drizzle(connectionString),
  tables: [table1, table2],
  onStart: async ({ db }) => {
    // Optional: Run migrations or other initialization tasks
  },
  onData: async ({ tx, data, ctx }) => {
    // Process and insert data
  },
})
Parameters:
  • db: Drizzle database instance (required)
  • tables: Array of tables for snapshot tracking (required)
  • onStart: Optional initialization callback for migrations or setup tasks
  • onData: Data processing callback (required)
All tables modified in onData MUST be listed in the tables array. This is required for automatic rollback support.

Chunking Data

Use the chunk helper to process large batches efficiently:
import { chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";

// Default chunk size (1000 records)
for (const values of chunk(data.transfer)) {
  await tx.insert(transfersTable).values(values);
}

// Custom chunk size (100 records per batch)
for (const values of chunk(data.transfer, 100)) {
  await tx.insert(transfersTable).values(values);
}
Recommended chunk sizes:
  • 100-500 records: Good for most use cases and complex schemas
  • 1000+ records: Better for simple schemas with few columns
  • Test different sizes to find optimal performance for your setup
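The chunk helper is a plain array partitioner. If you want the same behavior outside the Drizzle target module, a minimal equivalent looks like this (an illustrative sketch, not the library's implementation — use the chunk export in real pipelines):

```typescript
// Minimal chunk equivalent: split an array into consecutive slices of `size` items.
function chunkArray<T>(items: T[], size = 1000): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

console.log(chunkArray([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
```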

Advanced Examples

Multiple Tables

Handle multiple event types and tables:
import { pgTable, serial, integer, varchar } from "drizzle-orm/pg-core";
import { drizzle } from "drizzle-orm/node-postgres";
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { drizzleTarget, chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";

const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";

async function main() {
  const transfersTable = pgTable("transfers", {
    id: serial("id").primaryKey(),
    blockNumber: integer("block_number").notNull(),
    from: varchar("from_address").notNull(),
    to: varchar("to_address").notNull(),
    amount: varchar("amount").notNull(),
  });

  const approvalsTable = pgTable("approvals", {
    id: serial("id").primaryKey(),
    blockNumber: integer("block_number").notNull(),
    owner: varchar("owner_address").notNull(),
    spender: varchar("spender_address").notNull(),
    amount: varchar("amount").notNull(),
  });

  await evmPortalSource({ 
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet" 
  })
    .pipe(
      evmDecoder({
        range: { from: 20000000 },
        contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
        events: {
          transfer: commonAbis.erc20.events.Transfer,
          approval: commonAbis.erc20.events.Approval,
        },
      })
    )
    .pipeTo(
      drizzleTarget({
        db: drizzle(DB_URL),
        tables: [transfersTable, approvalsTable],
        onData: async ({ tx, data }) => {
          // Insert transfers
          for (const values of chunk(data.transfer)) {
            await tx.insert(transfersTable).values(
              values.map((t) => ({
                blockNumber: t.block.number,
                from: t.event.from,
                to: t.event.to,
                amount: t.event.value.toString(),
              }))
            );
          }

          // Insert approvals
          for (const values of chunk(data.approval)) {
            await tx.insert(approvalsTable).values(
              values.map((a) => ({
                blockNumber: a.block.number,
                owner: a.event.owner,
                spender: a.event.spender,
                amount: a.event.value.toString(),
              }))
            );
          }
        },
      })
    );
}

void main();

GraphQL API

Expose indexed data via GraphQL using Drizzle GraphQL:
api.ts
import { ApolloServer } from "@apollo/server";
import { startStandaloneServer } from "@apollo/server/standalone";
import { buildSchema } from "drizzle-graphql";
import { drizzle } from "drizzle-orm/node-postgres";
import { transfersTable } from "./schema";

const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";

async function startApi() {
  const db = drizzle({
    connection: DB_URL,
    schema: { transfersTable },
  });

  const { schema } = buildSchema(db);
  const server = new ApolloServer({ schema });
  const { url } = await startStandaloneServer(server, { port: 4000 });

  console.log(`🚀 GraphQL API ready at ${url}`);
}

void startApi();
Query your data:
{
  transfersTable(
    limit: 10
    orderBy: { blockNumber: { direction: desc, priority: 1 } }
  ) {
    blockNumber
    from
    to
    value
    timestamp
  }
}
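To consume the API from another service, POST the query to the standalone server. A sketch (the endpoint is an assumption from the example above, and the query root and field names depend on the schema you pass to drizzle()):

```typescript
// Hypothetical client for the GraphQL API above; "transfersTable" is the
// query root generated from the schema key passed to drizzle().
const query = `{
  transfersTable(limit: 10, orderBy: { blockNumber: { direction: desc, priority: 1 } }) {
    blockNumber
    from
    to
    value
  }
}`;

async function fetchTransfers(endpoint = "http://localhost:4000/") {
  const res = await fetch(endpoint, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ query }),
  });
  return res.json();
}
```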

Custom Indexes

Add database indexes for better query performance. Define indexes in your Drizzle schema:
schema.ts
import { pgTable, serial, integer, varchar, index } from "drizzle-orm/pg-core";

export const transfersTable = pgTable(
  "transfers",
  {
    id: serial("id").primaryKey(),
    blockNumber: integer("block_number").notNull(),
    from: varchar("from_address").notNull(),
    to: varchar("to_address").notNull(),
    amount: varchar("amount").notNull(),
  },
  (table) => [
    index("idx_transfers_from").on(table.from),
    index("idx_transfers_to").on(table.to),
    index("idx_transfers_block").on(table.blockNumber),
  ]
);
All tables used with drizzleTarget must have a primary key. Use serial("id").primaryKey() for auto-incrementing IDs.
After updating your schema, generate and apply migrations:
npx drizzle-kit generate
npx drizzle-kit migrate

Rollback Handling

The Drizzle target automatically handles blockchain forks:
  1. Snapshot Creation: Before processing each batch, a snapshot is created
  2. Fork Detection: If a fork is detected, the target rolls back to the last valid state
  3. Data Cleanup: All data after the fork point is automatically removed
  4. Resume: Processing continues from the fork point
Rollbacks are handled automatically. You don’t need to implement custom rollback logic.
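For intuition only, the cleanup step amounts to discarding rows recorded at block heights above the fork point. You never write this yourself; the sketch below is purely illustrative of what the target does internally:

```typescript
// Purely illustrative: the Drizzle target performs the equivalent of this
// cleanup internally when a fork is detected. forkBlock is the last valid height.
interface IndexedRow {
  blockNumber: number;
}

function rollbackTo<T extends IndexedRow>(rows: T[], forkBlock: number): T[] {
  return rows.filter((row) => row.blockNumber <= forkBlock);
}

const rows = [{ blockNumber: 100 }, { blockNumber: 101 }, { blockNumber: 102 }];
console.log(rollbackTo(rows, 100)); // [{ blockNumber: 100 }]
```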

Schema Migrations

You can manage database schema migrations using two approaches.

Option 1: Drizzle Kit CLI

Use Drizzle Kit to manage migrations separately from your indexer. This is the recommended approach for production deployments.
Step 1: Install Drizzle Kit

npm install -D drizzle-kit
Step 2: Create drizzle.config.ts

drizzle.config.ts
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  schema: "./schema.ts",
  out: "./drizzle",
  dialect: "postgresql",
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
});
Step 3: Generate migrations

npx drizzle-kit generate
This creates migration files in the ./drizzle directory based on your schema.
Step 4: Apply migrations

npx drizzle-kit migrate
Run this command to apply migrations to your database before starting the indexer.
This approach provides clear separation between schema management and data processing, making it easier to track schema changes and collaborate with teams.

Option 2: Programmatic Migrations in onStart

Apply migrations automatically when the indexer starts using the onStart callback. This is useful for development or single-process deployments.
index.ts
import { evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { drizzle } from "drizzle-orm/node-postgres";
import { migrate } from "drizzle-orm/node-postgres/migrator";
import { drizzleTarget } from "@subsquid/pipes/targets/drizzle/node-postgres";
import { transfersTable } from "./schema";

const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";

async function main() {
  await evmPortalSource({ portal: "..." })
    .pipe(evmDecoder({ /* ... */ }))
    .pipeTo(
      drizzleTarget({
        db: drizzle(DB_URL),
        tables: [transfersTable],
        onStart: async ({ db }) => {
          // Run migrations automatically on startup
          await migrate(db, { migrationsFolder: "./drizzle" });
        },
        onData: async ({ tx, data, ctx }) => {
          // Process data
        },
      })
    );
}

void main();
With this approach, migrations run automatically each time the indexer starts. Make sure migration files exist in the specified folder (generated using npx drizzle-kit generate).

When to Use Each Approach

Use Drizzle Kit CLI when:
  • Deploying to production environments
  • Working in a team with multiple developers
  • You need explicit control over when migrations run
  • Separating deployment concerns (database vs application)
Use programmatic migrations when:
  • Rapid development and iteration
  • Single-developer projects
  • Simplified deployment with automatic schema updates
  • Running in containerized environments where startup automation is preferred

Performance Tips

  1. Chunk Size: Start with 100-500 records per chunk and adjust based on testing
    // Test different sizes to find optimal performance
    for (const values of chunk(data.transfer, 100)) {
      await tx.insert(transfersTable).values(values);
    }
    
  2. Connection Pooling: Always use connection pooling for production
    import pg from "pg";
    import { drizzle } from "drizzle-orm/node-postgres";
    
    const { Pool } = pg;
    
    const pool = new Pool({
      host: "localhost",
      port: 5432,
      database: "mydb",
      user: "postgres",
      password: "password",
      max: 20, // Maximum pool size
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000,
    });
    
    const db = drizzle(pool);
    
  3. Database Indexes: Add indexes on frequently queried columns
    import { index } from "drizzle-orm/pg-core";
    
    export const transfersTable = pgTable(
      "transfers",
      {
        blockNumber: integer("block_number").notNull(),
        from: varchar("from_address", { length: 42 }).notNull(),
        to: varchar("to_address", { length: 42 }).notNull(),
      },
      (table) => [
        index("idx_block_number").on(table.blockNumber),
        index("idx_from_address").on(table.from),
        index("idx_to_address").on(table.to),
      ]
    );
    
  4. Data Validation: Validate data before insertion to prevent database errors
    const transferData = data.transfer.map((t) => {
      // Validate timestamp
      const timestamp = t.block.timestamp > 0
        ? new Date(t.block.timestamp * 1000)
        : new Date();
    
      return {
        blockNumber: t.block.number,
        timestamp,
        // ... other fields
      };
    });
    
  5. Parallel Processing: Use composite pipes for multiple contracts to maximize throughput

Troubleshooting

Table Not in Snapshot List

Error: Table 'my_table' is not in the snapshot list

Solution: Add the table to the tables array:
drizzleTarget({
  db: drizzle(DB_URL),
  tables: [myTable], // Add all modified tables here
  onData: async ({ tx, data }) => {
    await tx.insert(myTable).values(...);
  },
})

Connection Issues

Error: Connection refused

Solution: Ensure PostgreSQL is running and the connection string is correct:
# Check PostgreSQL status
pg_isready

# Test connection
psql postgresql://postgres:postgres@localhost:5432/postgres

Performance Issues

Symptoms: Slow inserts, high memory usage

Solutions:
  • Adjust chunk size (test 100, 500, 1000 to find optimal size)
  • Add database indexes on frequently queried columns
  • Use connection pooling (see Performance Tips)
  • Monitor PostgreSQL performance with pg_stat_statements
  • Consider table partitioning for large datasets (>100M rows)

Invalid Timestamp Errors

Error: RangeError: Invalid time value

Solution: Validate timestamps before insertion:
const timestamp = t.block.timestamp > 0
  ? new Date(t.block.timestamp * 1000)
  : new Date();

Type Conversion Issues

Problem: Precision loss with large numbers

Solution: Use varchar for large numbers:
// ✅ Correct - stores as string
value: varchar("value", { length: 78 }).notNull()

// ❌ Avoid - PostgreSQL bigint is 64-bit and { mode: "number" } loses precision
value: bigint("value", { mode: "number" })