A stateful transform is any step in your pipeline that produces output based on more than the current batch — for example, running balances, sliding-window aggregates, or enrichment lookups. This page surveys the available approaches and when to choose each one.
Before you add state
The cleanest solution is often to emit raw events and let the downstream database derive the state at query time. ClickHouse materialized views and Postgres views both work for this. If your logic can be expressed as SQL and you can tolerate slightly higher query latency, prefer this over transformer state — it eliminates crash recovery and fork handling entirely on the transformer side. If your logic is hard to express in SQL, or if the derived state must be pre-computed before reaching the target, read on.
At a glance
| Approach | State lives in | Persistence across restarts | Fork handling | Extra infra | Best for |
|---|---|---|---|---|---|
| A. Pure in-RAM | JS heap | rebuilt from portal on startup | fork() callback | none | sliding windows, candles, rolling aggregates |
| B. ClickHouse MVs | ClickHouse | ✓ | sign = -1 rows | ClickHouse | SQL-expressible analytics |
| C. SQLite transformer | Local file | ✓ (delta table) | fork() callback | none | moderate state |
| D. Postgres/Drizzle target | Postgres | ✓ | ✓ automatic | Postgres | atomic state + output, Postgres target |
| E. Apache Flink | Flink cluster | ✓ | via compensating events | Kafka + Flink | TB-scale distributed state |
| F. External KV store | Redis / Valkey | ✓ (with AOF/RDB) | fork() callback | Redis | µs-latency lookups, multi-process state |
A. Pure in-RAM
Keep state in a JavaScript Map or array inside the transformer closure. No external storage is involved.
When to use:
- State can be derived from a bounded window of recent blocks (e.g., last N blocks or last M seconds).
- You can afford to replay that window on restart (warm-up time ∝ window size).
- State loss is contained: at most one window’s worth of history needs to be replayed.
When not to use:
- State grows without bound (e.g., all-time ERC-20 balances). Use Postgres approach D instead — specifically the in-memory + Postgres mirror sub-approach.
- The warm-up window is too large to replay quickly on every restart.
The warm-up pattern
When the process restarts, the target’s cursor tells you where the pipeline left off. The in-RAM state is gone. To rebuild it, call portal.getStream() in the start() callback — the raw portal client API, independent of the main stream:
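A minimal sketch of the warm-up, assuming getStream() takes a block range and returns an async-iterable of block batches; the PortalLike interface, WarmupBlock shape, and window size below are illustrative placeholders rather than the real SDK types:

```ts
// Illustrative types only: the real PortalClient query/response shapes depend on
// the dataset and SDK version.
type WarmupBlock = { header: { number: number } };

interface PortalLike {
  getStream(query: { fromBlock: number; toBlock: number }): Promise<AsyncIterable<WarmupBlock[]>>;
}

const WINDOW_BLOCKS = 7_200; // hypothetical warm-up window (~1 day at 12 s blocks)

async function warmUp(portal: PortalLike, cursor: number, apply: (block: WarmupBlock) => void) {
  // Replay only the warm-up window; toBlock is the saved cursor, so the stream is
  // bounded and terminates once the window has been replayed.
  const stream = await portal.getStream({
    fromBlock: Math.max(0, cursor - WINDOW_BLOCKS),
    toBlock: cursor,
  });
  for await (const blocks of stream) {
    for (const block of blocks) apply(block); // refill the in-RAM window
  }
}
```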
portal is a live PortalClient already connected to the dataset. The warm-up query’s toBlock is the saved cursor, so the warm-up stream terminates on its own; the main pipeline then resumes from cursor + 1. Multiple in-RAM transformers each run their own start() warm-up in parallel (the SDK calls child start() callbacks concurrently).
Fork handling
target.fork() fires first (ClickHouse onRollback or drizzle snapshot rollback), then the transformer’s fork() callback. At that point the database already reflects pre-fork state. In fork(), drop in-RAM entries for blocks beyond the rollback cursor:
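A sketch of such a callback, assuming the rollback cursor carries a block number and the window is kept as a block-ordered array (names are illustrative):

```ts
// In-RAM sliding window kept by the transformer.
const recentTransfers: { blockNumber: number; value: bigint }[] = [];

// fork(): the target has already rolled the database back; only local state
// above the rollback cursor needs pruning.
function fork(cursor: { number: number }) {
  let keep = recentTransfers.length;
  while (keep > 0 && recentTransfers[keep - 1].blockNumber > cursor.number) keep--;
  recentTransfers.length = keep; // drop entries for rolled-back blocks
}
```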
Composability
Use the same initQueue/WriteQueue pattern as the Postgres examples. For ClickHouse targets the queue holds closures over ClickhouseStore instead of a Postgres Transaction:
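A sketch of what those closures could look like; the CHS type below is a structural placeholder (modeled on @clickhouse/client's insert()), and the queue shape is an assumption rather than the SDK's exact API:

```ts
// Structural slice of the ClickHouse store, just enough for the write closures.
type CHS = {
  insert(args: { table: string; values: unknown[]; format: 'JSONEachRow' }): Promise<unknown>;
};

type ClickhouseWriteQueue = { push(fn: (store: CHS) => Promise<void>): void };

// Inside transform(): enqueue a closure instead of writing immediately;
// onData later flushes every queued closure against the real store.
function enqueueCandles(queue: ClickhouseWriteQueue, candles: Record<string, unknown>[]) {
  queue.push(async (store) => {
    await store.insert({ table: 'candles', values: candles, format: 'JSONEachRow' });
  });
}
```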
B. ClickHouse materialized views
Write raw events to a base table; let ClickHouse compute derived state via materialized views (MVs). The transformer is stateless — it only emits events, not pre-computed state.
When to use:
- Your aggregation logic is expressible in SQL.
- ClickHouse is already your target.
- You want derived state updated automatically without any transformer code.
When not to use:
- Logic requires imperative iteration (e.g., order-dependent simulation).
- Each MV chain adds latency — avoid long dependency chains for latency-sensitive consumers.
- Very frequent writes on lightweight data: prefer plain (non-materialized) views if you have spare CPU on the database machine.
The core limitation: MVs see only new rows
A materialized view fires when new rows are inserted into its source table. Its SELECT clause only operates on the newly inserted batch, not the full table. Running totals like cumulative balance cannot be written directly in the MV SELECT.
Workaround: auxiliary aggregating tables
Maintain a separate “current state” table using AggregatingMergeTree with argMaxState. The MV reads this table alongside the new rows to resolve the latest value before the current batch.
Query current_balances with argMaxMerge() to resolve the stored aggregate states into plain values.
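A sketch of the whole pattern, assuming a raw balance_changes table with Int64 deltas and using @clickhouse/client to run the DDL and the read; all names and types are illustrative:

```ts
import { createClient } from '@clickhouse/client';

const ch = createClient({ url: 'http://localhost:8123' });

// 1. Auxiliary "current state" table keyed by address.
await ch.command({
  query: `
    CREATE TABLE IF NOT EXISTS current_balances (
      address String,
      balance AggregateFunction(argMax, Int64, UInt64)
    ) ENGINE = AggregatingMergeTree ORDER BY address`,
});

// 2. MV over the raw deltas; it left-joins its own target table to pick up the
//    previous total, then stores the new running total as an argMax state.
await ch.command({
  query: `
    CREATE MATERIALIZED VIEW IF NOT EXISTS current_balances_mv TO current_balances AS
    WITH prev AS (
      SELECT address, argMaxMerge(balance) AS bal FROM current_balances GROUP BY address
    )
    SELECT
      d.address AS address,
      argMaxState(any(prev.bal) + sum(d.delta), max(d.block_number)) AS balance
    FROM balance_changes AS d
    LEFT JOIN prev ON prev.address = d.address
    GROUP BY d.address`,
});

// 3. Resolve the aggregate states into plain values with argMaxMerge().
const rows = await ch.query({
  query: `SELECT address, argMaxMerge(balance) AS balance FROM current_balances GROUP BY address`,
  format: 'JSONEachRow',
});
console.log(await rows.json());
```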
Temporal joins with ASOF JOIN
When you need “the most recent price before each event”, use ASOF JOIN:
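A sketch, assuming tables transfers(token, block_number, amount) and latest_prices(token, block_number, price); names are illustrative:

```ts
import { createClient } from '@clickhouse/client';

const ch = createClient({ url: 'http://localhost:8123' });

// For every transfer, attach the latest price at or before that block.
const priced = await ch.query({
  query: `
    SELECT t.token, t.block_number, t.amount, p.price
    FROM transfers AS t
    ASOF LEFT JOIN latest_prices AS p
      ON t.token = p.token AND t.block_number >= p.block_number`,
  format: 'JSONEachRow',
});
console.log(await priced.json());
```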
ASOF JOIN selects the latest row in latest_prices whose ordering key is ≤ the current event’s key — effectively “last price before this log”.
Fork rollback
ClickHouse is non-transactional. Use CollapsingMergeTree with a sign column: insert sign = 1 rows on the way forward and sign = -1 rows to cancel them on rollback. Your onRollback handler computes which blocks to cancel and inserts the negating rows.
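A sketch of the negating insert such a handler could run, assuming a CollapsingMergeTree base table transfers(block_number, address, delta, sign); how this is wired into onRollback depends on your target configuration:

```ts
import { createClient } from '@clickhouse/client';

const ch = createClient({ url: 'http://localhost:8123' });

// Re-insert every forward row above the rollback point with sign = -1; the
// CollapsingMergeTree collapses the +1/-1 pairs on merge (and under FINAL).
// Assumes rows above the rollback point have not already been cancelled.
async function cancelBlocksAfter(rollbackBlock: number): Promise<void> {
  await ch.command({
    query: `
      INSERT INTO transfers (block_number, address, delta, sign)
      SELECT block_number, address, delta, -1 AS sign
      FROM transfers
      WHERE block_number > ${rollbackBlock} AND sign = 1`,
  });
}
```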
C. SQLite transformer
Keep transformer state in a local SQLite database. The transformer reads and writes SQLite; the downstream target (typically ClickHouse) receives the pre-computed rows.
When to use:
- State is too large for RAM.
- You need random-access lookups (e.g., “current balance of address X”) that would be slow as a linear scan of in-RAM arrays.
- You’re not using Postgres as your target (otherwise see approach D for better atomicity).
- The indexer runs on persistent infrastructure (SQLite file must survive restarts).
When not to use:
- The indexer runs on ephemeral infrastructure (containers, spot VMs). SQLite is lost on restart.
- State requires complex analytical SQL (window functions, multi-table joins) — consider DuckDB as a drop-in alternative with full analytical query support.
The delta table pattern
SQLite and the downstream target commit separately — a crash between them leaves the two out of sync. A balance_deltas table records the net change per address per block, allowing rollbackTo(blockNumber) to invert any set of blocks atomically:
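A sketch of the tables and rollbackTo(), using better-sqlite3 as an assumed SQLite binding; the schema and names are illustrative:

```ts
import Database from 'better-sqlite3';

const db = new Database('./state.sqlite');
db.exec(`
  CREATE TABLE IF NOT EXISTS balances (address TEXT PRIMARY KEY, balance INTEGER NOT NULL);
  CREATE TABLE IF NOT EXISTS balance_deltas (
    block_number INTEGER NOT NULL,
    address      TEXT    NOT NULL,
    delta        INTEGER NOT NULL,
    PRIMARY KEY (block_number, address)
  );
`);

const applyDelta = db.prepare(`
  INSERT INTO balances (address, balance) VALUES (?, ?)
  ON CONFLICT(address) DO UPDATE SET balance = balance + excluded.balance`);
const recordDelta = db.prepare(`
  INSERT INTO balance_deltas (block_number, address, delta) VALUES (?, ?, ?)
  ON CONFLICT(block_number, address) DO UPDATE SET delta = delta + excluded.delta`);

// Forward path: apply and record each block's deltas in one SQLite transaction.
const applyBlock = db.transaction((blockNumber: number, deltas: { address: string; delta: number }[]) => {
  for (const { address, delta } of deltas) {
    applyDelta.run(address, delta);
    recordDelta.run(blockNumber, address, delta);
  }
});

// Rollback path: invert every delta recorded above `blockNumber`, then drop those rows.
const rollbackTo = db.transaction((blockNumber: number) => {
  const rows = db
    .prepare(`SELECT address, delta FROM balance_deltas WHERE block_number > ?`)
    .all(blockNumber) as { address: string; delta: number }[];
  for (const { address, delta } of rows) applyDelta.run(address, -delta);
  db.prepare(`DELETE FROM balance_deltas WHERE block_number > ?`).run(blockNumber);
});
```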
Crash recovery in start()
Compare the SQLite high-water mark with the pipeline cursor. If SQLite is ahead, roll back to match:
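A sketch, reusing db and rollbackTo() from the delta-table sketch above; the cursor shape passed into start() is an assumption:

```ts
// Called from the transformer's start() callback.
function recoverSqliteState(cursor: { number: number } | undefined) {
  const pipelineCursor = cursor?.number ?? -1;
  const { hwm } = db
    .prepare(`SELECT COALESCE(MAX(block_number), -1) AS hwm FROM balance_deltas`)
    .get() as { hwm: number };

  // A crash after the SQLite commit but before the cursor save leaves SQLite ahead.
  if (hwm > pipelineCursor) rollbackTo(pipelineCursor);
}
```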
Historical-only variant
If you’re indexing only finalized data and will never see forks, drop the delta table and accept that a crash requires rebuilding from scratch.
D. Postgres/Drizzle target
When your target is already Postgres, the drizzleTarget can commit transformer state and output rows inside the same serializable transaction as the cursor save. This gives the strongest atomicity guarantees of any approach: a crash between transform() and the cursor save is impossible because both commit together.
When to use:
- Postgres is your output target.
- You want zero crash recovery code (atomicity handles it automatically).
- Fork rollback should be automatic (drizzleTarget installs snapshot triggers).
When not to use:
- Your target is ClickHouse or another non-Postgres database.
- State is too large for Postgres (rare).
The WriteQueue / initQueue pattern
Multiple stateful transformers all need to write inside the same transaction. The WriteQueue collects their write closures; initQueue wraps each batch in a Piped<T> with a fresh queue; onData flushes everything:
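A minimal sketch of what these helpers could look like; the versions used in the Postgres examples may differ in detail, and Transaction here simply stands for whatever handle the target passes to onData:

```ts
type Transaction = unknown; // e.g. a drizzle transaction or a ClickhouseStore

export class WriteQueue<S = Transaction> {
  private writes: Array<(s: S) => Promise<void>> = [];
  push(fn: (s: S) => Promise<void>): void {
    this.writes.push(fn);
  }
  async flush(s: S): Promise<void> {
    for (const fn of this.writes) await fn(s);
  }
}

// The batch type downstream transformers see: the raw batch plus the shared queue.
export type Piped<T extends object, S = Transaction> = T & { writes: WriteQueue<S> };

// First .pipe() in the chain: attach a fresh queue to every batch.
export function initQueue<T extends object, S = Transaction>() {
  return (batch: T): Piped<T, S> => ({ ...batch, writes: new WriteQueue<S>() });
}
```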
onData stays a one-liner regardless of how many transformers are chained.
Sub-approach 1 — Stateless transform (per-batch DB reads)
transform() reads current state from Postgres (the last committed snapshot), computes the delta, and pushes write closures to the queue. No in-RAM Map survives between batches.
- ✓ No RAM limit on state size
- ✓ Zero fork handling code (snapshot triggers on tables cover rollback)
- ✗ One SELECT … WHERE address IN (…) per batch
Sub-approach 2 — In-memory + Postgres mirror
start() loads the full state into in-RAM Maps. transform() reads/writes the Maps with no DB round trips per batch. fork() reloads the Maps from Postgres after drizzleTarget commits the snapshot rollback.
- ✓ No per-batch DB reads — all reads from memory after startup
- ✓ Fast for large batches with many distinct keys
- ✗ Full state must fit in RAM
- ✗ Startup time is O(state size)
- ✗ fork() callbacks required to resync Maps after rollback
For both sub-approaches, all state tables must be listed in drizzleTarget’s tables array. This installs PostgreSQL snapshot triggers that roll them back automatically on a blockchain reorg. The onStart callback can run CREATE TABLE IF NOT EXISTS for quick setup; in production, use drizzle-kit migrations instead.
E. Apache Flink
Apache Flink is a distributed stateful stream-processing framework. The Pipes SDK acts as a data source feeding Flink via Kafka or a direct connector.
When to use:
- State is too large for a single machine (terabytes).
- Your problem requires stateful joins across multiple independent streams (e.g., correlate DEX trades with lending liquidations across different chains).
- You need exactly-once semantics across multiple heterogeneous sinks.
When not to use:
- Single-node deployments — the operational overhead (JVM runtime, cluster management, ZooKeeper or KRaft, checkpoint storage) is only justified when the problem genuinely requires distributed state.
Fork handling relies on compensating events (e.g., rows with sign = -1) that Flink sees as normal data and can handle with a subtract-and-recompute pattern. Flink manages its own checkpoints; crash recovery is handled entirely by Flink.
F. External KV store
Use Redis, Valkey, or a similar key-value store as a fast external state backend.
When to use:
- Multiple parallel pipeline instances must share state (horizontal scaling of the indexer).
- Per-key lookups must complete in under 1 ms (e.g., enriching 50 k events per second with metadata from a 100 M-entry map that doesn’t fit in RAM).
When not to use:
- A single-process indexer is sufficient — adding Redis increases operational complexity for no benefit.
- You need transactional state + output commits (use approach D instead).
Fork handling: the fork() callback must delete or undo the Redis keys written for rolled-back blocks. Keep a per-block write log (similar to the SQLite delta table) to know which keys to revert.
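A sketch with ioredis (an assumed client; any Redis client works): every state write also records its key in a per-block set, and the fork path deletes exactly those keys:

```ts
import Redis from 'ioredis';

const redis = new Redis();

// Every state write also appends the key to a per-block write log.
async function writeState(blockNumber: number, key: string, value: string) {
  await redis
    .multi()
    .set(key, value)
    .sadd(`written:${blockNumber}`, key)
    .exec();
}

// fork(): drop every key written for blocks above the rollback cursor.
async function revertAfter(rollbackBlock: number, headBlock: number) {
  for (let b = rollbackBlock + 1; b <= headBlock; b++) {
    const keys = await redis.smembers(`written:${b}`);
    if (keys.length > 0) await redis.del(...keys);
    await redis.del(`written:${b}`);
  }
}
```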
Crash safety: Redis is not durable by default. Enable AOF or RDB persistence, or treat Redis purely as a warm cache and accept that a Redis restart requires a replay from the pipeline cursor.
Fork callbacks and crash recovery
The fork handling responsibilities differ by approach:
| Approach | fork() needed in transformer | How DB state is rolled back |
|---|---|---|
| A. In-RAM | ✓ — prune entries > cursor | n/a (state is RAM-only) |
| B. ClickHouse MVs | ✗ — handled in onRollback | sign = -1 rows via onRollback |
| C. SQLite | ✓ — calls rollbackTo(cursor.number) | rollbackTo() reverts delta table |
| D. Postgres/drizzle | ✗ — automatic | snapshot triggers via tables |
| E. Flink | ✗ — compensating events | Flink checkpoint rollback |
| F. External KV | ✓ — revert write log | manual key deletion |
target.fork() always fires before transformer fork() callbacks. By the time your transformer’s fork() runs, the target (ClickHouse onRollback, drizzleTarget snapshot rollback) has already committed the database rollback. It is safe to read the database in fork().
Crash recovery (approaches A, B, C only): a crash between the transformer’s store write and the target’s cursor save leaves state ahead of the pipeline cursor. Handle this in start() by comparing your local high-water mark to state.current:
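A generic sketch of that check; the store interface below is hypothetical and stands in for whatever your approach exposes (SQLite rollbackTo, a Redis write-log replay, in-RAM pruning, and so on):

```ts
// Compare the local high-water mark to the pipeline cursor and undo any overshoot.
async function recoverOnStart(
  state: { current?: { number: number } },
  store: {
    highWaterMark(): Promise<number>;        // highest block the local store has applied
    revertTo(block: number): Promise<void>;  // undo everything above that block
  },
): Promise<void> {
  const cursor = state.current?.number ?? -1;
  if ((await store.highWaterMark()) > cursor) {
    await store.revertTo(cursor);
  }
}
```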
Composing multiple stateful transformers
When multiple stateful transformers write to the same target, they must not each independently call the target’s write API. Use the WriteQueue / initQueue pattern to collect all writes and flush them in a single onData call:
- initQueue<T>() wraps the raw batch in Piped<T> with a fresh WriteQueue. Place it as the first .pipe().
- Each transformer receives Piped<T>, pushes closures to writes, and returns Piped<T> unchanged.
- onData calls data.writes.flush(store_or_tx) — a one-liner that scales to any number of transformers.
Because every transformer accepts Piped<T> as input and produces Piped<T> as output, none of them assume a fixed position in the chain — they are all order-independent and can be added or removed without touching the others.
For Postgres targets, WriteQueue closures take a Transaction; for ClickHouse, they take the structural CHS type shown in approach A. The pattern is identical in both cases.