createTarget

Create a data sink.
createTarget(config: TargetConfig): Target
Parameters:
  • write: Write function async ({ctx, read}) => void (required)
  • fork: Fork handler async (previousBlocks: BlockCursor[]) => BlockCursor | null (optional)
  • onRollback: Rollback handler async ({cursor}) => void (optional)
Example:
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data, ctx } of read()) {
      const span = ctx.profiler.start('save');
      await database.save(data);
      span.end();
    }
  },
  onRollback: async ({ cursor }) => {
    await database.deleteAfter(cursor.number);
  },
});
You can avoid implementing onRollback handlers by configuring your source to use only finalized blocks. See Using Finalized Blocks for details.

clickhouseTarget

ClickHouse target with automatic cursor management.
clickhouseTarget(config: ClickhouseTargetConfig): Target
Parameters:
  • client: ClickHouse client instance (required)
  • settings: Configuration options (optional)
  • onStart: Initialization handler async ({store, logger}) => void (optional)
  • onData: Data handler async ({store, data, ctx}) => void (required)
  • onRollback: Rollback handler async ({store, safeCursor, type}) => void (optional)
    • type: 'offset_check' | 'blockchain_fork'
    • safeCursor: The safe block cursor to roll back to
    • cursor: Deprecated, use safeCursor instead
Example:
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";

const target = clickhouseTarget({
  client,
  onData: async ({store, data}) => {
    store.insert({
      table: 'transfers',
      values: data.transfer.map(t => ({...})),
      format: 'JSONEachRow'
    })
  },
  onRollback: async ({store, safeCursor}) => {
    await store.removeAllRows({
      tables: ['transfers'],
      where: `block_number > ${safeCursor.number}`
    })
  }
})
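The type field lets you branch when the two rollback reasons need different handling. A minimal sketch reusing removeAllRows from the example above; the branching logic is illustrative, not prescribed by the API:
const target = clickhouseTarget({
  client,
  onData: async ({ store, data }) => { /* ... */ },
  onRollback: async ({ store, safeCursor, type }) => {
    if (type === 'blockchain_fork') {
      // A fork invalidated recent blocks: drop everything past the safe cursor
      await store.removeAllRows({
        tables: ['transfers'],
        where: `block_number > ${safeCursor.number}`,
      });
    }
    // 'offset_check' rollbacks could apply the same or lighter cleanup
  },
});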
You can simplify your implementation by using finalized blocks, which eliminates the need for onRollback handlers. See Using Finalized Blocks.

drizzleTarget

Drizzle ORM target for PostgreSQL with automatic cursor management and rollback support.
drizzleTarget(config: DrizzleTargetConfig): Target
Parameters:
  • db: Drizzle database instance (required)
  • tables: Array of tables used for snapshot tracking (required)
  • onStart: Initialization handler async ({db}) => void (optional)
  • onData: Data handler async ({tx, data, ctx}) => void (required)
  • onBeforeRollback: Pre-rollback handler async ({tx, cursor}) => void (optional)
  • onAfterRollback: Post-rollback handler async ({tx, cursor}) => void (optional)
  • settings: Configuration object (optional)
    • state: State options for cursor tracking
    • transaction.isolationLevel: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable'
Example:
import { drizzleTarget } from "@subsquid/pipes/targets/drizzle/node-postgres";
import { drizzle } from "drizzle-orm/node-postgres";

const target = drizzleTarget({
  db: drizzle("postgresql://user:pass@localhost:5432/db"),
  tables: [transfersTable],
  onStart: async ({ db }) => {
    // Initialize schema
    await db.execute(`CREATE TABLE IF NOT EXISTS transfers (...)`);
  },
  onData: async ({ tx, data, ctx }) => {
    await tx.insert(transfersTable).values(
      data.transfers.map(t => ({
        blockNumber: t.blockNumber,
        from: t.event.from,
        to: t.event.to,
        amount: t.event.value,
      }))
    );
  },
});
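The optional settings block from the parameter list is not shown above. A minimal sketch raising the transaction isolation level, using the values listed in the parameters and eliding the rest of the config:
const strictTarget = drizzleTarget({
  db: drizzle("postgresql://user:pass@localhost:5432/db"),
  tables: [transfersTable],
  onData: async ({ tx, data }) => { /* ... */ },
  settings: {
    // One of: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable'
    transaction: { isolationLevel: 'serializable' },
  },
});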
The Drizzle target automatically handles cursors and rollbacks. See the Postgres (Drizzle) Integration guide for complete examples.

Context & Utilities

Types and utilities available in target handlers.

Context

The context object available in target write handlers.
interface Context {
  logger: Logger;
  profiler: Profiler;
}

Logger

The logger interface is Pino-compatible, allowing integration with any Pino transport for sending logs to external services like GCP Cloud Logging, Sentry, or other monitoring platforms.
interface Logger {
  info(message: string | object, ...args: any[]): void;
  warn(message: string | object, ...args: any[]): void;
  error(message: string | object, ...args: any[]): void;
}
You can use the logger as a drop-in replacement for standard Pino loggers. See the Custom Logging section for integration examples.
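For example, inside a write handler you can pass a structured object first and a message second, Pino-style (the field names here are illustrative):
const target = createTarget({
  write: async ({ read }) => {
    for await (const { data, ctx } of read()) {
      // Structured logging: merge object first, message second (Pino convention)
      ctx.logger.info({ items: data.transfers.length }, 'batch received');
      await processData(data);
    }
  },
});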

Profiler

Track performance metrics for pipeline operations.
interface Profiler {
  start(name: string): Profiler;
  measure<T>(name: string, fn: (span: Profiler) => Promise<T>): Promise<T>;
  addLabels(labels: string | string[]): Profiler;
  end(): Profiler;
  elapsed: number;
  children: Profiler[];
}
Use measure() for convenient async timing:
const result = await profiler.measure('db-query', async (span) => {
  return await database.query('SELECT * FROM transfers');
});
Example:
const target = createTarget({
  write: async ({ read }) => {
    for await (const { data, ctx } of read()) {
      const span = ctx.profiler.start('processing');
      await processData(data);
      span.end();
    }
  }
})
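The remaining members can be combined in a handler: addLabels() tags a span, elapsed exposes the recorded duration, and children holds nested spans. A minimal sketch, assuming measure() nests its span under the profiler it is called on; the span and label names are illustrative:
const target = createTarget({
  write: async ({ read }) => {
    for await (const { data, ctx } of read()) {
      // start() opens a span; addLabels() tags it; end() stops the clock
      const span = ctx.profiler.start('save-batch').addLabels('transfers');
      // measure() times the callback as a child of the parent span
      await span.measure('db-write', async () => database.save(data));
      span.end();
      ctx.logger.info({ elapsed: span.elapsed, spans: span.children.length }, 'batch saved');
    }
  },
});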

BlockCursor

Cursor format for resuming indexing from a specific block.
interface BlockCursor {
  number: number;
  hash?: string;
  timestamp?: number;
}
Example:
const target = createTarget({
  write: async ({ read }) => {
    // Resume from block 20000000
    for await (const { data } of read({ number: 20000000 })) {
      await processData(data);
    }
  }
})

Next Steps