Factory Pattern

Index events from dynamically created contracts.
import { evmDecoder, factory, factorySqliteDatabase } from "@subsquid/pipes/evm";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";

const decoder = evmDecoder({
  range: { from: 12369621 },
  contracts: factory({
    address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
    event: factoryAbi.PoolCreated,
    parameter: "pool",
    database: factorySqliteDatabase({ path: "./pools.sqlite" }),
  }),
  events: {
    swap: poolAbi.Swap,
  },
});

Generating ABIs

Generate ABIs for both the factory contract and the pool contracts using @subsquid/evm-typegen:
npx @subsquid/evm-typegen src/abi abi/uniswap-v3-factory.json abi/uniswap-v3-pool.json
The generated modules provide full compile-time type safety for decoded event data and eliminate the need to manually look up raw event signatures (topic0 hashes), both for plain contracts and for factory patterns.
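For instance, the generated event objects carry their topic0 hash as a property, so you reference events by name instead of pasting keccak hashes (a small sketch; assumes the typegen output imported above):
import { events as factoryAbi } from "./abi/uniswap-v3-factory";

// Typed access to the event's topic0 hash - no manual signature lookup
console.log(factoryAbi.PoolCreated.topic);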

How It Works

  1. Monitor factory contract for creation events
  2. Extract child contract addresses
  3. Store addresses in SQLite
  4. Decode events from all discovered contracts
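Putting these steps together, the decoder from the first example attaches to a Portal source like any other pipe (a minimal sketch reusing the decoder defined above):
import { evmPortalSource } from "@subsquid/pipes/evm";

async function main() {
const stream = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
}).pipe(decoder);

for await (const { data } of stream) {
  // Swap events are decoded from every pool the factory has created
  console.log(`Decoded ${data.swap.length} swaps`);
}
}

void main()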

Factory Database

The factory database persists discovered child contract addresses, so a restarted pipeline resumes from the stored set instead of re-scanning past creation events.
const database = factorySqliteDatabase({
  path: "./uniswap-v3-pools.sqlite",
});

Composite Pipes

Process multiple data streams simultaneously.
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import * as uniswapAbi from "./abi/uniswap-v3-pool";

async function main() {
const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});

const pipeline = source.pipeComposite({
  transfers: evmDecoder({
    range: { from: 20000000 },
    contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
    events: { transfer: commonAbis.erc20.events.Transfer },
  }),
  swaps: evmDecoder({
    range: { from: 20000000 },
    contracts: ["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
    events: { swap: uniswapAbi.events.Swap },
  }),
});

for await (const { data } of pipeline) {
  console.log(`Transfers: ${data.transfers.transfer.length}`);
  console.log(`Swaps: ${data.swaps.swap.length}`);
}
}

void main()

Portal Caching

Cache Portal responses locally for faster iteration.
import { evmPortalSource } from "@subsquid/pipes/evm";
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  cache: portalSqliteCache({ path: "./portal-cache.sqlite" }),
});

When to Use

  • Development iteration
  • Testing pipelines
  • Repeated processing of the same blocks (see the sketch below)
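For example, pairing the cache with a bounded block range lets every rerun after the first be served locally (a sketch that reuses the cached source above; the decoder options follow the composite example earlier on this page):
const stream = source.pipe(
  evmDecoder({
    // A fixed range means reruns request the exact same blocks,
    // so responses come from the SQLite cache instead of the Portal
    range: { from: 20000000, to: 20001000 },
    events: { transfer: commonAbis.erc20.events.Transfer },
  })
);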

Custom Logging

The Pipes SDK uses a Pino-compatible logger, allowing you to integrate custom log transports and send logs to external services like GCP Cloud Logging, Sentry, or any other Pino-compatible destination.

Basic Custom Logger

Pass a custom logger to the source to configure logging for your entire pipeline.
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";

async function main() {
const transport = pino.transport({
  target: "pino-pretty",
  options: {
    colorize: true,
    translateTime: "HH:MM:ss",
  },
});

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ count: data.length }, "Processed batch");
    }
  },
});

await source.pipeTo(target);
}

void main()

Integration with Cloud Services

You can use any Pino transport to send logs to cloud services. Pass the configured logger to the source.
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";

async function main() {
const transport = pino.transport({
  target: "@google-cloud/logging-pino",
  options: {
    projectId: "your-project-id",
    logName: "pipes-indexer",
  },
});

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info(
        {
          blocksProcessed: data.blocks?.length,
          eventsCount: data.transfer?.length,
        },
        "Batch processed"
      );
    }
  },
});

await source.pipeTo(target);
}

void main()
The ctx.logger in transformers and targets is the same logger instance passed to the source. Configure logging at the source level, then use ctx.logger throughout your pipeline for consistent logging.
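For example, a transformer added with .pipe can log through that same instance via its context (a sketch; `decoder` stands in for any decoder from the examples above, and the data shape depends on its configured events):
const stream = source.pipe(decoder).pipe((data, ctx) => {
  // Same Pino-compatible logger that was passed to the source
  ctx.logger.info({ batchSize: data.transfer.length }, "Decoded batch");
  return data;
});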

Custom Metrics

Track custom Prometheus metrics in your pipeline. The Pipes SDK exposes a metrics server that you can use to add counters, gauges, histograms, and summaries.
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";

async function main() {
const stream = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  metrics: metricsServer({ port: 9090 }),
}).pipe(
  evmDecoder({
    range: { from: "latest" },
    events: {
      transfers: commonAbis.erc20.events.Transfer,
    },
  })
);

for await (const { data, ctx } of stream) {
  // Add custom counter metric
  ctx.metrics
    .counter({
      name: "my_transfers_counter",
      help: "Number of processed transactions",
    })
    .inc(data.transfers.length);

  // Use the profiler to measure how long your own processing takes
  const span = ctx.profiler.start("processing");
  await processData(data); // processData is your application-specific handler
  span.end();
}
}

void main()
Access metrics at http://localhost:9090/metrics:
# HELP my_transfers_counter Number of processed transactions
# TYPE my_transfers_counter counter
my_transfers_counter 218598

Available Metric Types

You can create different types of Prometheus metrics:
for await (const { data, ctx } of stream) {
  // Counter - monotonically increasing value
  ctx.metrics.counter({ name: "events_total", help: "Total events" }).inc();

  // Gauge - value that can go up or down
  ctx.metrics
    .gauge({ name: "queue_size", help: "Current queue size" })
    .set(queueSize); // queueSize is an application-defined value

  // Histogram - observations with configurable buckets
  ctx.metrics
    .histogram({ name: "batch_size", help: "Batch size distribution" })
    .observe(data.transfers.length);
}
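Metric options also accept labels, as the labeled gauge in the RPC latency example below shows; custom histogram buckets are sketched here as an assumed prom-client-style option with illustrative values:
for await (const { data, ctx } of stream) {
  ctx.metrics
    .histogram({
      name: "batch_size_by_event",
      help: "Batch size distribution per event type",
      labelNames: ["event"], // labels, as in the RPC latency example
      buckets: [10, 50, 100, 500], // assumed prom-client-style bucket option
    })
    .observe({ event: "transfers" }, data.transfers.length);
}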

Pipeline Monitoring UI

Visualize your pipeline in real time with Pipe UI, a web-based dashboard that displays performance, profiling data, and metrics. To start the monitoring UI, run:
npx @sqd-pipes/pipe-ui@latest --open
This command launches the Pipe UI dashboard and automatically opens it in your browser. The dashboard connects to your pipe’s Prometheus metrics endpoint to display real-time metrics, profiling data, and pipeline status.
Pipe UI requires metrics to be exposed from your pipe. Enable profilers in your decoders and add metricsServer() to your source to make metrics accessible. See the Pipe UI reference for complete setup instructions.
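A minimal setup that exposes everything Pipe UI needs might look like this (a sketch combining metricsServer with the profiler option used in the Combining Patterns example below):
const stream = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  metrics: metricsServer({ port: 9090 }), // Pipe UI reads this Prometheus endpoint
}).pipe(
  evmDecoder({
    profiler: { id: "ERC20" }, // surfaces per-decoder profiling in the UI
    range: { from: "latest" },
    events: { transfer: commonAbis.erc20.events.Transfer },
  })
);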

RPC Latency Monitoring

Compare Portal data availability against external RPC providers to monitor relative latency. The evmRpcLatencyWatcher subscribes to RPC endpoints via WebSocket and measures when blocks arrive at the Portal versus when they appear at the RPC endpoints.
The measured values include client-side network latency. For RPC endpoints, only the arrival time of blocks is measured—this does not capture the node’s internal processing or response latency if queried directly. Results represent end-to-end delays as experienced by the client, not pure Portal or RPC processing performance.
import { formatBlock } from "@subsquid/pipes";
import { evmPortalSource, evmRpcLatencyWatcher } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";

async function main() {
const stream = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/base-mainnet",
  query: { from: "latest" },
  metrics: metricsServer({ port: 9090 }),
}).pipe(
  evmRpcLatencyWatcher({
    rpcUrl: ["https://base.drpc.org", "https://base-rpc.publicnode.com"],
  }).pipe((data, { metrics }) => {
    if (!data) return;

    // Update Prometheus metrics for each RPC endpoint
    for (const rpc of data.rpc) {
      metrics
        .gauge({
          name: "rpc_latency_ms",
          help: "RPC Latency in ms",
          labelNames: ["url"],
        })
        .set({ url: rpc.url }, rpc.portalDelayMs);
    }

    return data;
  })
);

for await (const { data } of stream) {
  if (!data) continue;

  console.log(`Block: ${formatBlock(data.number)} / ${data.timestamp}`);
  console.table(data.rpc);
}
}

void main()

Output Format

The latency data includes:
  • url: RPC endpoint URL
  • receivedAt: Timestamp when the RPC endpoint received the block
  • portalDelayMs: Milliseconds between RPC arrival and Portal availability
Block: 36,046,611 / Fri Sep 26 2025 14:29:29 GMT+0400
┌───┬─────────────────────────────────┬──────────────────────────┬───────────────┐
│   │ url                             │ receivedAt               │ portalDelayMs │
├───┼─────────────────────────────────┼──────────────────────────┼───────────────┤
│ 0 │ https://base.drpc.org           │ 2025-09-26T10:29:29.134Z │ 646           │
│ 1 │ https://base-rpc.publicnode.com │ 2025-09-26T10:29:29.130Z │ 642           │
└───┴─────────────────────────────────┴──────────────────────────┴───────────────┘

Pre-indexing Factories (Experimental)

Pre-populate the factory database automatically using the _experimental_preindex option before running the main pipe.
This is an experimental feature and may change in future versions. The pre-indexing request is size-limited, so this approach won't work for thousands of addresses; use it only for small to medium-sized factory contract sets.
import {
  evmPortalSource,
  evmDecoder,
  factory,
  factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";

async function main() {
const stream = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
}).pipe(
  evmDecoder({
    range: { from: "12,369,621", to: "12,410,000" },
    contracts: factory({
      address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
      event: factoryAbi.PoolCreated,
      parameter: "pool",
      _experimental_preindex: { from: "12,369,621", to: "12,400,000" },
      database: factorySqliteDatabase({ path: "./pools.sqlite" }),
    }),
    events: {
      swaps: poolAbi.Swap,
    },
  })
);

for await (const { data } of stream) {
  console.log(`Processed ${data.swaps.length} swaps`);
}
}

void main()

Combining Patterns

Combine multiple advanced patterns for production-ready pipelines. This example integrates portal caching, custom metrics, profiling, composite pipes, and ClickHouse persistence.
import { createClient } from "@clickhouse/client";
import {
  commonAbis,
  evmDecoder,
  evmPortalSource,
  factory,
  factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";
import { metricsServer } from "@subsquid/pipes/metrics/node";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";

async function main() {
const client = createClient({
  url: "http://localhost:8123",
  username: "default",
  password: "default",
});

await evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  cache: portalSqliteCache({ path: "./cache.sqlite" }),
  metrics: metricsServer({ port: 9090 }),
})
  .pipeComposite({
    transfers: evmDecoder({
      profiler: { id: "ERC20" },
      range: { from: 20000000 },
      contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
      events: { transfer: commonAbis.erc20.events.Transfer },
    }),
    swaps: evmDecoder({
      profiler: { id: "Uniswap" },
      range: { from: 20000000 },
      contracts: factory({
        address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
        event: factoryAbi.PoolCreated,
        parameter: "pool",
        database: factorySqliteDatabase({ path: "./pools.sqlite" }),
      }),
      events: { swap: poolAbi.Swap },
    }),
  })
  .pipeTo(
    clickhouseTarget({
      client,
      onData: async ({ store, data, ctx }) => {
        // Track custom metrics
        ctx.metrics
          .counter({ name: "transfers_total", help: "Total transfers" })
          .inc(data.transfers.transfer.length);
        ctx.metrics
          .counter({ name: "swaps_total", help: "Total swaps" })
          .inc(data.swaps.swap.length);

        // Insert transfers
        await store.insert({
          table: "transfers",
          values: data.transfers.transfer.map((t) => ({
            block_number: t.block.number,
            from: t.from,
            to: t.to,
            value: t.value.toString(),
          })),
          format: "JSONEachRow",
        });

        // Insert swaps
        await store.insert({
          table: "swaps",
          values: data.swaps.swap.map((s) => ({
            block_number: s.block.number,
            amount0: s.amount0.toString(),
            amount1: s.amount1.toString(),
          })),
          format: "JSONEachRow",
        });
      },
      onRollback: async ({ store, safeCursor }) => {
        await store.removeAllRows({
          tables: ["transfers", "swaps"],
          where: `block_number > ${safeCursor.number}`,
        });
      },
    })
  );
}

void main()
This production example combines:
  • Portal caching: Local SQLite cache for faster development iteration
  • Custom metrics: Prometheus counters exposed on port 9090
  • Profiling: Performance analysis for each decoder stream
  • Composite pipes: Parallel processing of transfers and swaps
  • ClickHouse target: Production persistence with automatic rollback handling