diff --git a/tools/l1-tx-volume-indexer/README.md b/tools/l1-tx-volume-indexer/README.md index 52864ec4b..9c846aba8 100644 --- a/tools/l1-tx-volume-indexer/README.md +++ b/tools/l1-tx-volume-indexer/README.md @@ -7,6 +7,7 @@ This tool: - Computes: - `l1_timestamp`, `from_address`, `to_address` - `total_vol_eth`, `eth_vol`, `weth_vol`, `token_vol_eth`, `swap_vol_eth` + - `eth_price_usd`, `swap_vol_usd`, `total_vol_usd` - `is_swap`, `is_lending`, `is_transfer`, `is_approval` - `primary_class`, `protocol` - Writes results into `mevcommit_57173.processed_l1_txns_v2`. diff --git a/tools/l1-tx-volume-indexer/main.go b/tools/l1-tx-volume-indexer/main.go index e19eb69fc..59cf2a66a 100644 --- a/tools/l1-tx-volume-indexer/main.go +++ b/tools/l1-tx-volume-indexer/main.go @@ -56,13 +56,18 @@ const ( zeroAddr = "0x0000000000000000000000000000000000000000" // User-provided swap topics: - uniswapV2SwapTopic0 = "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1" - uniswapV3SwapTopic0 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67" - uniswapV4SwapTopic0 = "0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f" + uniswapV2SwapTopic0 = "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1" + uniswapV3SwapTopic0 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67" + uniswapV4SwapTopic0 = "0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f" + solverCallExecTopic0 = "0x93485dcd31a905e3ffd7b012abe3438fa8fa77f98ddc9f50e879d3fa7ccdc324" ) var httpClient = &http.Client{Timeout: 180 * time.Second} +// In-memory cache: date string "YYYY-MM-DD" -> ETH/USD price. +// Avoids repeat Covalent API calls for transactions on the same day. +var ethUSDCache = map[string]float64{} + // Router/settlement allowlist (used to prevent false-positive swaps) var swapRouterAllowlist = map[string]string{ "0x66a9893cc07d91d95644aedd05d03f95e1dba8af": "UniswapV4UniversalRouter", @@ -119,6 +124,10 @@ type ExistingRow struct { TokenVol *float64 SwapVol *float64 + EthPriceUSD *float64 + SwapVolUSD *float64 + TotalVolUSD *float64 + IsSwap *bool IsLending *bool IsTransfer *bool @@ -190,6 +199,10 @@ type Computed struct { TokenVolEth float64 TotalVolEth float64 + EthPriceUSD float64 + SwapVolUSD float64 + TotalVolUSD float64 + // Classification IsSwap bool IsLending bool @@ -340,7 +353,7 @@ func main() { // If we get a"tx not found" (Covalent 404), insert a tombstone (not_found row)so we don't retry forever. if w.InsertCandidate != nil && !*onlyUpdates && isCovalentTxNotFound(err) { - ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, 75) + ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, w.InsertCandidate.Source, w.InsertCandidate.CommitmentIndex, 15) if !ok { log.Printf("skip tombstone %s (%s)", w.Hash0x, why) continue @@ -706,6 +719,10 @@ SELECT token_vol_eth, swap_vol_eth, + eth_price_usd, + swap_vol_usd, + total_vol_usd, + is_swap, is_lending, is_transfer, @@ -747,6 +764,10 @@ SELECT token_vol_eth, swap_vol_eth, + eth_price_usd, + swap_vol_usd, + total_vol_usd, + is_swap, is_lending, is_transfer, @@ -769,6 +790,7 @@ WHERE l1_tx_hash IS NOT NULL OR l1_timestamp IS NULL OR total_vol_eth IS NULL OR swap_vol_eth IS NULL + OR eth_price_usd IS NULL ) %s; `, lim) @@ -802,6 +824,10 @@ WHERE l1_tx_hash IS NOT NULL tokv sql.NullFloat64 swapv sql.NullFloat64 + ethPriceUSD sql.NullFloat64 + swapVolUSD sql.NullFloat64 + totalVolUSD sql.NullFloat64 + isSwap sql.NullBool isLnd sql.NullBool isTr sql.NullBool @@ -815,6 +841,7 @@ WHERE l1_tx_hash IS NOT NULL &hashNorm, &hash0x, &ci, &ts, &fr, &to, &bd, &cm, &total, ðv, &wethv, &tokv, &swapv, + ðPriceUSD, &swapVolUSD, &totalVolUSD, &isSwap, &isLnd, &isTr, &isAp, &prim, &prot, ); err != nil { @@ -871,6 +898,19 @@ WHERE l1_tx_hash IS NOT NULL r.SwapVol = &v } + if ethPriceUSD.Valid { + v := ethPriceUSD.Float64 + r.EthPriceUSD = &v + } + if swapVolUSD.Valid { + v := swapVolUSD.Float64 + r.SwapVolUSD = &v + } + if totalVolUSD.Valid { + v := totalVolUSD.Float64 + r.TotalVolUSD = &v + } + if isSwap.Valid { v := isSwap.Bool r.IsSwap = &v @@ -920,13 +960,16 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 ( weth_vol, token_vol_eth, swap_vol_eth, + eth_price_usd, + swap_vol_usd, + total_vol_usd, is_swap, is_lending, is_transfer, is_approval, primary_class, protocol -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); ` tsStr := comp.BlockTime.UTC().Format("2006-01-02 15:04:05") @@ -943,6 +986,9 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 ( comp.WethVolEth, comp.TokenVolEth, comp.SwapVolEth, + comp.EthPriceUSD, + comp.SwapVolUSD, + comp.TotalVolUSD, comp.IsSwap, comp.IsLending, comp.IsTransfer, @@ -993,6 +1039,9 @@ SET weth_vol = ?, token_vol_eth = ?, swap_vol_eth = ?, + eth_price_usd = ?, + swap_vol_usd = ?, + total_vol_usd = ?, is_swap = ?, is_lending = ?, is_transfer = ?, @@ -1010,6 +1059,9 @@ WHERE LOWER(CAST(l1_tx_hash AS VARCHAR)) = ?; comp.WethVolEth, comp.TokenVolEth, comp.SwapVolEth, + comp.EthPriceUSD, + comp.SwapVolUSD, + comp.TotalVolUSD, comp.IsSwap, comp.IsLending, comp.IsTransfer, @@ -1035,6 +1087,10 @@ SET token_vol_eth = CASE WHEN token_vol_eth IS NULL THEN ? ELSE token_vol_eth END, swap_vol_eth = CASE WHEN swap_vol_eth IS NULL THEN ? ELSE swap_vol_eth END, + eth_price_usd = CASE WHEN eth_price_usd IS NULL THEN ? ELSE eth_price_usd END, + swap_vol_usd = CASE WHEN swap_vol_usd IS NULL THEN ? ELSE swap_vol_usd END, + total_vol_usd = CASE WHEN total_vol_usd IS NULL THEN ? ELSE total_vol_usd END, + is_swap = CASE WHEN is_swap IS NULL THEN ? ELSE is_swap END, is_lending = CASE WHEN is_lending IS NULL THEN ? ELSE is_lending END, is_transfer = CASE WHEN is_transfer IS NULL THEN ? ELSE is_transfer END, @@ -1054,6 +1110,9 @@ WHERE LOWER(CAST(l1_tx_hash AS VARCHAR)) = ?; comp.WethVolEth, comp.TokenVolEth, comp.SwapVolEth, + comp.EthPriceUSD, + comp.SwapVolUSD, + comp.TotalVolUSD, comp.IsSwap, comp.IsLending, comp.IsTransfer, @@ -1257,7 +1316,7 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) { totalVol := txValueEth + wethVol + tokenVol // Swap evidence: - swapIsUni := hasAnyTopic0(item.LogEvents, uniswapV2SwapTopic0, uniswapV3SwapTopic0, uniswapV4SwapTopic0) + swapIsUni := hasAnyTopic0(item.LogEvents, uniswapV2SwapTopic0, uniswapV3SwapTopic0, uniswapV4SwapTopic0, solverCallExecTopic0) hasRouter := touchedDexInfra(item.LogEvents, toAddr) hasSwapEvent := hasDecodedSwapLikeEvent(item.LogEvents) @@ -1280,6 +1339,12 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) { primary := primaryClass(isSwap, isLending, isTransfer, isApproval) + // Fetch ETH/USD price for this day (cached per date to avoid repeat API calls). + ethPriceUSD, err := fetchETHPriceUSD(apiKey, blockTime) + if err != nil { + log.Printf("warning: fetchETHPriceUSD for %s: %v (setting USD fields to 0)", blockTime.Format("2006-01-02"), err) + } + return &Computed{ Hash0x: txHash0x, HashNorm: hashNorm, @@ -1290,6 +1355,9 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) { WethVolEth: wethVol, TokenVolEth: tokenVol, TotalVolEth: totalVol, + EthPriceUSD: ethPriceUSD, + SwapVolUSD: swapVol * ethPriceUSD, + TotalVolUSD: totalVol * ethPriceUSD, IsSwap: isSwap, SwapVolEth: swapVol, IsLending: isLending, @@ -1442,7 +1510,7 @@ func isCovalentTxNotFound(err error) bool { return strings.Contains(ls, "transaction hash:") && strings.Contains(ls, " not found") } -func shouldTombstoneNotFound(db *sql.DB, txHashNorm string, minAgeBlocks int64) (bool, string) { +func shouldTombstoneNotFound(db *sql.DB, txHashNorm string, source string, commitmentIndex *string, minAgeMinutes int64) (bool, string) { // txHashNorm is expected WITHOUT 0x, lowercased txHashNorm = strings.ToLower(strings.TrimSpace(strip0x(txHashNorm))) @@ -1465,19 +1533,57 @@ WHERE LOWER(CAST(status AS VARCHAR)) IN ('confirmed','pre-confirmed'); if err := db.QueryRow(q, txHashNorm).Scan(&head, &txb); err != nil { return false, fmt.Sprintf("age_query_error: %v", err) } - if !txb.Valid || txb.Int64 == 0 { - // We can’t prove it’s old; be conservative and don’t tombstone. - return false, "no_tx_block" - } - if !head.Valid || head.Int64 == 0 { - return false, "no_head_block" - } - age := head.Int64 - txb.Int64 - if age <= minAgeBlocks { - return false, fmt.Sprintf("too_recent age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64) + // RPC-only path: use L1 block numbers (~12 sec/block). + if txb.Valid && txb.Int64 > 0 && head.Valid && head.Int64 > 0 { + minAgeBlocks := minAgeMinutes * 60 / 12 // e.g. 15 min -> 75 blocks + age := head.Int64 - txb.Int64 + if age <= minAgeBlocks { + return false, fmt.Sprintf("too_recent age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64) + } + return true, fmt.Sprintf("old_enough age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64) + } + + // Event-backed path: use CommitmentProcessed event timestamp from tx_view. + if source == "events" && commitmentIndex != nil { + ciNorm := strings.ToLower(strings.TrimSpace(strip0x(*commitmentIndex))) + q2 := ` +SELECT l_block_timestamp +FROM mevcommit_57173.tx_view +WHERE l_decoded IS NOT NULL + AND COALESCE(l_removed, 0) = 0 + AND get_json_string(CAST(l_decoded AS VARCHAR), '$.name') = 'CommitmentProcessed' + AND t_chain_id IN (8855, 57173) + AND LOWER(REPLACE( + get_json_string(CAST(l_decoded AS VARCHAR), '$.args.commitmentIndex'), + '0x', '' + )) = ? +LIMIT 1; +` + var eventTS sql.NullInt64 + if err := db.QueryRow(q2, ciNorm).Scan(&eventTS); err != nil { + return false, fmt.Sprintf("event_ts_query_error: %v", err) + } + if !eventTS.Valid || eventTS.Int64 == 0 { + return false, "no_event_timestamp" + } + + // l_block_timestamp may be seconds or milliseconds; normalise to seconds. + tsSec := eventTS.Int64 + if tsSec > 1_000_000_000_000 { + tsSec = tsSec / 1000 + } + nowSec := time.Now().Unix() + ageSec := nowSec - tsSec + minAgeSec := minAgeMinutes * 60 + if ageSec < minAgeSec { + return false, fmt.Sprintf("event_too_recent age_sec=%d threshold=%d", ageSec, minAgeSec) + } + return true, fmt.Sprintf("event_old_enough age_sec=%d", ageSec) } - return true, fmt.Sprintf("old_enough age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64) + + // No block info and not event-backed; be conservative. + return false, "no_tx_block" } func fetchTokenPricesETH(apiKey string, tokenSet map[string]struct{}, dateStr string) (map[string]float64, error) { @@ -1539,6 +1645,66 @@ func fetchTokenPricesETH(apiKey string, tokenSet map[string]struct{}, dateStr st return out, nil } +// fetchETHPriceUSD returns the ETH price in USD for the given block time. +// Results are cached per calendar day so same-day transactions share one API call. +func fetchETHPriceUSD(apiKey string, blockTime time.Time) (float64, error) { + dateStr := blockTime.UTC().Format("2006-01-02") + + // Check cache first. + if price, ok := ethUSDCache[dateStr]; ok { + return price, nil + } + + apiKey = strings.TrimSpace(apiKey) + url := fmt.Sprintf("%s/pricing/historical_by_addresses_v2/%s/USD/%s/?from=%s&to=%s", + covalentBaseURL, chainName, wethAddress, dateStr, dateStr) + + ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, err + } + req.Header.Set("Authorization", "Bearer "+apiKey) + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("ETH/USD pricing request error: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.Printf("resp.Body.Close: %v", err) + } + }() + + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return 0, fmt.Errorf("read ETH/USD pricing body: %w", readErr) + } + if resp.StatusCode != 200 { + return 0, fmt.Errorf("covalent ETH/USD pricing HTTP %d: %s", resp.StatusCode, truncateBody(body)) + } + + var pr PricingResponse + if err := json.Unmarshal(body, &pr); err != nil { + return 0, fmt.Errorf("ETH/USD pricing JSON decode: %w; body: %s", err, truncateBody(body)) + } + if pr.Error { + return 0, fmt.Errorf("ETH/USD pricing error: %s", pr.ErrorMessage) + } + + for _, item := range pr.Data { + if len(item.Prices) > 0 && item.Prices[0].Price > 0 { + price := item.Prices[0].Price + ethUSDCache[dateStr] = price + return price, nil + } + } + return 0, fmt.Errorf("no ETH/USD price data returned for date %s", dateStr) +} + // -------------------- Base volume calc -------------------- func computeWethAndTokenVolEth(logs []LogEvent, blockTime time.Time, apiKey string) (wethVolEth float64, tokenVolEth float64, err error) { @@ -1943,7 +2109,8 @@ func isSwapLikeEvent(name string) bool { "Trade", "OrderSettled", "Settlement", "Fill", "LimitOrderFilled", "RfqOrderFilled", "OrderFilled", "TransformERC20", "TransformedERC20", "ERC20BridgeTransfer", - "DODOSwap", "DODOV2SellBaseToken", "DODOV2SellQuoteToken", "Buy", "Sell": + "DODOSwap", "DODOV2SellBaseToken", "DODOV2SellQuoteToken", "Buy", "Sell", + "SolverCallExecuted": return true default: return false