From db521069343d651d514b9ed7168fd1d9f7e81eb1 Mon Sep 17 00:00:00 2001
From: patrick96
Date: Sun, 12 Sep 2021 23:26:24 +0200
Subject: [PATCH] Support receiving IPC messages in multiple parts
---
include/components/controller.hpp | 1 -
include/components/eventloop.hpp | 5 +++--
include/components/ipc.hpp | 10 ++++++++--
src/components/controller.cpp | 9 +++------
src/components/eventloop.cpp | 9 +++++----
src/components/ipc.cpp | 28 +++++++++++++++++++++-------
6 files changed, 40 insertions(+), 22 deletions(-)
diff --git a/include/components/controller.hpp b/include/components/controller.hpp
index 58212c73..81623866 100644
--- a/include/components/controller.hpp
+++ b/include/components/controller.hpp
@@ -57,7 +57,6 @@ class controller : public signal_receiver {
};
struct PipeHandle : public UVHandleGeneric {
- PipeHandle(uv_loop_t* loop, std::function fun);
+ PipeHandle(uv_loop_t* loop, std::function fun, std::function eof_cb);
void start(int fd);
void read_cb(ssize_t nread, const uv_buf_t* buf);
std::function func;
+ std::function eof_cb;
int fd;
};
@@ -114,7 +115,7 @@ class eventloop {
void signal_handler(int signum, std::function fun);
void poll_handler(int events, int fd, std::function fun);
void fs_event_handler(const string& path, std::function fun);
- void pipe_handle(int fd, std::function fun);
+ void pipe_handle(int fd, std::function fun, std::function eof_cb);
void timer_handle(uint64_t timeout, uint64_t repeat, std::function fun);
AsyncHandle_t async_handle(std::function fun);
diff --git a/include/components/ipc.hpp b/include/components/ipc.hpp
index e551d789..27bcdff9 100644
--- a/include/components/ipc.hpp
+++ b/include/components/ipc.hpp
@@ -34,7 +34,8 @@ class ipc {
explicit ipc(signal_emitter& emitter, const logger& logger);
~ipc();
- void receive_message(string buf);
+ void receive_data(string buf);
+ void receive_eof();
int get_file_descriptor() const;
private:
@@ -42,7 +43,12 @@ class ipc {
const logger& m_log;
string m_path{};
- unique_ptr m_fd;
+ int m_fd;
+
+ /**
+ * Buffer for the currently received IPC message.
+ */
+ string m_buffer{};
};
POLYBAR_NS_END
diff --git a/src/components/controller.cpp b/src/components/controller.cpp
index 29fd232c..99211b13 100644
--- a/src/components/controller.cpp
+++ b/src/components/controller.cpp
@@ -195,11 +195,6 @@ void controller::conn_cb(int status, int) {
}
}
-void controller::ipc_cb(string buf) {
- // TODO handle messages sent in multiple parts.
- m_ipc->receive_message(buf);
-}
-
void controller::signal_handler(int signum) {
m_log.notice("Received signal(%d): %s", signum, strsignal(signum));
stop(signum == SIGUSR1);
@@ -264,7 +259,9 @@ void controller::read_events(bool confwatch) {
}
if (m_ipc) {
- eloop->pipe_handle(m_ipc->get_file_descriptor(), [this](const string payload) { ipc_cb(payload); });
+ eloop->pipe_handle(
+ m_ipc->get_file_descriptor(), [this](const string payload) { m_ipc->receive_data(payload); },
+ [this]() { m_ipc->receive_eof(); });
}
if (!m_snapshot_dst.empty()) {
diff --git a/src/components/eventloop.cpp b/src/components/eventloop.cpp
index 6003aa76..57292ee0 100644
--- a/src/components/eventloop.cpp
+++ b/src/components/eventloop.cpp
@@ -73,8 +73,8 @@ void FSEventHandle::start(const string& path) {
// }}}
// PipeHandle {{{
-PipeHandle::PipeHandle(uv_loop_t* loop, std::function fun)
- : UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); }), func(fun) {
+PipeHandle::PipeHandle(uv_loop_t* loop, std::function fun, std::function eof_cb)
+ : UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); }), func(fun), eof_cb(eof_cb) {
UV(uv_pipe_init, loop, handle, false);
}
@@ -97,6 +97,7 @@ void PipeHandle::read_cb(ssize_t nread, const uv_buf_t* buf) {
log.err("Read error: %s", uv_err_name(nread));
uv_close((uv_handle_t*)handle, nullptr);
} else {
+ eof_cb();
// TODO this causes constant EOFs
start(this->fd);
}
@@ -191,8 +192,8 @@ void eventloop::fs_event_handler(const string& path, std::functionstart(path);
}
-void eventloop::pipe_handle(int fd, std::function fun) {
- m_pipe_handles.emplace_back(std::make_unique(get(), fun));
+void eventloop::pipe_handle(int fd, std::function fun, std::function eof_cb) {
+ m_pipe_handles.emplace_back(std::make_unique(get(), fun, eof_cb));
m_pipe_handles.back()->start(fd);
}
diff --git a/src/components/ipc.cpp b/src/components/ipc.cpp
index 7487a910..d89f305a 100644
--- a/src/components/ipc.cpp
+++ b/src/components/ipc.cpp
@@ -33,8 +33,11 @@ ipc::ipc(signal_emitter& emitter, const logger& logger) : m_sig(emitter), m_log(
throw system_error("Failed to create ipc channel");
}
+ if ((m_fd = open(m_path.c_str(), O_RDONLY | O_NONBLOCK)) == -1) {
+ throw system_error("Failed to open pipe '" + m_path + "'");
+ }
+
m_log.info("Created ipc channel at: %s", m_path);
- m_fd = file_util::make_file_descriptor(m_path, O_RDONLY | O_NONBLOCK, false);
}
/**
@@ -48,12 +51,23 @@ ipc::~ipc() {
}
/**
- * Receive available ipc messages and delegate valid events
+ * Receive parts of an IPC message
*/
-void ipc::receive_message(string buf) {
- m_log.info("Receiving ipc message");
+void ipc::receive_data(string buf) {
+ m_buffer += buf;
+}
- string payload{string_util::trim(std::move(buf), '\n')};
+/**
+ * Called once the end of the message arrives.
+ */
+void ipc::receive_eof() {
+ if (m_buffer.empty()) {
+ return;
+ }
+
+ string payload{string_util::trim(std::move(m_buffer), '\n')};
+
+ m_buffer = std::string();
if (payload.find(ipc_command_prefix) == 0) {
m_sig.emit(signals::ipc::command{payload.substr(strlen(ipc_command_prefix))});
@@ -61,7 +75,7 @@ void ipc::receive_message(string buf) {
m_sig.emit(signals::ipc::hook{payload.substr(strlen(ipc_hook_prefix))});
} else if (payload.find(ipc_action_prefix) == 0) {
m_sig.emit(signals::ipc::action{payload.substr(strlen(ipc_action_prefix))});
- } else if (!payload.empty()) {
+ } else {
m_log.warn("Received unknown ipc message: (payload=%s)", payload);
}
}
@@ -70,7 +84,7 @@ void ipc::receive_message(string buf) {
* Get the file descriptor to the ipc channel
*/
int ipc::get_file_descriptor() const {
- return *m_fd;
+ return m_fd;
}
POLYBAR_NS_END