diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index 96a45ad6..02929822 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -1,14 +1,14 @@ module Consumer where import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, tryTakeMVar, withMVar) -import Control.Monad (void) +import Control.Concurrent.MVar (MVar, newMVar, withMVar) +import Control.Monad (void, when) import qualified Environment import qualified Kafka.Worker as Kafka import Message -import System.Environment (setEnv) -import System.IO (Handle, hPutStrLn, stderr, stdout) -import Prelude (IO, String, show) +import System.Environment (getEnv, setEnv) +import System.IO (Handle, hPutStrLn, stdout) +import Prelude (IO, String, mod, show, fromIntegral, pure) main :: IO () main = do @@ -20,39 +20,26 @@ main = do setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" setEnv "KAFKA_POLL_BATCH_SIZE" "5" + fireDelay <- readIntEnvVar "FIRE_DELAY" 31 -- seconds + fireModulo <- readIntEnvVar "FIRE_MODULO" 5 -- sleep on every Nth message + settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler - lastId <- newEmptyMVar lock <- newMVar () let processMsg (msg :: Message) = ( do - let msgId = ("ID(" ++ show (id msg) ++ ")") - prevId <- tryTakeMVar lastId - - case (prevId, id msg) of - (Nothing, _) -> - printAtomic lock stdout (msgId ++ " First message has been received") - (_, 1) -> - printAtomic lock stdout (msgId ++ " Producer has been restarted") - (Just prev, curr) - | prev + 1 == curr -> - -- This is the expected behavior - printAtomic lock stdout (msgId ++ " OK") - (Just prev, curr) -> - -- This is the bug - printAtomic - lock - stderr - ( "ERROR: Expected ID " - ++ show (prev + 1) - ++ " but got " - ++ show curr - ) - - putMVar lastId (id msg) - threadDelay 200000 + let msgId = id msg + let msgIdStr = "ID(" ++ show msgId ++ ")" + when + (msgId `mod` fireModulo == 0) + ( do + printAtomic lock stdout (msgIdStr ++ " Pausing consumer (simulating stuck MySQL)") + threadDelay (fromIntegral fireDelay * 1000000) + ) + printAtomic lock stdout (msgIdStr ++ " Done") + threadDelay 2000 ) |> fmap Ok |> Platform.doAnything doAnythingHandler @@ -66,3 +53,12 @@ printAtomic lock handle msg = do |> withMVar lock |> forkIO |> void + +readIntEnvVar :: String -> Int -> IO Int +readIntEnvVar name defaultVal = do + valueStr <- getEnv name + valueStr + |> Text.fromList + |> Text.toInt + |> Maybe.withDefault defaultVal + |> pure diff --git a/nri-kafka/src/Kafka/Worker/Fetcher.hs b/nri-kafka/src/Kafka/Worker/Fetcher.hs index 273b6b99..1623aceb 100644 --- a/nri-kafka/src/Kafka/Worker/Fetcher.hs +++ b/nri-kafka/src/Kafka/Worker/Fetcher.hs @@ -11,6 +11,7 @@ import qualified Kafka.Worker.Analytics as Analytics import qualified Kafka.Worker.Partition as Partition import qualified Kafka.Worker.Settings as Settings import qualified Prelude +import qualified Data.Either type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd) @@ -76,8 +77,12 @@ pollingLoop' -- See https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/CHANGELOG.md?plain=1#L90-L95 -- -- We have a small app to reproduce the bug. Check out scripts/pause-resume-bug/README.md - MVar.withMVar consumerLock - <| \_ -> Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + MVar.withMVar consumerLock <| \_ -> do + Prelude.putStrLn "Polling for messages..." + em <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + let digits = List.filterMap recordContents em |> ByteString.intercalate ", " + Prelude.putStrLn <| "Polling done. Found messages: " ++ Prelude.show digits + Prelude.pure em msgs <- Prelude.traverse handleKafkaError eitherMsgs assignment <- Consumer.assignment consumer @@ -103,6 +108,13 @@ pollingLoop' throttle maxMsgsPerSecondPerPartition maxPollIntervalMs (List.length appendResults) analytics now lastPollTimestamp pollingLoop' settings enqueueRecord analytics consumer consumerLock (pollTimeIsOld now) +recordContents :: Data.Either.Either x ConsumerRecord -> Maybe ByteString.ByteString +recordContents (Data.Either.Left _) = Nothing +recordContents (Data.Either.Right record) = do + val <- Consumer.crValue record + let digits = ByteString.filter (\c -> c >= 48 && c <= 57) val + Just digits + getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId) getPartitionKey record = ( Consumer.crTopic record,