diff --git a/src/Makefile.am b/src/Makefile.am index 7b5c4ca68402..ea4f542e501c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -287,7 +287,6 @@ BITCOIN_CORE_H = \ masternode/node.h \ masternode/meta.h \ masternode/payments.h \ - masternode/sync.h \ masternode/utils.h \ memusage.h \ merkleblock.h \ @@ -296,6 +295,9 @@ BITCOIN_CORE_H = \ net.h \ net_permissions.h \ net_processing.h \ + net_governance.h \ + net_instantsend.h \ + net_signing.h \ net_types.h \ netaddress.h \ netbase.h \ @@ -313,6 +315,7 @@ BITCOIN_CORE_H = \ node/miner.h \ node/minisketchwrapper.h \ node/psbt.h \ + node/sync.h \ node/transaction.h \ node/txreconciliation.h \ node/interface_ui.h \ @@ -551,12 +554,14 @@ libbitcoin_node_a_SOURCES = \ masternode/node.cpp \ masternode/meta.cpp \ masternode/payments.cpp \ - masternode/sync.cpp \ masternode/utils.cpp \ net.cpp \ netfulfilledman.cpp \ netgroup.cpp \ net_processing.cpp \ + net_governance.cpp \ + net_instantsend.cpp \ + net_signing.cpp \ node/blockstorage.cpp \ node/caches.cpp \ node/chainstate.cpp \ @@ -569,6 +574,7 @@ libbitcoin_node_a_SOURCES = \ node/miner.cpp \ node/minisketchwrapper.cpp \ node/psbt.cpp \ + node/sync.cpp \ node/transaction.cpp \ node/txreconciliation.cpp \ node/interface_ui.cpp \ diff --git a/src/chainlock/chainlock.cpp b/src/chainlock/chainlock.cpp index bb4380170cf0..d794f8f19806 100644 --- a/src/chainlock/chainlock.cpp +++ b/src/chainlock/chainlock.cpp @@ -18,7 +18,7 @@ #include #include -#include +#include #include #include diff --git a/src/chainlock/signing.cpp b/src/chainlock/signing.cpp index 9b574d5cc6ee..0d3b432ff9f8 100644 --- a/src/chainlock/signing.cpp +++ b/src/chainlock/signing.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include using node::ReadBlockFromDisk; @@ -141,7 +141,7 @@ void ChainLockSigner::TrySignChainTip(const llmq::CInstantSendManager& isman) lastSignedMsgHash = msgHash; } - m_shareman.AsyncSignIfMember(Params().GetConsensus().llmqTypeChainLocks, m_sigman, requestId, msgHash); + m_shareman.AsyncSignIfMember(Params().GetConsensus().llmqTypeChainLocks, requestId, msgHash); } void ChainLockSigner::EraseFromBlockHashTxidMap(const uint256& hash) diff --git a/src/coinjoin/client.cpp b/src/coinjoin/client.cpp index 88d5ee4f8187..85dffb22d0c2 100644 --- a/src/coinjoin/client.cpp +++ b/src/coinjoin/client.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/coinjoin/coinjoin.cpp b/src/coinjoin/coinjoin.cpp index d48239927f1a..6c8aec95ab6c 100644 --- a/src/coinjoin/coinjoin.cpp +++ b/src/coinjoin/coinjoin.cpp @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include diff --git a/src/coinjoin/server.cpp b/src/coinjoin/server.cpp index 6f7169af7486..f28171c70527 100644 --- a/src/coinjoin/server.cpp +++ b/src/coinjoin/server.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index 1a1faa969563..71f8d51e72c1 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include CDSNotificationInterface::CDSNotificationInterface(CConnman& connman, CDSTXManager& dstxman, diff --git a/src/evo/mnauth.cpp b/src/evo/mnauth.cpp index 376c96ecfaf4..fcc3f0de8e11 100644 --- a/src/evo/mnauth.cpp +++ b/src/evo/mnauth.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 9d193de7036c..e79fb6da0685 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -14,18 +14,18 @@ #include #include #include -#include -#include +#include #include #include #include #include #include #include +#include #include #include #include -#include +#include int nSubmittedFinalBudget; @@ -91,35 +91,6 @@ CGovernanceManager::~CGovernanceManager() m_db->Store(*this); } -void CGovernanceManager::Schedule(CScheduler& scheduler, CConnman& connman, PeerManager& peerman) -{ - assert(IsValid()); - - scheduler.scheduleEvery( - [this, &connman]() -> void { - if (!m_mn_sync.IsSynced()) return; - - // CHECK OBJECTS WE'VE ASKED FOR, REMOVE OLD ENTRIES - CleanOrphanObjects(); - RequestOrphanObjects(connman); - - // CHECK AND REMOVE - REPROCESS GOVERNANCE OBJECTS - CheckAndRemove(); - }, - std::chrono::minutes{5}); - - scheduler.scheduleEvery( - [this, &peerman]() -> void { - LOCK(cs_relay); - for (const auto& inv : m_relay_invs) { - peerman.RelayInv(inv); - } - m_relay_invs.clear(); - }, - // Tests need tighter timings to avoid timeouts, use more relaxed pacing otherwise - Params().IsMockableChain() ? std::chrono::seconds{1} : std::chrono::seconds{5}); -} - bool CGovernanceManager::LoadCache(bool load_cache) { assert(m_db != nullptr); @@ -562,6 +533,19 @@ void CGovernanceManager::CheckAndRemove() ToString(), m_requested_hash_time.size()); } +CDeterministicMNManager& CGovernanceManager::GetMNManager() +{ + return *Assert(m_dmnman); +} + +std::vector CGovernanceManager::FetchRelayInventory() +{ + std::vector ret; + LOCK(cs_relay); + swap(ret, m_relay_invs); + return ret; +} + const CGovernanceObject* CGovernanceManager::FindConstGovernanceObject(const uint256& nHash) const { AssertLockHeld(cs); @@ -572,18 +556,14 @@ const CGovernanceObject* CGovernanceManager::FindConstGovernanceObject(const uin return nullptr; } -CGovernanceObject* CGovernanceManager::FindGovernanceObject(const uint256& nHash) +CGovernanceObject* CGovernanceManager::FindGovernanceObject(const uint256& hash) { AssertLockNotHeld(cs); LOCK(cs); - return FindGovernanceObjectInternal(nHash); -} + auto it = mapObjects.find(hash); + if (it == mapObjects.end()) return nullptr; -CGovernanceObject* CGovernanceManager::FindGovernanceObjectInternal(const uint256& nHash) -{ - AssertLockHeld(cs); - if (mapObjects.count(nHash)) return &mapObjects[nHash]; - return nullptr; + return &it->second; } CGovernanceObject* CGovernanceManager::FindGovernanceObjectByDataHash(const uint256 &nDataHash) @@ -616,26 +596,26 @@ std::vector CGovernanceManager::GetCurrentVotes(const uint256& const CGovernanceObject& govobj = it->second; const auto tip_mn_list = Assert(m_dmnman)->GetListAtChainTip(); - std::map mapMasternodes; + std::set masternodes; if (mnCollateralOutpointFilter.IsNull()) { - tip_mn_list.ForEachMNShared(false, [&](const CDeterministicMNCPtr& dmn) { - mapMasternodes.emplace(dmn->collateralOutpoint, dmn); + tip_mn_list.ForEachMN(false, [&](const CDeterministicMN& mn) { + masternodes.emplace(mn.collateralOutpoint); }); } else { auto dmn = tip_mn_list.GetMNByCollateral(mnCollateralOutpointFilter); if (dmn) { - mapMasternodes.emplace(dmn->collateralOutpoint, dmn); + masternodes.insert(mnCollateralOutpointFilter); } } // Loop through each MN collateral outpoint and get the votes for the `nParentHash` governance object - for (const auto& mnpair : mapMasternodes) { + for (const auto& collateral : masternodes) { // get a vote_rec_t from the govobj vote_rec_t voteRecord; - if (!govobj.GetCurrentMNVotes(mnpair.first, voteRecord)) continue; + if (!govobj.GetCurrentMNVotes(collateral, voteRecord)) continue; for (const auto& [signal, vote_instance] : voteRecord.mapInstances) { - CGovernanceVote vote = CGovernanceVote(mnpair.first, nParentHash, (vote_signal_enum_t)signal, + CGovernanceVote vote = CGovernanceVote(collateral, nParentHash, (vote_signal_enum_t)signal, vote_instance.eOutcome); vote.SetTime(vote_instance.nCreationTime); vecResult.push_back(vote); @@ -758,7 +738,7 @@ MessageProcessingResult CGovernanceManager::SyncSingleObjVotes(CNode& peer, cons } CNetMsgMaker msgMaker(peer.GetCommonVersion()); - connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ_VOTE, + connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, NODE_SYNC_GOVOBJ_VOTE, static_cast(ret.m_inventory.size()))); LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d votes to peer=%d\n", __func__, ret.m_inventory.size(), peer.GetId()); @@ -806,7 +786,7 @@ MessageProcessingResult CGovernanceManager::SyncObjects(CNode& peer, CConnman& c } CNetMsgMaker msgMaker(peer.GetCommonVersion()); - connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, + connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, NODE_SYNC_GOVOBJ, static_cast(ret.m_inventory.size()))); LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d objects to peer=%d\n", __func__, ret.m_inventory.size(), peer.GetId()); @@ -1073,49 +1053,16 @@ void CGovernanceManager::AddInvalidVote(const CGovernanceVote& vote) cmapInvalidVotes.Insert(vote.GetHash(), vote); } -int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman, const PeerManager& peerman) const +std::pair, std::vector> CGovernanceManager::PrepareVotesToRequest(const std::vector& vNodesCopy, std::map>& mapAskedRecently, int64_t nNow, size_t peers_per_hash_max) const { - const std::vector vNodeCopy{&peer}; - return RequestGovernanceObjectVotes(vNodeCopy, connman, peerman); -} - -int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman, - const PeerManager& peerman) const -{ - static std::map > mapAskedRecently; - // Maximum number of nodes to request votes from for the same object hash on real networks - // (mainnet, testnet, devnets). Keep this low to avoid unnecessary bandwidth usage. - static constexpr size_t REALNET_PEERS_PER_HASH{3}; - // Maximum number of nodes to request votes from for the same object hash on regtest. - // During testing, nodes are isolated to create conflicting triggers. Using the real - // networks limit of 3 nodes often results in querying only "non-isolated" nodes, missing the - // isolated ones we need to test. This high limit ensures all available nodes are queried. - static constexpr size_t REGTEST_PEERS_PER_HASH{std::numeric_limits::max()}; - - if (vNodesCopy.empty()) return -1; - - int64_t nNow = GetTime(); - int nTimeout = 60 * 60; - size_t nPeersPerHashMax = Params().IsMockableChain() ? REGTEST_PEERS_PER_HASH : REALNET_PEERS_PER_HASH; - - std::vector vTriggerObjHashes; - std::vector vOtherObjHashes; - - // This should help us to get some idea about an impact this can bring once deployed on mainnet. - // Testnet is ~40 times smaller in masternode count, but only ~1000 masternodes usually vote, - // so 1 obj on mainnet == ~10 objs or ~1000 votes on testnet. However we want to test a higher - // number of votes to make sure it's robust enough, so aim at 2000 votes per masternode per request. - // On mainnet nMaxObjRequestsPerNode is always set to 1. - int nMaxObjRequestsPerNode = 1; - size_t nProjectedVotes = 2000; - if (Params().NetworkIDString() != CBaseChainParams::MAIN) { - nMaxObjRequestsPerNode = std::max(1, int(nProjectedVotes / std::max(1, (int)Assert(m_dmnman)->GetListAtChainTip().GetValidMNsCount()))); - } + std::pair, std::vector> ret; + std::vector &vTriggerObjHashes = ret.first; + std::vector &vOtherObjHashes = ret.second; { LOCK(cs); - if (mapObjects.empty()) return -2; + if (mapObjects.empty()) return ret; for (const auto& [nHash, govobj] : mapObjects) { if (govobj.IsSetCachedDelete()) continue; @@ -1128,7 +1075,7 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& ++it; } } - if (mapAskedRecently[nHash].size() >= nPeersPerHashMax) continue; + if (mapAskedRecently[nHash].size() >= peers_per_hash_max) continue; } if (govobj.GetObjectType() == GovernanceObject::TRIGGER) { @@ -1139,55 +1086,13 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& } } - LogPrint(BCLog::GOBJECT, "CGovernanceManager::RequestGovernanceObjectVotes -- start: vTriggerObjHashes %d vOtherObjHashes %d mapAskedRecently %d\n", + LogPrint(BCLog::GOBJECT, "CGovernanceManager::PrepareVotesToRequest -- start: vTriggerObjHashes %d vOtherObjHashes %d mapAskedRecently %d\n", vTriggerObjHashes.size(), vOtherObjHashes.size(), mapAskedRecently.size()); Shuffle(vTriggerObjHashes.begin(), vTriggerObjHashes.end(), FastRandomContext()); Shuffle(vOtherObjHashes.begin(), vOtherObjHashes.end(), FastRandomContext()); - for (int i = 0; i < nMaxObjRequestsPerNode; ++i) { - uint256 nHashGovobj; - - // ask for triggers first - if (!vTriggerObjHashes.empty()) { - nHashGovobj = vTriggerObjHashes.back(); - } else { - if (vOtherObjHashes.empty()) break; - nHashGovobj = vOtherObjHashes.back(); - } - bool fAsked = false; - for (const auto& pnode : vNodesCopy) { - // Don't try to sync any data from outbound non-relay "masternode" connections. - // Inbound connection this early is most likely a "masternode" connection - // initiated from another node, so skip it too. - if (!pnode->CanRelay() || (connman.IsActiveMasternode() && pnode->IsInboundConn())) continue; - // stop early to prevent setAskFor overflow - { - LOCK(::cs_main); - size_t nProjectedSize = peerman.GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes; - if (nProjectedSize > MAX_INV_SZ) continue; - // to early to ask the same node - if (mapAskedRecently[nHashGovobj].count(pnode->addr)) continue; - } - - RequestGovernanceObject(pnode, nHashGovobj, connman, true); - mapAskedRecently[nHashGovobj][pnode->addr] = nNow + nTimeout; - fAsked = true; - // stop loop if max number of peers per obj was asked - if (mapAskedRecently[nHashGovobj].size() >= nPeersPerHashMax) break; - } - // NOTE: this should match `if` above (the one before `while`) - if (!vTriggerObjHashes.empty()) { - vTriggerObjHashes.pop_back(); - } else { - vOtherObjHashes.pop_back(); - } - if (!fAsked) i--; - } - LogPrint(BCLog::GOBJECT, "CGovernanceManager::RequestGovernanceObjectVotes -- end: vTriggerObjHashes %d vOtherObjHashes %d mapAskedRecently %d\n", - vTriggerObjHashes.size(), vOtherObjHashes.size(), mapAskedRecently.size()); - - return int(vTriggerObjHashes.size() + vOtherObjHashes.size()); + return ret; } bool CGovernanceManager::AcceptMessage(const uint256& nHash) @@ -1457,11 +1362,11 @@ bool CGovernanceManager::AddNewTrigger(uint256 nHash) CSuperblock_sptr pSuperblock; try { - const CGovernanceObject* pGovObj = FindGovernanceObjectInternal(nHash); - if (!pGovObj) { + const auto& gov_obj = mapObjects.find(nHash); + if (gov_obj == mapObjects.end()) { throw std::runtime_error("CSuperblock: Failed to find Governance Object"); } - pSuperblock = std::make_shared(*pGovObj, nHash); + pSuperblock = std::make_shared(gov_obj->second, nHash); } catch (std::exception& e) { LogPrintf("CGovernanceManager::%s -- Error creating superblock: %s\n", __func__, e.what()); return false; @@ -1493,16 +1398,18 @@ void CGovernanceManager::CleanAndRemoveTriggers() auto it = mapTrigger.begin(); while (it != mapTrigger.end()) { bool remove = false; - CGovernanceObject* pObj = nullptr; + CGovernanceObject* pObj{nullptr}; const CSuperblock_sptr& pSuperblock = it->second; if (!pSuperblock) { LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- nullptr superblock\n", __func__); remove = true; } else { - pObj = FindGovernanceObjectInternal(it->first); - if (!pObj || pObj->GetObjectType() != GovernanceObject::TRIGGER) { + auto gov_obj_it = mapObjects.find(it->first); + if (gov_obj_it == mapObjects.end() || gov_obj_it->second.GetObjectType() != GovernanceObject::TRIGGER) { LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- Unknown or non-trigger superblock\n", __func__); pSuperblock->SetStatus(SeenObjectStatus::ErrorInvalid); + } else { + pObj = &gov_obj_it->second; } LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- superblock status = %d\n", __func__, @@ -1566,8 +1473,7 @@ std::vector CGovernanceManager::GetActiveTriggersInternal() co // LOOK AT THESE OBJECTS AND COMPILE A VALID LIST OF TRIGGERS for (const auto& pair : mapTrigger) { - const CGovernanceObject* pObj = FindConstGovernanceObject(pair.first); - if (pObj) { + if (mapObjects.find(pair.first) != mapObjects.end()) { vecResults.push_back(pair.second); } } @@ -1594,13 +1500,12 @@ bool CGovernanceManager::IsSuperblockTriggered(const CDeterministicMNList& tip_m continue; } - CGovernanceObject* pObj = FindGovernanceObjectInternal(pSuperblock->GetGovernanceObjHash()); - if (!pObj) { - LogPrintf("IsSuperblockTriggered -- pObj == nullptr, continuing\n"); + auto gov_obj_it = mapObjects.find(pSuperblock->GetGovernanceObjHash()); + if (gov_obj_it == mapObjects.end()) { + LogPrintf("IsSuperblockTriggered -- superblock is in mapObject, continuing\n"); continue; } - - LogPrint(BCLog::GOBJECT, "IsSuperblockTriggered -- data = %s\n", pObj->GetDataAsPlainString()); + LogPrint(BCLog::GOBJECT, "IsSuperblockTriggered -- data = %s\n", gov_obj_it->second.GetDataAsPlainString()); // note : 12.1 - is epoch calculation correct? @@ -1614,9 +1519,9 @@ bool CGovernanceManager::IsSuperblockTriggered(const CDeterministicMNList& tip_m // MAKE SURE THIS TRIGGER IS ACTIVE VIA FUNDING CACHE FLAG - pObj->UpdateSentinelVariables(tip_mn_list); + gov_obj_it->second.UpdateSentinelVariables(tip_mn_list); - if (pObj->IsSetCachedFunding()) { + if (gov_obj_it->second.IsSetCachedFunding()) { LogPrint(BCLog::GOBJECT, "IsSuperblockTriggered -- fCacheFunding = true, returning true\n"); return true; } else { @@ -1651,17 +1556,15 @@ bool CGovernanceManager::GetBestSuperblockInternal(const CDeterministicMNList& t continue; } - const CGovernanceObject* pObj = FindGovernanceObjectInternal(pSuperblock->GetGovernanceObjHash()); - if (!pObj) { - continue; - } + if (const auto& gov_obj_it = mapObjects.find(pSuperblock->GetGovernanceObjHash()); + gov_obj_it != mapObjects.end()) { + // DO WE HAVE A NEW WINNER? - // DO WE HAVE A NEW WINNER? - - int nTempYesCount = pObj->GetAbsoluteYesCount(tip_mn_list, VOTE_SIGNAL_FUNDING); - if (nTempYesCount > nYesCount) { - nYesCount = nTempYesCount; - pSuperblockRet = pSuperblock; + int nTempYesCount = gov_obj_it->second.GetAbsoluteYesCount(tip_mn_list, VOTE_SIGNAL_FUNDING); + if (nTempYesCount > nYesCount) { + nYesCount = nTempYesCount; + pSuperblockRet = pSuperblock; + } } } diff --git a/src/governance/governance.h b/src/governance/governance.h index 8035985fdad6..a1da1f211aed 100644 --- a/src/governance/governance.h +++ b/src/governance/governance.h @@ -32,8 +32,7 @@ template class CFlatDB; class CInv; class CNode; -class CScheduler; -class PeerManager; +class CService; class CDeterministicMNList; class CDeterministicMNManager; @@ -278,8 +277,6 @@ class CGovernanceManager : public GovernanceStore, public GovernanceSignerParent const std::unique_ptr& dmnman, CMasternodeSync& mn_sync); ~CGovernanceManager(); - void Schedule(CScheduler& scheduler, CConnman& connman, PeerManager& peerman); - bool LoadCache(bool load_cache); bool IsValid() const override { return is_valid; } @@ -312,8 +309,6 @@ class CGovernanceManager : public GovernanceStore, public GovernanceSignerParent void AddGovernanceObject(CGovernanceObject& govobj, const CNode* pfrom = nullptr) override EXCLUSIVE_LOCKS_REQUIRED(!cs_relay); - void CheckAndRemove(); - UniValue ToJson() const; void UpdatedBlockTip(const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_relay); @@ -354,9 +349,7 @@ class CGovernanceManager : public GovernanceStore, public GovernanceSignerParent void InitOnLoad(); - int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman, const PeerManager& peerman) const; - int RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman, - const PeerManager& peerman) const; + std::pair, std::vector> PrepareVotesToRequest(const std::vector& vNodesCopy, std::map>& mapAskedRecently, int64_t now, size_t peers_per_hash_max) const; /* * Trigger Management (formerly CGovernanceTriggerManager) @@ -393,17 +386,22 @@ class CGovernanceManager : public GovernanceStore, public GovernanceSignerParent std::vector> GetApprovedProposals(const CDeterministicMNList& tip_mn_list) override EXCLUSIVE_LOCKS_REQUIRED(!cs); + // used by NetGovernance + void RequestOrphanObjects(CConnman& connman); + void CleanOrphanObjects(); + void CheckAndRemove(); + CDeterministicMNManager& GetMNManager(); + //! This method clears internal data structure and returns a copy + std::vector FetchRelayInventory() EXCLUSIVE_LOCKS_REQUIRED(!cs_relay); + void RequestGovernanceObject(CNode* pfrom, const uint256& nHash, CConnman& connman, bool fUseFilter = false) const; private: //! Internal functions that require locks to be held - CGovernanceObject* FindGovernanceObjectInternal(const uint256& nHash) EXCLUSIVE_LOCKS_REQUIRED(cs); std::vector> GetActiveTriggersInternal() const EXCLUSIVE_LOCKS_REQUIRED(cs); bool GetBestSuperblockInternal(const CDeterministicMNList& tip_mn_list, CSuperblock_sptr& pSuperblockRet, int nBlockHeight) EXCLUSIVE_LOCKS_REQUIRED(cs); void ExecuteBestSuperblock(const CDeterministicMNList& tip_mn_list, int nBlockHeight); - void RequestGovernanceObject(CNode* pfrom, const uint256& nHash, CConnman& connman, bool fUseFilter = false) const; - void AddInvalidVote(const CGovernanceVote& vote); bool ProcessVote(CNode* pfrom, const CGovernanceVote& vote, CGovernanceException& exception, CConnman& connman); @@ -417,10 +415,6 @@ class CGovernanceManager : public GovernanceStore, public GovernanceSignerParent void AddCachedTriggers(); - void RequestOrphanObjects(CConnman& connman); - - void CleanOrphanObjects(); - void RemoveInvalidVotes(); }; diff --git a/src/governance/object.cpp b/src/governance/object.cpp index 8bea754eea50..900c3ccf1b7a 100644 --- a/src/governance/object.cpp +++ b/src/governance/object.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/governance/signing.cpp b/src/governance/signing.cpp index 13dd74d88637..52d6cd98894a 100644 --- a/src/governance/signing.cpp +++ b/src/governance/signing.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/governance/vote.cpp b/src/governance/vote.cpp index 2a013465e38c..41299162ce3f 100644 --- a/src/governance/vote.cpp +++ b/src/governance/vote.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/init.cpp b/src/init.cpp index 18d1cafe92c8..0493902b4a50 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -37,6 +37,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -95,7 +98,7 @@ #include #include #include -#include +#include #include #include #include @@ -234,6 +237,9 @@ void Interrupt(NodeContext& node) if (node.active_ctx) { node.active_ctx->Interrupt(); } + if (node.peerman) { + node.peerman->InterruptHandlers(); + } if (node.llmq_ctx) { node.llmq_ctx->Interrupt(); } @@ -269,7 +275,10 @@ void PrepareShutdown(NodeContext& node) StopREST(); StopRPC(); StopHTTPServer(); + if (node.peerman) node.peerman->RemoveHandlers(); + if (node.active_ctx) node.active_ctx->Stop(); + if (node.peerman) node.peerman->StopHandlers(); if (node.llmq_ctx) node.llmq_ctx->Stop(); for (const auto& client : node.chain_clients) { @@ -2173,6 +2182,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) g_active_notification_interface = std::make_unique(*node.active_ctx, *node.mn_activeman); RegisterValidationInterface(g_active_notification_interface.get()); } + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman)); + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman, node.active_ctx ? node.active_ctx->shareman.get() : nullptr)); // ********************************************************* Step 7d: Setup other Dash services @@ -2204,6 +2215,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + // NetGovernance depends on govman.IsValid() + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.govman, *node.mn_sync, *node.netfulfilledman)); // ********************************************************* Step 8: start indexers if (args.GetBoolArg("-txindex", DEFAULT_TXINDEX)) { g_txindex = std::make_unique(cache_sizes.tx_index, false, fReindex); @@ -2268,16 +2281,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 10a: schedule Dash-specific tasks node.llmq_ctx->Start(*node.peerman); + node.peerman->StartHandlers(); if (node.active_ctx) node.active_ctx->Start(*node.connman, *node.peerman); node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); - node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync), std::cref(*node.peerman), std::cref(*node.govman)), std::chrono::seconds{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), node.cj_walletman.get()), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CDeterministicMNManager::DoMaintenance, std::ref(*node.dmnman)), std::chrono::seconds{10}); - if (node.govman->IsValid()) { - node.govman->Schedule(*node.scheduler, *node.connman, *node.peerman); - } + + node.peerman->ScheduleHandlers(*node.scheduler); if (node.mn_activeman) { node.scheduler->scheduleEvery(std::bind(&CCoinJoinServer::DoMaintenance, std::ref(*node.active_ctx->cj_server)), std::chrono::seconds{1}); diff --git a/src/instantsend/instantsend.cpp b/src/instantsend/instantsend.cpp index 34537ffb64c8..5220c642ccb8 100644 --- a/src/instantsend/instantsend.cpp +++ b/src/instantsend/instantsend.cpp @@ -4,25 +4,17 @@ #include +#include #include #include -#include -#include -#include -#include -#include - -#include -#include #include -#include -#include -#include -#include +#include +#include #include #include - -#include +#include +#include +#include // Forward declaration to break dependency over node/transaction.h namespace node { @@ -56,100 +48,46 @@ Uint256HashSet GetIdsFromLockable(const std::vector& vec) } } // anonymous namespace -CInstantSendManager::CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman, +CInstantSendManager::CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CSigningManager& _sigman, CSporkManager& sporkman, CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool unitTests, bool fWipe) : db{unitTests, fWipe}, clhandler{_clhandler}, m_chainstate{chainstate}, - qman{_qman}, sigman{_sigman}, spork_manager{sporkman}, mempool{_mempool}, m_mn_sync{mn_sync} { - workInterrupt.reset(); } CInstantSendManager::~CInstantSendManager() = default; -void CInstantSendManager::Start(PeerManager& peerman) -{ - // can't start new thread if we have one running already - if (workThread.joinable()) { - assert(false); - } - - workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); }); - - if (auto signer = m_signer.load(std::memory_order_acquire); signer) { - signer->Start(); - } -} - -void CInstantSendManager::Stop() +bool ShouldReportISLockTiming() { - if (auto signer = m_signer.load(std::memory_order_acquire); signer) { - signer->Stop(); - } - - // make sure to call InterruptWorkerThread() first - if (!workInterrupt) { - assert(false); - } - - if (workThread.joinable()) { - workThread.join(); - } -} - -bool ShouldReportISLockTiming() { return g_stats_client->active() || LogAcceptDebug(BCLog::INSTANTSEND); } -MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) +int CInstantSendManager::GetCycleBlockHeight(const uint256& cycle_hash) const { - if (!IsInstantSendEnabled() || msg_type != NetMsgType::ISDLOCK) { - return {}; - } - - const auto islock = std::make_shared(); - vRecv >> *islock; - - auto hash = ::SerializeHash(*islock); - - MessageProcessingResult ret{}; - ret.m_to_erase = CInv{MSG_ISDLOCK, hash}; - - if (!islock->TriviallyValid()) { - ret.m_error = MisbehavingError{100}; - return ret; - } - - const auto blockIndex = WITH_LOCK(::cs_main, return m_chainstate.m_blockman.LookupBlockIndex(islock->cycleHash)); + const auto blockIndex = WITH_LOCK(::cs_main, return m_chainstate.m_blockman.LookupBlockIndex(cycle_hash)); if (blockIndex == nullptr) { - // Maybe we don't have the block yet or maybe some peer spams invalid values for cycleHash - ret.m_error = MisbehavingError{1}; - return ret; - } - - // Deterministic islocks MUST use rotation based llmq - auto llmqType = Params().GetConsensus().llmqTypeDIP0024InstantSend; - const auto& llmq_params_opt = Params().GetLLMQ(llmqType); - assert(llmq_params_opt); - if (blockIndex->nHeight % llmq_params_opt->dkgInterval != 0) { - ret.m_error = MisbehavingError{100}; - return ret; + return -1; } + return blockIndex->nHeight; +} +bool CInstantSendManager::IsKnownInstantSend(const uint256& hash) const +{ if (WITH_LOCK(cs_pendingLocks, return pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash)) || db.KnownInstantSendLock(hash)) { - return ret; + return true; } + return false; +} - LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: received islock, peer=%d\n", __func__, - islock->txid.ToString(), hash.ToString(), from); - +void CInstantSendManager::EnqueueInstantSendLock(NodeId from, const uint256& hash, const std::shared_ptr& islock) +{ if (ShouldReportISLockTiming()) { auto time_diff = [&]() -> int64_t { LOCK(cs_timingsTxSeen); @@ -169,187 +107,42 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st LOCK(cs_pendingLocks); pendingInstantSendLocks.emplace(hash, std::make_pair(from, islock)); - return ret; } -instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() +instantsend::PendingState CInstantSendManager::GetPendingLocks() { - decltype(pendingInstantSendLocks) pend; instantsend::PendingState ret; if (!IsInstantSendEnabled()) { return ret; } - { - LOCK(cs_pendingLocks); - // only process a max 32 locks at a time to avoid duplicate verification of recovered signatures which have been - // verified by CSigningManager in parallel - const size_t maxCount = 32; - // The keys of the removed values are temporaily stored here to avoid invalidating an iterator - std::vector removed; - removed.reserve(maxCount); - - for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) { - // Check if we've reached max count - if (pend.size() >= maxCount) { - ret.m_pending_work = true; - break; - } - pend.emplace(islockHash, std::move(nodeid_islptr_pair)); - removed.emplace_back(islockHash); - } - - for (const auto& islockHash : removed) { - pendingInstantSendLocks.erase(islockHash); + LOCK(cs_pendingLocks); + // only process a max 32 locks at a time to avoid duplicate verification of recovered signatures which have been + // verified by CSigningManager in parallel + const size_t maxCount = 32; + // The keys of the removed values are temporaily stored here to avoid invalidating an iterator + std::vector removed; + removed.reserve(maxCount); + + for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) { + // Check if we've reached max count + if (ret.m_pending_is.size() >= maxCount) { + ret.m_pending_work = true; + break; } + ret.m_pending_is.emplace(islockHash, std::move(nodeid_islptr_pair)); + removed.emplace_back(islockHash); } - if (pend.empty()) { - ret.m_pending_work = false; - return ret; - } - - // TODO Investigate if leaving this is ok - auto llmqType = Params().GetConsensus().llmqTypeDIP0024InstantSend; - const auto& llmq_params_opt = Params().GetLLMQ(llmqType); - assert(llmq_params_opt); - const auto& llmq_params = llmq_params_opt.value(); - auto dkgInterval = llmq_params.dkgInterval; - - // First check against the current active set and don't ban - auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, /*signOffset=*/0, /*ban=*/false, pend, ret.m_peer_activity); - if (!badISLocks.empty()) { - LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__); - - // filter out valid IS locks from "pend" - for (auto it = pend.begin(); it != pend.end();) { - if (!badISLocks.count(it->first)) { - it = pend.erase(it); - } else { - ++it; - } - } - // Now check against the previous active set and perform banning if this fails - ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity); + for (const auto& islockHash : removed) { + pendingInstantSendLocks.erase(islockHash); } return ret; } -Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( - const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, - std::vector>& peer_activity) -{ - CBLSBatchVerifier batchVerifier(false, true, 8); - Uint256HashMap recSigs; - - size_t verifyCount = 0; - size_t alreadyVerified = 0; - for (const auto& p : pend) { - const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; - - if (batchVerifier.badSources.count(nodeId)) { - continue; - } - - if (!islock->sig.Get().IsValid()) { - batchVerifier.badSources.emplace(nodeId); - continue; - } - - auto id = islock->GetRequestId(); - - // no need to verify an ISLOCK if we already have verified the recovered sig that belongs to it - if (sigman.HasRecoveredSig(llmq_params.type, id, islock->txid)) { - alreadyVerified++; - continue; - } - - const auto blockIndex = WITH_LOCK(::cs_main, return m_chainstate.m_blockman.LookupBlockIndex(islock->cycleHash)); - if (blockIndex == nullptr) { - batchVerifier.badSources.emplace(nodeId); - continue; - } - - int nSignHeight{-1}; - const auto dkgInterval = llmq_params.dkgInterval; - if (blockIndex->nHeight + dkgInterval < m_chainstate.m_chain.Height()) { - nSignHeight = blockIndex->nHeight + dkgInterval - 1; - } - // For RegTest non-rotating quorum cycleHash has directly quorum hash - auto quorum = llmq_params.useRotation ? llmq::SelectQuorumForSigning(llmq_params, m_chainstate.m_chain, qman, - id, nSignHeight, signOffset) - : qman.GetQuorum(llmq_params.type, islock->cycleHash); - - if (!quorum) { - // should not happen, but if one fails to select, all others will also fail to select - return {}; - } - uint256 signHash = llmq::SignHash{llmq_params.type, quorum->qc->quorumHash, id, islock->txid}.Get(); - batchVerifier.PushMessage(nodeId, hash, signHash, islock->sig.Get(), quorum->qc->quorumPublicKey); - verifyCount++; - - // We can reconstruct the CRecoveredSig objects from the islock and pass it to the signing manager, which - // avoids unnecessary double-verification of the signature. We however only do this when verification here - // turns out to be good (which is checked further down) - if (!sigman.HasRecoveredSigForId(llmq_params.type, id)) { - recSigs.try_emplace(hash, - CRecoveredSig(llmq_params.type, quorum->qc->quorumHash, id, islock->txid, islock->sig)); - } - } - - cxxtimer::Timer verifyTimer(true); - batchVerifier.Verify(); - verifyTimer.stop(); - - LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- verified locks. count=%d, alreadyVerified=%d, vt=%d, nodes=%d\n", __func__, - verifyCount, alreadyVerified, verifyTimer.count(), batchVerifier.GetUniqueSourceCount()); - - Uint256HashSet badISLocks; - - if (ban && !batchVerifier.badSources.empty()) { - LOCK(::cs_main); - for (const auto& nodeId : batchVerifier.badSources) { - // Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which - // does not validate anymore due to changed quorums - peer_activity.emplace_back(nodeId, MisbehavingError{20}); - } - } - for (const auto& p : pend) { - const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; - - if (batchVerifier.badMessages.count(hash)) { - LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: invalid sig in islock, peer=%d\n", - __func__, islock->txid.ToString(), hash.ToString(), nodeId); - badISLocks.emplace(hash); - continue; - } - - peer_activity.emplace_back(nodeId, ProcessInstantSendLock(nodeId, hash, islock)); - - // See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid - // double-verification of the sig. - auto it = recSigs.find(hash); - if (it != recSigs.end()) { - auto recSig = std::make_shared(std::move(it->second)); - if (!sigman.HasRecoveredSigForId(llmq_params.type, recSig->getId())) { - LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: passing reconstructed recSig to signing mgr, peer=%d\n", __func__, - islock->txid.ToString(), hash.ToString(), nodeId); - sigman.PushReconstructedRecoveredSig(recSig); - } - } - } - - return badISLocks; -} - -MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, +std::variant CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const instantsend::InstantSendLockPtr& islock) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", @@ -359,12 +152,12 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, signer->ClearLockFromQueue(islock); } if (db.KnownInstantSendLock(hash)) { - return {}; + return std::monostate{}; } if (const auto sameTxIsLock = db.GetInstantSendLockByTxid(islock->txid)) { // can happen, nothing to do - return {}; + return std::monostate{}; } for (const auto& in : islock->inputs) { const auto sameOutpointIsLock = db.GetInstantSendLockByInput(in); @@ -375,6 +168,7 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, } uint256 hashBlock{}; + // TODO: move GetTransaction to NetInstantsend ; keep it here const auto tx = GetTransaction(nullptr, &mempool, islock->txid, Params().GetConsensus(), hashBlock); const CBlockIndex* pindexMined{nullptr}; const bool found_transaction{tx != nullptr}; @@ -387,7 +181,7 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, if (pindexMined != nullptr && clhandler.HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__, islock->txid.ToString(), hash.ToString(), hashBlock.ToString(), from); - return {}; + return std::monostate{}; } } @@ -418,17 +212,10 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, mempool.AddTransactionsUpdated(1); } - MessageProcessingResult ret{}; - CInv inv(MSG_ISDLOCK, hash); if (found_transaction) { - ret.m_inv_filter = std::make_pair(inv, tx); - } else { - // we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce - // with the TX taken into account. - ret.m_inv_filter = std::make_pair(inv, islock->txid); - ret.m_request_tx = islock->txid; + return tx; } - return ret; + return islock->txid; } void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) @@ -601,6 +388,24 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild __func__, txid.ToString(), retryChildren, retryChildrenCount); } +std::vector CInstantSendManager::PrepareTxToRetry() +{ + std::vector txns{}; + + LOCK2(cs_nonLocked, cs_pendingRetry); + if (pendingRetryTxs.empty()) return txns; + txns.reserve(pendingRetryTxs.size()); + for (const auto& txid : pendingRetryTxs) { + if (auto it = nonLockedTxs.find(txid); it != nonLockedTxs.end()) { + const auto& [_, tx_info] = *it; + if (tx_info.tx) { + txns.push_back(tx_info.tx); + } + } + } + return txns; +} + void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx) { RemoveNonLockedTx(tx.GetHash(), false); @@ -923,43 +728,6 @@ size_t CInstantSendManager::GetInstantSendLockCount() const return db.GetInstantSendLockCount(); } -void CInstantSendManager::WorkThreadMain(PeerManager& peerman) -{ - while (!workInterrupt) { - bool fMoreWork = [&]() -> bool { - if (!IsInstantSendEnabled()) return false; - auto [more_work, peer_activity] = ProcessPendingInstantSendLocks(); - for (auto& [node_id, mpr] : peer_activity) { - peerman.PostProcessMessage(std::move(mpr), node_id); - } - auto signer = m_signer.load(std::memory_order_acquire); - if (!signer) return more_work; - // Construct set of non-locked transactions that are pending to retry - std::vector txns{}; - { - LOCK2(cs_nonLocked, cs_pendingRetry); - if (pendingRetryTxs.empty()) return more_work; - txns.reserve(pendingRetryTxs.size()); - for (const auto& txid : pendingRetryTxs) { - if (auto it = nonLockedTxs.find(txid); it != nonLockedTxs.end()) { - const auto& [_, tx_info] = *it; - if (tx_info.tx) { - txns.push_back(tx_info.tx); - } - } - } - } - // Retry processing them - signer->ProcessPendingRetryLockTxs(txns); - return more_work; - }(); - - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } - } -} - bool CInstantSendManager::IsInstantSendEnabled() const { return !fReindex && !fImporting && spork_manager.IsSporkActive(SPORK_2_INSTANTSEND_ENABLED); diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index e9b5c809c197..20cf9ba6208c 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -5,21 +5,22 @@ #ifndef BITCOIN_INSTANTSEND_INSTANTSEND_H #define BITCOIN_INSTANTSEND_INSTANTSEND_H +#include +#include +#include + #include #include #include #include #include -#include - -#include -#include -#include #include #include #include #include +#include +#include class CBlockIndex; class CChainState; @@ -27,7 +28,6 @@ class CDataStream; class CMasternodeSync; class CSporkManager; class CTxMemPool; -class PeerManager; namespace Consensus { struct LLMQParams; } // namespace Consensus @@ -37,13 +37,12 @@ class InstantSendSigner; struct PendingState { bool m_pending_work{false}; - std::vector> m_peer_activity{}; + Uint256HashMap> m_pending_is; }; } // namespace instantsend namespace llmq { class CChainLocksHandler; -class CQuorumManager; class CSigningManager; class CInstantSendManager final : public instantsend::InstantSendSignerParent @@ -53,7 +52,6 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent CChainLocksHandler& clhandler; CChainState& m_chainstate; - CQuorumManager& qman; CSigningManager& sigman; CSporkManager& spork_manager; CTxMemPool& mempool; @@ -61,9 +59,6 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent std::atomic m_signer{nullptr}; - std::thread workThread; - CThreadInterrupt workInterrupt; - mutable Mutex cs_pendingLocks; // Incoming and not verified yet Uint256HashMap> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks); @@ -89,7 +84,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent Uint256HashMap timingsTxSeen GUARDED_BY(cs_timingsTxSeen); public: - explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman, + explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CSigningManager& _sigman, CSporkManager& sporkman, CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool unitTests, bool fWipe); ~CInstantSendManager(); @@ -102,21 +97,9 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent } void DisconnectSigner() { m_signer.store(nullptr, std::memory_order_release); } - void Start(PeerManager& peerman); - void Stop(); - void InterruptWorkerThread() { workInterrupt(); }; + instantsend::InstantSendSigner* Signer() const { return m_signer.load(); } private: - instantsend::PendingState ProcessPendingInstantSendLocks() - EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - - Uint256HashSet ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, - std::vector>& peer_activity) - EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash, - const instantsend::InstantSendLockPtr& islock) - EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_timingsTxSeen); @@ -131,9 +114,6 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void ResolveBlockConflicts(const uint256& islockHash, const instantsend::InstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - void WorkThreadMain(PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - void HandleFullyConfirmedBlock(const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingRetry); @@ -142,9 +122,23 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent bool IsWaitingForTx(const uint256& txHash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingLocks); instantsend::InstantSendLockPtr GetConflictingLock(const CTransaction& tx) const override; - [[nodiscard]] MessageProcessingResult ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) + /* Helpers for communications between CInstantSendManager & NetInstantSend */ + // GetCycleBlockHeight returns negative number if cycle_hash is now known + int GetCycleBlockHeight(const uint256& cycle_hash) const; + bool IsKnownInstantSend(const uint256& hash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingLocks); + void EnqueueInstantSendLock(NodeId from, const uint256& hash, const std::shared_ptr& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingLocks); + std::vector PrepareTxToRetry() EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, cs_pendingRetry); + instantsend::PendingState GetPendingLocks(); + CSigningManager& Sigman() { return sigman; } + CChainState& Chainstate() { return m_chainstate; } + std::variant ProcessInstantSendLock( + NodeId from, const uint256& hash, + const instantsend::InstantSendLockPtr& islock) + EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); + //---- + // void TransactionAddedToMempool(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry, !cs_timingsTxSeen); void TransactionRemovedFromMempool(const CTransactionRef& tx); diff --git a/src/instantsend/signing.cpp b/src/instantsend/signing.cpp index 789c0135f968..413cf4cb89d9 100644 --- a/src/instantsend/signing.cpp +++ b/src/instantsend/signing.cpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include // Forward declaration to break dependency over node/transaction.h @@ -331,7 +331,7 @@ bool InstantSendSigner::TrySignInputLocks(const CTransaction& tx, bool fRetroact WITH_LOCK(cs_input_requests, inputRequestIds.emplace(id)); LogPrint(BCLog::INSTANTSEND, "%s -- txid=%s: trying to vote on input %s with id %s. fRetroactive=%d\n", __func__, tx.GetHash().ToString(), in.prevout.ToStringShort(), id.ToString(), fRetroactive); - if (m_shareman.AsyncSignIfMember(llmqType, m_sigman, id, tx.GetHash(), {}, fRetroactive)) { + if (m_shareman.AsyncSignIfMember(llmqType, id, tx.GetHash(), {}, fRetroactive)) { LogPrint(BCLog::INSTANTSEND, "%s -- txid=%s: voted on input %s with id %s\n", __func__, tx.GetHash().ToString(), in.prevout.ToStringShort(), id.ToString()); } @@ -389,6 +389,6 @@ void InstantSendSigner::TrySignInstantSendLock(const CTransaction& tx) txToCreatingInstantSendLocks.emplace(tx.GetHash(), &e.first->second); } - m_shareman.AsyncSignIfMember(llmqType, m_sigman, id, tx.GetHash(), quorum->m_quorum_base_block_index->GetBlockHash()); + m_shareman.AsyncSignIfMember(llmqType, id, tx.GetHash(), quorum->m_quorum_base_block_index->GetBlockHash()); } } // namespace instantsend diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 359d2a23924c..0ee243180f74 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -30,9 +30,9 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& d qman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db, *quorum_block_processor, *qsnapman, mn_activeman, mn_sync, sporkman, unit_tests, wipe)}, - sigman{std::make_unique(chainman.ActiveChainstate(), *qman, unit_tests, wipe)}, + sigman{std::make_unique(*qman, unit_tests, wipe)}, clhandler{std::make_unique(chainman.ActiveChainstate(), *qman, sporkman, mempool, mn_sync)}, - isman{std::make_unique(*clhandler, chainman.ActiveChainstate(), *qman, *sigman, sporkman, + isman{std::make_unique(*clhandler, chainman.ActiveChainstate(), *sigman, sporkman, mempool, mn_sync, unit_tests, wipe)} { // Have to start it early to let VerifyDB check ChainLock signatures in coinbase @@ -44,22 +44,16 @@ LLMQContext::~LLMQContext() { } void LLMQContext::Interrupt() { - isman->InterruptWorkerThread(); - sigman->InterruptWorkerThread(); } void LLMQContext::Start(PeerManager& peerman) { qman->Start(); - sigman->StartWorkerThread(peerman); clhandler->Start(*isman); - isman->Start(peerman); } void LLMQContext::Stop() { - isman->Stop(); clhandler->Stop(); - sigman->StopWorkerThread(); qman->Stop(); } diff --git a/src/llmq/ehf_signals.cpp b/src/llmq/ehf_signals.cpp index afec0ae3ee94..4546619cc364 100644 --- a/src/llmq/ehf_signals.cpp +++ b/src/llmq/ehf_signals.cpp @@ -78,7 +78,7 @@ void CEHFSignalsHandler::trySignEHFSignal(int bit, const CBlockIndex* const pind const uint256 msgHash = mnhfPayload.PrepareTx().GetHash(); WITH_LOCK(cs, ids.insert(requestId)); - shareman.AsyncSignIfMember(llmqType, sigman, requestId, msgHash, quorum->qc->quorumHash, false, true); + shareman.AsyncSignIfMember(llmqType, requestId, msgHash, quorum->qc->quorumHash, false, true); } MessageProcessingResult CEHFSignalsHandler::HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) @@ -92,7 +92,6 @@ MessageProcessingResult CEHFSignalsHandler::HandleNewRecoveredSig(const CRecover return {}; } - MessageProcessingResult ret; const auto ehfSignals = mnhfman.GetSignalsStage(WITH_LOCK(::cs_main, return m_chainman.ActiveTip())); MNHFTxPayload mnhfPayload; for (const auto& deployment : Params().GetConsensus().vDeployments) { @@ -112,19 +111,18 @@ MessageProcessingResult CEHFSignalsHandler::HandleNewRecoveredSig(const CRecover CMutableTransaction tx = mnhfPayload.PrepareTx(); - { - CTransactionRef tx_to_sent = MakeTransactionRef(std::move(tx)); - LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig Special EHF TX is created hash=%s\n", tx_to_sent->GetHash().ToString()); - LOCK(::cs_main); - const MempoolAcceptResult result = m_chainman.ProcessTransaction(tx_to_sent); - if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { - ret.m_transactions.push_back(tx_to_sent->GetHash()); - } else { - LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig -- AcceptToMemoryPool failed: %s\n", result.m_state.ToString()); - } + CTransactionRef tx_to_sent = MakeTransactionRef(std::move(tx)); + LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig Special EHF TX is created hash=%s\n", tx_to_sent->GetHash().ToString()); + LOCK(::cs_main); + const MempoolAcceptResult result = m_chainman.ProcessTransaction(tx_to_sent); + if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { + MessageProcessingResult ret; + ret.m_transactions.push_back(tx_to_sent->GetHash()); + return ret; } - break; + LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig -- AcceptToMemoryPool failed: %s\n", result.m_state.ToString()); + return {}; } - return ret; + return {}; } } // namespace llmq diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index c73395a01071..8dba511eedcf 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 8ed8640c60cb..ec779366c78b 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -7,22 +7,15 @@ #include #include #include -#include -#include #include -#include #include -#include -#include -#include #include -#include +#include #include -#include -#include #include +#include #include namespace llmq @@ -332,9 +325,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// -CSigningManager::CSigningManager(const CChainState& chainstate, const CQuorumManager& _qman, bool fMemory, bool fWipe) : +CSigningManager::CSigningManager(const CQuorumManager& _qman, bool fMemory, bool fWipe) : db(fMemory, fWipe), - m_chainstate(chainstate), qman(_qman) { } @@ -366,55 +358,12 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan) +void CSigningManager::ProcessRecoveredSig(NodeId from, std::shared_ptr&& recoveredSig) { - retBan = false; - - auto llmqType = recoveredSig.getLlmqType(); - if (!Params().GetLLMQ(llmqType).has_value()) { - retBan = true; - return false; - } - - auto quorum = quorum_manager.GetQuorum(llmqType, recoveredSig.getQuorumHash()); - - if (!quorum) { - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- quorum %s not found\n", __func__, - recoveredSig.getQuorumHash().ToString()); - return false; - } - if (!IsQuorumActive(llmqType, quorum_manager, quorum->qc->quorumHash)) { - return false; - } - - return true; -} - -MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) -{ - if (msg_type != NetMsgType::QSIGREC) { - return {}; - } - - auto recoveredSig = std::make_shared(); - vRecv >> *recoveredSig; - - MessageProcessingResult ret{}; - ret.m_to_erase = CInv{MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()}; - - bool ban = false; - if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { - if (ban) { - ret.m_error = MisbehavingError{100}; - return ret; - } - return ret; - } - // It's important to only skip seen *valid* sig shares here. See comment for CBatchedSigShare // We don't receive recovered sigs in batches, but we do batched verification per node on these if (db.HasRecoveredSigForHash(recoveredSig->GetHash())) { - return ret; + return; } LogPrint(BCLog::LLMQ, "CSigningManager::%s -- signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, @@ -425,11 +374,10 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string // no need to perform full verification LogPrint(BCLog::LLMQ, "CSigningManager::%s -- already pending reconstructed sig, signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, recoveredSig->buildSignHash().ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), from); - return ret; + return; } pendingRecoveredSigs[from].emplace_back(recoveredSig); - return ret; } void CSigningManager::CollectPendingRecoveredSigsToVerify( @@ -467,10 +415,7 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( } } - for (auto& p : retSigShares) { - NodeId nodeId = p.first; - auto& v = p.second; - + for (auto& [nodeId, v] : retSigShares) { for (auto it = v.begin(); it != v.end();) { const auto& recSig = *it; @@ -499,88 +444,20 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( } } -void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman) +Uint256HashMap> CSigningManager::FetchPendingReconstructed() { - decltype(pendingReconstructedRecoveredSigs) m; - WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs)); - - for (const auto& p : m) { - ProcessRecoveredSig(p.second, peerman); - } -} - -bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) -{ - std::unordered_map>> recSigsByNode; - std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - - ProcessPendingReconstructedRecoveredSigs(peerman); - - const size_t nMaxBatchSize{32}; - CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); - if (recSigsByNode.empty()) { - return false; - } - - // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not - // craftable by individual entities, making the rogue public key attack impossible - CBLSBatchVerifier batchVerifier(false, false); - - size_t verifyCount = 0; - for (const auto& p : recSigsByNode) { - NodeId nodeId = p.first; - const auto& v = p.second; - - for (const auto& recSig : v) { - // we didn't verify the lazy signature until now - if (!recSig->sig.Get().IsValid()) { - batchVerifier.badSources.emplace(nodeId); - break; - } - - const auto& quorum = quorums.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash())); - batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), - quorum->qc->quorumPublicKey); - verifyCount++; - } - } - - cxxtimer::Timer verifyTimer(true); - batchVerifier.Verify(); - verifyTimer.stop(); - - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size()); - - Uint256HashSet processed; - for (const auto& p : recSigsByNode) { - NodeId nodeId = p.first; - const auto& v = p.second; - - if (batchVerifier.badSources.count(nodeId)) { - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - peerman.Misbehaving(nodeId, 100); - continue; - } - - for (const auto& recSig : v) { - if (!processed.emplace(recSig->GetHash()).second) { - continue; - } - - ProcessRecoveredSig(recSig, peerman); - } - } - - return recSigsByNode.size() >= nMaxBatchSize; + Uint256HashMap> tmp; + WITH_LOCK(cs_pending, swap(tmp, pendingReconstructedRecoveredSigs)); + return tmp; } // signature must be verified already -void CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman) +bool CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig) { auto llmqType = recoveredSig->getLlmqType(); if (db.HasRecoveredSigForHash(recoveredSig->GetHash())) { - return; + return false; } auto signHash = recoveredSig->buildSignHash(); @@ -602,7 +479,7 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash())); - auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners); - for (auto& l : listeners) { - peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); - } + return true; +} - GetMainSignals().NotifyRecoveredSig(recoveredSig, recoveredSig->GetHash().ToString()); +std::vector CSigningManager::GetListeners() const +{ + LOCK(cs_listeners); + return recoveredSigsListeners; } void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr& recoveredSig) @@ -704,47 +582,6 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256& return db.GetVoteForId(llmqType, id, msgHashRet); } -void CSigningManager::StartWorkerThread(PeerManager& peerman) -{ - // can't start new thread if we have one running already - if (workThread.joinable()) { - assert(false); - } - - workThread = std::thread(&util::TraceThread, "recsigs", [this, &peerman] { WorkThreadMain(peerman); }); -} - -void CSigningManager::StopWorkerThread() -{ - // make sure to call InterruptWorkerThread() first - if (!workInterrupt) { - assert(false); - } - - if (workThread.joinable()) { - workThread.join(); - } -} - -void CSigningManager::InterruptWorkerThread() -{ - workInterrupt(); -} - -void CSigningManager::WorkThreadMain(PeerManager& peerman) -{ - while (!workInterrupt) { - bool fMoreWork = ProcessPendingRecoveredSigs(peerman); - - Cleanup(); - - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } - } -} - SignHash CSigBase::buildSignHash() const { return SignHash(llmqType, quorumHash, id, msgHash); } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 4f60be0320ee..ed59c3e2c251 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -7,22 +7,17 @@ #include #include -#include #include -#include #include +#include #include #include #include #include -#include - -#include #include #include -#include #include class CChainState; @@ -30,7 +25,6 @@ class CDataStream; class CDBBatch; class CDBWrapper; class CInv; -class PeerManager; struct RPCResult; class UniValue; @@ -38,6 +32,7 @@ class UniValue; namespace llmq { class CQuorumManager; class CSigSharesManager; +class SignHash; // Keep recovered signatures for a week. This is a "-maxrecsigsage" option default. static constexpr int64_t DEFAULT_MAX_RECOVERED_SIGS_AGE{60 * 60 * 24 * 7}; @@ -55,7 +50,7 @@ class CSigBase CSigBase() = default; public: - [[nodiscard]] constexpr auto getLlmqType() const { + [[nodiscard]] constexpr Consensus::LLMQType getLlmqType() const { return llmqType; } @@ -164,7 +159,6 @@ class CSigningManager private: CRecoveredSigsDb db; - const CChainState& m_chainstate; const CQuorumManager& qman; mutable Mutex cs_pending; @@ -180,12 +174,12 @@ class CSigningManager std::vector recoveredSigsListeners GUARDED_BY(cs_listeners); public: - CSigningManager(const CChainState& chainstate, const CQuorumManager& _qman, bool fMemory, bool fWipe); + CSigningManager(const CQuorumManager& _qman, bool fMemory, bool fWipe); bool AlreadyHave(const CInv& inv) const EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - [[nodiscard]] MessageProcessingResult ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) + void ProcessRecoveredSig(NodeId from, std::shared_ptr&& recovered_sig) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid @@ -199,20 +193,21 @@ class CSigningManager // allows AlreadyHave to keep returning true. Cleanup will later remove the remains void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); -private: + // Used by NetSigning: + const CQuorumManager& Qman() { return qman; } + Uint256HashMap> FetchPendingReconstructed() EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); void CollectPendingRecoveredSigsToVerify( size_t maxUniqueSessions, std::unordered_map>>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); - void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); - bool ProcessPendingRecoveredSigs(PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); // called from the worker thread of CSigSharesManager + std::vector GetListeners() const EXCLUSIVE_LOCKS_REQUIRED(!cs_listeners); + // Returns true if recovered sigs should be send to listeners + bool ProcessRecoveredSig(const std::shared_ptr& recoveredSig) + EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); +private: // Used by CSigSharesManager CRecoveredSigsDb& GetDb() { return db; } - void ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); // Needed for access to GetDb() and ProcessRecoveredSig() friend class CSigSharesManager; @@ -230,16 +225,8 @@ class CSigningManager bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet) const; -private: - std::thread workThread; - CThreadInterrupt workInterrupt; - void Cleanup(); // called from the worker thread of CSigSharesManager - void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); - public: - void StartWorkerThread(PeerManager& peerman); - void StopWorkerThread(); - void InterruptWorkerThread(); + void Cleanup(); }; template diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 4b070416b9bb..ed3326f6e11c 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -10,15 +10,11 @@ #include #include -#include #include #include #include -#include #include -#include #include -#include #include #include #include @@ -181,43 +177,19 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash) ////////////////////// CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigningManager& _sigman, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, + const CActiveMasternodeManager& mn_activeman, const CQuorumManager& _qman, const CSporkManager& sporkman) : m_connman{connman}, m_chainstate{chainstate}, sigman{_sigman}, - m_peerman{peerman}, m_mn_activeman{mn_activeman}, qman{_qman}, m_sporkman{sporkman} { - workInterrupt.reset(); } CSigSharesManager::~CSigSharesManager() = default; -void CSigSharesManager::StartWorkerThread() -{ - // can't start new thread if we have one running already - if (workThread.joinable()) { - assert(false); - } - - workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); -} - -void CSigSharesManager::StopWorkerThread() -{ - // make sure to call InterruptWorkerThread() first - if (!workInterrupt) { - assert(false); - } - - if (workThread.joinable()) { - workThread.join(); - } -} - void CSigSharesManager::RegisterAsRecoveredSigsListener() { sigman.RegisterRecoveredSigsListener(this); @@ -228,91 +200,7 @@ void CSigSharesManager::UnregisterAsRecoveredSigsListener() sigman.UnregisterRecoveredSigsListener(this); } -void CSigSharesManager::InterruptWorkerThread() -{ - workInterrupt(); -} - -void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) -{ - // non-masternodes are not interested in sigshares - if (m_mn_activeman.GetProTxHash().IsNull()) return; - - if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) { - std::vector receivedSigShares; - vRecv >> receivedSigShares; - - if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - - for (const auto& sigShare : receivedSigShares) { - ProcessMessageSigShare(pfrom.GetId(), sigShare); - } - } - - if (msg_type == NetMsgType::QSIGSESANN) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QSIGSHARESINV) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QGETSIGSHARES) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QBSIGSHARES) { - std::vector msgs; - vRecv >> msgs; - size_t totalSigsCount = 0; - for (const auto& bs : msgs) { - totalSigsCount += bs.sigShares.size(); - } - if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) { - BanNode(pfrom.GetId()); - return; - } - } -} - -bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann) +bool CSigSharesManager::ProcessMessageSigSesAnn(const CSigSesAnn& ann, NodeId node_id) { auto llmqType = ann.getLlmqType(); if (!Params().GetLLMQ(llmqType).has_value()) { @@ -322,18 +210,18 @@ bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSe return false; } - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom.GetId()); + LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), node_id); auto quorum = qman.GetQuorum(llmqType, ann.getQuorumHash()); if (!quorum) { // TODO should we ban here? LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, - ann.getQuorumHash().ToString(), pfrom.GetId()); + ann.getQuorumHash().ToString(), node_id); return true; // let's still try other announcements from the same message } LOCK(cs); - auto& nodeState = nodeStates[pfrom.GetId()]; + auto& nodeState = nodeStates[node_id]; auto& session = nodeState.GetOrCreateSessionFromAnn(ann); nodeState.sessionByRecvId.erase(session.recvSessionId); nodeState.sessionByRecvId.erase(ann.getSessionId()); @@ -344,20 +232,18 @@ bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSe return true; } -bool CSigSharesManager::VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv) -{ - const auto& llmq_params_opt = Params().GetLLMQ(llmqType); - return llmq_params_opt.has_value() && (inv.inv.size() == size_t(llmq_params_opt->size)); -} - -bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv) +bool CSigSharesManager::ProcessMessageSigShares(const CSigSharesInv& inv, NodeId node_id, bool requested) { CSigSharesNodeState::SessionInfo sessionInfo; - if (!GetSessionInfoByRecvId(pfrom.GetId(), inv.sessionId, sessionInfo)) { + if (!GetSessionInfoByRecvId(node_id, inv.sessionId, sessionInfo)) { return true; } - if (!VerifySigSharesInv(sessionInfo.llmqType, inv)) { + const auto& llmq_params_opt = Params().GetLLMQ(sessionInfo.llmqType); + if (!llmq_params_opt.has_value()) { + return false; + } + if (inv.inv.size() != size_t(llmq_params_opt->size)) { return false; } @@ -367,52 +253,26 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode& pfrom, const CSi } LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, - sessionInfo.signHash.ToString(), inv.ToString(), pfrom.GetId()); + sessionInfo.signHash.ToString(), inv.ToString(), node_id); - if (!sessionInfo.quorum->HasVerificationVector()) { + if (!requested && !sessionInfo.quorum->HasVerificationVector()) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, not requesting sig shares. node=%d\n", __func__, - sessionInfo.quorumHash.ToString(), pfrom.GetId()); + sessionInfo.quorumHash.ToString(), node_id); return true; } LOCK(cs); - auto& nodeState = nodeStates[pfrom.GetId()]; + auto& nodeState = nodeStates[node_id]; auto* session = nodeState.GetSessionByRecvId(inv.sessionId); if (session == nullptr) { return true; } - session->announced.Merge(inv); - session->knows.Merge(inv); - return true; -} - -bool CSigSharesManager::ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv) -{ - CSigSharesNodeState::SessionInfo sessionInfo; - if (!GetSessionInfoByRecvId(pfrom.GetId(), inv.sessionId, sessionInfo)) { - return true; - } - - if (!VerifySigSharesInv(sessionInfo.llmqType, inv)) { - return false; - } - - // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (sigman.HasRecoveredSigForSession(sessionInfo.signHash.Get())) { - return true; + if (requested) { + session->requested.Merge(inv); + } else { + session->announced.Merge(inv); } - - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, - sessionInfo.signHash.ToString(), inv.ToString(), pfrom.GetId()); - - LOCK(cs); - auto& nodeState = nodeStates[pfrom.GetId()]; - auto* session = nodeState.GetSessionByRecvId(inv.sessionId); - if (session == nullptr) { - return true; - } - session->requested.Merge(inv); session->knows.Merge(inv); return true; } @@ -471,47 +331,42 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const return true; } -void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) +bool CSigSharesManager::ProcessMessageSigShare(const CSigShare& sigShare, NodeId fromId) { auto quorum = qman.GetQuorum(sigShare.getLlmqType(), sigShare.getQuorumHash()); if (!quorum) { - return; + return true; } if (!IsQuorumActive(sigShare.getLlmqType(), qman, quorum->qc->quorumHash)) { // quorum is too old - return; + return true; } if (!quorum->IsMember(m_mn_activeman.GetProTxHash())) { // we're not a member so we can't verify it (we actually shouldn't have received it) - return; + return true; } if (!quorum->HasVerificationVector()) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__, quorum->qc->quorumHash.ToString(), fromId); - return; + return true; } if (sigShare.getQuorumMember() >= quorum->members.size()) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); - BanNode(fromId); - return; + return false; } if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__); - BanNode(fromId); - return; + return false; } { LOCK(cs); - if (sigShares.Has(sigShare.GetKey())) { - return; - } - - if (sigman.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) { - return; + if (sigShares.Has(sigShare.GetKey()) || + sigman.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) { + return true; } auto& nodeState = nodeStates[fromId]; @@ -520,6 +375,7 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, id=%s, msgHash=%s, member=%d, node=%d\n", __func__, sigShare.GetSignHash().ToString(), sigShare.getId().ToString(), sigShare.getMsgHash().ToString(), sigShare.getQuorumMember(), fromId); + return true; } bool CSigSharesManager::PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, @@ -564,14 +420,14 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(const CActiveMasternodeManager return true; } -bool CSigSharesManager::CollectPendingSigSharesToVerify( +void CSigSharesManager::CollectPendingSigSharesToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { LOCK(cs); if (nodeStates.empty()) { - return false; + return; } // This will iterate node states in random order and pick one sig share at a time. This avoids processing @@ -604,7 +460,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify( rnd); if (retSigShares.empty()) { - return false; + return; } } @@ -626,101 +482,17 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify( if (!quorum) { LogPrintf("%s: ERROR! Unexpected missing quorum with llmqType=%d, quorumHash=%s\n", __func__, ToUnderlying(llmqType), sigShare.getQuorumHash().ToString()); - return false; + retSigShares.clear(); + retQuorums.clear(); + return; } retQuorums.try_emplace(k, quorum); } } - - return true; -} - -bool CSigSharesManager::ProcessPendingSigShares() -{ - std::unordered_map> sigSharesByNodes; - std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - - const size_t nMaxBatchSize{32}; - bool collect_status = CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums); - if (!collect_status || sigSharesByNodes.empty()) { - return false; - } - - // It's ok to perform insecure batched verification here as we verify against the quorum public key shares, - // which are not craftable by individual entities, making the rogue public key attack impossible - CBLSBatchVerifier batchVerifier(false, true); - - cxxtimer::Timer prepareTimer(true); - size_t verifyCount = 0; - for (const auto& [nodeId, v] : sigSharesByNodes) { - for (const auto& sigShare : v) { - if (sigman.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) { - continue; - } - - // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive - // deserialization in the message thread - if (!sigShare.sigShare.Get().IsValid()) { - BanNode(nodeId); - // don't process any additional shares from this node - break; - } - - auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash())); - auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember()); - - if (!pubKeyShare.IsValid()) { - // this should really not happen (we already ensured we have the quorum vvec, - // so we should also be able to create all pubkey shares) - LogPrintf("CSigSharesManager::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__); - assert(false); - } - - batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sigShare.sigShare.Get(), pubKeyShare); - verifyCount++; - } - } - prepareTimer.stop(); - - cxxtimer::Timer verifyTimer(true); - batchVerifier.Verify(); - verifyTimer.stop(); - - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__, verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size()); - - for (const auto& [nodeId, v] : sigSharesByNodes) { - if (batchVerifier.badSources.count(nodeId) != 0) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n", - __func__, nodeId); - // this will also cause re-requesting of the shares that were sent by this node - BanNode(nodeId); - continue; - } - - ProcessPendingSigShares(v, quorums); - } - - return sigSharesByNodes.size() >= nMaxBatchSize; -} - -// It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigShares( - const std::vector& sigSharesToProcess, - const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums) -{ - cxxtimer::Timer t(true); - for (const auto& sigShare : sigSharesToProcess) { - auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()); - ProcessSigShare(sigShare, quorums.at(quorumKey)); - } - t.stop(); - - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- processed sigShare batch. shares=%d, time=%ds\n", __func__, - sigSharesToProcess.size(), t.count()); } // sig shares are already verified when entering this method -void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) +std::shared_ptr CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) { auto llmqType = quorum->params.type; bool canTryRecovery = false; @@ -735,14 +507,14 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (sigman.HasRecoveredSigForId(llmqType, sigShare.getId())) { - return; + return nullptr; } { LOCK(cs); if (!sigShares.Add(sigShare.GetKey(), sigShare)) { - return; + return nullptr; } if (!isAllMembersConnectedEnabled) { sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true); @@ -768,14 +540,15 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (canTryRecovery) { - TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash()); + return TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash()); } + return nullptr; } -void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) +std::shared_ptr CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) { if (sigman.HasRecoveredSigForId(quorum->params.type, id)) { - return; + return nullptr; } std::vector sigSharesForRecovery; @@ -786,7 +559,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& auto signHash = SignHash(quorum->params.type, quorum->qc->quorumHash, id, msgHash).Get(); const auto* sigSharesForSignHash = sigShares.GetAllForSignHash(signHash); if (sigSharesForSignHash == nullptr) { - return; + return nullptr; } if (quorum->params.is_single_member()) { @@ -795,7 +568,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& "CSigSharesManager::%s -- impossible to recover single-node signature - no shares yet. id=%s, " "msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); - return; + return nullptr; } const auto& sigShare = sigSharesForSignHash->begin()->second; CBLSSignature recoveredSig = sigShare.sigShare.Get(); @@ -804,8 +577,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& auto rs = std::make_shared(quorum->params.type, quorum->qc->quorumHash, id, msgHash, recoveredSig); - sigman.ProcessRecoveredSig(rs, m_peerman); - return; // end of single-quorum processing + return rs; // end of single-quorum processing } sigSharesForRecovery.reserve((size_t) quorum->params.threshold); @@ -818,7 +590,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& // check if we can recover the final signature if (sigSharesForRecovery.size() < size_t(quorum->params.threshold)) { - return; + return nullptr; } } @@ -828,7 +600,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& if (!recoveredSig.Recover(sigSharesForRecovery, idsForRecovery)) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- failed to recover signature. id=%s, msgHash=%s, time=%d\n", __func__, id.ToString(), msgHash.ToString(), t.count()); - return; + return nullptr; } LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- recovered signature. id=%s, msgHash=%s, time=%d\n", __func__, @@ -846,11 +618,10 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& // this should really not happen as we have verified all signature shares before LogPrintf("CSigSharesManager::%s -- own recovered signature is invalid. id=%s, msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); - return; + return nullptr; } } - - sigman.ProcessRecoveredSig(rs, m_peerman); + return rs; } CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt) @@ -868,7 +639,7 @@ CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPt return v[attempt % v.size()].second; } -bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigningManager& sigman, const uint256& id, +bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash, const uint256& quorumHash, bool allowReSign, bool allowDiffMsgHashSigning) { @@ -930,7 +701,7 @@ bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigning } } - if (db.HasRecoveredSigForId(llmqType, id)) { + if (sigman.HasRecoveredSigForId(llmqType, id)) { // no need to sign it if we already have a recovered sig return true; } @@ -943,14 +714,11 @@ bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigning // make us re-announce all known shares (other nodes might have run into a timeout) ForceReAnnouncement(quorum, llmqType, id, msgHash); } - AsyncSign(quorum, id, msgHash); - return true; -} + LOCK(cs_pendingSigns); + pendingSigns.emplace_back(quorum, id, msgHash); -void CSigSharesManager::NotifyRecoveredSig(const std::shared_ptr& sig) const -{ - m_peerman.RelayRecoveredSig(Assert(sig)->GetHash()); + return true; } void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) @@ -1280,25 +1048,6 @@ bool CSigSharesManager::SendMessages() } } - if (const auto it = sigSharesToRequest.find(pnode->GetId()); it != sigSharesToRequest.end()) { - std::vector msgs; - for (const auto& [signHash, inv] : it->second) { - assert(inv.CountSet() != 0); - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", - signHash.ToString(), inv.ToString(), pnode->GetId()); - msgs.emplace_back(inv); - if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { - m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); - msgs.clear(); - didSend = true; - } - } - if (!msgs.empty()) { - m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); - didSend = true; - } - } - if (const auto jt = sigShareBatchesToSend.find(pnode->GetId()); jt != sigShareBatchesToSend.end()) { size_t totalSigsCount = 0; std::vector msgs; @@ -1321,25 +1070,29 @@ bool CSigSharesManager::SendMessages() } } - if (const auto kt = sigSharesToAnnounce.find(pnode->GetId()); kt != sigSharesToAnnounce.end()) { - std::vector msgs; - for (const auto& [signHash, inv] : kt->second) { - assert(inv.CountSet() != 0); - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", - signHash.ToString(), inv.ToString(), pnode->GetId()); - msgs.emplace_back(inv); - if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { - m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); - msgs.clear(); + for (const auto& [msg, sig_shares] : { + std::make_pair(NetMsgType::QSIGSHARESINV, sigSharesToAnnounce), + std::make_pair(NetMsgType::QGETSIGSHARES, sigSharesToRequest)}) + { + if (const auto kt = sig_shares.find(pnode->GetId()); kt != sig_shares.end()) { + std::vector msgs; + for (const auto& [signHash, inv] : kt->second) { + assert(inv.CountSet() != 0); + LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- %s signHash=%s, inv={%s}, node=%d\n", + msg, signHash.ToString(), inv.ToString(), pnode->GetId()); + msgs.emplace_back(inv); + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARES) { + m_connman.PushMessage(pnode, msgMaker.Make(msg, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + m_connman.PushMessage(pnode, msgMaker.Make(msg, msgs)); didSend = true; } } - if (!msgs.empty()) { - m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); - didSend = true; - } } - auto lt = sigSharesToSend.find(pnode->GetId()); if (lt != sigSharesToSend.end()) { std::vector msgs; @@ -1411,36 +1164,22 @@ void CSigSharesManager::Cleanup() } { + Uint256HashSet sessions_to_remove; // Now delete sessions which are for inactive quorums LOCK(cs); - Uint256HashSet inactiveQuorumSessions; - sigShares.ForEach([&quorums, &inactiveQuorumSessions](const SigShareKey&, const CSigShare& sigShare) { + sigShares.ForEach([this, &quorums, &sessions_to_remove](const SigShareKey&, const CSigShare& sigShare) { if (quorums.count(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash())) == 0) { - inactiveQuorumSessions.emplace(sigShare.GetSignHash()); + sessions_to_remove.emplace(sigShare.GetSignHash()); + } else if (sigman.HasRecoveredSigForSession(sigShare.GetSignHash())) { + sessions_to_remove.emplace(sigShare.GetSignHash()); } }); - for (const auto& signHash : inactiveQuorumSessions) { + for (const auto& signHash : sessions_to_remove) { RemoveSigSharesForSession(signHash); } } - { LOCK(cs); - - // Remove sessions which were successfully recovered - Uint256HashSet doneSessions; - sigShares.ForEach([&doneSessions, this](const SigShareKey&, const CSigShare& sigShare) { - if (doneSessions.count(sigShare.GetSignHash()) != 0) { - return; - } - if (sigman.HasRecoveredSigForSession(sigShare.GetSignHash())) { - doneSessions.emplace(sigShare.GetSignHash()); - } - }); - for (const auto& signHash : doneSessions) { - RemoveSigSharesForSession(signHash); - } - // Remove sessions which timed out Uint256HashSet timeoutSessions; for (const auto& [signHash, lastSeenTime] : timeSeenForSessions) { @@ -1528,35 +1267,37 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) timeSeenForSessions.erase(signHash); } -void CSigSharesManager::RemoveBannedNodeStates() +std::vector CSigSharesManager::GetAllNodes() const { - // Called regularly to cleanup local node states for banned nodes + vector nodes; LOCK(cs); - for (auto it = nodeStates.begin(); it != nodeStates.end();) { - if (m_peerman.IsBanned(it->first)) { - // re-request sigshares from other nodes - // TODO: remove NO_THREAD_SAFETY_ANALYSIS - // using here template ForEach makes impossible to use lock annotation - it->second.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) NO_THREAD_SAFETY_ANALYSIS { - AssertLockHeld(cs); - sigSharesRequested.Erase(k); - }); - it = nodeStates.erase(it); - } else { - ++it; - } + nodes.reserve(nodeStates.size()); + for (const auto& [node_id, _] : nodeStates) { + nodes.push_back(node_id); } + return nodes; } -void CSigSharesManager::BanNode(NodeId nodeId) +void CSigSharesManager::RemoveAsBanned(NodeId node_id) { - if (nodeId == -1) { - return; - } + LOCK(cs); - m_peerman.Misbehaving(nodeId, 100); + auto it = nodeStates.find(node_id); + if (it != nodeStates.end()) { + // re-request sigshares from other nodes + // TODO: remove NO_THREAD_SAFETY_ANALYSIS + // using here template ForEach makes impossible to use lock annotation + it->second.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) NO_THREAD_SAFETY_ANALYSIS { + AssertLockHeld(cs); + sigSharesRequested.Erase(k); + }); + nodeStates.erase(it); + } +} +void CSigSharesManager::MarkAsBanned(NodeId nodeId) +{ LOCK(cs); auto it = nodeStates.find(nodeId); if (it == nodeStates.end()) { @@ -1564,7 +1305,7 @@ void CSigSharesManager::BanNode(NodeId nodeId) } auto& nodeState = it->second; - // Whatever we requested from him, let's request it from someone else now + // Whatever we requested from him, let's request it better from someone else now // TODO: remove NO_THREAD_SAFETY_ANALYSIS // using here template ForEach makes impossible to use lock annotation nodeState.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) NO_THREAD_SAFETY_ANALYSIS { @@ -1575,58 +1316,33 @@ void CSigSharesManager::BanNode(NodeId nodeId) nodeState.banned = true; } -void CSigSharesManager::WorkThreadMain() -{ - int64_t lastSendTime = 0; - - while (!workInterrupt) { - RemoveBannedNodeStates(); - - bool fMoreWork = ProcessPendingSigShares(); - SignPendingSigShares(); - - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { - SendMessages(); - lastSendTime = TicksSinceEpoch(SystemClock::now()); - } - - Cleanup(); - - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } - } -} - -void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) -{ - LOCK(cs_pendingSigns); - pendingSigns.emplace_back(quorum, id, msgHash); -} - -void CSigSharesManager::SignPendingSigShares() +std::vector CSigSharesManager::FetchPendingSigShares() { std::vector v; WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns)); + return v; +} - for (const auto& [pQuorum, id, msgHash] : v) { - auto opt_sigShare = CreateSigShare(pQuorum, id, msgHash); +std::shared_ptr CSigSharesManager::SignPendingSigShare(const llmq::PendingSignatureData& data) +{ + const auto& [pQuorum, id, msgHash] = data; + auto opt_sigShare = CreateSigShare(pQuorum, id, msgHash); - if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { - auto sigShare = *opt_sigShare; - ProcessSigShare(sigShare, pQuorum); + if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { + auto sigShare = *opt_sigShare; + auto rs = ProcessSigShare(sigShare, pQuorum); - if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { - LOCK(cs); - auto& session = signedSessions[sigShare.GetSignHash()]; - session.sigShare = sigShare; - session.quorum = pQuorum; - session.nextAttemptTime = 0; - session.attempt = 0; - } + if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { + LOCK(cs); + auto& session = signedSessions[sigShare.GetSignHash()]; + session.sigShare = sigShare; + session.quorum = pQuorum; + session.nextAttemptTime = 0; + session.attempt = 0; } + return rs; } + return nullptr; } std::optional CSigSharesManager::CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index fb2f692306fa..fa96217a8b27 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -14,7 +15,6 @@ #include #include #include -#include #include #include @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -32,7 +31,6 @@ class CNode; class CConnman; class CDeterministicMN; class CSporkManager; -class PeerManager; namespace llmq { @@ -359,27 +357,36 @@ class CSignedSession int attempt{0}; }; +struct PendingSignatureData { + const CQuorumCPtr quorum; + const uint256 id; + const uint256 msgHash; + + PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : quorum(std::move(quorum)), id(id), msgHash(msgHash){} +}; + + +// TODO: drop all class CSigSharesManager; replace it by std::vector +// and move it to llmq/signing.h; +// Keep only methods AsyncSignIfMember & CreateSigShare in that +// Move everything else to NetSigning ; in this file keep only CSigShare & related classes class CSigSharesManager : public CRecoveredSigsListener { -private: - static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60}; - static constexpr int64_t SIG_SHARE_REQUEST_TIMEOUT{5}; - +public: // we try to keep total message size below 10k static constexpr size_t MAX_MSGS_CNT_QSIGSESANN{100}; - static constexpr size_t MAX_MSGS_CNT_QGETSIGSHARES{200}; - static constexpr size_t MAX_MSGS_CNT_QSIGSHARESINV{200}; + static constexpr size_t MAX_MSGS_CNT_QSIGSHARES{200}; // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support static constexpr size_t MAX_MSGS_TOTAL_BATCHED_SIGS{400}; + static constexpr size_t MAX_MSGS_SIG_SHARES{32}; +private: + static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60}; + static constexpr int64_t SIG_SHARE_REQUEST_TIMEOUT{5}; static constexpr int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT{2000}; static constexpr int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT{10000}; - static constexpr size_t MAX_MSGS_SIG_SHARES{32}; - - RecursiveMutex cs; - std::thread workThread; - CThreadInterrupt workInterrupt; + mutable RecursiveMutex cs; SigShareMap sigShares GUARDED_BY(cs); Uint256HashMap signedSessions GUARDED_BY(cs); @@ -391,14 +398,6 @@ class CSigSharesManager : public CRecoveredSigsListener SigShareMap> sigSharesRequested GUARDED_BY(cs); SigShareMap sigSharesQueuedToAnnounce GUARDED_BY(cs); - struct PendingSignatureData { - const CQuorumCPtr quorum; - const uint256 id; - const uint256 msgHash; - - PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : quorum(std::move(quorum)), id(id), msgHash(msgHash){} - }; - Mutex cs_pendingSigns; std::vector pendingSigns GUARDED_BY(cs_pendingSigns); @@ -407,7 +406,6 @@ class CSigSharesManager : public CRecoveredSigsListener CConnman& m_connman; CChainState& m_chainstate; CSigningManager& sigman; - PeerManager& m_peerman; const CActiveMasternodeManager& m_mn_activeman; const CQuorumManager& qman; const CSporkManager& m_sporkman; @@ -417,70 +415,65 @@ class CSigSharesManager : public CRecoveredSigsListener public: explicit CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigningManager& _sigman, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, + const CActiveMasternodeManager& mn_activeman, const CQuorumManager& _qman, const CSporkManager& sporkman); ~CSigSharesManager() override; CSigSharesManager() = delete; - void StartWorkerThread(); - void StopWorkerThread(); void RegisterAsRecoveredSigsListener(); void UnregisterAsRecoveredSigsListener(); - void InterruptWorkerThread(); void ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); - void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); std::optional CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const; void ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash); + // TODO: simplify it to std::variant [[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override; static CDeterministicMNCPtr SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256& id, int attempt); - bool AsyncSignIfMember(Consensus::LLMQType llmqType, CSigningManager& sigman, const uint256& id, + bool AsyncSignIfMember(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash, const uint256& quorumHash = uint256(), bool allowReSign = false, bool allowDiffMsgHashSigning = false) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); - void NotifyRecoveredSig(const std::shared_ptr& sig) const; - -private: - // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) - bool ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann); - bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv); - bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv); + // Used by NetSigning + std::vector FetchPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); + [[nodiscard]] std::shared_ptr SignPendingSigShare(const llmq::PendingSignatureData& v); + bool SendMessages(); + void Cleanup(); + const CActiveMasternodeManager& ActiveMNManager() { return m_mn_activeman; } + const CSporkManager& SporkManager() { return m_sporkman; } + void MarkAsBanned(NodeId node_id) EXCLUSIVE_LOCKS_REQUIRED(!cs); + void RemoveAsBanned(NodeId node_id) EXCLUSIVE_LOCKS_REQUIRED(!cs); + std::vector GetAllNodes() const EXCLUSIVE_LOCKS_REQUIRED(!cs); + + // It returns true if message is generally valid but we can't process it + // if it returns false, the sender peer should be banned + bool ProcessMessageSigShare(const CSigShare& sigShare, NodeId node_id); + std::shared_ptr ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum); + + // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages). The sender should be banned + bool ProcessMessageSigSesAnn(const CSigSesAnn& ann, NodeId node_id); + bool ProcessMessageSigShares(const CSigSharesInv& inv, NodeId node_id, bool requested); bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares); - void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare); - - static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv); - static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, - const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); - - bool CollectPendingSigSharesToVerify( + void CollectPendingSigSharesToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); - bool ProcessPendingSigShares(); - void ProcessPendingSigShares( - const std::vector& sigSharesToProcess, - const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums); +private: + static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, + const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); - void ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum); - void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); + std::shared_ptr TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair& in); - void Cleanup(); void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs); - void RemoveBannedNodeStates(); - void BanNode(NodeId nodeId); - - bool SendMessages(); void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) @@ -488,8 +481,6 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); - void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); }; } // namespace llmq diff --git a/src/masternode/active/context.cpp b/src/masternode/active/context.cpp index c737d8a03d0b..85e68da77353 100644 --- a/src/masternode/active/context.cpp +++ b/src/masternode/active/context.cpp @@ -26,7 +26,7 @@ ActiveContext::ActiveContext(ChainstateManager& chainman, CConnman& connman, CDe cj_server{std::make_unique(chainman, connman, dmnman, dstxman, mn_metaman, mempool, peerman, mn_activeman, mn_sync, *llmq_ctx.isman)}, gov_signer{std::make_unique(connman, dmnman, govman, mn_activeman, chainman, mn_sync)}, - shareman{std::make_unique(connman, chainman.ActiveChainstate(), *llmq_ctx.sigman, peerman, + shareman{std::make_unique(connman, chainman.ActiveChainstate(), *llmq_ctx.sigman, mn_activeman, *llmq_ctx.qman, sporkman)}, ehf_sighandler{ std::make_unique(chainman, mnhfman, *llmq_ctx.sigman, *shareman, *llmq_ctx.qman)}, @@ -48,19 +48,16 @@ ActiveContext::~ActiveContext() void ActiveContext::Interrupt() { - shareman->InterruptWorkerThread(); } void ActiveContext::Start(CConnman& connman, PeerManager& peerman) { m_llmq_ctx.qdkgsman->StartThreads(connman, peerman); shareman->RegisterAsRecoveredSigsListener(); - shareman->StartWorkerThread(); } void ActiveContext::Stop() { - shareman->StopWorkerThread(); shareman->UnregisterAsRecoveredSigsListener(); m_llmq_ctx.qdkgsman->StopThreads(); } diff --git a/src/masternode/active/notificationinterface.cpp b/src/masternode/active/notificationinterface.cpp index 5a71a476cd0c..aee9f1917afc 100644 --- a/src/masternode/active/notificationinterface.cpp +++ b/src/masternode/active/notificationinterface.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include @@ -27,9 +26,4 @@ void ActiveNotificationInterface::UpdatedBlockTip(const CBlockIndex* pindexNew, m_active_ctx.gov_signer->UpdatedBlockTip(pindexNew); } -void ActiveNotificationInterface::NotifyRecoveredSig(const std::shared_ptr& sig) -{ - m_active_ctx.shareman->NotifyRecoveredSig(sig); -} - std::unique_ptr g_active_notification_interface; diff --git a/src/masternode/active/notificationinterface.h b/src/masternode/active/notificationinterface.h index 4e55bcf21681..8fb716833c66 100644 --- a/src/masternode/active/notificationinterface.h +++ b/src/masternode/active/notificationinterface.h @@ -25,7 +25,6 @@ class ActiveNotificationInterface final : public CValidationInterface protected: // CValidationInterface - void NotifyRecoveredSig(const std::shared_ptr& sig) override; void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) override; private: diff --git a/src/masternode/payments.cpp b/src/masternode/payments.cpp index 1c5b1d4be107..7ca2fdbb4027 100644 --- a/src/masternode/payments.cpp +++ b/src/masternode/payments.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include