Install Client
npm install @clickhouse/client
clickhouseTarget
ClickHouse target with automatic cursor management: the target persists the indexing cursor for you, so a restarted pipeline resumes from the last processed block.
import { createClient } from "@clickhouse/client";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";

async function main() {
  const client = createClient({
    username: "default",
    password: "default",
    url: "http://localhost:8123",
  });

  const target = clickhouseTarget({
    client,
    onData: async ({ store, data, ctx }) => {
      store.insert({
        table: "transfers",
        values: data.transfer.map((t) => ({
          block_number: t.block.number,
          timestamp: t.timestamp.valueOf() / 1000, // ms -> seconds for DateTime
          transaction_hash: t.rawEvent.transactionHash,
          log_index: t.rawEvent.logIndex,
          from_address: t.event.from,
          to_address: t.event.to,
          value: t.event.value.toString(), // bigint -> string for UInt256
        })),
        format: "JSONEachRow",
      });
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ["transfers"],
        where: `block_number > ${safeCursor.number}`,
      });
    },
  });

  // source and decoder are set up as in the complete example below
  await source.pipe(decoder).pipeTo(target);
}

void main();
Create Table
CREATE TABLE IF NOT EXISTS transfers (
    block_number UInt32 CODEC (DoubleDelta, ZSTD),
    timestamp DateTime CODEC (DoubleDelta, ZSTD),
    transaction_hash String,
    log_index UInt16,
    from_address LowCardinality(FixedString(42)),
    to_address LowCardinality(FixedString(42)),
    value UInt256,
    sign Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY (block_number, transaction_hash, log_index);
Or create the table programmatically at startup:
async function main() {
  // client created as shown above
  await client.command({
    query: `
      CREATE TABLE IF NOT EXISTS transfers (
          block_number UInt32 CODEC (DoubleDelta, ZSTD),
          timestamp DateTime CODEC (DoubleDelta, ZSTD),
          transaction_hash String,
          log_index UInt16,
          from_address LowCardinality(FixedString(42)),
          to_address LowCardinality(FixedString(42)),
          value UInt256,
          sign Int8 DEFAULT 1
      )
      ENGINE = CollapsingMergeTree(sign)
      ORDER BY (block_number, transaction_hash, log_index)
    `,
  });
}

void main();
onData
Insert data with the store helper:
onData: async ({ store, data, ctx }) => {
  const { logger, profiler } = ctx;
  const span = profiler.start("insert");
  logger.info(`Inserting ${data.transfer.length} transfers`);
  store.insert({
    table: "transfers",
    values: data.transfer.map((t) => ({
      block_number: t.block.number,
      timestamp: t.timestamp.valueOf() / 1000, // ms -> seconds for DateTime
      transaction_hash: t.rawEvent.transactionHash,
      log_index: t.rawEvent.logIndex,
      from_address: t.event.from,
      to_address: t.event.to,
      value: t.event.value.toString(), // bigint -> string for UInt256
    })),
    format: "JSONEachRow",
  });
  span.end();
};
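A live batch may contain no matching events; a small guard at the top of the handler (a sketch, not part of the target's API) skips the pointless insert:

onData: async ({ store, data }) => {
  if (data.transfer.length === 0) return; // nothing decoded in this batch
  // ...queue inserts as shown above
};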
Store Methods
store.insert() - queue an insert for the current batch:
store.insert({
  table: 'transfers',
  values: [...],
  format: 'JSONEachRow'
})
store.removeAllRows() - delete rows matching a condition (used for rollbacks):

await store.removeAllRows({
  tables: ['transfers'],
  where: `block_number > ${cursor.number}`
})
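Several inserts can be queued within a single onData call, for example to populate more than one table per batch. A sketch, assuming queued inserts are flushed together with the batch; the block_stats table and its rows are hypothetical:

onData: async ({ store, data }) => {
  // Each call queues rows for its table; the target flushes them with the batch.
  store.insert({
    table: "transfers",
    values: transferRows, // built from data.transfer as shown above
    format: "JSONEachRow",
  });
  store.insert({
    table: "block_stats", // hypothetical aggregate table
    values: statRows,
    format: "JSONEachRow",
  });
};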
onRollback
Handle blockchain forks: delete all rows above the safe cursor so the affected range can be re-ingested:
onRollback: async ({ store, safeCursor, type }) => {
  console.log(`Fork at block ${safeCursor.number}`);
  try {
    await store.removeAllRows({
      tables: ["transfers"],
      where: `block_number > ${safeCursor.number}`,
    });
  } catch (err) {
    console.error("Rollback failed:", err);
    throw err;
  }
};
Complete Example
import { createClient } from "@clickhouse/client";
import {
  evmPortalSource,
  evmDecoder,
  commonAbis,
} from "@subsquid/pipes/evm";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";

async function main() {
  // Create ClickHouse client
  const client = createClient({
    username: "default",
    password: "default",
    url: "http://localhost:8123",
  });

  // Create table
  await client.command({
    query: `
      CREATE TABLE IF NOT EXISTS usdc_transfers (
          block_number UInt32 CODEC (DoubleDelta, ZSTD),
          timestamp DateTime CODEC (DoubleDelta, ZSTD),
          transaction_hash String,
          log_index UInt16,
          from_address LowCardinality(FixedString(42)),
          to_address LowCardinality(FixedString(42)),
          value UInt256,
          sign Int8 DEFAULT 1
      )
      ENGINE = CollapsingMergeTree(sign)
      ORDER BY (block_number, transaction_hash, log_index)
    `,
  });

  // Create source
  const source = evmPortalSource({
    portal: "https://portal.sqd.dev/datasets/ethereum-mainnet",
  });

  // Create decoder
  const decoder = evmDecoder({
    range: { from: "latest" },
    contracts: ["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"],
    events: {
      transfer: commonAbis.erc20.events.Transfer,
    },
  });

  // Create target
  const target = clickhouseTarget({
    client,
    onData: async ({ store, data, ctx }) => {
      console.log(`Inserting ${data.transfer.length} transfers`);
      store.insert({
        table: "usdc_transfers",
        values: data.transfer.map((t) => ({
          block_number: t.block.number,
          timestamp: t.timestamp.valueOf() / 1000,
          transaction_hash: t.rawEvent.transactionHash,
          log_index: t.rawEvent.logIndex,
          from_address: t.event.from,
          to_address: t.event.to,
          value: t.event.value.toString(),
        })),
        format: "JSONEachRow",
      });
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ["usdc_transfers"],
        where: `block_number > ${safeCursor.number}`,
      });
    },
  });

  // Run pipeline
  await source.pipe(decoder).pipeTo(target);
}

void main();
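The pipeline promise settles when the stream ends or fails; if the process should then exit cleanly, close the client to release its open connections. A sketch of the last lines of main():

try {
  await source.pipe(decoder).pipeTo(target);
} finally {
  await client.close(); // releases keep-alive sockets held by @clickhouse/client
}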
Table Design
Recommended Engine
Use CollapsingMergeTree for fork handling: rolled-back rows are cancelled by matching rows with sign = -1 rather than mutated in place, and the engine collapses each +1/-1 pair during background merges.
ENGINE = CollapsingMergeTree(sign)
ORDER BY (block_number, transaction_hash, log_index)
Codecs
Use compression codecs; DoubleDelta works well for monotonically increasing columns such as block numbers and timestamps:
block_number UInt32 CODEC (DoubleDelta, ZSTD),
timestamp DateTime CODEC (DoubleDelta, ZSTD)
Address Storage
Use LowCardinality for frequently repeated values such as addresses:

from_address LowCardinality(FixedString(42)) -- '0x' + 40 hex characters
BigInt Values
Store as UInt256, or as String if values may not fit:

value UInt256
-- or
value String -- for arbitrary-precision values
Data Flow
The source streams blocks through the decoder into the target: onData queues inserts for each batch, the target advances its cursor after a successful flush, and onRollback removes rows above the safe cursor when a fork is detected.
Querying Data
-- Recent transfers (FINAL collapses rows cancelled by rollbacks)
SELECT * FROM transfers FINAL
ORDER BY block_number DESC
LIMIT 100;

-- Top senders
SELECT
    from_address,
    count() as transfer_count,
    sum(value) as total_value
FROM transfers FINAL
GROUP BY from_address
ORDER BY total_value DESC
LIMIT 10;

-- Daily volume
SELECT
    toDate(timestamp) as date,
    count() as transfer_count,
    sum(value) / 1e6 as total_usdc
FROM transfers FINAL
GROUP BY date
ORDER BY date;
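The same queries can be issued from TypeScript with client.query; with the JSONEachRow format, the result set parses into an array of row objects. A sketch, reusing the client created earlier:

const result = await client.query({
  query: `
    SELECT from_address, count() as transfer_count
    FROM transfers FINAL
    GROUP BY from_address
    ORDER BY transfer_count DESC
    LIMIT 10
  `,
  format: "JSONEachRow",
});
const rows = await result.json();
console.log(rows);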
Docker Setup
docker-compose.yml
version: "3.8"
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse
volumes:
  clickhouse-data:
Start the database:
docker compose up -d
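Once the container is up, a quick ping verifies that the pipeline will be able to connect. A minimal sketch:

import { createClient } from "@clickhouse/client";

async function main() {
  const client = createClient({
    url: "http://localhost:8123",
    username: "default",
    password: "default",
  });
  const ping = await client.ping(); // resolves to { success: boolean }
  console.log(ping.success ? "ClickHouse is reachable" : "ClickHouse is unreachable");
  await client.close();
}
void main();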

