Integrate Drizzle ORM with the Pipes SDK to store blockchain data in PostgreSQL, with automatic cursor management and rollback support.
Overview
The Drizzle target provides:
- Automatic cursor management: Tracks processing progress automatically
- Rollback support: Handles blockchain forks with automatic data cleanup
- Type-safe queries: Full TypeScript support with Drizzle ORM
- Transaction management: All operations wrapped in database transactions
- Schema migrations: Compatible with Drizzle Kit for schema management
Prerequisites
- PostgreSQL database running (version 12+ recommended)
- Node.js 22+ installed
- Basic understanding of Drizzle ORM
Installation
npm install @subsquid/pipes drizzle-orm pg
npm install -D drizzle-kit @types/pg
If you plan to add a GraphQL API, also install: npm install @apollo/server drizzle-graphql graphql
Basic Example
Index ERC20 Transfer events and store them in PostgreSQL.
Define your schema
Create a Drizzle schema for your data:
import { pgTable, serial, integer, varchar, timestamp } from "drizzle-orm/pg-core";
export const transfersTable = pgTable("transfers", {
id: serial("id").primaryKey(),
blockNumber: integer("block_number").notNull(),
blockHash: varchar("block_hash", { length: 66 }).notNull(),
from: varchar("from_address", { length: 42 }).notNull(),
to: varchar("to_address", { length: 42 }).notNull(),
// Store BigInt values as strings to avoid precision loss
value: varchar("value", { length: 78 }).notNull(),
timestamp: timestamp("timestamp").notNull(),
});
export type NewTransfer = typeof transfersTable.$inferInsert;
Use varchar for storing large numbers (like token amounts) to avoid precision loss, and convert BigInt values to strings before inserting (for example with value.toString(), as in the indexer below).
Create the indexer
Set up the pipeline with the Drizzle target:
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { drizzleTarget, chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";
import { drizzle } from "drizzle-orm/node-postgres";
import pg from "pg";
import { transfersTable, type NewTransfer } from "./schema";
const { Pool } = pg;
async function main() {
// Create PostgreSQL connection pool
const pool = new Pool({
host: "localhost",
port: 5432,
database: "postgres",
user: "postgres",
password: "postgres",
});
// Create Drizzle instance
const db = drizzle(pool);
await evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
})
.pipe(
evmDecoder({
range: { from: 20000000 },
contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"], // USDC
events: {
transfer: commonAbis.erc20.events.Transfer,
},
})
)
.pipeTo(
drizzleTarget({
db,
tables: [transfersTable],
onData: async ({ tx, data, ctx }) => {
ctx.logger.info(`Processing ${data.transfer.length} transfers`);
// Map and validate data
const transferData = data.transfer.map((t): NewTransfer => {
// Ensure valid timestamp (fallback to current time if invalid)
const timestamp = t.block.timestamp > 0
? new Date(t.block.timestamp * 1000)
: new Date();
return {
blockNumber: t.block.number,
blockHash: t.block.hash,
from: t.event.from,
to: t.event.to,
value: t.event.value.toString(),
timestamp,
};
});
// Process in chunks for better performance (100 records per batch)
for (const values of chunk(transferData, 100)) {
await tx.insert(transfersTable).values(values);
}
},
})
);
}
void main();
Always validate timestamps before inserting into the database. Blockchain timestamps can occasionally be 0 or invalid, which will cause database errors.
Run the indexer
Start indexing blockchain data by running your indexer script (see the example command below). The indexer will:
- Connect to PostgreSQL
- Start processing Transfer events
- Store decoded events in the database
- Automatically handle cursors and rollbacks
Make sure to set up your database schema first using one of the migration approaches described in the Schema Migrations section.
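For example, assuming the indexer code above lives in indexer.ts and you run it with tsx (both the file name and the runner are assumptions, not requirements of the SDK):
npx tsx indexer.ts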
Configuration Options
drizzleTarget
drizzleTarget({
db: drizzle(connectionString),
tables: [table1, table2],
onStart: async ({ db }) => {
// Optional: Run migrations or other initialization tasks
},
onData: async ({ tx, data, ctx }) => {
// Process and insert data
},
})
Parameters:
- db: Drizzle database instance (required)
- tables: Array of tables for snapshot tracking (required)
- onStart: Optional initialization callback for migrations or setup tasks
- onData: Data processing callback (required)
All tables modified in onData MUST be listed in the tables array. This is required for automatic rollback support.
Chunking Data
Use the chunk helper to process large batches efficiently:
import { chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";
// Default chunk size (1000 records)
for (const values of chunk(data.transfer)) {
await tx.insert(transfersTable).values(values);
}
// Custom chunk size (100 records per batch)
for (const values of chunk(data.transfer, 100)) {
await tx.insert(transfersTable).values(values);
}
Recommended chunk sizes:
- 100-500 records: Good for most use cases and complex schemas
- 1000+ records: Better for simple schemas with few columns
- Test different sizes to find optimal performance for your setup (see the sketch below)
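One way to experiment is to make the chunk size configurable, for example through an environment variable. A minimal sketch for the onData callback of the basic example (the CHUNK_SIZE variable name is an assumption, not something the SDK reads):
// Chunk size taken from an assumed CHUNK_SIZE environment variable, defaulting to 500
const chunkSize = Number(process.env.CHUNK_SIZE ?? 500);

for (const values of chunk(transferData, chunkSize)) {
  await tx.insert(transfersTable).values(values);
}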
Advanced Examples
Multiple Tables
Handle multiple event types and tables:
import { pgTable, serial, integer, varchar } from "drizzle-orm/pg-core";
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { drizzleTarget, chunk } from "@subsquid/pipes/targets/drizzle/node-postgres";
import { drizzle } from "drizzle-orm/node-postgres";
const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";
async function main() {
const transfersTable = pgTable("transfers", {
id: serial("id").primaryKey(),
blockNumber: integer("block_number").notNull(),
from: varchar("from_address").notNull(),
to: varchar("to_address").notNull(),
amount: varchar("amount").notNull(),
});
const approvalsTable = pgTable("approvals", {
id: serial("id").primaryKey(),
blockNumber: integer("block_number").notNull(),
owner: varchar("owner_address").notNull(),
spender: varchar("spender_address").notNull(),
amount: varchar("amount").notNull(),
});
await evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet"
})
.pipe(
evmDecoder({
range: { from: 20000000 },
contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
events: {
transfer: commonAbis.erc20.events.Transfer,
approval: commonAbis.erc20.events.Approval,
},
})
)
.pipeTo(
drizzleTarget({
db: drizzle(DB_URL),
tables: [transfersTable, approvalsTable],
onData: async ({ tx, data }) => {
// Insert transfers
for (const values of chunk(data.transfer)) {
await tx.insert(transfersTable).values(
values.map((t) => ({
blockNumber: t.block.number,
from: t.event.from,
to: t.event.to,
amount: t.event.value.toString(),
}))
);
}
// Insert approvals
for (const values of chunk(data.approval)) {
await tx.insert(approvalsTable).values(
values.map((a) => ({
blockNumber: a.block.number,
owner: a.event.owner,
spender: a.event.spender,
amount: a.event.value.toString(),
}))
);
}
},
})
);
}
void main();
GraphQL API
Expose indexed data via GraphQL using Drizzle GraphQL:
import { ApolloServer } from "@apollo/server";
import { startStandaloneServer } from "@apollo/server/standalone";
import { buildSchema } from "drizzle-graphql";
import { drizzle } from "drizzle-orm/node-postgres";
import { transfersTable } from "./schema";
const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";
async function startApi() {
const db = drizzle({
connection: DB_URL,
schema: { transfersTable },
});
const { schema } = buildSchema(db);
const server = new ApolloServer({ schema });
const { url } = await startStandaloneServer(server, { port: 4000 });
console.log(`🚀 GraphQL API ready at ${url}`);
}
void startApi();
Query your data:
{
transfersTable(
limit: 10
orderBy: { blockNumber: { direction: desc, priority: 1 } }
) {
blockNumber
from
to
value
timestamp
}
}
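A minimal sketch of querying the running server over HTTP with fetch (assuming the default URL printed by startStandaloneServer, http://localhost:4000/):
// Send the query above to the GraphQL endpoint and print the rows
const response = await fetch("http://localhost:4000/", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    query: `{
      transfersTable(limit: 10, orderBy: { blockNumber: { direction: desc, priority: 1 } }) {
        blockNumber
        from
        to
        value
      }
    }`,
  }),
});
const { data } = await response.json();
console.log(data.transfersTable);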
Custom Indexes
Add database indexes for better query performance. Define indexes in your Drizzle schema:
import { pgTable, serial, integer, varchar, index } from "drizzle-orm/pg-core";
export const transfersTable = pgTable(
"transfers",
{
id: serial("id").primaryKey(),
blockNumber: integer("block_number").notNull(),
from: varchar("from_address").notNull(),
to: varchar("to_address").notNull(),
amount: varchar("amount").notNull(),
},
(table) => [
index("idx_transfers_from").on(table.from),
index("idx_transfers_to").on(table.to),
index("idx_transfers_block").on(table.blockNumber),
]
);
All tables used with drizzleTarget must have a primary key. Use serial("id").primaryKey() for auto-incrementing IDs.
After updating your schema, generate and apply migrations:
npx drizzle-kit generate
npx drizzle-kit migrate
Rollback Handling
The Drizzle target automatically handles blockchain forks:
- Snapshot Creation: Before processing each batch, a snapshot is created
- Fork Detection: If a fork is detected, the target rolls back to the last valid state
- Data Cleanup: All data after the fork point is automatically removed
- Resume: Processing continues from the fork point
Rollbacks are handled automatically. You don’t need to implement custom rollback logic.
Schema Migrations
You can manage database schema migrations using two approaches:
Option 1: Drizzle Kit CLI (Recommended)
Use Drizzle Kit to manage migrations separately from your indexer. This is the recommended approach for production deployments.
Install Drizzle Kit
npm install -D drizzle-kit
Create drizzle.config.ts
import { defineConfig } from "drizzle-kit";
export default defineConfig({
schema: "./schema.ts",
out: "./drizzle",
dialect: "postgresql",
dbCredentials: {
url: process.env.DATABASE_URL!,
},
});
Generate migrations
npx drizzle-kit generate
This creates migration files in the ./drizzle directory based on your schema.
Apply migrations
npx drizzle-kit migrate
Run this command to apply migrations to your database before starting the indexer.
This approach provides clear separation between schema management and data processing, making it easier to track schema changes and collaborate with teams.
Option 2: Programmatic Migrations in onStart
Apply migrations automatically when the indexer starts using the onStart callback. This is useful for development or single-process deployments.
import { drizzle } from "drizzle-orm/node-postgres";
import { migrate } from "drizzle-orm/node-postgres/migrator";
import { drizzleTarget } from "@subsquid/pipes/targets/drizzle/node-postgres";
const DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres";
async function main() {
await evmPortalSource({ portal: "..." })
.pipe(evmDecoder({ /* ... */ }))
.pipeTo(
drizzleTarget({
db: drizzle(DB_URL),
tables: [transfersTable],
onStart: async ({ db }) => {
// Run migrations automatically on startup
await migrate(db, { migrationsFolder: "./drizzle" });
},
onData: async ({ tx, data, ctx }) => {
// Process data
},
})
);
}
void main();
With this approach, migrations run automatically each time the indexer starts. Make sure migration files exist in the specified folder (generated using npx drizzle-kit generate).
When to Use Each Approach
Use Drizzle Kit CLI when:
- Deploying to production environments
- Working in a team with multiple developers
- You need explicit control over when migrations run
- Separating deployment concerns (database vs application)
Use programmatic migrations when:
- Rapid development and iteration
- Single-developer projects
- Simplified deployment with automatic schema updates
- Running in containerized environments where startup automation is preferred
Performance Tips
- Chunk Size: Start with 100-500 records per chunk and adjust based on testing:
// Test different sizes to find optimal performance
for (const values of chunk(data.transfer, 100)) {
await tx.insert(transfersTable).values(values);
}
- Connection Pooling: Always use connection pooling for production:
import pg from "pg";
import { drizzle } from "drizzle-orm/node-postgres";
const { Pool } = pg;
const pool = new Pool({
host: "localhost",
port: 5432,
database: "mydb",
user: "postgres",
password: "password",
max: 20, // Maximum pool size
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
const db = drizzle(pool);
- Database Indexes: Add indexes on frequently queried columns:
import { index } from "drizzle-orm/pg-core";
export const transfersTable = pgTable(
"transfers",
{
blockNumber: integer("block_number").notNull(),
from: varchar("from_address", { length: 42 }).notNull(),
to: varchar("to_address", { length: 42 }).notNull(),
},
(table) => [
index("idx_block_number").on(table.blockNumber),
index("idx_from_address").on(table.from),
index("idx_to_address").on(table.to),
]
);
- Data Validation: Validate data before insertion to prevent database errors:
const transferData = data.transfer.map((t) => {
// Validate timestamp
const timestamp = t.block.timestamp > 0
? new Date(t.block.timestamp * 1000)
: new Date();
return {
blockNumber: t.block.number,
timestamp,
// ... other fields
};
});
- Parallel Processing: Use composite pipes for multiple contracts to maximize throughput
Troubleshooting
Table Not in Snapshot List
Error: Table 'my_table' is not in the snapshot list
Solution: Add the table to the tables array:
drizzleTarget({
db: drizzle(DB_URL),
tables: [myTable], // Add all modified tables here
onData: async ({ tx, data }) => {
await tx.insert(myTable).values(...);
},
})
Connection Issues
Error: Connection refused
Solution: Ensure PostgreSQL is running and the connection string is correct:
# Check PostgreSQL status
pg_isready
# Test connection
psql postgresql://postgres:postgres@localhost:5432/postgres
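You can also verify connectivity from Node.js with a short script (a sketch using the pg Pool from the examples above; the connection string is an assumption, substitute your own):
import pg from "pg";

const { Pool } = pg;

async function checkConnection() {
  // Assumed local credentials; replace with your connection string
  const pool = new Pool({ connectionString: "postgresql://postgres:postgres@localhost:5432/postgres" });
  try {
    // A trivial query confirms the database is reachable and the credentials work
    await pool.query("SELECT 1");
    console.log("PostgreSQL connection OK");
  } catch (err) {
    console.error("PostgreSQL connection failed:", err);
  } finally {
    await pool.end();
  }
}

void checkConnection();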
Performance Issues
Symptoms: Slow inserts, high memory usage
Solutions:
- Adjust chunk size (test 100, 500, 1000 to find optimal size)
- Add database indexes on frequently queried columns
- Use connection pooling (see Performance Tips)
- Monitor PostgreSQL performance with pg_stat_statements
- Consider table partitioning for large datasets (>100M rows)
Invalid Timestamp Errors
Error: RangeError: Invalid time value
Solution: Validate timestamps before insertion:
const timestamp = t.block.timestamp > 0
? new Date(t.block.timestamp * 1000)
: new Date();
Type Conversion Issues
Problem: Precision loss with large numbers
Solution: Use varchar for large numbers:
// ✅ Correct - stores as string
value: varchar("value", { length: 78 }).notNull()
// ❌ Avoid - may lose precision
value: bigint("value", { mode: "number" })
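When you read these values back for arithmetic, convert the stored strings to BigInt. A minimal sketch, assuming the transfersTable schema from the basic example (the sumValuesFrom helper name is illustrative, not part of the SDK):
import { eq } from "drizzle-orm";
import { NodePgDatabase } from "drizzle-orm/node-postgres";
import { transfersTable } from "./schema";

// Illustrative helper: sum all transfer values sent from one address.
async function sumValuesFrom(db: NodePgDatabase, from: string): Promise<bigint> {
  const rows = await db
    .select({ value: transfersTable.value })
    .from(transfersTable)
    .where(eq(transfersTable.from, from));

  // The varchar column holds decimal strings, so convert back to BigInt before adding
  return rows.reduce((total, row) => total + BigInt(row.value), 0n);
}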