Index ERC20 Transfer events from Ethereum mainnet to a ClickHouse database.

Prerequisites

  • Node.js 22+ installed
  • Docker installed
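You can confirm the Node.js prerequisite from a terminal (the exact version printed will vary; this guide assumes v22 or newer):

```shell
# Print the installed Node.js version; should be v22.x or newer
node --version
```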
1. Download and install

Install the Pipes SDK and the ClickHouse client packages:
npm install @subsquid/pipes @clickhouse/client
2. Create a new project

Create a minimal docker-compose.yml:
docker-compose.yml
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse
    ports:
      - "8123:8123"
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: password
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
3. Start the ClickHouse server

Start ClickHouse using Docker Compose:
docker compose up -d
4. Create the script

Create a file index.ts with the following code:
index.ts
import { createClient } from '@clickhouse/client'
import { commonAbis, evmDecoder, evmPortalSource } from '@subsquid/pipes/evm'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

async function main() {
  // Stream blocks from the SQD Portal's Ethereum mainnet dataset
  await evmPortalSource({
    portal: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
  })
    .pipe(
      // Decode ERC20 Transfer events, starting from the latest block
      evmDecoder({
        range: { from: 'latest' },
        events: {
          transfers: commonAbis.erc20.events.Transfer,
        },
      }),
    )
    .pipe({
      // Map each decoded event to a flat row matching the table schema
      transform: (data) =>
        data.transfers.map((t) => ({
          block_number: t.block.number,
          token: t.contract,
          from: t.event.from,
          to: t.event.to,
          amount: t.event.value.toString(),
        })),
    })
    .pipeTo(
      clickhouseTarget({
        client: createClient({
          username: 'default',
          password: 'password',
          url: 'http://localhost:8123',
        }),
        // Runs once before indexing: create the destination table
        onStart: async ({ store }) => {
          await store.command({
            query: `
              CREATE TABLE IF NOT EXISTS erc20_transfers (
                block_number  UInt64,
                timestamp     DateTime64(3) CODEC (DoubleDelta, ZSTD),
                token         String,
                from          String,
                to            String,
                amount        UInt256
              )
              ENGINE = MergeTree
              ORDER BY block_number
          `,
          })
        },
        // Insert each batch of transformed rows
        onData: async ({ ctx, data, store }) => {
          await store.insert({
            table: 'erc20_transfers',
            values: data,
            format: 'JSONEachRow',
          })
        },
        // On a chain reorg, drop rows above the last safe (finalized) cursor
        onRollback: async ({ safeCursor, store }) => {
          await store.removeAllRows({
            tables: ['erc20_transfers'],
            where: `block_number > {latest:UInt32}`,
            params: { latest: safeCursor.number },
          })
        },
      }),
    )
}

void main()
5. Run the script

Execute the script to start indexing Transfer events.
npx tsx index.ts
You should see the indexer's progress in the console:
[12:46:37.044] INFO: Start indexing from 0 block
[12:46:42.035] INFO: 1,501,277 / 38,429,144 (3.91%), ETA: 6m 43s
    blocks: "91444 blocks/second"
    bytes: "20.01 MB/second"
[12:46:47.034] INFO: 1,955,534 / 38,429,144 (5.09%), ETA: 6m 53s
    blocks: "88166 blocks/second"
    bytes: "28.31 MB/second"
[12:46:52.098] INFO: 1,964,997 / 38,429,144 (5.11%), ETA: 11m 12s
    blocks: "54234 blocks/second"
    bytes: "38.23 MB/second"
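Once some rows have been ingested, you can verify the data directly in ClickHouse, for example via `docker exec -it clickhouse clickhouse-client --password password`. The table and column names below match the schema created in `index.ts`; `from` and `to` are backtick-quoted because they collide with SQL keywords:

```sql
-- Count indexed transfers
SELECT count() FROM erc20_transfers;

-- Inspect the most recent rows
SELECT block_number, token, `from`, `to`, amount
FROM erc20_transfers
ORDER BY block_number DESC
LIMIT 5;
```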

How It Works

The pipeline connects four components: a Portal source streams blockchain data, an event decoder filters and decodes Transfer events using the ERC20 ABI, a transform step maps decoded events to database rows, and a ClickHouse target persists the data. Each decoded event includes block (number and hash), timestamp, contract address, and typed event fields (from, to, value).
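The transform step above can be sketched in isolation as a pure function. The `DecodedTransfer` interface below is a hypothetical stand-in for the decoder's output shape (the real types come from `@subsquid/pipes/evm`); its fields mirror the ones accessed in `index.ts`:

```typescript
// Hypothetical shape of one decoded Transfer event, mirroring index.ts usage
interface DecodedTransfer {
  block: { number: number; hash: string }
  contract: string
  event: { from: string; to: string; value: bigint }
}

// Map a decoded event to a flat row matching the erc20_transfers schema
function toRow(t: DecodedTransfer) {
  return {
    block_number: t.block.number,
    token: t.contract,
    from: t.event.from,
    to: t.event.to,
    // uint256 values decode to bigint; serialize to a string for JSONEachRow
    amount: t.event.value.toString(),
  }
}

// Example: one decoded Transfer becomes one ClickHouse row
const row = toRow({
  block: { number: 19_000_000, hash: '0xabc' },
  contract: '0xdac17f958d2ee523a2206206994597c13d831ec7',
  event: { from: '0x1111', to: '0x2222', value: 1_000_000n },
})
console.log(row.amount) // prints 1000000
```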

Next Steps