Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions nri-kafka/scripts/pause-resume-bug/Consumer.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module Consumer where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, tryTakeMVar, withMVar)
import Control.Monad (void)
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Monad (void, when)
import qualified Environment
import qualified Kafka.Worker as Kafka
import Message
import System.Environment (setEnv)
import System.IO (Handle, hPutStrLn, stderr, stdout)
import Prelude (IO, String, show)
import System.Environment (getEnv, setEnv)
import System.IO (Handle, hPutStrLn, stdout)
import Prelude (IO, String, mod, show, fromIntegral, pure)

main :: IO ()
main = do
Expand All @@ -20,39 +20,26 @@ main = do
setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20"
setEnv "KAFKA_POLL_BATCH_SIZE" "5"

fireDelay <- readIntEnvVar "FIRE_DELAY" 31 -- seconds
fireModulo <- readIntEnvVar "FIRE_MODULO" 5 -- sleep on every Nth message

settings <- Environment.decode Kafka.decoder
doAnythingHandler <- Platform.doAnythingHandler
lastId <- newEmptyMVar

lock <- newMVar ()

let processMsg (msg :: Message) =
( do
let msgId = ("ID(" ++ show (id msg) ++ ")")
prevId <- tryTakeMVar lastId

case (prevId, id msg) of
(Nothing, _) ->
printAtomic lock stdout (msgId ++ " First message has been received")
(_, 1) ->
printAtomic lock stdout (msgId ++ " Producer has been restarted")
(Just prev, curr)
| prev + 1 == curr ->
-- This is the expected behavior
printAtomic lock stdout (msgId ++ " OK")
(Just prev, curr) ->
-- This is the bug
printAtomic
lock
stderr
( "ERROR: Expected ID "
++ show (prev + 1)
++ " but got "
++ show curr
)

putMVar lastId (id msg)
threadDelay 200000
let msgId = id msg
let msgIdStr = "ID(" ++ show msgId ++ ")"
when
(msgId `mod` fireModulo == 0)
( do
printAtomic lock stdout (msgIdStr ++ " Pausing consumer (simulating stuck MySQL)")
threadDelay (fromIntegral fireDelay * 1000000)
)
printAtomic lock stdout (msgIdStr ++ " Done")
threadDelay 2000
)
|> fmap Ok
|> Platform.doAnything doAnythingHandler
Expand All @@ -66,3 +53,12 @@ printAtomic lock handle msg = do
|> withMVar lock
|> forkIO
|> void

readIntEnvVar :: String -> Int -> IO Int
readIntEnvVar name defaultVal = do
valueStr <- getEnv name
valueStr
|> Text.fromList
|> Text.toInt
|> Maybe.withDefault defaultVal
|> pure
16 changes: 14 additions & 2 deletions nri-kafka/src/Kafka/Worker/Fetcher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Prelude
import qualified Data.Either

type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd)

Expand Down Expand Up @@ -76,8 +77,12 @@ pollingLoop'
-- See https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/CHANGELOG.md?plain=1#L90-L95
--
-- We have a small app to reproduce the bug. Check out scripts/pause-resume-bug/README.md
MVar.withMVar consumerLock
<| \_ -> Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
MVar.withMVar consumerLock <| \_ -> do
Prelude.putStrLn "Polling for messages..."
em <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
let digits = List.filterMap recordContents em |> ByteString.intercalate ", "
Prelude.putStrLn <| "Polling done. Found messages: " ++ Prelude.show digits
Prelude.pure em
msgs <- Prelude.traverse handleKafkaError eitherMsgs
assignment <-
Consumer.assignment consumer
Expand All @@ -103,6 +108,13 @@ pollingLoop'
throttle maxMsgsPerSecondPerPartition maxPollIntervalMs (List.length appendResults) analytics now lastPollTimestamp
pollingLoop' settings enqueueRecord analytics consumer consumerLock (pollTimeIsOld now)

recordContents :: Data.Either.Either x ConsumerRecord -> Maybe ByteString.ByteString
recordContents (Data.Either.Left _) = Nothing
recordContents (Data.Either.Right record) = do
val <- Consumer.crValue record
let digits = ByteString.filter (\c -> c >= 48 && c <= 57) val
Just digits

getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId)
getPartitionKey record =
( Consumer.crTopic record,
Expand Down