feat(eventloop): Delayed enqueue

A new worker that will block the queue channel
until the delayed event has been processed.

This is used to limit the amount of X button events
within defined time frame and we can't block
the main X thread.
This commit is contained in:
Michael Carlberg
2016-11-25 21:20:50 +01:00
parent ff9be848c7
commit c2acdff7d4
4 changed files with 289 additions and 110 deletions

View File

@ -6,10 +6,28 @@
POLYBAR_NS
/**
* Construct eventloop instance
*/
eventloop::eventloop(const logger& logger, const config& config) : m_log(logger), m_conf(config) {
m_delayed_time = duration_t{m_conf.get<double>("settings", "eventloop-delayed-time", 25)};
m_swallow_time = duration_t{m_conf.get<double>("settings", "eventloop-swallow-time", 10)};
m_swallow_limit = m_conf.get<size_t>("settings", "eventloop-swallow", 5U);
}
/**
* Deconstruct eventloop
*/
eventloop::~eventloop() noexcept {
m_update_cb = nullptr;
m_unrecognized_input_cb = nullptr;
if (m_delayed_thread.joinable()) {
m_delayed_thread.join();
}
std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock);
for (auto&& block : m_modules) {
for (auto&& module : block.second) {
auto module_name = module->name();
@ -23,68 +41,25 @@ eventloop::~eventloop() noexcept {
}
/**
* Enqueue event
* Start module and worker threads
*/
bool eventloop::enqueue(const entry_t& i) {
bool enqueued;
void eventloop::start() {
m_log.info("Starting event loop");
m_running = true;
if (!(enqueued = m_queue.enqueue(i))) {
m_log.warn("Failed to queue event (%d)", i.type);
}
dispatch_modules();
return enqueued;
m_queue_thread = thread(&eventloop::dispatch_queue_worker, this);
m_delayed_thread = thread(&eventloop::dispatch_delayed_worker, this);
}
/**
* Start module threads and wait for events on the queue
*
* @param timeframe Time to wait for subsequent events
* @param limit Maximum amount of subsequent events to swallow within timeframe
* Wait for worker threads to end
*/
void eventloop::run(std::chrono::duration<double, std::milli> timeframe, int limit) {
m_log.info("Starting event loop", timeframe.count(), limit);
m_running = true;
m_log.trace("eventloop: timeframe: %d, limit: %d", timeframe.count(), limit);
start_modules();
while (m_running) {
entry_t evt, next{static_cast<int>(event_type::NONE)};
m_queue.wait_dequeue(evt);
if (!m_running) {
break;
}
if (match_event(evt, event_type::UPDATE)) {
int swallowed = 0;
while (swallowed++ < limit && m_queue.wait_dequeue_timed(next, timeframe)) {
if (match_event(next, event_type::QUIT)) {
evt = next;
break;
} else if (compare_events(evt, next)) {
m_log.trace_x("eventloop: Swallowing event within timeframe");
evt = next;
} else {
break;
}
}
}
forward_event(evt);
if (match_event(next, event_type::NONE)) {
continue;
}
if (compare_events(evt, next)) {
continue;
}
forward_event(next);
void eventloop::wait() {
if (m_queue_thread.joinable()) {
m_queue_thread.join();
}
m_log.trace("eventloop: Loop ended");
}
/**
@ -93,7 +68,47 @@ void eventloop::run(std::chrono::duration<double, std::milli> timeframe, int lim
void eventloop::stop() {
m_log.info("Stopping event loop");
m_running = false;
enqueue({static_cast<int>(event_type::QUIT)});
if (m_delayed_thread.joinable()) {
m_delayed_cond.notify_one();
}
enqueue({static_cast<uint8_t>(event_type::QUIT)});
}
/**
* Enqueue event
*/
bool eventloop::enqueue(const entry_t& entry) {
if (m_queue.enqueue(entry)) {
return true;
}
m_log.warn("Failed to enqueue event (%d)", entry.type);
return false;
}
/**
* Delay enqueue by given time
*/
bool eventloop::enqueue_delayed(const entry_t& entry) {
if (!m_delayed_lock.try_lock()) {
return false;
}
std::unique_lock<std::mutex> guard(m_delayed_lock, std::adopt_lock);
if (m_delayed_entry.type != 0) {
return false;
}
m_delayed_entry = entry;
if (m_queue.enqueue(entry)) {
return true;
}
m_delayed_entry.type = 0;
return false;
}
/**
@ -114,6 +129,8 @@ void eventloop::set_input_db(callback<string>&& cb) {
* Add module to alignment block
*/
void eventloop::add_module(const alignment pos, module_t&& module) {
std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock);
auto it = m_modules.lower_bound(pos);
if (it != m_modules.end() && !(m_modules.key_comp()(pos, it->first))) {
@ -128,16 +145,29 @@ void eventloop::add_module(const alignment pos, module_t&& module) {
/**
* Get reference to module map
*/
modulemap_t& eventloop::modules() {
const modulemap_t& eventloop::modules() const {
return m_modules;
}
/**
* Get loaded module count
*/
size_t eventloop::module_count() const {
size_t count{0};
for (auto&& block : m_modules) {
count += block.second.size();
}
return count;
}
/**
* Start module threads
*/
void eventloop::start_modules() {
for (auto&& block : m_modules) {
for (auto&& module : block.second) {
void eventloop::dispatch_modules() {
std::lock_guard<std::mutex> guard(m_modulelock);
for (const auto& block : m_modules) {
for (const auto& module : block.second) {
try {
m_log.info("Starting %s", module->name());
module->start();
@ -148,31 +178,95 @@ void eventloop::start_modules() {
}
}
/**
* Dispatch queue worker thread
*/
void eventloop::dispatch_queue_worker() {
while (m_running) {
entry_t evt, next{static_cast<uint8_t>(event_type::NONE)};
m_queue.wait_dequeue(evt);
if (!m_running) {
break;
}
if (m_delayed_entry.type != 0 && compare_events(evt, m_delayed_entry)) {
m_delayed_cond.notify_one();
}
size_t swallowed{0};
while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_time)) {
if (match_event(next, event_type::QUIT)) {
evt = next;
break;
} else if (!compare_events(evt, next)) {
enqueue(move(next));
break;
}
m_log.trace_x("eventloop: Swallowing event within timeframe");
evt = next;
}
forward_event(evt);
}
m_log.trace("eventloop: Reached end of queue worker thread");
}
/**
* Dispatch delayed worker thread
*/
void eventloop::dispatch_delayed_worker() {
while (true) {
// wait for notification
while (m_running && m_delayed_entry.type != 0) {
std::unique_lock<std::mutex> guard(m_delayed_lock);
m_delayed_cond.wait(guard, [&] { return m_delayed_entry.type != 0 || !m_running; });
}
if (!m_running) {
break;
}
this_thread::sleep_for(m_delayed_time);
m_delayed_entry.type = 0;
}
m_log.trace("eventloop: Reached end of delayed worker thread");
}
/**
* Test if event matches given type
*/
bool eventloop::match_event(entry_t evt, event_type type) {
return static_cast<int>(type) == evt.type;
inline bool eventloop::match_event(entry_t evt, event_type type) {
return static_cast<uint8_t>(type) == evt.type;
}
/**
* Compare given events
*/
bool eventloop::compare_events(entry_t evt, entry_t evt2) {
return evt.type == evt2.type;
inline bool eventloop::compare_events(entry_t evt, entry_t evt2) {
if (evt.type != evt2.type) {
return false;
} else if (match_event(evt, event_type::INPUT)) {
return evt.data[0] == evt2.data[0] && strncmp(evt.data, evt2.data, strlen(evt.data)) == 0;
}
return true;
}
/**
* Forward event to handler based on type
*/
void eventloop::forward_event(entry_t evt) {
if (evt.type == static_cast<int>(event_type::UPDATE)) {
if (evt.type == static_cast<uint8_t>(event_type::UPDATE)) {
on_update();
} else if (evt.type == static_cast<int>(event_type::INPUT)) {
on_input(string{evt.data});
} else if (evt.type == static_cast<int>(event_type::CHECK)) {
} else if (evt.type == static_cast<uint8_t>(event_type::INPUT)) {
on_input(evt.data);
} else if (evt.type == static_cast<uint8_t>(event_type::CHECK)) {
on_check();
} else if (evt.type == static_cast<int>(event_type::QUIT)) {
} else if (evt.type == static_cast<uint8_t>(event_type::QUIT)) {
on_quit();
} else {
m_log.warn("Unknown event type for enqueued event (%d)", evt.type);
@ -230,8 +324,14 @@ void eventloop::on_check() {
return;
}
for (auto&& block : m_modules) {
for (auto&& module : block.second) {
if (!m_modulelock.try_lock()) {
return;
}
std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock);
for (const auto& block : m_modules) {
for (const auto& module : block.second) {
if (module->running()) {
return;
}