

Build a custom data sink. A target drains batches from the pipe and is responsible for persisting them.
createTarget<In>(config: Target<In>): Target<In>
Config fields:
  • write: (required) Async function ({ read, logger }) => Promise<void>. Iterate the stream by calling read() and consuming { data, ctx } batches. The function returns when the stream ends.
  • fork: (optional) (previousBlocks: BlockCursor[]) => Promise<BlockCursor | null>. Called when the source detects a chain reorg. Return the last safe cursor to roll back to, or null if no common ancestor can be determined (the stream will throw). See Fork handling. You don’t need this callback when the source is configured to read only finalized blocks.
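To illustrate the write contract, the sketch below drains a fake stream and returns when the iterator is exhausted. createTarget and the batch types are stubbed locally so the sketch runs standalone; the real module exports them, and the stub keeps only the parts this page describes.

```typescript
// Minimal shape of a pipeline batch, paraphrased from this page.
type PortalBatch<In> = { data: In; ctx: unknown }

type WriteCtx<In> = {
  read: () => AsyncIterableIterator<PortalBatch<In>>
  logger: { info: (msg: string) => void }
}

// Stand-in for the library's createTarget: the real one wraps the config,
// but for this sketch returning it unchanged is enough.
function createTarget<In>(config: { write: (ctx: WriteCtx<In>) => Promise<void> }) {
  return config
}

const seen: number[] = []
const target = createTarget<number>({
  write: async ({ read }) => {
    // Consume every batch; leaving this loop means the stream ended,
    // and resolving the returned promise signals that write is done.
    for await (const { data } of read()) seen.push(data)
  },
})

// Drive it with a fake three-batch stream.
async function* fakeRead(): AsyncIterableIterator<PortalBatch<number>> {
  for (const n of [1, 2, 3]) yield { data: n, ctx: {} }
}
await target.write({ read: fakeRead, logger: { info: () => {} } })
```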

The write context

type WriteCtx<In> = {
  read: (cursor?: BlockCursor) => AsyncIterableIterator<PortalBatch<In>>
  logger: Logger
}
  • read: Opens an async iterator over pipeline batches. Pass a cursor to resume from a specific cursor the target has persisted.
  • logger: Pino-compatible logger scoped to this target.

Per-batch context (ctx)

Each { data, ctx } yielded by read() carries the same BatchContext that transformers receive. Fields:
  • id (string): Pipeline ID, the id passed to fuelPortalStream().
  • logger (Logger): Batch-scoped logger.
  • metrics (Metrics): Prometheus metrics registry. See Metrics.
  • profiler (Profiler): Open a span with ctx.profiler.start('label'). See Profiling.
  • stream.dataset (ApiDataset): Dataset metadata.
  • stream.head.finalized (BlockCursor | undefined): Current finalized head.
  • stream.head.latest (BlockCursor | undefined): Current unfinalized head.
  • stream.state.initial (number): First block the stream was configured to read.
  • stream.state.last (number): Last block the stream intends to read.
  • stream.state.current (BlockCursor): Cursor of the latest block in this batch.
  • stream.state.rollbackChain (BlockCursor[]): Tail of unfinalized cursors subject to rollback.
  • stream.progress (ProgressEvent['progress']): Progress metrics when progress reporting is enabled.
  • stream.query ({ url, hash, raw }): Portal query details for the batch.
  • batch.blocksCount (number): Number of blocks in this batch.
  • batch.bytesSize (number): Compressed payload size received from the portal.
  • batch.requests (Record<number, number>): Map of HTTP status code to count of responses that produced this batch.
  • batch.lastBlockReceivedAt (Date): Wall-clock time the last block was received.
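As an illustration, a write loop could combine several of these fields into a progress log line. A sketch, modeling only the slice of the batch context it needs; the sample values in the test are made up:

```typescript
// Minimal slice of the per-batch ctx described above.
type BlockCursor = { number: number; hash?: string }

type CtxSlice = {
  stream: {
    head: { finalized?: BlockCursor }
    state: { initial: number; last: number; current: BlockCursor }
  }
  batch: { blocksCount: number; bytesSize: number }
}

function progressLine(ctx: CtxSlice): string {
  const { initial, last, current } = ctx.stream.state
  // Percent of the configured [initial, last] range already covered.
  const done = current.number - initial + 1
  const total = last - initial + 1
  const pct = ((100 * done) / total).toFixed(1)
  // How far this batch trails the finalized head, when one is known.
  const lag = ctx.stream.head.finalized
    ? ctx.stream.head.finalized.number - current.number
    : undefined
  return (
    `block ${current.number} (${pct}%), ${ctx.batch.blocksCount} blocks, ` +
    `${ctx.batch.bytesSize} bytes${lag !== undefined ? `, ${lag} behind finalized` : ''}`
  )
}
```

A target would typically pass such a line to ctx.logger rather than build strings by hand, but the field access pattern is the same.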

Example

const target = createTarget({
  write: async ({ read, logger }) => {
    for await (const { data, ctx } of read()) {
      const span = ctx.profiler.start('save')
      await database.save(data)
      span.end()
      logger.info(
        { block: ctx.stream.state.current.number, rows: ctx.batch.blocksCount },
        'saved batch',
      )
    }
  },
  fork: async (previousBlocks) => {
    // Return a cursor from your persisted state; null to fail hard.
    return previousBlocks[previousBlocks.length - 1] ?? null
  },
})
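The fork callback above returns the newest previous cursor unconditionally. A target that persists block hashes can instead search previousBlocks for the newest block it actually stored. A sketch, with the persisted state modeled as a plain Map (a stand-in, not part of the API):

```typescript
// BlockCursor shape as used on this page.
type BlockCursor = { number: number; hash: string }

function lastSafeCursor(
  previousBlocks: BlockCursor[],
  persisted: Map<number, string>, // block number -> hash the target saved
): BlockCursor | null {
  // Walk from the newest cursor backwards; the first one whose hash we
  // also persisted is the last block both sides still agree on.
  for (let i = previousBlocks.length - 1; i >= 0; i--) {
    const c = previousBlocks[i]
    if (persisted.get(c.number) === c.hash) return c
  }
  return null // no common ancestor: per the contract above, the stream throws
}
```

Inside fork, the result would be returned directly after rolling the store back past the diverged blocks.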

Resuming from a persisted cursor

Stateful targets typically persist a cursor and resume from it on restart:
createTarget({
  write: async ({ read, logger }) => {
    const lastSaved = await database.getCursor() // BlockCursor | undefined
    for await (const { data, ctx } of read(lastSaved)) {
      await database.save(data)
      await database.saveCursor(ctx.stream.state.current)
    }
  },
})
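The database in these snippets is a stand-in. One detail worth getting right in a real store is persisting the rows and the cursor together, so a crash between the two writes cannot leave them out of sync. An in-memory sketch of that shape, purely for illustration:

```typescript
type BlockCursor = { number: number; hash?: string }

class InMemoryStore<Row> {
  private rows: Row[] = []
  private cursor?: BlockCursor

  // One logical "transaction": rows and cursor update together. A real
  // database would wrap these two writes in an actual transaction.
  saveBatch(data: Row[], cursor: BlockCursor): void {
    this.rows.push(...data)
    this.cursor = cursor
  }

  // Undefined until the first batch lands, so read(lastSaved) starts
  // from the beginning on a fresh store.
  getCursor(): BlockCursor | undefined {
    return this.cursor
  }

  count(): number {
    return this.rows.length
  }
}
```

With a store like this, the resume loop above stays unchanged: read getCursor() on startup, pass it to read(), and call saveBatch once per batch.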