Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
8
9using namespace utility_module;
10
32namespace kcenon::thread
33{
34 // Initialize static member for job ID generation
35 std::atomic<std::uint64_t> job::next_job_id_{0};
36
53 job::job(const std::string& name)
54 : name_(name)
55 , data_(std::vector<uint8_t>())
56 , job_id_(next_job_id_.fetch_add(1, std::memory_order_relaxed))
57 , enqueue_time_(std::chrono::steady_clock::now())
58 {
59 }
60
83 job::job(const std::vector<uint8_t>& data, const std::string& name)
84 : name_(name)
85 , data_(data)
86 , job_id_(next_job_id_.fetch_add(1, std::memory_order_relaxed))
87 , enqueue_time_(std::chrono::steady_clock::now())
88 {
89 }
90
100 job::~job(void) {}
101
112 auto job::get_name(void) const -> std::string { return name_; }
113
135 auto job::do_work(void) -> common::VoidResult {
136 // Base implementation indicates missing override in derived class
137 // This should never be called in production code
138 return common::error_info{static_cast<int>(error_code::not_implemented), "job::do_work() must be implemented in derived class", "thread_system"};
139 }
140
164 cancellation_token_ = token;
165 }
166
195
219 auto job::set_job_queue(const std::shared_ptr<job_queue>& job_queue) -> void
220 {
221 job_queue_ = job_queue;
222 }
223
244 auto job::get_job_queue(void) const -> std::shared_ptr<job_queue> { return job_queue_.lock(); }
245
262 auto job::to_string(void) const -> std::string { return formatter::format("job: {}", name_); }
263
264 // ========================================================================
265 // Composition Methods Implementation
266 // ========================================================================
267
280 {
281 if (!components_)
282 {
283 components_ = std::make_unique<job_components>();
284 }
285 return *components_;
286 }
287
299 auto job::with_on_complete(std::function<void(common::VoidResult)> callback) -> job&
300 {
301 ensure_components().on_complete = std::move(callback);
302 return *this;
303 }
304
316 auto job::with_on_error(std::function<void(const common::error_info&)> callback) -> job&
317 {
318 ensure_components().on_error = std::move(callback);
319 return *this;
320 }
321
334 {
335 ensure_components().priority = priority;
336 return *this;
337 }
338
351 {
352 cancellation_token_ = token;
353 ensure_components().has_explicit_cancellation = true;
354 return *this;
355 }
356
368 auto job::with_retry(const retry_policy& policy) -> job&
369 {
370 ensure_components().retry = policy;
371 return *this;
372 }
373
385 auto job::with_timeout(std::chrono::milliseconds timeout) -> job&
386 {
387 ensure_components().timeout = timeout;
388 return *this;
389 }
390
401 {
402 if (components_ && components_->priority.has_value())
403 {
404 return components_->priority.value();
405 }
407 }
408
418 auto job::get_retry_policy() const -> std::optional<retry_policy>
419 {
420 if (components_ && components_->retry.has_value())
421 {
422 return components_->retry;
423 }
424 return std::nullopt;
425 }
426
436 auto job::get_timeout() const -> std::optional<std::chrono::milliseconds>
437 {
438 if (components_ && components_->timeout.has_value())
439 {
440 return components_->timeout;
441 }
442 return std::nullopt;
443 }
444
455 {
456 return components_ && components_->has_explicit_cancellation;
457 }
458
468 auto job::has_components() const -> bool
469 {
470 return components_ != nullptr;
471 }
472
484 auto job::invoke_callbacks(const common::VoidResult& result) -> void
485 {
486 if (!components_)
487 {
488 return;
489 }
490
491 // Invoke error callback first if result is an error
492 if (result.is_err() && components_->on_error)
493 {
494 components_->on_error(result.error());
495 }
496
497 // Always invoke on_complete callback if set
498 if (components_->on_complete)
499 {
500 components_->on_complete(result);
501 }
502 }
503} // namespace kcenon::thread
Provides a mechanism for cooperative cancellation of operations.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
auto get_priority() const -> job_priority
Gets the priority level of this job.
Definition job.cpp:400
auto with_retry(const retry_policy &policy) -> job &
Attaches a retry policy to this job.
Definition job.cpp:368
std::unique_ptr< job_components > components_
Composed components for this job.
Definition job.h:535
virtual auto set_job_queue(const std::shared_ptr< job_queue > &job_queue) -> void
Associates this job with a specific job_queue.
Definition job.cpp:219
auto ensure_components() -> job_components &
Ensures components_ is allocated (lazy initialization).
Definition job.cpp:279
auto has_components() const -> bool
Checks if this job has any composed components.
Definition job.cpp:468
auto with_on_error(std::function< void(const common::error_info &)> callback) -> job &
Attaches an error callback to this job.
Definition job.cpp:316
virtual auto set_cancellation_token(const cancellation_token &token) -> void
Sets a cancellation token that can be used to cancel the job.
Definition job.cpp:163
virtual ~job(void)
Virtual destructor for the job class to allow proper cleanup in derived classes.
Definition job.cpp:100
virtual auto to_string(void) const -> std::string
Provides a string representation of the job for logging or debugging.
Definition job.cpp:262
static std::atomic< std::uint64_t > next_job_id_
Static counter for generating unique job IDs.
Definition job.h:510
auto with_timeout(std::chrono::milliseconds timeout) -> job &
Sets a timeout for job execution.
Definition job.cpp:385
auto invoke_callbacks(const common::VoidResult &result) -> void
Invokes the completion callbacks if they are set.
Definition job.cpp:484
virtual auto get_job_queue(void) const -> std::shared_ptr< job_queue >
Retrieves the job_queue associated with this job, if any.
Definition job.cpp:244
auto with_priority(job_priority priority) -> job &
Sets the priority level for this job.
Definition job.cpp:333
auto has_explicit_cancellation() const -> bool
Checks if this job has an explicit cancellation set via composition.
Definition job.cpp:454
auto get_timeout() const -> std::optional< std::chrono::milliseconds >
Gets the timeout duration for this job.
Definition job.cpp:436
virtual auto do_work(void) -> common::VoidResult
The core task execution method to be overridden by derived classes.
Definition job.cpp:135
auto get_name(void) const -> std::string
Retrieves the name of this job.
Definition job.cpp:112
cancellation_token cancellation_token_
The cancellation token associated with this job.
Definition job.h:504
auto with_on_complete(std::function< void(common::VoidResult)> callback) -> job &
Attaches a completion callback to this job.
Definition job.cpp:299
virtual auto get_cancellation_token() const -> cancellation_token
Gets the cancellation token associated with this job.
Definition job.cpp:192
auto get_retry_policy() const -> std::optional< retry_policy >
Gets the retry policy of this job.
Definition job.cpp:418
job(const std::string &name="job")
Constructs a new job with an optional human-readable name.
Definition job.cpp:53
auto with_cancellation(const cancellation_token &token) -> job &
Attaches a cancellation token to this job via composition.
Definition job.cpp:350
A template class representing either a value or an error.
Encapsulates retry behavior configuration for jobs.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
@ callback
Call user callback for custom decision.
Base job class for schedulable work units in the thread system.
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
job_priority
Priority levels for job scheduling.
Definition job.h:48
@ normal
Normal priority, default for most jobs.
@ priority
Priority-based scheduling.
STL namespace.
Internal structure holding composed behaviors for a job.
Definition job.h:69