GCC Code Coverage Report


Directory: ./
File: backend/src/stream_manager.cpp
Date: 2025-11-24 00:30:48
Exec Total Coverage
Lines: 0 338 0.0%
Functions: 0 36 0.0%
Branches: 0 364 0.0%

Line Branch Exec Source
1 #include "stream_manager.hpp"
2
3 #include <chrono>
4 #include <filesystem>
5 #include <ranges>
6 #include <thread>
7
8 #ifdef __linux__
9 #include <fcntl.h>
10 #include <linux/videodev2.h>
11 #include <sys/ioctl.h>
12 #include <unistd.h>
13
namespace {
// Reports whether the device node at `path` answers VIDIOC_QUERYCAP with
// capabilities that include video capture (single- or multi-planar) together
// with V4L2_CAP_STREAMING.
//
// Returns false when the node cannot be opened or the query ioctl fails.
bool is_capture_device(const std::string& path) {
    const int handle = ::open(path.c_str(), O_RDONLY | O_NONBLOCK);
    if (handle < 0) {
        return false;
    }

    v4l2_capability info {};
    const int query_rc = ::ioctl(handle, VIDIOC_QUERYCAP, &info);
    // The descriptor is only needed for the query itself.
    ::close(handle);
    if (query_rc < 0) {
        return false;
    }

    // When V4L2_CAP_DEVICE_CAPS is set, evaluate device_caps instead of the
    // capabilities field.
    const std::uint32_t flags = (info.capabilities & V4L2_CAP_DEVICE_CAPS) != 0
        ? info.device_caps
        : info.capabilities;

    const bool can_capture = (flags & V4L2_CAP_VIDEO_CAPTURE) != 0
        || (flags & V4L2_CAP_VIDEO_CAPTURE_MPLANE) != 0;
    const bool can_stream = (flags & V4L2_CAP_STREAMING) != 0;

    return can_capture && can_stream;
}
}
42 #endif
43
44 yodau::backend::stream_manager::stream_manager() { refresh_local_streams(); }
45
46 void yodau::backend::stream_manager::dump(std::ostream& out) const {
47 std::scoped_lock lock(mtx);
48 dump_stream(out);
49 out << "\n";
50 dump_lines(out);
51 }
52
53 void yodau::backend::stream_manager::dump_lines(std::ostream& out) const {
54 std::scoped_lock lock(mtx);
55 out << lines.size() << " lines:";
56 for (const auto& line : lines | std::views::values) {
57 out << "\n\t";
58 line->dump(out);
59 }
60 }
61
62 void yodau::backend::stream_manager::dump_stream(
63 std::ostream& out, const bool connections
64 ) const {
65 std::scoped_lock lock(mtx);
66 out << streams.size() << " streams:";
67 for (const auto& stream : streams | std::views::values) {
68 out << "\n\t";
69 stream->dump(out, connections);
70 }
71 }
72
73 void yodau::backend::stream_manager::set_local_stream_detector(
74 local_stream_detector_fn detector
75 ) {
76 {
77 std::scoped_lock lock(mtx);
78 stream_detector = std::move(detector);
79 }
80 refresh_local_streams();
81 }
82
83 void yodau::backend::stream_manager::refresh_local_streams() {
84 #ifdef __linux__
85 for (size_t idx = 0;; ++idx) {
86 std::string path = "/dev/video" + std::to_string(idx);
87 if (!std::filesystem::exists(path)) {
88 break;
89 }
90 if (!is_capture_device(path)) {
91 continue;
92 }
93
94 {
95 std::scoped_lock lock(mtx);
96 if (streams.contains("video" + std::to_string(idx))) {
97 continue;
98 }
99 }
100
101 add_stream(path, "video" + std::to_string(idx), "local");
102 }
103 #endif
104
105 local_stream_detector_fn det;
106 {
107 std::scoped_lock lock(mtx);
108 det = stream_detector;
109 }
110
111 if (!det) {
112 return;
113 }
114
115 auto detected_streams = det();
116 for (auto& detected_stream : detected_streams) {
117 const auto name = detected_stream.get_name();
118
119 std::scoped_lock lock(mtx);
120 if (!streams.contains(name)) { // todo: update existing streams?
121 streams.emplace(
122 name, std::make_shared<stream>(std::move(detected_stream))
123 );
124 }
125 }
126 }
127
128 yodau::backend::stream& yodau::backend::stream_manager::add_stream(
129 const std::string& path, const std::string& name, const std::string& type,
130 bool loop
131 ) {
132 std::scoped_lock lock(mtx);
133 std::string stream_name = name;
134 while (stream_name.empty() || streams.contains(stream_name)) {
135 stream_name = "stream_" + std::to_string(stream_idx++);
136 }
137 auto new_stream = std::make_shared<stream>(path, stream_name, type, loop);
138 auto& ref = *new_stream;
139 streams.emplace(stream_name, std::move(new_stream));
140 return ref;
141 }
142
143 yodau::backend::line_ptr yodau::backend::stream_manager::add_line(
144 const std::string& points, const bool closed, const std::string& name
145 ) {
146 std::scoped_lock lock(mtx);
147 std::vector<point> parsed_points = parse_points(points);
148 std::string line_name = name;
149 while (line_name.empty() || lines.contains(line_name)) {
150 line_name = "line_" + std::to_string(line_idx++);
151 }
152 auto new_line = make_line(std::move(parsed_points), line_name, closed);
153 lines.emplace(line_name, new_line);
154 return new_line;
155 }
156
157 yodau::backend::stream& yodau::backend::stream_manager::set_line(
158 const std::string& stream_name, const std::string& line_name
159 ) {
160 std::scoped_lock lock(mtx);
161 const auto stream_it = streams.find(stream_name);
162 if (stream_it == streams.end()) {
163 throw std::runtime_error("stream not found: " + stream_name);
164 }
165 const auto line_it = lines.find(line_name);
166 if (line_it == lines.end()) {
167 throw std::runtime_error("line not found: " + line_name);
168 }
169 stream_it->second->connect_line(line_it->second);
170 return *stream_it->second;
171 }
172
173 std::shared_ptr<const yodau::backend::stream>
174 yodau::backend::stream_manager::find_stream(const std::string& name) const {
175 std::scoped_lock lock(mtx);
176 const auto it = streams.find(name);
177 if (it == streams.end()) {
178 return {};
179 }
180 return it->second;
181 }
182
183 std::vector<std::string> yodau::backend::stream_manager::stream_names() const {
184 std::scoped_lock lock(mtx);
185 return streams | std::views::keys
186 | std::ranges::to<std::vector<std::string>>();
187 }
188
189 std::vector<std::string> yodau::backend::stream_manager::line_names() const {
190 std::scoped_lock lock(mtx);
191 return lines | std::views::keys
192 | std::ranges::to<std::vector<std::string>>();
193 }
194
195 std::vector<std::string> yodau::backend::stream_manager::stream_lines(
196 const std::string& stream_name
197 ) const {
198 std::scoped_lock lock(mtx);
199 const auto stream_it = streams.find(stream_name);
200 if (stream_it == streams.end()) {
201 return {};
202 }
203 return stream_it->second->line_names();
204 }
205
206 void yodau::backend::stream_manager::set_manual_push_hook(manual_push_fn hook) {
207 std::scoped_lock lock(mtx);
208 manual_push = std::move(hook);
209 }
210
211 void yodau::backend::stream_manager::set_daemon_start_hook(
212 daemon_start_fn hook
213 ) {
214 std::scoped_lock lock(mtx);
215 daemon_start = std::move(hook);
216 }
217
218 void yodau::backend::stream_manager::push_frame(
219 const std::string& stream_name, frame&& f
220 ) {
221 manual_push_fn mp;
222 event_sink_fn es;
223 event_batch_sink_fn bes;
224
225 {
226 std::scoped_lock lock(mtx);
227 mp = manual_push;
228 es = event_sink;
229 bes = event_batch_sink;
230 }
231
232 if (mp) {
233 mp(stream_name, std::move(f));
234 return;
235 }
236
237 auto events = process_frame(stream_name, std::move(f));
238
239 if (bes) {
240 bes(events);
241 return;
242 }
243
244 if (!es) {
245 return;
246 }
247
248 for (const auto& e : events) {
249 es(e);
250 }
251 }
252
253 void yodau::backend::stream_manager::start_daemon(
254 const std::string& stream_name
255 ) {
256 start_stream(stream_name);
257 }
258
259 void yodau::backend::stream_manager::set_frame_processor(
260 frame_processor_fn fn
261 ) {
262 std::scoped_lock lock(mtx);
263 frame_processor = std::move(fn);
264 }
265
266 std::vector<yodau::backend::event>
267 yodau::backend::stream_manager::process_frame(
268 const std::string& stream_name, frame&& f
269 ) {
270 std::shared_ptr<stream> sp;
271 frame_processor_fn fp;
272 const auto now = std::chrono::steady_clock::now();
273 bool allow = false;
274
275 {
276 std::scoped_lock lock(mtx);
277 auto it = streams.find(stream_name);
278 if (it == streams.end() || !frame_processor) {
279 return {};
280 }
281
282 fp = frame_processor;
283 sp = it->second;
284
285 const auto last_it = last_analysis_ts.find(stream_name);
286 if (last_it == last_analysis_ts.end()) {
287 last_analysis_ts[stream_name] = now;
288 allow = true;
289 } else {
290 const auto dt
291 = std::chrono::duration_cast<std::chrono::milliseconds>(
292 now - last_it->second
293 )
294 .count();
295 if (dt >= analysis_interval_ms) {
296 last_analysis_ts[stream_name] = now;
297 allow = true;
298 }
299 }
300 }
301
302 if (!allow || !sp || !fp) {
303 return {};
304 }
305
306 return fp(*sp, f);
307 }
308
309 void yodau::backend::stream_manager::set_event_sink(event_sink_fn fn) {
310 std::scoped_lock lock(mtx);
311 event_sink = std::move(fn);
312 }
313
314 void yodau::backend::stream_manager::set_event_batch_sink(
315 event_batch_sink_fn fn
316 ) {
317 std::scoped_lock lock(mtx);
318 event_batch_sink = std::move(fn);
319 }
320
321 void yodau::backend::stream_manager::set_analysis_interval_ms(int ms) {
322 if (ms <= 0) {
323 return;
324 }
325 std::scoped_lock lock(mtx);
326 analysis_interval_ms = ms;
327 }
328
329 void yodau::backend::stream_manager::start_stream(const std::string& name) {
330 std::shared_ptr<stream> sp;
331 daemon_start_fn ds;
332
333 {
334 std::scoped_lock lock(mtx);
335 if (!daemon_start || daemons.contains(name)) {
336 return;
337 }
338
339 const auto it = streams.find(name);
340 if (it == streams.end() || !it->second) {
341 return;
342 }
343
344 sp = it->second;
345 ds = daemon_start;
346
347 #ifdef __linux__
348 if (!is_linux_capture_ok(*sp)) {
349 return;
350 }
351 #endif
352
353 sp->activate(stream_pipeline::automatic);
354 }
355
356 std::jthread th([this, name, sp, ds](std::stop_token st) mutable {
357 ds(
358 *sp, [this, name](frame&& f) { push_frame(name, std::move(f)); }, st
359 );
360 });
361
362 {
363 std::scoped_lock lock(mtx);
364 daemons.emplace(name, std::move(th));
365 }
366 }
367
368 void yodau::backend::stream_manager::stop_stream(const std::string& name) {
369 std::jthread th;
370 std::shared_ptr<stream> sp;
371
372 {
373 std::scoped_lock lock(mtx);
374 const auto it = daemons.find(name);
375 if (it == daemons.end()) {
376 return;
377 }
378
379 th = std::move(it->second);
380 daemons.erase(it);
381
382 const auto sit = streams.find(name);
383 if (sit != streams.end()) {
384 sp = sit->second;
385 }
386 }
387
388 th.request_stop();
389
390 if (sp) {
391 std::scoped_lock lock(mtx);
392 sp->deactivate();
393 }
394 }
395
396 bool yodau::backend::stream_manager::is_stream_running(
397 const std::string& name
398 ) const {
399 std::scoped_lock lock(mtx);
400 return daemons.contains(name);
401 }
402
403 void yodau::backend::stream_manager::enable_fake_events(const int interval_ms) {
404 {
405 std::scoped_lock lock(mtx);
406
407 if (interval_ms > 0) {
408 fake_interval_ms = interval_ms;
409 }
410
411 if (fake_enabled) {
412 return;
413 }
414
415 fake_enabled = true;
416 }
417
418 std::jthread th([this](std::stop_token st) { run_fake_events(st); });
419
420 {
421 std::scoped_lock lock(mtx);
422 if (!fake_enabled) {
423 th.request_stop();
424 return;
425 }
426 fake_thread = std::move(th);
427 }
428 }
429
430 void yodau::backend::stream_manager::disable_fake_events() {
431 std::jthread th;
432
433 {
434 std::scoped_lock lock(mtx);
435
436 if (!fake_enabled) {
437 return;
438 }
439
440 fake_enabled = false;
441 th = std::move(fake_thread);
442 fake_thread = std::jthread();
443 }
444
445 if (th.joinable()) {
446 th.request_stop();
447 }
448 }
449
450 void yodau::backend::stream_manager::set_line_dir(
451 const std::string& line_name, tripwire_dir dir
452 ) {
453 std::scoped_lock lock(mtx);
454
455 const auto it = lines.find(line_name);
456 if (it == lines.end() || !it->second) {
457 throw std::runtime_error("line not found: " + line_name);
458 }
459
460 const auto old_ptr = it->second;
461 auto new_ptr = std::make_shared<line>(*old_ptr);
462 new_ptr->dir = dir;
463
464 it->second = new_ptr;
465 }
466
467 std::vector<std::shared_ptr<yodau::backend::stream>>
468 yodau::backend::stream_manager::snapshot_streams() const {
469 std::vector<std::shared_ptr<stream>> snap;
470
471 std::scoped_lock lock(mtx);
472 snap.reserve(streams.size());
473 for (auto& sp : streams | std::views::values) {
474 if (sp) {
475 snap.push_back(sp);
476 }
477 }
478
479 return snap;
480 }
481
482 void yodau::backend::stream_manager::snapshot_hooks(
483 frame_processor_fn& fp, event_sink_fn& es, event_batch_sink_fn& bes
484 ) const {
485 std::scoped_lock lock(mtx);
486 fp = frame_processor;
487 es = event_sink;
488 bes = event_batch_sink;
489 }
490
491 int yodau::backend::stream_manager::current_fake_interval_ms() const {
492 std::scoped_lock lock(mtx);
493 return fake_interval_ms;
494 }
495
496 void yodau::backend::stream_manager::run_fake_events(std::stop_token st) {
497 frame dummy;
498
499 while (!st.stop_requested()) {
500 auto snap = snapshot_streams();
501
502 frame_processor_fn fp;
503 event_sink_fn es;
504 event_batch_sink_fn bes;
505 snapshot_hooks(fp, es, bes);
506
507 if (fp) {
508 for (const auto& sp : snap) {
509 auto evs = fp(*sp, dummy);
510
511 if (bes) {
512 if (!evs.empty()) {
513 bes(evs);
514 }
515 } else if (es) {
516 for (const auto& e : evs) {
517 es(e);
518 }
519 }
520 }
521 }
522
523 const int interval = current_fake_interval_ms();
524 std::this_thread::sleep_for(std::chrono::milliseconds(interval));
525 }
526 }
527
528 #ifdef __linux__
529 bool yodau::backend::stream_manager::is_linux_capture_ok(const stream& s) {
530 if (s.get_type() != local) {
531 return true;
532 }
533
534 const auto& p = s.get_path();
535 if (p.rfind("/dev/video", 0) != 0) {
536 return true;
537 }
538
539 return is_capture_device(p);
540 }
541 #endif
542