Skip to main content

createTarget

Persist data to any destination.
import { createTarget } from "@subsquid/pipes";

async function main() {
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      await database.save(data);
      logger.info(`Saved ${data.length} items`);
    }
  },
});

await source.pipeTo(target);
}

void main();
Use finalized blocks or implement the onRollback handler to handle blockchain reorgs.

Using Finalized Blocks

You can configure the portal source to only receive finalized blocks:
const source = evmPortalSource({
  portal: {
    finalized: true,
    url: 'https://portal.sqd.dev/datasets/ethereum-mainnet'
  }
});
Using finalized blocks eliminates the need for onRollback handlers, simplifying your target implementation.

Write Function

write: async ({ logger, read }) => {
  for await (const { data, ctx } of read()) {
    // Process data - ctx.profiler available here
  }
};

Context

Access logger and profiler:
write: async ({ logger, read }) => {
  logger.info("Starting batch");

  for await (const { data, ctx } of read()) {
    const span = ctx.profiler.start("processing");
    await processData(data);
    span.end();
  }
};
Process batches with context:
write: async ({ logger, read }) => {
  for await (const { data, ctx } of read()) {
    const span = ctx.profiler.start("batch");
    logger.info("Processing batch");

    const processedBlocks = data.blocks.map((block) => {
      // Process each block
      return {
        number: block.header.number,
        timestamp: block.header.timestamp,
        txCount: block.transactions.length,
      };
    });

    await database.insert(processedBlocks);
    span.end();
  }
};

Context in Transformers

Context is also available in transform functions as the second parameter:
const transformer = createTransformer({
  transform: async (data, ctx) => {
    const { logger, profiler } = ctx;
    const span = profiler.start("batch");
    logger.info("Processing batch");

    const processedBlocks = data.blocks.map((block) => {
      // Transform block data
      return {
        number: block.header.number,
        timestamp: block.header.timestamp,
        logCount: block.logs.length,
      };
    });

    span.end();
    return processedBlocks;
  },
});

Read Iterator

Stream data batches:
write: async ({ read }) => {
  for await (const { data } of read()) {
    console.log(data);
  }
};

Cursor Management

Save progress to resume from last processed block.
let lastBlock = 0;

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      // Save data
      await database.insert(data);

      // Update cursor
      lastBlock = getLastBlockNumber(data);
      await fs.writeFile("cursor.json", JSON.stringify({ block: lastBlock }));
    }
  },
});

Resume from Cursor

// Load saved cursor
const cursor = JSON.parse(await fs.readFile("cursor.json", "utf-8"));

// Start source from cursor
const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  query: queryBuilder,
  cursor: { blockNumber: cursor.block },
});

Fork Handling

Handle blockchain reorganizations.
const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      await database.insert(data);
    }
  },
  onRollback: async ({ cursor }) => {
    // Delete data after fork point
    await database.query("DELETE FROM transfers WHERE block_number > $1", [
      cursor.blockNumber,
    ]);
  },
});

Fork Detection

Targets receive rollback signals:
onRollback: async ({ cursor, type }) => {
  console.log(`Fork detected at block ${cursor.blockNumber}`);
  console.log(`Rollback type: ${type}`);

  // Remove orphaned data
  await removeDataAfter(cursor.blockNumber);

  // Update cursor
  await saveCursor(cursor.blockNumber);
};

Complete Example

import { createTarget } from "@subsquid/pipes";
import {
  evmPortalSource,
  evmDecoder,
  commonAbis,
} from "@subsquid/pipes/evm";
import fs from "fs/promises";

const CURSOR_FILE = "cursor.json";

async function loadCursor() {
  try {
    const data = await fs.readFile(CURSOR_FILE, "utf-8");
    return JSON.parse(data);
  } catch {
    return null;
  }
}

async function saveCursor(blockNumber: number) {
  await fs.writeFile(CURSOR_FILE, JSON.stringify({ blockNumber }));
}

async function main() {
const cursor = await loadCursor();

const source = evmPortalSource({
  portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  cursor: cursor ? { blockNumber: cursor.blockNumber } : undefined,
});

const decoder = evmDecoder({
  range: { from: cursor?.blockNumber || 20000000 },
  contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
  events: { transfer: commonAbis.erc20.events.Transfer },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      // Save transfers
      await database.insert(data.transfer);

      // Update cursor
      const lastBlock = Math.max(...data.transfer.map((t) => t.blockNumber));
      await saveCursor(lastBlock);

      logger.info(`Processed up to block ${lastBlock}`);
    }
  },
  onRollback: async ({ cursor }) => {
    logger.warn(`Fork at block ${cursor.blockNumber}`);
    await database.query("DELETE FROM transfers WHERE block_number > $1", [
      cursor.blockNumber,
    ]);
    await saveCursor(cursor.blockNumber);
  },
});

await source.pipe(decoder).pipeTo(target);
}

void main();

Patterns

Console Logging

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      console.log(JSON.stringify(data, null, 2));
    }
  },
});

File Output

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      const filename = `data-${Date.now()}.json`;
      await fs.writeFile(filename, JSON.stringify(data));
    }
  },
});

Database Insert

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      await database.transaction(async (tx) => {
        for (const item of data.transfer) {
          await tx.insert("transfers", {
            block_number: item.blockNumber,
            from_address: item.event.from,
            to_address: item.event.to,
            value: item.event.value.toString(),
          });
        }
      });
    }
  },
});

Data Flow