Skip to main content

Trivial Pipe

Stream USDC transfers from Portal to logger. Use case: Basic data streaming with query builder.
import { createTarget } from "@subsquid/pipes";
import { evmPortalSource, EvmQueryBuilder } from "@subsquid/pipes/evm";

async function main() {
const queryBuilder = new EvmQueryBuilder()
  .addFields({
    block: { number: true, hash: true },
    log: { address: true, topics: true, data: true, transactionHash: true },
  })
  .addLog({
    request: {
      address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"], // USDC
      topic0: [
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
      ], // Transfer
    },
    range: { from: 20000000, to: 20000000 },
  });

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: queryBuilder,
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info(data, "data");
    }
  },
});

await source.pipeTo(target);
}

void main();
For better type safety and developer experience, use evmDecoder with ABIs generated by @subsquid/evm-typegen instead of manually specifying topic0 hashes. See the Event Decoding example for a typed approach.

Data Flow

Adding Transformer

Extract transaction hashes from logs. Use case: Transform raw Portal data before logging.
import { createTarget, createTransformer } from "@subsquid/pipes";
import {
  evmPortalSource,
  type EvmPortalData,
  EvmQueryBuilder,
} from "@subsquid/pipes/evm";

async function main() {
const queryBuilder = new EvmQueryBuilder()
  .addFields({
    block: { number: true, hash: true },
    log: { address: true, topics: true, data: true, transactionHash: true },
  })
  .addLog({
    request: {
      address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
      topic0: [
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
      ],
    },
    range: { from: 20000000, to: 20000000 },
  });

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: queryBuilder,
});

const transformer = createTransformer({
  transform: async (data: EvmPortalData<any>) => {
    return data.blocks.map((b) => b.logs.map((l) => l.transactionHash));
  },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ data }, "transaction hashes");
    }
  },
});

await source.pipe(transformer).pipeTo(target);
}

void main();

Data Flow

Query from Transformer

Build query dynamically in transformer. Use case: Self-contained transformers that specify their own data requirements.
import { createTarget, createTransformer } from "@subsquid/pipes";
import {
  evmPortalSource,
  type EvmPortalData,
  EvmQueryBuilder,
} from "@subsquid/pipes/evm";

async function main() {
// Start with empty query
const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: new EvmQueryBuilder(),
});

// Transformer adds to query
const transformer = createTransformer({
  query: ({ queryBuilder }) => {
    queryBuilder
      .addFields({
        block: { number: true, hash: true },
        log: { address: true, topics: true, data: true, transactionHash: true },
      })
      .addLog({
        request: {
          address: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
          topic0: [
            "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
          ],
        },
        range: { from: 20000000, to: 20000000 },
      });
  },
  transform: async (data: EvmPortalData<any>) => {
    return data.blocks.map((b) => b.logs.map((l) => l.transactionHash));
  },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ data }, "data");
    }
  },
});

await source.pipe(transformer).pipeTo(target);
}

void main();

Data Flow

Key Distinction: Transformers can operate in two ways:
  • Pure transformation (previous example): Only implements transform() - processes data after the Portal query is executed
  • Query-aware transformation (this example): Implements both query() and transform() - contributes to building the Portal query AND processes the resulting data
When a transformer has a query() method, it augments the Portal query before data is fetched, allowing transformers to be self-contained with their data requirements.