Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
unified_session_manager.cpp
Go to the documentation of this file.
1/*****************************************************************************
2BSD 3-Clause License
3
4Copyright (c) 2025, kcenon
5All rights reserved.
6*****************************************************************************/
7
9
10#include <algorithm>
11#include <atomic>
12#include <mutex>
13#include <shared_mutex>
14
15namespace kcenon::network::core {
16
25public:
26 explicit impl(const unified_session_config& config)
31 , total_cleaned_up_(0) {}
32
34
35 // =========================================================================
36 // Connection Acceptance
37 // =========================================================================
38
39 [[nodiscard]] auto can_accept_connection() const -> bool {
40 return session_count_.load(std::memory_order_acquire) < config_.max_sessions;
41 }
42
43 [[nodiscard]] auto is_backpressure_active() const -> bool {
45 return false;
46 }
47 auto count = session_count_.load(std::memory_order_acquire);
48 auto threshold =
50 return count >= threshold;
51 }
52
53 // =========================================================================
54 // Session CRUD
55 // =========================================================================
56
57 auto add_session(session_handle handle, const std::string& session_id) -> bool {
58 if (!can_accept_connection()) {
59 total_rejected_.fetch_add(1, std::memory_order_relaxed);
60 return false;
61 }
62
63 std::unique_lock lock(sessions_mutex_);
64 std::string id = session_id.empty() ? generate_id("session_") : session_id;
65
66 auto [it, inserted] = sessions_.emplace(id, std::move(handle));
67 if (inserted) {
68 session_count_.fetch_add(1, std::memory_order_release);
69 total_accepted_.fetch_add(1, std::memory_order_relaxed);
70 }
71 return inserted;
72 }
73
74 auto add_session_with_id(session_handle handle, const std::string& session_id)
75 -> std::string {
76 if (!can_accept_connection()) {
77 total_rejected_.fetch_add(1, std::memory_order_relaxed);
78 return "";
79 }
80
81 std::unique_lock lock(sessions_mutex_);
82 std::string id = session_id.empty() ? generate_id("session_") : session_id;
83
84 auto [it, inserted] = sessions_.emplace(id, std::move(handle));
85 if (inserted) {
86 session_count_.fetch_add(1, std::memory_order_release);
87 total_accepted_.fetch_add(1, std::memory_order_relaxed);
88 return id;
89 }
90 return "";
91 }
92
93 auto remove_session(const std::string& session_id) -> bool {
94 std::unique_lock lock(sessions_mutex_);
95 auto it = sessions_.find(session_id);
96 if (it != sessions_.end()) {
97 sessions_.erase(it);
98 session_count_.fetch_sub(1, std::memory_order_release);
99 return true;
100 }
101 return false;
102 }
103
104 [[nodiscard]] auto get_session(const std::string& session_id) -> session_handle* {
105 std::shared_lock lock(sessions_mutex_);
106 auto it = sessions_.find(session_id);
107 return (it != sessions_.end()) ? &it->second : nullptr;
108 }
109
110 [[nodiscard]] auto get_session(const std::string& session_id) const
111 -> const session_handle* {
112 std::shared_lock lock(sessions_mutex_);
113 auto it = sessions_.find(session_id);
114 return (it != sessions_.end()) ? &it->second : nullptr;
115 }
116
117 auto with_session(const std::string& session_id,
118 const std::function<void(session_handle&)>& callback) -> bool {
119 std::shared_lock lock(sessions_mutex_);
120 auto it = sessions_.find(session_id);
121 if (it != sessions_.end()) {
122 callback(it->second);
123 return true;
124 }
125 return false;
126 }
127
128 [[nodiscard]] auto has_session(const std::string& session_id) const -> bool {
129 std::shared_lock lock(sessions_mutex_);
130 return sessions_.find(session_id) != sessions_.end();
131 }
132
133 [[nodiscard]] auto get_all_session_ids() const -> std::vector<std::string> {
134 std::shared_lock lock(sessions_mutex_);
135 std::vector<std::string> ids;
136 ids.reserve(sessions_.size());
137 for (const auto& [id, handle] : sessions_) {
138 ids.push_back(id);
139 }
140 return ids;
141 }
142
143 // =========================================================================
144 // Iteration
145 // =========================================================================
146
147 auto for_each_mutable(const std::function<void(session_handle&)>& callback)
148 -> void {
149 std::shared_lock lock(sessions_mutex_);
150 for (auto& [id, handle] : sessions_) {
151 callback(handle);
152 }
153 }
154
156 const std::function<void(const session_handle&)>& callback) const -> void {
157 std::shared_lock lock(sessions_mutex_);
158 for (const auto& [id, handle] : sessions_) {
159 callback(handle);
160 }
161 }
162
163 auto broadcast(std::vector<uint8_t> data) -> size_t {
164 std::shared_lock lock(sessions_mutex_);
165 size_t sent_count = 0;
166
167 for (auto& [id, handle] : sessions_) {
168 if (handle.is_connected()) {
169 std::vector<uint8_t> copy = data;
170 auto result = handle.send(std::move(copy));
171 if (result.is_ok()) {
172 ++sent_count;
173 }
174 }
175 }
176 return sent_count;
177 }
178
179 // =========================================================================
180 // Activity Tracking & Cleanup
181 // =========================================================================
182
183 auto update_activity(const std::string& session_id) -> bool {
184 std::shared_lock lock(sessions_mutex_);
185 auto it = sessions_.find(session_id);
186 if (it != sessions_.end()) {
187 it->second.update_activity();
188 return true;
189 }
190 return false;
191 }
192
193 auto cleanup_idle_sessions() -> size_t {
194 std::vector<std::string> to_remove;
195
196 // Identify idle sessions under read lock
197 {
198 std::shared_lock lock(sessions_mutex_);
199 for (const auto& [id, handle] : sessions_) {
200 if (handle.has_activity_tracking() &&
201 handle.idle_duration() > config_.idle_timeout) {
202 to_remove.push_back(id);
203 }
204 }
205 }
206
207 // Stop and remove idle sessions
208 size_t removed = 0;
209 for (const auto& id : to_remove) {
210 // Stop the session before removal
211 {
212 std::shared_lock lock(sessions_mutex_);
213 auto it = sessions_.find(id);
214 if (it != sessions_.end()) {
215 it->second.stop();
216 }
217 }
218
219 if (remove_session(id)) {
220 ++removed;
221 }
222 }
223
224 if (removed > 0) {
225 total_cleaned_up_.fetch_add(removed, std::memory_order_relaxed);
226 }
227 return removed;
228 }
229
230 // =========================================================================
231 // Lifecycle Management
232 // =========================================================================
233
234 auto clear_all_sessions() -> void {
235 // Stop all sessions first
236 {
237 std::shared_lock lock(sessions_mutex_);
238 for (auto& [id, handle] : sessions_) {
239 handle.stop();
240 }
241 }
242
243 std::unique_lock lock(sessions_mutex_);
244 sessions_.clear();
245 session_count_.store(0, std::memory_order_release);
246 }
247
248 // =========================================================================
249 // Metrics
250 // =========================================================================
251
252 [[nodiscard]] auto get_session_count() const -> size_t {
253 return session_count_.load(std::memory_order_acquire);
254 }
255
256 [[nodiscard]] auto get_total_accepted() const -> uint64_t {
257 return total_accepted_.load(std::memory_order_relaxed);
258 }
259
260 [[nodiscard]] auto get_total_rejected() const -> uint64_t {
261 return total_rejected_.load(std::memory_order_relaxed);
262 }
263
264 [[nodiscard]] auto get_total_cleaned_up() const -> uint64_t {
265 return total_cleaned_up_.load(std::memory_order_relaxed);
266 }
267
268 [[nodiscard]] auto get_utilization() const -> double {
269 if (config_.max_sessions == 0) {
270 return 0.0;
271 }
272 return static_cast<double>(session_count_.load(std::memory_order_acquire)) /
274 }
275
276 auto increment_rejected() -> void {
277 total_rejected_.fetch_add(1, std::memory_order_relaxed);
278 }
279
280 // =========================================================================
281 // Configuration
282 // =========================================================================
283
284 auto set_max_sessions(size_t max_sessions) -> void {
285 config_.max_sessions = max_sessions;
286 }
287
288 [[nodiscard]] auto get_config() const -> const unified_session_config& {
289 return config_;
290 }
291
292 [[nodiscard]] auto get_stats() const -> unified_session_manager::stats {
293 return stats{
294 .active_sessions = session_count_.load(std::memory_order_acquire),
295 .max_sessions = config_.max_sessions,
296 .total_accepted = total_accepted_.load(std::memory_order_relaxed),
297 .total_rejected = total_rejected_.load(std::memory_order_relaxed),
298 .total_cleaned_up = total_cleaned_up_.load(std::memory_order_relaxed),
299 .utilization = get_utilization(),
300 .backpressure_active = is_backpressure_active(),
301 .idle_timeout = config_.idle_timeout};
302 }
303
304 // =========================================================================
305 // ID Generation
306 // =========================================================================
307
308 static auto generate_id(const std::string& prefix) -> std::string {
309 static std::atomic<uint64_t> counter{0};
310 return prefix + std::to_string(counter.fetch_add(1, std::memory_order_relaxed));
311 }
312
313private:
315 mutable std::shared_mutex sessions_mutex_;
316 std::unordered_map<std::string, session_handle> sessions_;
317
318 std::atomic<size_t> session_count_;
319 std::atomic<uint64_t> total_accepted_;
320 std::atomic<uint64_t> total_rejected_;
321 std::atomic<uint64_t> total_cleaned_up_;
322};
323
324// ============================================================================
325// unified_session_manager Implementation (delegating to impl)
326// ============================================================================
327
330
333
335
339
343
345 const std::string& session_id) -> bool {
346 return impl_->add_session(std::move(handle), session_id);
347}
348
349auto unified_session_manager::remove_session(const std::string& session_id) -> bool {
350 return impl_->remove_session(session_id);
351}
352
353auto unified_session_manager::get_session(const std::string& session_id)
354 -> session_handle* {
355 return impl_->get_session(session_id);
356}
357
358auto unified_session_manager::get_session(const std::string& session_id) const
359 -> const session_handle* {
360 return impl_->get_session(session_id);
361}
362
364 const std::string& session_id,
365 const std::function<void(session_handle&)>& callback) -> bool {
366 return impl_->with_session(session_id, callback);
367}
368
369auto unified_session_manager::has_session(const std::string& session_id) const
370 -> bool {
371 return impl_->has_session(session_id);
372}
373
375 -> std::vector<std::string> {
376 return impl_->get_all_session_ids();
377}
378
380 const std::function<void(session_handle&)>& callback) -> void {
381 impl_->for_each_mutable(callback);
382}
383
385 const std::function<void(const session_handle&)>& callback) const -> void {
386 impl_->for_each_const(callback);
387}
388
389auto unified_session_manager::broadcast(std::vector<uint8_t> data) -> size_t {
390 return impl_->broadcast(std::move(data));
391}
392
393auto unified_session_manager::update_activity(const std::string& session_id)
394 -> bool {
395 return impl_->update_activity(session_id);
396}
397
399 return impl_->cleanup_idle_sessions();
400}
401
403 impl_->clear_all_sessions();
404}
405
407 impl_->clear_all_sessions();
408}
409
411 return impl_->get_session_count();
412}
413
415 return impl_->get_total_accepted();
416}
417
419 return impl_->get_total_rejected();
420}
421
423 return impl_->get_total_cleaned_up();
424}
425
427 return impl_->get_utilization();
428}
429
430auto unified_session_manager::set_max_sessions(size_t max_sessions) -> void {
431 impl_->set_max_sessions(max_sessions);
432}
433
435 -> const unified_session_config& {
436 return impl_->get_config();
437}
438
440 return impl_->get_stats();
441}
442
443auto unified_session_manager::generate_id(const std::string& prefix)
444 -> std::string {
445 return impl::generate_id(prefix);
446}
447
449 const std::string& session_id)
450 -> bool {
451 return impl_->add_session(std::move(handle), session_id);
452}
453
455 session_handle handle, const std::string& session_id) -> std::string {
456 return impl_->add_session_with_id(std::move(handle), session_id);
457}
458
460 impl_->increment_rejected();
461}
462
463} // namespace kcenon::network::core
Value-semantic wrapper for type-erased sessions.
PIMPL implementation hiding all template-related code.
auto get_all_session_ids() const -> std::vector< std::string >
std::unordered_map< std::string, session_handle > sessions_
auto has_session(const std::string &session_id) const -> bool
auto remove_session(const std::string &session_id) -> bool
auto add_session(session_handle handle, const std::string &session_id) -> bool
auto update_activity(const std::string &session_id) -> bool
auto add_session_with_id(session_handle handle, const std::string &session_id) -> std::string
auto get_session(const std::string &session_id) const -> const session_handle *
auto with_session(const std::string &session_id, const std::function< void(session_handle &)> &callback) -> bool
auto for_each_const(const std::function< void(const session_handle &)> &callback) const -> void
static auto generate_id(const std::string &prefix) -> std::string
auto broadcast(std::vector< uint8_t > data) -> size_t
auto get_config() const -> const unified_session_config &
auto for_each_mutable(const std::function< void(session_handle &)> &callback) -> void
auto get_stats() const -> unified_session_manager::stats
auto get_session(const std::string &session_id) -> session_handle *
Type-erased session manager that handles any session type.
auto get_total_cleaned_up() const -> uint64_t
Get total cleaned up sessions.
auto is_backpressure_active() const -> bool
Check if backpressure should be applied.
auto get_all_session_ids() const -> std::vector< std::string >
Get all session IDs.
auto add_session_with_id_impl(session_handle handle, const std::string &session_id) -> std::string
auto broadcast(std::vector< uint8_t > data) -> size_t
Broadcast data to all connected sessions.
auto cleanup_idle_sessions() -> size_t
Cleanup idle sessions that exceeded idle_timeout.
auto update_activity(const std::string &session_id) -> bool
Update activity timestamp for a session.
auto with_session(const std::string &session_id, const std::function< void(session_handle &)> &callback) -> bool
Execute a callback with a session (safer than get_session)
auto get_total_accepted() const -> uint64_t
Get total accepted connections.
auto get_stats() const -> stats
Get comprehensive statistics.
auto stop_all_sessions() -> void
Stop all sessions (alias for clear_all_sessions)
auto add_session(std::shared_ptr< SessionType > session, const std::string &session_id="") -> bool
Add a session using type erasure.
auto set_max_sessions(size_t max_sessions) -> void
Set maximum sessions.
auto get_utilization() const -> double
Get current session utilization.
auto has_session(const std::string &session_id) const -> bool
Check if a session exists.
unified_session_manager()
Constructs a unified session manager with default configuration.
auto remove_session(const std::string &session_id) -> bool
Remove session by ID.
auto for_each(const std::function< void(session_handle &)> &callback) -> void
Execute a callback for each session.
auto get_total_rejected() const -> uint64_t
Get total rejected connections.
auto add_session_impl(session_handle handle, const std::string &session_id) -> bool
auto get_session(const std::string &session_id) -> session_handle *
Get session handle by ID (non-owning reference)
auto can_accept_connection() const -> bool
Check if new connection can be accepted.
auto get_session_count() const -> size_t
Get current session count.
auto get_config() const -> const unified_session_config &
Get current configuration.
static auto generate_id(const std::string &prefix="session_") -> std::string
Generate a unique session ID.
tracing_config config
Definition exporters.cpp:29
Configuration for unified session management.