Skip to main content

Documentation Index

Fetch the complete documentation index at: https://beta.docs.sqd.dev/llms.txt

Use this file to discover all available pages before exploring further.

When consuming a real-time stream near the chain head, the portal can detect that the client’s view of the chain has diverged from the canonical chain — a situation known as a fork or reorg. The portal signals this with an HTTP 409 response containing a sample of from the new canonical chain. Your code must find the highest that both chains agree on, roll back any state written after that point, and replay from there.
Fork handling is only needed for real-time streams (range.from: 'latest'). Historical streams consume already-finalized data and never produce forks. See Fork detection scope below.
The SDK provides two patterns for consuming a stream. Both use the same state-tracking logic; they differ in how the fork signal is delivered.
If your pipeline includes a stateful transformer, it must also implement a fork callback to roll back its own state in lockstep with the target.
The pipeTo(createTarget({write, fork})) pattern keeps fork handling completely separate from batch processing. The SDK catches the 409 internally and calls fork() with the portal’s consensus sample; write() never sees the interruption and continues iterating batches without restarting.
1

Declare state

Two variables span the lifetime of the stream:
let recentUnfinalizedBlocks: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
recentUnfinalizedBlocks is the local history of unfinalized used to find the common ancestor during a fork. finalizedHighWatermark tracks the highest finalized ever seen — stored as a full BlockCursor (number and hash) so it can double as a rollback cursor when needed. Both must be declared outside pipeTo so fork() can access them.
2

Collect rollback history

Inside write(), append each batch’s unfinalized to the local history:
ctx.stream.state.rollbackChain.forEach((bc) => {
  recentUnfinalizedBlocks.push(bc)
})
ctx.stream.state.rollbackChain contains only the from this batch that are above the current finalized head — it is a per-batch delta, not a full snapshot. Always append to the end; never replace or reorder.
3

Prune finalized blocks

After collecting history, prune that are now finalized and cap the queue:
if (ctx.stream.head.finalized) {
  if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) {
    finalizedHighWatermark = ctx.stream.head.finalized
  }
  recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number >= finalizedHighWatermark!.number)
}
recentUnfinalizedBlocks = recentUnfinalizedBlocks.slice(recentUnfinalizedBlocks.length - 1000)
Portal instances behind a load balancer can report different finalized heads. Using the maximum seen so far (the high-water mark) prevents the pruning threshold from moving backwards when the stream reconnects to a lagging instance. See consideration 6 for details.
4

Implement fork()

fork() receives previousBlocks — the portal’s current-chain sample — and must return the last good cursor, or null if recovery is impossible:
fork: async (newConsensusBlocks) => {
  const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks)
  if (rollbackIndex >= 0) {
    recentUnfinalizedBlocks.length = rollbackIndex + 1
    return recentUnfinalizedBlocks[rollbackIndex]
  }
  if (finalizedHighWatermark &&
      newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) {
    recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number)
    return finalizedHighWatermark
  }
  return null
}
Three cases: (1) a common ancestor is found in local history — truncate and return it; (2) all previousBlocks fall below the finalized high-water mark, meaning the portal’s sample doesn’t reach local history — return the high-water mark cursor; (3) no recovery possible — return null, which surfaces a ForkCursorMissingError.
Both approaches use the same merge-sort scan. Given two ascending-sorted arrays of BlockCursor — local history and the portal’s previousBlocksfindRollbackIndex returns the index in local history of the last entry that both chains agree on (same number and hash):
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let aIndex = 0, bIndex = 0, lastCommonIndex = -1
  while (aIndex < chainA.length && bIndex < chainB.length) {
    const a = chainA[aIndex], b = chainB[bIndex]
    if (a.number < b.number) { aIndex++; continue }
    if (a.number > b.number) { bIndex++; continue }
    if (a.hash !== b.hash) return lastCommonIndex   // chains diverged here
    lastCommonIndex = aIndex; aIndex++; bIndex++
  }
  return lastCommonIndex
}
The scan advances the pointer for the lower-numbered entry until both point to the same number. A hash mismatch means the chains diverged at this number; lastCommonIndex holds the last agreement point. Returning -1 means no common ancestor was found in the sample.

Edge cases and considerations

Empty history at stream start. The rollback chain is built batch-by-batch from ctx.stream.state.rollbackChain. Until the first batch arrives the history is empty. A fork arriving before any batch has been processed means fork() will find no common ancestor and must return null, which the SDK turns into a fatal error. For a long-running process this window is typically acceptable, but it matters for freshly started consumers.History gaps from fast-moving finalization. rollbackChain in each batch contains only the from that batch that are strictly above the current finalized head. A that was already at or below the finalized head when its batch was fetched will never appear in any rollback chain and will therefore be absent from history. This can leave gaps in the number sequence. Algorithms that assume a contiguous history will fail; always match by both number and hash.No finalized-head info in a batch. When batch.head.finalized is absent, no history is accumulated. On networks or portal deployments that do not yet surface finality data, the rollback chain stays empty indefinitely. On such networks fork recovery is impossible unless unfinalized are tracked through another mechanism.
Ascending order, match by hash and number. The API spec requires matching on both. Matching only by number is wrong — different chains can have the same number. The array is ordered ascending (lowest number first); the last entry is the most recent the portal knows about.previousBlocks may have no overlap with local history. The portal sends a bounded sample. If findRollbackIndex finds no agreement point at all (returns -1) and no HWM fallback applies, fork recovery is impossible — return null. The SDK will surface a ForkCursorMissingError. Do not silently roll back to 0 or crash.Multiple consecutive 409s converge to the common ancestor. These two cases are distinct from each other: when findRollbackIndex does find an overlap point, the stream rolls back there and resumes. If the true common ancestor is deeper still — because the previousBlocks sample only reached partway — the portal detects another mismatch and sends a fresh 409 with an older window, this time closer to the true ancestor. The stream converges over several rounds. fork() must be idempotent across these calls; truncating the history array in place handles this correctly, since each call receives a shorter local history. Database-backed approaches must also handle re-entrant rollback calls.
Fork deeper than your history. If you cap rollback history (e.g. to 1000 ), a reorg deeper than the cap is unrecoverable. Choose the cap based on the worst-case reorg depth for your target network. Ethereum mainnet finalizes within ~64 (~2 epochs), but PoW or pre-finality networks can reorg much deeper. Fail loudly rather than silently replaying from 0.The finalized as the last-resort anchor. Keep the current finalized in your rollback history even though it is technically not unfinalized. It is the guaranteed safe floor: the portal will never ask you to roll back past it. Having it available means fork() can always return a valid cursor for the deepest possible reorg. Pruning with number > finalized instead of number >= finalized removes this anchor and makes very deep reorgs unrecoverable.History that never gets pruned. If the portal never sends a finalized head, rollback history will grow without bound. Apply a -count cap as a secondary safeguard.
Business state and rollback-chain history must be rolled back atomically. For databases with transactions (Postgres), both must be updated in the same transaction — a crash between the two leaves fork() computing the wrong rollback point.For non-transactional databases (ClickHouse), atomicity is not achievable; use a crash-recovery callback instead. Write application data first, write the rollback-chain checkpoint second. A crash after data but before the checkpoint save leaves the checkpoint pointing to the previous batch. On every restart, before the stream resumes, the checkpoint cursor should be read and used to purge any rows written after it — this closes the gap. This is how the Pipes SDK ClickHouse target works: onRollback is invoked with type: 'offset_check' on every startup so user code can delete the partial batch. Because ClickHouse DELETEs are asynchronous and unsafe under concurrent writes, the SDK inserts tombstone rows (sign = -1) via CollapsingMergeTree instead of issuing true deletes; queries that need to see only live rows must use the FINAL modifier.Rolling back spans multiple batches. A single reorg can invalidate data written across many batches. Your rollback mechanism must undo all rows/documents written after the rollback point, not just the last batch.Idempotency of re-processing. After a rollback the stream replays from the rollback cursor forward. Write logic that is not idempotent (e.g. unconditional INSERT instead of UPSERT, incrementing a counter instead of setting it) will corrupt state on replay. Design writes so they are safe to run more than once for the same .Side effects that cannot be rolled back. Database writes can be undone; emails, webhook calls, and Kafka publishes cannot. Either defer all external side effects until the is finalized, or build a separate reconciliation layer. Treating unfinalized state as permanent is the most common source of production incidents in real-time blockchain consumers.
The cursor returned from fork() is inclusive. Return the last you consider good; the SDK resumes from cursor.number + 1. Off-by-one errors cause either duplicate re-processing or skipped .The cursor hash must be set. The SDK sends parentBlockHash = cursor.hash in the next request so the portal can detect the next fork. A cursor with a missing hash silently disables fork detection for that request.The cursor in write()’s read() call is only the initial startup cursor. pipeTo() handles post-fork cursor updates inside the read() generator; write() runs continuously through forks and is never restarted by the SDK. The cursor you pass to read() is only relevant if write() is re-invoked by an external retry mechanism. For in-memory implementations the cursor is effectively always undefined.Process restart loses in-memory rollback history. An in-memory rollback chain survives forks but not process restarts. After a restart you have no history. For services that must survive restarts, persist the rollback chain alongside application state and restore it on startup. See Cursor management for patterns.
The X-Sqd-Finalized-Head-Number header can go backwards. Portal instances behind a load balancer can be at different heights. When a reconnected stream lands on a lagging instance, the finalized value in batch.head.finalized may be lower than what was previously reported. Do not use the current batch’s finalized number as a pruning threshold directly.Treat the finalized head as a high-water mark. Maintain the highest finalized number seen across all batches and key all pruning on that value. For database-backed implementations this is critical: a DELETE keyed on the current (possibly lower) finalized number will over-retain rows on some batches, and under-retain them if the logic is structured the other way.A 409 from a lagging instance may have previousBlocks entirely below the high-water mark. Two cases:
  • All of previousBlocks are strictly below the high-water mark. The lagging instance’s sample doesn’t reach local history. Because the high-water mark is truly final, every correct instance agrees on it: the fork is somewhere above it. Return the high-water mark cursor. This requires storing the finalized head as a full BlockCursor (number and hash), not just a number — the hash is needed for the next request’s parentBlockHash.
  • Some of previousBlocks are at or above the high-water mark but no hash match is found. This is a genuine inconsistency at a height the client already considers final. Return null and surface the error.
Forks only occur in the real-time (unfinalized) portion of the stream. The /finalized-stream endpoint never returns a 409. Fork handling is only needed when consuming the /stream endpoint with fromBlock near or at the chain head. If your range is bounded and entirely in the past, you will never see a fork.parentBlockHash is the tripwire. Every request to the portal includes the hash of the last the client has seen. A mismatch triggers a 409. Anything that disrupts this — starting from a cursor with a wrong or missing hash, replaying from a checkpoint that has drifted from the chain — will produce spurious fork events.
rollbackChain is per-batch, not cumulative. It contains only the in this batch that are above the current finalized head. Treat it as a delta to append to running history, not as a full snapshot of the current unfinalized chain.** near the finality boundary move between finalized and unfinalized.** A that appears in one batch’s rollbackChain may be at or below the finalized head in the next batch. The pruning filter must remove these once they are finalized, or rollback history will slowly fill with that can never be the subject of a reorg.Empty rollbackChain is valid. It means either (a) the batch contained no above the finalized head, or (b) the finalized head was unknown. Do not treat an empty rollback chain as an error.
fork() is called synchronously relative to the batch stream. The SDK awaits fork() before resuming the stream. No new batches arrive while fork() is running. It is safe to mutate shared state inside fork() without additional locking.write() and fork() share mutable state without synchronization. This is safe only because the SDK never calls them concurrently. If you introduce background workers or async tasks that also read or write rollback state, you must add explicit synchronization.The order in which you update rollback history and application state matters. If you update application state first and crash before updating rollback history, the next restart will not know how far to roll back. Prefer database transactions that update both atomically, or update rollback history first so a crash leaves you conservative — you can always re-process a you have already seen.