19 : pool_(
std::move(pool))
20 , config_(
std::move(config))
32 std::this_thread::yield();
37 : pool_(std::move(other.pool_))
38 , config_(std::move(other.config_))
39 , jobs_(std::move(other.jobs_))
40 , dependencies_(std::move(other.dependencies_))
41 , dependents_(std::move(other.dependents_))
42 , executing_(other.executing_.load())
43 , cancelled_(other.cancelled_.load())
44 , running_count_(other.running_count_.load())
45 , execution_start_time_(other.execution_start_time_)
46 , first_error_(std::move(other.first_error_))
47 , retry_counts_(std::move(other.retry_counts_))
55 pool_ = std::move(other.pool_);
56 config_ = std::move(other.config_);
57 jobs_ = std::move(other.jobs_);
58 dependencies_ = std::move(other.dependencies_);
59 dependents_ = std::move(other.dependents_);
60 executing_.store(other.executing_.load());
61 cancelled_.store(other.cancelled_.load());
62 running_count_.store(other.running_count_.load());
63 execution_start_time_ = other.execution_start_time_;
64 first_error_ = std::move(other.first_error_);
65 retry_counts_ = std::move(other.retry_counts_);
76 std::unique_lock lock(mutex_);
78 auto id = j->get_dag_id();
81 for (
const auto& dep : j->get_dependencies())
83 dependencies_[id].push_back(dep);
84 dependents_[dep].push_back(
id);
88 if (config_.detect_cycles && detect_cycle())
91 dependencies_.erase(
id);
92 for (
const auto& dep : j->get_dependencies())
94 auto& deps = dependents_[dep];
95 deps.erase(std::remove(deps.begin(), deps.end(),
id), deps.end());
100 jobs_[id] = std::move(j);
106 return add_job(builder.build());
111 std::unique_lock lock(mutex_);
114 if (jobs_.find(dependent) == jobs_.end())
117 "Dependent job not found: " + std::to_string(dependent));
119 if (jobs_.find(dependency) == jobs_.end())
122 "Dependency job not found: " + std::to_string(dependency));
126 dependencies_[dependent].push_back(dependency);
127 dependents_[dependency].push_back(dependent);
130 if (config_.detect_cycles && detect_cycle())
133 dependencies_[dependent].pop_back();
134 dependents_[dependency].pop_back();
136 "Adding dependency would create a cycle");
140 jobs_[dependent]->add_dependency(dependency);
147 std::unique_lock lock(mutex_);
149 auto it = jobs_.find(
id);
150 if (it == jobs_.end())
153 "Job not found: " + std::to_string(
id));
156 auto state = it->second->get_state();
160 "Cannot remove running job: " + std::to_string(
id));
164 dependencies_.erase(
id);
165 dependents_.erase(
id);
168 for (
auto& [
job_id, deps] : dependencies_)
170 deps.erase(std::remove(deps.begin(), deps.end(),
id), deps.end());
172 for (
auto& [
job_id, deps] : dependents_)
174 deps.erase(std::remove(deps.begin(), deps.end(),
id), deps.end());
187 auto promise = std::make_shared<std::promise<common::VoidResult>>();
188 auto future = promise->get_future();
191 std::thread([
this, promise]() {
193 std::unique_lock lock(mutex_);
194 if (executing_.exchange(
true))
197 "Execution already in progress"));
201 cancelled_.store(
false);
202 first_error_.reset();
203 execution_start_time_ = std::chrono::steady_clock::now();
206 for (
auto& [
id,
job] : jobs_)
210 if (are_dependencies_satisfied(
id))
219 schedule_ready_jobs();
223 executing_.store(
false);
224 promise->set_value(
result);
232 auto promise = std::make_shared<std::promise<common::VoidResult>>();
233 auto future = promise->get_future();
235 std::thread([
this, target, promise]() {
237 std::unique_lock lock(mutex_);
239 auto it = jobs_.find(target);
240 if (it == jobs_.end())
243 "Target job not found: " + std::to_string(target)));
247 if (executing_.exchange(
true))
250 "Execution already in progress"));
254 cancelled_.store(
false);
255 first_error_.reset();
256 execution_start_time_ = std::chrono::steady_clock::now();
259 std::unordered_set<job_id> needed_jobs;
260 std::stack<job_id> to_visit;
261 to_visit.push(target);
263 while (!to_visit.empty())
265 auto current = to_visit.top();
268 if (needed_jobs.count(current) > 0)
272 needed_jobs.insert(current);
274 auto deps_it = dependencies_.find(current);
275 if (deps_it != dependencies_.end())
277 for (
const auto& dep : deps_it->second)
285 for (
const auto&
id : needed_jobs)
287 auto job_it = jobs_.find(
id);
290 bool deps_in_needed =
true;
291 auto deps_it = dependencies_.find(
id);
292 if (deps_it != dependencies_.end())
294 for (
const auto& dep : deps_it->second)
296 if (needed_jobs.count(dep) == 0)
298 deps_in_needed =
false;
304 if (deps_in_needed && are_dependencies_satisfied(
id))
312 schedule_ready_jobs();
315 executing_.store(
false);
316 promise->set_value(
result);
324 std::unique_lock lock(mutex_);
325 cancelled_.store(
true);
327 for (
auto& [
id,
job] : jobs_)
329 auto state =
job->get_state();
333 if (config_.state_callback)
340 completion_cv_.notify_all();
345 std::shared_lock lock(mutex_);
347 completion_cv_.wait(lock, [
this]() {
348 return is_execution_complete() || cancelled_.load();
353 return common::VoidResult(*first_error_);
361 std::unique_lock lock(mutex_);
363 if (executing_.load())
366 "Cannot reset while execution is in progress");
370 dependencies_.clear();
372 retry_counts_.clear();
373 first_error_.reset();
374 cancelled_.store(
false);
385 std::shared_lock lock(mutex_);
387 auto it = jobs_.find(
id);
388 if (it == jobs_.end())
393 auto info = it->second->get_info();
396 auto dep_it = dependents_.find(
id);
397 if (dep_it != dependents_.end())
399 info.dependents = dep_it->second;
407 std::shared_lock lock(
mutex_);
409 std::vector<dag_job_info>
result;
418 info.dependents = dep_it->second;
428 std::shared_lock lock(mutex_);
430 std::vector<dag_job_info>
result;
432 for (
const auto& [
id,
job] : jobs_)
434 if (
job->get_state() == state)
437 auto dep_it = dependents_.find(
id);
438 if (dep_it != dependents_.end())
440 info.dependents = dep_it->second;
451 std::shared_lock lock(
mutex_);
453 std::vector<job_id>
result;
472 std::shared_lock lock(
mutex_);
478 std::shared_lock lock(
mutex_);
488 std::shared_lock lock(
mutex_);
490 std::ostringstream ss;
491 ss <<
"digraph DAG {\n";
492 ss <<
" rankdir=TB;\n";
493 ss <<
" node [shape=box, style=filled];\n\n";
498 auto state =
job->get_state();
509 for (
const auto& dependency : deps)
511 ss <<
" " << dependency <<
" -> " << dependent <<
";\n";
522 std::shared_lock lock(
mutex_);
524 std::ostringstream ss;
526 ss <<
" \"jobs\": [\n";
528 bool first_job =
true;
531 if (!first_job) ss <<
",\n";
536 ss <<
" \"id\": " <<
id <<
",\n";
537 ss <<
" \"name\": \"" <<
info.name <<
"\",\n";
539 ss <<
" \"dependencies\": [";
541 bool first_dep =
true;
542 for (
const auto& dep :
info.dependencies)
544 if (!first_dep) ss <<
", ";
550 ss <<
" \"dependents\": [";
555 for (
const auto& dep : dep_it->second)
557 if (!first_dep) ss <<
", ";
564 auto wait_ms =
info.get_wait_time().count();
565 auto exec_ms =
info.get_execution_time().count();
566 ss <<
" \"wait_time_ms\": " << wait_ms <<
",\n";
567 ss <<
" \"execution_time_ms\": " << exec_ms;
569 if (
info.error_message)
571 ss <<
",\n \"error\": \"" << *
info.error_message <<
"\"";
581 ss <<
" \"stats\": {\n";
582 ss <<
" \"total_jobs\": " << stats.total_jobs <<
",\n";
583 ss <<
" \"completed_jobs\": " << stats.completed_jobs <<
",\n";
584 ss <<
" \"failed_jobs\": " << stats.failed_jobs <<
",\n";
585 ss <<
" \"pending_jobs\": " << stats.pending_jobs <<
",\n";
586 ss <<
" \"running_jobs\": " << stats.running_jobs <<
",\n";
587 ss <<
" \"skipped_jobs\": " << stats.skipped_jobs <<
",\n";
588 ss <<
" \"cancelled_jobs\": " << stats.cancelled_jobs <<
"\n";
610 switch (
job->get_state())
638 std::unique_lock lock(mutex_);
640 auto it = jobs_.find(
id);
641 if (it == jobs_.end())
646 auto old_state = it->second->get_state();
648 it->second->record_end_time();
651 if (config_.state_callback)
656 if (config_.completion_callback)
658 config_.completion_callback(
id);
662 auto dep_it = dependents_.find(
id);
663 if (dep_it != dependents_.end())
665 for (
const auto& dependent : dep_it->second)
667 auto job_it = jobs_.find(dependent);
670 if (are_dependencies_satisfied(dependent))
673 if (config_.state_callback)
685 schedule_ready_jobs();
688 completion_cv_.notify_all();
695 std::unique_lock lock(mutex_);
697 auto it = jobs_.find(
id);
698 if (it == jobs_.end())
703 it->second->set_error_message(
error);
704 it->second->record_end_time();
707 switch (config_.failure_policy)
711 auto& retry_count = retry_counts_[id];
712 if (retry_count < config_.max_retries)
719 if (config_.state_callback)
727 std::this_thread::sleep_for(config_.retry_delay);
729 schedule_ready_jobs();
741 auto fallback = it->second->get_fallback();
748 if (config_.state_callback)
753 schedule_ready_jobs();
754 completion_cv_.notify_all();
764 auto old_state = it->second->get_state();
773 if (config_.state_callback)
778 if (config_.error_callback)
780 config_.error_callback(
id,
error);
784 cancel_dependents(
id);
790 auto old_state = it->second->get_state();
799 if (config_.state_callback)
804 if (config_.error_callback)
806 config_.error_callback(
id,
error);
816 schedule_ready_jobs();
817 completion_cv_.notify_all();
823 if (cancelled_.load())
828 std::vector<job_id> ready_jobs;
831 std::shared_lock lock(mutex_);
832 for (
const auto& [
id,
job] : jobs_)
836 ready_jobs.push_back(
id);
841 for (
const auto&
id : ready_jobs)
843 if (cancelled_.load())
848 if (config_.execute_in_parallel)
857 std::shared_lock lock(mutex_);
858 completion_cv_.wait(lock, [
this,
id]() {
859 auto it = jobs_.find(
id);
860 if (it == jobs_.end())
return true;
861 auto state = it->second->get_state();
870 std::unique_lock lock(mutex_);
872 auto it = jobs_.find(
id);
873 if (it == jobs_.end())
883 it->second->record_start_time();
886 if (config_.state_callback)
894 auto wrapper_job = std::make_unique<callback_job>(
895 [
this,
id]() -> common::VoidResult {
896 std::shared_lock slock(mutex_);
897 auto job_it = jobs_.find(
id);
898 if (job_it == jobs_.end())
903 auto& dag_job_ptr = job_it->second;
906 auto result = dag_job_ptr->do_work();
910 on_job_completed(
id);
914 on_job_failed(
id,
result.error().message);
919 "dag_job_" + std::to_string(
id)
922 pool_->enqueue(std::move(wrapper_job));
927 std::vector<job_id>
result;
928 std::unordered_map<job_id, int> in_degree;
929 std::queue<job_id> ready_queue;
932 for (
const auto& [
id, _] :
jobs_)
939 in_degree[id] =
static_cast<int>(deps.size());
943 for (
const auto& [
id, degree] : in_degree)
947 ready_queue.push(
id);
952 while (!ready_queue.empty())
954 auto current = ready_queue.front();
956 result.push_back(current);
962 for (
const auto& dependent : it->second)
964 --in_degree[dependent];
965 if (in_degree[dependent] == 0)
967 ready_queue.push(dependent);
984 enum class Color { White, Gray, Black };
986 std::unordered_map<job_id, Color> color;
987 for (
const auto& [
id, _] :
jobs_)
989 color[id] = Color::White;
992 std::function<bool(
job_id)> dfs = [&](
job_id id) ->
bool {
993 color[id] = Color::Gray;
998 for (
const auto& dep : it->second)
1000 if (color[dep] == Color::Gray)
1004 if (color[dep] == Color::White && dfs(dep))
1011 color[id] = Color::Black;
1015 for (
const auto& [
id, _] :
jobs_)
1017 if (color[
id] == Color::White && dfs(
id))
1028 auto it = dependencies_.find(
id);
1029 if (it == dependencies_.end() || it->second.empty())
1034 for (
const auto& dep : it->second)
1036 auto job_it = jobs_.find(dep);
1037 if (job_it == jobs_.end())
1052 auto it = dependents_.find(failed_id);
1053 if (it == dependents_.end())
1058 std::queue<job_id> to_skip;
1059 for (
const auto& dep : it->second)
1064 while (!to_skip.empty())
1066 auto current = to_skip.front();
1069 auto job_it = jobs_.find(current);
1070 if (job_it == jobs_.end())
1075 auto old_state = job_it->second->get_state();
1079 if (config_.state_callback)
1085 auto dep_it = dependents_.find(current);
1086 if (dep_it != dependents_.end())
1088 for (
const auto& dep : dep_it->second)
1099 auto it = dependents_.find(failed_id);
1100 if (it == dependents_.end())
1105 std::queue<job_id> to_cancel;
1106 for (
const auto& dep : it->second)
1108 to_cancel.push(dep);
1111 while (!to_cancel.empty())
1113 auto current = to_cancel.front();
1116 auto job_it = jobs_.find(current);
1117 if (job_it == jobs_.end())
1122 auto old_state = job_it->second->get_state();
1126 if (config_.state_callback)
1132 auto dep_it = dependents_.find(current);
1133 if (dep_it != dependents_.end())
1135 for (
const auto& dep : dep_it->second)
1137 to_cancel.push(dep);
1155 default:
return "#FFFFFF";
1161 for (
const auto& [
id,
job] :
jobs_)
1163 auto state =
job->get_state();
Specialized job class that encapsulates user-defined callbacks.
Fluent builder for creating dag_job instances.
DAG-based job scheduler with dependency management.
std::atomic< std::size_t > active_callbacks_
Number of active callbacks (for safe destruction)
std::shared_mutex mutex_
Mutex for thread-safe access.
auto on_job_failed(job_id id, const std::string &error) -> void
Called when a job fails.
auto wait() -> common::VoidResult
Waits for all jobs to complete.
auto topological_sort() const -> std::vector< job_id >
Performs topological sort.
std::unordered_map< job_id, std::vector< job_id > > dependents_
Reverse dependency graph (job -> jobs that depend on it)
auto on_job_completed(job_id id) -> void
Called when a job completes successfully.
auto to_json() const -> std::string
Exports the DAG as JSON format.
auto get_execution_order() const -> std::vector< job_id >
Gets topological execution order.
static auto get_state_color(dag_job_state state) -> std::string
Gets the state color for DOT visualization.
std::unordered_map< job_id, std::unique_ptr< dag_job > > jobs_
Job storage (job_id -> dag_job)
auto execute(job_id target) -> std::future< common::VoidResult >
Executes a specific job and its dependencies.
dag_scheduler(std::shared_ptr< thread_pool > pool, dag_config config={})
Constructs a DAG scheduler with a thread pool.
auto schedule_ready_jobs() -> void
Schedules ready jobs for execution.
auto get_jobs_in_state(dag_job_state state) const -> std::vector< dag_job_info >
Gets jobs in a specific state.
auto get_job_info(job_id id) const -> std::optional< dag_job_info >
Gets information about a specific job.
auto reset() -> common::VoidResult
Resets the scheduler for reuse.
auto operator=(const dag_scheduler &) -> dag_scheduler &=delete
auto execute_job(job_id id) -> void
Executes a single job.
auto is_execution_complete() const -> bool
Checks if execution is complete.
std::unordered_map< job_id, std::vector< job_id > > dependencies_
Dependency graph (job -> jobs it depends on)
auto get_all_jobs() const -> std::vector< dag_job_info >
Gets information about all jobs.
auto add_job(std::unique_ptr< dag_job > j) -> job_id
Adds a job to the DAG.
std::chrono::steady_clock::time_point execution_start_time_
Execution start time.
auto add_dependency(job_id dependent, job_id dependency) -> common::VoidResult
Adds a dependency between jobs.
auto get_ready_jobs() const -> std::vector< job_id >
Gets IDs of ready jobs (dependencies satisfied)
auto execute_all() -> std::future< common::VoidResult >
Executes all jobs in dependency order.
auto cancel_all() -> void
Cancels all pending jobs.
std::atomic< bool > executing_
Flag indicating execution is in progress.
auto are_dependencies_satisfied(job_id id) const -> bool
Checks if a job's dependencies are all satisfied.
auto has_cycles() const -> bool
Checks if the DAG has cycles.
auto get_stats() const -> dag_stats
Gets execution statistics.
auto remove_job(job_id id) -> common::VoidResult
Removes a job from the DAG (only if not yet started)
~dag_scheduler()
Destructor - cancels any pending jobs.
auto detect_cycle() const -> bool
Detects cycles using DFS.
auto skip_dependents(job_id failed_id) -> void
Marks dependents as skipped due to dependency failure.
auto cancel_dependents(job_id failed_id) -> void
Cancels dependents due to dependency failure.
auto to_dot() const -> std::string
Exports the DAG as DOT format (Graphviz)
Represents an error in the thread system.
Represents a unit of work (task) to be executed, typically by a job queue.
auto get_name(void) const -> std::string
Retrieves the name of this job.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
DAG-based job scheduler with dependency management and topological execution.
Core threading foundation of the thread system library.
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
auto dag_job_state_to_string(dag_job_state state) -> std::string
Convert dag_job_state to string representation.
dag_job_state
State of a job in the DAG scheduler.
@ failed
Execution failed.
@ cancelled
Cancelled by user or dependency failure.
@ running
Currently executing.
@ pending
Waiting for dependencies to complete.
@ completed
Successfully completed.
@ ready
Dependencies satisfied, can be executed.
@ skipped
Skipped due to dependency failure.
std::uint64_t job_id
Unique job identifier for DAG scheduler.
common::error_info to_error_info(error_code code, const std::string &message="")
Convert a thread::error_code to common::error_info.
@ info
Informational messages highlighting progress.
constexpr job_id INVALID_JOB_ID
Invalid job ID constant.
@ retry
Retry failed job (with max retries)
@ fallback
Execute fallback job if available.
@ continue_others
Continue unrelated jobs, skip dependents.
@ fail_fast
Cancel all dependents immediately on failure.
Configuration options for the DAG scheduler.
Information about a job in the DAG.
Statistics about DAG execution.
std::chrono::milliseconds total_execution_time
Total wall-clock time.
std::size_t pending_jobs
Number of pending jobs.
std::size_t cancelled_jobs
Number of cancelled jobs.
std::size_t total_jobs
Total number of jobs in DAG.
std::size_t skipped_jobs
Number of skipped jobs.
std::size_t failed_jobs
Number of failed jobs.
std::size_t running_jobs
Number of currently running jobs.
std::size_t completed_jobs
Number of successfully completed jobs.