A stateful transform is any step in your pipeline that produces output based on more than the current batch — for example, running balances, sliding-window aggregates, or enrichment lookups. This page surveys the available approaches and when to choose each one.

Before you add state

The cleanest solution is often to emit raw events and let the downstream database derive the state at query time. ClickHouse materialized views and Postgres views both work for this. If your logic can be expressed as SQL and you can tolerate slightly higher query latency, prefer this over transformer state — it eliminates crash recovery and fork handling entirely on the transformer side. If your logic is hard to express in SQL, or if the derived state must be pre-computed before reaching the target, read on.
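
For instance, assuming a raw per-transfer table (the table and column names here are illustrative), a plain Postgres view derives balances at query time with no transformer state at all:

-- Hypothetical raw-events table filled by a stateless transformer
CREATE TABLE transfers (
    address      TEXT    NOT NULL,
    delta        NUMERIC NOT NULL,   -- positive = received, negative = sent
    block_number BIGINT  NOT NULL
);

-- Derived state lives in the view definition, not in the transformer
CREATE VIEW balances AS
SELECT address, SUM(delta) AS balance
FROM transfers
GROUP BY address;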

At a glance

| Approach | State lives in | Persistence across restarts | Fork handling | Extra infra | Best for |
| --- | --- | --- | --- | --- | --- |
| A. Pure in-RAM | JS heap | ✗ (rebuilt from portal on startup) | fork() callback | none | sliding windows, candles, rolling aggregates |
| B. ClickHouse MVs | ClickHouse | ✓ | sign = -1 rows | ClickHouse | SQL-expressible analytics |
| C. SQLite transformer | Local file | ✓ (delta table) | fork() callback | none | moderate state |
| D. Postgres/Drizzle target | Postgres | ✓ | automatic | Postgres | atomic state + output, Postgres target |
| E. Apache Flink | Flink cluster | ✓ (checkpoints) | via compensating events | Kafka + Flink | TB-scale distributed state |
| F. External KV store | Redis / Valkey | ✓ (with AOF/RDB) | fork() callback | Redis | µs-latency lookups, multi-process state |

A. Pure in-RAM

Keep state in a JavaScript Map or array inside the transformer closure. No external storage is involved; a minimal sketch follows the lists below. When to use:
  • State can be derived from a bounded window of recent blocks (e.g., last N blocks or last M seconds).
  • You can afford to replay that window on restart (warm-up time ∝ window size).
  • State loss is contained: at most one window’s worth of history needs to be replayed.
When not to use:
  • State grows without bound (e.g., all-time ERC-20 balances). Use Postgres approach D instead — specifically the in-memory + Postgres mirror sub-approach.
  • The warm-up window is too large to replay quickly on every restart.
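
A minimal sketch of the approach, assuming a createTransformer-based pipeline (the Entry shape and the extractEntries/latestBlock helpers are hypothetical):

type Entry = { blockNumber: number; value: bigint }

const LOOKBACK_BLOCKS = 7200
let recentEntries: Entry[] = []

const rollingWindow = createTransformer<DecodedBatch, DecodedBatch>({
  transform: (data) => {
    recentEntries.push(...extractEntries(data))                        // extractEntries: hypothetical decoder
    const cutoff = latestBlock(data) - LOOKBACK_BLOCKS                 // latestBlock: hypothetical helper
    recentEntries = recentEntries.filter(e => e.blockNumber > cutoff)  // keep only the window
    return data                                                        // downstream sees the batch unchanged
  },
})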

The warm-up pattern

When the process restarts, the target’s cursor tells you where the pipeline left off. The in-RAM state is gone. To rebuild it, call portal.getStream() in the start() callback — the raw portal client API, independent of the main stream:
start: async ({ portal, state, logger }) => {
  if (!state.current) return  // first ever run: start empty

  const warmupFrom = Math.max(state.initial, state.current.number - LOOKBACK_BLOCKS)
  if (warmupFrom >= state.current.number) return

  for await (const { blocks } of portal.getStream({
    type: 'evm',
    fromBlock: warmupFrom,
    toBlock: state.current.number,
    fields: { block: { number: true }, log: { data: true } },
    logs: [{ address: [CONTRACT], topic0: [EVENT_TOPIC] }],
  })) {
    for (const block of blocks) {
      for (const log of block.logs) {
        // rebuild in-RAM state from block and log fields
      }
    }
  }
}
portal is a live PortalClient already connected to the dataset. The warm-up query’s toBlock is the saved cursor, so the stream terminates as soon as it catches up; the pipeline then resumes from cursor + 1. Multiple in-RAM transformers each run their own start() warm-up in parallel (the SDK calls child start() callbacks concurrently).

Fork handling

target.fork() fires first (ClickHouse onRollback or drizzle snapshot rollback), then the transformer’s fork() callback. At that point the database already reflects pre-fork state. In fork(), drop in-RAM entries for blocks beyond the rollback cursor:
fork: async (cursor, { logger }) => {
  recentEntries = recentEntries.filter(e => e.blockNumber <= cursor.number)
}

Composability

Use the same initQueue/WriteQueue pattern as the Postgres examples. For ClickHouse targets the queue holds closures over ClickhouseStore instead of a Postgres Transaction:
type CHS = { insert(params: { table: string; values: unknown[]; format: string }): Promise<unknown> }

class WriteQueue {
  private ops: Array<(store: CHS) => Promise<void>> = []
  push(op: (store: CHS) => Promise<void>): void { this.ops.push(op) }
  async flush(store: CHS): Promise<void> { for (const op of this.ops) await op(store) }
}
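
A transformer in the chain would then enqueue its writes instead of inserting directly (a sketch; the candles table name and the buildCandles helper are hypothetical):

const candleWriter = createTransformer<Piped<DecodedBatch>, Piped<DecodedBatch>>({
  transform: (data) => {
    const rows = buildCandles(data.payload)  // buildCandles: hypothetical aggregation over the batch
    data.writes.push(async (store) => {
      await store.insert({ table: 'candles', values: rows, format: 'JSONEachRow' })
    })
    return data  // Piped<T> passes through unchanged
  },
})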

B. ClickHouse materialized views

Write raw events to a base table; let ClickHouse compute derived state via materialized views (MVs). The transformer is stateless — it only emits events, not pre-computed state. When to use:
  • Your aggregation logic is expressible in SQL.
  • ClickHouse is already your target.
  • You want derived state updated automatically without any transformer code.
When not to use:
  • Logic requires imperative iteration (e.g., order-dependent simulation).
  • Each MV chain adds latency — avoid long dependency chains for latency-sensitive consumers.
  • Very frequent writes on lightweight data: prefer plain (non-materialized) views if you have spare CPU on the database machine.

The core limitation: MVs see only new rows

A materialized view fires when new rows are inserted into its source table. Its SELECT clause only operates on the newly inserted batch, not the full table. Running totals like cumulative balance cannot be written directly in the MV SELECT.

Workaround: auxiliary aggregating tables

Maintain a separate “current state” table using AggregatingMergeTree with argMaxState. The MV reads this table alongside the new rows to resolve the latest value before the current batch:
-- Stores latest balance per pool (AggregatingMergeTree = efficient upsert)
CREATE TABLE current_balances (
    pool_address        String,
    token_a_balance_raw AggregateFunction(argMax, Int256, Tuple(DateTime, UInt16, UInt16)),
    token_b_balance_raw AggregateFunction(argMax, Int256, Tuple(DateTime, UInt16, UInt16))
) ENGINE = AggregatingMergeTree()
ORDER BY pool_address;

-- MV that keeps current_balances up to date
CREATE MATERIALIZED VIEW current_balances_mv TO current_balances AS
SELECT
    pool_address,
    argMaxState(token_a_balance_raw, (timestamp, transaction_index, log_index)) AS token_a_balance_raw,
    argMaxState(token_b_balance_raw, (timestamp, transaction_index, log_index)) AS token_b_balance_raw
FROM balances_history
GROUP BY pool_address;
A downstream MV that needs the running balance queries current_balances with argMaxMerge():
latest_pool_balances AS (
    SELECT
        pool_address,
        argMaxMerge(token_a_balance_raw) AS balance_token_a_raw,
        argMaxMerge(token_b_balance_raw) AS balance_token_b_raw
    FROM current_balances
    WHERE pool_address IN (SELECT pool_address FROM unique_pools_to_insert)
    GROUP BY pool_address
)

Temporal joins with ASOF JOIN

When you need “the most recent price before each event”, use ASOF JOIN:
SELECT ...
FROM liquidity_events_raw ml
ASOF JOIN latest_prices wp
    ON wp.pool_address = ml.pool_address
    AND wp.ts_num + wp.transaction_index * 100_000 + wp.log_index
        <= ml.ts_num + ml.transaction_index * 100_000 + ml.log_index
WHERE ml.protocol = 'uniswap_v4'
The ASOF JOIN selects the latest row in latest_prices whose ordering key is ≤ the current event’s key — effectively “last price before this log”.

Fork rollback

ClickHouse is non-transactional. Use CollapsingMergeTree with a sign column: insert sign = 1 rows on the way forward and sign = -1 rows to cancel them on rollback. Your onRollback handler computes which blocks to cancel and inserts the negating rows.
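
A sketch of the sign-column mechanics (the table, its columns, and the parameterized rollback query are illustrative). Queries over such a table must account for cancellations, e.g. by aggregating amount * sign:

-- Rows carry a sign; merges collapse matching 1 / -1 pairs with the same sorting key
CREATE TABLE events (
    block_number UInt64,
    address      String,
    amount       Int256,
    sign         Int8          -- 1 = normal row, -1 = cancellation
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (block_number, address);

-- onRollback past a given block: negate every live row beyond the rollback point
INSERT INTO events
SELECT block_number, address, amount, -1 AS sign
FROM events
WHERE block_number > {rollback_block:UInt64} AND sign = 1;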

C. SQLite transformer

Keep transformer state in a local SQLite database. The transformer reads and writes SQLite; the downstream target (typically ClickHouse) receives the pre-computed rows. When to use:
  • State is too large for RAM.
  • You need random-access lookups (e.g., “current balance of address X”) that would be slow as a linear scan of in-RAM arrays.
  • You’re not using Postgres as your target (otherwise see approach D for better atomicity).
  • The indexer runs on persistent infrastructure (SQLite file must survive restarts).
When not to use:
  • The indexer runs on ephemeral infrastructure (containers, spot VMs). SQLite is lost on restart.
  • State requires complex analytical SQL (window functions, multi-table joins) — consider DuckDB as a drop-in alternative with full analytical query support.

The delta table pattern

SQLite and the downstream target commit separately — a crash between them leaves the two out of sync. A balance_deltas table records the net change per address per block, allowing rollbackTo(blockNumber) to invert any set of blocks atomically:
// Schema
db.exec(`
  CREATE TABLE IF NOT EXISTS balance_deltas (
    address      TEXT    NOT NULL,
    block_number INTEGER NOT NULL,
    delta        TEXT    NOT NULL,
    PRIMARY KEY (address, block_number)
  )
`)

function rollbackTo(blockNumber: number) {
  db.transaction(() => {
    // Net change per address across all rolled-back blocks
    const deltas = db.prepare('SELECT address, delta FROM balance_deltas WHERE block_number > ?').all(blockNumber) as Array<{ address: string; delta: string }>
    const net = new Map<string, bigint>()
    for (const { address, delta } of deltas) net.set(address, (net.get(address) ?? 0n) + BigInt(delta))
    // Invert each net change against the current balance
    for (const [address, delta] of net) {
      const row = db.prepare('SELECT balance FROM balances WHERE address = ?').get(address) as { balance: string } | undefined
      db.prepare('INSERT INTO balances (address, balance) VALUES (?, ?) ON CONFLICT(address) DO UPDATE SET balance = excluded.balance')
        .run(address, (BigInt(row?.balance ?? '0') - delta).toString())
    }
    // Drop the rolled-back log entries and block markers
    db.prepare('DELETE FROM balance_deltas WHERE block_number > ?').run(blockNumber)
    db.prepare('DELETE FROM processed_blocks WHERE block_number > ?').run(blockNumber)
  })()
}
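
On the forward path, each balance update and its delta row commit in the same SQLite transaction, so rollbackTo() always sees a complete log (a sketch; applyTransfer is an illustrative helper):

// Sketch: apply one transfer and log its per-block delta atomically
const applyTransfer = db.transaction((address: string, delta: bigint, blockNumber: number) => {
  const row = db.prepare('SELECT balance FROM balances WHERE address = ?').get(address) as { balance: string } | undefined
  const next = BigInt(row?.balance ?? '0') + delta
  db.prepare('INSERT INTO balances (address, balance) VALUES (?, ?) ON CONFLICT(address) DO UPDATE SET balance = excluded.balance')
    .run(address, next.toString())

  // Accumulate the net delta for this (address, block) pair
  const prev = db.prepare('SELECT delta FROM balance_deltas WHERE address = ? AND block_number = ?').get(address, blockNumber) as { delta: string } | undefined
  const netDelta = BigInt(prev?.delta ?? '0') + delta
  db.prepare('INSERT INTO balance_deltas (address, block_number, delta) VALUES (?, ?, ?) ON CONFLICT(address, block_number) DO UPDATE SET delta = excluded.delta')
    .run(address, blockNumber, netDelta.toString())
})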

Crash recovery in start()

Compare the SQLite high-water mark with the pipeline cursor. If SQLite is ahead, roll back to match:
start: async ({ state }) => {
  const sqliteLastBlock   = (db.prepare('SELECT MAX(block_number) AS m FROM processed_blocks').get() as any).m ?? null
  const pipelineLastBlock = state.current?.number ?? null

  if (sqliteLastBlock !== null && (pipelineLastBlock === null || sqliteLastBlock > pipelineLastBlock)) {
    rollbackTo(pipelineLastBlock ?? -1)  // SQLite crashed ahead of cursor — roll back
  } else if (sqliteLastBlock !== pipelineLastBlock) {
    throw new Error(`State mismatch: SQLite=${sqliteLastBlock}, cursor=${pipelineLastBlock}. Delete the SQLite file to rebuild.`)
  }
}

Historical-only variant

If you’re indexing only finalized data and will never see forks, drop the delta table and accept that a crash requires rebuilding from scratch:
start: async ({ state }) => {
  const sqliteLastBlock   = (db.prepare('SELECT MAX(block_number) AS m FROM processed_blocks').get() as any).m ?? null
  const pipelineLastBlock = state.current?.number ?? null

  if (sqliteLastBlock !== pipelineLastBlock) {
    throw new Error(`State mismatch: SQLite=${sqliteLastBlock}, cursor=${pipelineLastBlock}. Delete ${SQLITE_DB_PATH} to rebuild.`)
  }
}

D. Postgres/Drizzle target

When your target is already Postgres, the drizzleTarget can commit transformer state and output rows inside the same serializable transaction as the cursor save. This gives the strongest atomicity guarantees of any approach: a crash between transform() and the cursor save is impossible because both commit together. When to use:
  • Postgres is your output target.
  • You want zero crash recovery code (atomicity handles it automatically).
  • Fork rollback should be automatic (drizzleTarget installs snapshot triggers).
When not to use:
  • Your target is ClickHouse or another non-Postgres database.
  • State is too large for Postgres (rare).

The WriteQueue / initQueue pattern

Multiple stateful transformers all need to write inside the same transaction. The WriteQueue collects their write closures; initQueue wraps each batch in a Piped<T> with a fresh queue; onData flushes everything:
class WriteQueue {
  private ops: Array<(tx: Transaction) => Promise<void>> = []
  push(op: (tx: Transaction) => Promise<void>): void { this.ops.push(op) }
  async flush(tx: Transaction): Promise<void> { for (const op of this.ops) await op(tx) }
}

function initQueue<T>() {
  return createTransformer<T, Piped<T>>({
    transform: (data) => ({ payload: data, writes: new WriteQueue() }),
  })
}

// Pipeline:
stream
  .pipe(initQueue<DecodedBatch>())
  .pipe(transformerA(db))
  .pipe(transformerB(db))
  .pipeTo(drizzleTarget({
    db,
    tables: [tableA, tableB],
    onData: async ({ tx, data }) => { await data.writes.flush(tx) },
  }))
onData stays a one-liner regardless of how many transformers are chained.

Sub-approach 1 — Stateless transform (per-batch DB reads)

transform() reads current state from Postgres (the last committed snapshot), computes the delta, and pushes write closures to the queue. No in-RAM Map survives between batches. A sketch follows the list.
  • ✓ No RAM limit on state size
  • ✓ Zero fork handling code (snapshot triggers on tables cover rollback)
  • ✗ One SELECT … WHERE address IN (…) per batch
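
A sketch of this shape, assuming transform() may be async, a Drizzle balances table with address and balance columns, and a hypothetical decodeTransfers helper. The SELECT reads the last committed snapshot; the upsert is deferred so it commits inside drizzleTarget’s transaction:

import { inArray } from 'drizzle-orm'

const balanceTracker = createTransformer<Piped<DecodedBatch>, Piped<DecodedBatch>>({
  transform: async (data) => {
    const transfers = decodeTransfers(data.payload)  // hypothetical decoder
    if (transfers.length === 0) return data
    const addresses = [...new Set(transfers.map(t => t.address))]

    // One SELECT ... WHERE address IN (...) per batch
    const rows = await db.select().from(balances).where(inArray(balances.address, addresses))
    const current = new Map(rows.map(r => [r.address, BigInt(r.balance)]))
    for (const t of transfers) current.set(t.address, (current.get(t.address) ?? 0n) + t.delta)

    // Deferred write: commits atomically with the cursor inside drizzleTarget
    data.writes.push(async (tx) => {
      for (const [address, balance] of current) {
        await tx.insert(balances).values({ address, balance: balance.toString() })
          .onConflictDoUpdate({ target: balances.address, set: { balance: balance.toString() } })
      }
    })
    return data
  },
})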

Sub-approach 2 — In-memory + Postgres mirror

start() loads the full state into in-RAM Maps. transform() reads/writes the Maps with no DB round trips per batch. fork() reloads the Maps from Postgres after drizzleTarget commits the snapshot rollback. A sketch follows the list.
  • ✓ No per-batch DB reads — all reads from memory after startup
  • ✓ Fast for large batches with many distinct keys
  • ✗ Full state must fit in RAM
  • ✗ Startup time is O(state size)
  • ✗ fork() callbacks required to resync Maps after rollback
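
A sketch of the mirror, assuming the same balances table (loadBalances, decodeTransfers, and upsertBalance are illustrative helpers):

let ram = new Map<string, bigint>()

async function loadBalances(): Promise<Map<string, bigint>> {
  const rows = await db.select().from(balances)   // O(state size) full scan
  return new Map(rows.map(r => [r.address, BigInt(r.balance)]))
}

const mirroredBalances = createTransformer<Piped<DecodedBatch>, Piped<DecodedBatch>>({
  start: async () => { ram = await loadBalances() },  // rebuild the Maps on startup
  fork:  async () => { ram = await loadBalances() },  // target has already rolled the tables back
  transform: (data) => {
    for (const t of decodeTransfers(data.payload)) {  // hypothetical decoder
      const next = (ram.get(t.address) ?? 0n) + t.delta
      ram.set(t.address, next)
      data.writes.push(async (tx) => upsertBalance(tx, t.address, next))  // upsertBalance: hypothetical
    }
    return data
  },
})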
For both sub-approaches, all state tables must be listed in drizzleTarget’s tables array. This installs PostgreSQL snapshot triggers that roll them back automatically on a blockchain reorg. The onStart callback can run CREATE TABLE IF NOT EXISTS for quick setup; in production, use drizzle-kit migrations instead.

E. Apache Flink

Apache Flink is a distributed stateful stream-processing framework. The Pipes SDK acts as a data source feeding Flink via Kafka or a direct connector. When to use:
  • State is too large for a single machine (terabytes).
  • Your problem requires stateful joins across multiple independent streams (e.g., correlate DEX trades with lending liquidations across different chains).
  • You need exactly-once semantics across multiple heterogeneous sinks.
When not to use:
  • Single-node deployments — the operational overhead (JVM runtime, cluster management, ZooKeeper or KRaft, checkpoint storage) is only justified when the problem genuinely requires distributed state.
Architecture: The Pipes SDK emits raw events to Kafka (one topic per event type). On a blockchain fork, it emits compensating rows (e.g., sign = -1) that Flink sees as normal data and can handle with a subtract-and-recompute pattern. Flink manages its own checkpoints; crash recovery is handled entirely by Flink.
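
A sketch of the compensating-event shape using kafkajs (the topic, message key, and Swap type are illustrative):

import { Kafka } from 'kafkajs'

type Swap = { pool: string; amount: string; blockNumber: number }

const producer = new Kafka({ brokers: ['localhost:9092'] }).producer()
await producer.connect()

// Forward path: raw events with sign = 1
async function emitSwaps(swaps: Swap[]) {
  await producer.send({
    topic: 'swaps',
    messages: swaps.map(s => ({ key: s.pool, value: JSON.stringify({ ...s, sign: 1 }) })),
  })
}

// fork(): re-emit rolled-back rows negated; Flink subtracts and recomputes
async function emitCompensation(rolledBack: Swap[]) {
  await producer.send({
    topic: 'swaps',
    messages: rolledBack.map(s => ({ key: s.pool, value: JSON.stringify({ ...s, sign: -1 }) })),
  })
}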

F. External KV store

Use Redis, Valkey, or a similar key-value store as a fast external state backend. When to use:
  • Multiple parallel pipeline instances must share state (horizontal scaling of the indexer).
  • Per-key lookups must complete in under 1 ms (e.g., enriching 50 k events per second with metadata from a 100 M-entry map that doesn’t fit in RAM).
When not to use:
  • A single-process indexer is sufficient — adding Redis increases operational complexity for no benefit.
  • You need transactional state + output commits (use approach D instead).
Fork handling: The transformer’s fork() callback must delete or undo the Redis keys written for rolled-back blocks. Keep a per-block write log (similar to the SQLite delta table) to know which keys to revert.

Crash safety: Redis is not durable by default. Enable AOF or RDB persistence, or treat Redis purely as a warm cache and accept that a Redis restart requires a replay from the pipeline cursor.
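
A sketch of the per-block write log with ioredis; this simple version deletes rolled-back keys outright (restoring previous values would require snapshotting them into the log as well):

import Redis from 'ioredis'

const redis = new Redis()  // connection config is deployment-specific

// Forward path: record every key written for a block in a write log
async function setWithLog(block: number, key: string, value: string) {
  await redis.multi().set(key, value).sadd(`writes:${block}`, key).exec()
}

// fork(): drop keys written by rolled-back blocks, newest first
async function revertAfter(cursorBlock: number, headBlock: number) {
  for (let b = headBlock; b > cursorBlock; b--) {
    const keys = await redis.smembers(`writes:${b}`)
    if (keys.length > 0) await redis.del(...keys)
    await redis.del(`writes:${b}`)
  }
}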

Fork callbacks and crash recovery

The fork handling responsibilities differ by approach:
| Approach | fork() needed in transformer | How DB state is rolled back |
| --- | --- | --- |
| A. In-RAM | ✓ — prune entries > cursor | n/a (state is RAM-only) |
| B. ClickHouse MVs | ✗ — handled in onRollback | sign = -1 rows via onRollback |
| C. SQLite | ✓ — calls rollbackTo(cursor.number) | rollbackTo() reverts delta table |
| D. Postgres/drizzle | ✗ — automatic | snapshot triggers via tables |
| E. Flink | ✗ — compensating events | Flink checkpoint rollback |
| F. External KV | ✓ — revert write log | manual key deletion |
Ordering guarantee: target.fork() always fires before transformer fork() callbacks. By the time your transformer’s fork() runs, the target (ClickHouse onRollback, drizzleTarget snapshot rollback) has already committed the database rollback, so it is safe to read the database in fork().

Crash recovery (approaches A, B, C only): a crash between the transformer’s store write and the target’s cursor save leaves state ahead of the pipeline cursor. Handle this in start() by comparing your local high-water mark to state.current:
start: async ({ state }) => {
  const localLastBlock    = /* read your checkpoint */
  const pipelineLastBlock = state.current?.number ?? null

  if (localLastBlock !== null && (pipelineLastBlock === null || localLastBlock > pipelineLastBlock)) {
    rollbackTo(pipelineLastBlock ?? -1)  // crash recovery: undo ahead-of-cursor writes
  }
}
Approaches D (Postgres/drizzle) and E (Flink) are immune to this problem: state and cursor commit atomically.

Composing multiple stateful transformers

When multiple stateful transformers write to the same target, they must not each independently call the target’s write API. Use the WriteQueue / initQueue pattern to collect all writes and flush them in a single onData call:
  1. initQueue<T>() wraps the raw batch in Piped<T> with a fresh WriteQueue. Place it as the first .pipe().
  2. Each transformer receives Piped<T>, pushes closures to writes, and returns Piped<T> unchanged.
  3. onData calls data.writes.flush(store_or_tx) — a one-liner that scales to any number of transformers.
type Piped<T> = { payload: T; writes: WriteQueue }

function initQueue<T>() {
  return createTransformer<T, Piped<T>>({
    transform: (data) => ({ payload: data, writes: new WriteQueue() }),
  })
}
Because every domain transformer takes Piped<T> as input and produces Piped<T> as output, none of them assume a fixed position in the chain — they are all order-independent and can be added or removed without touching the others. For Postgres targets, WriteQueue closures take a Transaction; for ClickHouse, they take the structural CHS type shown in approach A. The pattern is identical in both cases.