Basic Factory
Index swaps from Uniswap V3 pools dynamically discovered via factory. Use case: Track events from contracts created by a factory.This example requires generated ABI files. Generate them using:See the Custom ABI section for details.
Copy
npx @subsquid/evm-typegen src/abi abi/UniswapV3Factory.json
npx @subsquid/evm-typegen src/abi abi/UniswapV3Pool.json
Copy
import {
evmPortalSource,
evmDecoder,
factory,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { createTarget } from "@subsquid/pipes";
import * as factoryAbi from "./abi/uniswap-v3-factory";
import * as poolAbi from "./abi/uniswap-v3-pool";
async function main() {
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});
const decoder = evmDecoder({
range: { from: 12369621 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.events.PoolCreated,
parameter: "pool",
database: factorySqliteDatabase({
path: "./uniswap-v3-pools.sqlite",
}),
}),
events: {
swap: poolAbi.events.Swap,
},
});
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info(`Parsed ${data.swap.length} swaps`);
}
},
});
await source.pipe(decoder).pipeTo(target);
}
void main();
How It Works
Factory with Event Metadata
Access factory event data in decoded events. Use case: Include factory context with each decoded event.Copy
import {
evmPortalSource,
evmDecoder,
factory,
DecodedEvent,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { createTarget } from "@subsquid/pipes";
import * as factoryAbi from "./abi/uniswap-v3-factory";
import * as poolAbi from "./abi/uniswap-v3-pool";
// Transform to include factory event
function addFactoryMetadata<T, F>(event: DecodedEvent<T, F>) {
return {
...event.event,
blockNumber: event.block.number,
factoryEvent: event.factory?.event,
};
}
async function main() {
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});
const decoder = evmDecoder({
range: { from: 12369621 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.events.PoolCreated,
parameter: "pool",
database: factorySqliteDatabase({
path: "./uniswap-v3-pools.sqlite",
}),
}),
events: {
swap: poolAbi.events.Swap,
mint: poolAbi.events.Mint,
},
}).pipe(({ swap, mint }) => ({
swap: swap.map(addFactoryMetadata),
mint: mint.map(addFactoryMetadata),
}));
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
for (const s of data.swap) {
logger.info({
pool: s.factoryEvent?.pool,
token0: s.factoryEvent?.token0,
token1: s.factoryEvent?.token1,
amount0: s.amount0.toString(),
amount1: s.amount1.toString(),
});
}
}
},
});
await source.pipe(decoder).pipeTo(target);
}
void main();
Pre-indexing Factory Experimental
Pre-populate factory database before main pipeline. Use case: Discover all existing contracts before starting main indexing.This is an experimental feature and may change in future versions. The pre-indexing request is limited and this approach won’t work for thousands of addresses. Use this feature only for small to medium-sized factory contract sets.
Copy
import {
evmPortalSource,
evmDecoder,
factory,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import { createTarget } from "@subsquid/pipes";
import * as factoryAbi from "./abi/uniswap-v3-factory";
import * as poolAbi from "./abi/uniswap-v3-pool";
// Create factory database
const factoryDb = factorySqliteDatabase({
path: "./uniswap-v3-pools.sqlite",
});
// Step 1: Pre-index all historical pool creations
console.log("Pre-indexing pools...");
const preIndexSource = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});
const preIndexDecoder = evmDecoder({
range: { from: 12369621, to: 20000000 },
contracts: ["0x1f98431c8ad98523631ae4a59f267346ea31f984"],
events: {
poolCreated: factoryAbi.events.PoolCreated,
},
});
async function main() {
const preIndexTarget = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
for (const event of data.poolCreated) {
await factoryDb.add(event.event.pool);
logger.info(`Added pool: ${event.event.pool}`);
}
}
},
});
await preIndexSource.pipe(preIndexDecoder).pipeTo(preIndexTarget);
console.log("Pre-indexing complete. Starting main pipeline...");
// Step 2: Run main pipeline with populated factory database
const mainSource = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});
const mainDecoder = evmDecoder({
range: { from: 20000000 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: factoryAbi.events.PoolCreated,
parameter: "pool",
database: factoryDb,
}),
events: {
swap: poolAbi.events.Swap,
},
});
const mainTarget = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info(`Processed ${data.swap.length} swaps`);
}
},
});
await mainSource.pipe(mainDecoder).pipeTo(mainTarget);
}
void main();
Workflow
Multiple Factories
Track contracts from multiple factory contracts. Use case: Index events from contracts created by different factories.Copy
import {
evmPortalSource,
evmDecoder,
factory,
factorySqliteDatabase,
} from "@subsquid/pipes/evm";
import * as uniswapV3FactoryAbi from "./abi/uniswap-v3-factory";
import * as uniswapV2FactoryAbi from "./abi/uniswap-v2-factory";
import * as poolAbi from "./abi/pool";
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
});
const pipeline = source.pipeComposite({
v3: evmDecoder({
range: { from: 12369621 },
contracts: factory({
address: "0x1f98431c8ad98523631ae4a59f267346ea31f984",
event: uniswapV3FactoryAbi.events.PoolCreated,
parameter: "pool",
database: factorySqliteDatabase({ path: "./v3-pools.sqlite" }),
}),
events: { swap: poolAbi.events.Swap },
}),
v2: evmDecoder({
range: { from: 10000835 },
contracts: factory({
address: "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
event: uniswapV2FactoryAbi.events.PairCreated,
parameter: "pair",
database: factorySqliteDatabase({ path: "./v2-pairs.sqlite" }),
}),
events: { swap: poolAbi.events.Swap },
}),
});
async function main() {
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
logger.info({
v3Swaps: data.v3.swap.length,
v2Swaps: data.v2.swap.length,
});
}
},
});
await pipeline.pipeTo(target);
}
void main();

