Learn how to manage indexing state, handle blockchain forks, and persist data.

Cursor Management

Resume indexing from a specific block using cursors.
const target = createTarget({
  write: async ({logger, read}) => {
    // Resume from block 20_000_300
    for await (const {data} of read({ number: 20_000_300 })) {
      console.log('Processing data from block:', data)
    }
  }
})
Pass a cursor to read() to resume processing from a specific block. The cursor format is { number: blockNumber }. This lets you restart your indexer without reprocessing all historical data. Persist the cursor after each successfully processed batch so that a restart can resume from it.
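
The "store the cursor after each batch" advice can be sketched as follows. This is a minimal illustration, not part of the SDK: CursorStore, MemoryCursorStore, Batch, and processBatches are hypothetical names, the store is in-memory (a real indexer would use a file or database), and the saved cursor is assumed to mean "last fully processed block".

```typescript
// Cursor shape described above: { number: blockNumber }
type Cursor = { number: number }

// Hypothetical batch shape, used only for this sketch
type Batch = { lastBlock: number; items: string[] }

// Hypothetical storage interface; swap in a file or database in practice
interface CursorStore {
  load(): Cursor | undefined
  save(cursor: Cursor): void
}

class MemoryCursorStore implements CursorStore {
  private cursor: Cursor | undefined
  load(): Cursor | undefined {
    return this.cursor
  }
  save(cursor: Cursor): void {
    this.cursor = { ...cursor }
  }
}

// Process batches in order and checkpoint only after a batch succeeds.
// If handle() throws, the cursor still points at the last completed batch.
function processBatches(
  batches: Batch[],
  store: CursorStore,
  handle: (batch: Batch) => void
): void {
  const start = store.load()
  for (const batch of batches) {
    // Assumption: the cursor marks the last processed block, so skip up to it
    if (start !== undefined && batch.lastBlock <= start.number) continue
    handle(batch)
    store.save({ number: batch.lastBlock })
  }
}
```

Saving only after handle() returns means a crash in the middle of a batch causes that batch to be processed again on restart, so the handler should tolerate replays.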

Fork Handling

Handle blockchain forks by tracking unfinalized blocks and rolling back when forks occur.
let recentUnfinalizedBlocks: BlockCursor[] = []

const target = createTarget({
  write: async ({read}) => {
    for await (const {data, ctx} of read(recentUnfinalizedBlocks[recentUnfinalizedBlocks.length-1])) {
      // Track unfinalized blocks from the batch
      ctx.state.rollbackChain.forEach((bc) => {
        recentUnfinalizedBlocks.push(bc)
      })
      
      // Prune finalized blocks
      if (ctx.head.finalized) {
        recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(
          b => b.number >= ctx.head.finalized!.number
        )
      }
    }
  },
  fork: async (previousBlocks) => {
    const rollbackIndex = findCommonAncestor(recentUnfinalizedBlocks, previousBlocks)
    if (rollbackIndex >= 0) {
      recentUnfinalizedBlocks.length = rollbackIndex + 1
      return recentUnfinalizedBlocks[rollbackIndex]
    }
    return null
  }
})
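When the source detects a fork, it calls the fork() handler with blocks from the new consensus chain. The handler finds the common ancestor between your stored blocks and the new chain, truncates the stored list to that point, and returns the cursor to resume from; it returns null when no common ancestor is found. Collect unfinalized blocks from ctx.state.rollbackChain as batches arrive and prune them as blocks become finalized. findCommonAncestor is a helper that this snippet does not define.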
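
The fork handler above relies on a findCommonAncestor helper. One possible shape for it is sketched below, assuming each BlockCursor carries a number and a hash (the hash field is an assumption; the page only shows number) and that the helper returns an index into the stored list, or -1 when nothing matches.

```typescript
// Assumed cursor shape for this sketch: block number plus block hash
type BlockCursor = { number: number; hash: string }

// Return the index of the newest stored block that also appears,
// with the same number and hash, in the new canonical chain.
// Returns -1 when the two lists share no block.
function findCommonAncestor(
  stored: BlockCursor[],
  canonical: BlockCursor[]
): number {
  // Index the canonical chain by block number for O(1) lookups
  const canonicalHashes = new Map<number, string>()
  for (const block of canonical) {
    canonicalHashes.set(block.number, block.hash)
  }

  // Walk the stored blocks from newest to oldest
  for (let i = stored.length - 1; i >= 0; i--) {
    const block = stored[i]
    if (block !== undefined && canonicalHashes.get(block.number) === block.hash) {
      return i
    }
  }
  return -1
}
```

Comparing hashes as well as numbers matters here: after a fork, both chains contain a block at the same height, and only the hash tells them apart.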

ClickHouse Target

Use ClickHouse for automatic cursor management and rollback handling.
const target = clickhouseTarget({
  client,
  onData: async ({ store, data, ctx }) => {
    store.insert({
      table: 'usdc_transfers',
      values: data.transfers.map(t => ({
        block_number: t.block.number,
        timestamp: t.timestamp.valueOf() / 1000,
        transaction_hash: t.rawEvent.transactionHash,
        from: t.event.from,
        to: t.event.to,
        value: t.event.value.toString()
      })),
      format: 'JSONEachRow'
    })
  },
  onRollback: async ({store, safeCursor}) => {
    await store.removeAllRows({
      tables: ['usdc_transfers'],
      where: `block_number > ${safeCursor.number}`
    })
  }
})
The clickhouseTarget handles cursor management and rollbacks automatically: it stores cursors in a dedicated table and manages the lifecycle of data persistence and fork recovery. Use onData to insert new rows and onRollback to delete rows written after the fork point, here every row with block_number greater than safeCursor.number.
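
To make the rollback boundary concrete, the effect of the where clause in onRollback can be mimicked in memory. This is only an illustration under assumed names (TransferRow and rollbackRows are hypothetical and do not touch ClickHouse): rows at or below the safe block survive, rows above it are dropped.

```typescript
// Hypothetical, trimmed-down row shape for this sketch
type TransferRow = { block_number: number; transaction_hash: string }

// Keep rows up to and including the safe block.
// Mirrors deleting rows where block_number > safeCursor.number.
function rollbackRows(rows: TransferRow[], safeBlock: number): TransferRow[] {
  return rows.filter((row) => row.block_number <= safeBlock)
}

const rows: TransferRow[] = [
  { block_number: 10, transaction_hash: '0x01' },
  { block_number: 11, transaction_hash: '0x02' },
  { block_number: 12, transaction_hash: '0x03' },
]

// With a safe cursor at block 11, only the row from block 12 is removed
const kept = rollbackRows(rows, 11)
```

The comparison is strict in the original clause, so the block at safeCursor.number itself is treated as still valid.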