import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder} from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
/**
 * Streams decoded Orca Whirlpool `swap` instructions from the Solana portal,
 * starting at the latest block, and logs the number of swaps in every batch.
 *
 * While streaming, it keeps a bounded in-memory list of recently seen
 * unfinalized slot cursors. The `fork` handler compares that list against the
 * chain passed to it in order to pick the cursor to roll back to.
 */
async function main() {
  // Upper bound on how many unfinalized slot cursors are retained.
  const maxTrackedSlots = 1000

  const source = solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet'
  })

  const decoder = solanaInstructionDecoder({
    programId: orcaWhirlpool.programId,
    instructions: {
      swap: orcaWhirlpool.instructions.swap,
    },
    range: { from: 'latest' }
  })

  // Track recently processed unfinalized slots
  let recentUnfinalizedSlots: BlockCursor[] = []

  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({read}) => {
        // Resume from the last tracked slot (undefined while nothing is tracked yet)
        for await (const {data, ctx} of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length - 1])) {
          console.log(`Got ${data.swap.length} swaps`)

          // Track unfinalized slots from the batch
          for (const cursor of ctx.state.rollbackChain) {
            recentUnfinalizedSlots.push(cursor)
          }

          // Drop everything below the finalized head. Capturing the value in a
          // const lets the closure use it without a non-null assertion.
          const finalized = ctx.head.finalized
          if (finalized) {
            recentUnfinalizedSlots = recentUnfinalizedSlots.filter(
              (cursor) => cursor.number >= finalized.number
            )
          }

          // Keep only the newest `maxTrackedSlots` entries. A negative start is
          // used on purpose: `slice(length - max)` would also go negative for
          // short lists and then be counted from the end, discarding entries
          // that are still within the cap.
          recentUnfinalizedSlots = recentUnfinalizedSlots.slice(-maxTrackedSlots)
        }
      },
      // Handle fork events
      fork: async (previousBlocks) => {
        const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, previousBlocks)
        if (rollbackIndex >= 0) {
          // Forget everything after the last common slot and resume from it.
          recentUnfinalizedSlots.length = rollbackIndex + 1
          return recentUnfinalizedSlots[rollbackIndex]
        }
        // No common slot is known: clear the tracked list and report no cursor.
        recentUnfinalizedSlots.length = 0
        return null
      }
    }))
}
/**
 * Walks two slot chains in lockstep (both are expected to be ordered by
 * ascending `number`) and reports where they last agree.
 *
 * Slots present in only one chain are skipped. The walk stops at the first
 * slot that has the same number in both chains but a different hash.
 *
 * @param chainA chain whose index is reported
 * @param chainB chain to compare against
 * @returns index into `chainA` of the last slot matching `chainB` by number
 *          and hash, or -1 when no such slot was found
 */
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let lastShared = -1

  for (let i = 0, j = 0; i < chainA.length && j < chainB.length; ) {
    const ours = chainA[i]
    const theirs = chainB[j]

    if (ours.number < theirs.number) {
      // Slot exists only in chainA; move on.
      i += 1
    } else if (ours.number > theirs.number) {
      // Slot exists only in chainB; move on.
      j += 1
    } else if (ours.number === theirs.number && ours.hash !== theirs.hash) {
      // Same slot number, different block: the chains diverge here.
      break
    } else {
      lastShared = i
      i += 1
      j += 1
    }
  }

  return lastShared
}
// Start the pipeline; `void` marks the returned promise as intentionally not awaited.
void main()