Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::dag_scheduler Class Reference

DAG-based job scheduler with dependency management. More...

#include <dag_scheduler.h>

Collaboration diagram for kcenon::thread::dag_scheduler:
Collaboration graph

Public Member Functions

 dag_scheduler (std::shared_ptr< thread_pool > pool, dag_config config={})
 Constructs a DAG scheduler with a thread pool.
 
 ~dag_scheduler ()
 Destructor - cancels any pending jobs.
 
 dag_scheduler (const dag_scheduler &)=delete
 
auto operator= (const dag_scheduler &) -> dag_scheduler &=delete
 
 dag_scheduler (dag_scheduler &&) noexcept
 
auto operator= (dag_scheduler &&) noexcept -> dag_scheduler &
 
auto add_job (std::unique_ptr< dag_job > j) -> job_id
 Adds a job to the DAG.
 
auto add_job (dag_job_builder &&builder) -> job_id
 Adds a job using a builder.
 
auto add_dependency (job_id dependent, job_id dependency) -> common::VoidResult
 Adds a dependency between jobs.
 
auto remove_job (job_id id) -> common::VoidResult
 Removes a job from the DAG (only if not yet started)
 
auto execute_all () -> std::future< common::VoidResult >
 Executes all jobs in dependency order.
 
auto execute (job_id target) -> std::future< common::VoidResult >
 Executes a specific job and its dependencies.
 
auto cancel_all () -> void
 Cancels all pending jobs.
 
auto wait () -> common::VoidResult
 Waits for all jobs to complete.
 
auto reset () -> common::VoidResult
 Resets the scheduler for reuse.
 
auto get_job_info (job_id id) const -> std::optional< dag_job_info >
 Gets information about a specific job.
 
auto get_all_jobs () const -> std::vector< dag_job_info >
 Gets information about all jobs.
 
auto get_jobs_in_state (dag_job_state state) const -> std::vector< dag_job_info >
 Gets jobs in a specific state.
 
auto get_ready_jobs () const -> std::vector< job_id >
 Gets IDs of ready jobs (dependencies satisfied)
 
auto has_cycles () const -> bool
 Checks if the DAG has cycles.
 
auto get_execution_order () const -> std::vector< job_id >
 Gets topological execution order.
 
template<typename T >
auto get_result (job_id id) const -> const T &
 Gets the result from a completed job.
 
auto to_dot () const -> std::string
 Exports the DAG as DOT format (Graphviz)
 
auto to_json () const -> std::string
 Exports the DAG as JSON format.
 
auto get_stats () const -> dag_stats
 Gets execution statistics.
 
auto get_config () const -> const dag_config &
 Gets the configuration.
 

Private Member Functions

auto on_job_completed (job_id id) -> void
 Called when a job completes successfully.
 
auto on_job_failed (job_id id, const std::string &error) -> void
 Called when a job fails.
 
auto schedule_ready_jobs () -> void
 Schedules ready jobs for execution.
 
auto execute_job (job_id id) -> void
 Executes a single job.
 
auto topological_sort () const -> std::vector< job_id >
 Performs topological sort.
 
auto detect_cycle () const -> bool
 Detects cycles using DFS.
 
auto are_dependencies_satisfied (job_id id) const -> bool
 Checks if a job's dependencies are all satisfied.
 
auto skip_dependents (job_id failed_id) -> void
 Marks dependents as skipped due to dependency failure.
 
auto cancel_dependents (job_id failed_id) -> void
 Cancels dependents due to dependency failure.
 
auto is_execution_complete () const -> bool
 Checks if execution is complete.
 

Static Private Member Functions

static auto get_state_color (dag_job_state state) -> std::string
 Gets the state color for DOT visualization.
 

Private Attributes

std::shared_ptr< thread_poolpool_
 Thread pool for job execution.
 
dag_config config_
 Configuration.
 
std::unordered_map< job_id, std::unique_ptr< dag_job > > jobs_
 Job storage (job_id -> dag_job)
 
std::unordered_map< job_id, std::vector< job_id > > dependencies_
 Dependency graph (job -> jobs it depends on)
 
std::unordered_map< job_id, std::vector< job_id > > dependents_
 Reverse dependency graph (job -> jobs that depend on it)
 
std::shared_mutex mutex_
 Mutex for thread-safe access.
 
std::condition_variable_any completion_cv_
 Condition variable for waiting on completion.
 
std::atomic< bool > executing_ {false}
 Flag indicating execution is in progress.
 
std::atomic< bool > cancelled_ {false}
 Flag indicating cancellation was requested.
 
std::atomic< std::size_t > running_count_ {0}
 Number of jobs currently running.
 
std::atomic< std::size_t > active_callbacks_ {0}
 Number of active callbacks (for safe destruction)
 
std::chrono::steady_clock::time_point execution_start_time_
 Execution start time.
 
std::optional< common::error_info > first_error_
 First error encountered during execution.
 
std::unordered_map< job_id, std::size_t > retry_counts_
 Retry count per job.
 

Detailed Description

DAG-based job scheduler with dependency management.

The dag_scheduler manages jobs with dependencies, ensuring they execute in the correct order. Jobs are represented as a Directed Acyclic Graph (DAG) where edges represent dependencies.

Key Features

  • Automatic dependency resolution
  • Parallel execution of independent jobs
  • Cycle detection
  • Multiple failure handling policies
  • Result passing between jobs
  • DOT/JSON visualization export

Thread Safety

All public methods are thread-safe and can be called from any thread. Internal state is protected by a shared_mutex for optimal read performance.

Usage Example

auto pool = std::make_shared<thread_pool>("pool");
pool->start();
dag_scheduler scheduler(pool);
auto job_a = scheduler.add_job(
dag_job_builder("fetch")
.work([]{ return fetch_data(); })
.build()
);
auto job_b = scheduler.add_job(
dag_job_builder("process")
.depends_on(job_a)
.work([]{ return process_data(); })
.build()
);
scheduler.execute_all().wait();
Fluent builder for creating dag_job instances.
auto build() -> std::unique_ptr< dag_job >
Builds and returns the configured dag_job.
DAG-based job scheduler with dependency management.

Definition at line 78 of file dag_scheduler.h.

Constructor & Destructor Documentation

◆ dag_scheduler() [1/3]

kcenon::thread::dag_scheduler::dag_scheduler ( std::shared_ptr< thread_pool > pool,
dag_config config = {} )
explicit

Constructs a DAG scheduler with a thread pool.

Parameters
poolThe thread pool to use for job execution
configOptional configuration (default: dag_config{})

Definition at line 18 of file dag_scheduler.cpp.

19 : pool_(std::move(pool))
20 , config_(std::move(config))
21{
22}
std::shared_ptr< thread_pool > pool_
Thread pool for job execution.
dag_config config_
Configuration.

◆ ~dag_scheduler()

kcenon::thread::dag_scheduler::~dag_scheduler ( )

Destructor - cancels any pending jobs.

Definition at line 24 of file dag_scheduler.cpp.

25{
26 cancel_all();
27
28 // Wait for in-flight callbacks to finish accessing members
29 // (e.g., completion_cv_.notify_all() after mutex unlock)
30 while (active_callbacks_.load(std::memory_order_acquire) > 0)
31 {
32 std::this_thread::yield();
33 }
34}
std::atomic< std::size_t > active_callbacks_
Number of active callbacks (for safe destruction)
auto cancel_all() -> void
Cancels all pending jobs.

References active_callbacks_, and cancel_all().

Here is the call graph for this function:

◆ dag_scheduler() [2/3]

kcenon::thread::dag_scheduler::dag_scheduler ( const dag_scheduler & )
delete

◆ dag_scheduler() [3/3]

kcenon::thread::dag_scheduler::dag_scheduler ( dag_scheduler && other)
noexcept

Definition at line 36 of file dag_scheduler.cpp.

37 : pool_(std::move(other.pool_))
38 , config_(std::move(other.config_))
39 , jobs_(std::move(other.jobs_))
40 , dependencies_(std::move(other.dependencies_))
41 , dependents_(std::move(other.dependents_))
42 , executing_(other.executing_.load())
43 , cancelled_(other.cancelled_.load())
44 , running_count_(other.running_count_.load())
45 , execution_start_time_(other.execution_start_time_)
46 , first_error_(std::move(other.first_error_))
47 , retry_counts_(std::move(other.retry_counts_))
48{
49}
std::unordered_map< job_id, std::vector< job_id > > dependents_
Reverse dependency graph (job -> jobs that depend on it)
std::unordered_map< job_id, std::unique_ptr< dag_job > > jobs_
Job storage (job_id -> dag_job)
std::unordered_map< job_id, std::size_t > retry_counts_
Retry count per job.
std::atomic< std::size_t > running_count_
Number of jobs currently running.
std::optional< common::error_info > first_error_
First error encountered during execution.
std::unordered_map< job_id, std::vector< job_id > > dependencies_
Dependency graph (job -> jobs it depends on)
std::chrono::steady_clock::time_point execution_start_time_
Execution start time.
std::atomic< bool > executing_
Flag indicating execution is in progress.
std::atomic< bool > cancelled_
Flag indicating cancellation was requested.

Member Function Documentation

◆ add_dependency()

auto kcenon::thread::dag_scheduler::add_dependency ( job_id dependent,
job_id dependency ) -> common::VoidResult
nodiscard

Adds a dependency between jobs.

Parameters
dependentThe job that depends on another
dependencyThe job being depended on
Returns
Success or error if cycle detected or jobs not found

Thread Safety: Thread-safe (acquires exclusive lock)

Definition at line 109 of file dag_scheduler.cpp.

110{
111 std::unique_lock lock(mutex_);
112
113 // Check if jobs exist
114 if (jobs_.find(dependent) == jobs_.end())
115 {
117 "Dependent job not found: " + std::to_string(dependent));
118 }
119 if (jobs_.find(dependency) == jobs_.end())
120 {
122 "Dependency job not found: " + std::to_string(dependency));
123 }
124
125 // Add dependency
126 dependencies_[dependent].push_back(dependency);
127 dependents_[dependency].push_back(dependent);
128
129 // Check for cycles
131 {
132 // Rollback
133 dependencies_[dependent].pop_back();
134 dependents_[dependency].pop_back();
136 "Adding dependency would create a cycle");
137 }
138
139 // Update job's dependencies
140 jobs_[dependent]->add_dependency(dependency);
141
142 return common::ok();
143}
std::shared_mutex mutex_
Mutex for thread-safe access.
auto detect_cycle() const -> bool
Detects cycles using DFS.
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
bool detect_cycles
Whether to detect and reject cycles.
Definition dag_config.h:105

References kcenon::thread::invalid_argument, kcenon::thread::job_invalid, and kcenon::thread::make_error_result().

Here is the call graph for this function:

◆ add_job() [1/2]

auto kcenon::thread::dag_scheduler::add_job ( dag_job_builder && builder) -> job_id
nodiscard

Adds a job using a builder.

Parameters
builderThe job builder
Returns
The assigned job ID

Thread Safety: Thread-safe (acquires exclusive lock)

Definition at line 104 of file dag_scheduler.cpp.

105{
106 return add_job(builder.build());
107}
auto add_job(std::unique_ptr< dag_job > j) -> job_id
Adds a job to the DAG.

◆ add_job() [2/2]

auto kcenon::thread::dag_scheduler::add_job ( std::unique_ptr< dag_job > j) -> job_id
nodiscard

Adds a job to the DAG.

Parameters
jThe job to add
Returns
The assigned job ID

Thread Safety: Thread-safe (acquires exclusive lock)

Definition at line 74 of file dag_scheduler.cpp.

75{
76 std::unique_lock lock(mutex_);
77
78 auto id = j->get_dag_id();
79
80 // Add dependencies from the job
81 for (const auto& dep : j->get_dependencies())
82 {
83 dependencies_[id].push_back(dep);
84 dependents_[dep].push_back(id);
85 }
86
87 // Check for cycles if configured
89 {
90 // Rollback
91 dependencies_.erase(id);
92 for (const auto& dep : j->get_dependencies())
93 {
94 auto& deps = dependents_[dep];
95 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
96 }
97 return INVALID_JOB_ID;
98 }
99
100 jobs_[id] = std::move(j);
101 return id;
102}
constexpr job_id INVALID_JOB_ID
Invalid job ID constant.
Definition dag_job.h:38

References kcenon::thread::INVALID_JOB_ID.

◆ are_dependencies_satisfied()

auto kcenon::thread::dag_scheduler::are_dependencies_satisfied ( job_id id) const -> bool
nodiscardprivate

Checks if a job's dependencies are all satisfied.

Parameters
idThe job ID to check
Returns
true if all dependencies are completed

Definition at line 1026 of file dag_scheduler.cpp.

1027{
1028 auto it = dependencies_.find(id);
1029 if (it == dependencies_.end() || it->second.empty())
1030 {
1031 return true; // No dependencies
1032 }
1033
1034 for (const auto& dep : it->second)
1035 {
1036 auto job_it = jobs_.find(dep);
1037 if (job_it == jobs_.end())
1038 {
1039 return false; // Dependency not found
1040 }
1041 if (job_it->second->get_state() != dag_job_state::completed)
1042 {
1043 return false; // Dependency not completed
1044 }
1045 }
1046
1047 return true;
1048}
@ completed
Successfully completed.

References kcenon::thread::completed.

Referenced by get_ready_jobs().

Here is the caller graph for this function:

◆ cancel_all()

auto kcenon::thread::dag_scheduler::cancel_all ( ) -> void

Cancels all pending jobs.

Jobs that are already running will complete, but no new jobs will start.

Thread Safety: Thread-safe

Definition at line 322 of file dag_scheduler.cpp.

323{
324 std::unique_lock lock(mutex_);
325 cancelled_.store(true);
326
327 for (auto& [id, job] : jobs_)
328 {
329 auto state = job->get_state();
330 if (state == dag_job_state::pending || state == dag_job_state::ready)
331 {
332 job->set_state(dag_job_state::cancelled);
334 {
336 }
337 }
338 }
339
340 completion_cv_.notify_all();
341}
std::condition_variable_any completion_cv_
Condition variable for waiting on completion.
@ cancelled
Cancelled by user or dependency failure.
@ pending
Waiting for dependencies to complete.
@ ready
Dependencies satisfied, can be executed.
std::function< void(job_id, dag_job_state, dag_job_state)> state_callback
Callback for state changes.
Definition dag_config.h:123

References kcenon::thread::cancelled, kcenon::thread::pending, and kcenon::thread::ready.

Referenced by ~dag_scheduler().

Here is the caller graph for this function:

◆ cancel_dependents()

auto kcenon::thread::dag_scheduler::cancel_dependents ( job_id failed_id) -> void
private

Cancels dependents due to dependency failure.

Parameters
failed_idThe failed job ID

Definition at line 1097 of file dag_scheduler.cpp.

1098{
1099 auto it = dependents_.find(failed_id);
1100 if (it == dependents_.end())
1101 {
1102 return;
1103 }
1104
1105 std::queue<job_id> to_cancel;
1106 for (const auto& dep : it->second)
1107 {
1108 to_cancel.push(dep);
1109 }
1110
1111 while (!to_cancel.empty())
1112 {
1113 auto current = to_cancel.front();
1114 to_cancel.pop();
1115
1116 auto job_it = jobs_.find(current);
1117 if (job_it == jobs_.end())
1118 {
1119 continue;
1120 }
1121
1122 auto old_state = job_it->second->get_state();
1123 if (old_state == dag_job_state::pending || old_state == dag_job_state::ready)
1124 {
1125 job_it->second->set_state(dag_job_state::cancelled);
1127 {
1128 config_.state_callback(current, old_state, dag_job_state::cancelled);
1129 }
1130
1131 // Also cancel dependents of cancelled job
1132 auto dep_it = dependents_.find(current);
1133 if (dep_it != dependents_.end())
1134 {
1135 for (const auto& dep : dep_it->second)
1136 {
1137 to_cancel.push(dep);
1138 }
1139 }
1140 }
1141 }
1142}

References kcenon::thread::cancelled, kcenon::thread::pending, and kcenon::thread::ready.

◆ detect_cycle()

auto kcenon::thread::dag_scheduler::detect_cycle ( ) const -> bool
nodiscardprivate

Detects cycles using DFS.

Returns
true if cycle detected

Definition at line 982 of file dag_scheduler.cpp.

983{
984 enum class Color { White, Gray, Black };
985
986 std::unordered_map<job_id, Color> color;
987 for (const auto& [id, _] : jobs_)
988 {
989 color[id] = Color::White;
990 }
991
992 std::function<bool(job_id)> dfs = [&](job_id id) -> bool {
993 color[id] = Color::Gray;
994
995 auto it = dependencies_.find(id);
996 if (it != dependencies_.end())
997 {
998 for (const auto& dep : it->second)
999 {
1000 if (color[dep] == Color::Gray)
1001 {
1002 return true; // Back edge = cycle
1003 }
1004 if (color[dep] == Color::White && dfs(dep))
1005 {
1006 return true;
1007 }
1008 }
1009 }
1010
1011 color[id] = Color::Black;
1012 return false;
1013 };
1014
1015 for (const auto& [id, _] : jobs_)
1016 {
1017 if (color[id] == Color::White && dfs(id))
1018 {
1019 return true;
1020 }
1021 }
1022
1023 return false;
1024}
std::uint64_t job_id
Unique job identifier for DAG scheduler.
Definition dag_job.h:33

References dependencies_, and jobs_.

Referenced by has_cycles().

Here is the caller graph for this function:

◆ execute()

auto kcenon::thread::dag_scheduler::execute ( job_id target) -> std::future<common::VoidResult>
nodiscard

Executes a specific job and its dependencies.

Parameters
targetThe target job to execute
Returns
Future that completes when the target and dependencies are done

Thread Safety: Thread-safe

Definition at line 230 of file dag_scheduler.cpp.

231{
232 auto promise = std::make_shared<std::promise<common::VoidResult>>();
233 auto future = promise->get_future();
234
235 std::thread([this, target, promise]() {
236 {
237 std::unique_lock lock(mutex_);
238
239 auto it = jobs_.find(target);
240 if (it == jobs_.end())
241 {
243 "Target job not found: " + std::to_string(target)));
244 return;
245 }
246
247 if (executing_.exchange(true))
248 {
250 "Execution already in progress"));
251 return;
252 }
253
254 cancelled_.store(false);
255 first_error_.reset();
256 execution_start_time_ = std::chrono::steady_clock::now();
257
258 // Get all dependencies of target (transitively)
259 std::unordered_set<job_id> needed_jobs;
260 std::stack<job_id> to_visit;
261 to_visit.push(target);
262
263 while (!to_visit.empty())
264 {
265 auto current = to_visit.top();
266 to_visit.pop();
267
268 if (needed_jobs.count(current) > 0)
269 {
270 continue;
271 }
272 needed_jobs.insert(current);
273
274 auto deps_it = dependencies_.find(current);
275 if (deps_it != dependencies_.end())
276 {
277 for (const auto& dep : deps_it->second)
278 {
279 to_visit.push(dep);
280 }
281 }
282 }
283
284 // Mark needed jobs as ready if dependencies satisfied
285 for (const auto& id : needed_jobs)
286 {
287 auto job_it = jobs_.find(id);
288 if (job_it != jobs_.end() && job_it->second->get_state() == dag_job_state::pending)
289 {
290 bool deps_in_needed = true;
291 auto deps_it = dependencies_.find(id);
292 if (deps_it != dependencies_.end())
293 {
294 for (const auto& dep : deps_it->second)
295 {
296 if (needed_jobs.count(dep) == 0)
297 {
298 deps_in_needed = false;
299 break;
300 }
301 }
302 }
303
304 if (deps_in_needed && are_dependencies_satisfied(id))
305 {
306 job_it->second->set_state(dag_job_state::ready);
307 }
308 }
309 }
310 }
311
313
314 auto result = wait();
315 executing_.store(false);
316 promise->set_value(result);
317 }).detach();
318
319 return future;
320}
auto wait() -> common::VoidResult
Waits for all jobs to complete.
auto schedule_ready_jobs() -> void
Schedules ready jobs for execution.
auto are_dependencies_satisfied(job_id id) const -> bool
Checks if a job's dependencies are all satisfied.

References kcenon::thread::job_invalid, kcenon::thread::make_error_result(), kcenon::thread::pending, kcenon::thread::ready, and kcenon::thread::thread_already_running.

Here is the call graph for this function:

◆ execute_all()

auto kcenon::thread::dag_scheduler::execute_all ( ) -> std::future<common::VoidResult>
nodiscard

Executes all jobs in dependency order.

Returns
Future that completes when all jobs are done

Thread Safety: Thread-safe

Definition at line 185 of file dag_scheduler.cpp.

186{
187 auto promise = std::make_shared<std::promise<common::VoidResult>>();
188 auto future = promise->get_future();
189
190 // Start execution in a new thread
191 std::thread([this, promise]() {
192 {
193 std::unique_lock lock(mutex_);
194 if (executing_.exchange(true))
195 {
197 "Execution already in progress"));
198 return;
199 }
200
201 cancelled_.store(false);
202 first_error_.reset();
203 execution_start_time_ = std::chrono::steady_clock::now();
204
205 // Mark all jobs with satisfied dependencies as ready
206 for (auto& [id, job] : jobs_)
207 {
208 if (job->get_state() == dag_job_state::pending)
209 {
211 {
212 job->set_state(dag_job_state::ready);
213 }
214 }
215 }
216 }
217
218 // Schedule ready jobs
220
221 // Wait for completion
222 auto result = wait();
223 executing_.store(false);
224 promise->set_value(result);
225 }).detach();
226
227 return future;
228}

References kcenon::thread::make_error_result(), kcenon::thread::pending, kcenon::thread::ready, and kcenon::thread::thread_already_running.

Here is the call graph for this function:

◆ execute_job()

auto kcenon::thread::dag_scheduler::execute_job ( job_id id) -> void
private

Executes a single job.

Parameters
idThe job ID to execute

Definition at line 868 of file dag_scheduler.cpp.

869{
870 std::unique_lock lock(mutex_);
871
872 auto it = jobs_.find(id);
873 if (it == jobs_.end())
874 {
875 return;
876 }
877
878 if (!it->second->try_transition_state(dag_job_state::ready, dag_job_state::running))
879 {
880 return; // Already running or in terminal state
881 }
882
883 it->second->record_start_time();
885
887 {
889 }
890
891 lock.unlock();
892
893 // Create a callback job to execute the dag_job
894 auto wrapper_job = std::make_unique<callback_job>(
895 [this, id]() -> common::VoidResult {
896 std::shared_lock slock(mutex_);
897 auto job_it = jobs_.find(id);
898 if (job_it == jobs_.end())
899 {
900 return make_error_result(error_code::job_invalid, "Job not found");
901 }
902
903 auto& dag_job_ptr = job_it->second;
904 slock.unlock();
905
906 auto result = dag_job_ptr->do_work();
907
908 if (result.is_ok())
909 {
911 }
912 else
913 {
914 on_job_failed(id, result.error().message);
915 }
916
917 return result;
918 },
919 "dag_job_" + std::to_string(id)
920 );
921
922 pool_->enqueue(std::move(wrapper_job));
923}
auto on_job_failed(job_id id, const std::string &error) -> void
Called when a job fails.
auto on_job_completed(job_id id) -> void
Called when a job completes successfully.
@ running
Currently executing.

References kcenon::thread::result< T >::is_ok(), kcenon::thread::job_invalid, kcenon::thread::make_error_result(), kcenon::thread::ready, and kcenon::thread::running.

Here is the call graph for this function:

◆ get_all_jobs()

auto kcenon::thread::dag_scheduler::get_all_jobs ( ) const -> std::vector<dag_job_info>
nodiscard

Gets information about all jobs.

Returns
Vector of job info for all jobs

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 405 of file dag_scheduler.cpp.

406{
407 std::shared_lock lock(mutex_);
408
409 std::vector<dag_job_info> result;
410 result.reserve(jobs_.size());
411
412 for (const auto& [id, job] : jobs_)
413 {
414 auto info = job->get_info();
415 auto dep_it = dependents_.find(id);
416 if (dep_it != dependents_.end())
417 {
418 info.dependents = dep_it->second;
419 }
420 result.push_back(std::move(info));
421 }
422
423 return result;
424}
@ info
Informational messages highlighting progress.

References dependents_, kcenon::thread::info, jobs_, and mutex_.

◆ get_config()

auto kcenon::thread::dag_scheduler::get_config ( ) const -> const dag_config&
inlinenodiscard

Gets the configuration.

Returns
Current configuration

Definition at line 306 of file dag_scheduler.h.

306{ return config_; }

References config_.

◆ get_execution_order()

auto kcenon::thread::dag_scheduler::get_execution_order ( ) const -> std::vector<job_id>
nodiscard

Gets topological execution order.

Returns
Vector of job IDs in execution order

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 476 of file dag_scheduler.cpp.

477{
478 std::shared_lock lock(mutex_);
479 return topological_sort();
480}
auto topological_sort() const -> std::vector< job_id >
Performs topological sort.

References mutex_, and topological_sort().

Here is the call graph for this function:

◆ get_job_info()

auto kcenon::thread::dag_scheduler::get_job_info ( job_id id) const -> std::optional<dag_job_info>
nodiscard

Gets information about a specific job.

Parameters
idThe job ID
Returns
Job info, or std::nullopt if not found

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 383 of file dag_scheduler.cpp.

384{
385 std::shared_lock lock(mutex_);
386
387 auto it = jobs_.find(id);
388 if (it == jobs_.end())
389 {
390 return std::nullopt;
391 }
392
393 auto info = it->second->get_info();
394
395 // Add dependents from our tracking
396 auto dep_it = dependents_.find(id);
397 if (dep_it != dependents_.end())
398 {
399 info.dependents = dep_it->second;
400 }
401
402 return info;
403}

References kcenon::thread::info.

◆ get_jobs_in_state()

auto kcenon::thread::dag_scheduler::get_jobs_in_state ( dag_job_state state) const -> std::vector<dag_job_info>
nodiscard

Gets jobs in a specific state.

Parameters
stateThe state to filter by
Returns
Vector of job info for matching jobs

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 426 of file dag_scheduler.cpp.

427{
428 std::shared_lock lock(mutex_);
429
430 std::vector<dag_job_info> result;
431
432 for (const auto& [id, job] : jobs_)
433 {
434 if (job->get_state() == state)
435 {
436 auto info = job->get_info();
437 auto dep_it = dependents_.find(id);
438 if (dep_it != dependents_.end())
439 {
440 info.dependents = dep_it->second;
441 }
442 result.push_back(std::move(info));
443 }
444 }
445
446 return result;
447}

References kcenon::thread::info.

◆ get_ready_jobs()

auto kcenon::thread::dag_scheduler::get_ready_jobs ( ) const -> std::vector<job_id>
nodiscard

Gets IDs of ready jobs (dependencies satisfied)

Returns
Vector of job IDs that can be executed

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 449 of file dag_scheduler.cpp.

450{
451 std::shared_lock lock(mutex_);
452
453 std::vector<job_id> result;
454
455 for (const auto& [id, job] : jobs_)
456 {
457 if (job->get_state() == dag_job_state::ready)
458 {
459 result.push_back(id);
460 }
461 else if (job->get_state() == dag_job_state::pending && are_dependencies_satisfied(id))
462 {
463 result.push_back(id);
464 }
465 }
466
467 return result;
468}

References are_dependencies_satisfied(), jobs_, mutex_, kcenon::thread::pending, and kcenon::thread::ready.

Here is the call graph for this function:

◆ get_result()

template<typename T >
auto kcenon::thread::dag_scheduler::get_result ( job_id id) const -> const T&
inlinenodiscard

Gets the result from a completed job.

Template Parameters
TThe expected result type
Parameters
idThe job ID
Returns
The result value
Exceptions
std::runtime_errorif job not found or not completed
std::bad_any_castif type doesn't match

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 255 of file dag_scheduler.h.

256 {
257 std::shared_lock lock(mutex_);
258 auto it = jobs_.find(id);
259 if (it == jobs_.end())
260 {
261 throw std::runtime_error("Job not found: " + std::to_string(id));
262 }
263 if (it->second->get_state() != dag_job_state::completed)
264 {
265 throw std::runtime_error("Job not completed: " + std::to_string(id));
266 }
267 return it->second->get_result<T>();
268 }

References kcenon::thread::completed, jobs_, and mutex_.

◆ get_state_color()

auto kcenon::thread::dag_scheduler::get_state_color ( dag_job_state state) -> std::string
staticnodiscardprivate

Gets the state color for DOT visualization.

Parameters
stateThe job state
Returns
Color string for DOT format

Definition at line 1144 of file dag_scheduler.cpp.

1145{
1146 switch (state)
1147 {
1148 case dag_job_state::pending: return "#FFFFFF"; // White
1149 case dag_job_state::ready: return "#87CEEB"; // Light blue
1150 case dag_job_state::running: return "#FFFF00"; // Yellow
1151 case dag_job_state::completed: return "#90EE90"; // Light green
1152 case dag_job_state::failed: return "#FF6B6B"; // Light red
1153 case dag_job_state::cancelled: return "#D3D3D3"; // Light gray
1154 case dag_job_state::skipped: return "#FFA500"; // Orange
1155 default: return "#FFFFFF";
1156 }
1157}
@ failed
Execution failed.
@ skipped
Skipped due to dependency failure.

References kcenon::thread::cancelled, kcenon::thread::completed, kcenon::thread::failed, kcenon::thread::pending, kcenon::thread::ready, kcenon::thread::running, and kcenon::thread::skipped.

Referenced by to_dot().

Here is the caller graph for this function:

◆ get_stats()

auto kcenon::thread::dag_scheduler::get_stats ( ) const -> dag_stats
nodiscard

Gets execution statistics.

Returns
Current statistics

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 600 of file dag_scheduler.cpp.

601{
602 // Note: This is called from to_json which already holds the lock
603 // For standalone calls, we need to be careful about deadlock
604
605 dag_stats stats;
606 stats.total_jobs = jobs_.size();
607
608 for (const auto& [id, job] : jobs_)
609 {
610 switch (job->get_state())
611 {
612 case dag_job_state::completed: ++stats.completed_jobs; break;
613 case dag_job_state::failed: ++stats.failed_jobs; break;
615 case dag_job_state::ready: ++stats.pending_jobs; break;
616 case dag_job_state::running: ++stats.running_jobs; break;
617 case dag_job_state::skipped: ++stats.skipped_jobs; break;
618 case dag_job_state::cancelled: ++stats.cancelled_jobs; break;
619 }
620 }
621
622 if (executing_.load())
623 {
624 stats.total_execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
625 std::chrono::steady_clock::now() - execution_start_time_);
626 }
627
628 return stats;
629}

References kcenon::thread::cancelled, kcenon::thread::dag_stats::cancelled_jobs, kcenon::thread::completed, kcenon::thread::dag_stats::completed_jobs, executing_, execution_start_time_, kcenon::thread::failed, kcenon::thread::dag_stats::failed_jobs, jobs_, kcenon::thread::pending, kcenon::thread::dag_stats::pending_jobs, kcenon::thread::ready, kcenon::thread::running, kcenon::thread::dag_stats::running_jobs, kcenon::thread::skipped, kcenon::thread::dag_stats::skipped_jobs, kcenon::thread::dag_stats::total_execution_time, and kcenon::thread::dag_stats::total_jobs.

Referenced by to_json().

Here is the caller graph for this function:

◆ has_cycles()

auto kcenon::thread::dag_scheduler::has_cycles ( ) const -> bool
nodiscard

Checks if the DAG has cycles.

Returns
true if cycles are detected

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 470 of file dag_scheduler.cpp.

471{
472 std::shared_lock lock(mutex_);
473 return detect_cycle();
474}

References detect_cycle(), and mutex_.

Here is the call graph for this function:

◆ is_execution_complete()

auto kcenon::thread::dag_scheduler::is_execution_complete ( ) const -> bool
nodiscardprivate

Checks if execution is complete.

Returns
true if all jobs are in terminal state

Definition at line 1159 of file dag_scheduler.cpp.

1160{
1161 for (const auto& [id, job] : jobs_)
1162 {
1163 auto state = job->get_state();
1164 if (state == dag_job_state::pending ||
1165 state == dag_job_state::ready ||
1166 state == dag_job_state::running)
1167 {
1168 return false;
1169 }
1170 }
1171 return true;
1172}

References jobs_, kcenon::thread::pending, kcenon::thread::ready, and kcenon::thread::running.

◆ on_job_completed()

auto kcenon::thread::dag_scheduler::on_job_completed ( job_id id) -> void
private

Called when a job completes successfully.

Parameters
idThe completed job ID

Definition at line 635 of file dag_scheduler.cpp.

636{
638 std::unique_lock lock(mutex_);
639
640 auto it = jobs_.find(id);
641 if (it == jobs_.end())
642 {
643 return;
644 }
645
646 auto old_state = it->second->get_state();
647 it->second->set_state(dag_job_state::completed);
648 it->second->record_end_time();
650
652 {
654 }
655
657 {
659 }
660
661 // Check and update dependents
662 auto dep_it = dependents_.find(id);
663 if (dep_it != dependents_.end())
664 {
665 for (const auto& dependent : dep_it->second)
666 {
667 auto job_it = jobs_.find(dependent);
668 if (job_it != jobs_.end() && job_it->second->get_state() == dag_job_state::pending)
669 {
670 if (are_dependencies_satisfied(dependent))
671 {
672 job_it->second->set_state(dag_job_state::ready);
674 {
676 }
677 }
678 }
679 }
680 }
681
682 lock.unlock();
683
684 // Schedule more jobs
686
687 // Notify waiters
688 completion_cv_.notify_all();
690}
std::function< void(job_id)> completion_callback
Callback for job completion.
Definition dag_config.h:140

References kcenon::thread::completed, kcenon::thread::pending, and kcenon::thread::ready.

◆ on_job_failed()

auto kcenon::thread::dag_scheduler::on_job_failed ( job_id id,
const std::string & error ) -> void
private

Called when a job fails.

Parameters
idThe failed job ID
errorThe error message

Definition at line 692 of file dag_scheduler.cpp.

693{
695 std::unique_lock lock(mutex_);
696
697 auto it = jobs_.find(id);
698 if (it == jobs_.end())
699 {
700 return;
701 }
702
703 it->second->set_error_message(error);
704 it->second->record_end_time();
705
706 // Handle based on failure policy
707 switch (config_.failure_policy)
708 {
710 {
711 auto& retry_count = retry_counts_[id];
712 if (retry_count < config_.max_retries)
713 {
714 ++retry_count;
715 // Reset state to ready for retry
716 it->second->set_state(dag_job_state::ready);
718
720 {
722 }
723
724 lock.unlock();
725
726 // Wait before retry
727 std::this_thread::sleep_for(config_.retry_delay);
728
730 return;
731 }
732 // Fall through to fail_fast if max retries exceeded
733 [[fallthrough]];
734 }
735
737 {
738 if (config_.failure_policy == dag_failure_policy::fallback && it->second->has_fallback())
739 {
740 // Try fallback
741 auto fallback = it->second->get_fallback();
742 auto result = fallback();
743 if (result.is_ok())
744 {
745 // Fallback succeeded
746 it->second->set_state(dag_job_state::completed);
749 {
751 }
752 lock.unlock();
754 completion_cv_.notify_all();
756 return;
757 }
758 }
759 [[fallthrough]];
760 }
761
763 {
764 auto old_state = it->second->get_state();
765 it->second->set_state(dag_job_state::failed);
767
768 if (!first_error_)
769 {
771 }
772
774 {
776 }
777
779 {
781 }
782
783 // Cancel all dependents
785 break;
786 }
787
789 {
790 auto old_state = it->second->get_state();
791 it->second->set_state(dag_job_state::failed);
793
794 if (!first_error_)
795 {
797 }
798
800 {
802 }
803
805 {
807 }
808
809 // Skip dependents but continue others
810 skip_dependents(id);
811 break;
812 }
813 }
814
815 lock.unlock();
817 completion_cv_.notify_all();
819}
auto skip_dependents(job_id failed_id) -> void
Marks dependents as skipped due to dependency failure.
auto cancel_dependents(job_id failed_id) -> void
Cancels dependents due to dependency failure.
common::error_info to_error_info(error_code code, const std::string &message="")
Convert a thread::error_code to common::error_info.
@ error
Error events that might still allow continuation.
@ retry
Retry failed job (with max retries)
@ fallback
Execute fallback job if available.
@ continue_others
Continue unrelated jobs, skip dependents.
@ fail_fast
Cancel all dependents immediately on failure.
std::size_t max_retries
Maximum number of retry attempts for failed jobs.
Definition dag_config.h:90
dag_failure_policy failure_policy
How to handle job failures.
Definition dag_config.h:82
std::function< void(job_id, const std::string &)> error_callback
Callback for job errors.
Definition dag_config.h:132
std::chrono::milliseconds retry_delay
Delay between retry attempts.
Definition dag_config.h:97

References kcenon::thread::completed, kcenon::thread::continue_others, kcenon::thread::fail_fast, kcenon::thread::failed, kcenon::thread::fallback, kcenon::thread::result< T >::is_ok(), kcenon::thread::job_execution_failed, kcenon::thread::ready, kcenon::thread::retry, kcenon::thread::running, and kcenon::thread::to_error_info().

Here is the call graph for this function:

◆ operator=() [1/2]

auto kcenon::thread::dag_scheduler::operator= ( const dag_scheduler & ) -> dag_scheduler &=delete
delete

◆ operator=() [2/2]

auto kcenon::thread::dag_scheduler::operator= ( dag_scheduler && other) -> dag_scheduler&
noexcept

Definition at line 51 of file dag_scheduler.cpp.

52{
53 if (this != &other)
54 {
55 pool_ = std::move(other.pool_);
56 config_ = std::move(other.config_);
57 jobs_ = std::move(other.jobs_);
58 dependencies_ = std::move(other.dependencies_);
59 dependents_ = std::move(other.dependents_);
60 executing_.store(other.executing_.load());
61 cancelled_.store(other.cancelled_.load());
62 running_count_.store(other.running_count_.load());
63 execution_start_time_ = other.execution_start_time_;
64 first_error_ = std::move(other.first_error_);
65 retry_counts_ = std::move(other.retry_counts_);
66 }
67 return *this;
68}

◆ remove_job()

auto kcenon::thread::dag_scheduler::remove_job ( job_id id) -> common::VoidResult
nodiscard

Removes a job from the DAG (only if not yet started)

Parameters
idThe job ID to remove
Returns
Success or error if job is running or not found

Thread Safety: Thread-safe (acquires exclusive lock)

Definition at line 145 of file dag_scheduler.cpp.

146{
147 std::unique_lock lock(mutex_);
148
149 auto it = jobs_.find(id);
150 if (it == jobs_.end())
151 {
153 "Job not found: " + std::to_string(id));
154 }
155
156 auto state = it->second->get_state();
157 if (state == dag_job_state::running)
158 {
160 "Cannot remove running job: " + std::to_string(id));
161 }
162
163 // Remove from dependency graphs
164 dependencies_.erase(id);
165 dependents_.erase(id);
166
167 // Remove from other jobs' dependencies
168 for (auto& [job_id, deps] : dependencies_)
169 {
170 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
171 }
172 for (auto& [job_id, deps] : dependents_)
173 {
174 deps.erase(std::remove(deps.begin(), deps.end(), id), deps.end());
175 }
176
177 jobs_.erase(it);
178 return common::ok();
179}

References kcenon::thread::job_invalid, kcenon::thread::make_error_result(), and kcenon::thread::running.

Here is the call graph for this function:

◆ reset()

auto kcenon::thread::dag_scheduler::reset ( ) -> common::VoidResult
nodiscard

Resets the scheduler for reuse.

Clears all jobs and resets state. Cannot be called while jobs are running.

Thread Safety: Thread-safe (acquires exclusive lock)

Definition at line 359 of file dag_scheduler.cpp.

360{
361 std::unique_lock lock(mutex_);
362
363 if (executing_.load())
364 {
366 "Cannot reset while execution is in progress");
367 }
368
369 jobs_.clear();
370 dependencies_.clear();
371 dependents_.clear();
372 retry_counts_.clear();
373 first_error_.reset();
374 cancelled_.store(false);
375
376 return common::ok();
377}

References kcenon::thread::make_error_result(), and kcenon::thread::thread_already_running.

Here is the call graph for this function:

◆ schedule_ready_jobs()

auto kcenon::thread::dag_scheduler::schedule_ready_jobs ( ) -> void
private

Schedules ready jobs for execution.

Definition at line 821 of file dag_scheduler.cpp.

822{
823 if (cancelled_.load())
824 {
825 return;
826 }
827
828 std::vector<job_id> ready_jobs;
829
830 {
831 std::shared_lock lock(mutex_);
832 for (const auto& [id, job] : jobs_)
833 {
834 if (job->get_state() == dag_job_state::ready)
835 {
836 ready_jobs.push_back(id);
837 }
838 }
839 }
840
841 for (const auto& id : ready_jobs)
842 {
843 if (cancelled_.load())
844 {
845 break;
846 }
847
849 {
850 execute_job(id);
851 }
852 else
853 {
854 // Execute sequentially
855 execute_job(id);
856 // Wait for completion before next job
857 std::shared_lock lock(mutex_);
858 completion_cv_.wait(lock, [this, id]() {
859 auto it = jobs_.find(id);
860 if (it == jobs_.end()) return true;
861 auto state = it->second->get_state();
862 return state != dag_job_state::running;
863 });
864 }
865 }
866}
auto execute_job(job_id id) -> void
Executes a single job.
bool execute_in_parallel
Whether to execute ready jobs in parallel.
Definition dag_config.h:113

References kcenon::thread::ready, and kcenon::thread::running.

◆ skip_dependents()

auto kcenon::thread::dag_scheduler::skip_dependents ( job_id failed_id) -> void
private

Marks dependents as skipped due to dependency failure.

Parameters
failed_idThe failed job ID

Definition at line 1050 of file dag_scheduler.cpp.

1051{
1052 auto it = dependents_.find(failed_id);
1053 if (it == dependents_.end())
1054 {
1055 return;
1056 }
1057
1058 std::queue<job_id> to_skip;
1059 for (const auto& dep : it->second)
1060 {
1061 to_skip.push(dep);
1062 }
1063
1064 while (!to_skip.empty())
1065 {
1066 auto current = to_skip.front();
1067 to_skip.pop();
1068
1069 auto job_it = jobs_.find(current);
1070 if (job_it == jobs_.end())
1071 {
1072 continue;
1073 }
1074
1075 auto old_state = job_it->second->get_state();
1076 if (old_state == dag_job_state::pending || old_state == dag_job_state::ready)
1077 {
1078 job_it->second->set_state(dag_job_state::skipped);
1080 {
1081 config_.state_callback(current, old_state, dag_job_state::skipped);
1082 }
1083
1084 // Also skip dependents of skipped job
1085 auto dep_it = dependents_.find(current);
1086 if (dep_it != dependents_.end())
1087 {
1088 for (const auto& dep : dep_it->second)
1089 {
1090 to_skip.push(dep);
1091 }
1092 }
1093 }
1094 }
1095}

References kcenon::thread::pending, kcenon::thread::ready, and kcenon::thread::skipped.

◆ to_dot()

auto kcenon::thread::dag_scheduler::to_dot ( ) const -> std::string
nodiscard

Exports the DAG as DOT format (Graphviz)

Returns
DOT format string

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 486 of file dag_scheduler.cpp.

487{
488 std::shared_lock lock(mutex_);
489
490 std::ostringstream ss;
491 ss << "digraph DAG {\n";
492 ss << " rankdir=TB;\n";
493 ss << " node [shape=box, style=filled];\n\n";
494
495 // Nodes
496 for (const auto& [id, job] : jobs_)
497 {
498 auto state = job->get_state();
499 ss << " " << id << " [label=\"" << job->get_name()
500 << "\\n(" << dag_job_state_to_string(state) << ")\""
501 << ", fillcolor=\"" << get_state_color(state) << "\"];\n";
502 }
503
504 ss << "\n";
505
506 // Edges
507 for (const auto& [dependent, deps] : dependencies_)
508 {
509 for (const auto& dependency : deps)
510 {
511 ss << " " << dependency << " -> " << dependent << ";\n";
512 }
513 }
514
515 ss << "}\n";
516
517 return ss.str();
518}
static auto get_state_color(dag_job_state state) -> std::string
Gets the state color for DOT visualization.
auto dag_job_state_to_string(dag_job_state state) -> std::string
Convert dag_job_state to string representation.
Definition dag_job.h:60

References kcenon::thread::dag_job_state_to_string(), dependencies_, kcenon::thread::job::get_name(), get_state_color(), jobs_, and mutex_.

Here is the call graph for this function:

◆ to_json()

auto kcenon::thread::dag_scheduler::to_json ( ) const -> std::string
nodiscard

Exports the DAG as JSON format.

Returns
JSON format string

Thread Safety: Thread-safe (acquires shared lock)

Definition at line 520 of file dag_scheduler.cpp.

521{
522 std::shared_lock lock(mutex_);
523
524 std::ostringstream ss;
525 ss << "{\n";
526 ss << " \"jobs\": [\n";
527
528 bool first_job = true;
529 for (const auto& [id, job] : jobs_)
530 {
531 if (!first_job) ss << ",\n";
532 first_job = false;
533
534 auto info = job->get_info();
535 ss << " {\n";
536 ss << " \"id\": " << id << ",\n";
537 ss << " \"name\": \"" << info.name << "\",\n";
538 ss << " \"state\": \"" << dag_job_state_to_string(info.state) << "\",\n";
539 ss << " \"dependencies\": [";
540
541 bool first_dep = true;
542 for (const auto& dep : info.dependencies)
543 {
544 if (!first_dep) ss << ", ";
545 first_dep = false;
546 ss << dep;
547 }
548 ss << "],\n";
549
550 ss << " \"dependents\": [";
551 auto dep_it = dependents_.find(id);
552 if (dep_it != dependents_.end())
553 {
554 first_dep = true;
555 for (const auto& dep : dep_it->second)
556 {
557 if (!first_dep) ss << ", ";
558 first_dep = false;
559 ss << dep;
560 }
561 }
562 ss << "],\n";
563
564 auto wait_ms = info.get_wait_time().count();
565 auto exec_ms = info.get_execution_time().count();
566 ss << " \"wait_time_ms\": " << wait_ms << ",\n";
567 ss << " \"execution_time_ms\": " << exec_ms;
568
569 if (info.error_message)
570 {
571 ss << ",\n \"error\": \"" << *info.error_message << "\"";
572 }
573
574 ss << "\n }";
575 }
576
577 ss << "\n ],\n";
578
579 // Add statistics
580 auto stats = get_stats();
581 ss << " \"stats\": {\n";
582 ss << " \"total_jobs\": " << stats.total_jobs << ",\n";
583 ss << " \"completed_jobs\": " << stats.completed_jobs << ",\n";
584 ss << " \"failed_jobs\": " << stats.failed_jobs << ",\n";
585 ss << " \"pending_jobs\": " << stats.pending_jobs << ",\n";
586 ss << " \"running_jobs\": " << stats.running_jobs << ",\n";
587 ss << " \"skipped_jobs\": " << stats.skipped_jobs << ",\n";
588 ss << " \"cancelled_jobs\": " << stats.cancelled_jobs << "\n";
589 ss << " }\n";
590
591 ss << "}\n";
592
593 return ss.str();
594}
auto get_stats() const -> dag_stats
Gets execution statistics.

References kcenon::thread::dag_job_state_to_string(), dependents_, get_stats(), kcenon::thread::info, jobs_, and mutex_.

Here is the call graph for this function:

◆ topological_sort()

auto kcenon::thread::dag_scheduler::topological_sort ( ) const -> std::vector<job_id>
nodiscardprivate

Performs topological sort.

Returns
Sorted job IDs, or empty if cycle detected

Definition at line 925 of file dag_scheduler.cpp.

926{
927 std::vector<job_id> result;
928 std::unordered_map<job_id, int> in_degree;
929 std::queue<job_id> ready_queue;
930
931 // Initialize in-degrees
932 for (const auto& [id, _] : jobs_)
933 {
934 in_degree[id] = 0;
935 }
936
937 for (const auto& [id, deps] : dependencies_)
938 {
939 in_degree[id] = static_cast<int>(deps.size());
940 }
941
942 // Find initial ready jobs (in-degree 0)
943 for (const auto& [id, degree] : in_degree)
944 {
945 if (degree == 0)
946 {
947 ready_queue.push(id);
948 }
949 }
950
951 // Process queue
952 while (!ready_queue.empty())
953 {
954 auto current = ready_queue.front();
955 ready_queue.pop();
956 result.push_back(current);
957
958 // Reduce in-degree of dependents
959 auto it = dependents_.find(current);
960 if (it != dependents_.end())
961 {
962 for (const auto& dependent : it->second)
963 {
964 --in_degree[dependent];
965 if (in_degree[dependent] == 0)
966 {
967 ready_queue.push(dependent);
968 }
969 }
970 }
971 }
972
973 // If result doesn't contain all jobs, there's a cycle
974 if (result.size() != jobs_.size())
975 {
976 return {}; // Cycle detected
977 }
978
979 return result;
980}

References dependencies_, dependents_, and jobs_.

Referenced by get_execution_order().

Here is the caller graph for this function:

◆ wait()

auto kcenon::thread::dag_scheduler::wait ( ) -> common::VoidResult
nodiscard

Waits for all jobs to complete.

Returns
Success or the first error encountered

Thread Safety: Thread-safe

Definition at line 343 of file dag_scheduler.cpp.

344{
345 std::shared_lock lock(mutex_);
346
347 completion_cv_.wait(lock, [this]() {
348 return is_execution_complete() || cancelled_.load();
349 });
350
351 if (first_error_)
352 {
353 return common::VoidResult(*first_error_);
354 }
355
356 return common::ok();
357}
auto is_execution_complete() const -> bool
Checks if execution is complete.

Member Data Documentation

◆ active_callbacks_

std::atomic<std::size_t> kcenon::thread::dag_scheduler::active_callbacks_ {0}
private

Number of active callbacks (for safe destruction)

Tracks callbacks that may access members after releasing the mutex. The destructor waits for this to reach 0 before destroying members.

Definition at line 365 of file dag_scheduler.h.

365{0};

Referenced by ~dag_scheduler().

◆ cancelled_

std::atomic<bool> kcenon::thread::dag_scheduler::cancelled_ {false}
private

Flag indicating cancellation was requested.

Definition at line 352 of file dag_scheduler.h.

352{false};

◆ completion_cv_

std::condition_variable_any kcenon::thread::dag_scheduler::completion_cv_
private

Condition variable for waiting on completion.

Definition at line 342 of file dag_scheduler.h.

◆ config_

dag_config kcenon::thread::dag_scheduler::config_
private

Configuration.

Definition at line 317 of file dag_scheduler.h.

Referenced by get_config().

◆ dependencies_

std::unordered_map<job_id, std::vector<job_id> > kcenon::thread::dag_scheduler::dependencies_
private

Dependency graph (job -> jobs it depends on)

Definition at line 327 of file dag_scheduler.h.

Referenced by detect_cycle(), to_dot(), and topological_sort().

◆ dependents_

std::unordered_map<job_id, std::vector<job_id> > kcenon::thread::dag_scheduler::dependents_
private

Reverse dependency graph (job -> jobs that depend on it)

Definition at line 332 of file dag_scheduler.h.

Referenced by get_all_jobs(), to_json(), and topological_sort().

◆ executing_

std::atomic<bool> kcenon::thread::dag_scheduler::executing_ {false}
private

Flag indicating execution is in progress.

Definition at line 347 of file dag_scheduler.h.

347{false};

Referenced by get_stats().

◆ execution_start_time_

std::chrono::steady_clock::time_point kcenon::thread::dag_scheduler::execution_start_time_
private

Execution start time.

Definition at line 370 of file dag_scheduler.h.

Referenced by get_stats().

◆ first_error_

std::optional<common::error_info> kcenon::thread::dag_scheduler::first_error_
private

First error encountered during execution.

Definition at line 375 of file dag_scheduler.h.

◆ jobs_

std::unordered_map<job_id, std::unique_ptr<dag_job> > kcenon::thread::dag_scheduler::jobs_
private

◆ mutex_

std::shared_mutex kcenon::thread::dag_scheduler::mutex_
mutableprivate

Mutex for thread-safe access.

Definition at line 337 of file dag_scheduler.h.

Referenced by get_all_jobs(), get_execution_order(), get_ready_jobs(), get_result(), has_cycles(), to_dot(), and to_json().

◆ pool_

std::shared_ptr<thread_pool> kcenon::thread::dag_scheduler::pool_
private

Thread pool for job execution.

Definition at line 312 of file dag_scheduler.h.

◆ retry_counts_

std::unordered_map<job_id, std::size_t> kcenon::thread::dag_scheduler::retry_counts_
private

Retry count per job.

Definition at line 380 of file dag_scheduler.h.

◆ running_count_

std::atomic<std::size_t> kcenon::thread::dag_scheduler::running_count_ {0}
private

Number of jobs currently running.

Definition at line 357 of file dag_scheduler.h.

357{0};

The documentation for this class was generated from the following files: