Install the ClickHouse Node.js client:

```shell
npm install @clickhouse/client
```
At a glance, the pipeline looks like this:

```typescript
import { createClient } from '@clickhouse/client'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

await solanaPortalSource({ ... }).pipeTo(
  clickhouseTarget({
    client: createClient({ url: 'http://localhost:8123' }),
    onData: async ({ store, data }) => {
      store.insert({ table: 'swaps', values: data.swap.map( ... ), format: 'JSONEachRow' })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({ tables: ['swaps'], where: `slot > ${safeCursor.number}` })
    },
  }),
)
```
Table design
Use `CollapsingMergeTree` with a `sign Int8 DEFAULT 1` column. This engine enables efficient fork rollbacks: to cancel rows, the target re-inserts them with `sign = -1` and ClickHouse merges the pair during background processing.
```sql
CREATE TABLE IF NOT EXISTS swaps (
  slot UInt64 CODEC(DoubleDelta, ZSTD),
  instruction_index UInt32,
  program_id String,
  sign Int8 DEFAULT 1
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (slot, instruction_index);
```
Design notes:

- Apply `DoubleDelta` + `ZSTD` codecs to monotonically increasing columns such as block numbers and timestamps.
- Use `LowCardinality(String)` for columns with a small set of distinct values, such as program or token addresses, to reduce storage and speed up filtering.
- Store 256-bit integers as `UInt256`; serialize JavaScript `BigInt` values to strings before insertion.
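The `BigInt` point matters because JSON-based serialization (as in the `JSONEachRow` format) cannot encode `BigInt` values; `JSON.stringify` throws a `TypeError` on them. A minimal sketch of the conversion, using a hypothetical `amount` field (not part of the table above):

```typescript
// Hypothetical row shape: a swap amount decoded as a JavaScript BigInt,
// destined for a UInt256 column.
type DecodedSwap = { slot: number; amount: bigint }

// Convert BigInt to a decimal string before insertion; ClickHouse parses
// the string into UInt256 when it ingests the JSONEachRow payload.
function toRow(d: DecodedSwap): { slot: number; amount: string } {
  return { slot: d.slot, amount: d.amount.toString() }
}
```

The same pattern applies to any decoded field wider than JavaScript's safe integer range.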
Create the table in `onStart` using `store.command()`:

```typescript
onStart: async ({ store }) => {
  await store.command({ query: `CREATE TABLE IF NOT EXISTS transfers ( ... )` })
}
```
onData

Call `store.insert()` to queue an insert. The call is non-blocking: inserts fire concurrently and are fully flushed when the target closes:

```typescript
onData: async ({ store, data }) => {
  store.insert({
    table: 'swaps',
    values: data.swap.map((d) => ({
      slot: d.block.slot,
      instruction_index: d.instruction.instructionIndex,
      program_id: d.programId,
    })),
    format: 'JSONEachRow',
  })
}
```
onRollback

Implement `onRollback` to handle blockchain forks. It is invoked in two situations:

- `type: 'offset_check'` — on startup, when a saved cursor is found, to discard writes from a previous crashed or partial run
- `type: 'blockchain_fork'` — when the stream detects a chain reorganisation

Use `store.removeAllRows()` to cancel rows past the safe point. For `CollapsingMergeTree` tables this re-inserts matching rows with `sign = -1`:

```typescript
onRollback: async ({ store, safeCursor }) => {
  await store.removeAllRows({
    tables: ['swaps'],
    where: `slot > ${safeCursor.number}`,
  })
}
```
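Note that a cancelled pair may coexist on disk until a background merge collapses it, so analytical queries over a `CollapsingMergeTree` table typically aggregate over `sign` (e.g. `sum(sign)`) or read with `FINAL`. The cancellation itself can be sketched as a pure function over the row shape of the `swaps` table (a conceptual illustration, not the actual store internals):

```typescript
// Row shape matching the swaps table; sign is +1 for live rows.
type SignedRow = { slot: number; instruction_index: number; program_id: string; sign: number }

// Conceptually, cancelling rows past the safe cursor means re-emitting each
// matching row with sign = -1; ClickHouse then collapses each (+1, -1) pair
// during background merges.
function cancellations(rows: SignedRow[], safeSlot: number): SignedRow[] {
  return rows.filter((r) => r.slot > safeSlot).map((r) => ({ ...r, sign: -1 }))
}
```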
Complete example

```typescript
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import { createClient } from '@clickhouse/client'

const client = createClient({ url: 'http://localhost:8123' })

await solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  outputs: solanaInstructionDecoder({
    range: { from: '340000000' },
    programId: '...',
    instructions: { swap: swapInstruction },
  }),
}).pipeTo(
  clickhouseTarget({
    client,
    onStart: async ({ store }) => {
      await store.command({
        query: `
          CREATE TABLE IF NOT EXISTS swaps (
            slot UInt64 CODEC(DoubleDelta, ZSTD),
            instruction_index UInt32,
            program_id String,
            sign Int8 DEFAULT 1
          ) ENGINE = CollapsingMergeTree(sign)
          ORDER BY (slot, instruction_index)
        `,
      })
    },
    onData: async ({ store, data }) => {
      store.insert({
        table: 'swaps',
        values: data.swap.map((d) => ({
          slot: d.block.slot,
          instruction_index: d.instruction.instructionIndex,
          program_id: d.programId,
        })),
        format: 'JSONEachRow',
      })
    },
    onRollback: async ({ store, safeCursor }) => {
      await store.removeAllRows({
        tables: ['swaps'],
        where: `slot > ${safeCursor.number}`,
      })
    },
  }),
)
```
Docker setup

```yaml
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: default
    volumes:
      - clickhouse-data:/var/lib/clickhouse

volumes:
  clickhouse-data:
```
See the `clickhouseTarget` reference for the full API.