Factory Pattern
Index events from dynamically created contracts.
import { factory, factorySqliteDatabase } from "@subsquid/pipes/evm";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";
const decoder = evmDecoder({
range: { from: 12369621 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.PoolCreated,
parameter: "pool",
database: factorySqliteDatabase({ path: "./pools.sqlite" }),
}),
events: {
swap: poolAbi.Swap,
},
});
Generating ABIs
Generate ABIs for both the factory contract and the pool contracts using @subsquid/evm-typegen:
npx @subsquid/evm-typegen src/abi abi/uniswap-v3-factory.json abi/uniswap-v3-pool.json
Using @subsquid/evm-typegen provides full type safety for your event data and eliminates the need to manually look up raw event signatures (topic0 hashes). The generated types ensure compile-time safety when working with factory patterns and decoded events.
How It Works
- Monitor factory contract for creation events
- Extract child contract addresses
- Store addresses in SQLite
- Decode events from all discovered contracts
Factory Database
const database = factorySqliteDatabase({
path: "./uniswap-v3-pools.sqlite",
});
Composite Pipes
Process multiple data streams simultaneously.
async function main() {
const pipeline = source.pipeComposite({
transfers: evmDecoder({
range: { from: 20000000 },
contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
events: { transfer: commonAbis.erc20.events.Transfer },
}),
swaps: evmDecoder({
range: { from: 20000000 },
contracts: ["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
events: { swap: uniswapAbi.events.Swap },
}),
});
for await (const { data } of pipeline) {
console.log(`Transfers: ${data.transfers.transfer.length}`);
console.log(`Swaps: ${data.swaps.swap.length}`);
}
}
void main()
Portal Caching
Cache Portal responses locally for faster iteration.
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
cache: portalSqliteCache({
path: "./portal-cache.sqlite",
}),
});
When to Use
- Development iteration
- Testing pipelines
- Repeated processing of same blocks
Custom Logging
The Pipes SDK uses a Pino-compatible logger, allowing you to integrate custom log transports and send logs to external services like GCP Cloud Logging, Sentry, or any other Pino-compatible destination.
Basic Custom Logger
Pass a custom logger to the source to configure logging for your entire pipeline.
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";
async function main() {
const transport = pino.transport({
target: "pino-pretty",
options: {
colorize: true,
translateTime: "HH:MM:ss",
},
});
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
logger: pino(transport),
});
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info({ count: data.length }, "Processed batch");
}
},
});
await source.pipeTo(target);
}
void main()
Integration with Cloud Services
You can use any Pino transport to send logs to cloud services. Pass the configured logger to the source.
GCP Cloud Logging
Sentry
Multiple Transports
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";
async function main() {
const transport = pino.transport({
target: "@google-cloud/logging-pino",
options: {
projectId: "your-project-id",
logName: "pipes-indexer",
},
});
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
logger: pino(transport),
});
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info(
{
blocksProcessed: data.blocks?.length,
eventsCount: data.transfer?.length,
},
"Batch processed"
);
}
},
});
await source.pipeTo(target);
}
void main()
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";
async function main() {
const transport = pino.transport({
target: "pino-sentry-transport",
options: {
sentry: {
dsn: process.env.SENTRY_DSN,
environment: "production",
},
level: "error", // Only send errors to Sentry
},
});
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
logger: pino(transport),
});
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
try {
await processData(data);
logger.info({ count: data.length }, "Batch processed");
} catch (error) {
logger.error({ error, data }, "Failed to process batch");
}
}
},
});
await source.pipeTo(target);
}
void main()
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource } from "@subsquid/pipes/evm";
import pino from "pino";
async function main() {
const transport = pino.transport({
targets: [
{
target: "pino-pretty",
options: { colorize: true },
level: "info",
},
{
target: "@google-cloud/logging-pino",
options: { projectId: "your-project-id" },
level: "info",
},
{
target: "pino-sentry-transport",
options: { sentry: { dsn: process.env.SENTRY_DSN } },
level: "error",
},
],
});
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
logger: pino(transport),
});
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info({ count: data.length }, "Processed batch");
}
},
});
await source.pipeTo(target);
}
void main()
The ctx.logger in transformers and targets is the same logger instance passed to the source. Configure logging at the source level, then use ctx.logger throughout your pipeline for consistent logging.
Custom Metrics
Track custom Prometheus metrics in your pipeline. The Pipes SDK exposes a metrics server that you can use to add counters, gauges, histograms, and summaries.
import { commonAbis, evmDecoder, evmPortalSource } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";
async function main() {
const stream = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
metrics: metricsServer({ port: 9090 }),
}).pipe(
evmDecoder({
range: { from: "latest" },
events: {
transfers: commonAbis.erc20.events.Transfer,
},
})
);
for await (const { data, ctx } of stream) {
// Add custom counter metric
ctx.metrics
.counter({
name: "my_transfers_counter",
help: "Number of processed transactions",
})
.inc(data.transfers.length);
// Use profiler to measure performance
const span = ctx.profiler.start("processing");
await processData(data);
span.end();
}
}
void main()
Access metrics at http://localhost:9090/metrics:
# HELP my_transfers_counter Number of processed transactions
# TYPE my_transfers_counter counter
my_transfers_counter 218598
Available Metric Types
You can create different types of Prometheus metrics:
for await (const { data, ctx } of stream) {
// Counter - monotonically increasing value
ctx.metrics.counter({ name: "events_total", help: "Total events" }).inc();
// Gauge - value that can go up or down
ctx.metrics
.gauge({ name: "queue_size", help: "Current queue size" })
.set(queueSize);
// Histogram - observations with configurable buckets
ctx.metrics
.histogram({ name: "batch_size", help: "Batch size distribution" })
.observe(data.transfers.length);
}
Pipeline Monitoring UI
Visualize pipeline metrics and performance in real-time using Pipe UI, a web-based dashboard for monitoring your pipeline’s performance, profiling data, and metrics.
To start the monitoring UI, run:
npx @sqd-pipes/pipe-ui@latest --open
This command launches the Pipe UI dashboard and automatically opens it in your browser. The dashboard connects to your pipe’s Prometheus metrics endpoint to display real-time metrics, profiling data, and pipeline status.
Pipe UI requires metrics to be exposed from your pipe. Enable profilers in your decoders and add metricsServer() to your source to make metrics accessible. See the Pipe UI reference for complete setup instructions.
RPC Latency Monitoring
Compare Portal data availability against external RPC providers to monitor relative latency. The evmRpcLatencyWatcher subscribes to RPC endpoints via WebSocket and measures when blocks arrive at the Portal versus when they appear at the RPC endpoints.
The measured values include client-side network latency. For RPC endpoints, only the arrival time of blocks is measured—this does not capture the node’s internal processing or response latency if queried directly. Results represent end-to-end delays as experienced by the client, not pure Portal or RPC processing performance.
import { formatBlock } from "@subsquid/pipes";
import { evmPortalSource, evmRpcLatencyWatcher } from "@subsquid/pipes/evm";
import { metricsServer } from "@subsquid/pipes/metrics/node";
async function main() {
const stream = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/base-mainnet",
query: { from: "latest" },
metrics: metricsServer({ port: 9090 }),
}).pipe(
evmRpcLatencyWatcher({
rpcUrl: ["https://base.drpc.org", "https://base-rpc.publicnode.com"],
}).pipe((data, { metrics }) => {
if (!data) return;
// Update Prometheus metrics for each RPC endpoint
for (const rpc of data.rpc) {
metrics
.gauge({
name: "rpc_latency_ms",
help: "RPC Latency in ms",
labelNames: ["url"],
})
.set({ url: rpc.url }, rpc.portalDelayMs);
}
return data;
})
);
for await (const { data } of stream) {
if (!data) continue;
console.log(`Block: ${formatBlock(data.number)} / ${data.timestamp}`);
console.table(data.rpc);
}
}
void main()
The latency data includes:
url: RPC endpoint URL
receivedAt: Timestamp when the RPC endpoint received the block
portalDelayMs: Milliseconds between RPC arrival and Portal availability
Block: 36,046,611 / Fri Sep 26 2025 14:29:29 GMT+0400
┌───┬─────────────────────────────────┬──────────────────────────┬───────────────┐
│ │ url │ receivedAt │ portalDelayMs │
├───┼─────────────────────────────────┼──────────────────────────┼───────────────┤
│ 0 │ https://base.drpc.org │ 2025-09-26T10:29:29.134Z │ 646 │
│ 1 │ https://base-rpc.publicnode.com │ 2025-09-26T10:29:29.130Z │ 642 │
└───┴─────────────────────────────────┴──────────────────────────┴───────────────┘
Pre-indexing Factories Experimental
Pre-populate factory database automatically using the _experimental_preindex option before running main pipe.
This is an experimental feature and may change in future versions. The pre-indexing request is limited and this approach won’t work for thousands of addresses. Use this feature only for small to medium-sized factory contract sets.
import {
evmPortalSource,
evmDecoder,
factory,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";
async function main() {
const stream = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
}).pipe(
evmDecoder({
range: { from: "12,369,621", to: "12,410,000" },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.PoolCreated,
parameter: "pool",
_experimental_preindex: { from: "12,369,621", to: "12,400,000" },
database: factorySqliteDatabase({ path: "./pools.sqlite" }),
}),
events: {
swaps: poolAbi.Swap,
},
})
);
for await (const { data } of stream) {
console.log(`Processed ${data.swaps.length} swaps`);
}
}
void main()
Combining Patterns
Combine multiple advanced patterns for production-ready pipelines. This example demonstrates how to integrate portal caching, custom metrics, profiling, composite pipes, and ClickHouse persistence together.
import { createClient } from "@clickhouse/client";
import {
commonAbis,
evmDecoder,
evmPortalSource,
factory,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";
import { metricsServer } from "@subsquid/pipes/metrics/node";
import { events as factoryAbi } from "./abi/uniswap-v3-factory";
import { events as poolAbi } from "./abi/uniswap-v3-pool";
async function main() {
const client = createClient({
url: "http://localhost:8123",
username: "default",
password: "default",
});
await evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
cache: portalSqliteCache({ path: "./cache.sqlite" }),
metrics: metricsServer({ port: 9090 }),
})
.pipeComposite({
transfers: evmDecoder({
profiler: { id: "ERC20" },
range: { from: 20000000 },
contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
events: { transfer: commonAbis.erc20.events.Transfer },
}),
swaps: evmDecoder({
profiler: { id: "Uniswap" },
range: { from: 20000000 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.PoolCreated,
parameter: "pool",
database: factorySqliteDatabase({ path: "./pools.sqlite" }),
}),
events: { swap: poolAbi.Swap },
}),
})
.pipeTo(
clickhouseTarget({
client,
onData: async ({ store, data, ctx }) => {
// Track custom metrics
ctx.metrics
.counter({ name: "transfers_total", help: "Total transfers" })
.inc(data.transfers.transfer.length);
ctx.metrics
.counter({ name: "swaps_total", help: "Total swaps" })
.inc(data.swaps.swap.length);
// Insert transfers
await store.insert({
table: "transfers",
values: data.transfers.transfer.map((t) => ({
block_number: t.block.number,
from: t.from,
to: t.to,
value: t.value.toString(),
})),
format: "JSONEachRow",
});
// Insert swaps
await store.insert({
table: "swaps",
values: data.swaps.swap.map((s) => ({
block_number: s.block.number,
amount0: s.amount0.toString(),
amount1: s.amount1.toString(),
})),
format: "JSONEachRow",
});
},
onRollback: async ({ store, safeCursor }) => {
await store.removeAllRows({
tables: ["transfers", "swaps"],
where: `block_number > ${safeCursor.number}`,
});
},
})
);
}
void main()
This production example combines:
- Portal caching: Local SQLite cache for faster development iteration
- Custom metrics: Prometheus counters exposed on port 9090
- Profiling: Performance analysis for each decoder stream
- Composite pipes: Parallel processing of transfers and swaps
- ClickHouse target: Production persistence with automatic rollback handling