Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
jaeger_proto.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025
5// See the LICENSE file in the project root for full license information.
6
26#include "protobuf_wire.h"
27
28#include <chrono>
29#include <cstdint>
30#include <cstring>
31#include <string>
32#include <utility>
33#include <vector>
34
35namespace kcenon { namespace monitoring { namespace jaeger_proto {
36
// Kinds of value a tag can carry. encode_key_value writes the numeric value
// as enum field 2, and decode_key_value casts the decoded varint back to this
// type without range checking.
37enum class value_type : std::int32_t {
38 string_type = 0,
39 bool_type = 1,
40 int64_type = 2,
41 float64_type = 3,
42 binary_type = 4,
43};
44
// A single tag: a key plus one storage slot per value kind.
// NOTE(review): this listing jumps from source line 46 to 48. The encoder and
// decoder both access a `v_type` member that is not visible here -- confirm
// its declaration and default against the real header.
45struct key_value {
46 std::string key; // read from wire field 1 by decode_key_value
48 std::string v_str; // wire field 3
49 bool v_bool{false}; // wire field 4
50 std::int64_t v_int64{0}; // wire field 5 (varint)
51 double v_float64{0.0}; // wire field 6 (fixed64 bit pattern)
52 std::vector<std::uint8_t> v_binary; // wire field 7
53};
54
// Describes the emitting process: a service name plus a list of tags.
// decode_process fills service_name from wire field 1 and appends one entry
// to tags per occurrence of wire field 2.
55struct process {
56 std::string service_name;
57 std::vector<key_value> tags;
58};
59
// Reference from one span to another, identified by raw trace/span id bytes.
// decode_span_ref reads trace_id, span_id and ref_type from wire fields 1, 2
// and 3 respectively.
60struct span_ref {
61 std::vector<std::uint8_t> trace_id;
62 std::vector<std::uint8_t> span_id;
63 std::int32_t ref_type{0}; // 0 = CHILD_OF, 1 = FOLLOWS_FROM
64};
65
// In-memory form of one span as handled by encode_span / decode_span.
// Start time and duration are each stored as a (seconds, nanos) pair.
// NOTE(review): this listing jumps from source line 78 to 80. encode_span and
// decode_span both use a `proc` member that is not visible here -- confirm
// against the real header.
66struct span {
67 std::vector<std::uint8_t> trace_id; // 16 bytes on the wire
68 std::vector<std::uint8_t> span_id; // 8 bytes on the wire
69 std::string operation_name;
70 std::vector<span_ref> references;
71 std::uint32_t flags{0};
72 std::int64_t start_time_seconds{0};
73 std::int32_t start_time_nanos{0};
74 std::int64_t duration_seconds{0};
75 std::int32_t duration_nanos{0};
76 std::vector<key_value> tags;
77 // Logs omitted from this minimal encoder (not produced by the current
78 // span data model). The decoder skips unknown fields safely.
80};
81
// A group of spans, optionally accompanied by a process description.
// NOTE(review): this listing jumps from source line 83 to 85. encode_batch and
// decode_batch both use a `proc` member that is not visible here -- confirm
// against the real header.
82struct batch {
83 std::vector<span> spans;
 // Set by decode_batch when wire field 2 was present; encode_batch only
 // writes the process when this flag is true.
85 bool has_process{false};
86};
87
88// ---------------------------------------------------------------------------
89// Encoders
90// ---------------------------------------------------------------------------
91
// Serializes a (seconds, nanos) pair as a message body: seconds as field 1 and
// nanos as field 2, both through encode_uint64_field (documented as skipping
// zero values). Returns only the body bytes; no outer tag or length is added.
// Both inputs are converted to uint64 with static_cast, so a negative value
// is written as its unsigned two's-complement equivalent.
92inline std::vector<std::uint8_t> encode_timestamp(std::int64_t seconds,
93 std::int32_t nanos) {
94 std::vector<std::uint8_t> out;
95 protobuf_wire::encode_uint64_field(out, 1, static_cast<std::uint64_t>(seconds));
96 protobuf_wire::encode_uint64_field(out, 2, static_cast<std::uint64_t>(nanos));
97 return out;
98}
99
// Serializes a duration as a message body with the same layout that
// encode_timestamp produces: seconds as field 1 and nanos as field 2, both via
// encode_uint64_field (documented as skipping zero values). decode_span reads
// this layout back with decode_timestamp.
100inline std::vector<std::uint8_t> encode_duration(std::int64_t seconds,
101 std::int32_t nanos) {
102 std::vector<std::uint8_t> out;
103 protobuf_wire::encode_uint64_field(out, 1, static_cast<std::uint64_t>(seconds));
104 protobuf_wire::encode_uint64_field(out, 2, static_cast<std::uint64_t>(nanos));
105 return out;
106}
107
// Serializes one key_value into a message body and returns the bytes.
// NOTE(review): this listing omits source lines 110, 112-113, 115, 123-124 and
// 126, so only part of the body is visible. Presumably the missing lines write
// the key, string, bool and binary fields plus the tags for fields 5 and 6 --
// confirm against the real header.
108inline std::vector<std::uint8_t> encode_key_value(const key_value& kv) {
109 std::vector<std::uint8_t> out;
 // The value kind goes out as enum field 2.
111 protobuf_wire::encode_enum_field(out, 2, static_cast<std::int32_t>(kv.v_type));
 // The integer payload is only written when nonzero.
114 if (kv.v_int64 != 0) {
116 protobuf_wire::encode_varint(out, static_cast<std::uint64_t>(kv.v_int64));
117 }
 // The comparison is on the floating-point value, so -0.0 (which compares
 // equal to 0.0) takes the skip path, while NaN does not.
118 if (kv.v_float64 != 0.0) {
119 std::uint64_t bits;
120 static_assert(sizeof(bits) == sizeof(kv.v_float64),
121 "double must be 64 bits");
 // Copy the raw bit pattern of the double into a 64-bit integer.
122 std::memcpy(&bits, &kv.v_float64, sizeof(bits));
125 }
127 return out;
128}
129
// Serializes a process into a message body and returns the bytes.
// Each tag is encoded with encode_key_value and appended as an embedded
// message in field 2.
// NOTE(review): this listing omits source line 132; presumably it writes
// service_name, which decode_process reads from field 1 -- confirm.
130inline std::vector<std::uint8_t> encode_process(const process& p) {
131 std::vector<std::uint8_t> out;
133 for (const auto& tag : p.tags) {
134 auto serialized = encode_key_value(tag);
135 protobuf_wire::encode_message_field(out, 2, serialized);
136 }
137 return out;
138}
139
// Serializes a span_ref into a message body and returns the bytes.
// NOTE(review): this listing omits source lines 142-144, so none of the
// field-writing statements are visible here. decode_span_ref expects
// trace_id, span_id and ref_type in fields 1, 2 and 3 -- confirm the encoder
// against the real header.
140inline std::vector<std::uint8_t> encode_span_ref(const span_ref& ref) {
141 std::vector<std::uint8_t> out;
145 return out;
146}
147
// Serializes a span into a message body and returns the bytes.
// Visible here: references go out as embedded messages in field 4, tags in
// field 8, and the process in field 10.
// NOTE(review): this listing omits source lines 150-152, 157, 159-160 and
// 163-164. Presumably they write the ids, operation name, flags and the
// start-time / duration sub-messages -- confirm against the real header.
148inline std::vector<std::uint8_t> encode_span(const span& s) {
149 std::vector<std::uint8_t> out;
153 for (const auto& ref : s.references) {
154 auto serialized = encode_span_ref(ref);
155 protobuf_wire::encode_message_field(out, 4, serialized);
156 }
 // Start time is only handled when at least one component is nonzero
 // (the statements inside this branch are not shown in the listing).
158 if (s.start_time_seconds != 0 || s.start_time_nanos != 0) {
161 }
 // Same nonzero guard for the duration.
162 if (s.duration_seconds != 0 || s.duration_nanos != 0) {
165 }
166 for (const auto& tag : s.tags) {
167 auto serialized = encode_key_value(tag);
168 protobuf_wire::encode_message_field(out, 8, serialized);
169 }
 // The per-span process is written only when it has a name or any tags.
170 if (!s.proc.service_name.empty() || !s.proc.tags.empty()) {
171 auto serialized = encode_process(s.proc);
172 protobuf_wire::encode_message_field(out, 10, serialized);
173 }
174 return out;
175}
176
// Serializes a batch into a message body and returns the bytes.
// Every span is encoded with encode_span and appended as an embedded message
// in field 1. The process is appended as field 2 only when b.has_process is
// set; the contents of b.proc are not inspected for that decision.
177inline std::vector<std::uint8_t> encode_batch(const batch& b) {
178 std::vector<std::uint8_t> out;
179 for (const auto& s : b.spans) {
180 auto serialized = encode_span(s);
181 protobuf_wire::encode_message_field(out, 1, serialized);
182 }
183 if (b.has_process) {
184 auto serialized = encode_process(b.proc);
185 protobuf_wire::encode_message_field(out, 2, serialized);
186 }
187 return out;
188}
189
190// ---------------------------------------------------------------------------
191// Decoders (used by round-trip tests)
192// ---------------------------------------------------------------------------
193
// Parses a key_value message body from [data, data + size) into `out`.
// Returns false when a tag cannot be decoded, when a known field arrives with
// an unexpected wire type, or when a read or skip fails; returns true once the
// reader reaches the end of the input. `out` is not reset first, so fields
// absent from the input keep whatever values the caller passed in.
// NOTE(review): this listing omits source line 199, presumably the
// declaration of `wt` -- confirm against the real header.
194inline bool decode_key_value(const std::uint8_t* data, std::size_t size,
195 key_value& out) {
196 protobuf_wire::reader r(data, size);
197 while (!r.eof()) {
198 std::uint32_t field_number;
200 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
201 switch (field_number) {
 // Field 1: key.
202 case 1:
203 if (wt != protobuf_wire::wire_type::length_delimited) return false;
204 if (!r.read_string(out.key)) return false;
205 break;
 // Field 2: value kind; the varint is cast to value_type unchecked.
206 case 2: {
207 if (wt != protobuf_wire::wire_type::varint) return false;
208 auto v = r.read_varint();
209 if (!v) return false;
210 out.v_type = static_cast<value_type>(*v);
211 break;
212 }
 // Field 3: string payload.
213 case 3:
214 if (wt != protobuf_wire::wire_type::length_delimited) return false;
215 if (!r.read_string(out.v_str)) return false;
216 break;
 // Field 4: bool payload; any nonzero varint is treated as true.
217 case 4: {
218 if (wt != protobuf_wire::wire_type::varint) return false;
219 auto v = r.read_varint();
220 if (!v) return false;
221 out.v_bool = (*v != 0);
222 break;
223 }
 // Field 5: integer payload, converted from the unsigned varint.
224 case 5: {
225 if (wt != protobuf_wire::wire_type::varint) return false;
226 auto v = r.read_varint();
227 if (!v) return false;
228 out.v_int64 = static_cast<std::int64_t>(*v);
229 break;
230 }
 // Field 6: double payload, rebuilt from its 64-bit pattern.
231 case 6: {
232 if (wt != protobuf_wire::wire_type::fixed64) return false;
233 auto v = r.read_fixed64();
234 if (!v) return false;
235 std::uint64_t bits = *v;
236 std::memcpy(&out.v_float64, &bits, sizeof(out.v_float64));
237 break;
238 }
 // Field 7: binary payload.
239 case 7:
240 if (wt != protobuf_wire::wire_type::length_delimited) return false;
241 if (!r.read_bytes(out.v_binary)) return false;
242 break;
 // Any other field number is skipped according to its wire type.
243 default:
244 if (!r.skip_field(wt)) return false;
245 }
246 }
247 return true;
248}
249
// Parses a process message body from [data, data + size) into `out`.
// Field 1 sets service_name; each occurrence of field 2 is decoded as a
// key_value and appended to out.tags. Existing tags are not cleared, so
// decoding into a non-empty process accumulates entries.
// Returns false on a bad tag, a wire-type mismatch on a known field, or a
// failed read, skip or nested decode; unknown fields are skipped.
// NOTE(review): this listing omits source line 255, presumably the
// declaration of `wt` -- confirm against the real header.
250inline bool decode_process(const std::uint8_t* data, std::size_t size,
251 process& out) {
252 protobuf_wire::reader r(data, size);
253 while (!r.eof()) {
254 std::uint32_t field_number;
256 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
257 switch (field_number) {
258 case 1:
259 if (wt != protobuf_wire::wire_type::length_delimited) return false;
260 if (!r.read_string(out.service_name)) return false;
261 break;
262 case 2: {
263 if (wt != protobuf_wire::wire_type::length_delimited) return false;
 // ptr/len describe the nested payload inside the reader's buffer.
264 const std::uint8_t* ptr; std::size_t len;
265 if (!r.read_length_delimited(&ptr, &len)) return false;
266 key_value kv;
267 if (!decode_key_value(ptr, len, kv)) return false;
268 out.tags.push_back(std::move(kv));
269 break;
270 }
271 default:
272 if (!r.skip_field(wt)) return false;
273 }
274 }
275 return true;
276}
277
// Parses a (seconds, nanos) message body from [data, data + size).
// Both outputs are reset to 0 before parsing, so a field absent from the input
// reads back as zero. Field 1 is stored in `seconds`; field 2 is narrowed to
// int32 and stored in `nanos`; varints with other field numbers are read and
// ignored. Returns false on a bad tag or a failed read or skip.
// NOTE(review): this listing omits source lines 284 and 286, presumably the
// declaration of `wt` and the guard that routes non-varint fields to
// skip_field -- confirm against the real header.
278inline bool decode_timestamp(const std::uint8_t* data, std::size_t size,
279 std::int64_t& seconds, std::int32_t& nanos) {
280 protobuf_wire::reader r(data, size);
281 seconds = 0; nanos = 0;
282 while (!r.eof()) {
283 std::uint32_t field_number;
285 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
287 if (!r.skip_field(wt)) return false;
288 continue;
289 }
290 auto v = r.read_varint();
291 if (!v) return false;
292 if (field_number == 1) seconds = static_cast<std::int64_t>(*v);
293 else if (field_number == 2) nanos = static_cast<std::int32_t>(*v);
294 }
295 return true;
296}
297
// Parses a span_ref message body from [data, data + size) into `out`.
// Field 1 -> trace_id bytes, field 2 -> span_id bytes, field 3 -> ref_type
// (varint narrowed to int32). Unknown fields are skipped.
// Returns false on a bad tag, a wire-type mismatch on a known field, or a
// failed read or skip.
// NOTE(review): this listing omits source line 303, presumably the
// declaration of `wt` -- confirm against the real header.
298inline bool decode_span_ref(const std::uint8_t* data, std::size_t size,
299 span_ref& out) {
300 protobuf_wire::reader r(data, size);
301 while (!r.eof()) {
302 std::uint32_t field_number;
304 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
305 switch (field_number) {
306 case 1:
307 if (wt != protobuf_wire::wire_type::length_delimited) return false;
308 if (!r.read_bytes(out.trace_id)) return false;
309 break;
310 case 2:
311 if (wt != protobuf_wire::wire_type::length_delimited) return false;
312 if (!r.read_bytes(out.span_id)) return false;
313 break;
314 case 3: {
315 if (wt != protobuf_wire::wire_type::varint) return false;
316 auto v = r.read_varint();
317 if (!v) return false;
318 out.ref_type = static_cast<std::int32_t>(*v);
319 break;
320 }
321 default:
322 if (!r.skip_field(wt)) return false;
323 }
324 }
325 return true;
326}
327
// Parses a span message body from [data, data + size) into `out`.
// Handled fields: 1 trace_id, 2 span_id, 3 operation_name, 4 reference
// (repeated, appended), 5 flags, 6 start time, 7 duration, 8 tag (repeated,
// appended), 10 process. Every other field number, including 9, goes to the
// default branch and is skipped.
// Returns false on a bad tag, a wire-type mismatch on a known field, or a
// failed read, skip or nested decode. `out` is not reset first, so repeated
// fields accumulate onto whatever the caller passed in.
// NOTE(review): this listing omits source lines 333, 369 and 379, presumably
// the declaration of `wt` and the first out-argument of each decode_timestamp
// call -- confirm against the real header.
328inline bool decode_span(const std::uint8_t* data, std::size_t size,
329 span& out) {
330 protobuf_wire::reader r(data, size);
331 while (!r.eof()) {
332 std::uint32_t field_number;
334 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
335 switch (field_number) {
336 case 1:
337 if (wt != protobuf_wire::wire_type::length_delimited) return false;
338 if (!r.read_bytes(out.trace_id)) return false;
339 break;
340 case 2:
341 if (wt != protobuf_wire::wire_type::length_delimited) return false;
342 if (!r.read_bytes(out.span_id)) return false;
343 break;
344 case 3:
345 if (wt != protobuf_wire::wire_type::length_delimited) return false;
346 if (!r.read_string(out.operation_name)) return false;
347 break;
348 case 4: {
349 if (wt != protobuf_wire::wire_type::length_delimited) return false;
350 const std::uint8_t* ptr; std::size_t len;
351 if (!r.read_length_delimited(&ptr, &len)) return false;
352 span_ref ref;
353 if (!decode_span_ref(ptr, len, ref)) return false;
354 out.references.push_back(std::move(ref));
355 break;
356 }
357 case 5: {
358 if (wt != protobuf_wire::wire_type::varint) return false;
359 auto v = r.read_varint();
360 if (!v) return false;
361 out.flags = static_cast<std::uint32_t>(*v);
362 break;
363 }
 // Field 6: start time sub-message.
364 case 6: {
365 if (wt != protobuf_wire::wire_type::length_delimited) return false;
366 const std::uint8_t* ptr; std::size_t len;
367 if (!r.read_length_delimited(&ptr, &len)) return false;
368 if (!decode_timestamp(ptr, len,
370 out.start_time_nanos))
371 return false;
372 break;
373 }
 // Field 7: duration sub-message, parsed with decode_timestamp because
 // encode_duration emits the same field layout.
374 case 7: {
375 if (wt != protobuf_wire::wire_type::length_delimited) return false;
376 const std::uint8_t* ptr; std::size_t len;
377 if (!r.read_length_delimited(&ptr, &len)) return false;
378 if (!decode_timestamp(ptr, len,
380 out.duration_nanos))
381 return false;
382 break;
383 }
384 case 8: {
385 if (wt != protobuf_wire::wire_type::length_delimited) return false;
386 const std::uint8_t* ptr; std::size_t len;
387 if (!r.read_length_delimited(&ptr, &len)) return false;
388 key_value kv;
389 if (!decode_key_value(ptr, len, kv)) return false;
390 out.tags.push_back(std::move(kv));
391 break;
392 }
393 case 10: {
394 if (wt != protobuf_wire::wire_type::length_delimited) return false;
395 const std::uint8_t* ptr; std::size_t len;
396 if (!r.read_length_delimited(&ptr, &len)) return false;
397 if (!decode_process(ptr, len, out.proc)) return false;
398 break;
399 }
400 default:
401 if (!r.skip_field(wt)) return false;
402 }
403 }
404 return true;
405}
406
// Parses a batch message body from [data, data + size) into `out`.
// Each occurrence of field 1 is decoded as a span and appended to out.spans.
// Field 2 is decoded into out.proc and sets out.has_process. Unknown fields
// are skipped.
// Returns false on a bad tag, a wire-type mismatch on a known field, or a
// failed read, skip or nested decode. Spans appended before a failure remain
// in `out`.
// NOTE(review): this listing omits source line 412, presumably the
// declaration of `wt` -- confirm against the real header.
407inline bool decode_batch(const std::uint8_t* data, std::size_t size,
408 batch& out) {
409 protobuf_wire::reader r(data, size);
410 while (!r.eof()) {
411 std::uint32_t field_number;
413 if (!protobuf_wire::decode_tag(r, field_number, wt)) return false;
414 switch (field_number) {
415 case 1: {
416 if (wt != protobuf_wire::wire_type::length_delimited) return false;
417 const std::uint8_t* ptr; std::size_t len;
418 if (!r.read_length_delimited(&ptr, &len)) return false;
 // Decode into a fresh span so each element starts from defaults.
419 span s;
420 if (!decode_span(ptr, len, s)) return false;
421 out.spans.push_back(std::move(s));
422 break;
423 }
424 case 2: {
425 if (wt != protobuf_wire::wire_type::length_delimited) return false;
426 const std::uint8_t* ptr; std::size_t len;
427 if (!r.read_length_delimited(&ptr, &len)) return false;
 // decode_process appends tags without clearing, so a second
 // occurrence of field 2 would add to the tags already decoded.
428 if (!decode_process(ptr, len, out.proc)) return false;
429 out.has_process = true;
430 break;
431 }
432 default:
433 if (!r.skip_field(wt)) return false;
434 }
435 }
436 return true;
437}
438
439}}} // namespace kcenon::monitoring::jaeger_proto
Minimal protobuf wire reader used for round-trip tests.
std::optional< std::uint64_t > read_fixed64()
bool read_bytes(std::vector< std::uint8_t > &out)
std::optional< std::uint64_t > read_varint()
bool read_length_delimited(const std::uint8_t **out_ptr, std::size_t *out_len)
Read a length-delimited payload. Returns pointer into the underlying buffer and the length....
bool skip_field(wire_type wt)
Skip a field whose wire type is given.
std::vector< std::uint8_t > encode_process(const process &p)
std::vector< std::uint8_t > encode_duration(std::int64_t seconds, std::int32_t nanos)
bool decode_span(const std::uint8_t *data, std::size_t size, span &out)
std::vector< std::uint8_t > encode_batch(const batch &b)
std::vector< std::uint8_t > encode_span(const span &s)
std::vector< std::uint8_t > encode_timestamp(std::int64_t seconds, std::int32_t nanos)
bool decode_batch(const std::uint8_t *data, std::size_t size, batch &out)
bool decode_key_value(const std::uint8_t *data, std::size_t size, key_value &out)
std::vector< std::uint8_t > encode_key_value(const key_value &kv)
bool decode_span_ref(const std::uint8_t *data, std::size_t size, span_ref &out)
std::vector< std::uint8_t > encode_span_ref(const span_ref &ref)
bool decode_timestamp(const std::uint8_t *data, std::size_t size, std::int64_t &seconds, std::int32_t &nanos)
bool decode_process(const std::uint8_t *data, std::size_t size, process &out)
void encode_string_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, const std::string &value)
Encode a string field.
void encode_bool_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, bool value)
Encode a bool field (proto3 skips false).
bool decode_tag(reader &r, std::uint32_t &field_number, wire_type &wt)
Decode a tag into (field_number, wire_type).
void encode_uint64_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, std::uint64_t value)
Encode a uint32 / uint64 / int64 varint field (skips zero).
void encode_enum_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, std::int32_t value)
Encode an enum field (always written when nonzero).
void encode_bytes_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, const std::vector< std::uint8_t > &value)
Encode a bytes field.
void encode_message_field(std::vector< std::uint8_t > &out, std::uint32_t field_number, const std::vector< std::uint8_t > &serialized)
Encode an embedded message field given its pre-serialized bytes.
void encode_tag(std::vector< std::uint8_t > &out, std::uint32_t field_number, wire_type wt)
Encode a tag (field_number << 3 | wire_type) as a varint.
void encode_varint(std::vector< std::uint8_t > &out, std::uint64_t value)
Encode an unsigned varint into the buffer.
@ varint
int32/int64/uint32/uint64/sint*/bool/enum
@ length_delimited
string/bytes/embedded messages/packed repeated
void encode_fixed64(std::vector< std::uint8_t > &out, std::uint64_t value)
Encode a fixed64 little-endian value (used for Zipkin timestamps).
Zero-dependency protobuf wire-format encoder and decoder primitives.
std::vector< std::uint8_t > v_binary
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id
std::vector< span_ref > references
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id