Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
database::async::stream_processor Class Reference

Real-time data stream processing. More...

#include <async_operations.h>

Collaboration diagram for database::async::stream_processor:
Collaboration graph

Classes

struct  stream_event
 

Public Types

enum class  stream_type { postgresql_notify , mongodb_change_stream , redis_pubsub , custom }
 

Public Member Functions

 stream_processor (std::shared_ptr< core::database_backend > db)
 
 ~stream_processor ()
 
bool start_stream (stream_type type, const std::string &channel)
 
bool stop_stream (const std::string &channel)
 
void stop_all_streams ()
 
template<concepts::StreamEventHandler< stream_event > Handler>
void register_event_handler (const std::string &channel, Handler &&handler)
 
template<concepts::StreamEventHandler< stream_event > Handler>
void register_global_handler (Handler &&handler)
 
void register_event_handler (const std::string &channel, std::function< void(const stream_event &)> handler)
 
void register_global_handler (std::function< void(const stream_event &)> handler)
 
template<concepts::StreamEventFilter< stream_event > Filter>
void add_event_filter (const std::string &channel, Filter &&filter)
 
void add_event_filter (const std::string &channel, std::function< bool(const stream_event &)> filter)
 

Private Member Functions

void stream_thread (const std::string &channel, stream_type type)
 
void process_event (const stream_event &event)
 

Private Attributes

std::shared_ptr< core::database_backend > db_
 
std::mutex threads_mutex_
 
std::unordered_map< std::string, std::thread > stream_threads_
 
std::mutex handlers_mutex_
 
std::unordered_map< std::string, std::function< void(const stream_event &)> > event_handlers_
 
std::vector< std::function< void(const stream_event &)> > global_handlers_
 
std::unordered_map< std::string, std::function< bool(const stream_event &)> > event_filters_
 
std::atomic< bool > running_ {true}
 

Detailed Description

Real-time data stream processing.

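Thread-safety: All public methods are thread-safe. Event handlers are called from dedicated stream threads, so handlers must be thread-safe if they access shared state. The handlers_mutex_ protects all handler registration and filter operations. Note that process_event() holds handlers_mutex_ while it invokes filters and handlers, so a filter or handler must not call register_event_handler(), register_global_handler(), or add_event_filter() on the same stream_processor: handlers_mutex_ is a non-recursive std::mutex, and re-locking it from the thread that already owns it is undefined behavior (typically a deadlock).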

Definition at line 563 of file async_operations.h.

Member Enumeration Documentation

◆ stream_type

Constructor & Destructor Documentation

◆ stream_processor()

database::async::stream_processor::stream_processor ( std::shared_ptr< core::database_backend > db)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1142 of file async_operations.h.

1144 : db_(std::move(db))
1145 {
1146 }
std::shared_ptr< core::database_backend > db_

◆ ~stream_processor()

database::async::stream_processor::~stream_processor ( )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1148 of file async_operations.h.

References stop_all_streams().

Here is the call graph for this function:

Member Function Documentation

◆ add_event_filter() [1/2]

template<concepts::StreamEventFilter< stream_event > Filter>
void database::async::stream_processor::add_event_filter ( const std::string & channel,
Filter && filter )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 613 of file async_operations.h.

614 {
615 std::lock_guard<std::mutex> lock(handlers_mutex_);
616 event_filters_[channel] = std::forward<Filter>(filter);
617 }
std::unordered_map< std::string, std::function< bool(const stream_event &)> > event_filters_

References event_filters_, and handlers_mutex_.

◆ add_event_filter() [2/2]

void database::async::stream_processor::add_event_filter ( const std::string & channel,
std::function< bool(const stream_event &)> filter )
inline

Definition at line 1216 of file async_operations.h.

1219 {
1220 std::lock_guard<std::mutex> lock(handlers_mutex_);
1221 event_filters_[channel] = std::move(filter);
1222 }

References event_filters_, and handlers_mutex_.

◆ process_event()

void database::async::stream_processor::process_event ( const stream_event & event)
inline private
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1245 of file async_operations.h.

1246 {
1247 std::lock_guard<std::mutex> lock(handlers_mutex_);
1248
1249 auto filter_it = event_filters_.find(event.channel);
1250 if (filter_it != event_filters_.end()) {
1251 if (!filter_it->second(event)) {
1252 return;
1253 }
1254 }
1255
1256 auto handler_it = event_handlers_.find(event.channel);
1257 if (handler_it != event_handlers_.end()) {
1258 handler_it->second(event);
1259 }
1260
1261 for (const auto& handler : global_handlers_) {
1262 handler(event);
1263 }
1264 }
std::vector< std::function< void(const stream_event &)> > global_handlers_
std::unordered_map< std::string, std::function< void(const stream_event &)> > event_handlers_

References database::async::stream_processor::stream_event::channel, event_filters_, event_handlers_, global_handlers_, and handlers_mutex_.

Referenced by stream_thread().

Here is the caller graph for this function:

◆ register_event_handler() [1/2]

template<concepts::StreamEventHandler< stream_event > Handler>
void database::async::stream_processor::register_event_handler ( const std::string & channel,
Handler && handler )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 592 of file async_operations.h.

593 {
594 std::lock_guard<std::mutex> lock(handlers_mutex_);
595 event_handlers_[channel] = std::forward<Handler>(handler);
596 }

References event_handlers_, and handlers_mutex_.

◆ register_event_handler() [2/2]

void database::async::stream_processor::register_event_handler ( const std::string & channel,
std::function< void(const stream_event &)> handler )
inline

Definition at line 1201 of file async_operations.h.

1204 {
1205 std::lock_guard<std::mutex> lock(handlers_mutex_);
1206 event_handlers_[channel] = std::move(handler);
1207 }

References event_handlers_, and handlers_mutex_.

◆ register_global_handler() [1/2]

template<concepts::StreamEventHandler< stream_event > Handler>
void database::async::stream_processor::register_global_handler ( Handler && handler)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 599 of file async_operations.h.

600 {
601 std::lock_guard<std::mutex> lock(handlers_mutex_);
602 global_handlers_.push_back(std::forward<Handler>(handler));
603 }

References global_handlers_, and handlers_mutex_.

◆ register_global_handler() [2/2]

void database::async::stream_processor::register_global_handler ( std::function< void(const stream_event &)> handler)
inline

Definition at line 1209 of file async_operations.h.

1211 {
1212 std::lock_guard<std::mutex> lock(handlers_mutex_);
1213 global_handlers_.push_back(std::move(handler));
1214 }

References global_handlers_, and handlers_mutex_.

◆ start_stream()

bool database::async::stream_processor::start_stream ( stream_type type,
const std::string & channel )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1153 of file async_operations.h.

1155 {
1156 std::lock_guard<std::mutex> lock(threads_mutex_);
1157 if (stream_threads_.count(channel) > 0) {
1158 return false;
1159 }
1160 stream_threads_.emplace(
1161 channel,
1162 std::thread(&stream_processor::stream_thread, this, channel, type));
1163 return true;
1164 }
void stream_thread(const std::string &channel, stream_type type)
std::unordered_map< std::string, std::thread > stream_threads_

References stream_thread(), stream_threads_, and threads_mutex_.

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ stop_all_streams()

void database::async::stream_processor::stop_all_streams ( )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1185 of file async_operations.h.

1186 {
1187 running_.store(false);
1188 std::unordered_map<std::string, std::thread> threads;
1189 {
1190 std::lock_guard<std::mutex> lock(threads_mutex_);
1191 threads.swap(stream_threads_);
1192 }
1193 for (auto& [channel, t] : threads) {
1194 if (t.joinable()) {
1195 t.join();
1196 }
1197 }
1198 running_.store(true);
1199 }

References running_, stream_threads_, and threads_mutex_.

Referenced by ~stream_processor().

Here is the caller graph for this function:

◆ stop_stream()

bool database::async::stream_processor::stop_stream ( const std::string & channel)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1166 of file async_operations.h.

1167 {
1168 std::thread t;
1169 {
1170 std::lock_guard<std::mutex> lock(threads_mutex_);
1171 auto it = stream_threads_.find(channel);
1172 if (it == stream_threads_.end()) {
1173 return false;
1174 }
1175 t = std::move(it->second);
1176 stream_threads_.erase(it);
1177 }
1178 // Thread detects its channel removal via map-check and exits
1179 if (t.joinable()) {
1180 t.join();
1181 }
1182 return true;
1183 }

References stream_threads_, and threads_mutex_.

◆ stream_thread()

void database::async::stream_processor::stream_thread ( const std::string & channel,
stream_type type )
inline private
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1224 of file async_operations.h.

1226 {
1227 stream_event connected_event;
1228 connected_event.type = type;
1229 connected_event.channel = channel;
1230 connected_event.payload = "stream_connected";
1231 connected_event.timestamp = std::chrono::system_clock::now();
1232 process_event(connected_event);
1233
1234 while (running_.load()) {
1235 std::this_thread::sleep_for(std::chrono::milliseconds(10));
1236 {
1237 std::lock_guard<std::mutex> lock(threads_mutex_);
1238 if (stream_threads_.find(channel) == stream_threads_.end()) {
1239 return;
1240 }
1241 }
1242 }
1243 }
void process_event(const stream_event &event)

References database::async::stream_processor::stream_event::channel, database::async::stream_processor::stream_event::payload, process_event(), running_, stream_threads_, threads_mutex_, database::async::stream_processor::stream_event::timestamp, and database::async::stream_processor::stream_event::type.

Referenced by start_stream().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ db_

std::shared_ptr<core::database_backend> database::async::stream_processor::db_
private

◆ event_filters_

std::unordered_map<std::string, std::function<bool(const stream_event&)> > database::async::stream_processor::event_filters_
private

◆ event_handlers_

std::unordered_map<std::string, std::function<void(const stream_event&)> > database::async::stream_processor::event_handlers_
private

◆ global_handlers_

std::vector<std::function<void(const stream_event&)> > database::async::stream_processor::global_handlers_
private

◆ handlers_mutex_

std::mutex database::async::stream_processor::handlers_mutex_
private

◆ running_

std::atomic<bool> database::async::stream_processor::running_ {true}
private

◆ stream_threads_

std::unordered_map<std::string, std::thread> database::async::stream_processor::stream_threads_
private

◆ threads_mutex_

std::mutex database::async::stream_processor::threads_mutex_
private

The documentation for this class was generated from the following file: