Skip to main content

Core Functions

evmPortalSource

Create a Portal source for EVM chains.
evmPortalSource(config: EvmPortalSourceConfig): Source
Parameters:
  • portal: Portal API URL or config object (required)
  • query: EvmQueryBuilder instance (optional)
  • cursor: Resume from block { number: number } (optional)
  • cache: Portal cache instance (optional)
  • logger: Logger instance or false/'silent' to disable (optional)
  • metrics: MetricsServer for monitoring (optional)
  • progress: ProgressTrackerOptions for progress tracking (optional)
Example:
const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: queryBuilder,
  cursor: { number: 20000000 },
  cache: portalSqliteCache({ path: "./cache.sqlite" }),
});

evmDecoder

Decode smart contract events as a pipe.
evmDecoder<T>(config: EvmDecoderConfig<T>): Decoder<T>
Parameters:
  • range: Block range { from: number | 'latest', to?: number } (required)
  • contracts: Array of contract addresses or factory (required)
  • events: Map of event names to ABI objects (required)
  • profiler: Profiler config { id: string } (optional)
  • onError: Error handler (ctx, error) => unknown (optional, default: throws error)
Example:
const decoder = evmDecoder({
  range: { from: 20000000, to: 20100000 },
  contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
  events: {
    transfer: commonAbis.erc20.events.Transfer,
  },
});
Use @subsquid/evm-typegen to generate typed ABIs for custom contracts. This provides type safety and eliminates the need to manually find event signatures. See the Event Decoding guide for details on generating and using custom ABIs.

createTransformer

Transform data in pipes.
createTransformer<I, O>(config: TransformerConfig<I, O>): Transformer<I, O>
Parameters:
  • transform: Transform function async (data: I, ctx) => O (required)
  • query: Query callback ({queryBuilder, portal, logger}) => void (optional)
Example:
const transformer = createTransformer({
  query: ({ queryBuilder }) => {
    queryBuilder.addLog({
      request: { address: ["0x..."] },
      range: { from: 20000000 },
    });
  },
  transform: async (data) => {
    return data.blocks.map((b) => b.logs);
  },
});

createTarget

Create a data sink.
createTarget(config: TargetConfig): Target
Parameters:
  • write: Write function async ({ctx, read}) => void (required)
  • fork: Fork handler async (previousBlocks: BlockCursor[]) => BlockCursor | null (optional)
Example:
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data, ctx } of read()) {
      const span = ctx.profiler.start('save');
      await database.save(data);
      span.end();
    }
  },
  fork: async (previousBlocks) => {
    // Find common ancestor and rollback
    const rollbackTo = previousBlocks[0];
    if (rollbackTo) {
      await database.deleteAfter(rollbackTo.number);
      return rollbackTo;
    }
    return null;
  },
});

clickhouseTarget

ClickHouse target with automatic cursor management.
clickhouseTarget(config: ClickhouseTargetConfig): Target
Parameters:
  • client: ClickHouse client instance (required)
  • settings: Configuration options (optional)
  • onStart: Initialization handler async ({store, logger}) => void (optional)
  • onData: Data handler async ({store, data, ctx}) => void (required)
  • onRollback: Rollback handler async ({store, safeCursor, type}) => void (optional)
Example:
const target = clickhouseTarget({
  client,
  onData: async ({store, data}) => {
    store.insert({
      table: 'transfers',
      values: data.transfer.map(t => ({...})),
      format: 'JSONEachRow'
    })
  },
  onRollback: async ({store, safeCursor}) => {
    await store.removeAllRows({
      tables: ['transfers'],
      where: `block_number > ${safeCursor.number}`
    })
  }
})

factory

Track dynamically created contracts.
factory(config: FactoryConfig): Factory
Parameters:
  • address: Factory contract address or array of addresses (required)
  • event: Factory creation event ABI (required)
  • parameter: Extract child address (event) => string (required)
  • database: Factory database instance (required)
Example:
const factoryInstance = factory({
  address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
  event: factoryAbi.events.PoolCreated,
  parameter: "pool",
  database: factorySqliteDatabase({ path: "./pools.sqlite" }),
});
The address parameter now supports multiple factory addresses. Pass an array to track contracts from multiple factories.

Query Builder

EvmQueryBuilder

Build queries for Portal API.
class EvmQueryBuilder {
  addFields(fields: FieldSelection): this;
  addLog(config: LogRequest): this;
  addTransaction(config: TransactionRequest): this;
  addTrace(config: TraceRequest): this;
  addStateDiff(config: StateDiffRequest): this;
}

addFields

Select fields to return.
addFields({
  block: { number: true, hash: true, timestamp: true },
  log: { address: true, topics: true, data: true },
  transaction: { hash: true, from: true, to: true, value: true },
});

addLog

Filter event logs.
addLog({
  request: {
    address: ["0x..."],
    topic0: ["0x..."],
    topic1: ["0x..."], // optional
  },
  range: { from: 20000000, to: 20000100 },
});

addTransaction

Filter transactions.
addTransaction({
  request: {
    from: ["0x..."],
    to: ["0x..."],
    sighash: ["0xa9059cbb"],
  },
  range: { from: 20000000 },
});

Utilities

factorySqliteDatabase

Create SQLite factory database.
factorySqliteDatabase(config: { path: string }): FactoryDatabase
Example:
const db = factorySqliteDatabase({ path: "./factories.sqlite" });
No longer requires await - returns the database instance synchronously.

portalSqliteCache

Create SQLite cache for Portal responses.
portalSqliteCache(config: { path: string }): PortalCache
Example:
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";

const cache = portalSqliteCache({ path: "./cache.sqlite" });
Import from @subsquid/pipes/portal-cache/node instead of @subsquid/pipes/portal-cache.

commonAbis

Pre-defined ABIs for standard contracts.
import { commonAbis } from "@subsquid/pipes/evm";

// ERC20 Events
commonAbis.erc20.events.Transfer;
commonAbis.erc20.events.Approval;

// ERC20 Functions
commonAbis.erc20.functions.transfer;
commonAbis.erc20.functions.approve;
commonAbis.erc20.functions.name;
commonAbis.erc20.functions.symbol;
commonAbis.erc20.functions.decimals;
commonAbis.erc20.functions.balanceOf;
commonAbis.erc20.functions.totalSupply;
commonAbis.erc20.functions.allowance;
Currently, only ERC20 ABIs are available in commonAbis. For ERC721 and other token standards, use @subsquid/evm-typegen to generate custom ABIs from your contract JSON files.

Types

DecodedEvent

interface DecodedEvent<T, F = unknown> {
  event: T;
  contract: string;
  block: {
    number: number;
    hash: string;
  };
  timestamp: Date;
  rawEvent: {
    address: string;
    data: string;
    topics: string[];
    transactionHash: string;
    logIndex: number;
    transactionIndex: number;
  };
  factory?: {
    contract: string;
    blockNumber: number;
    event: F;
  };
}
The block property contains the block number and hash as a nested object. Transaction-related information like transactionHash, logIndex, and transactionIndex are available in the rawEvent property. For factory-created contracts, the factory field contains the child contract address, the block number where it was created, and the decoded factory creation event.

EvmPortalData

interface EvmPortalData<Q> {
  blocks: Array<{
    header: {
      number: number;
      hash: string;
      timestamp: number;
      // ... other fields
    };
    logs: Array<{
      address: string;
      topics: string[];
      data: string;
      transactionHash: string;
      logIndex: number;
    }>;
    transactions: Array<{
      hash: string;
      from: string;
      to: string;
      value: bigint;
      // ... other fields
    }>;
  }>;
}

Context

interface Context {
  logger: Logger;
  profiler: Profiler;
}

Logger

interface Logger {
  info(message: string | object, ...args: any[]): void;
  warn(message: string | object, ...args: any[]): void;
  error(message: string | object, ...args: any[]): void;
}

Profiler

interface Profiler {
  start(name: string): Profiler;
  measure<T>(name: string, fn: (span: Profiler) => Promise<T>): Promise<T>;
  addLabels(labels: string | string[]): Profiler;
  end(): Profiler;
  elapsed: number;
  children: Profiler[];
}

Pipe Methods

pipe

Chain a transformer.
source.pipe(transformer);

pipeComposite

Chain multiple transformers.
source.pipeComposite({
  transfers: decoder1,
  swaps: decoder2,
});

pipeTo

Connect to a target.
source.pipeTo(target);

Complete Type Example

import type {
  Source,
  Transformer,
  Target,
  Context,
  DecodedEvent,
} from "@subsquid/pipes";

import type {
  EvmPortalData,
  EvmPortalSourceConfig,
  EvmDecoderConfig,
  EvmQueryBuilder,
} from "@subsquid/pipes/evm";

import type {
  ClickhouseTargetConfig,
  Store,
} from "@subsquid/pipes/targets/clickhouse";