Skip to main content

Install Client

npm install @clickhouse/client

clickhouseTarget

ClickHouse target with automatic cursor management.
import { createClient } from "@clickhouse/client";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";

/**
 * Minimal pipeline: writes decoded transfers to the `transfers` table and
 * deletes rows above the safe cursor when a rollback is requested.
 *
 * `source` and `decoder` are not created in this snippet; the Complete Example
 * further down the page shows how to build them.
 */
async function main() {
  const client = createClient({
    username: "default",
    password: "default",
    url: "http://localhost:8123",
  });

  const target = clickhouseTarget({
    client,
    onData: async ({ store, data, ctx }) => {
      store.insert({
        table: "transfers",
        values: data.transfer.map((t) => ({
          block_number: t.block.number,
          timestamp: t.timestamp.valueOf() / 1000,
          transaction_hash: t.rawEvent.transactionHash,
          // log_index is part of the table's ORDER BY key. Without it, several
          // transfers from the same transaction share one sorting key.
          log_index: t.rawEvent.logIndex,
          from_address: t.event.from,
          to_address: t.event.to,
          // Sent as a decimal string via toString().
          value: t.event.value.toString(),
        })),
        format: "JSONEachRow",
      });
    },
    onRollback: async ({ store, safeCursor }) => {
      // Drop everything written above the safe cursor's block number.
      await store.removeAllRows({
        tables: ["transfers"],
        where: `block_number > ${safeCursor.number}`,
      });
    },
  });

  await source.pipe(decoder).pipeTo(target);
}

void main();

Create Table

-- Destination table for the transfer rows inserted by the onData examples.
CREATE TABLE IF NOT EXISTS transfers (
  block_number          UInt32 CODEC (DoubleDelta, ZSTD),
  timestamp             DateTime CODEC (DoubleDelta, ZSTD),
  transaction_hash      String,
  log_index             UInt16,
  from_address          LowCardinality(FixedString(42)),
  to_address            LowCardinality(FixedString(42)),
  value                 UInt256,
  -- Sign column handed to CollapsingMergeTree below; inserts that omit it get 1.
  sign                  Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(sign)
-- Sorting key: block number, transaction hash, and log index.
ORDER BY (block_number, transaction_hash, log_index);
Run with client:
// Creates the `transfers` table (if it does not exist yet) through an
// already-constructed ClickHouse `client`.
async function main() {
  const createTransfersTable = `
      CREATE TABLE IF NOT EXISTS transfers (
        block_number UInt32 CODEC (DoubleDelta, ZSTD),
        timestamp DateTime CODEC (DoubleDelta, ZSTD),
        transaction_hash String,
        log_index UInt16,
        from_address LowCardinality(FixedString(42)),
        to_address LowCardinality(FixedString(42)),
        value UInt256,
        sign Int8 DEFAULT 1
      )
      ENGINE = CollapsingMergeTree(sign)
      ORDER BY (block_number, transaction_hash, log_index)
    `;

  await client.command({ query: createTransfersTable });
}

void main();

onData

Insert data with store helper:
// Queues the batch's transfers for insertion, logging the batch size and
// wrapping the work in an "insert" profiler span.
onData: async ({ store, data, ctx }) => {
  const { logger, profiler } = ctx;
  const span = profiler.start("insert");

  try {
    logger.info(`Inserting ${data.transfer.length} transfers`);

    store.insert({
      table: "transfers",
      values: data.transfer.map((t) => ({
        block_number: t.block.number,
        timestamp: t.timestamp.valueOf() / 1000,
        transaction_hash: t.rawEvent.transactionHash,
        log_index: t.rawEvent.logIndex,
        from_address: t.event.from,
        to_address: t.event.to,
        value: t.event.value.toString(),
      })),
      format: "JSONEachRow",
    });
  } finally {
    // End the span even when logging, row mapping, or the insert call throws,
    // so a failed batch does not leave the span open.
    span.end();
  }
};

Store Methods

store.insert() - Queue insert:
// Queue rows for insertion into the "transfers" table.
store.insert({
  table: "transfers",
  values: [/* row objects keyed by column name */],
  format: "JSONEachRow",
});
store.removeAllRows() - Delete rows:
// Removes every "transfers" row whose block_number is above `cursor.number`.
// `store` and `cursor` are expected to be in scope where this snippet is used.
async function main() {
  const removal = store.removeAllRows({
    tables: ["transfers"],
    where: `block_number > ${cursor.number}`,
  });

  await removal;
}

void main();

onRollback

Handle blockchain forks:
// Fork handler: logs the safe cursor's block number, then removes the
// "transfers" rows above it.
onRollback: async ({ store, safeCursor, type }) => {
  console.log(`Fork at block ${safeCursor.number}`);

  const removeOrphanedRows = () =>
    store.removeAllRows({
      tables: ["transfers"],
      where: `block_number > ${safeCursor.number}`,
    });

  try {
    await removeOrphanedRows();
  } catch (error) {
    // Log for visibility, then rethrow so the failure is not swallowed.
    console.error("Rollback failed:", error);
    throw error;
  }
};

Complete Example

import { createClient } from "@clickhouse/client";
import {
  evmPortalSource,
  evmDecoder,
  commonAbis,
} from "@subsquid/pipes/evm";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";

/**
 * End-to-end example: ensures the `usdc_transfers` table exists, decodes
 * ERC-20 Transfer events for one contract from the ethereum-mainnet portal
 * dataset, and pipes them into ClickHouse with rollback handling.
 */
async function main() {
  // ClickHouse connection
  const client = createClient({
    url: "http://localhost:8123",
    username: "default",
    password: "default",
  });

  // Make sure the destination table exists before the pipeline starts
  const createTableQuery = `
      CREATE TABLE IF NOT EXISTS usdc_transfers (
        block_number UInt32 CODEC (DoubleDelta, ZSTD),
        timestamp DateTime CODEC (DoubleDelta, ZSTD),
        transaction_hash String,
        log_index UInt16,
        from_address LowCardinality(FixedString(42)),
        to_address LowCardinality(FixedString(42)),
        value UInt256,
        sign Int8 DEFAULT 1
      )
      ENGINE = CollapsingMergeTree(sign)
      ORDER BY (block_number, transaction_hash, log_index)
    `;
  await client.command({ query: createTableQuery });

  // Portal data source
  const source = evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  });

  // Decoder for the contract's Transfer events
  const decoder = evmDecoder({
    range: { from: "latest" },
    contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
    events: { transfer: commonAbis.erc20.events.Transfer },
  });

  // Maps one decoded transfer to a usdc_transfers row
  const toRow = (transfer) => ({
    block_number: transfer.block.number,
    timestamp: transfer.timestamp.valueOf() / 1000,
    transaction_hash: transfer.rawEvent.transactionHash,
    log_index: transfer.rawEvent.logIndex,
    from_address: transfer.event.from,
    to_address: transfer.event.to,
    value: transfer.event.value.toString(),
  });

  // ClickHouse target
  const target = clickhouseTarget({
    client,
    onData: async ({ store, data, ctx }) => {
      console.log(`Inserting ${data.transfer.length} transfers`);

      const values = data.transfer.map(toRow);
      store.insert({ table: "usdc_transfers", values, format: "JSONEachRow" });
    },
    onRollback: async ({ store, safeCursor }) => {
      // Remove rows written above the safe cursor's block number
      await store.removeAllRows({
        tables: ["usdc_transfers"],
        where: `block_number > ${safeCursor.number}`,
      });
    },
  });

  // Wire source -> decoder -> target and run
  const pipeline = source.pipe(decoder);
  await pipeline.pipeTo(target);
}

void main();

Table Design

Use CollapsingMergeTree for fork handling:
ENGINE = CollapsingMergeTree(sign)
ORDER BY (block_number, transaction_hash, log_index)

Codecs

Use compression codecs:
block_number UInt32 CODEC (DoubleDelta, ZSTD),
timestamp DateTime CODEC (DoubleDelta, ZSTD)

Address Storage

Use LowCardinality for repeated values:
from_address LowCardinality(FixedString(42))

BigInt Values

Store as UInt256 (sent from JavaScript as a decimal string via toString()), or use a String column:
value UInt256
-- or
value String  -- for very large numbers

Data Flow

Querying Data

-- NOTE(review): transfers uses ENGINE = CollapsingMergeTree(sign). Confirm
-- whether these queries need FINAL or sign-weighted aggregates so that rows
-- still waiting to be collapsed are not counted.
-- NOTE(review): the queries read from transfers, while the Complete Example
-- writes to usdc_transfers; adjust the table name to the one you created.

-- Recent transfers
SELECT * FROM transfers
ORDER BY block_number DESC
LIMIT 100;

-- Top senders
SELECT
  from_address,
  count() as transfer_count,
  sum(value) as total_value
FROM transfers
GROUP BY from_address
ORDER BY total_value DESC
LIMIT 10;

-- Daily volume
-- (the / 1e6 presumably converts raw units of a 6-decimal token; confirm for your token)
SELECT
  toDate(timestamp) as date,
  count() as transfer_count,
  sum(value) / 1e6 as total_usdc
FROM transfers
GROUP BY date
ORDER BY date;

Docker Setup

docker-compose.yml
# One ClickHouse service for running the examples on this page locally.
version: "3.8"

services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      # 8123 is the port the createClient examples connect to (http://localhost:8123).
      - "8123:8123"
      - "9000:9000"
    environment:
      # Same user and password as the createClient configuration in the examples.
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      # Named volume mounted at /var/lib/clickhouse.
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
Start:
docker compose up -d