From e4ff024fa87af9ad63b6a9fc9da8f0beb5afa7c6 Mon Sep 17 00:00:00 2001 From: Austin Horstman Date: Mon, 9 Feb 2026 14:06:27 -0600 Subject: [PATCH] fix(util): bound SafeSignal queue growth under burst load SafeSignal could queue events forever when worker threads emitted faster than the main loop could consume, which risks memory growth and stale updates. I added a queue cap with a drop-oldest policy so growth stays bounded under burst load, plus a regression test that validates bounded delivery. Signed-off-by: Austin Horstman --- include/util/SafeSignal.hpp | 20 ++++++++++++++++++++ test/utils/SafeSignal.cpp | 31 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/include/util/SafeSignal.hpp b/include/util/SafeSignal.hpp index 340f74ee..1e901792 100644 --- a/include/util/SafeSignal.hpp +++ b/include/util/SafeSignal.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,12 @@ struct SafeSignal : sigc::signal...)> { public: SafeSignal() { dp_.connect(sigc::mem_fun(*this, &SafeSignal::handle_event)); } + void set_max_queued_events(std::size_t max_queued_events) { + std::unique_lock lock(mutex_); + max_queued_events_ = max_queued_events; + trim_queue_locked(); + } + template void emit(EmitArgs&&... args) { if (main_tid_ == std::this_thread::get_id()) { @@ -41,6 +48,9 @@ struct SafeSignal : sigc::signal...)> { } else { { std::unique_lock lock(mutex_); + if (max_queued_events_ != 0 && queue_.size() >= max_queued_events_) { + queue_.pop(); + } queue_.emplace(std::forward(args)...); } dp_.emit(); @@ -60,6 +70,15 @@ struct SafeSignal : sigc::signal...)> { using signal_t::emit_reverse; using signal_t::make_slot; + void trim_queue_locked() { + if (max_queued_events_ == 0) { + return; + } + while (queue_.size() > max_queued_events_) { + queue_.pop(); + } + } + void handle_event() { for (std::unique_lock lock(mutex_); !queue_.empty(); lock.lock()) { auto args = queue_.front(); @@ -72,6 +91,7 @@ struct SafeSignal : sigc::signal...)> { Glib::Dispatcher dp_; std::mutex mutex_; std::queue queue_; + std::size_t max_queued_events_ = 4096; const std::thread::id main_tid_ = std::this_thread::get_id(); // cache functor for signal emission to avoid recreating it on each event const slot_t cached_fn_ = make_slot(); diff --git a/test/utils/SafeSignal.cpp b/test/utils/SafeSignal.cpp index e7e096b0..1502b7d9 100644 --- a/test/utils/SafeSignal.cpp +++ b/test/utils/SafeSignal.cpp @@ -9,6 +9,7 @@ #endif #include #include +#include #include "fixtures/GlibTestsFixture.hpp" @@ -141,3 +142,33 @@ TEST_CASE_METHOD(GlibTestsFixture, "SafeSignal copy/move counter", "[signal][thr producer.join(); REQUIRE(count == NUM_EVENTS); } + +TEST_CASE_METHOD(GlibTestsFixture, "SafeSignal queue stays bounded under burst load", + "[signal][thread][util][perf]") { + constexpr int NUM_EVENTS = 200; + constexpr std::size_t MAX_QUEUED_EVENTS = 8; + std::vector received; + + SafeSignal test_signal; + test_signal.set_max_queued_events(MAX_QUEUED_EVENTS); + + setTimeout(500); + + test_signal.connect([&](auto value) { received.push_back(value); }); + + run([&]() { + std::thread producer([&]() { + for (int i = 1; i <= NUM_EVENTS; ++i) { + test_signal.emit(i); + } + }); + producer.join(); + + Glib::signal_timeout().connect_once([this]() { this->quit(); }, 50); + }); + + REQUIRE(received.size() <= MAX_QUEUED_EVENTS); + REQUIRE_FALSE(received.empty()); + REQUIRE(received.back() == NUM_EVENTS); + REQUIRE(received.front() == NUM_EVENTS - static_cast(received.size()) + 1); +}