Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
dag_job.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13#pragma once
14
17
18#include <any>
19#include <atomic>
20#include <chrono>
21#include <cstdint>
22#include <functional>
23#include <memory>
24#include <optional>
25#include <string>
26#include <vector>
27
28namespace kcenon::thread
29{
/// Unique job identifier for DAG scheduler.
using job_id = std::uint64_t;

/// Invalid job ID constant. dag_job::add_dependency() silently ignores this value.
/// Declared `inline` so every translation unit including this header shares one entity.
inline constexpr job_id INVALID_JOB_ID = 0;
39
/// State of a job in the DAG scheduler.
enum class dag_job_state
{
    pending,    ///< Waiting for dependencies to complete.
    ready,      ///< Dependencies satisfied, can be executed.
    running,    ///< Currently executing.
    completed,  ///< Successfully completed.
    failed,     ///< Execution failed.
    cancelled,  ///< Cancelled by user or dependency failure.
    skipped     ///< Skipped due to dependency failure.
};
54
60 [[nodiscard]] inline auto dag_job_state_to_string(dag_job_state state) -> std::string
61 {
62 switch (state)
63 {
64 case dag_job_state::pending: return "pending";
65 case dag_job_state::ready: return "ready";
66 case dag_job_state::running: return "running";
67 case dag_job_state::completed: return "completed";
68 case dag_job_state::failed: return "failed";
69 case dag_job_state::cancelled: return "cancelled";
70 case dag_job_state::skipped: return "skipped";
71 default: return "unknown";
72 }
73 }
74
83 {
85 std::string name;
87
88 std::vector<job_id> dependencies;
89 std::vector<job_id> dependents;
90
91 std::chrono::steady_clock::time_point submit_time;
92 std::chrono::steady_clock::time_point start_time;
93 std::chrono::steady_clock::time_point end_time;
94
95 std::optional<std::string> error_message;
96 std::optional<std::any> result;
97
102 [[nodiscard]] auto get_wait_time() const -> std::chrono::milliseconds
103 {
104 if (start_time == std::chrono::steady_clock::time_point{})
105 {
106 return std::chrono::milliseconds{0};
107 }
108 return std::chrono::duration_cast<std::chrono::milliseconds>(start_time - submit_time);
109 }
110
115 [[nodiscard]] auto get_execution_time() const -> std::chrono::milliseconds
116 {
117 if (end_time == std::chrono::steady_clock::time_point{} ||
118 start_time == std::chrono::steady_clock::time_point{})
119 {
120 return std::chrono::milliseconds{0};
121 }
122 return std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
123 }
124 };
125
// Forward declaration: only the name is introduced here; the class is defined elsewhere.
class dag_scheduler;
128
156 class dag_job : public job
157 {
158 public:
163 explicit dag_job(const std::string& name = "dag_job");
164
168 ~dag_job() override;
169
176 [[nodiscard]] auto get_dag_id() const -> job_id { return dag_id_; }
177
184 [[nodiscard]] auto get_state() const -> dag_job_state
185 {
186 return state_.load(std::memory_order_acquire);
187 }
188
195 auto set_state(dag_job_state new_state) -> void
196 {
197 state_.store(new_state, std::memory_order_release);
198 }
199
208 [[nodiscard]] auto try_transition_state(dag_job_state expected, dag_job_state desired) -> bool
209 {
210 return state_.compare_exchange_strong(expected, desired,
211 std::memory_order_acq_rel,
212 std::memory_order_acquire);
213 }
214
221 [[nodiscard]] auto get_dependencies() const -> const std::vector<job_id>& { return dependencies_; }
222
229 auto add_dependency(job_id dependency_id) -> void
230 {
231 if (dependency_id != INVALID_JOB_ID)
232 {
233 dependencies_.push_back(dependency_id);
234 }
235 }
236
243 auto add_dependencies(const std::vector<job_id>& dependency_ids) -> void
244 {
245 for (const auto& id : dependency_ids)
246 {
247 add_dependency(id);
248 }
249 }
250
257 auto set_work(std::function<common::VoidResult()> work_func) -> void
258 {
259 work_func_ = std::move(work_func);
260 }
261
269 template<typename T>
270 auto set_work_with_result(std::function<common::Result<T>()> work_func) -> void
271 {
272 work_func_ = [this, func = std::move(work_func)]() -> common::VoidResult {
273 auto result = func();
274 if (result.is_ok())
275 {
277 return common::ok();
278 }
279 return common::VoidResult(result.error());
280 };
281 }
282
289 auto set_fallback(std::function<common::VoidResult()> fallback_func) -> void
290 {
291 fallback_func_ = std::move(fallback_func);
292 }
293
298 [[nodiscard]] auto get_fallback() const -> const std::function<common::VoidResult()>&
299 {
300 return fallback_func_;
301 }
302
307 [[nodiscard]] auto has_fallback() const -> bool
308 {
309 return fallback_func_ != nullptr;
310 }
311
319 template<typename T>
320 auto set_result(T&& value) -> void
321 {
322 result_ = std::forward<T>(value);
323 }
324
333 template<typename T>
334 [[nodiscard]] auto get_result() const -> const T&
335 {
336 return std::any_cast<const T&>(result_);
337 }
338
343 [[nodiscard]] auto has_result() const -> bool
344 {
345 return result_.has_value();
346 }
347
352 [[nodiscard]] auto get_result_any() const -> const std::any& { return result_; }
353
358 auto set_error_message(const std::string& message) -> void
359 {
360 error_message_ = message;
361 }
362
367 [[nodiscard]] auto get_error_message() const -> const std::optional<std::string>&
368 {
369 return error_message_;
370 }
371
375 auto record_start_time() -> void
376 {
377 start_time_ = std::chrono::steady_clock::now();
378 }
379
383 auto record_end_time() -> void
384 {
385 end_time_ = std::chrono::steady_clock::now();
386 }
387
392 [[nodiscard]] auto get_submit_time() const -> std::chrono::steady_clock::time_point
393 {
394 return submit_time_;
395 }
396
401 [[nodiscard]] auto get_start_time() const -> std::chrono::steady_clock::time_point
402 {
403 return start_time_;
404 }
405
410 [[nodiscard]] auto get_end_time() const -> std::chrono::steady_clock::time_point
411 {
412 return end_time_;
413 }
414
419 [[nodiscard]] auto get_info() const -> dag_job_info;
420
427 [[nodiscard]] auto do_work() -> common::VoidResult override;
428
433 [[nodiscard]] auto to_string() const -> std::string override;
434
435 private:
439 static std::atomic<job_id> next_dag_id_;
440
445
450
454 std::vector<job_id> dependencies_;
455
459 std::function<common::VoidResult()> work_func_;
460
464 std::function<common::VoidResult()> fallback_func_;
465
469 std::any result_;
470
474 std::optional<std::string> error_message_;
475
479 std::chrono::steady_clock::time_point submit_time_;
480
484 std::chrono::steady_clock::time_point start_time_;
485
489 std::chrono::steady_clock::time_point end_time_;
490 };
491
492} // namespace kcenon::thread
493
494// ----------------------------------------------------------------------------
495// Formatter specializations for dag_job
496// ----------------------------------------------------------------------------
497
499
A job with dependency support for DAG-based scheduling.
Definition dag_job.h:157
auto get_info() const -> dag_job_info
Creates a dag_job_info snapshot.
Definition dag_job.cpp:23
auto get_error_message() const -> const std::optional< std::string > &
Gets the error message.
Definition dag_job.h:367
auto try_transition_state(dag_job_state expected, dag_job_state desired) -> bool
Attempts to transition state atomically.
Definition dag_job.h:208
std::function< common::VoidResult()> work_func_
The work function to execute.
Definition dag_job.h:459
auto get_state() const -> dag_job_state
Gets the current state of the job.
Definition dag_job.h:184
auto record_start_time() -> void
Records the start time.
Definition dag_job.h:375
std::chrono::steady_clock::time_point submit_time_
Time when the job was created.
Definition dag_job.h:479
dag_job(const std::string &name="dag_job")
Constructs a new dag_job with a name.
Definition dag_job.cpp:14
auto set_work_with_result(std::function< common::Result< T >()> work_func) -> void
Sets the work function with result.
Definition dag_job.h:270
auto get_start_time() const -> std::chrono::steady_clock::time_point
Gets the start time.
Definition dag_job.h:401
std::chrono::steady_clock::time_point start_time_
Time when execution started.
Definition dag_job.h:484
std::atomic< dag_job_state > state_
Current state of the job.
Definition dag_job.h:449
auto add_dependencies(const std::vector< job_id > &dependency_ids) -> void
Adds multiple dependencies.
Definition dag_job.h:243
job_id dag_id_
Unique identifier for this job in the DAG.
Definition dag_job.h:444
auto get_result() const -> const T &
Gets the result value.
Definition dag_job.h:334
std::vector< job_id > dependencies_
List of job IDs this job depends on.
Definition dag_job.h:454
auto set_error_message(const std::string &message) -> void
Sets the error message for failed jobs.
Definition dag_job.h:358
auto has_fallback() const -> bool
Checks if a fallback function is set.
Definition dag_job.h:307
auto has_result() const -> bool
Checks if the job has a result.
Definition dag_job.h:343
auto get_result_any() const -> const std::any &
Gets the result as std::any.
Definition dag_job.h:352
auto get_dependencies() const -> const std::vector< job_id > &
Gets the list of dependency job IDs.
Definition dag_job.h:221
auto set_work(std::function< common::VoidResult()> work_func) -> void
Sets the work function to execute.
Definition dag_job.h:257
std::optional< std::string > error_message_
Error message if job failed.
Definition dag_job.h:474
std::any result_
Result value for passing between jobs.
Definition dag_job.h:469
auto do_work() -> common::VoidResult override
Executes the job's work function.
Definition dag_job.cpp:43
auto get_dag_id() const -> job_id
Gets the unique DAG job identifier.
Definition dag_job.h:176
std::function< common::VoidResult()> fallback_func_
The fallback function to execute on failure.
Definition dag_job.h:464
auto record_end_time() -> void
Records the end time.
Definition dag_job.h:383
std::chrono::steady_clock::time_point end_time_
Time when execution ended.
Definition dag_job.h:489
~dag_job() override
Virtual destructor.
auto add_dependency(job_id dependency_id) -> void
Adds a dependency on another job.
Definition dag_job.h:229
auto get_fallback() const -> const std::function< common::VoidResult()> &
Gets the fallback function.
Definition dag_job.h:298
auto set_state(dag_job_state new_state) -> void
Sets the job state.
Definition dag_job.h:195
auto set_result(T &&value) -> void
Sets the result value.
Definition dag_job.h:320
auto set_fallback(std::function< common::VoidResult()> fallback_func) -> void
Sets the fallback function to execute on failure.
Definition dag_job.h:289
auto get_submit_time() const -> std::chrono::steady_clock::time_point
Gets the submit time.
Definition dag_job.h:392
auto get_end_time() const -> std::chrono::steady_clock::time_point
Gets the end time.
Definition dag_job.h:410
static std::atomic< job_id > next_dag_id_
Static counter for generating unique DAG job IDs.
Definition dag_job.h:439
auto to_string() const -> std::string override
Returns a string representation of the job.
Definition dag_job.cpp:75
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
T & value() &
Gets the value.
bool is_ok() const noexcept
Checks if the result is successful.
Error codes and utilities for the thread system.
Provides macros for generating std::formatter specializations.
Base job class for schedulable work units in the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
auto dag_job_state_to_string(dag_job_state state) -> std::string
Convert dag_job_state to string representation.
Definition dag_job.h:60
dag_job_state
State of a job in the DAG scheduler.
Definition dag_job.h:45
@ failed
Execution failed.
@ cancelled
Cancelled by user or dependency failure.
@ running
Currently executing.
@ pending
Waiting for dependencies to complete.
@ completed
Successfully completed.
@ ready
Dependencies satisfied, can be executed.
@ skipped
Skipped due to dependency failure.
std::uint64_t job_id
Unique job identifier for DAG scheduler.
Definition dag_job.h:33
constexpr job_id INVALID_JOB_ID
Invalid job ID constant.
Definition dag_job.h:38
STL namespace.
Information about a job in the DAG.
Definition dag_job.h:83
std::chrono::steady_clock::time_point submit_time
When job was added to DAG.
Definition dag_job.h:91
std::optional< std::string > error_message
Error message if failed.
Definition dag_job.h:95
std::vector< job_id > dependencies
Jobs this job depends on.
Definition dag_job.h:88
auto get_wait_time() const -> std::chrono::milliseconds
Calculate wait time (time from submit to start)
Definition dag_job.h:102
std::chrono::steady_clock::time_point end_time
When execution ended.
Definition dag_job.h:93
dag_job_state state
Current job state.
Definition dag_job.h:86
std::vector< job_id > dependents
Jobs that depend on this job.
Definition dag_job.h:89
std::optional< std::any > result
Result value for passing between jobs.
Definition dag_job.h:96
auto get_execution_time() const -> std::chrono::milliseconds
Calculate execution time.
Definition dag_job.h:115
std::string name
Human-readable job name.
Definition dag_job.h:85
std::chrono::steady_clock::time_point start_time
When execution started.
Definition dag_job.h:92
#define DECLARE_FORMATTER(CLASS_NAME)
Generates std::formatter specializations for narrow and wide characters.