createTransformer
Transform data as it flows through pipes.Copy
import { createTransformer } from "@subsquid/pipes";
import type { EvmPortalData } from "@subsquid/pipes/evm";
async function main() {
const transformer = createTransformer({
transform: async (data: EvmPortalData<any>) => {
return data.blocks.map((block) => ({
number: block.header.number,
hash: block.header.hash,
logCount: block.logs.length,
}));
},
});
await source.pipe(transformer).pipeTo(target);
}
void main();
Transform Function
Copy
transform: async (data: EvmPortalData<any>) => Promise<any>;
Copy
transform: async (data) => {
return data.blocks.flatMap((block) =>
block.logs.map((log) => ({
blockNumber: block.header.number,
transactionHash: log.transactionHash,
address: log.address,
}))
);
};
Query Callback
Add queries dynamically:Copy
const transformer = createTransformer({
query: ({ queryBuilder, portal, logger }) => {
queryBuilder
.addFields({
block: { number: true, hash: true },
log: { address: true, topics: true, data: true },
})
.addLog({
request: { address: ["0x..."] },
range: { from: 20000000 },
});
},
transform: async (data) => processData(data),
});
Self-Contained Transformers
Copy
async function main() {
// Start with empty query in source
const source = evmPortalSource({
portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
query: new EvmQueryBuilder(),
});
// Transformer specifies its own data requirements
const usdcTransformer = createTransformer({
query: ({ queryBuilder }) => {
queryBuilder
.addFields({
block: { number: true, hash: true },
log: { address: true, topics: true, data: true, transactionHash: true },
})
.addLog({
request: {
address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
topic0: [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
],
},
range: { from: 20000000, to: 20000000 },
});
},
transform: async (data) => {
return data.blocks.map((b) => b.logs.map((l) => l.transactionHash));
},
});
await source.pipe(usdcTransformer).pipeTo(target);
}
void main();
Patterns
Extract Fields
Copy
transform: async (data) => {
return data.blocks.map((block) => ({
number: block.header.number,
timestamp: block.header.timestamp,
transactions: block.logs.map((log) => log.transactionHash),
}));
};
Filter Data
Copy
transform: async (data) => {
return data.blocks.filter((block) => block.logs.length > 10);
};
Aggregate
Copy
transform: async (data) => {
const logCount = data.blocks.reduce(
(sum, block) => sum + block.logs.length,
0
);
return { totalLogs: logCount };
};
Flatten
Copy
transform: async (data) => {
return data.blocks.flatMap((block) => block.logs);
};
Chaining Transformers
Copy
async function main() {
const extract = createTransformer({
transform: async (data) => data.blocks.map((b) => b.logs),
});
const flatten = createTransformer({
transform: async (data) => data.flat(),
});
await source.pipe(extract).pipe(flatten).pipeTo(target);
}
void main();
Data Flow
Context Access
Copy
const transformer = createTransformer({
transform: async (data, ctx) => {
const { logger, profiler } = ctx;
const span = profiler.start("transform");
logger.info(`Processing ${data.blocks.length} blocks`);
const result = processData(data);
span.end();
return result;
},
});

