Skip to main content

Documentation Index

Fetch the complete documentation index at: https://beta.docs.sqd.dev/llms.txt

Use this file to discover all available pages before exploring further.

createTransformer

Construct a whole-pipe transformer.
createTransformer<I, O>(config: TransformerOptions<I, O>): Transformer<I, O>
Config fields:
  • transform: (required) (data: I, ctx: BatchContext) => O | Promise<O>. Called once per batch.
  • start: (optional) (ctx: StartCtx) => void | Promise<void>. Called once when the pipe starts. Use this to load state, warm up caches, or query the portal for historical data before the main stream begins.
  • stop: (optional) (ctx: StopCtx) => void | Promise<void>. Called once when the pipe stops.
  • fork: (optional) (cursor: BlockCursor, ctx: Ctx) => void | Promise<void>. Called before the next batch whenever the source detects a chain reorg. cursor identifies the last safe . See Fork handling.
  • profiler: (optional) { name: string; hidden?: boolean }. Overrides the transformer’s node name in the profiler tree.
Example:

Context variables

Each callback receives a context object. The fields differ by callback.

transform(data, ctx: BatchContext)

ctx is the full per-batch context. Fields:
FieldTypeDescription
idstringPipeline ID — the id passed to starknetPortalStream().
loggerLoggerPino-compatible logger scoped to this batch. Defaults to the source-level logger.
metricsMetricsPrometheus metrics registry. Use ctx.metrics.counter(), .gauge(), .histogram(), .summary() to register and update custom metrics. See Metrics.
profilerProfilerOpen a span with ctx.profiler.start('label'). See Profiling.
streamBatchStreamContextPer-stream state (see below).
batchBatchMetadataPer-batch volume info (see below).

ctx.stream: BatchStreamContext

FieldTypeDescription
datasetApiDatasetDataset metadata returned by the portal (chain name, genesis, tier).
head.finalizedBlockCursor | undefinedCurrent finalized head known to the portal, if advertised.
head.latestBlockCursor | undefinedCurrent unfinalized head.
state.initialnumberFirst the stream was configured to read.
state.lastnumberLast the stream intends to read (often Infinity).
state.currentBlockCursorLatest in this batch. Cursor has { number, hash?, timestamp? }.
state.rollbackChainBlockCursor[]Unfinalized-chain tail — cursors the stream will need to roll back if a fork is detected.
progressProgressEvent['progress']Progress metrics when progress is configured on the source; otherwise undefined.
query{ url, hash, raw }Debug info for the portal query feeding this batch.

ctx.batch: BatchMetadata

FieldTypeDescription
blocksCountnumberNumber of in this batch.
bytesSizenumberCompressed payload size received from the portal.
requestsRecord<number, number>Map of HTTP status code → number of responses that produced this batch.
lastBlockReceivedAtDateWall-clock time the last block was received.

start(ctx: StartCtx)

Fired once, before any batch. Use to warm up caches or run one-off queries.
FieldTypeDescription
idstringPipeline ID.
loggerLoggerSame as in BatchContext.
metricsMetricsSame as in BatchContext.
portalPortalClientLive portal client. Use portal.getStream(query) for warm-up reads.
state.initialnumberFirst the stream was configured to read.
state.currentBlockCursor | undefinedCursor persisted by the previous run, if any. undefined on a fresh start.

fork(cursor, ctx: Ctx)

Fired before the next batch whenever a reorg is detected. cursor is the last to keep; drop state produced for anything after it.
FieldTypeDescription
loggerLoggerSame as in BatchContext.
profilerProfilerSame as in BatchContext.

stop(ctx: StopCtx)

Fired once when the pipe stops.
FieldTypeDescription
loggerLoggerSame as in BatchContext.