Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion duneanalytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from .__version__ import __version__, __build__, __author_email__
from .__version__ import __author__, __license__, __copyright__

from .duneanalytics import DuneAnalytics
from .duneanalytics import DuneAnalytics, DuneAnalyticsException
78 changes: 73 additions & 5 deletions duneanalytics/duneanalytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,26 @@

# --------- Constants --------- #

class DuneAnalyticsException(Exception):
"""Exception raised for errors during communication with Dune Analytics.

Attributes:
response -- response object received the last time before exception occurs
message -- explanation of the error
"""
def __init__(self, message="", response=None):
self.response = response
self.message = message
super().__init__(self.message)


class DuneAnalytics:
"""
DuneAnalytics class to act as python client for duneanalytics.com.
All requests to be made through this class.
"""

def __init__(self, username, password):
def __init__(self, username, password, raise_exception=False):
"""
Initialize the object
:param username: username for duneanalytics.com
Expand All @@ -28,6 +40,7 @@ def __init__(self, username, password):
self.token = None
self.username = username
self.password = password
self.raise_exception = raise_exception
self.session = Session()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,'
Expand All @@ -43,7 +56,14 @@ def __init__(self, username, password):
}
self.session.headers.update(headers)

def login(self):
def _should_raise_exception(self, override_raise_exception):
return (override_raise_exception is True or (override_raise_exception is None and self.raise_exception))

def _try_raise_exception(raise_exception, description):
if (raise_exception is True or (raise_exception is None and self.raise_exception)):
raise Exception(description)

def login(self, raise_exception=None):
"""
Try to login to duneanalytics.com & get the token
:return:
Expand All @@ -59,6 +79,9 @@ def login(self):
self.session.post(csrf_url)
self.csrf = self.session.cookies.get('csrf')

if (not self.csrf and self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not fetch CSRF token!", None)

# try to login
form_data = {
'action': 'login',
Expand All @@ -71,7 +94,7 @@ def login(self):
self.session.post(auth_url, data=form_data)
self.auth_refresh = self.session.cookies.get('auth-refresh')

def fetch_auth_token(self):
def fetch_auth_token(self, raise_exception=None):
"""
Fetch authorization token for the user
:return:
Expand All @@ -82,20 +105,25 @@ def fetch_auth_token(self):
if response.status_code == 200:
self.token = response.json().get('token')
else:
if (self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get auth token!", response=response)
print(response.text)

def query_result_id(self, query_id):
def query_result_id(self, query_id, parameters=None, raise_exception=None):
"""
Fetch the query result id for a query

:param query_id: provide the query_id
:param parameters: Array of {key, type, value}. For example, {key: "NFT Contract", type: "text", value: "0xBD4455dA5929D5639EE098ABFaa3241e9ae111Af"}
:return:
"""
query_data = {"operationName": "GetResult", "variables": {"query_id": query_id},
"query": "query GetResult($query_id: Int!, $parameters: [Parameter!]) "
"{\n get_result(query_id: $query_id, parameters: $parameters) "
"{\n job_id\n result_id\n __typename\n }\n}\n"
}
if parameters is not None and len(parameters) > 0:
query_data["variables"].update({ "parameters": parameters })

self.session.headers.update({'authorization': f'Bearer {self.token}'})

Expand All @@ -104,14 +132,52 @@ def query_result_id(self, query_id):
data = response.json()
# print(data)
if 'errors' in data:
if(self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get query result id!", response=response)
return None
result_id = data.get('data').get('get_result').get('result_id')
return result_id
else:
if (self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get query result id!", response=response)
print(response.text)
return None

def query_result_id_v2(self, query_id, parameters=None, raise_exception=None):
"""
Fetch the query result id for a query

:param query_id: provide the query_id
:param parameters: Array of {key, type, value}. For example, {key: "NFT Contract", type: "text", value: "0xBD4455dA5929D5639EE098ABFaa3241e9ae111Af"}
:return:
"""
query_data = {"operationName": "GetResult", "variables": {"query_id": query_id},
"query": "query GetResult($query_id: Int!, $parameters: [Parameter!]) "
"{\n get_result_v2(query_id: $query_id, parameters: $parameters) "
"{\n job_id\n result_id\n error_id\n __typename\n }\n}\n"
}
if parameters is not None and len(parameters) > 0:
query_data["variables"].update({ "parameters": parameters })

self.session.headers.update({'authorization': f'Bearer {self.token}'})

response = self.session.post(GRAPH_URL, json=query_data)
if response.status_code == 200:
data = response.json()
# print(data)
if 'errors' in data:
if(self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get query result id!", response=response)
return None
result_id = data.get('data').get('get_result_v2').get('result_id')
return result_id
else:
if (self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get query result id!", response=response)
print(response.text)
return None

def query_result(self, result_id):
def query_result(self, result_id, raise_exception=None):
"""
Fetch the result for a query
:param result_id: result id of the query
Expand All @@ -133,5 +199,7 @@ def query_result(self, result_id):
# print(data)
return data
else:
if (self._should_raise_exception(raise_exception)):
raise DuneAnalyticsException("Could not get query result data!", response=response)
print(response.text)
return {}