Install the ClickHouse Node.js client:
npm install @clickhouse/client
At a glance, the pipeline looks like this:
import { createClient } from '@clickhouse/client'
import { solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

await solanaPortalSource({ ... }).pipeTo(
  clickhouseTarget({
    client: createClient({ url: 'http://localhost:8123' }),
    onData: async ({ store, data }) => {
      store.insert({ table: 'swaps', values: data.swap.map(...), format: 'JSONEachRow' })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({ tables: ['swaps'], where: `slot > ${safeCursor.number}` })
    },
  }),
)

Table design

Use CollapsingMergeTree with a sign Int8 DEFAULT 1 column. This engine enables efficient fork rollbacks: to cancel rows, the target re-inserts them with sign = -1 and ClickHouse merges the pair during background processing.
CREATE TABLE IF NOT EXISTS swaps (
  slot              UInt64 CODEC(DoubleDelta, ZSTD),
  instruction_index UInt32,
  program_id        String,
  sign              Int8 DEFAULT 1
) ENGINE = CollapsingMergeTree(sign)
  ORDER BY (slot, instruction_index);
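Because the pairs are merged in the background, a query that runs before the merge can still see both a row and its cancellation. The sketch below is a plain TypeScript model of that netting, not library code: it groups rows by the sorting key and keeps only keys whose signs sum to a positive value, roughly what a query with GROUP BY and HAVING sum(sign) > 0 would return. The SwapRow type and the collapse function are names invented for illustration.

```typescript
// Simplified in-memory model of a row in the swaps table (illustrative only).
interface SwapRow {
  slot: number
  instruction_index: number
  program_id: string
  sign: 1 | -1
}

// Net out rows per sorting key (slot, instruction_index) and keep only the
// keys whose signs sum to a positive value.
function collapse(rows: SwapRow[]): SwapRow[] {
  const groups = new Map<string, { row: SwapRow; net: number }>()
  for (const row of rows) {
    const key = `${row.slot}:${row.instruction_index}`
    const group = groups.get(key)
    if (group) {
      group.net += row.sign
    } else {
      groups.set(key, { row, net: row.sign })
    }
  }
  return Array.from(groups.values())
    .filter((g) => g.net > 0)
    .map((g) => ({ ...g.row, sign: 1 as const }))
}

const stored: SwapRow[] = [
  { slot: 100, instruction_index: 0, program_id: 'A', sign: 1 },
  { slot: 101, instruction_index: 0, program_id: 'A', sign: 1 },
  // Cancellation row written by a rollback: same key, opposite sign.
  { slot: 101, instruction_index: 0, program_id: 'A', sign: -1 },
]

const live = collapse(stored)
console.log(live.map((r) => r.slot))
```

Only the row at slot 100 survives here, since the two rows at slot 101 cancel each other out.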
Design notes:
  • Apply DoubleDelta + ZSTD codecs to monotonically increasing columns such as block numbers and timestamps.
  • Use LowCardinality for columns that hold few distinct values, such as a small, fixed set of addresses, to reduce storage and speed up filtering.
  • Store 256-bit integers as UInt256; serialize JavaScript BigInt values to strings before insertion.
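The BigInt note matters because JSON.stringify throws a TypeError on BigInt values, so a row containing one cannot be serialized as JSON unchanged. A minimal sketch of the conversion follows; serializeBigInts is a hypothetical helper name, and it only handles top-level fields of a row.

```typescript
// Convert every top-level bigint field of a row to its decimal string form.
// Other values are passed through untouched.
function serializeBigInts(row: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {}
  for (const [key, value] of Object.entries(row)) {
    out[key] = typeof value === 'bigint' ? value.toString() : value
  }
  return out
}

// `amount` is an illustrative column name, not part of the swaps table above.
const serialized = serializeBigInts({
  slot: 340000000,
  amount: BigInt('123456789012345678901234567890'),
})

// Safe to stringify now that no bigint values remain.
console.log(JSON.stringify(serialized))
```

Applying the helper inside the values mapping, before the rows reach store.insert(), keeps the conversion in one place.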
Create the table in onStart using store.command():
onStart: async ({ store }) => {
  await store.command({ query: `CREATE TABLE IF NOT EXISTS swaps ( ... )` })
}

onData

Call store.insert() to queue an insert. The call is non-blocking — inserts fire concurrently and are fully flushed when the target closes:
onData: async ({ store, data }) => {
  store.insert({
    table: 'swaps',
    values: data.swap.map((d) => ({
      slot:              d.block.slot,
      instruction_index: d.instruction.instructionIndex,
      program_id:        d.programId,
    })),
    format: 'JSONEachRow',
  })
}
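Pulling the row mapping out of the handler makes it easy to unit-test without a running pipeline. The sketch below assumes the decoded item has exactly the fields the handler above reads; DecodedSwap, SwapRow, and toSwapRow are hypothetical names, and the real item type comes from the decoder.

```typescript
// Assumed shape of one decoded item, inferred from the fields used in onData.
interface DecodedSwap {
  block: { slot: number }
  instruction: { instructionIndex: number }
  programId: string
}

// Row shape matching the swaps table (the sign column takes its default).
interface SwapRow {
  slot: number
  instruction_index: number
  program_id: string
}

// Pure mapping from a decoded item to a table row.
function toSwapRow(d: DecodedSwap): SwapRow {
  return {
    slot: d.block.slot,
    instruction_index: d.instruction.instructionIndex,
    program_id: d.programId,
  }
}

// Placeholder input for illustration; the values are made up.
const sample: DecodedSwap[] = [
  { block: { slot: 340000001 }, instruction: { instructionIndex: 2 }, programId: 'example-program' },
]

const mapped = sample.map(toSwapRow)
console.log(mapped)
```

Inside the handler, the mapping then reduces to values: data.swap.map(toSwapRow).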

onRollback

Implement onRollback to discard rows written past the last safe cursor. It is invoked in two situations:
  • type: 'offset_check' — on startup, when a saved cursor is found, to discard writes from a previous crashed or partial run
  • type: 'blockchain_fork' — when the stream detects a chain reorganisation
Use store.removeAllRows() to cancel rows past the safe point. For CollapsingMergeTree tables this re-inserts matching rows with sign = -1:
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['swaps'],
    where: `slot > ${safeCursor.number}`,
  })
}
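Since the where clause is interpolated into a query, it can help to validate the cursor before building it. The following is a small sketch under that assumption; rollbackWhere is a hypothetical helper, and the Cursor type models only the number field that the handler above uses.

```typescript
// Only the field used above; the real cursor object may carry more.
interface Cursor {
  number: number
}

// Build a "column > N" condition, rejecting values that would produce a
// malformed or unintended clause.
function rollbackWhere(column: string, safeCursor: Cursor): string {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(column)) {
    throw new Error(`Invalid column name: ${column}`)
  }
  if (!Number.isSafeInteger(safeCursor.number) || safeCursor.number < 0) {
    throw new Error(`Invalid safe cursor: ${safeCursor.number}`)
  }
  return `${column} > ${safeCursor.number}`
}

const where = rollbackWhere('slot', { number: 340000123 })
console.log(where)
```

The result can be passed as the where option of store.removeAllRows() in place of the inline template string.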

Complete example

import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import { createClient } from '@clickhouse/client'

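// Credentials match the Docker setup below; adjust them for your own server.
const client = createClient({
  url: 'http://localhost:8123',
  username: 'default',
  password: 'default',
})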

await solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: solanaInstructionDecoder({
    range: { from: '340000000' },
    programId: '...',
    instructions: { swap: swapInstruction }, // swapInstruction: your instruction definition, declared elsewhere (not shown)
  }),
}).pipeTo(
  clickhouseTarget({
    client,
    onStart: async ({ store }) => {
      await store.command({
        query: `
          CREATE TABLE IF NOT EXISTS swaps (
            slot              UInt64 CODEC(DoubleDelta, ZSTD),
            instruction_index UInt32,
            program_id        String,
            sign              Int8 DEFAULT 1
          ) ENGINE = CollapsingMergeTree(sign)
            ORDER BY (slot, instruction_index)
        `,
      })
    },
    onData: async ({ store, data }) => {
      store.insert({
        table: 'swaps',
        values: data.swap.map((d) => ({
          slot:              d.block.slot,
          instruction_index: d.instruction.instructionIndex,
          program_id:        d.programId,
        })),
        format: 'JSONEachRow',
      })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ['swaps'],
        where: `slot > ${safeCursor.number}`,
      })
    },
  }),
)

Docker setup

docker-compose.yml
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
docker compose up -d
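The compose file above sets a user, password, and database, so the client will likely need matching credentials to connect. One way to keep them out of the code is to read them from the environment, as sketched below. The variable names mirror the compose file, except CLICKHOUSE_URL, which is an assumed name; clickhouseOptionsFromEnv is a hypothetical helper, and the fallbacks are the values from the compose file.

```typescript
// Subset of connection options understood by createClient in @clickhouse/client.
interface ClickHouseOptions {
  url: string
  username: string
  password: string
  database: string
}

// Read connection settings from an environment-like object, falling back to
// the values used in the docker-compose.yml above.
function clickhouseOptionsFromEnv(env: Record<string, string | undefined>): ClickHouseOptions {
  return {
    url: env.CLICKHOUSE_URL ?? 'http://localhost:8123', // CLICKHOUSE_URL is an assumed name
    username: env.CLICKHOUSE_USER ?? 'default',
    password: env.CLICKHOUSE_PASSWORD ?? 'default',
    database: env.CLICKHOUSE_DB ?? 'default',
  }
}

// Example with one override; in a real script you would pass process.env.
const options = clickhouseOptionsFromEnv({ CLICKHOUSE_PASSWORD: 's3cret' })
console.log(options.url, options.username, options.database)
```

The resulting object can be handed to createClient(options) and the client passed to clickhouseTarget as in the complete example.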
See the clickhouseTarget reference for the full API.