Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
dag_scheduler.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
8#include <algorithm>
9#include <format>
10#include <queue>
11#include <sstream>
12#include <stack>
13#include <thread>
14
15namespace kcenon::thread
16{
17
18dag_scheduler::dag_scheduler(std::shared_ptr<thread_pool> pool, dag_config config)
19 : pool_(std::move(pool))
20 , config_(std::move(config))
21{
22}
23
25{
26 cancel_all();
27
28 // Wait for in-flight callbacks to finish accessing members
29 // (e.g., completion_cv_.notify_all() after mutex unlock)
30 while (active_callbacks_.load(std::memory_order_acquire) > 0)
31 {
32 std::this_thread::yield();
33 }
34}
35
37 : pool_(std::move(other.pool_))
38 , config_(std::move(other.config_))
39 , jobs_(std::move(other.jobs_))
40 , dependencies_(std::move(other.dependencies_))
41 , dependents_(std::move(other.dependents_))
42 , executing_(other.executing_.load())
43 , cancelled_(other.cancelled_.load())
44 , running_count_(other.running_count_.load())
45 , execution_start_time_(other.execution_start_time_)
46 , first_error_(std::move(other.first_error_))
47 , retry_counts_(std::move(other.retry_counts_))
48{
49}
50
52{
53 if (this != &other)
54 {
55 pool_ = std::move(other.pool_);
56 config_ = std::move(other.config_);
57 jobs_ = std::move(other.jobs_);
58 dependencies_ = std::move(other.dependencies_);
59 dependents_ = std::move(other.dependents_);
60 executing_.store(other.executing_.load());
61 cancelled_.store(other.cancelled_.load());
62 running_count_.store(other.running_count_.load());
63 execution_start_time_ = other.execution_start_time_;
64 first_error_ = std::move(other.first_error_);
65 retry_counts_ = std::move(other.retry_counts_);
66 }
67 return *this;
68}
69
70// ============================================
71// Job Management
72// ============================================
73
74auto dag_scheduler::add_job(std::unique_ptr<dag_job> j) -> job_id
75{
76 std::unique_lock lock(mutex_);
77
78 auto id = j->get_dag_id();
79
80 // Add dependencies from the job
81 for (const auto& dep : j->get_dependencies())
82 {
83 dependencies_[id].push_back(dep);
84 dependents_[dep].push_back(id);
85 }
86
87 // Check for cycles if configured
88 if (config_.detect_cycles && detect_cycle())
89 {
90 // Rollback
91 dependencies_.erase(id);
92 for (const auto& dep : j->get_dependencies())
93 {
94 auto& deps = dependents_[dep];
95 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
96 }
97 return INVALID_JOB_ID;
98 }
99
100 jobs_[id] = std::move(j);
101 return id;
102}
103
105{
106 return add_job(builder.build());
107}
108
109auto dag_scheduler::add_dependency(job_id dependent, job_id dependency) -> common::VoidResult
110{
111 std::unique_lock lock(mutex_);
112
113 // Check if jobs exist
114 if (jobs_.find(dependent) == jobs_.end())
115 {
117 "Dependent job not found: " + std::to_string(dependent));
118 }
119 if (jobs_.find(dependency) == jobs_.end())
120 {
122 "Dependency job not found: " + std::to_string(dependency));
123 }
124
125 // Add dependency
126 dependencies_[dependent].push_back(dependency);
127 dependents_[dependency].push_back(dependent);
128
129 // Check for cycles
130 if (config_.detect_cycles && detect_cycle())
131 {
132 // Rollback
133 dependencies_[dependent].pop_back();
134 dependents_[dependency].pop_back();
136 "Adding dependency would create a cycle");
137 }
138
139 // Update job's dependencies
140 jobs_[dependent]->add_dependency(dependency);
141
142 return common::ok();
143}
144
145auto dag_scheduler::remove_job(job_id id) -> common::VoidResult
146{
147 std::unique_lock lock(mutex_);
148
149 auto it = jobs_.find(id);
150 if (it == jobs_.end())
151 {
153 "Job not found: " + std::to_string(id));
154 }
155
156 auto state = it->second->get_state();
157 if (state == dag_job_state::running)
158 {
160 "Cannot remove running job: " + std::to_string(id));
161 }
162
163 // Remove from dependency graphs
164 dependencies_.erase(id);
165 dependents_.erase(id);
166
167 // Remove from other jobs' dependencies
168 for (auto& [job_id, deps] : dependencies_)
169 {
170 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
171 }
172 for (auto& [job_id, deps] : dependents_)
173 {
174 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
175 }
176
177 jobs_.erase(it);
178 return common::ok();
179}
180
181// ============================================
182// Execution Control
183// ============================================
184
185auto dag_scheduler::execute_all() -> std::future<common::VoidResult>
186{
187 auto promise = std::make_shared<std::promise<common::VoidResult>>();
188 auto future = promise->get_future();
189
190 // Start execution in a new thread
191 std::thread([this, promise]() {
192 {
193 std::unique_lock lock(mutex_);
194 if (executing_.exchange(true))
195 {
197 "Execution already in progress"));
198 return;
199 }
200
201 cancelled_.store(false);
202 first_error_.reset();
203 execution_start_time_ = std::chrono::steady_clock::now();
204
205 // Mark all jobs with satisfied dependencies as ready
206 for (auto& [id, job] : jobs_)
207 {
208 if (job->get_state() == dag_job_state::pending)
209 {
210 if (are_dependencies_satisfied(id))
211 {
212 job->set_state(dag_job_state::ready);
213 }
214 }
215 }
216 }
217
218 // Schedule ready jobs
219 schedule_ready_jobs();
220
221 // Wait for completion
222 auto result = wait();
223 executing_.store(false);
224 promise->set_value(result);
225 }).detach();
226
227 return future;
228}
229
230auto dag_scheduler::execute(job_id target) -> std::future<common::VoidResult>
231{
232 auto promise = std::make_shared<std::promise<common::VoidResult>>();
233 auto future = promise->get_future();
234
235 std::thread([this, target, promise]() {
236 {
237 std::unique_lock lock(mutex_);
238
239 auto it = jobs_.find(target);
240 if (it == jobs_.end())
241 {
243 "Target job not found: " + std::to_string(target)));
244 return;
245 }
246
247 if (executing_.exchange(true))
248 {
250 "Execution already in progress"));
251 return;
252 }
253
254 cancelled_.store(false);
255 first_error_.reset();
256 execution_start_time_ = std::chrono::steady_clock::now();
257
258 // Get all dependencies of target (transitively)
259 std::unordered_set<job_id> needed_jobs;
260 std::stack<job_id> to_visit;
261 to_visit.push(target);
262
263 while (!to_visit.empty())
264 {
265 auto current = to_visit.top();
266 to_visit.pop();
267
268 if (needed_jobs.count(current) > 0)
269 {
270 continue;
271 }
272 needed_jobs.insert(current);
273
274 auto deps_it = dependencies_.find(current);
275 if (deps_it != dependencies_.end())
276 {
277 for (const auto& dep : deps_it->second)
278 {
279 to_visit.push(dep);
280 }
281 }
282 }
283
284 // Mark needed jobs as ready if dependencies satisfied
285 for (const auto& id : needed_jobs)
286 {
287 auto job_it = jobs_.find(id);
288 if (job_it != jobs_.end() && job_it->second->get_state() == dag_job_state::pending)
289 {
290 bool deps_in_needed = true;
291 auto deps_it = dependencies_.find(id);
292 if (deps_it != dependencies_.end())
293 {
294 for (const auto& dep : deps_it->second)
295 {
296 if (needed_jobs.count(dep) == 0)
297 {
298 deps_in_needed = false;
299 break;
300 }
301 }
302 }
303
304 if (deps_in_needed && are_dependencies_satisfied(id))
305 {
306 job_it->second->set_state(dag_job_state::ready);
307 }
308 }
309 }
310 }
311
312 schedule_ready_jobs();
313
314 auto result = wait();
315 executing_.store(false);
316 promise->set_value(result);
317 }).detach();
318
319 return future;
320}
321
323{
324 std::unique_lock lock(mutex_);
325 cancelled_.store(true);
326
327 for (auto& [id, job] : jobs_)
328 {
329 auto state = job->get_state();
330 if (state == dag_job_state::pending || state == dag_job_state::ready)
331 {
332 job->set_state(dag_job_state::cancelled);
333 if (config_.state_callback)
334 {
335 config_.state_callback(id, state, dag_job_state::cancelled);
336 }
337 }
338 }
339
340 completion_cv_.notify_all();
341}
342
343auto dag_scheduler::wait() -> common::VoidResult
344{
345 std::shared_lock lock(mutex_);
346
347 completion_cv_.wait(lock, [this]() {
348 return is_execution_complete() || cancelled_.load();
349 });
350
351 if (first_error_)
352 {
353 return common::VoidResult(*first_error_);
354 }
355
356 return common::ok();
357}
358
359auto dag_scheduler::reset() -> common::VoidResult
360{
361 std::unique_lock lock(mutex_);
362
363 if (executing_.load())
364 {
366 "Cannot reset while execution is in progress");
367 }
368
369 jobs_.clear();
370 dependencies_.clear();
371 dependents_.clear();
372 retry_counts_.clear();
373 first_error_.reset();
374 cancelled_.store(false);
375
376 return common::ok();
377}
378
379// ============================================
380// Query
381// ============================================
382
383auto dag_scheduler::get_job_info(job_id id) const -> std::optional<dag_job_info>
384{
385 std::shared_lock lock(mutex_);
386
387 auto it = jobs_.find(id);
388 if (it == jobs_.end())
389 {
390 return std::nullopt;
391 }
392
393 auto info = it->second->get_info();
394
395 // Add dependents from our tracking
396 auto dep_it = dependents_.find(id);
397 if (dep_it != dependents_.end())
398 {
399 info.dependents = dep_it->second;
400 }
401
402 return info;
403}
404
406{
407 std::shared_lock lock(mutex_);
408
409 std::vector<dag_job_info> result;
410 result.reserve(jobs_.size());
411
412 for (const auto& [id, job] : jobs_)
413 {
414 auto info = job->get_info();
415 auto dep_it = dependents_.find(id);
416 if (dep_it != dependents_.end())
417 {
418 info.dependents = dep_it->second;
419 }
420 result.push_back(std::move(info));
421 }
422
423 return result;
424}
425
426auto dag_scheduler::get_jobs_in_state(dag_job_state state) const -> std::vector<dag_job_info>
427{
428 std::shared_lock lock(mutex_);
429
430 std::vector<dag_job_info> result;
431
432 for (const auto& [id, job] : jobs_)
433 {
434 if (job->get_state() == state)
435 {
436 auto info = job->get_info();
437 auto dep_it = dependents_.find(id);
438 if (dep_it != dependents_.end())
439 {
440 info.dependents = dep_it->second;
441 }
442 result.push_back(std::move(info));
443 }
444 }
445
446 return result;
447}
448
450{
451 std::shared_lock lock(mutex_);
452
453 std::vector<job_id> result;
454
455 for (const auto& [id, job] : jobs_)
456 {
457 if (job->get_state() == dag_job_state::ready)
458 {
459 result.push_back(id);
460 }
461 else if (job->get_state() == dag_job_state::pending && are_dependencies_satisfied(id))
462 {
463 result.push_back(id);
464 }
465 }
466
467 return result;
468}
469
470auto dag_scheduler::has_cycles() const -> bool
471{
472 std::shared_lock lock(mutex_);
473 return detect_cycle();
474}
475
477{
478 std::shared_lock lock(mutex_);
479 return topological_sort();
480}
481
482// ============================================
483// Visualization
484// ============================================
485
486auto dag_scheduler::to_dot() const -> std::string
487{
488 std::shared_lock lock(mutex_);
489
490 std::ostringstream ss;
491 ss << "digraph DAG {\n";
492 ss << " rankdir=TB;\n";
493 ss << " node [shape=box, style=filled];\n\n";
494
495 // Nodes
496 for (const auto& [id, job] : jobs_)
497 {
498 auto state = job->get_state();
499 ss << " " << id << " [label=\"" << job->get_name()
500 << "\\n(" << dag_job_state_to_string(state) << ")\""
501 << ", fillcolor=\"" << get_state_color(state) << "\"];\n";
502 }
503
504 ss << "\n";
505
506 // Edges
507 for (const auto& [dependent, deps] : dependencies_)
508 {
509 for (const auto& dependency : deps)
510 {
511 ss << " " << dependency << " -> " << dependent << ";\n";
512 }
513 }
514
515 ss << "}\n";
516
517 return ss.str();
518}
519
520auto dag_scheduler::to_json() const -> std::string
521{
522 std::shared_lock lock(mutex_);
523
524 std::ostringstream ss;
525 ss << "{\n";
526 ss << " \"jobs\": [\n";
527
528 bool first_job = true;
529 for (const auto& [id, job] : jobs_)
530 {
531 if (!first_job) ss << ",\n";
532 first_job = false;
533
534 auto info = job->get_info();
535 ss << " {\n";
536 ss << " \"id\": " << id << ",\n";
537 ss << " \"name\": \"" << info.name << "\",\n";
538 ss << " \"state\": \"" << dag_job_state_to_string(info.state) << "\",\n";
539 ss << " \"dependencies\": [";
540
541 bool first_dep = true;
542 for (const auto& dep : info.dependencies)
543 {
544 if (!first_dep) ss << ", ";
545 first_dep = false;
546 ss << dep;
547 }
548 ss << "],\n";
549
550 ss << " \"dependents\": [";
551 auto dep_it = dependents_.find(id);
552 if (dep_it != dependents_.end())
553 {
554 first_dep = true;
555 for (const auto& dep : dep_it->second)
556 {
557 if (!first_dep) ss << ", ";
558 first_dep = false;
559 ss << dep;
560 }
561 }
562 ss << "],\n";
563
564 auto wait_ms = info.get_wait_time().count();
565 auto exec_ms = info.get_execution_time().count();
566 ss << " \"wait_time_ms\": " << wait_ms << ",\n";
567 ss << " \"execution_time_ms\": " << exec_ms;
568
569 if (info.error_message)
570 {
571 ss << ",\n \"error\": \"" << *info.error_message << "\"";
572 }
573
574 ss << "\n }";
575 }
576
577 ss << "\n ],\n";
578
579 // Add statistics
580 auto stats = get_stats();
581 ss << " \"stats\": {\n";
582 ss << " \"total_jobs\": " << stats.total_jobs << ",\n";
583 ss << " \"completed_jobs\": " << stats.completed_jobs << ",\n";
584 ss << " \"failed_jobs\": " << stats.failed_jobs << ",\n";
585 ss << " \"pending_jobs\": " << stats.pending_jobs << ",\n";
586 ss << " \"running_jobs\": " << stats.running_jobs << ",\n";
587 ss << " \"skipped_jobs\": " << stats.skipped_jobs << ",\n";
588 ss << " \"cancelled_jobs\": " << stats.cancelled_jobs << "\n";
589 ss << " }\n";
590
591 ss << "}\n";
592
593 return ss.str();
594}
595
596// ============================================
597// Statistics
598// ============================================
599
601{
602 // Note: This is called from to_json which already holds the lock
603 // For standalone calls, we need to be careful about deadlock
604
605 dag_stats stats;
606 stats.total_jobs = jobs_.size();
607
608 for (const auto& [id, job] : jobs_)
609 {
610 switch (job->get_state())
611 {
612 case dag_job_state::completed: ++stats.completed_jobs; break;
613 case dag_job_state::failed: ++stats.failed_jobs; break;
615 case dag_job_state::ready: ++stats.pending_jobs; break;
616 case dag_job_state::running: ++stats.running_jobs; break;
617 case dag_job_state::skipped: ++stats.skipped_jobs; break;
618 case dag_job_state::cancelled: ++stats.cancelled_jobs; break;
619 }
620 }
621
622 if (executing_.load())
623 {
624 stats.total_execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
625 std::chrono::steady_clock::now() - execution_start_time_);
626 }
627
628 return stats;
629}
630
631// ============================================
632// Internal Methods
633// ============================================
634
636{
637 ++active_callbacks_;
638 std::unique_lock lock(mutex_);
639
640 auto it = jobs_.find(id);
641 if (it == jobs_.end())
642 {
643 return;
644 }
645
646 auto old_state = it->second->get_state();
647 it->second->set_state(dag_job_state::completed);
648 it->second->record_end_time();
649 --running_count_;
650
651 if (config_.state_callback)
652 {
653 config_.state_callback(id, old_state, dag_job_state::completed);
654 }
655
656 if (config_.completion_callback)
657 {
658 config_.completion_callback(id);
659 }
660
661 // Check and update dependents
662 auto dep_it = dependents_.find(id);
663 if (dep_it != dependents_.end())
664 {
665 for (const auto& dependent : dep_it->second)
666 {
667 auto job_it = jobs_.find(dependent);
668 if (job_it != jobs_.end() && job_it->second->get_state() == dag_job_state::pending)
669 {
670 if (are_dependencies_satisfied(dependent))
671 {
672 job_it->second->set_state(dag_job_state::ready);
673 if (config_.state_callback)
674 {
675 config_.state_callback(dependent, dag_job_state::pending, dag_job_state::ready);
676 }
677 }
678 }
679 }
680 }
681
682 lock.unlock();
683
684 // Schedule more jobs
685 schedule_ready_jobs();
686
687 // Notify waiters
688 completion_cv_.notify_all();
689 --active_callbacks_;
690}
691
692auto dag_scheduler::on_job_failed(job_id id, const std::string& error) -> void
693{
694 ++active_callbacks_;
695 std::unique_lock lock(mutex_);
696
697 auto it = jobs_.find(id);
698 if (it == jobs_.end())
699 {
700 return;
701 }
702
703 it->second->set_error_message(error);
704 it->second->record_end_time();
705
706 // Handle based on failure policy
707 switch (config_.failure_policy)
708 {
710 {
711 auto& retry_count = retry_counts_[id];
712 if (retry_count < config_.max_retries)
713 {
714 ++retry_count;
715 // Reset state to ready for retry
716 it->second->set_state(dag_job_state::ready);
717 --running_count_;
718
719 if (config_.state_callback)
720 {
721 config_.state_callback(id, dag_job_state::running, dag_job_state::ready);
722 }
723
724 lock.unlock();
725
726 // Wait before retry
727 std::this_thread::sleep_for(config_.retry_delay);
728
729 schedule_ready_jobs();
730 return;
731 }
732 // Fall through to fail_fast if max retries exceeded
733 [[fallthrough]];
734 }
735
737 {
738 if (config_.failure_policy == dag_failure_policy::fallback && it->second->has_fallback())
739 {
740 // Try fallback
741 auto fallback = it->second->get_fallback();
742 auto result = fallback();
743 if (result.is_ok())
744 {
745 // Fallback succeeded
746 it->second->set_state(dag_job_state::completed);
747 --running_count_;
748 if (config_.state_callback)
749 {
750 config_.state_callback(id, dag_job_state::running, dag_job_state::completed);
751 }
752 lock.unlock();
753 schedule_ready_jobs();
754 completion_cv_.notify_all();
755 --active_callbacks_;
756 return;
757 }
758 }
759 [[fallthrough]];
760 }
761
763 {
764 auto old_state = it->second->get_state();
765 it->second->set_state(dag_job_state::failed);
766 --running_count_;
767
768 if (!first_error_)
769 {
771 }
772
773 if (config_.state_callback)
774 {
775 config_.state_callback(id, old_state, dag_job_state::failed);
776 }
777
778 if (config_.error_callback)
779 {
780 config_.error_callback(id, error);
781 }
782
783 // Cancel all dependents
784 cancel_dependents(id);
785 break;
786 }
787
789 {
790 auto old_state = it->second->get_state();
791 it->second->set_state(dag_job_state::failed);
792 --running_count_;
793
794 if (!first_error_)
795 {
797 }
798
799 if (config_.state_callback)
800 {
801 config_.state_callback(id, old_state, dag_job_state::failed);
802 }
803
804 if (config_.error_callback)
805 {
806 config_.error_callback(id, error);
807 }
808
809 // Skip dependents but continue others
810 skip_dependents(id);
811 break;
812 }
813 }
814
815 lock.unlock();
816 schedule_ready_jobs();
817 completion_cv_.notify_all();
818 --active_callbacks_;
819}
820
822{
823 if (cancelled_.load())
824 {
825 return;
826 }
827
828 std::vector<job_id> ready_jobs;
829
830 {
831 std::shared_lock lock(mutex_);
832 for (const auto& [id, job] : jobs_)
833 {
834 if (job->get_state() == dag_job_state::ready)
835 {
836 ready_jobs.push_back(id);
837 }
838 }
839 }
840
841 for (const auto& id : ready_jobs)
842 {
843 if (cancelled_.load())
844 {
845 break;
846 }
847
848 if (config_.execute_in_parallel)
849 {
850 execute_job(id);
851 }
852 else
853 {
854 // Execute sequentially
855 execute_job(id);
856 // Wait for completion before next job
857 std::shared_lock lock(mutex_);
858 completion_cv_.wait(lock, [this, id]() {
859 auto it = jobs_.find(id);
860 if (it == jobs_.end()) return true;
861 auto state = it->second->get_state();
862 return state != dag_job_state::running;
863 });
864 }
865 }
866}
867
869{
870 std::unique_lock lock(mutex_);
871
872 auto it = jobs_.find(id);
873 if (it == jobs_.end())
874 {
875 return;
876 }
877
878 if (!it->second->try_transition_state(dag_job_state::ready, dag_job_state::running))
879 {
880 return; // Already running or in terminal state
881 }
882
883 it->second->record_start_time();
884 ++running_count_;
885
886 if (config_.state_callback)
887 {
888 config_.state_callback(id, dag_job_state::ready, dag_job_state::running);
889 }
890
891 lock.unlock();
892
893 // Create a callback job to execute the dag_job
894 auto wrapper_job = std::make_unique<callback_job>(
895 [this, id]() -> common::VoidResult {
896 std::shared_lock slock(mutex_);
897 auto job_it = jobs_.find(id);
898 if (job_it == jobs_.end())
899 {
900 return make_error_result(error_code::job_invalid, "Job not found");
901 }
902
903 auto& dag_job_ptr = job_it->second;
904 slock.unlock();
905
906 auto result = dag_job_ptr->do_work();
907
908 if (result.is_ok())
909 {
910 on_job_completed(id);
911 }
912 else
913 {
914 on_job_failed(id, result.error().message);
915 }
916
917 return result;
918 },
919 "dag_job_" + std::to_string(id)
920 );
921
922 pool_->enqueue(std::move(wrapper_job));
923}
924
926{
927 std::vector<job_id> result;
928 std::unordered_map<job_id, int> in_degree;
929 std::queue<job_id> ready_queue;
930
931 // Initialize in-degrees
932 for (const auto& [id, _] : jobs_)
933 {
934 in_degree[id] = 0;
935 }
936
937 for (const auto& [id, deps] : dependencies_)
938 {
939 in_degree[id] = static_cast<int>(deps.size());
940 }
941
942 // Find initial ready jobs (in-degree 0)
943 for (const auto& [id, degree] : in_degree)
944 {
945 if (degree == 0)
946 {
947 ready_queue.push(id);
948 }
949 }
950
951 // Process queue
952 while (!ready_queue.empty())
953 {
954 auto current = ready_queue.front();
955 ready_queue.pop();
956 result.push_back(current);
957
958 // Reduce in-degree of dependents
959 auto it = dependents_.find(current);
960 if (it != dependents_.end())
961 {
962 for (const auto& dependent : it->second)
963 {
964 --in_degree[dependent];
965 if (in_degree[dependent] == 0)
966 {
967 ready_queue.push(dependent);
968 }
969 }
970 }
971 }
972
973 // If result doesn't contain all jobs, there's a cycle
974 if (result.size() != jobs_.size())
975 {
976 return {}; // Cycle detected
977 }
978
979 return result;
980}
981
982auto dag_scheduler::detect_cycle() const -> bool
983{
984 enum class Color { White, Gray, Black };
985
986 std::unordered_map<job_id, Color> color;
987 for (const auto& [id, _] : jobs_)
988 {
989 color[id] = Color::White;
990 }
991
992 std::function<bool(job_id)> dfs = [&](job_id id) -> bool {
993 color[id] = Color::Gray;
994
995 auto it = dependencies_.find(id);
996 if (it != dependencies_.end())
997 {
998 for (const auto& dep : it->second)
999 {
1000 if (color[dep] == Color::Gray)
1001 {
1002 return true; // Back edge = cycle
1003 }
1004 if (color[dep] == Color::White && dfs(dep))
1005 {
1006 return true;
1007 }
1008 }
1009 }
1010
1011 color[id] = Color::Black;
1012 return false;
1013 };
1014
1015 for (const auto& [id, _] : jobs_)
1016 {
1017 if (color[id] == Color::White && dfs(id))
1018 {
1019 return true;
1020 }
1021 }
1022
1023 return false;
1024}
1025
1027{
1028 auto it = dependencies_.find(id);
1029 if (it == dependencies_.end() || it->second.empty())
1030 {
1031 return true; // No dependencies
1032 }
1033
1034 for (const auto& dep : it->second)
1035 {
1036 auto job_it = jobs_.find(dep);
1037 if (job_it == jobs_.end())
1038 {
1039 return false; // Dependency not found
1040 }
1041 if (job_it->second->get_state() != dag_job_state::completed)
1042 {
1043 return false; // Dependency not completed
1044 }
1045 }
1046
1047 return true;
1048}
1049
1051{
1052 auto it = dependents_.find(failed_id);
1053 if (it == dependents_.end())
1054 {
1055 return;
1056 }
1057
1058 std::queue<job_id> to_skip;
1059 for (const auto& dep : it->second)
1060 {
1061 to_skip.push(dep);
1062 }
1063
1064 while (!to_skip.empty())
1065 {
1066 auto current = to_skip.front();
1067 to_skip.pop();
1068
1069 auto job_it = jobs_.find(current);
1070 if (job_it == jobs_.end())
1071 {
1072 continue;
1073 }
1074
1075 auto old_state = job_it->second->get_state();
1076 if (old_state == dag_job_state::pending || old_state == dag_job_state::ready)
1077 {
1078 job_it->second->set_state(dag_job_state::skipped);
1079 if (config_.state_callback)
1080 {
1081 config_.state_callback(current, old_state, dag_job_state::skipped);
1082 }
1083
1084 // Also skip dependents of skipped job
1085 auto dep_it = dependents_.find(current);
1086 if (dep_it != dependents_.end())
1087 {
1088 for (const auto& dep : dep_it->second)
1089 {
1090 to_skip.push(dep);
1091 }
1092 }
1093 }
1094 }
1095}
1096
1098{
1099 auto it = dependents_.find(failed_id);
1100 if (it == dependents_.end())
1101 {
1102 return;
1103 }
1104
1105 std::queue<job_id> to_cancel;
1106 for (const auto& dep : it->second)
1107 {
1108 to_cancel.push(dep);
1109 }
1110
1111 while (!to_cancel.empty())
1112 {
1113 auto current = to_cancel.front();
1114 to_cancel.pop();
1115
1116 auto job_it = jobs_.find(current);
1117 if (job_it == jobs_.end())
1118 {
1119 continue;
1120 }
1121
1122 auto old_state = job_it->second->get_state();
1123 if (old_state == dag_job_state::pending || old_state == dag_job_state::ready)
1124 {
1125 job_it->second->set_state(dag_job_state::cancelled);
1126 if (config_.state_callback)
1127 {
1128 config_.state_callback(current, old_state, dag_job_state::cancelled);
1129 }
1130
1131 // Also cancel dependents of cancelled job
1132 auto dep_it = dependents_.find(current);
1133 if (dep_it != dependents_.end())
1134 {
1135 for (const auto& dep : dep_it->second)
1136 {
1137 to_cancel.push(dep);
1138 }
1139 }
1140 }
1141 }
1142}
1143
1145{
1146 switch (state)
1147 {
1148 case dag_job_state::pending: return "#FFFFFF"; // White
1149 case dag_job_state::ready: return "#87CEEB"; // Light blue
1150 case dag_job_state::running: return "#FFFF00"; // Yellow
1151 case dag_job_state::completed: return "#90EE90"; // Light green
1152 case dag_job_state::failed: return "#FF6B6B"; // Light red
1153 case dag_job_state::cancelled: return "#D3D3D3"; // Light gray
1154 case dag_job_state::skipped: return "#FFA500"; // Orange
1155 default: return "#FFFFFF";
1156 }
1157}
1158
1160{
1161 for (const auto& [id, job] : jobs_)
1162 {
1163 auto state = job->get_state();
1164 if (state == dag_job_state::pending ||
1165 state == dag_job_state::ready ||
1166 state == dag_job_state::running)
1167 {
1168 return false;
1169 }
1170 }
1171 return true;
1172}
1173
1174} // namespace kcenon::thread
Specialized job class that encapsulates user-defined callbacks.
Fluent builder for creating dag_job instances.
DAG-based job scheduler with dependency management.
std::atomic< std::size_t > active_callbacks_
Number of active callbacks (for safe destruction)
std::shared_mutex mutex_
Mutex for thread-safe access.
auto on_job_failed(job_id id, const std::string &error) -> void
Called when a job fails.
auto wait() -> common::VoidResult
Waits for all jobs to complete.
auto topological_sort() const -> std::vector< job_id >
Performs topological sort.
std::unordered_map< job_id, std::vector< job_id > > dependents_
Reverse dependency graph (job -> jobs that depend on it)
auto on_job_completed(job_id id) -> void
Called when a job completes successfully.
auto to_json() const -> std::string
Exports the DAG as JSON format.
auto get_execution_order() const -> std::vector< job_id >
Gets topological execution order.
static auto get_state_color(dag_job_state state) -> std::string
Gets the state color for DOT visualization.
std::unordered_map< job_id, std::unique_ptr< dag_job > > jobs_
Job storage (job_id -> dag_job)
auto execute(job_id target) -> std::future< common::VoidResult >
Executes a specific job and its dependencies.
dag_scheduler(std::shared_ptr< thread_pool > pool, dag_config config={})
Constructs a DAG scheduler with a thread pool.
auto schedule_ready_jobs() -> void
Schedules ready jobs for execution.
auto get_jobs_in_state(dag_job_state state) const -> std::vector< dag_job_info >
Gets jobs in a specific state.
auto get_job_info(job_id id) const -> std::optional< dag_job_info >
Gets information about a specific job.
auto reset() -> common::VoidResult
Resets the scheduler for reuse.
auto operator=(const dag_scheduler &) -> dag_scheduler &=delete
auto execute_job(job_id id) -> void
Executes a single job.
auto is_execution_complete() const -> bool
Checks if execution is complete.
std::unordered_map< job_id, std::vector< job_id > > dependencies_
Dependency graph (job -> jobs it depends on)
auto get_all_jobs() const -> std::vector< dag_job_info >
Gets information about all jobs.
auto add_job(std::unique_ptr< dag_job > j) -> job_id
Adds a job to the DAG.
std::chrono::steady_clock::time_point execution_start_time_
Execution start time.
auto add_dependency(job_id dependent, job_id dependency) -> common::VoidResult
Adds a dependency between jobs.
auto get_ready_jobs() const -> std::vector< job_id >
Gets IDs of ready jobs (dependencies satisfied)
auto execute_all() -> std::future< common::VoidResult >
Executes all jobs in dependency order.
auto cancel_all() -> void
Cancels all pending jobs.
std::atomic< bool > executing_
Flag indicating execution is in progress.
auto are_dependencies_satisfied(job_id id) const -> bool
Checks if a job's dependencies are all satisfied.
auto has_cycles() const -> bool
Checks if the DAG has cycles.
auto get_stats() const -> dag_stats
Gets execution statistics.
auto remove_job(job_id id) -> common::VoidResult
Removes a job from the DAG (only if not yet started)
~dag_scheduler()
Destructor - cancels any pending jobs.
auto detect_cycle() const -> bool
Detects cycles using DFS.
auto skip_dependents(job_id failed_id) -> void
Marks dependents as skipped due to dependency failure.
auto cancel_dependents(job_id failed_id) -> void
Cancels dependents due to dependency failure.
auto to_dot() const -> std::string
Exports the DAG as DOT format (Graphviz)
Represents an error in the thread system.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
auto get_name(void) const -> std::string
Retrieves the name of this job.
Definition job.cpp:112
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
DAG-based job scheduler with dependency management and topological execution.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
auto dag_job_state_to_string(dag_job_state state) -> std::string
Convert dag_job_state to string representation.
Definition dag_job.h:60
dag_job_state
State of a job in the DAG scheduler.
Definition dag_job.h:45
@ failed
Execution failed.
@ cancelled
Cancelled by user or dependency failure.
@ running
Currently executing.
@ pending
Waiting for dependencies to complete.
@ completed
Successfully completed.
@ ready
Dependencies satisfied, can be executed.
@ skipped
Skipped due to dependency failure.
std::uint64_t job_id
Unique job identifier for DAG scheduler.
Definition dag_job.h:33
common::error_info to_error_info(error_code code, const std::string &message="")
Convert a thread::error_code to common::error_info.
@ info
Informational messages highlighting progress.
constexpr job_id INVALID_JOB_ID
Invalid job ID constant.
Definition dag_job.h:38
@ retry
Retry failed job (with max retries)
@ fallback
Execute fallback job if available.
@ continue_others
Continue unrelated jobs, skip dependents.
@ fail_fast
Cancel all dependents immediately on failure.
STL namespace.
Configuration options for the DAG scheduler.
Definition dag_config.h:73
Information about a job in the DAG.
Definition dag_job.h:83
Statistics about DAG execution.
Definition dag_config.h:148
std::chrono::milliseconds total_execution_time
Total wall-clock time.
Definition dag_config.h:157
std::size_t pending_jobs
Number of pending jobs.
Definition dag_config.h:152
std::size_t cancelled_jobs
Number of cancelled jobs.
Definition dag_config.h:155
std::size_t total_jobs
Total number of jobs in DAG.
Definition dag_config.h:149
std::size_t skipped_jobs
Number of skipped jobs.
Definition dag_config.h:154
std::size_t failed_jobs
Number of failed jobs.
Definition dag_config.h:151
std::size_t running_jobs
Number of currently running jobs.
Definition dag_config.h:153
std::size_t completed_jobs
Number of successfully completed jobs.
Definition dag_config.h:150