YODAU 1.0
YEAR OF THE DEPEND ADULT UNDERGARMENT
stream_manager.hpp
#ifndef YODAU_BACKEND_STREAM_MANAGER_HPP
#define YODAU_BACKEND_STREAM_MANAGER_HPP

#include "event.hpp"
#include "frame.hpp"
#include "stream.hpp"

#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <stop_token>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace yodau::backend {

/**
 * @brief Central coordinator for streams, geometry, frame processing and
 * events.
 *
 * The stream_manager owns:
 * - a registry of streams (@ref stream) addressable by name,
 * - a registry of lines (@ref line) addressable by name,
 * - hooks for stream discovery, frame ingestion, background capture daemons,
 *   frame analysis, and event delivery.
 *
 * Typical responsibilities:
 * - Add and look up streams/lines.
 * - Connect lines to streams.
 * - Start/stop stream daemons (background frame producers).
 * - Accept manually pushed frames and throttle analysis per stream.
 * - Deliver produced events to configured sinks.
 *
 * Thread-safety:
 * - All public methods lock @ref mtx unless otherwise noted.
 * - Background threads (daemons and fake-event generator) also use @ref mtx.
 *
 * @note All timestamps use std::chrono::steady_clock, i.e. monotonic time.
 */
class stream_manager {
public:
    /**
     * @brief Custom detector for local streams.
     *
     * The detector is expected to return a list of streams discovered at call
     * time. Returned streams are moved into the manager if their names are not
     * already present.
     */
    using local_stream_detector_fn = std::function<std::vector<stream>()>;

    /**
     * @brief Hook for manual frame pushing.
     *
     * If set via @ref set_manual_push_hook, @ref push_frame will delegate to
     * this hook instead of analyzing frames internally.
     *
     * @param stream_name Name of the stream the frame belongs to.
     * @param f Frame to process/consume (moved).
     */
    using manual_push_fn
        = std::function<void(const std::string& stream_name, frame&& f)>;

    /**
     * @brief Hook used to start a background daemon that produces frames.
     *
     * The manager provides an on-frame callback that the daemon should call for
     * each produced frame. The daemon must also respect @p st and exit promptly
     * when stop is requested.
     *
     * @param s Stream to run.
     * @param on_frame Callback to deliver produced frames to the manager.
     * @param st Stop token to observe for cancellation.
     */
    using daemon_start_fn = std::function<void(
        const stream& s, std::function<void(frame&&)> on_frame,
        std::stop_token st
    )>;

    /**
     * @brief Frame analysis function.
     *
     * Called by @ref process_frame and (optionally) by the fake-event
     * generator.
     *
     * @param s Stream metadata/context.
     * @param f Frame to analyze (const reference).
     * @return Vector of generated events (may be empty).
     */
    using frame_processor_fn
        = std::function<std::vector<event>(const stream& s, const frame& f)>;

    /**
     * @brief Sink for individual events.
     *
     * If batch sink is not set, events are delivered one-by-one to this sink.
     */
    using event_sink_fn = std::function<void(const event& e)>;

    /**
     * @brief Sink for event batches.
     *
     * If set, @ref push_frame delivers events to this sink as a batch.
     */
    using event_batch_sink_fn
        = std::function<void(const std::vector<event>& events)>;

    /**
     * @brief Construct manager and attempt to discover local streams.
     *
     * On Linux, the constructor probes /dev/video* devices and adds those that
     * look like capture devices. After that, if a custom local detector is set,
     * it may be used when @ref refresh_local_streams is called.
     */
    stream_manager();

    /**
     * @brief Dump all streams and lines to an output stream.
     *
     * Equivalent to calling @ref dump_stream and @ref dump_lines.
     *
     * @param out Output stream.
     */
    void dump(std::ostream& out) const;

    /**
     * @brief Dump all registered lines.
     *
     * Each line is printed with @ref line::dump.
     *
     * @param out Output stream.
     */
    void dump_lines(std::ostream& out) const;

    /**
     * @brief Dump all registered streams.
     *
     * If @p connections is true, prints connected line names per stream.
     *
     * @param out Output stream.
     * @param connections Whether to include stream-line connections.
     */
    void dump_stream(std::ostream& out, bool connections = false) const;

    /**
     * @brief Set a custom local stream detector.
     *
     * The detector is stored and @ref refresh_local_streams is called
     * immediately after setting.
     *
     * @param detector Detector functor.
     */
    void set_local_stream_detector(local_stream_detector_fn detector);

    /**
     * @brief Refresh local streams.
     *
     * Behavior:
     * - On Linux: scans /dev/video* devices, validates capture capability,
     *   and auto-adds any not yet registered.
     * - If a custom detector is set: calls it and adds returned streams
     *   that are not yet registered.
     */
    void refresh_local_streams();

    /**
     * @brief Add a new stream to the manager.
     *
     * If @p name is empty or already used, a unique name "stream_N" is
     * generated.
     *
     * @param path Stream path/URL.
     * @param name Optional explicit stream name.
     * @param type Optional explicit type override passed to @ref stream ctor.
     * @param loop Whether file streams should loop on EOF.
     * @return Reference to the stored stream.
     */
    stream& add_stream(
        const std::string& path, const std::string& name = {},
        const std::string& type = {}, bool loop = true
    );

    /**
     * @brief Add a new line (polyline/polygon) to the manager.
     *
     * If @p name is empty or already used, a unique name "line_N" is generated.
     * The points string is parsed with @ref parse_points.
     *
     * @param points Textual representation of points.
     * @param closed Whether the line is closed.
     * @param name Optional explicit line name.
     * @return Shared pointer to the stored immutable line.
     * @throws std::runtime_error on invalid point string.
     */
    line_ptr add_line(
        const std::string& points, bool closed = false,
        const std::string& name = {}
    );

    /**
     * @brief Connect an existing line to an existing stream.
     *
     * @param stream_name Name of the target stream.
     * @param line_name Name of the line to connect.
     * @return Reference to the stream.
     * @throws std::runtime_error if stream or line is not found.
     */
    stream&
    set_line(const std::string& stream_name, const std::string& line_name);

    /**
     * @brief Find a stream by name.
     *
     * @param name Stream name.
     * @return Shared pointer to const stream, or empty if not found.
     */
    std::shared_ptr<const stream> find_stream(const std::string& name) const;

    /**
     * @brief List names of all registered streams.
     */
    std::vector<std::string> stream_names() const;

    /**
     * @brief List names of all registered lines.
     */
    std::vector<std::string> line_names() const;

    /**
     * @brief List names of lines connected to a given stream.
     *
     * @param stream_name Stream name.
     * @return Vector of connected line names, or empty if stream not found.
     */
    std::vector<std::string> stream_lines(const std::string& stream_name) const;

    /**
     * @brief Set manual push hook.
     *
     * When set, @ref push_frame will call this hook and return immediately,
     * skipping internal analysis and sinks.
     *
     * @param hook Hook functor (may be empty to unset).
     */
    void set_manual_push_hook(manual_push_fn hook);

    /**
     * @brief Set daemon start hook.
     *
     * This hook is required for @ref start_stream to do anything.
     *
     * @param hook Hook functor (may be empty to unset).
     */
    void set_daemon_start_hook(daemon_start_fn hook);

    /**
     * @brief Push a frame into the manager for a specific stream.
     *
     * Workflow:
     * - If manual push hook is set, delegate to it.
     * - Else analyze frame with @ref process_frame (throttled).
     * - If batch sink is set, deliver whole batch.
     * - Else if single-event sink is set, deliver events one-by-one.
     *
     * @param stream_name Stream name.
     * @param f Frame to process (moved).
     */
    void push_frame(const std::string& stream_name, frame&& f);

    /**
     * @brief Start a daemon for a stream.
     *
     * Alias for @ref start_stream.
     *
     * @param stream_name Stream name.
     */
    void start_daemon(const std::string& stream_name);

    /**
     * @brief Set the frame processor.
     *
     * @param fn Analysis functor (may be empty to unset).
     */
    void set_frame_processor(frame_processor_fn fn);

    /**
     * @brief Analyze a frame and return generated events.
     *
     * Analysis is throttled per stream by @ref analysis_interval_ms.
     * If the stream does not exist or processor is not set, returns empty list.
     *
     * @param stream_name Stream name.
     * @param f Frame to analyze (moved into function; read-only for processor).
     * @return Vector of produced events.
     */
    std::vector<event> process_frame(const std::string& stream_name, frame&& f);

    /**
     * @brief Set per-event sink.
     *
     * Ignored when batch sink is set.
     *
     * @param fn Sink functor (may be empty to unset).
     */
    void set_event_sink(event_sink_fn fn);

    /**
     * @brief Set batch event sink.
     *
     * When set, overrides per-event sink for delivery.
     *
     * @param fn Sink functor (may be empty to unset).
     */
    void set_event_batch_sink(event_batch_sink_fn fn);

    /**
     * @brief Set minimum analysis interval per stream, in milliseconds.
     *
     * Values <= 0 are ignored.
     *
     * @param ms Interval in ms.
     */
    void set_analysis_interval_ms(int ms);

    /**
     * @brief Start a stream daemon by name.
     *
     * Requirements:
     * - @ref daemon_start hook must be set,
     * - stream must exist,
     * - daemon for this stream must not already be running.
     *
     * On Linux, local capture devices may be validated again before starting.
     *
     * @param name Stream name.
     */
    void start_stream(const std::string& name);

    /**
     * @brief Stop a running stream daemon by name.
     *
     * Requests stop on the associated @ref std::jthread and deactivates the
     * stream (sets pipeline to @ref stream_pipeline::none).
     *
     * @param name Stream name.
     */
    void stop_stream(const std::string& name);

    /**
     * @brief Check whether a daemon for a stream is running.
     *
     * @param name Stream name.
     * @return true if running.
     */
    bool is_stream_running(const std::string& name) const;

    /**
     * @brief Enable periodic fake-event generation.
     *
     * Starts/keeps a background thread that calls the frame processor with a
     * dummy frame on all streams every @p interval_ms milliseconds.
     *
     * @param interval_ms Period in ms (ignored if <= 0).
     */
    void enable_fake_events(int interval_ms = 700);

    /**
     * @brief Disable fake-event generation.
     *
     * Requests stop on the fake-event thread (if running).
     */
    void disable_fake_events();

    /**
     * @brief Change the direction constraint of a stored line.
     *
     * Since lines are stored as immutable shared pointers, this method clones
     * the line, changes @ref line::dir, and replaces the pointer in the
     * registry.
     *
     * @param line_name Name of the line to reconfigure.
     * @param dir New direction constraint.
     * @throws std::runtime_error if the line does not exist.
     */
    void set_line_dir(const std::string& line_name, tripwire_dir dir);

private:
    /**
     * @brief Take a snapshot of current streams.
     *
     * Used to iterate without holding the manager lock for long periods.
     *
     * @return Vector of shared pointers to streams.
     */
    std::vector<std::shared_ptr<stream>> snapshot_streams() const;

    /**
     * @brief Snapshot currently installed hooks.
     *
     * Copies hooks under lock so that background threads can use stable
     * callables without racing with setters.
     *
     * @param fp Out: frame processor.
     * @param es Out: per-event sink.
     * @param bes Out: batch sink.
     */
    void snapshot_hooks(
        frame_processor_fn& fp, event_sink_fn& es, event_batch_sink_fn& bes
    ) const;

    /**
     * @brief Get current fake-event interval.
     *
     * @return Interval in milliseconds.
     */
    int current_fake_interval_ms() const;

    /**
     * @brief Background loop for fake-event generation.
     *
     * Periodically calls frame processor with a dummy frame on all streams and
     * delivers events to configured sinks until @p st requests stop.
     *
     * @param st Stop token.
     */
    void run_fake_events(std::stop_token st);

#ifdef __linux__
    /**
     * @brief Validate Linux capture devices before starting daemons.
     *
     * Local streams with /dev/video* paths may be rejected if they are not
     * usable capture devices.
     *
     * @param s Stream to validate.
     * @return true if the stream is OK to start.
     */
    static bool is_linux_capture_ok(const stream& s);
#endif

    /** @brief Registered streams keyed by name. */
    std::unordered_map<std::string, std::shared_ptr<stream>> streams;

    /** @brief Registered lines keyed by name. */
    std::unordered_map<std::string, line_ptr> lines;

    /** @brief Auto-name index for streams ("stream_N"). */
    size_t stream_idx { 0 };

    /** @brief Auto-name index for lines ("line_N"). */
    size_t line_idx { 0 };

    /** @brief Optional external local stream detector. */
    local_stream_detector_fn stream_detector {};

    /** @brief Optional manual frame push hook. */
    manual_push_fn manual_push;

    /** @brief Optional daemon start hook. */
    daemon_start_fn daemon_start;

    /** @brief Optional frame analysis hook. */
    frame_processor_fn frame_processor;

    /** @brief Optional per-event sink. */
    event_sink_fn event_sink;

    /** @brief Optional batch event sink. */
    event_batch_sink_fn event_batch_sink;

    /** @brief Per-stream analysis throttle interval. */
    int analysis_interval_ms { 200 };

    /**
     * @brief Last analysis time per stream.
     *
     * Used to enforce @ref analysis_interval_ms.
     */
    std::unordered_map<std::string, std::chrono::steady_clock::time_point>
        last_analysis_ts;

    /** @brief Running daemon threads keyed by stream name. */
    std::unordered_map<std::string, std::jthread> daemons;

    /** @brief Fake-event generator thread. */
    std::jthread fake_thread;

    /** @brief Current fake-event interval. */
    int fake_interval_ms { 700 };

    /** @brief Whether fake-event generation is enabled. */
    bool fake_enabled { false };

    /** @brief Global mutex protecting manager state. */
    mutable std::mutex mtx;
};

} // namespace yodau::backend

#endif // YODAU_BACKEND_STREAM_MANAGER_HPP
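
The header documents two behaviors of push_frame that are easy to misread: analysis is throttled per stream against a steady_clock timestamp, and a batch sink, when set, takes precedence over the per-event sink. The sketch below illustrates both rules in isolation. It is not the library's implementation: demo_event, demo_throttle, demo_sink, demo_batch_sink and deliver are hypothetical stand-ins, and the exact comparison used at the interval boundary is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for yodau::backend::event (the real type is in event.hpp).
struct demo_event {
    std::string stream_name;
};

using demo_clock = std::chrono::steady_clock;

// Per-stream throttle keyed by stream name, mirroring the role of
// analysis_interval_ms and last_analysis_ts in the header.
class demo_throttle {
public:
    explicit demo_throttle(std::chrono::milliseconds interval)
        : interval_(interval)
    {
        assert(interval.count() > 0); // sketch-only precondition
    }

    // Returns true and records `now` if the stream may be analyzed;
    // returns false if the previous accepted analysis was too recent.
    // Assumption: an elapsed time equal to the interval is accepted.
    bool allow(const std::string& stream_name, demo_clock::time_point now)
    {
        const auto it = last_.find(stream_name);
        if (it != last_.end() && now - it->second < interval_) {
            return false;
        }
        last_[stream_name] = now;
        return true;
    }

private:
    std::chrono::milliseconds interval_;
    std::unordered_map<std::string, demo_clock::time_point> last_;
};

using demo_sink = std::function<void(const demo_event&)>;
using demo_batch_sink = std::function<void(const std::vector<demo_event>&)>;

// Delivery rule as documented: the batch sink wins when it is set;
// otherwise events go one by one to the per-event sink, if any.
inline void deliver(
    const std::vector<demo_event>& events, const demo_sink& single,
    const demo_batch_sink& batch
)
{
    if (batch) {
        batch(events);
        return;
    }
    if (single) {
        for (const auto& e : events) {
            single(e);
        }
    }
}
```

Passing the time point in as a parameter, rather than reading the clock inside allow, keeps the sketch deterministic; the real manager presumably reads std::chrono::steady_clock::now() itself.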
Simple interactive CLI (REPL) for controlling a stream_manager.
void cmd_start_stream(const std::vector< std::string > &args) const
Handler for start-stream.
void dispatch_command(const std::string &cmd, const std::vector< std::string > &args) const
Dispatch a command to its handler.
void cmd_set_line(const std::vector< std::string > &args) const
Handler for set-line.
int run() const
Run the interactive command loop.
void cmd_add_line(const std::vector< std::string > &args) const
Handler for add-line.
static cxxopts::ParseResult parse_with_cxxopts(const std::string &cmd, const std::vector< std::string > &args, cxxopts::Options &options)
Parse command arguments using cxxopts.
backend::stream_manager & stream_mgr
Stream manager controlled by this CLI.
void cmd_stop_stream(const std::vector< std::string > &args) const
Handler for stop-stream.
void cmd_list_streams(const std::vector< std::string > &args) const
Handler for list-streams.
void cmd_add_stream(const std::vector< std::string > &args) const
Handler for add-stream.
void cmd_list_lines(const std::vector< std::string > &args) const
Handler for list-lines.
cli_client(backend::stream_manager &mgr)
Construct a CLI client operating on an existing manager.
Definition cli_client.cpp:5
static std::vector< std::string > tokenize(const std::string &line)
Split a line into whitespace-separated tokens.
Central coordinator for streams, geometry, frame processing and events.
std::function< void(const std::string &stream_name, frame &&f)> manual_push_fn
Hook for manual frame pushing.
Represents a single video stream and its analytic connections.
Definition stream.hpp:64
static yodau::backend::tripwire_dir parse_tripwire_dir(const std::string &s)
std::shared_ptr< line const > line_ptr
Shared, immutable line pointer.
Definition geometry.hpp:146
tripwire_dir
Allowed crossing direction for a tripwire.
Definition geometry.hpp:62
Generic event produced by the backend.
Definition event.hpp:44
Video frame container.
Definition frame.hpp:44
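
Because line_ptr is a shared pointer to a const line, set_line_dir is documented as cloning the line, changing its direction, and replacing the pointer in the registry. A minimal sketch of that clone-and-replace pattern follows. The types demo_dir, demo_line, demo_line_ptr and line_registry are hypothetical simplifications, and the enumerator names are invented for illustration; the real definitions live in geometry.hpp.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for tripwire_dir and line from geometry.hpp.
enum class demo_dir { any, neg_to_pos, pos_to_neg };

struct demo_line {
    std::string name;
    demo_dir dir { demo_dir::any };
};

// Same shape as line_ptr: shared ownership of an immutable object.
using demo_line_ptr = std::shared_ptr<const demo_line>;

class line_registry {
public:
    demo_line_ptr add(const std::string& name)
    {
        assert(!name.empty()); // sketch-only precondition
        auto p = std::make_shared<const demo_line>(demo_line { name });
        lines_[name] = p;
        return p;
    }

    // Returns an empty pointer if the line is not registered.
    demo_line_ptr find(const std::string& name) const
    {
        const auto it = lines_.find(name);
        return it == lines_.end() ? demo_line_ptr {} : it->second;
    }

    // Clone-and-replace: existing holders keep the old immutable object,
    // while later lookups see the new one.
    void set_dir(const std::string& name, demo_dir dir)
    {
        const auto it = lines_.find(name);
        if (it == lines_.end()) {
            throw std::runtime_error("line not found: " + name);
        }
        demo_line copy = *it->second; // clone the immutable line
        copy.dir = dir;               // modify the private copy
        it->second = std::make_shared<const demo_line>(std::move(copy));
    }

private:
    std::unordered_map<std::string, demo_line_ptr> lines_;
};
```

One consequence of this design is that code holding an older pointer keeps seeing the previous direction until it looks the line up again. Whether the real manager also updates streams that already hold the old pointer is not visible in this header.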