Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::dag_job Class Reference

A job with dependency support for DAG-based scheduling. More...

#include <dag_job.h>

Public Member Functions

 dag_job (const std::string &name="dag_job")
 Constructs a new dag_job with a name.
 
 ~dag_job () override
 Virtual destructor.
 
auto get_dag_id () const -> job_id
 Gets the unique DAG job identifier.
 
auto get_state () const -> dag_job_state
 Gets the current state of the job.
 
auto set_state (dag_job_state new_state) -> void
 Sets the job state.
 
auto try_transition_state (dag_job_state expected, dag_job_state desired) -> bool
 Attempts to transition state atomically.
 
auto get_dependencies () const -> const std::vector< job_id > &
 Gets the list of dependency job IDs.
 
auto add_dependency (job_id dependency_id) -> void
 Adds a dependency on another job.
 
auto add_dependencies (const std::vector< job_id > &dependency_ids) -> void
 Adds multiple dependencies.
 
auto set_work (std::function< common::VoidResult()> work_func) -> void
 Sets the work function to execute.
 
template<typename T >
auto set_work_with_result (std::function< common::Result< T >()> work_func) -> void
 Sets the work function with result.
 
auto set_fallback (std::function< common::VoidResult()> fallback_func) -> void
 Sets the fallback function to execute on failure.
 
auto get_fallback () const -> const std::function< common::VoidResult()> &
 Gets the fallback function.
 
auto has_fallback () const -> bool
 Checks if a fallback function is set.
 
template<typename T >
auto set_result (T &&value) -> void
 Sets the result value.
 
template<typename T >
auto get_result () const -> const T &
 Gets the result value.
 
auto has_result () const -> bool
 Checks if the job has a result.
 
auto get_result_any () const -> const std::any &
 Gets the result as std::any.
 
auto set_error_message (const std::string &message) -> void
 Sets the error message for failed jobs.
 
auto get_error_message () const -> const std::optional< std::string > &
 Gets the error message.
 
auto record_start_time () -> void
 Records the start time.
 
auto record_end_time () -> void
 Records the end time.
 
auto get_submit_time () const -> std::chrono::steady_clock::time_point
 Gets the submit time.
 
auto get_start_time () const -> std::chrono::steady_clock::time_point
 Gets the start time.
 
auto get_end_time () const -> std::chrono::steady_clock::time_point
 Gets the end time.
 
auto get_info () const -> dag_job_info
 Creates a dag_job_info snapshot.
 
auto do_work () -> common::VoidResult override
 Executes the job's work function.
 
auto to_string () const -> std::string override
 Returns a string representation of the job.
 
- Public Member Functions inherited from kcenon::thread::job
auto get_job_id () const -> std::uint64_t
 Gets the unique ID of this job.
 
auto get_enqueue_time () const -> std::chrono::steady_clock::time_point
 Gets the time when this job was created (enqueued).
 
 job (const std::string &name="job")
 Constructs a new job with an optional human-readable name.
 
 job (const std::vector< uint8_t > &data, const std::string &name="data_job")
 Constructs a new job with associated raw byte data and a name.
 
virtual ~job (void)
 Virtual destructor for the job class to allow proper cleanup in derived classes.
 
auto get_name (void) const -> std::string
 Retrieves the name of this job.
 
virtual auto set_cancellation_token (const cancellation_token &token) -> void
 Sets a cancellation token that can be used to cancel the job.
 
virtual auto get_cancellation_token () const -> cancellation_token
 Gets the cancellation token associated with this job.
 
virtual auto set_job_queue (const std::shared_ptr< job_queue > &job_queue) -> void
 Associates this job with a specific job_queue.
 
virtual auto get_job_queue (void) const -> std::shared_ptr< job_queue >
 Retrieves the job_queue associated with this job, if any.
 
auto with_on_complete (std::function< void(common::VoidResult)> callback) -> job &
 Attaches a completion callback to this job.
 
auto with_on_error (std::function< void(const common::error_info &)> callback) -> job &
 Attaches an error callback to this job.
 
auto with_priority (job_priority priority) -> job &
 Sets the priority level for this job.
 
auto with_cancellation (const cancellation_token &token) -> job &
 Attaches a cancellation token to this job via composition.
 
auto with_retry (const retry_policy &policy) -> job &
 Attaches a retry policy to this job.
 
auto with_timeout (std::chrono::milliseconds timeout) -> job &
 Sets a timeout for job execution.
 
auto get_priority () const -> job_priority
 Gets the priority level of this job.
 
auto get_retry_policy () const -> std::optional< retry_policy >
 Gets the retry policy of this job.
 
auto get_timeout () const -> std::optional< std::chrono::milliseconds >
 Gets the timeout duration for this job.
 
auto has_explicit_cancellation () const -> bool
 Checks if this job has an explicit cancellation set via composition.
 
auto has_components () const -> bool
 Checks if this job has any composed components.
 

Private Attributes

job_id dag_id_
 Unique identifier for this job in the DAG.
 
std::atomic< dag_job_state > state_ {dag_job_state::pending}
 Current state of the job.
 
std::vector< job_id > dependencies_
 List of job IDs this job depends on.
 
std::function< common::VoidResult()> work_func_
 The work function to execute.
 
std::function< common::VoidResult()> fallback_func_
 The fallback function to execute on failure.
 
std::any result_
 Result value for passing between jobs.
 
std::optional< std::string > error_message_
 Error message if job failed.
 
std::chrono::steady_clock::time_point submit_time_
 Time when the job was created.
 
std::chrono::steady_clock::time_point start_time_
 Time when execution started.
 
std::chrono::steady_clock::time_point end_time_
 Time when execution ended.
 

Static Private Attributes

static std::atomic< job_id > next_dag_id_ {1}
 Static counter for generating unique DAG job IDs.
 

Additional Inherited Members

- Protected Member Functions inherited from kcenon::thread::job
auto invoke_callbacks (const common::VoidResult &result) -> void
 Invokes the completion callbacks if they are set.
 
- Protected Attributes inherited from kcenon::thread::job
std::string name_
 The descriptive name of the job, used primarily for identification and logging.
 
std::vector< uint8_t > data_
 An optional container of raw byte data that may be used by the job.
 
std::weak_ptr< job_queue > job_queue_
 A weak reference to the job_queue that currently manages this job.
 
cancellation_token cancellation_token_
 The cancellation token associated with this job.
 

Detailed Description

A job with dependency support for DAG-based scheduling.

The dag_job class extends the base job class to support:

  • Dependency declarations on other jobs
  • State tracking for DAG execution
  • Result storage for passing data between jobs

Thread Safety

  • State transitions are atomic
  • Result access should be synchronized externally
  • Dependencies should be set before adding to scheduler

Usage Example

auto job_a = std::make_unique<dag_job>("fetch_data");
job_a->set_work([] { return fetch_from_database(); });
const auto job_a_id = job_a->get_dag_id();

auto job_b = std::make_unique<dag_job>("process_data");
job_b->add_dependency(job_a_id);
job_b->set_work([&scheduler, job_a_id] {
    // Read the upstream job's result through the scheduler.
    auto data = scheduler.get_result<Data>(job_a_id);
    return process(data);
});

Definition at line 156 of file dag_job.h.

Constructor & Destructor Documentation

◆ dag_job()

kcenon::thread::dag_job::dag_job ( const std::string & name = "dag_job")
explicit

Constructs a new dag_job with a name.

Parameters
name    Human-readable name for the job

Definition at line 14 of file dag_job.cpp.

    : job(name)
    , dag_id_(next_dag_id_.fetch_add(1, std::memory_order_relaxed))
    , submit_time_(std::chrono::steady_clock::now())
{
}
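
The constructor draws its ID from a shared atomic counter. The following is a minimal, self-contained sketch of that pattern, not the library's code: `sketch_job_id`, `id_source`, and `example_ids` are hypothetical names, and the ID type is assumed to be a 64-bit unsigned integer.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the library's job_id type (assumed 64-bit).
using sketch_job_id = std::uint64_t;

class id_source {
public:
    // Returns the next ID. Relaxed ordering is sufficient because only the
    // atomicity of the increment matters, not ordering with other memory.
    static sketch_job_id next() {
        return counter_.fetch_add(1, std::memory_order_relaxed);
    }

private:
    // Starts at 1, mirroring next_dag_id_ {1}.
    static inline std::atomic<sketch_job_id> counter_{1};
};

// Usage: two consecutive calls on one thread yield consecutive IDs.
inline void example_ids() {
    const auto a = id_source::next();
    const auto b = id_source::next();
    assert(b == a + 1);
}
```

Because `fetch_add` returns the value held before the increment, concurrent callers each receive a distinct ID even without stronger memory ordering.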

◆ ~dag_job()

kcenon::thread::dag_job::~dag_job ( )
override default

Virtual destructor.

Member Function Documentation

◆ add_dependencies()

auto kcenon::thread::dag_job::add_dependencies ( const std::vector< job_id > & dependency_ids) -> void
inline

Adds multiple dependencies.

Parameters
dependency_ids    Vector of job IDs to depend on

Thread Safety: Not thread-safe, should be called before scheduling

Definition at line 243 of file dag_job.h.

{
    for (const auto& id : dependency_ids)
    {
        add_dependency(id);
    }
}

References add_dependency().

◆ add_dependency()

auto kcenon::thread::dag_job::add_dependency ( job_id dependency_id) -> void
inline

Adds a dependency on another job.

Parameters
dependency_id    The ID of the job to depend on

Thread Safety: Not thread-safe, should be called before scheduling

Definition at line 229 of file dag_job.h.

{
    if (dependency_id != INVALID_JOB_ID)
    {
        dependencies_.push_back(dependency_id);
    }
}

References dependencies_, and kcenon::thread::INVALID_JOB_ID.

Referenced by add_dependencies().
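
As the listing shows, an invalid ID is silently skipped and every other ID is appended unconditionally, so duplicates are kept. A small stand-alone sketch of that behavior follows; `dependency_list`, `sketch_job_id`, and `sketch_invalid_id` are hypothetical, and the value 0 for the invalid ID is an assumption because the real `INVALID_JOB_ID` value is not shown on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using sketch_job_id = std::uint64_t;

// Assumption: placeholder value; the real INVALID_JOB_ID is not shown here.
constexpr sketch_job_id sketch_invalid_id = 0;

class dependency_list {
public:
    // Appends the ID unless it is the invalid sentinel.
    void add(sketch_job_id id) {
        if (id != sketch_invalid_id) {
            ids_.push_back(id);
        }
    }

    // Forwards each element to add(), like add_dependencies().
    void add_all(const std::vector<sketch_job_id>& ids) {
        for (const auto& id : ids) {
            add(id);
        }
    }

    const std::vector<sketch_job_id>& ids() const { return ids_; }

private:
    std::vector<sketch_job_id> ids_;
};
```

Callers that need a duplicate-free dependency list would have to filter the IDs themselves before adding them.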

◆ do_work()

auto kcenon::thread::dag_job::do_work ( void ) -> common::VoidResult
nodiscard override virtual

Executes the job's work function.

Returns
Result of execution

Thread Safety: Should only be called by the scheduler

Reimplemented from kcenon::thread::job.

Definition at line 43 of file dag_job.cpp.

{
    // Check cancellation token
    if (cancellation_token_.is_cancelled())
    {
        set_state(dag_job_state::cancelled);
        return make_error_result(error_code::operation_canceled, "Job was cancelled");
    }

    // If no work function is set, return success
    if (!work_func_)
    {
        return common::ok();
    }

    // Execute the work function
    try
    {
        return work_func_();
    }
    catch (const std::exception& e)
    {
        set_error_message(e.what());
        // This line is missing from the extracted listing; the message argument is assumed.
        return make_error_result(error_code::job_execution_failed, e.what());
    }
    catch (...)
    {
        set_error_message("Unknown exception");
        return make_error_result(error_code::job_execution_failed, "Unknown exception during job execution");
    }
}

References kcenon::thread::cancelled, kcenon::thread::job_execution_failed, kcenon::thread::make_error_result(), and kcenon::thread::operation_canceled.
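
The core of do_work() is translating exceptions thrown by the work function into an error value. A minimal sketch of that pattern, using only the standard library, is given below. `sketch_result` and `run_guarded` are hypothetical names; an empty `std::optional` stands in for a successful `common::VoidResult`, which is an assumption made only to keep the example self-contained.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for common::VoidResult:
// an empty optional means success, otherwise it holds the error text.
using sketch_result = std::optional<std::string>;

// Runs `work` and converts any exception into an error value.
inline sketch_result run_guarded(const std::function<sketch_result()>& work) {
    if (!work) {
        return std::nullopt;  // no work function set: treated as success
    }
    try {
        return work();
    } catch (const std::exception& e) {
        return std::string(e.what());
    } catch (...) {
        return std::string("Unknown exception");
    }
}
```

With this shape, no exception escapes to the caller; it only ever sees a result value, which is what lets a scheduler treat thrown and returned failures uniformly.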

◆ get_dag_id()

auto kcenon::thread::dag_job::get_dag_id ( ) const -> job_id
inline nodiscard

Gets the unique DAG job identifier.

Returns
The job's unique ID within the DAG

Thread Safety: Safe to call from any thread (ID is immutable)

Definition at line 176 of file dag_job.h.

{ return dag_id_; }

References dag_id_.

◆ get_dependencies()

auto kcenon::thread::dag_job::get_dependencies ( ) const -> const std::vector<job_id>&
inline nodiscard

Gets the list of dependency job IDs.

Returns
Vector of job IDs this job depends on

Thread Safety: Not thread-safe, should be set before scheduling

Definition at line 221 of file dag_job.h.

{ return dependencies_; }

References dependencies_.

◆ get_end_time()

auto kcenon::thread::dag_job::get_end_time ( ) const -> std::chrono::steady_clock::time_point
inline nodiscard

Gets the end time.

Returns
The time when execution ended

Definition at line 410 of file dag_job.h.

{
    return end_time_;
}

References end_time_.

◆ get_error_message()

auto kcenon::thread::dag_job::get_error_message ( ) const -> const std::optional<std::string>&
inline nodiscard

Gets the error message.

Returns
The error message, or std::nullopt if not set

Definition at line 367 of file dag_job.h.

{
    return error_message_;
}

References error_message_.

◆ get_fallback()

auto kcenon::thread::dag_job::get_fallback ( ) const -> const std::function<common::VoidResult()>&
inline nodiscard

Gets the fallback function.

Returns
The fallback function, or nullptr if not set

Definition at line 298 of file dag_job.h.

{
    return fallback_func_;
}

References fallback_func_.

◆ get_info()

auto kcenon::thread::dag_job::get_info ( ) const -> dag_job_info
nodiscard

Creates a dag_job_info snapshot.

Returns
Snapshot of current job state

Definition at line 23 of file dag_job.cpp.

{
    dag_job_info info;
    info.id = dag_id_;
    info.name = get_name();
    info.state = get_state();
    info.dependencies = dependencies_;
    info.submit_time = submit_time_;
    info.start_time = start_time_;
    info.end_time = end_time_;
    info.error_message = error_message_;

    if (has_result())
    {
        info.result = result_;
    }

    return info;
}

References dag_id_, dependencies_, end_time_, error_message_, kcenon::thread::job::get_name(), get_state(), has_result(), kcenon::thread::dag_job_info::id, kcenon::thread::info, result_, start_time_, and submit_time_.

◆ get_result()

template<typename T >
auto kcenon::thread::dag_job::get_result ( ) const -> const T&
inline nodiscard

Gets the result value.

Template Parameters
T    The expected result type
Returns
The result value
Exceptions
std::bad_any_cast    if the stored type does not match T

Thread Safety: Not thread-safe, should be called after job completes

Definition at line 334 of file dag_job.h.

{
    return std::any_cast<const T&>(result_);
}

References result_.
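
Because the result is stored in a `std::any`, the type requested from get_result() has to match the stored type exactly. The sketch below illustrates this with plain standard-library calls; `holds` and `example_any` are hypothetical helpers, not part of the library.

```cpp
#include <any>
#include <cassert>
#include <string>

// Returns true if `value` holds exactly type T, without throwing.
template <typename T>
bool holds(const std::any& value) {
    return std::any_cast<T>(&value) != nullptr;
}

inline void example_any() {
    std::any result = std::string("rows=42");

    // Matching type: reference access succeeds.
    assert(std::any_cast<const std::string&>(result) == "rows=42");

    // Mismatched type: the reference form throws std::bad_any_cast.
    bool threw = false;
    try {
        (void)std::any_cast<const int&>(result);
    } catch (const std::bad_any_cast&) {
        threw = true;
    }
    assert(threw);

    // The pointer form reports a mismatch without throwing.
    assert(!holds<int>(result));
}
```

A caller that is unsure of the stored type could apply the pointer form of `std::any_cast` to the value returned by get_result_any() instead of catching the exception.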

◆ get_result_any()

auto kcenon::thread::dag_job::get_result_any ( ) const -> const std::any&
inline nodiscard

Gets the result as std::any.

Returns
The result as std::any

Definition at line 352 of file dag_job.h.

{ return result_; }

References result_.

◆ get_start_time()

auto kcenon::thread::dag_job::get_start_time ( ) const -> std::chrono::steady_clock::time_point
inline nodiscard

Gets the start time.

Returns
The time when execution started

Definition at line 401 of file dag_job.h.

{
    return start_time_;
}

References start_time_.

◆ get_state()

auto kcenon::thread::dag_job::get_state ( ) const -> dag_job_state
inline nodiscard

Gets the current state of the job.

Returns
Current dag_job_state

Thread Safety: Atomic read

Definition at line 184 of file dag_job.h.

{
    return state_.load(std::memory_order_acquire);
}

References state_.

Referenced by get_info(), and to_string().

◆ get_submit_time()

auto kcenon::thread::dag_job::get_submit_time ( ) const -> std::chrono::steady_clock::time_point
inline nodiscard

Gets the submit time.

Returns
The time when the job was created

Definition at line 392 of file dag_job.h.

{
    return submit_time_;
}

References submit_time_.

◆ has_fallback()

auto kcenon::thread::dag_job::has_fallback ( ) const -> bool
inline nodiscard

Checks if a fallback function is set.

Returns
true if fallback is set

Definition at line 307 of file dag_job.h.

{
    return fallback_func_ != nullptr;
}

References fallback_func_.

◆ has_result()

auto kcenon::thread::dag_job::has_result ( ) const -> bool
inline nodiscard

Checks if the job has a result.

Returns
true if result is set

Definition at line 343 of file dag_job.h.

{
    return result_.has_value();
}

References result_.

Referenced by get_info().

◆ record_end_time()

auto kcenon::thread::dag_job::record_end_time ( ) -> void
inline

Records the end time.

Definition at line 383 of file dag_job.h.

{
    end_time_ = std::chrono::steady_clock::now();
}

References end_time_.

◆ record_start_time()

auto kcenon::thread::dag_job::record_start_time ( ) -> void
inline

Records the start time.

Definition at line 375 of file dag_job.h.

{
    start_time_ = std::chrono::steady_clock::now();
}

References start_time_.
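
The three recorded time points (submit, start, end) can be combined into a queue wait time and an execution time. The following sketch assumes all three have been recorded; `job_timing`, `wait_time`, and `run_time` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <chrono>

using clock_type = std::chrono::steady_clock;

// Hypothetical bundle of the three time points a dag_job exposes.
struct job_timing {
    clock_type::time_point submit;
    clock_type::time_point start;
    clock_type::time_point end;
};

// Time between job creation and the start of execution.
inline std::chrono::milliseconds wait_time(const job_timing& t) {
    return std::chrono::duration_cast<std::chrono::milliseconds>(t.start - t.submit);
}

// Time spent executing the work function.
inline std::chrono::milliseconds run_time(const job_timing& t) {
    return std::chrono::duration_cast<std::chrono::milliseconds>(t.end - t.start);
}
```

If a time point was never recorded, its value is not meaningful, so differences should only be computed for jobs that have actually started or finished.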

◆ set_error_message()

auto kcenon::thread::dag_job::set_error_message ( const std::string & message) -> void
inline

Sets the error message for failed jobs.

Parameters
message    The error message

Definition at line 358 of file dag_job.h.

{
    error_message_ = message;
}

References error_message_.

◆ set_fallback()

auto kcenon::thread::dag_job::set_fallback ( std::function< common::VoidResult()> fallback_func) -> void
inline

Sets the fallback function to execute on failure.

Parameters
fallback_func    The fallback function

Thread Safety: Not thread-safe, should be called before scheduling

Definition at line 289 of file dag_job.h.

{
    fallback_func_ = std::move(fallback_func);
}

References fallback_func_.

◆ set_result()

template<typename T >
auto kcenon::thread::dag_job::set_result ( T && value) -> void
inline

Sets the result value.

Template Parameters
T    The result type
Parameters
value    The result value

Thread Safety: Not thread-safe, should be called from worker thread only

Definition at line 320 of file dag_job.h.

{
    result_ = std::forward<T>(value);
}

References result_.

Referenced by set_work_with_result().

◆ set_state()

auto kcenon::thread::dag_job::set_state ( dag_job_state new_state) -> void
inline

Sets the job state.

Parameters
new_state    The new state to set

Thread Safety: Atomic write

Definition at line 195 of file dag_job.h.

{
    state_.store(new_state, std::memory_order_release);
}

References state_.

◆ set_work()

auto kcenon::thread::dag_job::set_work ( std::function< common::VoidResult()> work_func) -> void
inline

Sets the work function to execute.

Parameters
work_func    The function to execute

Thread Safety: Not thread-safe, should be called before scheduling

Definition at line 257 of file dag_job.h.

{
    work_func_ = std::move(work_func);
}

References work_func_.

◆ set_work_with_result()

template<typename T >
auto kcenon::thread::dag_job::set_work_with_result ( std::function< common::Result< T >()> work_func) -> void
inline

Sets the work function with result.

Template Parameters
T    The result type
Parameters
work_func    The function to execute that returns a result

Thread Safety: Not thread-safe, should be called before scheduling

Definition at line 270 of file dag_job.h.

{
    work_func_ = [this, func = std::move(work_func)]() -> common::VoidResult {
        auto result = func();
        if (result.is_ok())
        {
            set_result(result.value());
            return common::ok();
        }
        return common::VoidResult(result.error());
    };
}

References kcenon::thread::result< T >::is_ok(), set_result(), kcenon::thread::result< T >::value(), and work_func_.
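
The listing wraps a value-returning function in a void-returning one that stores the value on success and forwards the error otherwise. A self-contained sketch of that adapter is shown below. `sketch_value_result`, `sketch_void_result`, and `result_slot` are hypothetical stand-ins for `common::Result<T>`, `common::VoidResult`, and the job, chosen so the example compiles on its own.

```cpp
#include <any>
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for common::Result<T>.
template <typename T>
struct sketch_value_result {
    std::optional<T> value;  // set on success
    std::string error;       // used when value is empty
};

// Hypothetical stand-in for common::VoidResult: empty means success.
using sketch_void_result = std::optional<std::string>;

class result_slot {
public:
    template <typename T>
    void set_work_with_result(std::function<sketch_value_result<T>()> func) {
        // The wrapper captures `this`, so the object must stay at the same
        // address for as long as the stored function may run.
        work_ = [this, func = std::move(func)]() -> sketch_void_result {
            auto r = func();
            if (r.value.has_value()) {
                result_ = std::move(*r.value);  // keep the value for later readers
                return std::nullopt;
            }
            return r.error;                     // forward the failure
        };
    }

    sketch_void_result run() {
        if (!work_) {
            return std::nullopt;
        }
        return work_();
    }

    const std::any& result() const { return result_; }

private:
    std::function<sketch_void_result()> work_;
    std::any result_;
};
```

The `this` capture is the detail to watch: the real class has the same capture, which is one reason jobs in the usage example are held through `std::make_unique` rather than copied or moved by value.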

◆ to_string()

auto kcenon::thread::dag_job::to_string ( void ) const -> std::string
nodiscard override virtual

Returns a string representation of the job.

Returns
String describing the job

Reimplemented from kcenon::thread::job.

Definition at line 75 of file dag_job.cpp.

{
    return std::format("[dag_job: {} (id={}, state={})]",
                       get_name(),
                       dag_id_,
                       dag_job_state_to_string(get_state()));
}

References dag_id_, kcenon::thread::dag_job_state_to_string(), kcenon::thread::job::get_name(), and get_state().

◆ try_transition_state()

auto kcenon::thread::dag_job::try_transition_state ( dag_job_state expected,
dag_job_state desired ) -> bool
inline nodiscard

Attempts to transition state atomically.

Parameters
expected    The expected current state
desired    The desired new state
Returns
true if transition succeeded, false otherwise

Thread Safety: Atomic compare-exchange

Definition at line 208 of file dag_job.h.

{
    return state_.compare_exchange_strong(expected, desired,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire);
}

References state_.
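
A compare-exchange transition lets exactly one caller win when several threads try to move a job out of the same state. The sketch below reproduces the pattern with a stand-alone type. `sketch_state` and `state_holder` are hypothetical, and the state names `running` and `completed` are assumptions, since this page only names `pending` and `cancelled`.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical subset of states used for illustration only.
enum class sketch_state { pending, running, completed };

class state_holder {
public:
    // Moves to `desired` only if the current state equals `expected`.
    // `expected` is taken by value, so the caller's variable is not
    // updated with the observed state when the exchange fails.
    bool try_transition(sketch_state expected, sketch_state desired) {
        return state_.compare_exchange_strong(expected, desired,
                                              std::memory_order_acq_rel,
                                              std::memory_order_acquire);
    }

    sketch_state get() const {
        return state_.load(std::memory_order_acquire);
    }

private:
    std::atomic<sketch_state> state_{sketch_state::pending};
};
```

A second attempt at the same transition returns false, which is how a caller can tell that another thread already claimed the job; it would then call the getter to learn the actual state.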

Member Data Documentation

◆ dag_id_

job_id kcenon::thread::dag_job::dag_id_
private

Unique identifier for this job in the DAG.

Definition at line 444 of file dag_job.h.

Referenced by get_dag_id(), get_info(), and to_string().

◆ dependencies_

std::vector<job_id> kcenon::thread::dag_job::dependencies_
private

List of job IDs this job depends on.

Definition at line 454 of file dag_job.h.

Referenced by add_dependency(), get_dependencies(), and get_info().

◆ end_time_

std::chrono::steady_clock::time_point kcenon::thread::dag_job::end_time_
private

Time when execution ended.

Definition at line 489 of file dag_job.h.

Referenced by get_end_time(), get_info(), and record_end_time().

◆ error_message_

std::optional<std::string> kcenon::thread::dag_job::error_message_
private

Error message if job failed.

Definition at line 474 of file dag_job.h.

Referenced by get_error_message(), get_info(), and set_error_message().

◆ fallback_func_

std::function<common::VoidResult()> kcenon::thread::dag_job::fallback_func_
private

The fallback function to execute on failure.

Definition at line 464 of file dag_job.h.

Referenced by get_fallback(), has_fallback(), and set_fallback().

◆ next_dag_id_

std::atomic< job_id > kcenon::thread::dag_job::next_dag_id_ {1}
static private

Static counter for generating unique DAG job IDs.

Definition at line 439 of file dag_job.h.

◆ result_

std::any kcenon::thread::dag_job::result_
private

Result value for passing between jobs.

Definition at line 469 of file dag_job.h.

Referenced by get_info(), get_result(), get_result_any(), has_result(), and set_result().

◆ start_time_

std::chrono::steady_clock::time_point kcenon::thread::dag_job::start_time_
private

Time when execution started.

Definition at line 484 of file dag_job.h.

Referenced by get_info(), get_start_time(), and record_start_time().

◆ state_

std::atomic<dag_job_state> kcenon::thread::dag_job::state_ {dag_job_state::pending}
private

Current state of the job.

Definition at line 449 of file dag_job.h.

Referenced by get_state(), set_state(), and try_transition_state().

◆ submit_time_

std::chrono::steady_clock::time_point kcenon::thread::dag_job::submit_time_
private

Time when the job was created.

Definition at line 479 of file dag_job.h.

Referenced by get_info(), and get_submit_time().

◆ work_func_

std::function<common::VoidResult()> kcenon::thread::dag_job::work_func_
private

The work function to execute.

Definition at line 459 of file dag_job.h.

Referenced by set_work(), and set_work_with_result().


The documentation for this class was generated from the following files: