Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tools/l1-tx-volume-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This tool:
- Computes:
- `l1_timestamp`, `from_address`, `to_address`
- `total_vol_eth`, `eth_vol`, `weth_vol`, `token_vol_eth`, `swap_vol_eth`
- `eth_price_usd`, `swap_vol_usd`, `total_vol_usd`
- `is_swap`, `is_lending`, `is_transfer`, `is_approval`
- `primary_class`, `protocol`
- Writes results into `mevcommit_57173.processed_l1_txns_v2`.
Expand Down
205 changes: 186 additions & 19 deletions tools/l1-tx-volume-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,18 @@ const (
zeroAddr = "0x0000000000000000000000000000000000000000"

// User-provided swap topics:
uniswapV2SwapTopic0 = "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"
uniswapV3SwapTopic0 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
uniswapV4SwapTopic0 = "0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f"
uniswapV2SwapTopic0 = "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"
uniswapV3SwapTopic0 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
uniswapV4SwapTopic0 = "0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f"
solverCallExecTopic0 = "0x93485dcd31a905e3ffd7b012abe3438fa8fa77f98ddc9f50e879d3fa7ccdc324"
)

var httpClient = &http.Client{Timeout: 180 * time.Second}

// In-memory cache: date string "YYYY-MM-DD" -> ETH/USD price.
// Avoids repeat Covalent API calls for transactions on the same day.
var ethUSDCache = map[string]float64{}

// Router/settlement allowlist (used to prevent false-positive swaps)
var swapRouterAllowlist = map[string]string{
"0x66a9893cc07d91d95644aedd05d03f95e1dba8af": "UniswapV4UniversalRouter",
Expand Down Expand Up @@ -119,6 +124,10 @@ type ExistingRow struct {
TokenVol *float64
SwapVol *float64

EthPriceUSD *float64
SwapVolUSD *float64
TotalVolUSD *float64

IsSwap *bool
IsLending *bool
IsTransfer *bool
Expand Down Expand Up @@ -190,6 +199,10 @@ type Computed struct {
TokenVolEth float64
TotalVolEth float64

EthPriceUSD float64
SwapVolUSD float64
TotalVolUSD float64

// Classification
IsSwap bool
IsLending bool
Expand Down Expand Up @@ -340,7 +353,7 @@ func main() {

// If we get a"tx not found" (Covalent 404), insert a tombstone (not_found row)so we don't retry forever.
if w.InsertCandidate != nil && !*onlyUpdates && isCovalentTxNotFound(err) {
ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, 75)
ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, w.InsertCandidate.Source, w.InsertCandidate.CommitmentIndex, 15)
if !ok {
log.Printf("skip tombstone %s (%s)", w.Hash0x, why)
continue
Expand Down Expand Up @@ -706,6 +719,10 @@ SELECT
token_vol_eth,
swap_vol_eth,

eth_price_usd,
swap_vol_usd,
total_vol_usd,

is_swap,
is_lending,
is_transfer,
Expand Down Expand Up @@ -747,6 +764,10 @@ SELECT
token_vol_eth,
swap_vol_eth,

eth_price_usd,
swap_vol_usd,
total_vol_usd,

is_swap,
is_lending,
is_transfer,
Expand All @@ -769,6 +790,7 @@ WHERE l1_tx_hash IS NOT NULL
OR l1_timestamp IS NULL
OR total_vol_eth IS NULL
OR swap_vol_eth IS NULL
OR eth_price_usd IS NULL
)
%s;
`, lim)
Expand Down Expand Up @@ -802,6 +824,10 @@ WHERE l1_tx_hash IS NOT NULL
tokv sql.NullFloat64
swapv sql.NullFloat64

ethPriceUSD sql.NullFloat64
swapVolUSD sql.NullFloat64
totalVolUSD sql.NullFloat64

isSwap sql.NullBool
isLnd sql.NullBool
isTr sql.NullBool
Expand All @@ -815,6 +841,7 @@ WHERE l1_tx_hash IS NOT NULL
&hashNorm, &hash0x,
&ci, &ts, &fr, &to, &bd, &cm,
&total, &ethv, &wethv, &tokv, &swapv,
&ethPriceUSD, &swapVolUSD, &totalVolUSD,
&isSwap, &isLnd, &isTr, &isAp,
&prim, &prot,
); err != nil {
Expand Down Expand Up @@ -871,6 +898,19 @@ WHERE l1_tx_hash IS NOT NULL
r.SwapVol = &v
}

if ethPriceUSD.Valid {
v := ethPriceUSD.Float64
r.EthPriceUSD = &v
}
if swapVolUSD.Valid {
v := swapVolUSD.Float64
r.SwapVolUSD = &v
}
if totalVolUSD.Valid {
v := totalVolUSD.Float64
r.TotalVolUSD = &v
}

if isSwap.Valid {
v := isSwap.Bool
r.IsSwap = &v
Expand Down Expand Up @@ -920,13 +960,16 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
weth_vol,
token_vol_eth,
swap_vol_eth,
eth_price_usd,
swap_vol_usd,
total_vol_usd,
is_swap,
is_lending,
is_transfer,
is_approval,
primary_class,
protocol
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`
tsStr := comp.BlockTime.UTC().Format("2006-01-02 15:04:05")

Expand All @@ -943,6 +986,9 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
comp.WethVolEth,
comp.TokenVolEth,
comp.SwapVolEth,
comp.EthPriceUSD,
comp.SwapVolUSD,
comp.TotalVolUSD,
comp.IsSwap,
comp.IsLending,
comp.IsTransfer,
Expand Down Expand Up @@ -993,6 +1039,9 @@ SET
weth_vol = ?,
token_vol_eth = ?,
swap_vol_eth = ?,
eth_price_usd = ?,
swap_vol_usd = ?,
total_vol_usd = ?,
is_swap = ?,
is_lending = ?,
is_transfer = ?,
Expand All @@ -1010,6 +1059,9 @@ WHERE LOWER(CAST(l1_tx_hash AS VARCHAR)) = ?;
comp.WethVolEth,
comp.TokenVolEth,
comp.SwapVolEth,
comp.EthPriceUSD,
comp.SwapVolUSD,
comp.TotalVolUSD,
comp.IsSwap,
comp.IsLending,
comp.IsTransfer,
Expand All @@ -1035,6 +1087,10 @@ SET
token_vol_eth = CASE WHEN token_vol_eth IS NULL THEN ? ELSE token_vol_eth END,
swap_vol_eth = CASE WHEN swap_vol_eth IS NULL THEN ? ELSE swap_vol_eth END,

eth_price_usd = CASE WHEN eth_price_usd IS NULL THEN ? ELSE eth_price_usd END,
swap_vol_usd = CASE WHEN swap_vol_usd IS NULL THEN ? ELSE swap_vol_usd END,
total_vol_usd = CASE WHEN total_vol_usd IS NULL THEN ? ELSE total_vol_usd END,

is_swap = CASE WHEN is_swap IS NULL THEN ? ELSE is_swap END,
is_lending = CASE WHEN is_lending IS NULL THEN ? ELSE is_lending END,
is_transfer = CASE WHEN is_transfer IS NULL THEN ? ELSE is_transfer END,
Expand All @@ -1054,6 +1110,9 @@ WHERE LOWER(CAST(l1_tx_hash AS VARCHAR)) = ?;
comp.WethVolEth,
comp.TokenVolEth,
comp.SwapVolEth,
comp.EthPriceUSD,
comp.SwapVolUSD,
comp.TotalVolUSD,
comp.IsSwap,
comp.IsLending,
comp.IsTransfer,
Expand Down Expand Up @@ -1257,7 +1316,7 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) {
totalVol := txValueEth + wethVol + tokenVol

// Swap evidence:
swapIsUni := hasAnyTopic0(item.LogEvents, uniswapV2SwapTopic0, uniswapV3SwapTopic0, uniswapV4SwapTopic0)
swapIsUni := hasAnyTopic0(item.LogEvents, uniswapV2SwapTopic0, uniswapV3SwapTopic0, uniswapV4SwapTopic0, solverCallExecTopic0)
hasRouter := touchedDexInfra(item.LogEvents, toAddr)
hasSwapEvent := hasDecodedSwapLikeEvent(item.LogEvents)

Expand All @@ -1280,6 +1339,12 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) {

primary := primaryClass(isSwap, isLending, isTransfer, isApproval)

// Fetch ETH/USD price for this day (cached per date to avoid repeat API calls).
ethPriceUSD, err := fetchETHPriceUSD(apiKey, blockTime)
if err != nil {
log.Printf("warning: fetchETHPriceUSD for %s: %v (setting USD fields to 0)", blockTime.Format("2006-01-02"), err)
}

return &Computed{
Hash0x: txHash0x,
HashNorm: hashNorm,
Expand All @@ -1290,6 +1355,9 @@ func computeAll(txHash0x string, apiKey string) (*Computed, error) {
WethVolEth: wethVol,
TokenVolEth: tokenVol,
TotalVolEth: totalVol,
EthPriceUSD: ethPriceUSD,
SwapVolUSD: swapVol * ethPriceUSD,
TotalVolUSD: totalVol * ethPriceUSD,
IsSwap: isSwap,
SwapVolEth: swapVol,
IsLending: isLending,
Expand Down Expand Up @@ -1442,7 +1510,7 @@ func isCovalentTxNotFound(err error) bool {
return strings.Contains(ls, "transaction hash:") && strings.Contains(ls, " not found")
}

func shouldTombstoneNotFound(db *sql.DB, txHashNorm string, minAgeBlocks int64) (bool, string) {
func shouldTombstoneNotFound(db *sql.DB, txHashNorm string, source string, commitmentIndex *string, minAgeMinutes int64) (bool, string) {
// txHashNorm is expected WITHOUT 0x, lowercased
txHashNorm = strings.ToLower(strings.TrimSpace(strip0x(txHashNorm)))

Expand All @@ -1465,19 +1533,57 @@ WHERE LOWER(CAST(status AS VARCHAR)) IN ('confirmed','pre-confirmed');
if err := db.QueryRow(q, txHashNorm).Scan(&head, &txb); err != nil {
return false, fmt.Sprintf("age_query_error: %v", err)
}
if !txb.Valid || txb.Int64 == 0 {
// We can’t prove it’s old; be conservative and don’t tombstone.
return false, "no_tx_block"
}
if !head.Valid || head.Int64 == 0 {
return false, "no_head_block"
}

age := head.Int64 - txb.Int64
if age <= minAgeBlocks {
return false, fmt.Sprintf("too_recent age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64)
// RPC-only path: use L1 block numbers (~12 sec/block).
if txb.Valid && txb.Int64 > 0 && head.Valid && head.Int64 > 0 {
minAgeBlocks := minAgeMinutes * 60 / 12 // e.g. 15 min -> 75 blocks
age := head.Int64 - txb.Int64
if age <= minAgeBlocks {
return false, fmt.Sprintf("too_recent age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64)
}
return true, fmt.Sprintf("old_enough age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64)
}

// Event-backed path: use CommitmentProcessed event timestamp from tx_view.
if source == "events" && commitmentIndex != nil {
ciNorm := strings.ToLower(strings.TrimSpace(strip0x(*commitmentIndex)))
q2 := `
SELECT l_block_timestamp
FROM mevcommit_57173.tx_view
WHERE l_decoded IS NOT NULL
AND COALESCE(l_removed, 0) = 0
AND get_json_string(CAST(l_decoded AS VARCHAR), '$.name') = 'CommitmentProcessed'
AND t_chain_id IN (8855, 57173)
AND LOWER(REPLACE(
get_json_string(CAST(l_decoded AS VARCHAR), '$.args.commitmentIndex'),
'0x', ''
)) = ?
LIMIT 1;
`
var eventTS sql.NullInt64
if err := db.QueryRow(q2, ciNorm).Scan(&eventTS); err != nil {
return false, fmt.Sprintf("event_ts_query_error: %v", err)
}
if !eventTS.Valid || eventTS.Int64 == 0 {
return false, "no_event_timestamp"
}

// l_block_timestamp may be seconds or milliseconds; normalise to seconds.
tsSec := eventTS.Int64
if tsSec > 1_000_000_000_000 {
tsSec = tsSec / 1000
}
nowSec := time.Now().Unix()
ageSec := nowSec - tsSec
minAgeSec := minAgeMinutes * 60
if ageSec < minAgeSec {
return false, fmt.Sprintf("event_too_recent age_sec=%d threshold=%d", ageSec, minAgeSec)
}
return true, fmt.Sprintf("event_old_enough age_sec=%d", ageSec)
}
return true, fmt.Sprintf("old_enough age_blocks=%d head=%d txb=%d", age, head.Int64, txb.Int64)

// No block info and not event-backed; be conservative.
return false, "no_tx_block"
}

func fetchTokenPricesETH(apiKey string, tokenSet map[string]struct{}, dateStr string) (map[string]float64, error) {
Expand Down Expand Up @@ -1539,6 +1645,66 @@ func fetchTokenPricesETH(apiKey string, tokenSet map[string]struct{}, dateStr st
return out, nil
}

// fetchETHPriceUSD returns the ETH price in USD for the given block time.
// Results are cached per calendar day so same-day transactions share one API call.
func fetchETHPriceUSD(apiKey string, blockTime time.Time) (float64, error) {
dateStr := blockTime.UTC().Format("2006-01-02")

// Check cache first.
if price, ok := ethUSDCache[dateStr]; ok {
return price, nil
}

apiKey = strings.TrimSpace(apiKey)
url := fmt.Sprintf("%s/pricing/historical_by_addresses_v2/%s/USD/%s/?from=%s&to=%s",
covalentBaseURL, chainName, wethAddress, dateStr, dateStr)

ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return 0, err
}
req.Header.Set("Authorization", "Bearer "+apiKey)
req.Header.Set("Accept", "application/json")

resp, err := httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("ETH/USD pricing request error: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("resp.Body.Close: %v", err)
}
}()

body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return 0, fmt.Errorf("read ETH/USD pricing body: %w", readErr)
}
if resp.StatusCode != 200 {
return 0, fmt.Errorf("covalent ETH/USD pricing HTTP %d: %s", resp.StatusCode, truncateBody(body))
}

var pr PricingResponse
if err := json.Unmarshal(body, &pr); err != nil {
return 0, fmt.Errorf("ETH/USD pricing JSON decode: %w; body: %s", err, truncateBody(body))
}
if pr.Error {
return 0, fmt.Errorf("ETH/USD pricing error: %s", pr.ErrorMessage)
}

for _, item := range pr.Data {
if len(item.Prices) > 0 && item.Prices[0].Price > 0 {
price := item.Prices[0].Price
ethUSDCache[dateStr] = price
return price, nil
}
}
return 0, fmt.Errorf("no ETH/USD price data returned for date %s", dateStr)
}

// -------------------- Base volume calc --------------------

func computeWethAndTokenVolEth(logs []LogEvent, blockTime time.Time, apiKey string) (wethVolEth float64, tokenVolEth float64, err error) {
Expand Down Expand Up @@ -1943,7 +2109,8 @@ func isSwapLikeEvent(name string) bool {
"Trade", "OrderSettled", "Settlement",
"Fill", "LimitOrderFilled", "RfqOrderFilled", "OrderFilled",
"TransformERC20", "TransformedERC20", "ERC20BridgeTransfer",
"DODOSwap", "DODOV2SellBaseToken", "DODOV2SellQuoteToken", "Buy", "Sell":
"DODOSwap", "DODOV2SellBaseToken", "DODOV2SellQuoteToken", "Buy", "Sell",
"SolverCallExecuted":
return true
default:
return false
Expand Down
Loading