Skip to main content

createTransformer

Transform data as it flows through pipes.
import { createTransformer } from "@subsquid/pipes";
import type { EvmPortalData } from "@subsquid/pipes/evm";

/**
 * Example entry point: builds a transformer that summarises each block and
 * pipes the source through it into the target.
 */
async function main() {
  // Reduce every block in the batch to its number, hash and log count.
  const transformer = createTransformer({
    transform: async (data: EvmPortalData<any>) => {
      const summaries = data.blocks.map(({ header, logs }) => {
        return {
          number: header.number,
          hash: header.hash,
          logCount: logs.length,
        };
      });
      return summaries;
    },
  });

  // NOTE: `source` and `target` are not declared in this snippet; they must
  // already be in scope where it is used.
  const transformed = source.pipe(transformer);
  await transformed.pipeTo(target);
}

void main();

Transform Function

// Signature sketch (not a literal type declaration): `transform` is an async
// function that takes a batch of `EvmPortalData` and resolves to the
// transformed value.
transform: async (data: EvmPortalData<any>) => Promise<any>;
Access blocks and logs:
// Emit one flat record per log, tagged with the number of the block it came from.
transform: async (data) => {
  const records = [];
  for (const block of data.blocks) {
    for (const entry of block.logs) {
      records.push({
        blockNumber: block.header.number,
        transactionHash: entry.transactionHash,
        address: entry.address,
      });
    }
  }
  return records;
};

Query Callback

Add queries dynamically:
// Transformer that declares its own data requirements through the `query`
// callback and hands each batch to `processData` (defined elsewhere).
const transformer = createTransformer({
  // The callback receives `queryBuilder`, `portal` and `logger`; only
  // `queryBuilder` is used here.
  query: ({ queryBuilder, portal, logger }) => {
    queryBuilder
      // Select the block and log fields this transformer reads.
      .addFields({
        block: { number: true, hash: true },
        log: { address: true, topics: true, data: true },
      })
      // Request logs for the given address ("0x..." is a placeholder),
      // starting at block 20000000; no `to` bound is given.
      .addLog({
        request: { address: ["0x..."] },
        range: { from: 20000000 },
      });
  },
  // Delegate the actual transformation to `processData`.
  transform: async (data) => processData(data),
});

Self-Contained Transformers

/**
 * Example entry point: the source starts with an empty query and the
 * transformer itself adds the fields and log filter it needs.
 */
async function main() {
  // Contract address and topic0 value used by the log filter below.
  const contractAddress = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48";
  const eventTopic0 =
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

  // The source is created with an empty query builder.
  const source = evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
    query: new EvmQueryBuilder(),
  });

  // The transformer declares its own data requirements.
  const usdcTransformer = createTransformer({
    query: ({ queryBuilder }) => {
      queryBuilder
        .addFields({
          block: { number: true, hash: true },
          log: { address: true, topics: true, data: true, transactionHash: true },
        })
        .addLog({
          request: {
            address: [contractAddress],
            topic0: [eventTopic0],
          },
          // `from` and `to` are the same block number in this example.
          range: { from: 20000000, to: 20000000 },
        });
    },
    // For every block, collect the transaction hash of each of its logs.
    transform: async (data) => {
      const hashesPerBlock = data.blocks.map(({ logs }) =>
        logs.map(({ transactionHash }) => transactionHash)
      );
      return hashesPerBlock;
    },
  });

  // NOTE: `target` is not declared in this snippet; it must already be in scope.
  const transformed = source.pipe(usdcTransformer);
  await transformed.pipeTo(target);
}

void main();

Patterns

Extract Fields

// Keep only the block number, the timestamp and the transaction hashes of the logs.
transform: async (data) => {
  const extracted = data.blocks.map(({ header, logs }) => {
    const transactions = logs.map((entry) => entry.transactionHash);
    return {
      number: header.number,
      timestamp: header.timestamp,
      transactions,
    };
  });
  return extracted;
};

Filter Data

// Keep only the blocks that carry more than 10 logs.
transform: async (data) => {
  const busyBlocks = data.blocks.filter(({ logs }) => logs.length > 10);
  return busyBlocks;
};

Aggregate

// Sum the number of logs across all blocks in the batch.
transform: async (data) => {
  let totalLogs = 0;
  for (const block of data.blocks) {
    totalLogs = totalLogs + block.logs.length;
  }
  return { totalLogs };
};

Flatten

// Collapse the per-block log arrays into a single list of logs.
transform: async (data) => {
  const allLogs = data.blocks.flatMap(({ logs }) => logs);
  return allLogs;
};

Chaining Transformers

/**
 * Example entry point: chains two transformers so that the output of the
 * first becomes the input of the second.
 */
async function main() {
  // Step 1: pull the log array out of every block (one array per block).
  const extract = createTransformer({
    transform: async (data) => {
      return data.blocks.map(({ logs }) => logs);
    },
  });

  // Step 2: flatten the per-block arrays produced by `extract` by one level.
  const flatten = createTransformer({
    transform: async (nested) => {
      return nested.flat();
    },
  });

  // NOTE: `source` and `target` are not declared in this snippet; they must
  // already be in scope where it is used.
  const pipeline = source.pipe(extract).pipe(flatten);
  await pipeline.pipeTo(target);
}

void main();

Data Flow

Context Access

// Transformer that uses the context passed as the second argument of
// `transform`: `logger` for progress messages and `profiler` for timing.
const transformer = createTransformer({
  transform: async (data, ctx) => {
    const { logger, profiler } = ctx;
    const span = profiler.start("transform");
    try {
      logger.info(`Processing ${data.blocks.length} blocks`);

      // Await so the span also covers the work when `processData` is async;
      // awaiting a plain value is harmless.
      return await processData(data);
    } finally {
      // Always close the span, even when logging or `processData` throws.
      span.end();
    }
  },
});