From b9c33bda6045c4d41a77199b28ee03bb424277a2 Mon Sep 17 00:00:00 2001 From: Kadena Kadena Date: Sat, 8 Nov 2025 16:48:56 -0500 Subject: [PATCH 1/2] Minimum cut queue size On the singleton graph the diameter is 0, but a cut queue with size 0 is not useful. So we set a minimum of 10. --- src/Chainweb/CutDB.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Chainweb/CutDB.hs b/src/Chainweb/CutDB.hs index ed982400d2..e034eb3cad 100644 --- a/src/Chainweb/CutDB.hs +++ b/src/Chainweb/CutDB.hs @@ -187,7 +187,7 @@ makeLenses ''CutDbParams defaultCutDbParams :: ChainwebVersion -> Int -> CutDbParams defaultCutDbParams v ft = CutDbParams { _cutDbParamsInitialCutFile = Nothing - , _cutDbParamsBufferSize = (order g ^ (2 :: Int)) * diameter g + , _cutDbParamsBufferSize = max 10 $ (order g ^ (2 :: Int)) * diameter g , _cutDbParamsLogLevel = Warn , _cutDbParamsTelemetryLevel = Warn , _cutDbParamsFetchTimeout = ft From 27306390aa1cb175b947f839f46fc90a52b3c776 Mon Sep 17 00:00:00 2001 From: Kadena Kadena Date: Sat, 8 Nov 2025 21:02:34 -0500 Subject: [PATCH 2/2] New PQueue implementation which disallows duplicates Disallowing duplicates in the Cut queue makes it slightly more resilient to repeatedly receiving cuts that time out validation. --- src/Chainweb/CutDB.hs | 14 ++-- src/Chainweb/Sync/WebBlockHeaderStore.hs | 4 +- src/Data/PQueue.hs | 80 +++++++++++-------- .../Chainweb/Test/Sync/WebBlockHeaderStore.hs | 4 +- test/unit/Data/Test/PQueue.hs | 54 ++++++------- test/unit/P2P/Test/TaskQueue.hs | 22 ++--- 6 files changed, 93 insertions(+), 85 deletions(-) diff --git a/src/Chainweb/CutDB.hs b/src/Chainweb/CutDB.hs index e034eb3cad..ac4bf7dfbe 100644 --- a/src/Chainweb/CutDB.hs +++ b/src/Chainweb/CutDB.hs @@ -257,7 +257,7 @@ instance Exception CutDbStopped where -- data CutDb tbl = CutDb { _cutDbCut :: !(TVar Cut) - , _cutDbQueue :: !(PQueue (Down CutHashes)) + , _cutDbQueue :: !(PQueue CutHashes) , _cutDbAsync :: !(Async ()) , _cutDbLogFunction :: !LogFunction , _cutDbHeaderStore :: !WebBlockHeaderStore @@ -315,7 +315,7 @@ cut :: Getter (CutDb tbl) (IO Cut) cut = to _cut addCutHashes :: CutDb tbl -> CutHashes -> IO () -addCutHashes db = pQueueInsertLimit (_cutDbQueue db) (_cutDbQueueSize db) . Down +addCutHashes db = pQueueInsert (_cutDbQueue db) -- | An 'STM' version of '_cut'. -- @@ -436,7 +436,7 @@ startCutDb config logfun headerStore payloadStore cutHashesStore = mask_ $ do c <- readTVarIO cutVar logg Info $ T.unlines $ "got initial cut:" : [" " <> block | block <- cutToTextShort c] - queue <- newEmptyPQueue + queue <- newEmptyPQueue _cutHashesWeight _cutHashesId (Just $ _cutDbParamsBufferSize config) cutAsync <- asyncWithUnmask $ \u -> u $ processor queue cutVar logg Debug "CutDB started" return CutDb @@ -456,7 +456,7 @@ startCutDb config logfun headerStore payloadStore cutHashesStore = mask_ $ do wbhdb = _webBlockHeaderStoreCas headerStore v = _chainwebVersion headerStore - processor :: PQueue (Down CutHashes) -> TVar Cut -> IO () + processor :: PQueue CutHashes -> TVar Cut -> IO () processor queue cutVar = runForever logfun "CutDB" $ processCuts config logfun headerStore payloadStore cutHashesStore queue cutVar @@ -548,7 +548,7 @@ processCuts -> WebBlockHeaderStore -> WebBlockPayloadStore tbl -> Casify RocksDbTable CutHashes - -> PQueue (Down CutHashes) + -> PQueue CutHashes -> TVar Cut -> IO () processCuts conf logFun headerStore payloadStore cutHashesStore queue cutVar = do @@ -601,7 +601,7 @@ processCuts conf logFun headerStore payloadStore cutHashesStore queue cutVar = d hdrStore = _webBlockHeaderStoreCas headerStore queueToStream = do - Down a <- liftIO (pQueueRemove queue) + a <- liftIO (pQueueRemove queue) S.yield a queueToStream @@ -814,7 +814,7 @@ cutHashesToBlockHeaderMap conf logfun headerStore payloadStore hs = return $! Left missing origin = _cutOrigin hs - priority = Priority (- int (_cutHashesHeight hs)) + priority = Priority (int (_cutHashesHeight hs)) tryGetBlockHeader hdrs plds localPayload cv@(cid, _) = (Right <$> mapM (getBlockHeader headerStore payloadStore hdrs plds localPayload cid priority origin) cv) diff --git a/src/Chainweb/Sync/WebBlockHeaderStore.hs b/src/Chainweb/Sync/WebBlockHeaderStore.hs index 887e669996..82e62ee43a 100644 --- a/src/Chainweb/Sync/WebBlockHeaderStore.hs +++ b/src/Chainweb/Sync/WebBlockHeaderStore.hs @@ -547,7 +547,7 @@ newWebBlockHeaderStore -> IO WebBlockHeaderStore newWebBlockHeaderStore mgr wdb logfun = do m <- new - queue <- newEmptyPQueue + queue <- newEmptyPQueue _taskPriority _taskId Nothing return $! WebBlockHeaderStore wdb m queue logfun mgr newEmptyWebPayloadStore @@ -569,7 +569,7 @@ newWebPayloadStore -> LogFunction -> IO (WebBlockPayloadStore tbl) newWebPayloadStore mgr pact payloadDb logfun = do - payloadTaskQueue <- newEmptyPQueue + payloadTaskQueue <- newEmptyPQueue _taskPriority _taskId Nothing payloadMemo <- new return $! WebBlockPayloadStore payloadDb payloadMemo payloadTaskQueue logfun mgr pact diff --git a/src/Data/PQueue.hs b/src/Data/PQueue.hs index 43da59d2da..9a9d9f48af 100644 --- a/src/Data/PQueue.hs +++ b/src/Data/PQueue.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BangPatterns #-} -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE ScopedTypeVariables #-} -- | -- Module: Data.PQueue @@ -14,19 +15,18 @@ module Data.PQueue ( PQueue , newEmptyPQueue , pQueueInsert -, pQueueInsertLimit , pQueueRemove , pQueueIsEmpty , pQueueSize ) where -import Control.Concurrent.MVar -import Control.Exception (evaluate) +import Control.Concurrent.STM import Control.Monad +import Data.Foldable -import qualified Data.Heap as H - -import GHC.Generics +import Data.Ord +import qualified Data.Map as M +import qualified Data.Set as S import Numeric.Natural @@ -39,43 +39,53 @@ import Numeric.Natural -- items in the queue. An item of low priority my starve in the queue if higher -- priority items are added at a rate at least as high as items are removed. -- -data PQueue a = PQueue !(MVar ()) !(MVar (H.Heap a)) - deriving (Generic) - -newEmptyPQueue :: IO (PQueue a) -newEmptyPQueue = PQueue <$> newEmptyMVar <*> newMVar mempty +data PQueue a = + forall p k. (Ord p, Ord k) => + PQueue (TVar (M.Map (Down p, k) a)) (TVar (S.Set k)) (a -> p) (a -> k) (Maybe Natural) -pQueueInsert :: Ord a => PQueue a -> a -> IO () -pQueueInsert (PQueue s q) t = modifyMVarMasked_ q $ \h -> do - h' <- evaluate $ H.insert t h - void $ tryPutMVar s () - return h' +newEmptyPQueue :: (Ord p, Ord k) => (a -> p) -> (a -> k) -> Maybe Natural -> IO (PQueue a) +newEmptyPQueue getPrio getKey maybeMaxLen = PQueue + <$> newTVarIO mempty + <*> newTVarIO mempty + <*> pure getPrio + <*> pure getKey + <*> pure maybeMaxLen -pQueueInsertLimit :: Ord a => PQueue a -> Natural -> a -> IO () -pQueueInsertLimit (PQueue s q) l t = modifyMVarMasked_ q $ \h -> do - h' <- evaluate $ H.insert t h - void $ tryPutMVar s () - return $! if H.size h > 2 * fromIntegral l - then H.take (fromIntegral l) h' - else h' +pQueueInsert :: PQueue a -> a -> IO () +pQueueInsert (PQueue mv sv getPrio getKey maybeMaxLen) a = + atomically $ do + s <- readTVar sv + m <- readTVar mv + let k = getKey a + if S.member k s + then return () + else do + let s' = S.insert k s + let m' = M.insert (Down $ getPrio a, k) a m + let fixup (maxlen :: Natural) = if M.size m' > fromIntegral (2 * maxlen) + then let (keep, dontkeep) = M.splitAt (fromIntegral maxlen) m' + in (foldl' (flip (S.delete . snd)) s' (M.keys dontkeep), keep) + else (s', m') + let (s'', m'') = maybe (s', m') fixup maybeMaxLen + writeTVar mv $! m'' + writeTVar sv $! s'' pQueueIsEmpty :: PQueue a -> IO Bool -pQueueIsEmpty (PQueue _ q) = H.null <$!> readMVar q +pQueueIsEmpty (PQueue mv _ _ _ _) = M.null <$!> readTVarIO mv pQueueSize :: PQueue a -> IO Natural -pQueueSize (PQueue _ q) = fromIntegral . H.size <$!> readMVar q +pQueueSize (PQueue mv _ _ _ _) = fromIntegral . M.size <$!> readTVarIO mv -- | If the queue is empty it blocks and races for new items -- pQueueRemove :: PQueue a -> IO a -pQueueRemove (PQueue s q) = run +pQueueRemove (PQueue mv sv _getPrio _getKey _) = atomically run where run = do - r <- modifyMVarMasked q $ \h -> case H.uncons h of - Nothing -> return (h, Nothing) - Just (!a, !b) -> do - when (H.null b) $ void $ tryTakeMVar s - return (b, Just a) - case r of - Nothing -> takeMVar s >> run - (Just !x) -> return x + m <- readTVar mv + case M.minViewWithKey m of + Nothing -> retry + Just (((_, k), a), m') -> do + writeTVar mv $! m' + modifyTVar' sv (S.delete k) + return a diff --git a/test/unit/Chainweb/Test/Sync/WebBlockHeaderStore.hs b/test/unit/Chainweb/Test/Sync/WebBlockHeaderStore.hs index 653a6b7b09..9815872a93 100644 --- a/test/unit/Chainweb/Test/Sync/WebBlockHeaderStore.hs +++ b/test/unit/Chainweb/Test/Sync/WebBlockHeaderStore.hs @@ -130,7 +130,7 @@ testQueueServer limit q = forM_ [0..] $ session_ limit q (\_ _ -> return ()) withNoopQueueServer :: (PQueue (Task env a) -> IO b) -> IO b withNoopQueueServer a = do - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing let failTask = do task <- pQueueRemove q putIVar (_taskResult task) $ Left $ [] @@ -138,7 +138,7 @@ withNoopQueueServer a = do startNoopQueueServer :: IO (Async (), PQueue (Task env a)) startNoopQueueServer = do - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing let failTask = do task <- pQueueRemove q putIVar (_taskResult task) $ Left $ [] diff --git a/test/unit/Data/Test/PQueue.hs b/test/unit/Data/Test/PQueue.hs index d6598fdde4..6e5d8af580 100644 --- a/test/unit/Data/Test/PQueue.hs +++ b/test/unit/Data/Test/PQueue.hs @@ -1,3 +1,5 @@ +{-# language ScopedTypeVariables #-} + -- | -- Module: Data.Test.PQueue -- Copyright: Copyright © 2018 - 2020 Kadena LLC. @@ -15,7 +17,6 @@ module Data.Test.PQueue , prop_insert_remove_null , prop_insert_remove_null_concurrent , prop_insert_remove_sort -, prop_insert_remove_sorted_concurrent , prop_insert_remove_concurrent ) where @@ -25,6 +26,7 @@ import Control.Monad.IO.Class import Data.Foldable import qualified Data.List as L import Data.Maybe +import Data.Ord import Test.QuickCheck import Test.QuickCheck.Monadic @@ -43,7 +45,6 @@ properties = , ("equal number of inserts and remove result in empty queue" , property $ prop_insert_remove_null) , ("equal number of concurrent inserts and remove result in empty queue" , property $ prop_insert_remove_null_concurrent) , ("inserting and removeing a list sorts the list", property $ prop_insert_remove_sort) - , ("concurrently inserting and removing a sorted list yields the original list" , property $ prop_insert_remove_sorted_concurrent) , ("concurrently inserting and removing a list yields the items of original list" , property $ prop_insert_remove_concurrent) ] @@ -52,7 +53,7 @@ properties = prop_empty :: Property prop_empty = once $ monadicIO $ do - q <- run newEmptyPQueue + q :: PQueue Int <- run $ newEmptyPQueue id id Nothing x <- run (pQueueIsEmpty q) assert x s <- run (pQueueSize q) @@ -60,56 +61,53 @@ prop_empty = once $ monadicIO $ do prop_insert :: [Int] -> Property prop_insert l = monadicIO $ do + let l' = zip [0 :: Int .. ] l s <- run $ do - q <- newEmptyPQueue - traverse_ (pQueueInsert q) l + q <- newEmptyPQueue snd fst Nothing + traverse_ (pQueueInsert q) l' pQueueSize q assert $ s == fromIntegral (length l) prop_insert_remove_null :: [Int] -> Property prop_insert_remove_null l = monadicIO $ do - q <- run newEmptyPQueue + let l' = zip [0 :: Int .. ] l + q <- run $ newEmptyPQueue snd fst Nothing s <- run $ do - traverse_ (pQueueInsert q) l - traverse_ (const $ pQueueRemove q) l + traverse_ (pQueueInsert q) l' + traverse_ (const $ pQueueRemove q) l' pQueueSize q assert $ s == 0 assert =<< run (pQueueIsEmpty q) prop_insert_remove_null_concurrent :: [Int] -> Property prop_insert_remove_null_concurrent l = monadicIO $ do - q <- run newEmptyPQueue + let l' = zip [0 :: Int .. ] l + q <- run $ newEmptyPQueue snd fst Nothing run $ concurrently_ - (traverse_ (pQueueInsert q) l) - (traverse_ (const $ pQueueRemove q) l) + (traverse_ (pQueueInsert q) l') + (traverse_ (const $ pQueueRemove q) l') s <- run $ pQueueSize q assert $ s == 0 assert =<< run (pQueueIsEmpty q) prop_insert_remove_sort :: [Int] -> Property prop_insert_remove_sort l = monadicIO $ do - q <- run newEmptyPQueue - l' <- run $ do - traverse_ (pQueueInsert q) l - traverse (const $ pQueueRemove q) l - assert $ L.sort l == l' - -prop_insert_remove_sorted_concurrent :: SortedList Int -> Property -prop_insert_remove_sorted_concurrent (Sorted l) = monadicIO $ do - q <- run newEmptyPQueue - l' <- run $ snd <$> concurrently - (traverse_ (pQueueInsert q) l) - (traverse (const $ pQueueRemove q) l) - assert $ l == l' + let l' = zip [0 :: Int .. ] l + q <- run $ newEmptyPQueue snd fst Nothing + l'' <- run $ do + traverse_ (pQueueInsert q) l' + traverse (const $ pQueueRemove q) l' + return $ L.sortOn (Down . snd) l'' === l'' prop_insert_remove_concurrent :: [Int] -> Property prop_insert_remove_concurrent l = monadicIO $ do - q <- run newEmptyPQueue + let l' = zip [0 :: Int .. ] l + q <- run $ newEmptyPQueue snd fst Nothing commands <- pick $ shuffle - $ (QueueInsert <$> l) ++ (QueueRemove <$ l) - l' <- run $ catMaybes + $ (QueueInsert <$> l') ++ (QueueRemove <$ l') + l'' <- run $ catMaybes <$> mapConcurrently (runQueueCommand q) commands - assert $ L.sort l == L.sort l' + return $ l' === L.sortOn fst l'' -- -------------------------------------------------------------------------- -- -- Utils diff --git a/test/unit/P2P/Test/TaskQueue.hs b/test/unit/P2P/Test/TaskQueue.hs index 024b2b39cf..1d30a64a46 100644 --- a/test/unit/P2P/Test/TaskQueue.hs +++ b/test/unit/P2P/Test/TaskQueue.hs @@ -55,8 +55,8 @@ testRunner limit q = forM_ [0..] $ session_ limit q (\_ _ -> yield) test1 :: Int -> IO Bool test1 n = do tasks <- forM [0..n] $ \i -> - newTask (TaskId $ sshow i) (Priority i) $ \_ -> return @_ @Int - q <- newEmptyPQueue + newTask (TaskId $ sshow i) (Priority (0 - i)) $ \_ -> return @_ @Int + q <- newEmptyPQueue _taskPriority _taskId Nothing withAsync (testRunner 3 q) $ \_ -> do traverse_ (pQueueInsert q) tasks results <- traverse awaitTask tasks @@ -66,11 +66,11 @@ test2a :: (Positive Int) -> IO Bool test2a (Positive n_) = do tasks <- forM [10..n+10] $ \i -> newTask (TaskId $ sshow i) (Priority (n - i)) $ \_ -> return @_ @Int - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing withAsync (testRunner 3 q) $ \_ -> do traverse_ (pQueueInsert q) tasks results <- traverse awaitTask tasks - return $ results /= [10..n+10] + return $ results /= [n+10, n+9..10] where n = n_ + 10 @@ -78,7 +78,7 @@ test2b :: (Positive Int) -> IO Bool test2b (Positive n_) = do tasks <- forM [0..n] $ \i -> newTask (TaskId $ sshow i) (Priority (n - i)) $ \_ -> return @_ @Int - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing withAsync (testRunner 3 q) $ \_ -> do traverse_ (pQueueInsert q) tasks results <- traverse awaitTask tasks @@ -90,21 +90,21 @@ test3 :: Int -> IO Bool test3 n = do tasks <- forM [0..n] $ \i -> newTask (TaskId $ sshow i) (Priority i) $ \_ -> return @_ @Int - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing traverse_ (pQueueInsert q) tasks withAsync (testRunner 3 q) $ \_ -> do results <- traverse awaitTask tasks - return $ results == [0..n] + return $ results == [n, n-1..0] test4 :: Int -> IO Bool test4 n = do tasks <- forM [0..n] $ \i -> newTask (TaskId $ sshow i) (Priority (n - i)) $ \_ -> return @_ @Int - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing traverse_ (pQueueInsert q) tasks withAsync (testRunner 3 q) $ \_ -> do results <- traverse awaitTask tasks - return $ results == reverse [0..n] + return $ results == [0..n] test5 :: Positive Int -> Positive Int -> Positive Int -> IO Bool test5 (Positive n) (Positive m) (Positive a) @@ -117,11 +117,11 @@ test5 (Positive n) (Positive m) (Positive a) newTask (TaskId $ sshow i) (Priority i) $ \_ e -> if | e `mod` m == 0 -> return e | otherwise -> throwM $ TestRunnerException e - q <- newEmptyPQueue + q <- newEmptyPQueue _taskPriority _taskId Nothing traverse_ (pQueueInsert q) tasks withAsync (testRunner (int a) q) $ \_ -> do results <- traverse awaitTask tasks - return $ results == [0,m..(n*m)] + return $ results == [n*m, (n-1)*m..0] properties :: [(String, Property)] properties =