10#include <linux/videodev2.h>
15bool is_capture_device(
const std::string& path) {
16 const int fd = ::open(path.c_str(), O_RDONLY | O_NONBLOCK);
21 v4l2_capability cap {};
22 const int rc = ::ioctl(fd, VIDIOC_QUERYCAP, &cap);
29 std::uint32_t caps = cap.capabilities;
30 if (caps & V4L2_CAP_DEVICE_CAPS) {
31 caps = cap.device_caps;
34 const bool capture = (caps & V4L2_CAP_VIDEO_CAPTURE)
35 || (caps & V4L2_CAP_VIDEO_CAPTURE_MPLANE);
37 const bool streaming = (caps & V4L2_CAP_STREAMING);
39 return capture && streaming;
47 std::scoped_lock lock(mtx);
54 std::scoped_lock lock(mtx);
55 out << lines.size() <<
" lines:";
56 for (
const auto& line : lines | std::views::values) {
63 std::ostream& out,
const bool connections
65 std::scoped_lock lock(mtx);
66 out << streams.size() <<
" streams:";
67 for (
const auto& stream : streams | std::views::values) {
69 stream->dump(out, connections);
74 local_stream_detector_fn detector
77 std::scoped_lock lock(mtx);
78 stream_detector = std::move(detector);
80 refresh_local_streams();
85 for (size_t idx = 0;; ++idx) {
86 std::string path =
"/dev/video" + std::to_string(idx);
87 if (!std::filesystem::exists(path)) {
90 if (!is_capture_device(path)) {
95 std::scoped_lock lock(mtx);
96 if (streams.contains(
"video" + std::to_string(idx))) {
101 add_stream(path,
"video" + std::to_string(idx),
"local");
105 local_stream_detector_fn det;
107 std::scoped_lock lock(mtx);
108 det = stream_detector;
115 auto detected_streams = det();
116 for (
auto& detected_stream : detected_streams) {
117 const auto name = detected_stream.get_name();
119 std::scoped_lock lock(mtx);
120 if (!streams.contains(name)) {
122 name, std::make_shared<stream>(std::move(detected_stream))
129 const std::string& path,
const std::string& name,
const std::string& type,
132 std::scoped_lock lock(mtx);
133 std::string stream_name = name;
134 while (stream_name.empty() || streams.contains(stream_name)) {
135 stream_name =
"stream_" + std::to_string(stream_idx++);
137 auto new_stream = std::make_shared<
stream>(path, stream_name, type, loop);
138 auto& ref = *new_stream;
139 streams.emplace(stream_name, std::move(new_stream));
144 const std::string& points,
const bool closed,
const std::string& name
146 std::scoped_lock lock(mtx);
147 std::vector<point> parsed_points = parse_points(points);
148 std::string line_name = name;
149 while (line_name.empty() || lines.contains(line_name)) {
150 line_name =
"line_" + std::to_string(line_idx++);
152 auto new_line = make_line(std::move(parsed_points), line_name, closed);
153 lines.emplace(line_name, new_line);
158 const std::string& stream_name,
const std::string& line_name
160 std::scoped_lock lock(mtx);
161 const auto stream_it = streams.find(stream_name);
162 if (stream_it == streams.end()) {
163 throw std::runtime_error(
"stream not found: " + stream_name);
165 const auto line_it = lines.find(line_name);
166 if (line_it == lines.end()) {
167 throw std::runtime_error(
"line not found: " + line_name);
169 stream_it->second->connect_line(line_it->second);
170 return *stream_it->second;
175 std::scoped_lock lock(mtx);
176 const auto it = streams.find(name);
177 if (it == streams.end()) {
184 std::scoped_lock lock(mtx);
185 return streams | std::views::keys
186 | std::ranges::to<std::vector<std::string>>();
190 std::scoped_lock lock(mtx);
191 return lines | std::views::keys
192 | std::ranges::to<std::vector<std::string>>();
196 const std::string& stream_name
198 std::scoped_lock lock(mtx);
199 const auto stream_it = streams.find(stream_name);
200 if (stream_it == streams.end()) {
203 return stream_it->second->line_names();
207 std::scoped_lock lock(mtx);
208 manual_push = std::move(hook);
214 std::scoped_lock lock(mtx);
215 daemon_start = std::move(hook);
219 const std::string& stream_name,
frame&& f
223 event_batch_sink_fn bes;
226 std::scoped_lock lock(mtx);
229 bes = event_batch_sink;
233 mp(stream_name, std::move(f));
237 auto events = process_frame(stream_name, std::move(f));
248 for (
const auto& e : events) {
254 const std::string& stream_name
256 start_stream(stream_name);
260 frame_processor_fn fn
262 std::scoped_lock lock(mtx);
263 frame_processor = std::move(fn);
266std::vector<yodau::backend::event>
268 const std::string& stream_name,
frame&& f
270 std::shared_ptr<
stream> sp;
271 frame_processor_fn fp;
272 const auto now = std::chrono::steady_clock::now();
276 std::scoped_lock lock(mtx);
277 auto it = streams.find(stream_name);
278 if (it == streams.end() || !frame_processor) {
282 fp = frame_processor;
285 const auto last_it = last_analysis_ts.find(stream_name);
286 if (last_it == last_analysis_ts.end()) {
287 last_analysis_ts[stream_name] = now;
291 = std::chrono::duration_cast<std::chrono::milliseconds>(
292 now - last_it->second
295 if (dt >= analysis_interval_ms) {
296 last_analysis_ts[stream_name] = now;
302 if (!allow || !sp || !fp) {
310 std::scoped_lock lock(mtx);
311 event_sink = std::move(fn);
315 event_batch_sink_fn fn
317 std::scoped_lock lock(mtx);
318 event_batch_sink = std::move(fn);
325 std::scoped_lock lock(mtx);
326 analysis_interval_ms = ms;
330 std::shared_ptr<
stream> sp;
334 std::scoped_lock lock(mtx);
335 if (!daemon_start || daemons.contains(name)) {
339 const auto it = streams.find(name);
340 if (it == streams.end() || !it->second) {
348 if (!is_linux_capture_ok(*sp)) {
356 std::jthread th([
this, name, sp, ds](std::stop_token st)
mutable {
358 *sp, [
this, name](frame&& f) { push_frame(name, std::move(f)); }, st
363 std::scoped_lock lock(mtx);
364 daemons.emplace(name, std::move(th));
370 std::shared_ptr<
stream> sp;
373 std::scoped_lock lock(mtx);
374 const auto it = daemons.find(name);
375 if (it == daemons.end()) {
379 th = std::move(it->second);
382 const auto sit = streams.find(name);
383 if (sit != streams.end()) {
391 std::scoped_lock lock(mtx);
397 const std::string& name
399 std::scoped_lock lock(mtx);
400 return daemons.contains(name);
405 std::scoped_lock lock(mtx);
407 if (interval_ms > 0) {
408 fake_interval_ms = interval_ms;
418 std::jthread th([
this](std::stop_token st) { run_fake_events(st); });
421 std::scoped_lock lock(mtx);
426 fake_thread = std::move(th);
434 std::scoped_lock lock(mtx);
440 fake_enabled =
false;
441 th = std::move(fake_thread);
442 fake_thread = std::jthread();
453 std::scoped_lock lock(mtx);
455 const auto it = lines.find(line_name);
456 if (it == lines.end() || !it->second) {
457 throw std::runtime_error(
"line not found: " + line_name);
460 const auto old_ptr = it->second;
461 auto new_ptr = std::make_shared<
line>(*old_ptr);
464 it->second = new_ptr;
467std::vector<std::shared_ptr<yodau::backend::stream>>
469 std::vector<std::shared_ptr<stream>> snap;
471 std::scoped_lock lock(mtx);
472 snap.reserve(streams.size());
473 for (
auto& sp : streams | std::views::values) {
483 frame_processor_fn& fp, event_sink_fn& es, event_batch_sink_fn& bes
485 std::scoped_lock lock(mtx);
486 fp = frame_processor;
488 bes = event_batch_sink;
492 std::scoped_lock lock(mtx);
493 return fake_interval_ms;
499 while (!st.stop_requested()) {
500 auto snap = snapshot_streams();
502 frame_processor_fn fp;
504 event_batch_sink_fn bes;
505 snapshot_hooks(fp, es, bes);
508 for (
const auto& sp : snap) {
509 auto evs = fp(*sp, dummy);
516 for (
const auto& e : evs) {
523 const int interval = current_fake_interval_ms();
524 std::this_thread::sleep_for(std::chrono::milliseconds(interval));
535 if (p.rfind(
"/dev/video", 0) != 0) {
539 return is_capture_device(p);
Central coordinator for streams, geometry, frame processing and events.
std::function< void(const std::string &stream_name, frame &&f)> manual_push_fn
Hook for manual frame pushing.
Represents a single video stream and its analytic connections.
std::string get_path() const
Get stream path or URL.
stream_type get_type() const
Get the stream transport/source type.
stream_pipeline
Processing pipeline mode for a stream.
std::shared_ptr< line const > line_ptr
Shared, immutable line pointer.
tripwire_dir
Allowed crossing direction for a tripwire.
Polyline / polygon described in percentage coordinates.