Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Tutorial: DAG Scheduling

The DAG scheduler executes jobs that form a Directed Acyclic Graph of dependencies. Each node runs at most once, and only after all of its predecessors have completed successfully. This page shows how to define a graph, reason about execution order, and handle errors that propagate through the chain.

Introduction

A DAG is appropriate when the work has structured dependencies that go beyond a simple FIFO queue — for example, build pipelines, data processing stages, or fan-out/fan-in workloads. Thread System's dag_scheduler runs DAG nodes on top of an existing thread_pool, so you keep the same worker model while gaining dependency tracking.

Defining Dependencies

Use dag_job_builder to declare nodes and edges. Edges express "must-finish-before" relationships:

auto builder = kcenon::thread::dag::dag_job_builder{};
auto load = builder.add_node("load", [] { return load_input(); });
auto parse = builder.add_node("parse", [] { return parse_data(); });
auto verify = builder.add_node("verify", [] { return verify_data(); });
auto store = builder.add_node("store", [] { return persist(); });
builder.add_edge(load, parse);
builder.add_edge(parse, verify);
builder.add_edge(verify, store);
auto dag = std::move(builder).build();

The builder validates that the resulting graph contains no cycles. Adding an edge that introduces a cycle returns an error result rather than throwing.
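To make the cycle rule concrete, here is a minimal, self-contained sketch of a builder that refuses cycle-closing edges and reports them as values instead of exceptions. The `graph_builder` class, its integer node handles, and the `std::optional<std::string>` error type are hypothetical stand-ins chosen for illustration. They are not the `dag_job_builder` API, which may detect cycles differently (for example, at `build()` time). The key observation is that adding `from -> to` closes a cycle exactly when `from` is already reachable from `to`.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch, not the Thread System API: a builder whose add_edge
// rejects any edge that would close a cycle and reports it as a value.
class graph_builder {
public:
    std::size_t add_node(std::string name) {
        names_.push_back(std::move(name));
        successors_.emplace_back();
        return names_.size() - 1;
    }

    // Returns an error message instead of throwing when the edge is invalid;
    // std::nullopt means the edge was added.
    std::optional<std::string> add_edge(std::size_t from, std::size_t to) {
        if (from >= names_.size() || to >= names_.size()) {
            return "unknown node";
        }
        // from -> to closes a cycle exactly when `from` is reachable from `to`
        // (this also rejects self-edges, since start == target).
        if (reachable(to, from)) {
            return "edge " + names_[from] + " -> " + names_[to] +
                   " would create a cycle";
        }
        successors_[from].push_back(to);
        return std::nullopt;
    }

private:
    // Iterative depth-first search over the successor lists.
    bool reachable(std::size_t start, std::size_t target) const {
        std::vector<bool> seen(names_.size(), false);
        std::vector<std::size_t> stack{start};
        while (!stack.empty()) {
            std::size_t n = stack.back();
            stack.pop_back();
            if (n == target) return true;
            if (seen[n]) continue;
            seen[n] = true;
            for (std::size_t s : successors_[n]) stack.push_back(s);
        }
        return false;
    }

    std::vector<std::string> names_;
    std::vector<std::vector<std::size_t>> successors_;
};
```

For example, after adding `load -> parse` and `parse -> store`, a call to `add_edge(store, load)` returns an error message and leaves the graph unchanged. Checking reachability on every insertion costs O(V + E) per edge; running a single topological sort when the graph is built is the usual cheaper alternative.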

Execution Order Guarantees

The scheduler provides two guarantees:

  1. A node never starts before all of its predecessors have transitioned to a terminal state (success, failed, or cancelled).
  2. Nodes with no dependency path between them, in either direction, may run concurrently, subject to the capacity of the underlying thread pool.

The scheduler does not guarantee any particular order between independent nodes. If two parser nodes have no path between them in the graph, either may run first. Encode any ordering you actually require as edges.
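One way to see the second guarantee is to give each node a level equal to one more than the highest level among its predecessors. Any dependency path strictly increases the level, so nodes that share a level can never be ordered relative to each other. The sketch below computes these levels with Kahn's algorithm; the function name and the edge-list input are assumptions for illustration, not part of Thread System.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Illustrative sketch, not the library's scheduler: groups nodes into levels
// where level(v) = 1 + max(level(p)) over all predecessors p. Because every
// path u -> ... -> v strictly increases the level, two nodes on the same
// level have no ordering between them and may overlap.
// Returns an empty vector if the graph contains a cycle.
std::vector<std::vector<std::size_t>> topological_levels(
    std::size_t node_count,
    const std::vector<std::pair<std::size_t, std::size_t>>& edges) {
    std::vector<std::vector<std::size_t>> successors(node_count);
    std::vector<std::size_t> indegree(node_count, 0);
    for (auto [from, to] : edges) {
        assert(from < node_count && to < node_count);
        successors[from].push_back(to);
        ++indegree[to];
    }

    // Level 0: nodes with no predecessors.
    std::vector<std::size_t> current;
    for (std::size_t n = 0; n < node_count; ++n) {
        if (indegree[n] == 0) current.push_back(n);
    }

    std::vector<std::vector<std::size_t>> levels;
    std::size_t visited = 0;
    while (!current.empty()) {
        std::vector<std::size_t> next;
        for (std::size_t n : current) {
            ++visited;
            // A successor becomes ready when its last predecessor is done.
            for (std::size_t s : successors[n]) {
                if (--indegree[s] == 0) next.push_back(s);
            }
        }
        levels.push_back(std::move(current));
        current = std::move(next);
    }
    if (visited != node_count) return {};  // some nodes never became ready: cycle
    return levels;
}
```

For a graph where `load` (0) feeds two independent parsers (1 and 2) that both feed `merge` (3), the levels are `{0}`, `{1, 2}`, `{3}`, so the two parsers may overlap. The converse does not hold: nodes on different levels can also be independent. A real scheduler also has no reason to wait for a whole level; it can start each node as soon as its own predecessors reach a terminal state.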

Error Handling

When a node returns an error result, the scheduler:

  • Marks the node as failed.
  • Cancels all transitive successors that have not started yet.
  • Aggregates failures into the final scheduler result.
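The cancellation step amounts to a graph traversal that starts at the failed node. The sketch below marks every transitive successor that is still pending as cancelled and leaves running or finished nodes alone. The `node_state` enum (which adds `pending` and `running` to the terminal states named earlier) and the adjacency-list representation are assumptions for illustration, not the scheduler's internals.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical states: pending/running plus the terminal states
// (success, failed, cancelled) described by the scheduler's guarantees.
enum class node_state { pending, running, succeeded, failed, cancelled };

// Hypothetical helper, not the library's code: marks `failed_node` as failed,
// then walks (depth-first) every transitive successor and cancels those that
// have not started yet. Running or finished nodes keep their state.
void cancel_successors(std::size_t failed_node,
                       const std::vector<std::vector<std::size_t>>& successors,
                       std::vector<node_state>& state) {
    assert(failed_node < state.size() && successors.size() == state.size());
    state[failed_node] = node_state::failed;

    std::vector<bool> seen(state.size(), false);
    std::vector<std::size_t> stack{failed_node};
    seen[failed_node] = true;
    while (!stack.empty()) {
        std::size_t n = stack.back();
        stack.pop_back();
        for (std::size_t s : successors[n]) {
            if (seen[s]) continue;  // reached by another path already
            seen[s] = true;
            if (state[s] == node_state::pending) state[s] = node_state::cancelled;
            stack.push_back(s);  // keep walking: its successors are affected too
        }
    }
}
```

In the `load -> parse -> verify -> store` pipeline, a failure in `parse` cancels `verify` and `store`, while a node with no path from `parse` stays pending. Tracking visited nodes keeps the walk linear in the size of the graph even when several paths lead to the same successor.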

You can opt into "best-effort" mode, where independent branches continue to run even if a sibling branch fails. This is useful when each branch produces an independently consumable artifact (e.g., parallel report generators).

auto config = kcenon::thread::dag::dag_config{};
config.failure_mode = kcenon::thread::dag::failure_mode::best_effort;
auto scheduler = kcenon::thread::dag::dag_scheduler{pool, config};
auto result = scheduler.run(dag);
if (!result) {
    // result.get_error() contains aggregated failure info.
}

Example 1: Linear Pipeline

// Assumes the DAG scheduler header and the public thread_pool header are included.
int main() {
    auto pool = kcenon::thread::thread_pool::create();
    pool->start();
    auto builder = kcenon::thread::dag::dag_job_builder{};
    auto a = builder.add_node("a", [] { return kcenon::thread::result_void{}; });
    auto b = builder.add_node("b", [] { return kcenon::thread::result_void{}; });
    auto c = builder.add_node("c", [] { return kcenon::thread::result_void{}; });
    builder.add_edge(a, b);  // a must finish before b starts
    builder.add_edge(b, c);  // b must finish before c starts
    auto dag = std::move(builder).build();
    auto result = kcenon::thread::dag::dag_scheduler{pool}.run(dag);
    pool->stop();
    if (!result) {
        return 1;
    }
    return 0;
}

Example 2: Fan-out / Fan-in

auto b = kcenon::thread::dag::dag_job_builder{};
auto split = b.add_node("split", [] { return split_input(); });
auto worker1 = b.add_node("worker1", [] { return process_chunk(0); });
auto worker2 = b.add_node("worker2", [] { return process_chunk(1); });
auto worker3 = b.add_node("worker3", [] { return process_chunk(2); });
auto join = b.add_node("join", [] { return merge_results(); });
b.add_edge(split, worker1);
b.add_edge(split, worker2);
b.add_edge(split, worker3);
b.add_edge(worker1, join);
b.add_edge(worker2, join);
b.add_edge(worker3, join);
auto dag = std::move(b).build();

The three worker nodes may run in parallel once split completes, subject to pool capacity; join starts only after all three have finished.
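The same fan-out/fan-in shape can be expressed with only the standard library, which helps show what the scheduler takes care of. In this sketch each stage is a `std::async` task that waits on the `std::shared_future` of every predecessor before doing its own work. The toy arithmetic stands in for the hypothetical `split_input`, `process_chunk`, and `merge_results` helpers, and none of the code uses Thread System.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Std-only sketch of split -> {worker1, worker2, worker3} -> join.
// Each stage blocks on its predecessors' shared futures, then does its work.
int fan_out_fan_in() {
    // "split": produce three chunks of input
    std::shared_future<std::vector<int>> split =
        std::async(std::launch::async, [] {
            return std::vector<int>{1, 2, 3};
        }).share();

    // "worker1..3": each waits for split, then processes its own chunk
    std::vector<std::shared_future<int>> workers;
    for (std::size_t i = 0; i < 3; ++i) {
        workers.push_back(std::async(std::launch::async, [split, i] {
            return split.get()[i] * 10;  // toy work for chunk i
        }).share());
    }

    // "join": waits for all three workers, then merges their results
    std::future<int> join = std::async(std::launch::async, [workers] {
        int total = 0;
        for (const auto& w : workers) total += w.get();
        return total;
    });
    return join.get();
}
```

Calling `fan_out_fan_in()` returns 60, that is (1 + 2 + 3) × 10. Blocking inside a task is safe here only because `std::launch::async` runs each task as if on its own thread. On a fixed-size pool, tasks that block while waiting for predecessors can occupy every worker and deadlock, which is why a dependency-aware scheduler submits a node only after its predecessors are done.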

Example 3: Best-Effort Failure Handling

auto config = kcenon::thread::dag::dag_config{};
config.failure_mode = kcenon::thread::dag::failure_mode::best_effort;
auto scheduler = kcenon::thread::dag::dag_scheduler{pool, config};
auto result = scheduler.run(dag);
if (!result) {
    auto err = result.get_error();
    // err.failed_nodes lists nodes that did not complete successfully.
    // err.completed_nodes lists nodes that completed successfully.
}
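To illustrate how `failed_nodes` and `completed_nodes` relate in best-effort mode, here is a single-threaded model. It walks the nodes in topological order, skips any node whose predecessor did not succeed, and keeps running unrelated branches. The `dag_node` and `run_report` types, the `bool`-returning work functions, and the choice to list skipped nodes under `failed_nodes` (following the "did not complete successfully" wording above) are assumptions for illustration, not Thread System's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical types for this model only; not Thread System types.
struct dag_node {
    std::string name;
    std::vector<std::size_t> predecessors;  // indices of earlier nodes
    std::function<bool()> work;             // returns false on failure
};

struct run_report {
    std::vector<std::string> completed_nodes;
    std::vector<std::string> failed_nodes;  // failed or skipped (cancelled)
};

// Assumes `nodes` is already in topological order: every predecessor index
// is smaller than the node's own index. A node runs only if all of its
// predecessors succeeded; unrelated branches keep running regardless.
run_report run_best_effort(const std::vector<dag_node>& nodes) {
    std::vector<bool> ok(nodes.size(), false);
    run_report report;
    for (std::size_t i = 0; i < nodes.size(); ++i) {
        bool ready = true;
        for (std::size_t p : nodes[i].predecessors) {
            assert(p < i);
            if (!ok[p]) { ready = false; break; }
        }
        // Short-circuit: work() is not called when a predecessor failed.
        ok[i] = ready && nodes[i].work();
        (ok[i] ? report.completed_nodes : report.failed_nodes)
            .push_back(nodes[i].name);
    }
    return report;
}
```

With `split` feeding `report_a` and `report_b`, and `publish_a` depending on `report_a`, a failure in `report_a` yields `completed_nodes == {split, report_b}` and `failed_nodes == {report_a, publish_a}`: the failed branch is cut off while its sibling still finishes.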

Next Steps