diff --git a/backend/core/src/core/services/nft.py b/backend/core/src/core/services/nft.py index cda7e076..4c9dd70a 100644 --- a/backend/core/src/core/services/nft.py +++ b/backend/core/src/core/services/nft.py @@ -1,4 +1,6 @@ import logging +from collections.abc import Sequence +from core.utils.misc import batched from pytonapi.schema.nft import NftItem as TONNftItem, NftItems from sqlalchemy import desc @@ -10,6 +12,7 @@ NftCollectionMetadataDTO, NftCollectionDTO, ) +from core.constants import DEFAULT_DB_QUERY_MAX_PARAMETERS_SIZE from core.services.base import BaseService @@ -164,3 +167,35 @@ def bulk_create_or_update( def count(self) -> int: return self.db_session.query(NftItem).count() + + def delete_missing(self, owner_address: str, keep_addresses: Sequence[str]) -> None: + """ + Deletes NFT Items for the given owner that are NOT in the keep_addresses list. + + :param owner_address: The address of the wallet owner + :param keep_addresses: List of NFT Item addresses to keep (active) + """ + # 1. Fetch all existing NFT Item addresses for this owner + existing_items_query = self.db_session.query(NftItem.address).filter( + NftItem.owner_address == owner_address + ) + existing_addresses = {nft_addr[0] for nft_addr in existing_items_query.all()} + + # 2. Calculate addresses to delete + keep_addresses_set = set(keep_addresses) + to_delete = list(existing_addresses - keep_addresses_set) + + if not to_delete: + return + + logger.info( + f"Deleting {len(to_delete)} stale NFT Items for owner {owner_address!r}" + ) + + # 3. Batch delete in chunks + for chunk in batched(to_delete, DEFAULT_DB_QUERY_MAX_PARAMETERS_SIZE): + self.db_session.query(NftItem).filter(NftItem.address.in_(chunk)).delete( + synchronize_session=False + ) + + self.db_session.flush() diff --git a/backend/core/src/core/services/wallet.py b/backend/core/src/core/services/wallet.py index 1f91bce4..b7ecdeab 100644 --- a/backend/core/src/core/services/wallet.py +++ b/backend/core/src/core/services/wallet.py @@ -1,5 +1,6 @@ import logging -from collections.abc import Generator +from collections.abc import Generator, Sequence +from core.utils.misc import batched from pytonapi.schema.jettons import JettonBalance, JettonsBalances from sqlalchemy import select @@ -274,3 +275,37 @@ def bulk_create_or_update( def count(self) -> int: return self.db_session.query(JettonWallet).count() + + def delete_missing(self, owner_address: str, keep_addresses: Sequence[str]) -> None: + """ + Deletes Jetton Wallets for the given owner that are NOT in the keep_addresses list. + + :param owner_address: The address of the wallet owner + :param keep_addresses: List of Jetton Wallet addresses to keep (active) + """ + # 1. Fetch all existing Jetton Wallet addresses for this owner + existing_wallets_query = self.db_session.query(JettonWallet.address).filter( + JettonWallet.owner_address == owner_address + ) + existing_addresses = { + wallet_addr[0] for wallet_addr in existing_wallets_query.all() + } + + # 2. Calculate addresses to delete + keep_addresses_set = set(keep_addresses) + to_delete = list(existing_addresses - keep_addresses_set) + + if not to_delete: + return + + logger.info( + f"Deleting {len(to_delete)} stale Jetton Wallets for owner {owner_address!r}" + ) + + # 3. Batch delete in chunks + for chunk in batched(to_delete, DEFAULT_DB_QUERY_MAX_PARAMETERS_SIZE): + self.db_session.query(JettonWallet).filter( + JettonWallet.address.in_(chunk) + ).delete(synchronize_session=False) + + self.db_session.flush() diff --git a/backend/indexer_blockchain/tasks.py b/backend/indexer_blockchain/tasks.py index 6136338b..2ea1695d 100644 --- a/backend/indexer_blockchain/tasks.py +++ b/backend/indexer_blockchain/tasks.py @@ -51,6 +51,23 @@ def fetch_wallet_details(address: str) -> None: blockchain_service.get_all_jetton_balances(address) ) + with DBService().db_session() as db_session: + # Pre-calculate whitelist collection addresses for NFT fetch + nft_collection_service = NftCollectionService(db_session) + whitelisted_nfts = nft_collection_service.get_whitelisted() + whitelist_collection_addresses = [ + collection.address for collection in whitelisted_nfts + ] + + # Perform async NFT fetch OUTSIDE the DB session + nft_items: NftItems = asyncio.run( + get_all_nfts_per_user( + blockchain_service=blockchain_service, + address=address, + nft_collections=whitelist_collection_addresses, + ) + ) + with DBService().db_session() as db_session: wallet_service = WalletService(db_session) wallet_service.set_balance( @@ -66,26 +83,21 @@ def fetch_wallet_details(address: str) -> None: jetton_wallet_service.bulk_create_or_update( jettons_balances, whitelisted_jettons, owner_address=address ) - logger.info(f"Jettons for {address!r} fetched.") + logger.info(f"Jettons for {address!r} updated.") - nft_collection_service = NftCollectionService(db_session) - whitelisted_nfts = nft_collection_service.get_whitelisted() - whitelist_collection_addresses = [ - collection.address for collection in whitelisted_nfts + active_jetton_wallets = [ + balance.wallet_address.address.to_raw() + for balance in jettons_balances.balances ] + jetton_wallet_service.delete_missing(address, active_jetton_wallets) - nft_items: NftItems = asyncio.run( - get_all_nfts_per_user( - blockchain_service=blockchain_service, - address=address, - nft_collections=whitelist_collection_addresses, - ) - ) - with DBService().db_session() as db_session: nft_service = NftItemService(db_session) nft_service.bulk_create_or_update(nft_items, whitelist_collection_addresses) - logger.info(f"NFT items for {address!r} fetched.") + active_nft_items = [item.address.to_raw() for item in nft_items.nft_items] + nft_service.delete_missing(address, active_nft_items) + + logger.info(f"NFT items for {address!r} updated.") redis_service = RedisService() redis_service.add_to_set(UPDATED_WALLETS_SET_NAME, address) diff --git a/backend/tests/factories/nft.py b/backend/tests/factories/nft.py index 82004c4b..d86970e1 100644 --- a/backend/tests/factories/nft.py +++ b/backend/tests/factories/nft.py @@ -1,5 +1,5 @@ import factory -from core.models.blockchain import NFTCollection +from core.models.blockchain import NFTCollection, NftItem from tests.factories.base import BaseSQLAlchemyModelFactory @@ -12,3 +12,15 @@ class Meta: name = factory.Faker("word") description = factory.Faker("text") is_enabled = True + + +class NftItemFactory(BaseSQLAlchemyModelFactory): + class Meta: + model = NftItem + sqlalchemy_session_persistence = "flush" + exclude = ("collection",) + + address = factory.Faker("pystr", min_chars=65, max_chars=65, prefix="0:") + owner_address = factory.Faker("pystr", min_chars=65, max_chars=65, prefix="0:") + collection_address = factory.SelfAttribute("collection.address") + collection = factory.SubFactory(NFTCollectionFactory) diff --git a/backend/tests/factories/wallet.py b/backend/tests/factories/wallet.py new file mode 100644 index 00000000..1da4e1f4 --- /dev/null +++ b/backend/tests/factories/wallet.py @@ -0,0 +1,29 @@ +import factory +from core.models.wallet import UserWallet, JettonWallet +from tests.factories.base import BaseSQLAlchemyModelFactory +from tests.factories.jetton import JettonFactory +from tests.factories.user import UserFactory + + +class UserWalletFactory(BaseSQLAlchemyModelFactory): + class Meta: + model = UserWallet + sqlalchemy_session_persistence = "flush" + + address = factory.Faker("pystr", min_chars=65, max_chars=65, prefix="0:") + user = factory.SubFactory(UserFactory) + balance = factory.Faker("random_int", min=0, max=1000000000) + hide_wallet = False + + +class JettonWalletFactory(BaseSQLAlchemyModelFactory): + class Meta: + model = JettonWallet + sqlalchemy_session_persistence = "flush" + exclude = ("jetton",) + + address = factory.Faker("pystr", min_chars=65, max_chars=65, prefix="0:") + jetton_master_address = factory.SelfAttribute("jetton.address") + jetton = factory.SubFactory(JettonFactory) + owner_address = factory.Faker("pystr", min_chars=65, max_chars=65, prefix="0:") + balance = factory.Faker("random_int", min=0, max=1000000) diff --git a/backend/tests/unit/core/services/test_sync_logic.py b/backend/tests/unit/core/services/test_sync_logic.py new file mode 100644 index 00000000..500385a6 --- /dev/null +++ b/backend/tests/unit/core/services/test_sync_logic.py @@ -0,0 +1,113 @@ +import pytest +from sqlalchemy.orm import Session + +from core.models.blockchain import NftItem +from core.models.wallet import UserWallet, JettonWallet +from core.models.user import User +from core.services.nft import NftItemService +from core.services.wallet import JettonWalletService +from tests.factories.jetton import JettonFactory +from tests.factories.nft import NFTCollectionFactory, NftItemFactory +from tests.factories.user import UserFactory +from tests.factories.wallet import UserWalletFactory, JettonWalletFactory + + +@pytest.mark.usefixtures("db_session") +class TestSyncLogic: + def test_jetton_wallet_delete_missing(self, db_session: Session): + wallet = UserWalletFactory.with_session(db_session).create( + address="0:wallet_address_123" + ) + service = JettonWalletService(db_session) + + # Create Jettons + jetton1 = JettonFactory.with_session(db_session).create() + jetton2 = JettonFactory.with_session(db_session).create() + + # Create Wallets: 1 and 2 exist + jw1 = JettonWalletFactory.with_session(db_session).create( + owner_address=wallet.address, + jetton=jetton1, + ) + jw2 = JettonWalletFactory.with_session(db_session).create( + owner_address=wallet.address, + jetton=jetton2, + ) + + # Scenario: indexer returns only jw1 (so jw2 should be deleted) + keep_addresses = [jw1.address] + + service.delete_missing(wallet.address, keep_addresses) + + # Verify jw1 still exists + assert ( + db_session.query(JettonWallet).filter_by(address=jw1.address).one_or_none() + is not None + ) + # Verify jw2 is deleted + assert ( + db_session.query(JettonWallet).filter_by(address=jw2.address).one_or_none() + is None + ) + + def test_nft_item_delete_missing(self, db_session: Session): + wallet = UserWalletFactory.with_session(db_session).create( + address="0:wallet_address_123" + ) + service = NftItemService(db_session) + + # Create Collection + collection = NFTCollectionFactory.with_session(db_session).create() + + # Create NFT Items: 1 and 2 exist + # Use simpler unique addresses respecting 67 char limit (0: + 64 chars) + addr1 = "0:" + "1" * 64 + addr2 = "0:" + "2" * 64 + + nft1 = NftItemFactory.with_session(db_session).create( + owner_address=wallet.address, + collection=collection, + address=addr1, + ) + nft2 = NftItemFactory.with_session(db_session).create( + owner_address=wallet.address, + collection=collection, + address=addr2, + ) + + # Scenario: indexer returns only nft1 (so nft2 should be deleted) + keep_addresses = [nft1.address] + + service.delete_missing(wallet.address, keep_addresses) + + # Verify nft1 still exists + assert ( + db_session.query(NftItem).filter_by(address=nft1.address).one_or_none() + is not None + ) + # Verify nft2 is deleted + assert ( + db_session.query(NftItem).filter_by(address=nft2.address).one_or_none() + is None + ) + + def test_delete_missing_empty_keep_list(self, db_session: Session): + """Test that passing an empty list deletes ALL items for that user.""" + wallet = UserWalletFactory.with_session(db_session).create( + address="0:wallet_address_123" + ) + service = JettonWalletService(db_session) + + jetton1 = JettonFactory.with_session(db_session).create() + jw1 = JettonWalletFactory.with_session(db_session).create( + owner_address=wallet.address, + jetton=jetton1, + ) + + # Empty keep list + service.delete_missing(wallet.address, []) + + assert ( + db_session.query(JettonWallet).filter_by(address=jw1.address).one_or_none() + is None + )