YODAU 1.0
YEAR OF THE DEPEND ADULT UNDERGARMENT
yodau::backend::stream_manager Class Reference

Central coordinator for streams, geometry, frame processing and events. More...

#include <backend/include/stream_manager.hpp>

Public Types

using local_stream_detector_fn = std::function<std::vector<stream>()>
 Custom detector for local streams.
using manual_push_fn = std::function<void(const std::string& stream_name, frame&& f)>
 Hook for manual frame pushing.
using daemon_start_fn = std::function<void(const stream& s, std::function<void(frame&&)> on_frame, std::stop_token st)>
 Hook used to start a background daemon that produces frames.
using frame_processor_fn = std::function<std::vector<event>(const stream& s, const frame& f)>
 Frame analysis function.
using event_sink_fn = std::function<void(const event& e)>
 Sink for individual events.
using event_batch_sink_fn = std::function<void(const std::vector<event>& events)>
 Sink for event batches.

Public Member Functions

 stream_manager ()
 Construct manager and attempt to discover local streams.
void dump (std::ostream &out) const
 Dump all streams and lines to an output stream.
void dump_lines (std::ostream &out) const
 Dump all registered lines.
void dump_stream (std::ostream &out, bool connections=false) const
 Dump all registered streams.
void set_local_stream_detector (local_stream_detector_fn detector)
 Set a custom local stream detector.
void refresh_local_streams ()
 Refresh local streams.
stream & add_stream (const std::string &path, const std::string &name={}, const std::string &type={}, bool loop=true)
 Add a new stream to the manager.
line_ptr add_line (const std::string &points, bool closed=false, const std::string &name={})
 Add a new line (polyline/polygon) to the manager.
stream & set_line (const std::string &stream_name, const std::string &line_name)
 Connect an existing line to an existing stream.
std::shared_ptr< const stream > find_stream (const std::string &name) const
 Find a stream by name.
std::vector< std::string > stream_names () const
 List names of all registered streams.
std::vector< std::string > line_names () const
 List names of all registered lines.
std::vector< std::string > stream_lines (const std::string &stream_name) const
 List names of lines connected to a given stream.
void set_manual_push_hook (manual_push_fn hook)
 Set manual push hook.
void set_daemon_start_hook (daemon_start_fn hook)
 Set daemon start hook.
void push_frame (const std::string &stream_name, frame &&f)
 Push a frame into the manager for a specific stream.
void start_daemon (const std::string &stream_name)
 Start a daemon for a stream.
void set_frame_processor (frame_processor_fn fn)
 Set the frame processor.
std::vector< event > process_frame (const std::string &stream_name, frame &&f)
 Analyze a frame and return generated events.
void set_event_sink (event_sink_fn fn)
 Set per-event sink.
void set_event_batch_sink (event_batch_sink_fn fn)
 Set batch event sink.
void set_analysis_interval_ms (int ms)
 Set minimum analysis interval per stream, in milliseconds.
void start_stream (const std::string &name)
 Start a stream daemon by name.
void stop_stream (const std::string &name)
 Stop a running stream daemon by name.
bool is_stream_running (const std::string &name) const
 Check whether a daemon for a stream is running.
void enable_fake_events (int interval_ms=700)
 Enable periodic fake events generation.
void disable_fake_events ()
 Disable fake events generation.
void set_line_dir (const std::string &line_name, tripwire_dir dir)
 Change the direction constraint of a stored line.

Private Member Functions

std::vector< std::shared_ptr< stream > > snapshot_streams () const
 Take a snapshot of current streams.
void snapshot_hooks (frame_processor_fn &fp, event_sink_fn &es, event_batch_sink_fn &bes) const
 Snapshot currently installed hooks.
int current_fake_interval_ms () const
 Get current fake-event interval.
void run_fake_events (std::stop_token st)
 Background loop for fake-event generation.

Private Attributes

std::unordered_map< std::string, std::shared_ptr< stream > > streams
 Registered streams keyed by name.
std::unordered_map< std::string, line_ptr > lines
 Registered lines keyed by name.
size_t stream_idx { 0 }
 Auto-name index for streams ("stream_N").
size_t line_idx { 0 }
 Auto-name index for lines ("line_N").
local_stream_detector_fn stream_detector {}
 Optional external local stream detector.
manual_push_fn manual_push
 Optional manual frame push hook.
daemon_start_fn daemon_start
 Optional daemon start hook.
frame_processor_fn frame_processor
 Optional frame analysis hook.
event_sink_fn event_sink
 Optional per-event sink.
event_batch_sink_fn event_batch_sink
 Optional batch event sink.
int analysis_interval_ms { 200 }
 Per-stream analysis throttle interval.
std::unordered_map< std::string, std::chrono::steady_clock::time_point > last_analysis_ts
 Last analysis time per stream.
std::unordered_map< std::string, std::jthread > daemons
 Running daemon threads keyed by stream name.
std::jthread fake_thread
 Fake-event generator thread.
int fake_interval_ms { 700 }
 Current fake-event interval.
bool fake_enabled { false }
 Whether fake-event generation is enabled.
std::mutex mtx
 Global mutex protecting manager state.

Detailed Description

Central coordinator for streams, geometry, frame processing and events.

The stream_manager owns:

  • a registry of streams (stream) addressable by name,
  • a registry of lines (line) addressable by name,
  • hooks for stream discovery, frame ingestion, background capture daemons, frame analysis, and event delivery.

Typical responsibilities:

  • Add and look up streams/lines.
  • Connect lines to streams.
  • Start/stop stream daemons (background frame producers).
  • Accept manually pushed frames and throttle analysis per stream.
  • Deliver produced events to configured sinks.

Thread-safety:

  • All public methods lock mtx unless otherwise noted.
  • Background threads (daemons and fake-event generator) also use mtx.
Note
All timestamps use std::chrono::steady_clock, i.e. monotonic time.

Definition at line 45 of file stream_manager.hpp.

Member Typedef Documentation

◆ daemon_start_fn

using yodau::backend::stream_manager::daemon_start_fn = std::function<void(const stream& s, std::function<void(frame&&)> on_frame, std::stop_token st)>

stream: Represents a single video stream and its analytic connections. Definition stream.hpp:64
frame: Video frame container. Definition frame.hpp:44

Hook used to start a background daemon that produces frames.

The manager provides an on-frame callback that the daemon should call for each produced frame. The daemon must also respect st and exit promptly when stop is requested.

Parameters
  s         Stream to run.
  on_frame  Callback to deliver produced frames to the manager.
  st        Stop token to observe for cancellation.

Definition at line 78 of file stream_manager.hpp.

◆ event_batch_sink_fn

using yodau::backend::stream_manager::event_batch_sink_fn = std::function<void(const std::vector<event>& events)>

Sink for event batches.

If set, push_frame delivers events to this sink as a batch.

Definition at line 107 of file stream_manager.hpp.

◆ event_sink_fn

using yodau::backend::stream_manager::event_sink_fn = std::function<void(const event& e)>

Sink for individual events.

If batch sink is not set, events are delivered one-by-one to this sink.

Definition at line 100 of file stream_manager.hpp.

◆ frame_processor_fn

using yodau::backend::stream_manager::frame_processor_fn = std::function<std::vector<event>(const stream& s, const frame& f)>

Frame analysis function.

Called by process_frame and (optionally) by the fake-event generator.

Parameters
  s  Stream metadata/context.
  f  Frame to analyze (const reference).
Returns
Vector of generated events (may be empty).

Definition at line 93 of file stream_manager.hpp.

◆ local_stream_detector_fn

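using yodau::backend::stream_manager::local_stream_detector_fn = std::function<std::vector<stream>()>

Custom detector for local streams.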

The detector is expected to return a list of streams discovered at call time. Returned streams are moved into the manager if their names are not already present.

Definition at line 54 of file stream_manager.hpp.
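
The merge rule described above (add a detected stream only if its name is not registered yet) can be sketched as follows. The stream struct is a minimal stand-in for the real yodau type, and merge_detected is a hypothetical helper, not part of the documented API.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Minimal stand-in for yodau::backend::stream (assumption for this sketch).
struct stream {
    std::string name;
    std::string path;
};

using local_stream_detector_fn = std::function<std::vector<stream>()>;
using registry = std::unordered_map<std::string, std::shared_ptr<stream>>;

// Hypothetical helper: move detected streams into the registry, skipping
// names that are already present. Returns the number of streams added.
inline std::size_t merge_detected(
    registry& streams, const local_stream_detector_fn& detect
) {
    if (!detect) {
        return 0; // no detector installed
    }
    std::size_t added = 0;
    for (auto& s : detect()) {
        const std::string name = s.name; // copy before moving from s
        if (streams.find(name) == streams.end()) {
            streams.emplace(name, std::make_shared<stream>(std::move(s)));
            ++added;
        }
    }
    return added;
}
```

Existing entries are left untouched, so calling the helper repeatedly with the same detector output adds nothing new.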

◆ manual_push_fn

using yodau::backend::stream_manager::manual_push_fn = std::function<void(const std::string& stream_name, frame&& f)>

Hook for manual frame pushing.

If set via set_manual_push_hook, push_frame will delegate to this hook instead of analyzing frames internally.

Parameters
  stream_name  Name of the stream the frame belongs to.
  f            Frame to process/consume (moved).

Definition at line 65 of file stream_manager.hpp.

Constructor & Destructor Documentation

◆ stream_manager()

yodau::backend::stream_manager::stream_manager ( )

Construct manager and attempt to discover local streams.

On Linux, the constructor probes /dev/video* devices and adds those that look like capture devices. After that, if a custom local detector is set, it may be used when refresh_local_streams is called.

Definition at line 44 of file stream_manager.cpp.


Member Function Documentation

◆ add_line()

yodau::backend::line_ptr yodau::backend::stream_manager::add_line ( const std::string & points,
bool closed = false,
const std::string & name = {} )

Add a new line (polyline/polygon) to the manager.

If name is empty or already used, a unique name "line_N" is generated. The points string is parsed with parse_points.

Parameters
  points  Textual representation of points.
  closed  Whether the line is closed.
  name    Optional explicit line name.
Returns
Shared pointer to the stored immutable line.
Exceptions
  std::runtime_error  on invalid point string.

Definition at line 143 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    std::vector<point> parsed_points = parse_points(points);
    std::string line_name = name;
    while (line_name.empty() || lines.contains(line_name)) {
        line_name = "line_" + std::to_string(line_idx++);
    }
    auto new_line = make_line(std::move(parsed_points), line_name, closed);
    lines.emplace(line_name, new_line);
    return new_line;
}
line_ptr make_line(std::vector< point > points, std::string name, bool closed=false)
Create and normalize a line.
Definition geometry.cpp:82
std::vector< point > parse_points(const std::string &points_str)
Parse points from a textual representation.
Definition geometry.cpp:94
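
The auto-naming rule shared by add_line and add_stream can be sketched in isolation. The helper name unique_name and its parameters are hypothetical; the loop itself mirrors the one in the listings.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>

// Hypothetical helper mirroring the naming loop: keep the requested name if
// it is non-empty and unused, otherwise generate prefix + counter until a
// free name is found. The counter is advanced for every candidate tried.
inline std::string unique_name(
    const std::unordered_set<std::string>& used,
    std::string requested,
    const std::string& prefix,
    std::size_t& next_idx
) {
    while (requested.empty() || used.count(requested) != 0) {
        requested = prefix + std::to_string(next_idx++);
    }
    return requested;
}
```

Because the counter also advances past generated names that are already taken, auto-generated indices are unique but not necessarily consecutive.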

◆ add_stream()

yodau::backend::stream & yodau::backend::stream_manager::add_stream ( const std::string & path,
const std::string & name = {},
const std::string & type = {},
bool loop = true )

Add a new stream to the manager.

If name is empty or already used, a unique name "stream_N" is generated.

Parameters
  path  Stream path/URL.
  name  Optional explicit stream name.
  type  Optional explicit type override passed to stream ctor.
  loop  Whether file streams should loop on EOF.
Returns
Reference to the stored stream.

Definition at line 128 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    std::string stream_name = name;
    while (stream_name.empty() || streams.contains(stream_name)) {
        stream_name = "stream_" + std::to_string(stream_idx++);
    }
    auto new_stream = std::make_shared<stream>(path, stream_name, type, loop);
    auto& ref = *new_stream;
    streams.emplace(stream_name, std::move(new_stream));
    return ref;
}

◆ current_fake_interval_ms()

int yodau::backend::stream_manager::current_fake_interval_ms ( ) const
private

Get current fake-event interval.

Returns
Interval in milliseconds.

Definition at line 491 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    return fake_interval_ms;
}

◆ disable_fake_events()

void yodau::backend::stream_manager::disable_fake_events ( )

Disable fake events generation.

Requests stop on the fake-event thread (if running). The thread is moved into a local std::jthread, so it is also joined before the call returns. Because the generator loop sleeps between iterations, the call can block for up to roughly one fake-event interval.

Definition at line 430 of file stream_manager.cpp.

{
    std::jthread th;

    {
        std::scoped_lock lock(mtx);

        if (!fake_enabled) {
            return;
        }

        fake_enabled = false;
        th = std::move(fake_thread);
        fake_thread = std::jthread();
    }

    if (th.joinable()) {
        th.request_stop();
    }
}

◆ dump()

void yodau::backend::stream_manager::dump ( std::ostream & out) const

Dump all streams and lines to an output stream.

Equivalent to calling dump_stream and dump_lines.

Parameters
  out  Output stream.

Definition at line 46 of file stream_manager.cpp.

{
    // mtx is not locked here: dump_stream and dump_lines each lock it, and
    // std::mutex is not recursive, so locking it twice on the same thread
    // is undefined behavior.
    dump_stream(out);
    out << "\n";
    dump_lines(out);
}

◆ dump_lines()

void yodau::backend::stream_manager::dump_lines ( std::ostream & out) const

Dump all registered lines.

Each line is printed with line::dump.

Parameters
  out  Output stream.

Definition at line 53 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    out << lines.size() << " lines:";
    for (const auto& line : lines | std::views::values) {
        out << "\n\t";
        line->dump(out);
    }
}

◆ dump_stream()

void yodau::backend::stream_manager::dump_stream ( std::ostream & out,
bool connections = false ) const

Dump all registered streams.

If connections is true, prints connected line names per stream.

Parameters
  out          Output stream.
  connections  Whether to include stream-line connections.

Definition at line 62 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    out << streams.size() << " streams:";
    for (const auto& stream : streams | std::views::values) {
        out << "\n\t";
        stream->dump(out, connections);
    }
}

◆ enable_fake_events()

void yodau::backend::stream_manager::enable_fake_events ( int interval_ms = 700)

Enable periodic fake events generation.

Starts/keeps a background thread that calls the frame processor with a dummy frame on all streams every interval_ms milliseconds.

Parameters
  interval_ms  Period in ms (ignored if <= 0).

Definition at line 403 of file stream_manager.cpp.

{
    {
        std::scoped_lock lock(mtx);

        if (interval_ms > 0) {
            fake_interval_ms = interval_ms;
        }

        if (fake_enabled) {
            return;
        }

        fake_enabled = true;
    }

    std::jthread th([this](std::stop_token st) { run_fake_events(st); });

    {
        std::scoped_lock lock(mtx);
        if (!fake_enabled) {
            th.request_stop();
            return;
        }
        fake_thread = std::move(th);
    }
}

◆ find_stream()

std::shared_ptr< const yodau::backend::stream > yodau::backend::stream_manager::find_stream ( const std::string & name) const

Find a stream by name.

Parameters
  name  Stream name.
Returns
Shared pointer to const stream, or empty if not found.

Definition at line 174 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    const auto it = streams.find(name);
    if (it == streams.end()) {
        return {};
    }
    return it->second;
}

◆ is_stream_running()

bool yodau::backend::stream_manager::is_stream_running ( const std::string & name) const

Check whether a daemon for a stream is running.

Parameters
  name  Stream name.
Returns
true if running.

Definition at line 396 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    return daemons.contains(name);
}

◆ line_names()

std::vector< std::string > yodau::backend::stream_manager::line_names ( ) const

List names of all registered lines.

Definition at line 189 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    return lines | std::views::keys
        | std::ranges::to<std::vector<std::string>>();
}

◆ process_frame()

std::vector< yodau::backend::event > yodau::backend::stream_manager::process_frame ( const std::string & stream_name,
frame && f )

Analyze a frame and return generated events.

Analysis is throttled per stream by analysis_interval_ms. If the stream does not exist or processor is not set, returns empty list.

Parameters
  stream_name  Stream name.
  f            Frame to analyze (moved into function; read-only for processor).
Returns
Vector of produced events.

Definition at line 267 of file stream_manager.cpp.

{
    std::shared_ptr<stream> sp;
    frame_processor_fn fp;
    const auto now = std::chrono::steady_clock::now();
    bool allow = false;

    {
        std::scoped_lock lock(mtx);
        auto it = streams.find(stream_name);
        if (it == streams.end() || !frame_processor) {
            return {};
        }

        fp = frame_processor;
        sp = it->second;

        const auto last_it = last_analysis_ts.find(stream_name);
        if (last_it == last_analysis_ts.end()) {
            last_analysis_ts[stream_name] = now;
            allow = true;
        } else {
            const auto dt
                = std::chrono::duration_cast<std::chrono::milliseconds>(
                      now - last_it->second
                )
                      .count();
            if (dt >= analysis_interval_ms) {
                last_analysis_ts[stream_name] = now;
                allow = true;
            }
        }
    }

    if (!allow || !sp || !fp) {
        return {};
    }

    return fp(*sp, f);
}
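
The per-stream throttle in process_frame can be sketched as a small standalone class. The class name throttle and its interface are hypothetical. The time point is passed in explicitly here, which is an assumption made so the behavior is easy to check without waiting on a real clock.

```cpp
#include <chrono>
#include <string>
#include <unordered_map>

// Hypothetical throttle mirroring the interval check in process_frame.
class throttle {
public:
    using clock = std::chrono::steady_clock;

    explicit throttle(int interval_ms) : interval_ms_(interval_ms) {}

    // Returns true if `key` may be analyzed at time `now`. The timestamp is
    // recorded only when analysis is allowed, so rejected calls do not
    // postpone the next allowed one.
    bool allow(const std::string& key, clock::time_point now) {
        const auto it = last_.find(key);
        if (it != last_.end()) {
            const auto dt
                = std::chrono::duration_cast<std::chrono::milliseconds>(
                      now - it->second
                )
                      .count();
            if (dt < interval_ms_) {
                return false;
            }
        }
        last_[key] = now;
        return true;
    }

private:
    int interval_ms_;
    std::unordered_map<std::string, clock::time_point> last_;
};
```

Each key (stream name) is throttled independently, so a busy stream does not delay analysis of another one.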

◆ push_frame()

void yodau::backend::stream_manager::push_frame ( const std::string & stream_name,
frame && f )

Push a frame into the manager for a specific stream.

Workflow:

  • If manual push hook is set, delegate to it.
  • Else analyze frame with process_frame (throttled).
  • If batch sink is set, deliver whole batch.
  • Else if single-event sink is set, deliver events one-by-one.
Parameters
  stream_name  Stream name.
  f            Frame to process (moved).

Definition at line 218 of file stream_manager.cpp.

{
    manual_push_fn mp;
    event_sink_fn es;
    event_batch_sink_fn bes;

    {
        std::scoped_lock lock(mtx);
        mp = manual_push;
        es = event_sink;
        bes = event_batch_sink;
    }

    if (mp) {
        mp(stream_name, std::move(f));
        return;
    }

    auto events = process_frame(stream_name, std::move(f));

    if (bes) {
        bes(events);
        return;
    }

    if (!es) {
        return;
    }

    for (const auto& e : events) {
        es(e);
    }
}
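
The sink priority in the workflow above (batch sink first, per-event sink as the fallback) can be sketched on its own. The event struct is a stand-in for the real yodau type, and deliver is a hypothetical helper name.

```cpp
#include <functional>
#include <string>
#include <vector>

// Stand-in for yodau::backend::event (assumption for this sketch).
struct event {
    std::string what;
};

using event_sink_fn = std::function<void(const event& e)>;
using event_batch_sink_fn
    = std::function<void(const std::vector<event>& events)>;

// Hypothetical helper: the batch sink wins when set; otherwise events are
// delivered one by one to the per-event sink, if there is one.
inline void deliver(
    const std::vector<event>& events,
    const event_batch_sink_fn& batch,
    const event_sink_fn& single
) {
    if (batch) {
        batch(events);
        return;
    }
    if (!single) {
        return;
    }
    for (const auto& e : events) {
        single(e);
    }
}
```

As in the push_frame listing, the batch sink is called even when the batch is empty. The fake-event loop in run_fake_events, by contrast, skips empty batches.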

◆ refresh_local_streams()

void yodau::backend::stream_manager::refresh_local_streams ( )

Refresh local streams.

Behavior:

  • On Linux: scans /dev/video* devices, validates capture capability, and auto-adds any not yet registered.
  • If a custom detector is set: calls it and adds returned streams that are not yet registered.

Definition at line 83 of file stream_manager.cpp.

{
#ifdef __linux__
    for (size_t idx = 0;; ++idx) {
        std::string path = "/dev/video" + std::to_string(idx);
        if (!std::filesystem::exists(path)) {
            break;
        }
        if (!is_capture_device(path)) {
            continue;
        }

        {
            std::scoped_lock lock(mtx);
            if (streams.contains("video" + std::to_string(idx))) {
                continue;
            }
        }

        add_stream(path, "video" + std::to_string(idx), "local");
    }
#endif

    local_stream_detector_fn det;
    {
        std::scoped_lock lock(mtx);
        det = stream_detector;
    }

    if (!det) {
        return;
    }

    auto detected_streams = det();
    for (auto& detected_stream : detected_streams) {
        const auto name = detected_stream.get_name();

        std::scoped_lock lock(mtx);
        if (!streams.contains(name)) { // todo: update existing streams?
            streams.emplace(
                name, std::make_shared<stream>(std::move(detected_stream))
            );
        }
    }
}

◆ run_fake_events()

void yodau::backend::stream_manager::run_fake_events ( std::stop_token st)
private

Background loop for fake-event generation.

Periodically calls frame processor with a dummy frame on all streams and delivers events to configured sinks until st requests stop.

Parameters
  st  Stop token.

Definition at line 496 of file stream_manager.cpp.

{
    frame dummy;

    while (!st.stop_requested()) {
        auto snap = snapshot_streams();

        frame_processor_fn fp;
        event_sink_fn es;
        event_batch_sink_fn bes;
        snapshot_hooks(fp, es, bes);

        if (fp) {
            for (const auto& sp : snap) {
                auto evs = fp(*sp, dummy);

                if (bes) {
                    if (!evs.empty()) {
                        bes(evs);
                    }
                } else if (es) {
                    for (const auto& e : evs) {
                        es(e);
                    }
                }
            }
        }

        const int interval = current_fake_interval_ms();
        std::this_thread::sleep_for(std::chrono::milliseconds(interval));
    }
}

◆ set_analysis_interval_ms()

void yodau::backend::stream_manager::set_analysis_interval_ms ( int ms)

Set minimum analysis interval per stream, in milliseconds.

Values <= 0 are ignored.

Parameters
  ms  Interval in ms.

Definition at line 321 of file stream_manager.cpp.

{
    if (ms <= 0) {
        return;
    }
    std::scoped_lock lock(mtx);
    analysis_interval_ms = ms;
}

◆ set_daemon_start_hook()

void yodau::backend::stream_manager::set_daemon_start_hook ( daemon_start_fn hook)

Set daemon start hook.

This hook is required for start_stream to do anything.

Parameters
  hook  Hook functor (may be empty to unset).

Definition at line 211 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    daemon_start = std::move(hook);
}

◆ set_event_batch_sink()

void yodau::backend::stream_manager::set_event_batch_sink ( event_batch_sink_fn fn)

Set batch event sink.

When set, overrides per-event sink for delivery.

Parameters
  fn  Sink functor (may be empty to unset).

Definition at line 314 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    event_batch_sink = std::move(fn);
}

◆ set_event_sink()

void yodau::backend::stream_manager::set_event_sink ( event_sink_fn fn)

Set per-event sink.

Ignored when batch sink is set.

Parameters
  fn  Sink functor (may be empty to unset).

Definition at line 309 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    event_sink = std::move(fn);
}

◆ set_frame_processor()

void yodau::backend::stream_manager::set_frame_processor ( frame_processor_fn fn)

Set the frame processor.

Parameters
  fn  Analysis functor (may be empty to unset).

Definition at line 259 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    frame_processor = std::move(fn);
}

◆ set_line()

yodau::backend::stream & yodau::backend::stream_manager::set_line ( const std::string & stream_name,
const std::string & line_name )

Connect an existing line to an existing stream.

Parameters
  stream_name  Name of the target stream.
  line_name    Name of the line to connect.
Returns
Reference to the stream.
Exceptions
  std::runtime_error  if stream or line is not found.

Definition at line 157 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    const auto stream_it = streams.find(stream_name);
    if (stream_it == streams.end()) {
        throw std::runtime_error("stream not found: " + stream_name);
    }
    const auto line_it = lines.find(line_name);
    if (line_it == lines.end()) {
        throw std::runtime_error("line not found: " + line_name);
    }
    stream_it->second->connect_line(line_it->second);
    return *stream_it->second;
}

◆ set_line_dir()

void yodau::backend::stream_manager::set_line_dir ( const std::string & line_name,
tripwire_dir dir )

Change the direction constraint of a stored line.

Since lines are stored as immutable shared pointers, this method clones the line, changes line::dir, and replaces the pointer in the registry.

Parameters
  line_name  Name of the line to reconfigure.
  dir        New direction constraint.
Exceptions
  std::runtime_error  if the line does not exist.

Definition at line 450 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);

    const auto it = lines.find(line_name);
    if (it == lines.end() || !it->second) {
        throw std::runtime_error("line not found: " + line_name);
    }

    const auto old_ptr = it->second;
    auto new_ptr = std::make_shared<line>(*old_ptr);
    new_ptr->dir = dir;

    it->second = new_ptr;
}
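
The clone-and-replace step can be sketched with stand-in types. The tripwire_dir enumerators, the line struct, and the assumption that line_ptr is std::shared_ptr<const line> are all hypothetical here, as is the helper name set_dir. Only the copy, modify, and swap sequence follows the listing.

```cpp
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-ins for yodau's tripwire_dir and line (assumptions for this sketch;
// the enumerator names are made up).
enum class tripwire_dir { any, forward, backward };

struct line {
    std::string name;
    tripwire_dir dir = tripwire_dir::any;
};

// Assumed alias: lines are shared as pointers to const.
using line_ptr = std::shared_ptr<const line>;

// Hypothetical helper: clone the stored line, change its direction, and
// replace the registry entry with the modified copy.
inline void set_dir(
    std::unordered_map<std::string, line_ptr>& lines,
    const std::string& name,
    tripwire_dir dir
) {
    const auto it = lines.find(name);
    if (it == lines.end() || !it->second) {
        throw std::runtime_error("line not found: " + name);
    }
    auto copy = std::make_shared<line>(*it->second); // mutable clone
    copy->dir = dir;
    it->second = std::move(copy); // registry now points at the new line
}
```

In this sketch, anything that still holds the old pointer keeps seeing the old direction, because only the registry entry is replaced.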

◆ set_local_stream_detector()

void yodau::backend::stream_manager::set_local_stream_detector ( local_stream_detector_fn detector)

Set a custom local stream detector.

The detector is stored and refresh_local_streams is called immediately after setting.

Parameters
  detector  Detector functor.

Definition at line 73 of file stream_manager.cpp.

{
    {
        std::scoped_lock lock(mtx);
        stream_detector = std::move(detector);
    }
    refresh_local_streams();
}

◆ set_manual_push_hook()

void yodau::backend::stream_manager::set_manual_push_hook ( manual_push_fn hook)

Set manual push hook.

When set, push_frame will call this hook and return immediately, skipping internal analysis and sinks.

Parameters
  hook  Hook functor (may be empty to unset).

Definition at line 206 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    manual_push = std::move(hook);
}

◆ snapshot_hooks()

void yodau::backend::stream_manager::snapshot_hooks ( frame_processor_fn & fp,
event_sink_fn & es,
event_batch_sink_fn & bes ) const
private

Snapshot currently installed hooks.

Copies hooks under lock so that background threads can use stable callables without racing with setters.

Parameters
  fp   Out: frame processor.
  es   Out: per-event sink.
  bes  Out: batch sink.

Definition at line 482 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    fp = frame_processor;
    es = event_sink;
    bes = event_batch_sink;
}
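
The copy-under-lock pattern behind snapshot_hooks can be sketched with a single callable. The class hook_slot and its members are hypothetical names for this illustration only.

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical holder showing the copy-under-lock pattern.
class hook_slot {
public:
    void set(std::function<void()> fn) {
        std::scoped_lock lock(mtx_);
        fn_ = std::move(fn);
    }

    // Copy the callable while locked, then invoke the copy unlocked. The
    // callable may therefore call set() itself without deadlocking, and a
    // concurrent set() cannot destroy the callable while it is running.
    bool invoke() const {
        std::function<void()> local;
        {
            std::scoped_lock lock(mtx_);
            local = fn_;
        }
        if (!local) {
            return false; // nothing installed
        }
        local();
        return true;
    }

private:
    mutable std::mutex mtx_;
    std::function<void()> fn_;
};
```

The cost is one std::function copy per invocation, which buys a short critical section and callbacks that run without the lock held.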

◆ snapshot_streams()

std::vector< std::shared_ptr< yodau::backend::stream > > yodau::backend::stream_manager::snapshot_streams ( ) const
private

Take a snapshot of current streams.

Used to iterate without holding the manager lock for long periods.

Returns
Vector of shared pointers to streams.

Definition at line 468 of file stream_manager.cpp.

{
    std::vector<std::shared_ptr<stream>> snap;

    std::scoped_lock lock(mtx);
    snap.reserve(streams.size());
    for (auto& sp : streams | std::views::values) {
        if (sp) {
            snap.push_back(sp);
        }
    }

    return snap;
}

◆ start_daemon()

void yodau::backend::stream_manager::start_daemon ( const std::string & stream_name)

Start a daemon for a stream.

Alias for start_stream.

Parameters
  stream_name  Stream name.

Definition at line 253 of file stream_manager.cpp.

{
    start_stream(stream_name);
}

◆ start_stream()

void yodau::backend::stream_manager::start_stream ( const std::string & name)

Start a stream daemon by name.

Requirements:

  • daemon_start hook must be set,
  • stream must exist,
  • daemon for this stream must not already be running.

On Linux, local capture devices may be validated again before starting.

Parameters
  name  Stream name.

Definition at line 329 of file stream_manager.cpp.

{
    std::shared_ptr<stream> sp;
    daemon_start_fn ds;

    {
        std::scoped_lock lock(mtx);
        if (!daemon_start || daemons.contains(name)) {
            return;
        }

        const auto it = streams.find(name);
        if (it == streams.end() || !it->second) {
            return;
        }

        sp = it->second;
        ds = daemon_start;

#ifdef __linux__
        if (!is_linux_capture_ok(*sp)) {
            return;
        }
#endif

        sp->activate(stream_pipeline::automatic);
    }

    std::jthread th([this, name, sp, ds](std::stop_token st) mutable {
        ds(
            *sp, [this, name](frame&& f) { push_frame(name, std::move(f)); }, st
        );
    });

    {
        std::scoped_lock lock(mtx);
        daemons.emplace(name, std::move(th));
    }
}

References yodau::backend::automatic.

◆ stop_stream()

void yodau::backend::stream_manager::stop_stream ( const std::string & name)

Stop a running stream daemon by name.

Requests stop on the associated std::jthread and deactivates the stream (sets pipeline to stream_pipeline::none). The thread is moved into a local std::jthread and joined when the call returns, so a daemon that ignores its stop token blocks the caller.

Parameters
  name  Stream name.

Definition at line 368 of file stream_manager.cpp.

{
    std::jthread th;
    std::shared_ptr<stream> sp;

    {
        std::scoped_lock lock(mtx);
        const auto it = daemons.find(name);
        if (it == daemons.end()) {
            return;
        }

        th = std::move(it->second);
        daemons.erase(it);

        const auto sit = streams.find(name);
        if (sit != streams.end()) {
            sp = sit->second;
        }
    }

    th.request_stop();

    if (sp) {
        std::scoped_lock lock(mtx);
        sp->deactivate();
    }
}

◆ stream_lines()

std::vector< std::string > yodau::backend::stream_manager::stream_lines ( const std::string & stream_name) const

List names of lines connected to a given stream.

Parameters
  stream_name  Stream name.
Returns
Vector of connected line names, or empty if stream not found.

Definition at line 195 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    const auto stream_it = streams.find(stream_name);
    if (stream_it == streams.end()) {
        return {};
    }
    return stream_it->second->line_names();
}

◆ stream_names()

std::vector< std::string > yodau::backend::stream_manager::stream_names ( ) const

List names of all registered streams.

Definition at line 183 of file stream_manager.cpp.

{
    std::scoped_lock lock(mtx);
    return streams | std::views::keys
        | std::ranges::to<std::vector<std::string>>();
}

Member Data Documentation

◆ analysis_interval_ms

int yodau::backend::stream_manager::analysis_interval_ms { 200 }
private

Per-stream analysis throttle interval.

Definition at line 473 of file stream_manager.hpp.

◆ daemon_start

daemon_start_fn yodau::backend::stream_manager::daemon_start
private

Optional daemon start hook.

Definition at line 461 of file stream_manager.hpp.

◆ daemons

std::unordered_map<std::string, std::jthread> yodau::backend::stream_manager::daemons
private

Running daemon threads keyed by stream name.

Definition at line 484 of file stream_manager.hpp.

◆ event_batch_sink

event_batch_sink_fn yodau::backend::stream_manager::event_batch_sink
private

Optional batch event sink.

Definition at line 470 of file stream_manager.hpp.

◆ event_sink

event_sink_fn yodau::backend::stream_manager::event_sink
private

Optional per-event sink.

Definition at line 467 of file stream_manager.hpp.

◆ fake_enabled

bool yodau::backend::stream_manager::fake_enabled { false }
private

Whether fake-event generation is enabled.

Definition at line 493 of file stream_manager.hpp.


◆ fake_interval_ms

int yodau::backend::stream_manager::fake_interval_ms { 700 }
private

Current fake-event interval.

Definition at line 490 of file stream_manager.hpp.

◆ fake_thread

std::jthread yodau::backend::stream_manager::fake_thread
private

Fake-event generator thread.

Definition at line 487 of file stream_manager.hpp.

◆ frame_processor

frame_processor_fn yodau::backend::stream_manager::frame_processor
private

Optional frame analysis hook.

Definition at line 464 of file stream_manager.hpp.

◆ last_analysis_ts

std::unordered_map<std::string, std::chrono::steady_clock::time_point> yodau::backend::stream_manager::last_analysis_ts
private

Last analysis time per stream.

Used to enforce analysis_interval_ms.

Definition at line 481 of file stream_manager.hpp.

◆ line_idx

size_t yodau::backend::stream_manager::line_idx { 0 }
private

Auto-name index for lines ("line_N").

Definition at line 452 of file stream_manager.hpp.


◆ lines

std::unordered_map<std::string, line_ptr> yodau::backend::stream_manager::lines
private

Registered lines keyed by name.

Definition at line 446 of file stream_manager.hpp.

◆ manual_push

manual_push_fn yodau::backend::stream_manager::manual_push
private

Optional manual frame push hook.

Definition at line 458 of file stream_manager.hpp.

◆ mtx

std::mutex yodau::backend::stream_manager::mtx
mutable private

Global mutex protecting manager state.

Definition at line 496 of file stream_manager.hpp.

◆ stream_detector

local_stream_detector_fn yodau::backend::stream_manager::stream_detector {}
private

Optional external local stream detector.

Definition at line 455 of file stream_manager.hpp.


◆ stream_idx

size_t yodau::backend::stream_manager::stream_idx { 0 }
private

Auto-name index for streams ("stream_N").

Definition at line 449 of file stream_manager.hpp.

◆ streams

std::unordered_map<std::string, std::shared_ptr<stream> > yodau::backend::stream_manager::streams
private

Registered streams keyed by name.

Definition at line 443 of file stream_manager.hpp.


The documentation for this class was generated from the following files: