From 05e7e9dfc7aad99c9dd3ab6bb36d133bc86401d4 Mon Sep 17 00:00:00 2001 From: lou lecrivain Date: Tue, 25 Nov 2025 11:48:04 +0100 Subject: [PATCH] switch to requests.Session + per-process fetcher --- cosmo/clients/netbox_client.py | 5 ++-- cosmo/clients/netbox_v4.py | 54 +++++++++++++++++++--------------- cosmo/tests/utils.py | 11 +++++-- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/cosmo/clients/netbox_client.py b/cosmo/clients/netbox_client.py index 73e07fe..34bbdcc 100644 --- a/cosmo/clients/netbox_client.py +++ b/cosmo/clients/netbox_client.py @@ -8,10 +8,11 @@ class NetboxAPIClient: def __init__(self, url, token, interprocess_shared_cache: DictProxy): self.url = url self.token = token + self.session = requests.Session() self.cache = interprocess_shared_cache def query(self, query): - r = requests.post( + r = self.session.post( urljoin(self.url, "/graphql/"), json={"query": query}, headers={ @@ -33,7 +34,7 @@ def query(self, query): def _cached_get(self, url, headers): if url not in self.cache: - self.cache[url] = requests.get( + self.cache[url] = self.session.get( url, headers=headers, ) diff --git a/cosmo/clients/netbox_v4.py b/cosmo/clients/netbox_v4.py index 6808e17..9604768 100644 --- a/cosmo/clients/netbox_v4.py +++ b/cosmo/clients/netbox_v4.py @@ -6,12 +6,17 @@ from cosmo.clients.netbox_client import NetboxAPIClient +fetcher: NetboxAPIClient -class ParallelQuery(ABC): - def __init__(self, client: NetboxAPIClient, **kwargs): - self.client = client +def proc_init(url, token, shared_dict): + global fetcher + fetcher = NetboxAPIClient(url, token, shared_dict) + + +class ParallelQuery(ABC): + def __init__(self, **kwargs): self.data_promise = None self.kwargs = kwargs @@ -71,7 +76,7 @@ def _fetch_data(self, kwargs, pool): """ ) - return self.client.query(query_template.substitute())["data"] + return fetcher.query(query_template.substitute())["data"] def _merge_into(self, data: dict, query_data): @@ -156,7 +161,7 @@ def _fetch_data(self, kwargs, pool): """ ) - return self.client.query(query_template.substitute())["data"] + return fetcher.query(query_template.substitute())["data"] def _merge_into(self, data: dict, query_data): @@ -263,7 +268,7 @@ def _fetch_data(self, kwargs, pool): """ ) - return self.client.query(query_template.substitute())["data"] + return fetcher.query(query_template.substitute())["data"] def _merge_into(self, data: dict, query_data): return { @@ -276,7 +281,7 @@ class StaticRouteQuery(ParallelQuery): def _fetch_data(self, kwargs, pool): device_list = kwargs.get("device_list") - return self.client.query_rest( + return fetcher.query_rest( "api/plugins/routing/staticroutes/", {"device": device_list} ) @@ -307,7 +312,7 @@ class IPPoolDataQuery(ParallelQuery): def _fetch_data(self, kwargs, pool): device_list = kwargs.get("device_list") - return self.client.query_rest( + return fetcher.query_rest( "api/plugins/ip-pools/ippools/", {"devices": device_list} ) @@ -339,7 +344,7 @@ def _merge_into(self, data: dict, query_data): class TobagoLineMembersDataQuery(ParallelQuery): def _fetch_data(self, kwargs, pool): device = kwargs.get("device") - line_members = self.client.query_rest( + line_members = fetcher.query_rest( "api/plugins/tobago/line-members/find-by-object/", {"content_type": "dcim.device", "object_name": device}, ) @@ -387,7 +392,7 @@ def _merge_into(self, data: dict, query_result): class DeviceMACQuery(ParallelQuery): def _fetch_data(self, kwargs, pool): device_list = kwargs.get("device_list") - return self.client.query_rest( + return fetcher.query_rest( "api/dcim/interfaces", {"primary_mac_address__n": "null", "device": device_list}, ) @@ -413,7 +418,7 @@ def _merge_into(self, data: dict, query_data): class DeviceDataQuery(ParallelQuery): def __init__(self, *args, multiple_mac_addresses=False, **kwargs): - super().__init__(*args, **kwargs) + super().__init__(**kwargs) self.multiple_mac_addresses = multiple_mac_addresses def _fetch_data(self, kwargs, pool): @@ -615,7 +620,7 @@ def _fetch_data(self, kwargs, pool): device=json.dumps(device), ) - query_result = self.client.query(query) + query_result = fetcher.query(query) return query_result["data"] def _merge_into(self, data: dict, query_data): @@ -653,38 +658,39 @@ def get_data(self, device_config): queries.extend( [ DeviceDataQuery( - client, device=d, multiple_mac_addresses=self.multiple_mac_addresses, ), ( - TobagoLineMembersDataQuery(client, device=d) + TobagoLineMembersDataQuery(device=d) if self.feature_flags["tobago"] - else TobagoLineMemberDataDummyQuery(client, device=d) + else TobagoLineMemberDataDummyQuery(device=d) ), ] ) queries.extend( [ - L2VPNDataQuery(client, device_list=device_list), + L2VPNDataQuery(device_list=device_list), ( - StaticRouteQuery(client, device_list=device_list) + StaticRouteQuery(device_list=device_list) if self.feature_flags["routing"] - else StaticRouteDummyQuery(client, device_list=device_list) + else StaticRouteDummyQuery(device_list=device_list) ), - DeviceMACQuery(client, device_list=device_list), - ConnectedDevicesDataQuery(client, device_list=device_list), - LoopbackDataQuery(client, device_list=device_list), + DeviceMACQuery(device_list=device_list), + ConnectedDevicesDataQuery(device_list=device_list), + LoopbackDataQuery(device_list=device_list), ( - IPPoolDataQuery(client, device_list=device_list) + IPPoolDataQuery(device_list=device_list) if self.feature_flags["ippools"] - else IPPoolDataDummyQuery(client, device_list=device_list) + else IPPoolDataDummyQuery(device_list=device_list) ), ] ) - with manager.Pool() as pool: + with manager.Pool( + initializer=proc_init, initargs=(self.url, self.token, manager.dict()) + ) as pool: data_promises = list(map(lambda x: x.fetch_data(pool), queries)) data = dict() diff --git a/cosmo/tests/utils.py b/cosmo/tests/utils.py index 7eab04f..65fb51e 100644 --- a/cosmo/tests/utils.py +++ b/cosmo/tests/utils.py @@ -1,4 +1,7 @@ import json +from requests import Session + +from cosmo.clients.netbox_client import NetboxAPIClient class CommonSetup: @@ -85,8 +88,12 @@ def patchPostFunc(url, json, **kwargs): return ResponseMock(200, {"data": retVal}) - getMock = mocker.patch("requests.get", side_effect=patchGetFunc) - postMock = mocker.patch("requests.post", side_effect=patchPostFunc) + getMock = mocker.patch.object( + NetboxAPIClient, "session", side_effect=patchGetFunc + ) + postMock = mocker.patch.object( + NetboxAPIClient, "session", side_effect=patchPostFunc + ) return [getMock, postMock]