Skip to main content

Documentation Index

Fetch the complete documentation index at: https://beta.docs.sqd.dev/llms.txt

Use this file to discover all available pages before exploring further.

Install the ClickHouse Node.js client:
npm install @clickhouse/client
At a glance, the pipeline looks like this:
import { createClient } from '@clickhouse/client'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

await evmPortalSource({ ... }).pipeTo(
  clickhouseTarget({
    client: createClient({ url: 'http://localhost:8123' }),
    onData: async ({ store, data }) => {
      store.insert({ table: 'transfers', values: data.transfers.map(...), format: 'JSONEachRow' })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({ tables: ['transfers'], where: `block_number > ${safeCursor.number}` })
    },
  }),
)

Table design

Use CollapsingMergeTree with a sign Int8 DEFAULT 1 column. This engine enables efficient fork rollbacks: to cancel rows, the target re-inserts them with sign = -1 and ClickHouse merges the pair during background processing.
CREATE TABLE IF NOT EXISTS transfers (
  block_number     UInt32  CODEC(DoubleDelta, ZSTD),
  transaction_hash String,
  log_index        UInt16,
  from_address     LowCardinality(FixedString(42)),
  to_address       LowCardinality(FixedString(42)),
  value            UInt256,
  sign             Int8 DEFAULT 1
) ENGINE = CollapsingMergeTree(sign)
  ORDER BY (block_number, transaction_hash, log_index);
Design notes:
  • Apply DoubleDelta + ZSTD codecs to monotonically increasing columns such as block numbers and timestamps.
  • Use LowCardinality for columns with low cardinality like addresses to reduce storage and speed up filtering.
  • Store 256-bit integers as UInt256; serialize JavaScript BigInt values to strings before insertion.
Create the table in onStart using store.command():
onStart: async ({ store }) => {
  await store.command({ query: `CREATE TABLE IF NOT EXISTS transfers ( ... )` })
}

onData

Call store.insert() to queue an insert. The call is non-blocking — inserts fire concurrently and are fully flushed when the target closes:
onData: async ({ store, data }) => {
  store.insert({
    table: 'transfers',
    values: data.transfers.map((t) => ({
      block_number:     t.block.number,
      transaction_hash: t.rawEvent.transactionHash,
      log_index:        t.rawEvent.logIndex,
      from_address:     t.event.from,
      to_address:       t.event.to,
      value:            t.event.value.toString(),
    })),
    format: 'JSONEachRow',
  })
}

onRollback

Implement onRollback to handle blockchain forks. It is invoked in two situations:
  • type: 'offset_check' — on startup, when a saved cursor is found, to discard writes from a previous crashed or partial run
  • type: 'blockchain_fork' — when the stream detects a chain reorganisation
Use store.removeAllRows() to cancel rows past the safe point. For CollapsingMergeTree tables this re-inserts matching rows with sign = -1:
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['transfers'],
    where: `block_number > ${safeCursor.number}`,
  })
}

Complete example

import { commonAbis, evmDecoder, evmPortalSource } from '@subsquid/pipes/evm'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import { createClient } from '@clickhouse/client'

const client = createClient({ url: 'http://localhost:8123' })

await evmPortalSource({
  portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  outputs: evmDecoder({
    range: { from: 'latest' },
    events: { transfers: commonAbis.erc20.events.Transfer },
  }),
}).pipeTo(
  clickhouseTarget({
    client,
    onStart: async ({ store }) => {
      await store.command({
        query: `
          CREATE TABLE IF NOT EXISTS transfers (
            block_number     UInt32  CODEC(DoubleDelta, ZSTD),
            transaction_hash String,
            log_index        UInt16,
            from_address     LowCardinality(FixedString(42)),
            to_address       LowCardinality(FixedString(42)),
            value            UInt256,
            sign             Int8 DEFAULT 1
          ) ENGINE = CollapsingMergeTree(sign)
            ORDER BY (block_number, transaction_hash, log_index)
        `,
      })
    },
    onData: async ({ store, data }) => {
      store.insert({
        table: 'transfers',
        values: data.transfers.map((t) => ({
          block_number:     t.block.number,
          transaction_hash: t.rawEvent.transactionHash,
          log_index:        t.rawEvent.logIndex,
          from_address:     t.event.from,
          to_address:       t.event.to,
          value:            t.event.value.toString(),
        })),
        format: 'JSONEachRow',
      })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ['transfers'],
        where: `block_number > ${safeCursor.number}`,
      })
    },
  }),
)

Docker setup

docker-compose.yml
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
docker compose up -d
See the clickhouseTarget reference for the full API.