// IoT data collection scenario: print the banner, then declare the simulation
// parameters and the state shared by the device threads and the aggregator.
66 std::cout <<
"\n=== IoT Data Collection Scenario ===" << std::endl;
// Simulation sizing: how many device threads are started, how many reading
// passes each one performs, and the maximum readings collected per batch.
68 const int num_devices = 5;
69 const int readings_per_device = 20;
70 const int batch_size = 10;
// Sensor kinds each device reports on every reading pass.
72 std::vector<std::string> device_types = {
"temperature",
"humidity",
"pressure"};
73 std::vector<std::thread> device_threads;
// Producer/consumer hand-off: readings are pushed onto sensor_queue under
// queue_mutex, and queue_cv is used to wake the aggregator.
75 std::queue<SensorReading> sensor_queue;
76 std::mutex queue_mutex;
77 std::condition_variable queue_cv;
// Starts true; cleared later to tell the aggregator that collection has ended.
78 std::atomic<bool> collection_active{
true};
// Aggregator (consumer) thread: copies queued readings into `batch`, taking at
// most batch_size readings per batch.
81 std::thread aggregator_thread([&]() {
82 std::vector<SensorReading> batch;
83 batch.reserve(batch_size);
// Keep running while collection is active or readings are still queued.
// NOTE(review): sensor_queue.empty() in this condition is evaluated before
// queue_mutex is acquired on the next line, while producers push under that
// mutex - this looks like an unsynchronized read of the queue; confirm.
85 while (collection_active || !sensor_queue.empty()) {
86 std::unique_lock<std::mutex> lock(queue_mutex);
// Block until there is something to drain or collection has been switched off.
87 queue_cv.wait(lock, [&]() {
return !sensor_queue.empty() || !collection_active; });
// Fill the current batch from the front of the queue, up to batch_size entries.
89 while (!sensor_queue.empty() && batch.size() <
static_cast<size_t>(batch_size)) {
90 batch.push_back(sensor_queue.front());
// Producer side: start one thread per device id; each thread generates random
// readings for every sensor type and pushes them onto sensor_queue.
// NOTE(review): rd is captured by reference and rd() is invoked inside each
// thread lambda, so several threads may call it concurrently without
// synchronization - confirm this is safe or draw the seeds before spawning.
104 std::random_device rd;
105 for (
int device_id = 0; device_id < num_devices; ++device_id) {
// device_id is captured by value so each thread keeps its own id; everything
// else (queue, mutex, cv, rd, constants) is captured by reference.
106 device_threads.emplace_back([&, device_id]() {
// Per-thread random engine plus one value range per sensor type.
107 std::mt19937 gen(rd());
108 std::uniform_real_distribution<> temp_dist(18.0, 35.0);
109 std::uniform_real_distribution<> humidity_dist(30.0, 80.0);
110 std::uniform_real_distribution<> pressure_dist(990.0, 1030.0);
// Each reading pass produces one reading per entry in device_types.
112 for (
int reading = 0; reading < readings_per_device; ++reading) {
113 for (
const auto& sensor_type : device_types) {
// NOTE(review): the declaration of sensor_reading is not visible here;
// presumably a SensorReading local - confirm.
// The device identifier has the form "device_<id>".
115 sensor_reading.
device_id =
"device_" + std::to_string(device_id);
117 sensor_reading.
timestamp = std::chrono::system_clock::now();
// Draw the value from the distribution that matches the sensor type.
119 if (sensor_type ==
"temperature") {
120 sensor_reading.
value = temp_dist(gen);
121 }
else if (sensor_type ==
"humidity") {
122 sensor_reading.
value = humidity_dist(gen);
// Presumably the fallback branch for the remaining type ("pressure"); the
// introducing `else` is not visible - confirm.
124 sensor_reading.
value = pressure_dist(gen);
// Publish the reading under queue_mutex, then wake one waiter on queue_cv.
128 std::lock_guard<std::mutex> lock(queue_mutex);
129 sensor_queue.push(sensor_reading);
131 queue_cv.notify_one();
// Pause 5 ms between readings.
134 std::this_thread::sleep_for(std::chrono::milliseconds(5));
// Shutdown sequence: visit every device thread, stop the aggregator, and print
// a summary.
// NOTE(review): the body of this loop is not visible; presumably it joins each
// device thread - confirm.
140 for (
auto& thread : device_threads) {
// Clear the flag first, then notify, so the aggregator's wait predicate
// (which tests !collection_active) can become true when it wakes up.
144 collection_active =
false;
145 queue_cv.notify_all();
146 aggregator_thread.join();
148 std::cout <<
"IoT simulation completed:" << std::endl;
// batches_sent_ is declared outside this excerpt; it is read with .load().
150 std::cout <<
" Batches sent: " <<
batches_sent_.load() << std::endl;
// Pack the readings in `batch` into a value_container, serialize it in binary
// format, and log the batch size and the serialized byte count.
155 auto container = std::make_shared<value_container>();
// Routing information and message type for the batch message.
156 container->set_source(
"iot_aggregator",
"batch_processor");
157 container->set_target(
"iot_analytics_service",
"data_processor");
158 container->set_message_type(
"sensor_data_batch");
// Batch-level metadata: reading count (as int32) and the current system_clock
// time in milliseconds since the epoch (as int64).
160 container->set(
"batch_size",
static_cast<int32_t
>(batch.size()));
161 container->set(
"batch_timestamp",
static_cast<int64_t
>(
162 std::chrono::duration_cast<std::chrono::milliseconds>(
163 std::chrono::system_clock::now().time_since_epoch()).count()));
// Per-reading fields are stored flat under keys of the form
// "reading_<index>_<field>".
165 for (
size_t i = 0; i < batch.size(); ++i) {
166 const auto& reading = batch[i];
167 std::string prefix =
"reading_" + std::to_string(i) +
"_";
169 container->set(prefix +
"device_id", reading.device_id);
170 container->set(prefix +
"sensor_type", reading.sensor_type);
171 container->set(prefix +
"value", reading.value);
// The reading's own timestamp, in milliseconds since the epoch (as int64).
172 container->set(prefix +
"timestamp",
static_cast<int64_t
>(
173 std::chrono::duration_cast<std::chrono::milliseconds>(
174 reading.timestamp.time_since_epoch()).count()));
// NOTE(review): .value() is called on the serialize_string result without a
// visible success check; if that result can be empty or carry an error, this
// line fails on a serialization error - confirm the return type and handle it.
177 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
178 std::cout <<
" Sent IoT batch: " << batch.size() <<
" readings, "
179 << serialized.size() <<
" bytes" << std::endl;