From 2170be6f541e66f2e06ea62b14e1e1d24dd024dc Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Wed, 9 Mar 2022 18:47:48 +0100 Subject: [PATCH 1/9] Accepted invoice amount is decided in strategy --- yapapi/engine.py | 4 +++- yapapi/invoice_manager.py | 36 +++++++++++++++++++++------- yapapi/strategy/base.py | 13 ++++++++++ yapapi/strategy/wrapping_strategy.py | 4 ++++ 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/yapapi/engine.py b/yapapi/engine.py index 9059478f3..de539aa48 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -399,7 +399,9 @@ async def accept_payments_for_agreement(self, job_id: str, agreement_id: str) -> async def _agreement_payment_attempt(self, agreement_id: str) -> None: invoice_manager = self._invoice_manager - paid = await invoice_manager.attempt_payment(agreement_id, self._get_allocation) + paid = await invoice_manager.attempt_payment( + agreement_id, self._get_allocation, self._strategy.invoice_accepted_amount + ) if paid: # We've accepted the final invoice, so we can ignore debit notes job_id = invoice_manager.agreement_job(agreement_id).id diff --git a/yapapi/invoice_manager.py b/yapapi/invoice_manager.py index ec7b73047..4efa64e23 100644 --- a/yapapi/invoice_manager.py +++ b/yapapi/invoice_manager.py @@ -1,6 +1,7 @@ from asyncio import CancelledError from dataclasses import dataclass -from typing import Callable, Dict, Optional, Set, TYPE_CHECKING +from decimal import Decimal +from typing import Awaitable, Callable, Dict, Optional, Set, TYPE_CHECKING import sys from yapapi import events @@ -20,6 +21,11 @@ class AgreementData: paid: bool = False +import logging + +logger = logging.getLogger(__name__) + + class InvoiceManager: def __init__(self): self._agreement_data: Dict[str, AgreementData] = {} @@ -71,7 +77,10 @@ def set_payable(self, agreement_id: str) -> None: self._agreement_data[agreement_id].payable = True async def attempt_payment( - self, agreement_id: str, get_allocation: Callable[["Invoice"], "Allocation"] + self, + agreement_id: str, + get_allocation: Callable[["Invoice"], "Allocation"], + get_accepted_amount: Callable[["Invoice"], Awaitable[Decimal]], ) -> bool: ad = self._agreement_data.get(agreement_id) if not ad or not ad.invoice or not ad.payable or ad.paid: @@ -80,12 +89,23 @@ async def attempt_payment( invoice = ad.invoice try: allocation = get_allocation(invoice) - await invoice.accept(amount=invoice.amount, allocation=allocation) - ad.job.emit( - events.InvoiceAccepted, - agreement=ad.agreement, - invoice=invoice, - ) + accepted_amount = await get_accepted_amount(invoice) + if accepted_amount == Decimal(invoice.amount): + await invoice.accept(amount=accepted_amount, allocation=allocation) + ad.job.emit( + events.InvoiceAccepted, + agreement=ad.agreement, + invoice=invoice, + ) + else: + # We should reject the invoice, but it's not implemented in yagna, + # so we just ignore it now + logger.warning( + "Ignored invoice %s for %s, we accept only %s", + invoice.invoice_id, + invoice.amount, + accepted_amount, + ) except CancelledError: raise except Exception: diff --git a/yapapi/strategy/base.py b/yapapi/strategy/base.py index 16332657a..a9894406a 100644 --- a/yapapi/strategy/base.py +++ b/yapapi/strategy/base.py @@ -3,6 +3,7 @@ import abc from copy import deepcopy from datetime import datetime, timezone +from decimal import Decimal import logging from typing import Dict, Optional @@ -86,6 +87,14 @@ async def respond_to_provider_offer( """Respond with a modified `DemandBuilder` object to an offer coming from a provider.""" raise NotImplementedError() + @abc.abstractmethod + async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: + """Return the amount we accept to pay. + + Current Golem implementation accepts the invoice if returned amount equals `invoice.amount` + and ignores the invoice if it is different. This will change in the future.""" + raise NotImplementedError() + class MarketStrategy(BaseMarketStrategy, abc.ABC): """Abstract market strategy.""" @@ -170,3 +179,7 @@ async def respond_to_provider_offer( async def decorate_demand(self, demand: DemandBuilder) -> None: """Optionally add relevant constraints to a Demand.""" + + async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: + """Accept full invoice amount.""" + return Decimal(invoice.amount) diff --git a/yapapi/strategy/wrapping_strategy.py b/yapapi/strategy/wrapping_strategy.py index cd486428d..ec0ec73a0 100644 --- a/yapapi/strategy/wrapping_strategy.py +++ b/yapapi/strategy/wrapping_strategy.py @@ -1,4 +1,5 @@ import abc +from decimal import Decimal from yapapi.props.builder import DemandBuilder from yapapi import rest @@ -30,6 +31,9 @@ async def decorate_demand(self, demand: DemandBuilder) -> None: async def score_offer(self, offer: rest.market.OfferProposal) -> float: return await self.base_strategy.score_offer(offer) + async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: + return await self.base_strategy.invoice_accepted_amount(invoice) + async def respond_to_provider_offer( self, our_demand: DemandBuilder, From 9eb004254bb829337f4d5af234fece4c5fd13246 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Wed, 9 Mar 2022 19:40:25 +0100 Subject: [PATCH 2/9] Accepted debit note amount is decided in strategy --- yapapi/engine.py | 28 ++++++++++++++++++++-------- yapapi/strategy/base.py | 15 ++++++++++++++- yapapi/strategy/wrapping_strategy.py | 3 +++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/yapapi/engine.py b/yapapi/engine.py index de539aa48..24a47ae75 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -523,14 +523,26 @@ async def _process_debit_notes(self) -> None: try: allocation = self._get_allocation(debit_note) - await debit_note.accept( - amount=debit_note.total_amount_due, allocation=allocation - ) - job.emit( - events.DebitNoteAccepted, - agreement=agreement, - debit_note=debit_note, - ) + accepted_amount = await self._strategy.debit_note_accepted_amount(debit_note) + if accepted_amount == Decimal(debit_note.total_amount_due): + await debit_note.accept( + amount=debit_note.total_amount_due, allocation=allocation + ) + job.emit( + events.DebitNoteAccepted, + agreement=agreement, + debit_note=debit_note, + ) + else: + # We should reject the debit note, but it's not implemented in yagna, + # so we just ignore it now + logger.warning( + "Ignored debit note %s for %s, we accept only %s", + debit_note.debit_note_id, + debit_note.total_amount_due, + accepted_amount, + ) + except CancelledError: raise except Exception: diff --git a/yapapi/strategy/base.py b/yapapi/strategy/base.py index a9894406a..8fb399e07 100644 --- a/yapapi/strategy/base.py +++ b/yapapi/strategy/base.py @@ -89,12 +89,21 @@ async def respond_to_provider_offer( @abc.abstractmethod async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: - """Return the amount we accept to pay. + """Return the amount we accept to pay for the invoice. Current Golem implementation accepts the invoice if returned amount equals `invoice.amount` and ignores the invoice if it is different. This will change in the future.""" raise NotImplementedError() + @abc.abstractmethod + async def debit_note_accepted_amount(self, debit_note: rest.payment.DebitNote) -> Decimal: + """Return the amount we accept to pay for the debit note. + + Current Golem implementation accepts the debit note if returned amount equals a + `debit_note.total_amount_due` and ignores the debit note if it is different. + This will change in the future.""" + raise NotImplementedError() + class MarketStrategy(BaseMarketStrategy, abc.ABC): """Abstract market strategy.""" @@ -183,3 +192,7 @@ async def decorate_demand(self, demand: DemandBuilder) -> None: async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: """Accept full invoice amount.""" return Decimal(invoice.amount) + + async def debit_note_accepted_amount(self, debit_note: rest.payment.DebitNote) -> Decimal: + """Accept full debit note amount.""" + return Decimal(debit_note.total_amount_due) diff --git a/yapapi/strategy/wrapping_strategy.py b/yapapi/strategy/wrapping_strategy.py index ec0ec73a0..c5366d8c9 100644 --- a/yapapi/strategy/wrapping_strategy.py +++ b/yapapi/strategy/wrapping_strategy.py @@ -34,6 +34,9 @@ async def score_offer(self, offer: rest.market.OfferProposal) -> float: async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: return await self.base_strategy.invoice_accepted_amount(invoice) + async def debit_note_accepted_amount(self, debit_note: rest.payment.DebitNote) -> Decimal: + return await self.base_strategy.debit_note_accepted_amount(debit_note) + async def respond_to_provider_offer( self, our_demand: DemandBuilder, From 1f8bac48ba088a7ff33ef18a01e724df2198d172 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Thu, 10 Mar 2022 14:17:34 +0100 Subject: [PATCH 3/9] Strategy that pays only for perfect work This just a first POC. More-or-less works. --- examples/blender/blender.py | 29 +++++++++--- yapapi/contrib/strategy/rep_a1.py | 75 +++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 yapapi/contrib/strategy/rep_a1.py diff --git a/examples/blender/blender.py b/examples/blender/blender.py index 6b2bb7fc9..af06bae39 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -75,9 +75,17 @@ async def worker(ctx: WorkContext, tasks): script.download_file(f"/golem/output/out{frame:04d}.png", output_file) try: yield script - # TODO: Check if job results are valid - # and reject by: task.reject_task(reason = 'invalid file') - task.accept_result(result=output_file) + + import random + r = random.random() + if r < 0.1: + # Activity failed + raise Exception("oops") + # if r < 0.3: + # # Task failed + # task.reject_result(retry=True) + else: + task.accept_result() except BatchTimeoutError: print( f"{TEXT_COLOR_RED}" @@ -102,7 +110,7 @@ async def worker(ctx: WorkContext, tasks): ) # Iterator over the frame indices that we want to render - frames: range = range(0, 60, 10) + frames: range = range(0, 60000, 10) # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.) # TODO: make this dynamic, e.g. depending on the size of files to transfer init_overhead = 3 @@ -113,12 +121,21 @@ async def worker(ctx: WorkContext, tasks): timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) - async with Golem( + from yapapi.contrib.strategy.rep_a1 import RepA1 + + golem = Golem( budget=10.0, subnet_tag=subnet_tag, payment_driver=payment_driver, payment_network=payment_network, - ) as golem: + ) + + # Wrap the default Golem strategy + strategy = RepA1(golem.strategy) + golem.strategy = strategy + golem.add_event_consumer(strategy.on_event) + + async with golem: print_env_info(golem) num_tasks = 0 diff --git a/yapapi/contrib/strategy/rep_a1.py b/yapapi/contrib/strategy/rep_a1.py new file mode 100644 index 000000000..5f1e80eb4 --- /dev/null +++ b/yapapi/contrib/strategy/rep_a1.py @@ -0,0 +1,75 @@ +from typing import Dict, Set, TYPE_CHECKING +from decimal import Decimal +from collections import defaultdict +import logging + +from yapapi.strategy import WrappingMarketStrategy +from yapapi import events + +if TYPE_CHECKING: + from yapapi.rest.payment import DebitNote, Invoice + + +logger = logging.getLogger(__name__) + +ActivityId = str +AgreementId = str + + +class RepA1(WrappingMarketStrategy): + def __init__(self, base_strategy): + super().__init__(base_strategy) + + self._failed_activities: Set[ActivityId] = set() + self._accepted_amounts: Dict[ActivityId, Decimal] = defaultdict(Decimal) + self._agreement_activity_map: Dict[AgreementId, Set[ActivityId]] = defaultdict(set) + + def on_event(self, event: events.Event): + if isinstance(event, events.ActivityEvent): + self._agreement_activity_map[event.agreement.id].add(event.activity.id) + + if isinstance(event, events.WorkerFinished): + if event.exception is not None: + self._activity_failed(event, "WORKER EXCEPTION") + elif isinstance(event, events.TaskRejected): + self._activity_failed(event, "TASK REJECTED") + + elif isinstance(event, events.DebitNoteAccepted): + activity_id = event.debit_note.activity_id + prev_accepted_amount = self._accepted_amounts[activity_id] + new_accepted_amount = max(prev_accepted_amount, Decimal(event.debit_note.total_amount_due)) + self._accepted_amounts[activity_id] = new_accepted_amount + logger.warning("Accepted debit note for %s, total accepted amount: %s", activity_id, new_accepted_amount) + + def _activity_failed(self, event: events.ActivityEvent, reason: str): + activity_id = event.activity.id + + self._failed_activities.add(activity_id) + + provider_name = event.provider_info.name + logger.warning("Disabling payments for activity %s on %s, reason %s", activity_id, provider_name, reason) + + async def debit_note_accepted_amount(self, debit_note: "DebitNote") -> Decimal: + activity_id = debit_note.activity_id + + if activity_id in self._failed_activities: + # NOTE: currently it doesn't really matter what we return here, + # as long as it is not debit_note.total_amount_due + return self._accepted_amounts[activity_id] + return Decimal(debit_note.total_amount_due) + + async def invoice_accepted_amount(self, invoice: "Invoice") -> Decimal: + agreement_id = invoice.agreement_id + if self._agreement_has_failed_activity(agreement_id): + accepted_amount = self._total_agreement_amount(agreement_id) + logger.warning("REJECTED INVOICE FOR %s, we accept only %s", invoice.amount, accepted_amount) + return accepted_amount + else: + logger.warning("ACCEPTED INVOICE FOR %s", invoice.amount) + return Decimal(invoice.amount) + + def _agreement_has_failed_activity(self, agreement_id: str) -> bool: + return any(act in self._failed_activities for act in self._agreement_activity_map[agreement_id]) + + def _total_agreement_amount(self, agreement_id: str) -> Decimal: + return Decimal(sum(self._accepted_amounts[act] for act in self._agreement_activity_map[agreement_id])) From 059634e7ca941379efc6cfb0365aca7ecfe7d54f Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Fri, 11 Mar 2022 15:58:29 +0100 Subject: [PATCH 4/9] POC of the exterenal data used in score_offer --- yapapi/contrib/strategy/rep_a1.py | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/yapapi/contrib/strategy/rep_a1.py b/yapapi/contrib/strategy/rep_a1.py index 5f1e80eb4..7b623cb99 100644 --- a/yapapi/contrib/strategy/rep_a1.py +++ b/yapapi/contrib/strategy/rep_a1.py @@ -2,12 +2,14 @@ from decimal import Decimal from collections import defaultdict import logging +from random import gauss from yapapi.strategy import WrappingMarketStrategy from yapapi import events if TYPE_CHECKING: from yapapi.rest.payment import DebitNote, Invoice + from yapapi.rest.market import OfferProposal logger = logging.getLogger(__name__) @@ -16,6 +18,14 @@ AgreementId = str +async def get_provider_score(provider_id: str) -> float: + # This is a mock. + # In the final (alpha1) version this will query the API for the provider score, + # but the score received will have the same "meaning", i.e.: how this provider + # differs from the average provider quality, expressed in the number of SDs. + return gauss(0, 1) + + class RepA1(WrappingMarketStrategy): def __init__(self, base_strategy): super().__init__(base_strategy) @@ -24,6 +34,28 @@ def __init__(self, base_strategy): self._accepted_amounts: Dict[ActivityId, Decimal] = defaultdict(Decimal) self._agreement_activity_map: Dict[AgreementId, Set[ActivityId]] = defaultdict(set) + ################# + # OFFER SCORING + async def score_offer(self, offer: "OfferProposal") -> float: + offer_score = await super().score_offer(offer) + provider_score = await get_provider_score(offer.issuer) + combined_score = self._final_score(offer_score, provider_score) + + provider_name = offer._proposal.proposal.properties['golem.node.id.name'] + logger.info( + "Scored %s - base: %s, provider: %s, combined: %s", + provider_name, offer_score, provider_score, combined_score + ) + return combined_score + + def _final_score(self, base_score: float, provider_score: float) -> float: + # NOTE: this logic is just a POC + if provider_score < -1.5: + return -1 + return base_score + provider_score + + ###################### + # PAYMENT MANAGEMENT def on_event(self, event: events.Event): if isinstance(event, events.ActivityEvent): self._agreement_activity_map[event.agreement.id].add(event.activity.id) From 3147e92aa44d08c5cfcb6431cbbbe7bfb570812d Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Tue, 15 Mar 2022 14:01:32 +0100 Subject: [PATCH 5/9] minor improvements after the CR --- yapapi/contrib/strategy/rep_a1.py | 17 +++++++++++------ yapapi/engine.py | 2 +- yapapi/invoice_manager.py | 2 +- yapapi/strategy/base.py | 8 ++++---- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/yapapi/contrib/strategy/rep_a1.py b/yapapi/contrib/strategy/rep_a1.py index 7b623cb99..cb1eb1b24 100644 --- a/yapapi/contrib/strategy/rep_a1.py +++ b/yapapi/contrib/strategy/rep_a1.py @@ -79,7 +79,8 @@ def _activity_failed(self, event: events.ActivityEvent, reason: str): self._failed_activities.add(activity_id) provider_name = event.provider_info.name - logger.warning("Disabling payments for activity %s on %s, reason %s", activity_id, provider_name, reason) + msg = "Activity %s on %s failed (reason: %s), we'll pay only the already accepted amount %s" + logger.warning(msg, activity_id, provider_name, reason, self._accepted_amounts[activity_id]) async def debit_note_accepted_amount(self, debit_note: "DebitNote") -> Decimal: activity_id = debit_note.activity_id @@ -94,11 +95,15 @@ async def invoice_accepted_amount(self, invoice: "Invoice") -> Decimal: agreement_id = invoice.agreement_id if self._agreement_has_failed_activity(agreement_id): accepted_amount = self._total_agreement_amount(agreement_id) - logger.warning("REJECTED INVOICE FOR %s, we accept only %s", invoice.amount, accepted_amount) - return accepted_amount - else: - logger.warning("ACCEPTED INVOICE FOR %s", invoice.amount) - return Decimal(invoice.amount) + + # NOTE: this will (currently) always be true for a failed activity, but there is no rule + # saying provider must send invoice for more than the accepted amount + if accepted_amount > Decimal(invoice.amount): + logger.warning("REJECTED INVOICE FOR %s, we accept only %s", invoice.amount, accepted_amount) + return accepted_amount + + logger.warning("ACCEPTED INVOICE FOR %s", invoice.amount) + return Decimal(invoice.amount) def _agreement_has_failed_activity(self, agreement_id: str) -> bool: return any(act in self._failed_activities for act in self._agreement_activity_map[agreement_id]) diff --git a/yapapi/engine.py b/yapapi/engine.py index 24a47ae75..28fa9879f 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -524,7 +524,7 @@ async def _process_debit_notes(self) -> None: try: allocation = self._get_allocation(debit_note) accepted_amount = await self._strategy.debit_note_accepted_amount(debit_note) - if accepted_amount == Decimal(debit_note.total_amount_due): + if accepted_amount >= Decimal(debit_note.total_amount_due): await debit_note.accept( amount=debit_note.total_amount_due, allocation=allocation ) diff --git a/yapapi/invoice_manager.py b/yapapi/invoice_manager.py index 4efa64e23..8b3c096cc 100644 --- a/yapapi/invoice_manager.py +++ b/yapapi/invoice_manager.py @@ -90,7 +90,7 @@ async def attempt_payment( try: allocation = get_allocation(invoice) accepted_amount = await get_accepted_amount(invoice) - if accepted_amount == Decimal(invoice.amount): + if accepted_amount >= Decimal(invoice.amount): await invoice.accept(amount=accepted_amount, allocation=allocation) ad.job.emit( events.InvoiceAccepted, diff --git a/yapapi/strategy/base.py b/yapapi/strategy/base.py index 8fb399e07..bd8fc6f18 100644 --- a/yapapi/strategy/base.py +++ b/yapapi/strategy/base.py @@ -91,16 +91,16 @@ async def respond_to_provider_offer( async def invoice_accepted_amount(self, invoice: rest.payment.Invoice) -> Decimal: """Return the amount we accept to pay for the invoice. - Current Golem implementation accepts the invoice if returned amount equals `invoice.amount` - and ignores the invoice if it is different. This will change in the future.""" + Current Golem Engine implementation accepts the invoice if returned amount is not lower than + `invoice.amount` and ignores the invoice otherwise. This will change in the future.""" raise NotImplementedError() @abc.abstractmethod async def debit_note_accepted_amount(self, debit_note: rest.payment.DebitNote) -> Decimal: """Return the amount we accept to pay for the debit note. - Current Golem implementation accepts the debit note if returned amount equals a - `debit_note.total_amount_due` and ignores the debit note if it is different. + Current Golem Engine implementation accepts the debit note if returned amount is not lower than + `debit_note.total_amount_due` and ignores the debit note otherwise. This will change in the future.""" raise NotImplementedError() From 2357158f29cf3968ed251347b96e0057ab56aaec Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Wed, 30 Mar 2022 10:08:30 +0200 Subject: [PATCH 6/9] Reputation strategy consumes data from the reputation service --- yapapi/contrib/strategy/rep_a1.py | 33 +++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/yapapi/contrib/strategy/rep_a1.py b/yapapi/contrib/strategy/rep_a1.py index cb1eb1b24..00faf8736 100644 --- a/yapapi/contrib/strategy/rep_a1.py +++ b/yapapi/contrib/strategy/rep_a1.py @@ -1,8 +1,9 @@ -from typing import Dict, Set, TYPE_CHECKING +from typing import Dict, Optional, Set, TYPE_CHECKING from decimal import Decimal from collections import defaultdict import logging -from random import gauss + +import aiohttp from yapapi.strategy import WrappingMarketStrategy from yapapi import events @@ -18,12 +19,21 @@ AgreementId = str -async def get_provider_score(provider_id: str) -> float: - # This is a mock. - # In the final (alpha1) version this will query the API for the provider score, - # but the score received will have the same "meaning", i.e.: how this provider - # differs from the average provider quality, expressed in the number of SDs. - return gauss(0, 1) +PROVIDER_STANDARD_SCORE_URL = "http://reputation.dev.golem.network/standard_score/provider/{}" + + +async def get_provider_standard_score(provider_id: str) -> Optional[float]: + url = PROVIDER_STANDARD_SCORE_URL.format(provider_id) + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status != 200: + return None + return (await response.json())['score'] + except aiohttp.client_exceptions.ClientError: + logger.exception("Reputation service is down") + return None class RepA1(WrappingMarketStrategy): @@ -38,7 +48,7 @@ def __init__(self, base_strategy): # OFFER SCORING async def score_offer(self, offer: "OfferProposal") -> float: offer_score = await super().score_offer(offer) - provider_score = await get_provider_score(offer.issuer) + provider_score = await get_provider_standard_score(offer.issuer) combined_score = self._final_score(offer_score, provider_score) provider_name = offer._proposal.proposal.properties['golem.node.id.name'] @@ -48,8 +58,11 @@ async def score_offer(self, offer: "OfferProposal") -> float: ) return combined_score - def _final_score(self, base_score: float, provider_score: float) -> float: + def _final_score(self, base_score: float, provider_score: Optional[float]) -> float: # NOTE: this logic is just a POC + if provider_score is None: + provider_score = 0 + if provider_score < -1.5: return -1 return base_score + provider_score From f9785f35648028f62d3ccd4b720d806d683efdc4 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Wed, 30 Mar 2022 10:10:22 +0200 Subject: [PATCH 7/9] Disabled github workflows (please revert this!) --- .github/workflows/build_publish.yml | 103 ----------------------- .github/workflows/goth-nightly.yml | 124 ---------------------------- .github/workflows/goth.yml | 86 ------------------- .github/workflows/tests.yml | 63 -------------- 4 files changed, 376 deletions(-) delete mode 100644 .github/workflows/build_publish.yml delete mode 100644 .github/workflows/goth-nightly.yml delete mode 100644 .github/workflows/goth.yml delete mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/build_publish.yml b/.github/workflows/build_publish.yml deleted file mode 100644 index 0ebbbecdf..000000000 --- a/.github/workflows/build_publish.yml +++ /dev/null @@ -1,103 +0,0 @@ -name: Build and publish the release - -on: - release: - types: [prereleased, released] - -jobs: - test: - name: Run checks - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v1 - with: - python-version: 3.8 - - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.4 - - run: poetry install - - run: poetry run poe test - - run: poetry run poe typecheck - - run: poetry run poe codestyle - - run: poetry run poe liccheck - - build: - needs: [test] - name: Build the release - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v1 - with: - python-version: 3.8 - - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.4 - - name: Get git release tag - run: echo "::set-output name=git-release-tag::yapapi $(git describe --tags)" - id: git_describe - - name: Get package version - run: echo "::set-output name=poetry-version::$(poetry version)" - id: poetry_version - - name: Fail on version mismatch - run: exit 1 - if: - ${{ steps.git_describe.outputs.git-release-tag != - steps.poetry_version.outputs.poetry-version }} - - name: Build the release - run: poetry build - - name: Store the built package - uses: actions/upload-artifact@v2 - with: - name: dist - path: dist - - test_publish: - needs: [build] - name: Publish the release to test.pypi - runs-on: ubuntu-latest - if: ${{ github.event.action == 'prereleased' }} - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v1 - with: - python-version: 3.8 - - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.4 - - name: Retrieve the built package - uses: actions/download-artifact@v2 - with: - name: dist - path: dist - - name: Publish to pypi - run: | - poetry config repositories.testpypi https://test.pypi.org/legacy/ - poetry publish -r testpypi -u __token__ -p ${{ secrets.TESTPYPI_TOKEN }} - - publish: - needs: [build] - name: Publish the release - runs-on: ubuntu-latest - if: ${{ github.event.action == 'released' }} - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v1 - with: - python-version: 3.8 - - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.4 - - name: Retrieve the built package - uses: actions/download-artifact@v2 - with: - name: dist - path: dist - - name: Publish to pypi - run: | - poetry publish -u __token__ -p ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/goth-nightly.yml b/.github/workflows/goth-nightly.yml deleted file mode 100644 index 497ebdd72..000000000 --- a/.github/workflows/goth-nightly.yml +++ /dev/null @@ -1,124 +0,0 @@ -name: Goth nightly - -on: - schedule: - # run this workflow every day at 1:00 AM UTC - - cron: '0 1 * * *' - -jobs: - prepare-matrix: - name: Prepare matrix JSON - runs-on: ubuntu-latest - outputs: - matrix-json: ${{ steps.get-matrix.outputs.matrix }} - steps: - - name: Checkout - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - # finds branches with names matching the release branch convention (e.g. release/v0.7) and returns one with highest version - - name: Get latest stable branch - id: latest-stable - # second sed expression removes leading whitespaces and '*' characters (git uses it to indicate the current branch) - run: | - branch=$(git branch -a | sed -e 's:remotes/origin/::' -e 's:^[ \t*]*::' | grep -E '^b[0-9]+(\.[0-9]+)+$' | sort -Vr | head -1) - echo "::set-output name=branch::$branch" - - # prepares JSON object representing strategy matrix which contains two 'branch' variants: master and latest stable - - name: Get matrix JSON - id: get-matrix - run: echo "::set-output name=matrix::{\"include\":[{\"branch\":\"master\"},{\"branch\":\"${{ steps.latest-stable.outputs.branch }}\"}]}" - - goth-tests: - runs-on: goth - needs: prepare-matrix - strategy: - matrix: ${{ fromJson(needs.prepare-matrix.outputs.matrix-json) }} - fail-fast: false - name: Run integration tests (nightly) on ${{ matrix.branch }} - steps: - - name: Checkout - uses: actions/checkout@v2 - with: - ref: ${{ matrix.branch }} - - - name: Configure python - uses: actions/setup-python@v2 - with: - python-version: '3.8.0' - - - name: Configure poetry - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.6 - - - name: Install dependencies - run: | - poetry env use python3.8 - poetry install -E integration-tests - - - name: Install websocat - run: | - sudo wget https://github.com/vi/websocat/releases/download/v1.9.0/websocat_linux64 -O /usr/local/bin/websocat - sudo chmod +x /usr/local/bin/websocat - - - name: Disconnect Docker containers from default network - continue-on-error: true - # related to this issue: https://github.com/moby/moby/issues/23302 - run: | - docker network inspect docker_default - sudo apt-get install -y jq - docker network inspect docker_default | jq ".[0].Containers | map(.Name)[]" | tee /dev/stderr | xargs --max-args 1 -- docker network disconnect -f docker_default - - - name: Remove Docker containers - continue-on-error: true - run: docker rm -f $(docker ps -a -q) - - - name: Restart Docker daemon - # related to this issue: https://github.com/moby/moby/issues/23302 - run: sudo systemctl restart docker - - - name: Log in to GitHub Docker repository - run: echo ${{ secrets.GITHUB_TOKEN }} | docker login docker.pkg.github.com -u ${{github.actor}} --password-stdin - - - name: Run test suite - env: - GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - poetry run poe goth-assets - poetry run poe goth-tests - - - name: Upload test logs - uses: actions/upload-artifact@v2 - if: always() - with: - name: goth-logs - path: /tmp/goth-tests - - # Only relevant for self-hosted runners - - name: Remove test logs - if: always() - run: rm -rf /tmp/goth-tests - - # Only relevant for self-hosted runners - - name: Remove poetry virtual env - if: always() - # Python version below should agree with the version set up by this job. - # In future we'll be able to use the `--all` flag here to remove envs for - # all Python versions (https://github.com/python-poetry/poetry/issues/3208). - run: poetry env remove python3.8 - - - name: Send Discord notification on failure - uses: Ilshidur/action-discord@0.3.2 - if: failure() - env: - BRANCH_NAME: ${{ matrix.branch }} - DISCORD_AVATAR: https://i.imgur.com/EOX16Mx.jpg - DISCORD_USERNAME: 'Goth night watch' - DISCORD_WEBHOOK: ${{ secrets.DISCORD_WEBHOOK }} - REPO_NAME: ${{ github.repository }} - WORKFLOW_URL: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }} - with: - # <@&717623005911580713> = @sdk-integrations - args: '<@&717623005911580713> Goth nightly run failed for `{{ REPO_NAME }}` on branch `{{ BRANCH_NAME }}`! <{{ WORKFLOW_URL }}>' diff --git a/.github/workflows/goth.yml b/.github/workflows/goth.yml deleted file mode 100644 index 15d8023b2..000000000 --- a/.github/workflows/goth.yml +++ /dev/null @@ -1,86 +0,0 @@ -name: Goth (PR and push) - -on: - workflow_dispatch: - push: - branches: - - master - # - # put your branch name here to test it @ GH Actions - pull_request: - branches: - - master - - b0.* - -jobs: - goth-tests: - name: Run integration tests - runs-on: goth - steps: - - name: Checkout - uses: actions/checkout@v2 - - - name: Configure python - uses: actions/setup-python@v2 - with: - python-version: '3.8.0' - - - name: Configure poetry - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.6 - - - name: Install dependencies - run: | - poetry env use python3.8 - poetry install -E integration-tests - - - name: Install websocat - run: | - sudo wget https://github.com/vi/websocat/releases/download/v1.9.0/websocat_linux64 -O /usr/local/bin/websocat - sudo chmod +x /usr/local/bin/websocat - - - name: Disconnect Docker containers from default network - continue-on-error: true - # related to this issue: https://github.com/moby/moby/issues/23302 - run: | - docker network inspect docker_default - sudo apt-get install -y jq - docker network inspect docker_default | jq ".[0].Containers | map(.Name)[]" | tee /dev/stderr | xargs --max-args 1 -- docker network disconnect -f docker_default - - - name: Remove Docker containers - continue-on-error: true - run: docker rm -f $(docker ps -a -q) - - - name: Restart Docker daemon - # related to this issue: https://github.com/moby/moby/issues/23302 - run: sudo systemctl restart docker - - - name: Log in to GitHub Docker repository - run: echo ${{ secrets.GITHUB_TOKEN }} | docker login docker.pkg.github.com -u ${{github.actor}} --password-stdin - - - name: Run test suite - env: - GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - poetry run poe goth-assets - poetry run poe goth-tests - - - name: Upload test logs - uses: actions/upload-artifact@v2 - if: always() - with: - name: goth-logs - path: /tmp/goth-tests - - # Only relevant for self-hosted runners - - name: Remove test logs - if: always() - run: rm -rf /tmp/goth-tests - - # Only relevant for self-hosted runners - - name: Remove poetry virtual env - if: always() - # Python version below should agree with the version set up by this job. - # In future we'll be able to use the `--all` flag here to remove envs for - # all Python versions (https://github.com/python-poetry/poetry/issues/3208). - run: poetry env remove python3.8 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml deleted file mode 100644 index a3505ed93..000000000 --- a/.github/workflows/tests.yml +++ /dev/null @@ -1,63 +0,0 @@ -name: Continuous integration -on: - push: - branches: - - master - - b0.* - pull_request: - branches: - - master - - b0.* - -jobs: - test: - name: Run checks - runs-on: ${{ matrix.os }} - strategy: - matrix: - python-version: [3.6, 3.7, 3.8, 3.9] - os: - - ubuntu-latest - - macos-latest - - windows-latest - exclude: - - os: windows-latest - python-version: 3.9 - - os: windows-latest - python-version: 3.6 - - os: macos-latest - python-version: 3.9 - - os: macos-latest - python-version: 3.6 - fail-fast: false - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v1 - with: - python-version: ${{ matrix.python-version }} - - uses: Gr1N/setup-poetry@v4 - with: - poetry-version: 1.1.4 - - - run: echo "::set-output name=ENABLE::1" - if: ${{ matrix.os == 'ubuntu-latest' }} - name: Enable extended checks - id: extended-checks - - run: echo "::set-output name=ENABLE::1" - if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version == '3.8' }} - name: Enable sphinx check - id: extended-checks-sphinx - - - run: poetry install - - run: poetry install -E docs - if: ${{ steps.extended-checks-sphinx.outputs.ENABLE }} - - - run: poetry run poe test - - run: poetry run poe typecheck - - run: poetry run poe codestyle - if: ${{ steps.extended-checks.outputs.ENABLE }} - - run: poetry run poe liccheck - if: ${{ steps.extended-checks.outputs.ENABLE }} - - run: poetry run poe sphinx -W - if: ${{ steps.extended-checks-sphinx.outputs.ENABLE }} From 876fe7c005366a5bc05c1ede6e296c42a49fbabb Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Tue, 5 Apr 2022 13:34:14 +0200 Subject: [PATCH 8/9] Hotfix for InvoiceSettledEvent --- yapapi/rest/payment.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/yapapi/rest/payment.py b/yapapi/rest/payment.py index 296af8af9..34a829c71 100644 --- a/yapapi/rest/payment.py +++ b/yapapi/rest/payment.py @@ -225,16 +225,22 @@ async def fetch(init_ts: datetime): events = [] async with SuppressedExceptions(is_intermittent_error): events = await self._api.get_invoice_events(after_timestamp=ts) + + # TODO + # This `new_ts` thing is ugly, we don't want it. This is a temporary fix, waiting for + # https://github.com/golemfactory/yapapi/issues/921 + new_ts = False for ev in events: logger.debug("Received invoice event: %r, type: %s", ev, ev.__class__) if isinstance(ev, yap.InvoiceReceivedEvent): + new_ts = True ts = ev.event_date if not ev.invoice_id: logger.error("Empty invoice id in event: %r", ev) continue invoice = await self.invoice(ev.invoice_id) yield invoice - if not events: + if not events or not new_ts: await asyncio.sleep(1) return fetch(ts) From 97296e06a5942e44ffce3d02f8bca7694559f3f8 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Wed, 13 Apr 2022 14:06:16 +0200 Subject: [PATCH 9/9] few bugfixes, formatting etc --- yapapi/contrib/strategy/rep_a1.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/yapapi/contrib/strategy/rep_a1.py b/yapapi/contrib/strategy/rep_a1.py index 00faf8736..b79da8177 100644 --- a/yapapi/contrib/strategy/rep_a1.py +++ b/yapapi/contrib/strategy/rep_a1.py @@ -22,6 +22,11 @@ PROVIDER_STANDARD_SCORE_URL = "http://reputation.dev.golem.network/standard_score/provider/{}" +def log(msg, *args, **kwargs): + msg = "\033[94m" + msg + "\033[0m" + logger.info(msg, *args, **kwargs) + + async def get_provider_standard_score(provider_id: str) -> Optional[float]: url = PROVIDER_STANDARD_SCORE_URL.format(provider_id) @@ -30,7 +35,9 @@ async def get_provider_standard_score(provider_id: str) -> Optional[float]: async with session.get(url) as response: if response.status != 200: return None - return (await response.json())['score'] + score_str = (await response.json())['score'] + score = float(score_str) if score_str is not None else None + return score except aiohttp.client_exceptions.ClientError: logger.exception("Reputation service is down") return None @@ -52,7 +59,7 @@ async def score_offer(self, offer: "OfferProposal") -> float: combined_score = self._final_score(offer_score, provider_score) provider_name = offer._proposal.proposal.properties['golem.node.id.name'] - logger.info( + log( "Scored %s - base: %s, provider: %s, combined: %s", provider_name, offer_score, provider_score, combined_score ) @@ -84,7 +91,7 @@ def on_event(self, event: events.Event): prev_accepted_amount = self._accepted_amounts[activity_id] new_accepted_amount = max(prev_accepted_amount, Decimal(event.debit_note.total_amount_due)) self._accepted_amounts[activity_id] = new_accepted_amount - logger.warning("Accepted debit note for %s, total accepted amount: %s", activity_id, new_accepted_amount) + log("Accepted debit note, total accepted amount: %s", new_accepted_amount) def _activity_failed(self, event: events.ActivityEvent, reason: str): activity_id = event.activity.id @@ -92,8 +99,9 @@ def _activity_failed(self, event: events.ActivityEvent, reason: str): self._failed_activities.add(activity_id) provider_name = event.provider_info.name - msg = "Activity %s on %s failed (reason: %s), we'll pay only the already accepted amount %s" - logger.warning(msg, activity_id, provider_name, reason, self._accepted_amounts[activity_id]) + provider_id = event.agreement.details.raw_details.offer.provider_id + msg = "Activity on %s (%s) failed, refusing further debit notes/invoices" + log(msg, provider_name, provider_id) async def debit_note_accepted_amount(self, debit_note: "DebitNote") -> Decimal: activity_id = debit_note.activity_id @@ -111,11 +119,11 @@ async def invoice_accepted_amount(self, invoice: "Invoice") -> Decimal: # NOTE: this will (currently) always be true for a failed activity, but there is no rule # saying provider must send invoice for more than the accepted amount - if accepted_amount > Decimal(invoice.amount): - logger.warning("REJECTED INVOICE FOR %s, we accept only %s", invoice.amount, accepted_amount) + if accepted_amount < Decimal(invoice.amount): + log("REJECTED INVOICE FOR %s, we accept only %s", invoice.amount, accepted_amount) return accepted_amount - logger.warning("ACCEPTED INVOICE FOR %s", invoice.amount) + log("ACCEPTED INVOICE FOR %s", invoice.amount) return Decimal(invoice.amount) def _agreement_has_failed_activity(self, agreement_id: str) -> bool: