Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job_builder.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13#pragma once
14
15#include "job.h"
16#include "retry_policy.h"
17#include "cancellation_token.h"
18
19#include <chrono>
20#include <functional>
21#include <memory>
22#include <string>
23#include <type_traits>
24#include <vector>
25
26namespace kcenon::thread
27{
98 {
99 public:
103 job_builder() = default;
104
110 auto name(const std::string& name) -> job_builder&
111 {
112 name_ = name;
113 return *this;
114 }
115
125 auto work(std::function<common::VoidResult()> work_fn) -> job_builder&
126 {
127 work_fn_ = std::move(work_fn);
128 return *this;
129 }
130
139 const std::vector<uint8_t>& data,
140 std::function<common::VoidResult(const std::vector<uint8_t>&)> work_fn) -> job_builder&
141 {
142 data_ = data;
143 data_work_fn_ = std::move(work_fn);
144 return *this;
145 }
146
154 {
155 cancellation_token_ = token;
156 has_cancellation_ = true;
157 return *this;
158 }
159
166 auto on_complete(std::function<void(common::VoidResult)> callback) -> job_builder&
167 {
168 on_complete_ = std::move(callback);
169 return *this;
170 }
171
178 auto on_error(std::function<void(const common::error_info&)> callback) -> job_builder&
179 {
180 on_error_ = std::move(callback);
181 return *this;
182 }
183
191 {
192 priority_ = prio;
193 has_priority_ = true;
194 return *this;
195 }
196
203 auto retry(const retry_policy& policy) -> job_builder&
204 {
205 retry_policy_ = policy;
206 has_retry_ = true;
207 return *this;
208 }
209
216 auto timeout(std::chrono::milliseconds timeout) -> job_builder&
217 {
219 has_timeout_ = true;
220 return *this;
221 }
222
247 template<typename JobType, typename... Args>
248 auto from(Args&&... args) -> job_builder&
249 {
250 static_assert(std::is_base_of_v<job, JobType>,
251 "JobType must be derived from kcenon::thread::job");
252
253 custom_job_factory_ = [args = std::make_tuple(std::forward<Args>(args)...)]() mutable
254 {
255 return std::apply([](auto&&... a) {
256 return std::make_unique<JobType>(std::forward<decltype(a)>(a)...);
257 }, std::move(args));
258 };
259 use_custom_job_ = true;
260 return *this;
261 }
262
271 [[nodiscard]] auto build() -> std::unique_ptr<job>
272 {
273 std::unique_ptr<job> result;
274
276 {
278 }
279 else
280 {
281 result = std::make_unique<built_job>(
282 name_.empty() ? "builder_job" : name_,
283 data_,
284 std::move(work_fn_),
285 std::move(data_work_fn_)
286 );
287 }
288
289 // Apply composition
291 {
292 result->with_cancellation(cancellation_token_);
293 }
294 if (on_complete_)
295 {
296 result->with_on_complete(std::move(on_complete_));
297 }
298 if (on_error_)
299 {
300 result->with_on_error(std::move(on_error_));
301 }
302 if (has_priority_)
303 {
304 result->with_priority(priority_);
305 }
306 if (has_retry_)
307 {
308 result->with_retry(retry_policy_);
309 }
310 if (has_timeout_)
311 {
312 result->with_timeout(timeout_);
313 }
314
315 return result;
316 }
317
323 [[nodiscard]] auto build_shared() -> std::shared_ptr<job>
324 {
325 return build();
326 }
327
328 private:
336 class built_job : public job
337 {
338 public:
340 const std::string& name,
341 const std::vector<uint8_t>& data,
342 std::function<common::VoidResult()> work_fn,
343 std::function<common::VoidResult(const std::vector<uint8_t>&)> data_work_fn)
344 : job(data.empty() ? name : name)
345 , work_fn_(std::move(work_fn))
346 , data_work_fn_(std::move(data_work_fn))
347 {
348 if (!data.empty())
349 {
350 data_ = data;
351 }
352 }
353
354 [[nodiscard]] auto do_work() -> common::VoidResult override
355 {
356 // Check cancellation before starting
358 {
361 return result;
362 }
363
364 common::VoidResult result = [this]() -> common::VoidResult {
365 if (data_work_fn_ && !data_.empty())
366 {
367 return data_work_fn_(data_);
368 }
369 else if (work_fn_)
370 {
371 return work_fn_();
372 }
373 else
374 {
375 return common::error_info{
376 static_cast<int>(error_code::not_implemented),
377 "No work function provided to job_builder",
378 "thread_system"
379 };
380 }
381 }();
382
383 // Invoke callbacks
385
386 return result;
387 }
388
389 private:
390 std::function<common::VoidResult()> work_fn_;
391 std::function<common::VoidResult(const std::vector<uint8_t>&)> data_work_fn_;
392 };
393
394 // Builder state
395 std::string name_;
396 std::vector<uint8_t> data_;
397 std::function<common::VoidResult()> work_fn_;
398 std::function<common::VoidResult(const std::vector<uint8_t>&)> data_work_fn_;
399
401 bool has_cancellation_{false};
402
403 std::function<void(common::VoidResult)> on_complete_;
404 std::function<void(const common::error_info&)> on_error_;
405
407 bool has_priority_{false};
408
410 bool has_retry_{false};
411
412 std::chrono::milliseconds timeout_{0};
413 bool has_timeout_{false};
414
415 std::function<std::unique_ptr<job>()> custom_job_factory_;
416 bool use_custom_job_{false};
417 };
418
432 [[nodiscard]] inline auto make_job() -> job_builder
433 {
434 return job_builder{};
435 }
436
437} // namespace kcenon::thread
Provides a mechanism for cooperative cancellation of operations.
bool is_cancelled() const
Checks if the token has been canceled.
Internal job implementation created by the builder.
std::function< common::VoidResult()> work_fn_
auto do_work() -> common::VoidResult override
The core task execution method to be overridden by derived classes.
std::function< common::VoidResult(const std::vector< uint8_t > &)> data_work_fn_
built_job(const std::string &name, const std::vector< uint8_t > &data, std::function< common::VoidResult()> work_fn, std::function< common::VoidResult(const std::vector< uint8_t > &)> data_work_fn)
Fluent builder for creating and configuring jobs with composition.
Definition job_builder.h:98
auto work(std::function< common::VoidResult()> work_fn) -> job_builder &
Sets the work function for the job.
cancellation_token cancellation_token_
auto cancellation(const cancellation_token &token) -> job_builder &
Sets a cancellation token for cooperative cancellation.
std::function< common::VoidResult()> work_fn_
job_builder()=default
Default constructor.
auto build_shared() -> std::shared_ptr< job >
Builds and returns the configured job as a shared pointer.
std::vector< uint8_t > data_
auto work_with_data(const std::vector< uint8_t > &data, std::function< common::VoidResult(const std::vector< uint8_t > &)> work_fn) -> job_builder &
Sets the work function with data parameter.
auto priority(job_priority prio) -> job_builder &
Sets the job priority.
auto on_error(std::function< void(const common::error_info &)> callback) -> job_builder &
Sets an error callback.
auto on_complete(std::function< void(common::VoidResult)> callback) -> job_builder &
Sets a completion callback.
auto build() -> std::unique_ptr< job >
Builds and returns the configured job.
auto from(Args &&... args) -> job_builder &
auto timeout(std::chrono::milliseconds timeout) -> job_builder &
Sets the execution timeout.
std::function< void(const common::error_info &)> on_error_
std::function< std::unique_ptr< job >()> custom_job_factory_
auto retry(const retry_policy &policy) -> job_builder &
Sets the retry policy.
std::chrono::milliseconds timeout_
auto name(const std::string &name) -> job_builder &
Sets the job name.
std::function< void(common::VoidResult)> on_complete_
std::function< common::VoidResult(const std::vector< uint8_t > &)> data_work_fn_
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
std::vector< uint8_t > data_
An optional container of raw byte data that may be used by the job.
Definition job.h:488
auto invoke_callbacks(const common::VoidResult &result) -> void
Invokes the completion callbacks if they are set.
Definition job.cpp:484
cancellation_token cancellation_token_
The cancellation token associated with this job.
Definition job.h:504
A template class representing either a value or an error.
Encapsulates retry behavior configuration for jobs.
@ callback
Call user callback for custom decision.
Implementation of a cancellation token for cooperative cancellation.
Base job class for schedulable work units in the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
auto make_job() -> job_builder
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
job_priority
Priority levels for job scheduling.
Definition job.h:48
@ normal
Normal priority, default for most jobs.
STL namespace.
Retry policy with configurable strategies: fixed, linear, and exponential backoff.