Limit WS event queue
This commit is contained in:
parent
1e94a118eb
commit
59707f1339
@ -17,7 +17,7 @@ sv::WSServer::WSServer(uint16_t port)
|
|||||||
.onopen([&](crow::websocket::connection& conn) {
|
.onopen([&](crow::websocket::connection& conn) {
|
||||||
CROW_LOG_INFO << "new websocket connection from " << conn.get_remote_ip();
|
CROW_LOG_INFO << "new websocket connection from " << conn.get_remote_ip();
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(mtx_);
|
std::unique_lock<std::mutex> lock(mtx_);
|
||||||
conn.set_nodelay();
|
conn.set_nodelay();
|
||||||
|
|
||||||
WSConnId conn_id = utils::AllocNum(id2conn_, last_id_);
|
WSConnId conn_id = utils::AllocNum(id2conn_, last_id_);
|
||||||
@ -26,17 +26,17 @@ sv::WSServer::WSServer(uint16_t port)
|
|||||||
id2conn_[conn_id] = &conn;
|
id2conn_[conn_id] = &conn;
|
||||||
conn.userdata(reinterpret_cast<void*>(static_cast<uintptr_t>(conn_id)));
|
conn.userdata(reinterpret_cast<void*>(static_cast<uintptr_t>(conn_id)));
|
||||||
// push connection event
|
// push connection event
|
||||||
events_.emplace_back(WSE_CONNECTED, conn_id);
|
PushEvent(lock, {WSE_CONNECTED, conn_id});
|
||||||
})
|
})
|
||||||
.onclose([&](crow::websocket::connection& conn, const std::string& reason, uint16_t) {
|
.onclose([&](crow::websocket::connection& conn, const std::string& reason, uint16_t) {
|
||||||
CROW_LOG_INFO << "websocket connection closed: " << reason;
|
CROW_LOG_INFO << "websocket connection closed: " << reason;
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(mtx_);
|
std::unique_lock<std::mutex> lock(mtx_);
|
||||||
|
|
||||||
WSConnId conn_id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(conn.userdata()));
|
WSConnId conn_id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(conn.userdata()));
|
||||||
|
|
||||||
// push disonnected event
|
// push disonnected event
|
||||||
events_.emplace_back(WSE_DISCONNECTED, conn_id);
|
PushEvent(lock, {WSE_DISCONNECTED, conn_id});
|
||||||
|
|
||||||
id2conn_.erase(conn_id);
|
id2conn_.erase(conn_id);
|
||||||
})
|
})
|
||||||
@ -44,10 +44,10 @@ sv::WSServer::WSServer(uint16_t port)
|
|||||||
if (!is_binary)
|
if (!is_binary)
|
||||||
return; // only accept binary messages here
|
return; // only accept binary messages here
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(mtx_);
|
std::unique_lock<std::mutex> lock(mtx_);
|
||||||
|
|
||||||
WSConnId conn_id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(conn.userdata()));
|
WSConnId conn_id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(conn.userdata()));
|
||||||
events_.emplace_back(WSE_MESSAGE, conn_id, data);
|
PushEvent(lock, {WSE_MESSAGE, conn_id, data});
|
||||||
});
|
});
|
||||||
|
|
||||||
// CROW_ROUTE(app, "/")
|
// CROW_ROUTE(app, "/")
|
||||||
@ -79,6 +79,7 @@ bool sv::WSServer::PollEvent(WSEvent& out_event)
|
|||||||
|
|
||||||
out_event = std::move(events_.front());
|
out_event = std::move(events_.front());
|
||||||
events_.pop_front();
|
events_.pop_front();
|
||||||
|
not_full_.notify_one();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,3 +110,10 @@ sv::WSServer::~WSServer()
|
|||||||
if (ws_thread_ && ws_thread_->joinable())
|
if (ws_thread_ && ws_thread_->joinable())
|
||||||
ws_thread_->join();
|
ws_thread_->join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sv::WSServer::PushEvent(std::unique_lock<std::mutex>& lock, const WSEvent& event)
|
||||||
|
{
|
||||||
|
not_full_.wait(lock, [this] { return events_.size() < MAX_QUEUE_SIZE; });
|
||||||
|
events_.emplace_back(event);
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@ -8,6 +8,7 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
namespace crow::websocket
|
namespace crow::websocket
|
||||||
{
|
{
|
||||||
@ -55,9 +56,14 @@ public:
|
|||||||
|
|
||||||
~WSServer();
|
~WSServer();
|
||||||
|
|
||||||
|
public:
|
||||||
|
void PushEvent(std::unique_lock<std::mutex>& lock, const WSEvent& event);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
static constexpr size_t MAX_QUEUE_SIZE = 64;
|
||||||
std::deque<WSEvent> events_;
|
std::deque<WSEvent> events_;
|
||||||
std::mutex mtx_;
|
std::mutex mtx_;
|
||||||
|
std::condition_variable not_full_;
|
||||||
|
|
||||||
std::unique_ptr<std::thread> ws_thread_;
|
std::unique_ptr<std::thread> ws_thread_;
|
||||||
void* app_ptr_ = nullptr;
|
void* app_ptr_ = nullptr;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user