-
Notifications
You must be signed in to change notification settings - Fork 6
Add Concurrency option and MPSC support using farbot. #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
13f13fe
e6ee502
9173032
46e6306
d1839e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| #include <fmt/format.h> | ||
| #endif // RTLOG_USE_FMTLIB | ||
|
|
||
| #include <farbot/fifo.hpp> | ||
| #include <readerwriterqueue.h> | ||
|
|
||
| #ifndef STB_SPRINTF_IMPLEMENTATION | ||
|
|
@@ -47,14 +48,20 @@ enum class Status { | |
| Error_MessageTruncated = 2, | ||
| }; | ||
|
|
||
| enum class QueueConcurrency { | ||
| Single_Producer_Single_Consumer = 0, | ||
| Multi_Producer_Single_Consumer = 1, | ||
| }; | ||
|
|
||
| /** | ||
| * @brief A logger class for logging messages. | ||
| * This class allows you to log messages of type LogData. | ||
| * This type is user defined, and is often the additional data outside the | ||
| * format string you want to log. For instance: The log level, the log region, | ||
| * the file name, the line number, etc. See examples or tests for some ideas. | ||
| * | ||
| * TODO: Currently is built on a single input/single output queue. Do not call | ||
| * NOTE: by default, it is built on a single input/single output queue. You have | ||
| * to specify QueueConcurrency for other types of queues. Otherwise, do not call | ||
| * Log or PrintAndClearLogQueue from multiple threads. | ||
| * | ||
| * @tparam LogData The type of the data to be logged. | ||
|
|
@@ -65,9 +72,16 @@ enum class Status { | |
| * @tparam SequenceNumber This number is incremented when the message is | ||
| * enqueued. It is assumed that your non-realtime logger increments and logs it | ||
| * on Log. | ||
| * @tparam QueueConcurrency The concurrency type of the internal queue. | ||
| * The default Single_Producer_Single_Consumer is for the simplest queue that | ||
| * works in single-producer thread model. | ||
| * Multi_Producer_Single_Consumer is for such an application that needs to | ||
| * handle multiple logging clients. | ||
| */ | ||
| template <typename LogData, size_t MaxNumMessages, size_t MaxMessageLength, | ||
| std::atomic<std::size_t> &SequenceNumber> | ||
| std::atomic<std::size_t> &SequenceNumber, | ||
| QueueConcurrency Concurrency = | ||
| QueueConcurrency::Single_Producer_Single_Consumer> | ||
| class Logger { | ||
| public: | ||
| /* | ||
|
|
@@ -114,7 +128,7 @@ class Logger { | |
|
|
||
| // Even if the message was truncated, we still try to enqueue it to minimize | ||
| // data loss | ||
| const bool dataWasEnqueued = mQueue.try_enqueue(dataToQueue); | ||
| const bool dataWasEnqueued = mQueue->tryEnqueue(std::move(dataToQueue)); | ||
|
|
||
| if (!dataWasEnqueued) | ||
| retVal = Status::Error_QueueFull; | ||
|
|
@@ -210,7 +224,7 @@ class Logger { | |
|
|
||
| // Even if the message was truncated, we still try to enqueue it to minimize | ||
| // data loss | ||
| const bool dataWasEnqueued = mQueue.try_enqueue(dataToQueue); | ||
| const bool dataWasEnqueued = mQueue->tryEnqueue(std::move(dataToQueue)); | ||
|
|
||
| if (!dataWasEnqueued) | ||
| retVal = Status::Error_QueueFull; | ||
|
|
@@ -241,7 +255,7 @@ class Logger { | |
| int numProcessed = 0; | ||
|
|
||
| InternalLogData value; | ||
| while (mQueue.try_dequeue(value)) { | ||
| while (mQueue->tryDequeue(value)) { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My other slight concern is this use of dynamic memory here. While it is perfectly real-time safe, it is also slower (probably) than the one that holds the queue on the stack. Is there any way to have this be template based instead of inheritance based?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright. It is probably possible. We would still need those wrapper classes to make code consistent though (yet removing inheritance). |
||
| printLogFn(value.mLogData, value.mSequenceNumber, "%s", | ||
| value.mMessage.data()); | ||
| numProcessed++; | ||
|
|
@@ -257,7 +271,53 @@ class Logger { | |
| std::array<char, MaxMessageLength> mMessage{}; | ||
| }; | ||
|
|
||
| moodycamel::ReaderWriterQueue<InternalLogData> mQueue{MaxNumMessages}; | ||
| class InternalQueue { | ||
| public: | ||
| virtual ~InternalQueue() = default; | ||
| virtual bool tryEnqueue(InternalLogData &&value) = 0; | ||
| virtual bool tryDequeue(InternalLogData &value) = 0; | ||
| }; | ||
| class InternalQueueSPSC : public InternalQueue { | ||
| moodycamel::ReaderWriterQueue<InternalLogData> mQueue{MaxNumMessages}; | ||
|
|
||
| public: | ||
| bool tryEnqueue(InternalLogData &&value) override { | ||
| return mQueue.try_enqueue(std::move(value)); | ||
| } | ||
| bool tryDequeue(InternalLogData &value) override { | ||
| return mQueue.try_dequeue(value); | ||
| } | ||
| }; | ||
| class InternalQueueMPSC : public InternalQueue { | ||
| farbot::fifo<InternalLogData, farbot::fifo_options::concurrency::single, | ||
| farbot::fifo_options::concurrency::multiple, | ||
| farbot::fifo_options::full_empty_failure_mode:: | ||
| return_false_on_full_or_empty, | ||
| farbot::fifo_options::full_empty_failure_mode:: | ||
| overwrite_or_return_default> | ||
| mQueue{MaxNumMessages}; | ||
|
|
||
| public: | ||
| InternalQueueMPSC() { | ||
| static_assert((MaxNumMessages & (MaxNumMessages - 1)) == 0 || | ||
| Concurrency != | ||
| QueueConcurrency::Multi_Producer_Single_Consumer, | ||
| "you have to assign 2^n to MaxNumMessages (farbot backend " | ||
| "restriction)"); | ||
| } | ||
| bool tryEnqueue(InternalLogData &&value) override { | ||
| return mQueue.push(std::move(value)); | ||
| } | ||
| bool tryDequeue(InternalLogData &value) override { | ||
| return mQueue.pop(value); | ||
| } | ||
| }; | ||
|
|
||
| std::unique_ptr<InternalQueue> mQueue{ | ||
| Concurrency == QueueConcurrency::Single_Producer_Single_Consumer | ||
| ? (std::unique_ptr<InternalQueue>) | ||
| std::make_unique<InternalQueueSPSC>() | ||
| : std::make_unique<InternalQueueMPSC>()}; | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: perhaps a case statement would be more clear and more extensible for the future
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed! |
||
| }; | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ namespace rtlog::test { | |
| static std::atomic<std::size_t> gSequenceNumber{0}; | ||
|
|
||
| constexpr auto MAX_LOG_MESSAGE_LENGTH = 256; | ||
| constexpr auto MAX_NUM_LOG_MESSAGES = 100; | ||
| constexpr auto MAX_NUM_LOG_MESSAGES = 128; | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason this changed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, same as |
||
|
|
||
| enum class ExampleLogLevel { Debug, Info, Warning, Critical }; | ||
|
|
||
|
|
@@ -82,6 +82,22 @@ TEST(RtlogTest, BasicConstruction) { | |
| EXPECT_EQ(logger.PrintAndClearLogQueue(PrintMessage), 4); | ||
| } | ||
|
|
||
| TEST(RtlogTest, MPSCWorksAsIntended) { | ||
| rtlog::Logger<ExampleLogData, MAX_NUM_LOG_MESSAGES, MAX_LOG_MESSAGE_LENGTH, | ||
| gSequenceNumber, | ||
| rtlog::QueueConcurrency::Multi_Producer_Single_Consumer> | ||
| logger; | ||
| logger.Log({ExampleLogLevel::Debug, ExampleLogRegion::Engine}, | ||
| "Hello, world!"); | ||
| logger.Log({ExampleLogLevel::Info, ExampleLogRegion::Game}, "Hello, world!"); | ||
| logger.Log({ExampleLogLevel::Warning, ExampleLogRegion::Network}, | ||
| "Hello, world!"); | ||
| logger.Log({ExampleLogLevel::Critical, ExampleLogRegion::Audio}, | ||
| "Hello, world!"); | ||
|
|
||
| EXPECT_EQ(logger.PrintAndClearLogQueue(PrintMessage), 4); | ||
| } | ||
|
|
||
| TEST(RtlogTest, VaArgsWorksAsIntended) { | ||
| rtlog::Logger<ExampleLogData, MAX_NUM_LOG_MESSAGES, MAX_LOG_MESSAGE_LENGTH, | ||
| gSequenceNumber> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THANK YOU for this contribution and taking the time to write it up.
I have a couple of concerns, which maybe you can help me figure out.
First, I never liked how I downloaded concurrentqueue "secretly" for the users. I think a (maybe better?) approach would be the users providing their own queue, perhaps wrapped in some External Polymorphism package like you've done here.
Now that we have two options (farbot AND concurrentqueue) we are downloading even more dependencies "secretly" for the user. It feels a little dirty, and I don't know what we should do about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps one solution to this is to pass in the queue that the suer wants, but just provide two back ends that can be hidden behind some optional cmake
RTSAN_BUILD_FARBOT_BACKEND
RTSAN_BUILD_CONCURRENTQUEUE_BACKEND
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm completely spitballing here. I just want to make sure before we extend to 2 backends that we have a plan for when we inevitably need to add a 3rd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your concern. Once thing I would point out is that we would probably like to have those options alongside and not exclusive - one would like to have both kinds of loggers (also to avoid possible build config conflicts between sub-projects).
(BTW I didn't replace readerwriterqueue with concurrentqueue, so only farbot version is added. I can add that too if you want, but it's not going to be RT log anymore ;-) )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm digging into this just a little bit and I remember one of the sticking points that prevented me from doing this in the past.
If you change the args to have a QueueType:
This becomes tricky because QueueType needs to depend on
InternalLogData.(I'm not sure immediately how to re-arrange to get around this problem)