Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
dag_scheduler.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include "dag_job.h"
15#include "dag_job_builder.h"
16#include "dag_config.h"
17
20
21#include <atomic>
22#include <condition_variable>
23#include <future>
24#include <memory>
25#include <mutex>
26#include <optional>
27#include <shared_mutex>
28#include <string>
29#include <unordered_map>
30#include <unordered_set>
31#include <vector>
32
33namespace kcenon::thread
34{
78 class dag_scheduler
79 {
80 public:
// Constructs a DAG scheduler with a thread pool.
// `pool` is received as a std::shared_ptr<thread_pool>; `config` defaults to a
// value-initialized dag_config when omitted. `explicit` prevents implicit
// conversion from a bare shared_ptr<thread_pool>.
86 explicit dag_scheduler(std::shared_ptr<thread_pool> pool,
87 dag_config config = {});
88
93 ~dag_scheduler();
94 // Non-copyable: copy construction and copy assignment are deleted.
95 dag_scheduler(const dag_scheduler&) = delete;
96 auto operator=(const dag_scheduler&) -> dag_scheduler& = delete;
97
98 // Movable: both move operations are declared noexcept; their definitions are not part of this listing.
99 dag_scheduler(dag_scheduler&&) noexcept;
100 auto operator=(dag_scheduler&&) noexcept -> dag_scheduler&;
101
102 // ============================================
103 // Job Management
104 // ============================================
105
// Adds a job to the DAG. The job is passed as a std::unique_ptr by value, so
// ownership moves into the call. Returns a job_id (std::uint64_t) — presumably
// the identifier of the added job; confirm against the implementation.
113 [[nodiscard]] auto add_job(std::unique_ptr<dag_job> j) -> job_id;
114
// Overload of add_job that accepts a dag_job_builder (fluent builder for
// dag_job instances) by rvalue reference.
122 [[nodiscard]] auto add_job(dag_job_builder&& builder) -> job_id;
123
// Adds a dependency between jobs. The parameter names indicate that
// `dependent` is the job that depends on `dependency`. Success or failure is
// reported through the returned common::VoidResult.
132 [[nodiscard]] auto add_dependency(job_id dependent, job_id dependency) -> common::VoidResult;
133
// Removes a job from the DAG (only if not yet started).
141 [[nodiscard]] auto remove_job(job_id id) -> common::VoidResult;
142
143 // ============================================
144 // Execution Control
145 // ============================================
146
// Executes all jobs in dependency order. The outcome is delivered
// asynchronously through the returned std::future<common::VoidResult>.
153 [[nodiscard]] auto execute_all() -> std::future<common::VoidResult>;
154
// Executes a specific job (`target`) and its dependencies; the outcome is
// delivered through the returned future.
162 [[nodiscard]] auto execute(job_id target) -> std::future<common::VoidResult>;
163
// Cancels all pending jobs.
171 auto cancel_all() -> void;
172
// Waits for all jobs to complete.
179 [[nodiscard]] auto wait() -> common::VoidResult;
180
// Resets the scheduler for reuse.
188 [[nodiscard]] auto reset() -> common::VoidResult;
189
190 // ============================================
191 // Query
192 // ============================================
193
// Gets information about a specific job.
// NOTE(review): presumably returns std::nullopt when `id` is unknown — confirm.
201 [[nodiscard]] auto get_job_info(job_id id) const -> std::optional<dag_job_info>;
202
// Gets information about all jobs.
209 [[nodiscard]] auto get_all_jobs() const -> std::vector<dag_job_info>;
210
// Gets jobs in a specific state.
218 [[nodiscard]] auto get_jobs_in_state(dag_job_state state) const -> std::vector<dag_job_info>;
219
// Gets IDs of ready jobs (dependencies satisfied).
226 [[nodiscard]] auto get_ready_jobs() const -> std::vector<job_id>;
227
// Checks if the DAG has cycles.
234 [[nodiscard]] auto has_cycles() const -> bool;
235
// Gets the topological execution order as a list of job IDs.
242 [[nodiscard]] auto get_execution_order() const -> std::vector<job_id>;
243
// Gets the result from a completed job, typed as T.
// T is forwarded unchanged to dag_job::get_result<T>().
// Throws std::runtime_error when `id` is not present in jobs_, or when the
// job's state is anything other than dag_job_state::completed.
// NOTE(review): the function returns a const reference, but the shared lock
// taken below is released when the function returns — presumably callers must
// not remove or reset the job while they still use the reference; confirm.
// NOTE(review): std::runtime_error requires <stdexcept>, which is not in the
// visible include list — presumably it arrives transitively; confirm.
254 template<typename T>
255 [[nodiscard]] auto get_result(job_id id) const -> const T&
256 {
257 std::shared_lock lock(mutex_); // shared (reader) lock on mutex_ for the duration of the lookup
258 auto it = jobs_.find(id);
259 if (it == jobs_.end())
260 {
261 throw std::runtime_error("Job not found: " + std::to_string(id));
262 }
263 if (it->second->get_state() != dag_job_state::completed) // only completed jobs may be queried for a result
264 {
265 throw std::runtime_error("Job not completed: " + std::to_string(id));
266 }
267 return it->second->get_result<T>(); // delegate the typed retrieval to the job itself
268 }
269
270 // ============================================
271 // Visualization
272 // ============================================
273
// Exports the DAG as DOT format (Graphviz).
280 [[nodiscard]] auto to_dot() const -> std::string;
281
// Exports the DAG as JSON format.
288 [[nodiscard]] auto to_json() const -> std::string;
289
290 // ============================================
291 // Statistics
292 // ============================================
293
// Gets execution statistics, returned by value as a dag_stats.
300 [[nodiscard]] auto get_stats() const -> dag_stats;
301
// Gets the configuration: returns a const reference to config_.
// No lock is taken in this accessor.
306 [[nodiscard]] auto get_config() const -> const dag_config& { return config_; }
307
308 private:
// Thread pool for job execution.
312 std::shared_ptr<thread_pool> pool_;
313
318 dag_config config_;
// Job storage (job_id -> dag_job); the map owns each job through unique_ptr.
322 std::unordered_map<job_id, std::unique_ptr<dag_job>> jobs_;
323
// Dependency graph (job -> jobs it depends on).
327 std::unordered_map<job_id, std::vector<job_id>> dependencies_;
328
// Reverse dependency graph (job -> jobs that depend on it).
332 std::unordered_map<job_id, std::vector<job_id>> dependents_;
333
// Mutex for thread-safe access; `mutable` so const member functions can lock it.
337 mutable std::shared_mutex mutex_;
338
// Condition variable for waiting on completion.
342 std::condition_variable_any completion_cv_;
343
// Flag indicating execution is in progress.
347 std::atomic<bool> executing_{false};
348
// Flag indicating cancellation was requested.
352 std::atomic<bool> cancelled_{false};
353
// Number of jobs currently running.
357 std::atomic<std::size_t> running_count_{0};
358
// Number of active callbacks (for safe destruction).
365 std::atomic<std::size_t> active_callbacks_{0};
366
// Execution start time.
// NOTE(review): std::chrono needs <chrono>, which is not in the visible
// include list — presumably it arrives transitively; confirm.
370 std::chrono::steady_clock::time_point execution_start_time_;
371
// First error encountered during execution; empty until one is recorded.
375 std::optional<common::error_info> first_error_;
376
// Retry count per job.
380 std::unordered_map<job_id, std::size_t> retry_counts_;
381
382 // ============================================
383 // Internal Methods
384 // ============================================
385
// Called when a job completes successfully.
390 auto on_job_completed(job_id id) -> void;
391
// Called when a job fails; `error` carries the failure message.
397 auto on_job_failed(job_id id, const std::string& error) -> void;
398
// Schedules ready jobs for execution.
402 auto schedule_ready_jobs() -> void;
403
// Executes a single job.
408 auto execute_job(job_id id) -> void;
409
// Performs topological sort and returns the resulting job ID sequence.
414 [[nodiscard]] auto topological_sort() const -> std::vector<job_id>;
415
// Detects cycles using DFS.
420 [[nodiscard]] auto detect_cycle() const -> bool;
421
// Checks if a job's dependencies are all satisfied.
427 [[nodiscard]] auto are_dependencies_satisfied(job_id id) const -> bool;
428
// Marks dependents as skipped due to dependency failure.
433 auto skip_dependents(job_id failed_id) -> void;
434
// Cancels dependents due to dependency failure.
439 auto cancel_dependents(job_id failed_id) -> void;
440
// Gets the state color for DOT visualization. Static: does not use instance state.
446 [[nodiscard]] static auto get_state_color(dag_job_state state) -> std::string;
447
// Checks if execution is complete.
452 [[nodiscard]] auto is_execution_complete() const -> bool;
453 };
454
455} // namespace kcenon::thread
Fluent builder for creating dag_job instances.
A job with dependency support for DAG-based scheduling.
Definition dag_job.h:157
DAG-based job scheduler with dependency management.
std::atomic< std::size_t > active_callbacks_
Number of active callbacks (for safe destruction)
std::shared_mutex mutex_
Mutex for thread-safe access.
auto on_job_failed(job_id id, const std::string &error) -> void
Called when a job fails.
auto wait() -> common::VoidResult
Waits for all jobs to complete.
auto topological_sort() const -> std::vector< job_id >
Performs topological sort.
std::unordered_map< job_id, std::vector< job_id > > dependents_
Reverse dependency graph (job -> jobs that depend on it)
auto on_job_completed(job_id id) -> void
Called when a job completes successfully.
auto to_json() const -> std::string
Exports the DAG as JSON format.
auto get_execution_order() const -> std::vector< job_id >
Gets topological execution order.
static auto get_state_color(dag_job_state state) -> std::string
Gets the state color for DOT visualization.
auto get_config() const -> const dag_config &
Gets the configuration.
std::unordered_map< job_id, std::unique_ptr< dag_job > > jobs_
Job storage (job_id -> dag_job)
auto execute(job_id target) -> std::future< common::VoidResult >
Executes a specific job and its dependencies.
dag_scheduler(std::shared_ptr< thread_pool > pool, dag_config config={})
Constructs a DAG scheduler with a thread pool.
auto schedule_ready_jobs() -> void
Schedules ready jobs for execution.
auto get_jobs_in_state(dag_job_state state) const -> std::vector< dag_job_info >
Gets jobs in a specific state.
std::shared_ptr< thread_pool > pool_
Thread pool for job execution.
auto get_job_info(job_id id) const -> std::optional< dag_job_info >
Gets information about a specific job.
std::unordered_map< job_id, std::size_t > retry_counts_
Retry count per job.
auto reset() -> common::VoidResult
Resets the scheduler for reuse.
auto operator=(const dag_scheduler &) -> dag_scheduler &=delete
std::atomic< std::size_t > running_count_
Number of jobs currently running.
auto execute_job(job_id id) -> void
Executes a single job.
std::optional< common::error_info > first_error_
First error encountered during execution.
auto is_execution_complete() const -> bool
Checks if execution is complete.
std::unordered_map< job_id, std::vector< job_id > > dependencies_
Dependency graph (job -> jobs it depends on)
auto get_all_jobs() const -> std::vector< dag_job_info >
Gets information about all jobs.
auto add_job(std::unique_ptr< dag_job > j) -> job_id
Adds a job to the DAG.
std::chrono::steady_clock::time_point execution_start_time_
Execution start time.
auto add_dependency(job_id dependent, job_id dependency) -> common::VoidResult
Adds a dependency between jobs.
auto get_ready_jobs() const -> std::vector< job_id >
Gets IDs of ready jobs (dependencies satisfied)
auto execute_all() -> std::future< common::VoidResult >
Executes all jobs in dependency order.
auto cancel_all() -> void
Cancels all pending jobs.
std::condition_variable_any completion_cv_
Condition variable for waiting on completion.
std::atomic< bool > executing_
Flag indicating execution is in progress.
auto are_dependencies_satisfied(job_id id) const -> bool
Checks if a job's dependencies are all satisfied.
auto get_result(job_id id) const -> const T &
Gets the result from a completed job.
dag_config config_
Configuration.
auto has_cycles() const -> bool
Checks if the DAG has cycles.
auto get_stats() const -> dag_stats
Gets execution statistics.
std::atomic< bool > cancelled_
Flag indicating cancellation was requested.
auto remove_job(job_id id) -> common::VoidResult
Removes a job from the DAG (only if not yet started)
~dag_scheduler()
Destructor - cancels any pending jobs.
auto detect_cycle() const -> bool
Detects cycles using DFS.
dag_scheduler(const dag_scheduler &)=delete
auto skip_dependents(job_id failed_id) -> void
Marks dependents as skipped due to dependency failure.
auto cancel_dependents(job_id failed_id) -> void
Cancels dependents due to dependency failure.
auto to_dot() const -> std::string
Exports the DAG as DOT format (Graphviz)
Represents an error in the thread system.
Core thread pool implementation with work stealing and auto-scaling.
Configuration for DAG scheduler including failure handling policies.
DAG-aware job with dependency tracking and unique identifiers.
Fluent builder for creating dag_job instances with dependencies.
Error codes and utilities for the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
dag_job_state
State of a job in the DAG scheduler.
Definition dag_job.h:45
@ completed
Successfully completed.
std::uint64_t job_id
Unique job identifier for DAG scheduler.
Definition dag_job.h:33
STL namespace.
Configuration options for the DAG scheduler.
Definition dag_config.h:73
Information about a job in the DAG.
Definition dag_job.h:83
Statistics about DAG execution.
Definition dag_config.h:148