前言
在 Web3 领域,链上交易数据是上层应用的基础。无论是聪明钱追踪、代币数据服务、还是实时 K 线系统,都依赖同一个底层能力:
将区块链上的原始交易数据解析成结构化、可消费的业务事件。
但构建这样的解析层面临几个核心挑战:
- 多链异构:Solana 和 EVM 链(BSC、Base)的区块结构、交易模型、RPC 接口完全不同。Solana 有 Account Model、指令嵌套和 CPI(Cross-Program Invocation);EVM 链有
types.Block、Receipt、Event Log。解析器需要统一这两种范式。 - 协议多样性:仅 Solana 生态就有 Raydium、Meteora、PumpFun、Orca、Pancake 等十几种 DEX 协议,每种协议的指令编码方式(Borsh/自定)和账本结构各不相同。
- 实时性要求高:从区块产生 → 扫链 → 解析 → 发送下游,需要在秒级完成。区块不能积压,解析失败需自动重试。
- 可观测性:生产环境需要监控解析延迟、失败率、Kafka 生产速率、RPC 可用性等指标。
本文介绍一个用 Golang 构建的生产级多链交易解析服务。该服务已覆盖 Solana、BSC、Base、Polygon 四条链,支持 12+ DEX 协议,每日处理百万级交易。
一、系统架构
1.1 整体架构
整个系统分为两个独立微服务:Scanner(扫链)和 Parser(解析),通过 Kafka 解耦。
另外对于高实时要求的链如Solana会有GRPC(监听),redis里用lua脚本去重存到待解析tx队列,解析都是基于tx,而扫链也会结合ws监听出块已减少rpc请求(rpc节点压力过大性能会下降)。
flowchart TB
subgraph Blockchain["区块链网络"]
BSC["BSC Node"]
BASE["Base Node"]
POLYGON["Polygon Node"]
SOLANA["Solana Node"]
end
subgraph Scanner["Scanner Service"]
EVM1["EVM Scanner BSC
轮询 + RLP编码"]
EVM2["EVM Scanner Base
轮询 + RLP编码"]
EVM3["EVM Scanner Polygon
轮询 + RLP编码"]
SOL["Solana Scanner
轮询 + Vote过滤 + 拆分"]
RedisScan["Redis
扫描状态
最后区块号"]
end
subgraph Kafka["Kafka Message Queue"]
TOPIC_BSC["raw_block_BSC"]
TOPIC_BASE["raw_block_BASE"]
TOPIC_POLYGON["raw_block_POLYGON"]
TOPIC_SOL["raw_block_SOLANA"]
end
subgraph Parser["Parser Service"]
EP1["EVM Parser BSC
Receipt解析 + ERC20提取 + 并行"]
EP2["EVM Parser Base
Receipt解析 + ERC20提取 + 并行"]
SOLP["Solana Parser
Plugin 指令解析器
Token → DEX 两阶段"]
Sub["Submitter
多Topic分发"]
end
subgraph Output["下游 Kafka Topics"]
BT["block_topic
完整区块"]
TT["transfer_topic
钱包转账"]
LQ["liquidity_events
流动性变化"]
CT["token_events
代币创建/更新/发射"]
BL["balance_topic
余额快照"]
TL["trade_events
交易记录/币对创建"]
end
subgraph Storage["存储"]
PG[("PostgreSQL
钱包配置")]
RC[("Redis
状态/缓存/回溯")]
end
BSC --> EVM1
BASE --> EVM2
POLYGON --> EVM3
SOLANA --> SOL
EVM1 --> TOPIC_BSC
EVM2 --> TOPIC_BASE
EVM3 --> TOPIC_POLYGON
SOL --> TOPIC_SOL
RedisScan -.-> EVM1 & EVM2 & EVM3 & SOL
TOPIC_BSC --> EP1
TOPIC_BASE --> EP2
TOPIC_POLYGON --> EP2
TOPIC_SOL --> SOLP
EP1 & EP2 & SOLP --> Sub
Sub --> BT & TT & LQ & CT & BL
PG -.-> Parser
RC -.-> Scanner & Parser
1.2 Scanner 服务 — 扫链
Scanner 职责单一:轮询区块链节点、获取新区块、序列化后发送到 Kafka。
sequenceDiagram
participant S as Scheduler
participant R as ScannerRunner
participant C as RPC Client
participant Rds as Redis
participant K as Kafka
loop 每 N 毫秒(每条链独立)
S->>R: 触发扫描
R->>Rds: 读取最后处理区块号
Rds-->>R: lastBlockNumber
R->>C: GetBlock(lastBlockNumber + 1)
alt EVM 链
C-->>R: Block + Receipts
Note over R: RLP 编码 Block
JSON 编码 Receipts
else Solana 链
C-->>R: GetBlockResult
Note over R: stripVoteTxs
大块拆分多个 part
end
R->>K: 发送到 raw_block_
R->>Rds: 更新最后区块号
alt 获取失败
R->>R: 加入重试队列
指数退避重试
end
end
关键设计要点:
Multi-RPC Client:每条链配置多个 RPC 端点,自动负载均衡和故障切换。当主节点限流或断连时,透明切换到备用节点。
1 | ClientPool 结构(简化): |
Scan State 持久化:通过 Redis 记录每个链的最后处理区块,重启后自动续扫,不会重复或遗漏。
1 | Redis Key 格式: |
Vote 交易优化:Solana 的 Vote 交易占块内交易的 80%+。Scanner 通过 stripVoteTxs 将 vote 交易的 Transaction 和 Meta 置为 nil,保留数组位置以维持其他交易的 index 不变。对于超大区块(非 nil 交易超过阈值),还会自动拆分为多个 Kafka 消息。
1 | // internal/scanner/solana/scan.go |
RLP 编码:EVM 的 types.Block 使用 RLP(Recursive Length Prefix)编码为二进制格式,比 JSON 节省约 60% 体积。
1 | // internal/common/block_wire.go |
1.3 Parser 服务 — 解析
Parser 是核心服务,从 Kafka 消费原始区块,解析出结构化事件,再发到不同的下游 Topic。
flowchart LR
subgraph Input["Kafka 输入"]
K1["raw_block_BSC"]
K2["raw_block_SOLANA"]
end
subgraph Decode["反序列化"]
D1["RLP 解码 Block
JSON 解码 Receipts"]
D2["JSON 解码
GetBlockResult"]
end
subgraph Parse["解析引擎"]
E1["EVM Parser"]
S1["Solana Parser"]
end
subgraph Submit["Submitter"]
SUB["并发分发"]
end
K1 --> D1 --> E1
K2 --> D2 --> S1
E1 & S1 --> SUB
二、核心技术
2.1 EVM 交易解析
2.1.1 解析流程
flowchart TD
Block["EvmBlockWithReceipts
(Block + Receipts)"]
Step1["Step 1: 获取 Token Decimals
遍历 Receipts 的 Logs
提取 ERC20 Transfer 中的代币地址
通过 Multicall 批量查询精度"]
Step2["Step 2: 提取 ERC20 Transfer Events
从 Log 中匹配
Transfer(address,address,uint256)"]
Step3["Step 3: 并行解析每笔交易
Bounded Pool(64 goroutines)"]
Block --> Step1 --> Step2 --> Step3
subgraph PerTx["每笔交易并行执行"]
DEX["Goroutine A: DEX事件解析
TryParseAndHandle
分发到各 DEX Handler"]
TX["Goroutine B: 基础转账解析
ParseTransaction
Native + ERC20 转账"]
end
Step3 --> PerTx
DEX --> |Handler 各自处理| DEX1["PancakeV2 Handler"]
DEX --> DEX2["PancakeV3 Handler"]
DEX --> DEX3["Aerodrome Handler"]
DEX --> DEX4["Clanker Handler"]
DEX --> DEX5["其他 EVM DEX..."]
TX --> TX1["解析 Native 转账
(to 地址为空, Data 为空)"]
TX --> TX2["解析 ERC20 Transfer
从 Receipt 获取金额"]
TX --> TX3["Wallet余额查询
(BalanceAt RPC)"]
TX --> TX4["生成 BalanceChange
Native + Token"]
DEX1 & DEX2 & DEX3 & DEX4 & DEX5 --> Result
TX1 & TX2 & TX3 & TX4 --> Result
Result["ParsedTransaction
Transfers + LiquidityEvents + BalanceChanges"]
2.1.2 核心代码
1 | // internal/parser/evm/parser.go - parseBlockOnce 核心逻辑 |
2.1.3 ERC20 Transfer Event 提取细节
1 | // 从 Receipt Log 中匹配 ERC20 Transfer 事件 |
2.1.4 EVM DEX 事件流
TryParseAndHandle 实际是通过 EvmEnventHander 的事件分发机制,将 Receipt 中的 Log 匹配到注册的 DEX Handler。每个 Handler 负责特定协议的事件解析:
1 | TryParseAndHandle(receipt) |
2.2 Solana 指令解析 — Plugin 架构
Solana 的解析比 EVM 复杂一个数量级。核心原因是 Solana 交易是指令嵌套结构:
flowchart LR
TX["一笔 Solana 交易"] --> OUTER1["外层指令 1
(Program A)"]
TX --> OUTER2["外层指令 2
(Program B)"]
TX --> OUTER3["外层指令 3
(Program C: DEX)"]
OUTER3 --> CPI1["CPI 内层指令 3_0
(Token Program: Transfer)"]
OUTER3 --> CPI2["CPI 内层指令 3_1
(Token Program: MintTo)"]
OUTER3 --> CPI3["CPI 内层指令 3_2
(System Program: Transfer)"]
style TX fill:#e1f5fe
style OUTER3 fill:#fff3e0
style CPI1 fill:#f3e5f5
style CPI2 fill:#f3e5f5
style CPI3 fill:#f3e5f5
外层指令(Outer Instructions)是交易中显式调用的指令;内层指令(Inner Instructions)是 CPI 调用触发的子指令。Token 转账通常发生在内层指令中,而 DEX 协议的外层指令决定了业务语义(如 Swap、AddLiquidity)。
项目设计了 Plugin 式指令解析器管理器:
classDiagram
class InstructionParserManager {
-tokenParser: InstructionParser
-parsers: map[string]InstructionParser
+RegisterParser(programID, parser)
+ParseInstruction(ctx, extractor, instruction, index) error
+ParseToken(ctx, extractor, instruction, index) error
}
class InstructionParser {
<>
+Parse(ctx, extractor, instruction, index) error
}
class SystemProgramParser {
+Parse() error
}
class TokenProgramParser {
+Parse() error
}
class RaydiumCLMMParser {
-client: MultiRPCClient
+Parse() error
+parseIncreaseLiquidity() error
+parseDecreaseLiquidityV2() error
}
class PumpfunAmmParser {
-client: MultiRPCClient
+Parse() error
+parseDeposit() error
+parseWithdraw() error
}
class MeteoraDLMMParser {
-client: MultiRPCClient
+Parse() error
}
class WhirlpoolsParser {
-client: MultiRPCClient
+Parse() error
}
InstructionParserManager --> InstructionParser : 管理
InstructionParser <|.. SystemProgramParser
InstructionParser <|.. TokenProgramParser
InstructionParser <|.. RaydiumCLMMParser
InstructionParser <|.. PumpfunAmmParser
InstructionParser <|.. MeteoraDLMMParser
InstructionParser <|.. WhirlpoolsParser
1 | // internal/parser/solana/instructionParser/instruction_parser.go |
2.2.1 两阶段解析流程
sequenceDiagram
participant B as SolanaBlock
participant P as SolanaParser
participant M as InstructionParserManager
participant TokenP as TokenParser
participant DexP as DEX Parsers
participant RPC as RPC Client
B->>P: ParseBlock(SolanaBlock)
P->>P: 解析 PreBalances/PostBalances
PreTokenBalances/PostTokenBalances
Note over P: Phase 1: Token 解析(高并发 20 goroutines)
par 每笔外层指令
P->>M: ParseToken(instruction)
M->>TokenP: Token Program?
TokenP->>TokenP: 解析 Transfer/Mint/Burn
TokenP-->>M: TokenTransferInfo
and 每笔内层指令
P->>M: ParseToken(innerInstruction)
M->>TokenP: Token Program?
TokenP->>TokenP: 解析内层 Transfer
TokenP-->>M: TokenTransferInfo
end
Note over P: Phase 1 完成: 所有 Token 转账已提取
Note over P: Phase 2: DEX 解析(低并发 5 goroutines)
par 每笔外层指令
P->>M: ParseInstruction(instruction)
M->>DexP: programID 匹配?
DexP->>DexP: Borsh 反序列化指令数据
识别指令类型(swap/add/remove)
DexP->>RPC: 读取池状态(SqrtPrice)
RPC-->>DexP: Pool account data
DexP->>DexP: 关联 Phase 1 的 TokenTransfer
计算价格
DexP-->>M: DexLiquidityEventData
and 每笔内层指令
P->>M: ParseInstruction(innerInstruction)
M->>DexP: programID 匹配?
DexP->>DexP: 处理 CPI 子指令
DexP-->>M: DexLiquidityEventData
end
P->>P: 组装 ParsedTransaction
P->>B: 返回完整 ParsedBlockInfo
2.2.2 DEX Parser 内部流程(以 Raydium CLMM 为例)
flowchart TD
Start["Parse(ctx, extractor, instruction, index)"]
--> Decode["Borsh 反序列化指令数据
amm_v3.DecodeInstruction()"]
--> Switch{"指令类型判断
instruction.TypeID"}
Switch -->|Instruction_IncreaseLiquidity| IncLiq["parseIncreaseLiquidity
解析添加流动性"]
Switch -->|Instruction_IncreaseLiquidityV2| IncLiqV2["parseIncreaseLiquidityV2"]
Switch -->|Instruction_DecreaseLiquidity| DecLiq["parseDecreaseLiquidity
解析移除流动性"]
Switch -->|Instruction_DecreaseLiquidityV2| DecLiqV2["parseDecreaseLiquidityV2"]
Switch -->|Instruction_OpenPosition| OpenPos["parseOpenPositionV2
开仓 + 添加流动性"]
Switch -->|Instruction_CollectFundFee| CollFee["parseCollectFundFees
收取手续费"]
Switch -->|Instruction_CollectProtocolFee| CollProto["parseCollectProtocolFees"]
subgraph IncreaseLiquidity["添加流动性解析流程"]
IncLiq --> GetBase["从指令 Accounts 获取
baseVault, quoteVault, pool"]
GetBase --> GetMint["从 Extractor 的
SplTokenAccountInfoMap 中
查找 baseMint, quoteMint"]
GetMint --> FindTransfer["从内层指令索引获取
Token Transfer 金额
(getChildrenInstructionIndex)"]
FindTransfer --> GetPrice["RPC 读取 Pool 账户
Borsh 反序列化池状态
获取 SqrtPriceX64"]
GetPrice --> CalcPrice["sqrtPriceX64ToTokenPrices
计算实时价格"]
CalcPrice --> BuildEvent["构建 DexLiquidityEventData
EventType: add_liquidity
BaseAmount + QuoteAmount + Price"]
end
关键数据结构 — Raydium CLMM Pool State:
1 | type RaydiumPoolInfo struct { |
价格计算:
1 | // sqrtPriceX64ToTokenPrices 将 sqrt 定点数转换为实际 token 价格 |
2.3 余额提取
2.3.1 Solana — 内置余额变化
Solana 相比 EVM 的一大优势:节点直接在交易 meta 中提供余额变化,无需额外 RPC 调用。
flowchart LR
subgraph TxMeta["Transaction Meta"]
PB["PreBalances: [10 SOL, 5 SOL, 2 SOL]"]
POB["PostBalances: [8 SOL, 7 SOL, 2 SOL]"]
PTB["PreTokenBalances:
(accountIndex, mint, amount)"]
POTB["PostTokenBalances:
(accountIndex, mint, amount)"]
end
subgraph Native["Native SOL 余额变化"]
N1["account[0]: 10 → 8 = -2 SOL"]
N2["account[1]: 5 → 7 = +2 SOL"]
N3["account[2]: 2 → 2 = 0 SOL"]
end
subgraph Token["SPL Token 余额变化"]
T1["匹配 Pre/Post
计算差值"]
T2["处理销户场景
(有 Pre 无 Post → 归零)"]
T3["处理开户场景
(无 Pre 有 Post → 新余额)"]
end
PB & POB --> Native
PTB & POTB --> Token
1 | // internal/parser/solana/parser.go - ExtractBalanceChanges |
2.3.2 EVM — 按需查询
EVM 不提供交易级余额变化,只能按需查询:
flowchart TD
TX["ParsedTransaction"]
--> Check{"from 或 to
是否在跟踪钱包中?"}
Check -->|from 是跟踪钱包| From["BalanceAt(from)
最多重试 3 次"]
Check -->|to 是跟踪钱包| To["BalanceAt(to)
最多重试 3 次"]
Check -->|都不是| Skip["跳过"]
From --> Update["更新 NativeBalanceChange
BalanceAfter"]
To --> Update
TX --> Batch["批量 ERC20 Balance
(通过专用合约 Multicall)"]
Batch --> Update2["更新 ERC20 BalanceAfter"]
2.4 区块序列化与反序列化
Scanner 和 Parser 通过 Kafka 传输区块数据,序列化方案直接影响性能和带宽:
flowchart LR
subgraph Scanner_side["Scanner 序列化"]
EVMB["EVM Block"] -->|"RLP Encode(types.Block)"| RLP["[]byte"]
EVMR["EVM Receipts"] -->|"sonic.Marshal 逐个编码"| JSONR["json.RawMessage[]"]
SOLB["Solana GetBlockResult"] -->|"sonic.Marshal
(原生 JSON 支持)"| JSONS["JSON"]
RLP & JSONR --> EVMWire["evmBlockWire{Chain, ChainId, Block, Receipts}"]
JSONS --> SOLWire["solanaBlockWire{Chain, Slot, Block}"]
end
subgraph Kafka["Kafka Payload"]
KB["raw_block_BSC: JSON(RLP(Block) + JSON(Receipts))"]
KS["raw_block_SOLANA: JSON(GetBlockResult)"]
end
subgraph Parser_side["Parser 反序列化"]
KB -->|"sonic.Unmarshal → RLP.Decode"| EVMBlock["EvmBlockWithReceipts"]
KS -->|"sonic.Unmarshal"| SOLBlock["SolanaBlock"]
end
EVMWire --> KB
SOLWire --> KS
为什么 EVM 用 RLP + JSON 混合?
types.Block包含完整的区块头(32个字段)+ 交易列表,RLP 编码后体积约为 JSON 的 40%types.Receipt需要按交易逐笔访问,JSON 编码更方便逐个反序列化bytedance/sonic是 Go 最快的 JSON 库之一,比encoding/json快 3-5 倍
1 | // internal/common/block_wire.go |
2.5 Submitter — 多 Topic 分发
sequenceDiagram
participant P as Parser
participant Sub as Submitter
participant DB as dex_data_broker
participant K as Kafka
P->>Sub: SubmitParsedBlock(parsedBlockInfo)
Sub->>Sub: Block.DeepCopy()
Note over Sub: 深拷贝: 避免并发修改
独立处理各 Topic
par 并发分发
Sub->>Sub: submitBalance()
Sub->>K: balance_topic
Sub->>Sub: 过滤钱包相关转账
Sub->>K: transfer_topic
Sub->>DB: submitLiquidityEvents()
DB->>K: liquidity_events
Sub->>DB: submitCreateTokenEvents()
DB->>K: token_create_events
Sub->>DB: submitSupplyEvents()
DB->>K: supply_events
Sub->>K: block_topic(完整区块)
alt 回溯模式
Sub->>K: *_history 后缀 Topic
end
end
Note over Sub: pool.New() 无界并发发送
Topic 设计:
| Topic | 内容 | 消费者 | 过滤逻辑 | ||
|---|---|---|---|---|---|
block_topic |
完整区块(所有交易) | 数据仓库、全量索引 | 无过滤 | ||
transfer_topic |
仅钱包转账 | 聪明钱系统、通知服务 | `CheckAddress(from) | CheckAddress(to)` | |
liquidity_events |
DEX 流动性事件 | 代币数据服务、K线 | 全部 | ||
token_create_events |
代币创建 | 代币收录服务 | 全部 | ||
supply_events |
供应量变更 | 代币数据服务 | 全部 | ||
balance_topic |
余额快照 | 持仓同步 | 仅跟踪钱包 | ||
pair_payload_topic |
Pair 更新 | 池管理服务 | 全部 |
深度拷贝的目的:
同一批解析结果需要分发到多个 Kafka Topic,每个 Topic 对数据有不同的裁剪要求。例如 transfer_topic 只需要转账数据,不需要余额快照。ParsedTransaction 和 ParsedBlockInfo 包含 big.Int、切片、map 等引用类型,直接浅拷贝会导致多个消费者共享内部状态,修改一个影响另一个。
1 | // internal/submitter/submitter.go - 裁剪示例 |
三、工程实践
3.1 错误处理 — 三层保护
区块链节点不稳定、RPC 限流、区块数据异常是常态。系统设计了三层保护:
flowchart TD
Start["解析区块"] --> Level1
subgraph Level1["第一层: 重试机制"]
Attempt1["第 1 次尝试"]
Attempt1 -->|失败| Delay["随机延迟 2-5s"]
Delay --> Attempt2["第 2 次尝试"]
Attempt2 -->|失败| Delay2["随机延迟 2-5s"]
Delay2 --> Attempt3["第 3 次尝试"]
Attempt3 -->|失败| Fail["返回错误"]
Attempt1 -->|成功| Done["完成"]
Attempt2 -->|成功| Done
Attempt3 -->|成功| Done
end
subgraph Level2["第二层: Goroutine 容错"]
Panic["某笔交易解析 panic"] --> Recover["recover()"]
Recover --> Stack["打印堆栈"]
Stack --> Continue["继续处理下一笔交易"]
Continue --> TxPool["Bounded Pool 不受影响"]
end
subgraph Level3["第三层: 上下文取消"]
SIGINT["收到 SIGINT/SIGTERM"] --> Cancel["context cancel()"]
Cancel --> InFlight["所有 in-flight goroutine
立即中止"]
InFlight --> Graceful["Scheduler.Stop() 等待完成"]
end
Level1 -->|重试耗尽| Report["上报 Lark 告警"]
Level1 -->|context 取消| Level3
Fail --> Report
重试实现:
1 | func parseBlockRetryDelay() time.Duration { |
3.2 并发控制
系统大量使用 sourcegraph/conc/pool 的 bounded goroutine pool,避免无限创建 goroutine 导致 OOM:
flowchart LR
subgraph EVM_Parser["EVM Parser"]
EVM_Pool["Bounded Pool (64)"]
EVM_Pool --> TX1["TX 0 DEX"]
EVM_Pool --> TX1B["TX 0 Transfer"]
EVM_Pool --> TX2["TX 1 DEX"]
EVM_Pool --> TX2B["TX 1 Transfer"]
EVM_Pool --> TXN["... (max 64 ongoing)"]
end
subgraph Solana_Parser["Solana Parser"]
SP["Bounded Pool (256)"] --> TXS0["TX 0"]
SP --> TXS1["TX 1"]
SP --> TXSN["... (max 256 ongoing)"]
subgraph PerTX["每笔交易内部"]
SPP1["Pool (20)
Token 解析"]
SPP2["Pool (5)
DEX 解析"]
end
end
subgraph Submit["Submitter"]
SUBP["Unbounded Pool
每个 Topic 一个 goroutine"]
end
| 场景 | Pool 大小 | 说明 |
|---|---|---|
| EVM 交易级解析 | 64 | 包含 DEX + Transfer + Balance 查询,RPC 调用多,限制防止积压 |
| Solana 交易级 | 256 | 主要是本地计算 + 少量 RPC,可以更高并发 |
| Solana Token 解析 | 20 | 全部本地计算(从已解码的 meta 提取) |
| Solana DEX 解析 | 5 | 需要额外 RPC 读取池状态,低并发减少节点压力 |
| Submitter 分发 | 无界 | 固定 5-7 个 goroutine,不会无限增长 |
3.3 依赖注入
项目采用依赖注入方式组织代码。Dependencies 结构体集中管理所有外部依赖:
flowchart TD
subgraph Dependencies["Dependencies 结构体"]
LOG["Logger (zap)"]
DB["DbClient (PostgreSQL)"]
RDS["RedisClient (状态/缓存)"]
LC["LayerCacheRdb (分层缓存)"]
TR["TrackRedisClient (回溯)"]
KAF["Producer (Kafka)"]
LK["LarkClient (告警通知)"]
end
subgraph Init["initDependencies() 初始化"]
Init_LOG["NewLogger"]
Init_DB["pgutil.NewDB"]
Init_RDS["redis_pkg.NewRedisClient × 3"]
Init_KAF["kafka.NewProducer"]
Init_LK["larkrpc.NewLarkRpcClient"]
end
Init --> Init_LOG & Init_DB & Init_RDS & Init_KAF & Init_LK
Init_LOG --> LOG
Init_DB --> DB
Init_RDS --> RDS & LC & TR
Init_KAF --> KAF
Init_LK --> LK
subgraph Inject["注入各模块"]
LOG --> ScannerRunner["ScannerRunner"]
LOG --> ParserRunner["ParserRunner"]
RDS --> ScannerRunner
RDS --> ParserRunner
KAF --> ScannerRunner
KAF --> ParserRunner
DB --> ParserRunner
LC --> ParserRunner
LK --> ScannerRunner
LK --> ParserRunner
end
这使得:
- 各模块可独立测试(Mock 替换真实依赖)
- 启动和关闭顺序可控(
initDependencies→stop) - 切换链或数据源只需修改配置
3.4 可观测性
flowchart LR
subgraph Metrics["Prometheus Metrics"]
M1["parser_block_parse_duration
区块解析耗时"]
M2["parser_kafka_produce_errors
Kafka 生产错误数"]
M3["scanner_rpc_call_duration
RPC 调用耗时"]
M4["scanner_redis_pool_connections
Redis 连接数"]
M5["scanner_block_interval
区块到达间隔"]
end
subgraph Alert["Lark 告警"]
A1["区块解析失败"]
A2["RPC 连续超时"]
A3["Kafka 生产异常"]
A4["区块长时间未到达"]
end
subgraph Log["结构化日志 (zap)"]
L1["chain=BSC block_number=12345"]
L2["tx_hash=0xabc... status=success"]
L3["parse_duration=2.3s retry=2"]
end
Metrics --> Grafana["Grafana 面板"]
Alert --> Lark["飞书机器人"]
Log --> File["lumberjack 滚动文件"]
3.5 Trackback 模式
sequenceDiagram
participant C as Config
participant S as Scanner
participant K as Kafka
participant P as Parser
participant Out as Output Topic
Note over C: 启用回溯配置
startBlock / endBlock
C->>S: SolanaTrackBackStartBlock=350000000
SolanaTrackBackEndBlock=350001000
loop startBlock → endBlock
S->>S: 从 Redis 读取回溯进度
S->>SOL: GetBlock(blockNumber)
SOL-->>S: Block Data
S->>K: raw_block_SOLANA
Note over S: 正常扫描逻辑复用
end
K->>P: 消费原始块
P->>P: isTrackback=true
par 正常 Topic
P->>Out: block_topic_history
P->>Out: transfer_topic_history
P->>Out: balance_topic_history
Note over Out: 带 _history 后缀
不污染实时数据流
and dex_data_broker
P->>DB: LiquidityDataBroker(isTrackback=true)
DB->>DB: 写入历史存储
end
Note over S,P: 回溯完成后自动退出
Trackback 模式的核心设计:
复用同一套扫描和解析逻辑,通过 isTrackback 标志控制数据路由。 这保证了:
- 回溯行为和实时行为完全一致(相同的解析逻辑、相同的错误处理)
- 回溯数据不污染实时数据流(不同的 Kafka Topic)
- 支持中 continued 断点续扫(Redis 记录进度)
四、支持协议一览
EVM
| 链 | 协议 | 解析内容 | 解析方式 |
|---|---|---|---|
| BSC | PancakeSwap V2 | Swap、Add/Remove Liquidity | Event Log 解析 |
| BSC | PancakeSwap V3 | 集中流动性事件 | Event Log 解析 |
| BASE | Aerodrome | Swap、Add/Remove Liquidity | Event Log 解析 |
| BASE | Clanker | Token 创建事件 | Event Log 解析 |
| BASE | Flap | Swap | Event Log 解析 |
| BASE | Dopper | Swap | Event Log 解析 |
| BASE/ZORA | Zora | NFT 铸造 | Event Log 解析 |
| BASE | FourMeme | Meme 代币交易 | Event Log 解析 |
| BASE | Polymarket | 预测市场事件 | Event Log 解析 |
| Polygon | QuickSwap/UniswapV3 等 | 通用 ERC20 事件 | Event Log 解析 |
Solana
| 协议 | 程序 ID | 解析内容 | 编码方式 |
|---|---|---|---|
| Raydium CLMM | CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK |
集中流动性添加/移除、Swap | Borsh |
| Raydium CPMM | CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C |
恒定乘积池交易 | Borsh |
| Raydium PoolV4 | 675kPX9MHTjS2zt1iDB1BT4W4PHU4tNmNsLu8j6Tteo |
AMM 池交易 | Borsh |
| Meteora DLMM | LBUZKhRdB2Ua3mLqLT8rpBgLzXKfbPcBMhR2R2r9GAs |
动态流动性做市 | Borsh |
| Meteora AMM | Eo7WjgDUJL3T4r7J8R4KJ8wN7Bz4vTtLpP3D9WwqR8 |
AMM 池交易 | Borsh |
| Meteora AMMV2 | M2SD9R8KEmL5LVkpAMRzKxqZF6oXmiGWHcFgjqpVZcP |
AMM V2 池交易 | Borsh |
| PumpFun AMM | pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA |
Meme 代币 AMM 流动性 | Borsh |
| Orca Whirlpools | whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc |
集中流动性池 | Borsh |
| Orca TokenSwapV2 | 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3FaP |
经典 AMM 池 | Borsh |
| PancakeSwap | PcKFoK... |
流动性事件 | Borsh |
| Heaven | heaven... |
流动性事件 | Borsh |
| DBC | DBC... |
流动性事件 | Borsh |
| Metaplex | metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s |
Token Metadata | Borsh |
五、总结
核心设计理念
mindmap
root((多链交易解析服务))
分层解耦
Scanner 只负责扫链
Parser 专注解析
Kafka 消峰解耦
各自独立扩容
Plugin 扩展
InstructionParser 接口
programID 映射 parser
新增协议只需注册
无需修改核心逻辑
并发安全
Bounded goroutine pool
深度拷贝 DeepCopy
sync.Mutex 细粒度锁
避免数据竞争
防御性编程
重试加随机抖动
panic recovery
context 取消
退化降级
可观测
Prometheus 指标
Lark 告警
结构化日志
分层解耦:Scanner → Kafka → Parser 的经典 Pipeline 架构,Scanner 只负责扫链,Parser 专注解析,通过 Kafka 消峰解耦。两层可以独立扩缩容。
Plugin 扩展:Solana 指令解析器采用插件注册模式,新增协议只需实现
InstructionParser接口并调用RegisterParser注册,完全不需要修改核心解析循环。并发安全:通过 bounded goroutine pool + 深度拷贝 + sync.Mutex 组合策略,在保证吞吐的同时避免数据竞争。
防御性编程:重试、panic recovery、context 取消、退化降级(recoverable error 不阻断全块),让系统在不可靠的公链环境中稳定运行。