Skip to main content

Documentation Index

Fetch the complete documentation index at: https://beta.docs.sqd.dev/llms.txt

Use this file to discover all available pages before exploring further.

Pipe anatomy
An EVM pipe made with SQD’s Pipes SDK consists of:
  • A source - typically made with evmPortalSource(). Can have one or more outputs.
  • Queries - tell the source which data has to be retrieved to compute each output. A query is defined by a chain call terminated by .build(). Here’s an example:
    evmQuery()
      .addFields({
        block: { timestamp: true },
        log: { address: true, transactionHash: true },
      })
      .addLog({
        topic0: [ TRANSFER_TOPIC ]
      })
      .build()
    
  • Per-query transforms (optional) - you can pass data from each query through a chain of simple transforms:
    query
      .pipe(data => data.map(item => ({
        funkyNumber: item.header.timestamp + item.header.number,
        ...item
      })))
      .pipe(someOtherSimpleTransformCallback)
    
    Source object streams the data you get out of each chain of transforms as the value of the corresponding output field.
  • Making utils that return reusable query-transform combos is a very useful pattern. In particular, on EVM it is often convenient to keep retrieval and decoding of event logs in a single module. You can easily make such combos with the evmDecoder() function - see the Handling events guide.
  • Whole pipe transformers (optional) - use this if you need to compute something based on data originating from multiple queries, or if you need access to per-batch context (cursor, logger, profiler, fork callbacks). Use createTransformer() so the SDK can thread cursor and rollback information (1, 2) through your transform:
    import { createTransformer } from '@subsquid/pipes'
    
    const enrichTransfers = createTransformer<
      { transfers: Transfer[]; approvals: Approval[] },
      { events: EnrichedEvent[] }
    >({
      transform: ({ transfers, approvals }, ctx) => {
        ctx.logger.info({ batch: ctx.stream.state.current?.number }, 'enriching')
        return {
          events: [
            ...transfers.map((t) => ({ kind: 'transfer' as const, ...t })),
            ...approvals.map((a) => ({ kind: 'approval' as const, ...a })),
          ],
        }
      },
    })
    
    evmPortalStream({ /* ... */ }).pipe(enrichTransfers)
    
  • Pipe termination: a plain async iterator or a target.
    • If you use the pipe as an async iterator it will throw exceptions if the underlying chain is experiencing reorgs, see Fork handling.
    • We offer two targets out of the box:
      • Postgres via Drizzle
      • ClickHouse
      You can make your own using createTarget().