From 59707f1339c794dee47ce8cea007f09c1766f878 Mon Sep 17 00:00:00 2001 From: tovjemam Date: Wed, 14 Jan 2026 18:04:41 +0100 Subject: [PATCH] Limit WS event queue --- src/server/wsserver.cpp | 20 ++++++++++++++------ src/server/wsserver.hpp | 6 ++++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/server/wsserver.cpp b/src/server/wsserver.cpp index b484819..00e263c 100644 --- a/src/server/wsserver.cpp +++ b/src/server/wsserver.cpp @@ -17,7 +17,7 @@ sv::WSServer::WSServer(uint16_t port) .onopen([&](crow::websocket::connection& conn) { CROW_LOG_INFO << "new websocket connection from " << conn.get_remote_ip(); - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); conn.set_nodelay(); WSConnId conn_id = utils::AllocNum(id2conn_, last_id_); @@ -26,17 +26,17 @@ sv::WSServer::WSServer(uint16_t port) id2conn_[conn_id] = &conn; conn.userdata(reinterpret_cast(static_cast(conn_id))); // push connection event - events_.emplace_back(WSE_CONNECTED, conn_id); + PushEvent(lock, {WSE_CONNECTED, conn_id}); }) .onclose([&](crow::websocket::connection& conn, const std::string& reason, uint16_t) { CROW_LOG_INFO << "websocket connection closed: " << reason; - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); WSConnId conn_id = static_cast(reinterpret_cast(conn.userdata())); // push disonnected event - events_.emplace_back(WSE_DISCONNECTED, conn_id); + PushEvent(lock, {WSE_DISCONNECTED, conn_id}); id2conn_.erase(conn_id); }) @@ -44,10 +44,10 @@ sv::WSServer::WSServer(uint16_t port) if (!is_binary) return; // only accept binary messages here - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); WSConnId conn_id = static_cast(reinterpret_cast(conn.userdata())); - events_.emplace_back(WSE_MESSAGE, conn_id, data); + PushEvent(lock, {WSE_MESSAGE, conn_id, data}); }); // CROW_ROUTE(app, "/") @@ -79,6 +79,7 @@ bool sv::WSServer::PollEvent(WSEvent& out_event) out_event = std::move(events_.front()); events_.pop_front(); + not_full_.notify_one(); return true; } @@ -109,3 +110,10 @@ sv::WSServer::~WSServer() if (ws_thread_ && ws_thread_->joinable()) ws_thread_->join(); } + +void sv::WSServer::PushEvent(std::unique_lock& lock, const WSEvent& event) +{ + not_full_.wait(lock, [this] { return events_.size() < MAX_QUEUE_SIZE; }); + events_.emplace_back(event); + +} diff --git a/src/server/wsserver.hpp b/src/server/wsserver.hpp index 78f9aac..cd81d39 100644 --- a/src/server/wsserver.hpp +++ b/src/server/wsserver.hpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace crow::websocket { @@ -55,9 +56,14 @@ public: ~WSServer(); +public: + void PushEvent(std::unique_lock& lock, const WSEvent& event); + private: + static constexpr size_t MAX_QUEUE_SIZE = 64; std::deque events_; std::mutex mtx_; + std::condition_variable not_full_; std::unique_ptr ws_thread_; void* app_ptr_ = nullptr;