Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions bal_tools/etherscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@ def _get_block_by_timestamp_plasma(
f"Error fetching block for timestamp {timestamp} on plasma: {str(e)}"
)

def _get_block_by_timestamp_routescan(
self, chain_id: int, timestamp: int, closest: str = "before"
) -> Optional[int]:
routescan_url = (
f"https://api.routescan.io/v2/network/mainnet/evm/{chain_id}/etherscan/api"
)
params = {
"module": "block",
"action": "getblocknobytime",
"timestamp": timestamp,
"closest": closest,
}
response = self.session.get(routescan_url, params=params, timeout=30)
response.raise_for_status()
data = response.json()

if data.get("status") == "1" and data.get("result"):
return int(data["result"])

return None

def get_block_by_timestamp(
self, chain: str, timestamp: int, closest: str = "before"
) -> Optional[int]:
Expand Down Expand Up @@ -119,7 +140,14 @@ def get_block_by_timestamp(

return None

except Exception as e:
raise Exception(
f"Error fetching block for timestamp {timestamp} on {chain}: {str(e)}"
)
except Exception as etherscan_error:
# if etherscan fails, try routescan as fallback
try:
return self._get_block_by_timestamp_routescan(
chain_id, timestamp, closest
)
except Exception:
# if routescan also fails, raise the original etherscan error
raise Exception(
f"Error fetching block for timestamp {timestamp} on {chain}: {str(etherscan_error)}"
)
76 changes: 27 additions & 49 deletions bal_tools/subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,61 +381,39 @@ def get_first_block_after_utc_timestamp(
if timestamp > int(datetime.now().strftime("%s")):
timestamp = int(datetime.now().strftime("%s")) - 2000

max_retries = 3

for attempt in range(max_retries):
if use_etherscan:
try:
if use_etherscan:
try:
if not self.etherscan_client:
self.etherscan_client = Etherscan()

block_number = self.etherscan_client.get_block_by_timestamp(
chain=self.chain, timestamp=timestamp, closest="after"
)
if not self.etherscan_client:
self.etherscan_client = Etherscan()

if block_number:
return block_number

except Exception as e:
if attempt == max_retries - 1:
warnings.warn(
f"Etherscan V2 block fetch failed for chain {self.chain}: {str(e)}. Falling back to subgraph.",
UserWarning,
)
else:
if "NOTOK" in str(e) or "temporarily unavailable" in str(e):
time.sleep(2**attempt)
continue
warnings.warn(
f"Etherscan V2 block fetch failed for chain {self.chain}: {str(e)}. Falling back to subgraph.",
UserWarning,
)

data = self.fetch_graphql_data(
"blocks",
"first_block_after_ts",
{
"timestamp_gt": int(timestamp) - 200,
"timestamp_lt": int(timestamp) + 200,
},
block_number = self.etherscan_client.get_block_by_timestamp(
chain=self.chain, timestamp=timestamp, closest="after"
)
data["blocks"].sort(key=lambda x: x["timestamp"], reverse=True)
return int(data["blocks"][0]["number"])

except Exception as e:
if "bad indexers" in str(e) and attempt < max_retries - 1:
time.sleep(2**attempt)
continue
if block_number:
return block_number

if attempt == max_retries - 1:
raise Exception(
f"Failed to fetch block for timestamp {timestamp} on {self.chain}: {str(e)}"
)
except Exception as e:
warnings.warn(
f"Etherscan V2 block fetch failed for chain {self.chain}: {str(e)}. Falling back to subgraph.",
UserWarning,
)

raise Exception(
f"Failed to fetch block for timestamp {timestamp} on {self.chain} after {max_retries} attempts"
)
try:
data = self.fetch_graphql_data(
"blocks",
"first_block_after_ts",
{
"timestamp_gt": int(timestamp) - 200,
"timestamp_lt": int(timestamp) + 200,
},
)
data["blocks"].sort(key=lambda x: x["timestamp"], reverse=True)
return int(data["blocks"][0]["number"])
except Exception as e:
raise Exception(
f"Failed to fetch block for timestamp {timestamp} on {self.chain}: {str(e)}"
)

def filter_outliers_and_average(
self, prices: List[Decimal], iqr_multiplier: float = 100_000.0
Expand Down