// Copyright (C) 2018-2022 Intel Corporation // SPDX-License-Identifier: Apache-2.0 // #pragma once #include #include #include #include #include #include #include #include #include #include #include #include "utils/performance_metrics.hpp" // VideoFrame can represent not a single image but the whole grid class VideoFrame { public: typedef std::shared_ptr Ptr; VideoFrame(unsigned sourceID, int64_t frameId, const cv::Mat& frame = cv::Mat()) : sourceID{sourceID}, frameId{frameId}, frame{frame} {} virtual ~VideoFrame() = default; // A user has to define how it is reconstructed const unsigned sourceID; const int64_t frameId; cv::Mat frame; PerformanceMetrics::TimePoint timestamp; }; class Worker; class Task { public: explicit Task(VideoFrame::Ptr sharedVideoFrame, float priority = 0): sharedVideoFrame{sharedVideoFrame}, priority{priority} {} virtual bool isReady() = 0; virtual void process() = 0; virtual ~Task() = default; std::string name; VideoFrame::Ptr sharedVideoFrame; // it is possible that two tasks try to draw on the same cvMat const float priority; }; struct HigherPriority { bool operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const { return lhs->priority > rhs->priority || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId < rhs->sharedVideoFrame->frameId) || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId == rhs->sharedVideoFrame->frameId && lhs < rhs); } }; class Worker { public: explicit Worker(unsigned threadNum): threadPool(threadNum), running{false} {} ~Worker() { stop(); } void runThreads() { running = true; for (std::thread& t : threadPool) { t = std::thread(&Worker::threadFunc, this); } } void push(std::shared_ptr task) { tasksMutex.lock(); tasks.insert(task); tasksMutex.unlock(); tasksCondVar.notify_one(); } void threadFunc() { while (running) { std::unique_lock lk(tasksMutex); while (running && tasks.empty()) { tasksCondVar.wait(lk); } try { auto it = std::find_if(tasks.begin(), tasks.end(), [](const std::shared_ptr& task){return task->isReady();}); if (tasks.end() != it) { const std::shared_ptr task = std::move(*it); tasks.erase(it); lk.unlock(); task->process(); } } catch (...) { std::lock_guard lock{exceptionMutex}; if (nullptr == currentException) { currentException = std::current_exception(); stop(); } } } } void stop() { running = false; tasksCondVar.notify_all(); } void join() { for (auto& t : threadPool) { t.join(); } if (nullptr != currentException) { std::rethrow_exception(currentException); } } private: std::condition_variable tasksCondVar; std::set, HigherPriority> tasks; std::mutex tasksMutex; std::vector threadPool; std::atomic running; std::exception_ptr currentException; std::mutex exceptionMutex; }; void tryPush(const std::weak_ptr& worker, std::shared_ptr&& task) { try { std::shared_ptr(worker)->push(task); } catch (const std::bad_weak_ptr&) {} } template class ConcurrentContainer { public: C container; mutable std::mutex mutex; bool lockedEmpty() const noexcept { std::lock_guard lock{mutex}; return container.empty(); } typename C::size_type lockedSize() const noexcept { std::lock_guard lock{mutex}; return container.size(); } void lockedPushBack(const typename C::value_type& value) { std::lock_guard lock{mutex}; container.push_back(value); } bool lockedTryPop(typename C::value_type& value) { mutex.lock(); if (!container.empty()) { value = container.back(); container.pop_back(); mutex.unlock(); return true; } else { mutex.unlock(); return false; } } operator C() const { std::lock_guard lock{mutex}; return container; } };