Diffstat (limited to 'python/openvino/runtime/common/pipelines')
-rw-r--r--  python/openvino/runtime/common/pipelines/CMakeLists.txt                     |  15
-rw-r--r--  python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h | 121
-rw-r--r--  python/openvino/runtime/common/pipelines/include/pipelines/metadata.h       |  51
-rw-r--r--  python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h  |  67
-rw-r--r--  python/openvino/runtime/common/pipelines/src/async_pipeline.cpp             | 166
-rw-r--r--  python/openvino/runtime/common/pipelines/src/requests_pool.cpp              |  94
6 files changed, 514 insertions, 0 deletions
diff --git a/python/openvino/runtime/common/pipelines/CMakeLists.txt b/python/openvino/runtime/common/pipelines/CMakeLists.txt
new file mode 100644
index 0000000..b8b128a
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright (C) 2018-2019 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+#
+
+file(GLOB SOURCES ./src/*.cpp)
+file(GLOB HEADERS ./include/pipelines/*.h)
+
+# Create named folders for the sources within the .vcproj
+# Empty name lists them directly under the .vcproj
+source_group("src" FILES ${SOURCES})
+source_group("include" FILES ${HEADERS})
+
+add_library(pipelines STATIC ${SOURCES} ${HEADERS})
+target_include_directories(pipelines PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")
+target_link_libraries(pipelines PRIVATE openvino::runtime models utils opencv_core opencv_imgproc)
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h b/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h
new file mode 100644
index 0000000..6661c00
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h
@@ -0,0 +1,121 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+#include <stdint.h>
+
+#include <condition_variable>
+#include <exception>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include <openvino/openvino.hpp>
+
+#include <models/results.h>
+#include <utils/performance_metrics.hpp>
+
+#include "pipelines/requests_pool.h"
+
+class ModelBase;
+struct InputData;
+struct MetaData;
+struct ModelConfig;
+
+/// This is the base class for asynchronous pipelines.
+/// Derived classes should add functions for data submission and output processing.
+class AsyncPipeline {
+public:
+    /// Loads the model and performs the required initialization
+    /// @param modelInstance pointer to the model object. The object it points to should not be destroyed
+    /// manually after the pointer is passed to this function.
+    /// @param config - fine-tuning configuration for the model
+    /// @param core - reference to the ov::Core instance to use;
+    /// the model is compiled with this core during construction.
+ AsyncPipeline(std::unique_ptr<ModelBase>&& modelInstance, const ModelConfig& config, ov::Core& core);
+ virtual ~AsyncPipeline();
+
+    /// Waits until either output data becomes available or the pipeline allows more input data to be submitted.
+    /// @param shouldKeepOrder if true, the function treats results as ready only if the next sequential result
+    /// (frame) is ready, so results can be extracted in the same order as they were submitted. Otherwise, the
+    /// function returns as soon as any result is ready.
+ void waitForData(bool shouldKeepOrder = true);
+
+    /// @returns true if there are idle infer requests in the pool
+    /// and the next frame can be submitted for processing, false otherwise.
+ bool isReadyToProcess() {
+ return requestsPool->isIdleRequestAvailable();
+ }
+
+ /// Waits for all currently submitted requests to be completed.
+ ///
+ void waitForTotalCompletion() {
+ if (requestsPool)
+ requestsPool->waitForTotalCompletion();
+ }
+
+    /// Submits data to the model for inference
+    /// @param inputData - input data to be submitted
+    /// @param metaData - shared pointer to the metadata container.
+    /// Might be null. This pointer is passed through the pipeline and stored in the final result structure.
+    /// @returns -1 if the image cannot be scheduled for processing (no free InferRequest is available).
+    /// Otherwise returns a unique sequential frame ID for this particular request. The same frame ID is written
+    /// to the result structure.
+ virtual int64_t submitData(const InputData& inputData, const std::shared_ptr<MetaData>& metaData);
+
+    /// Gets available data from the queue of completed results
+    /// @param shouldKeepOrder if true, the function treats results as ready only if the next sequential result
+    /// (frame) is ready, so results can be extracted in the same order as they were submitted. Otherwise, the
+    /// function returns as soon as any result is ready.
+ virtual std::unique_ptr<ResultBase> getResult(bool shouldKeepOrder = true);
+
+    PerformanceMetrics getInferenceMetrics() {
+ return inferenceMetrics;
+ }
+ PerformanceMetrics getPreprocessMetrics() {
+ return preprocessMetrics;
+ }
+ PerformanceMetrics getPostprocessMetrics() {
+ return postprocessMetrics;
+ }
+
+protected:
+    /// Returns a processed result, if available
+    /// @param shouldKeepOrder if true, the function returns processed data sequentially,
+    /// keeping the original frame order (as submitted). Otherwise, the function returns processed data in
+    /// arbitrary order.
+    /// @returns InferenceResult with processed information, or an empty InferenceResult (with a negative frameId)
+    /// if no results are available yet.
+ virtual InferenceResult getInferenceResult(bool shouldKeepOrder);
+
+ std::unique_ptr<RequestsPool> requestsPool;
+ std::unordered_map<int64_t, InferenceResult> completedInferenceResults;
+
+ ov::CompiledModel compiledModel;
+
+ std::mutex mtx;
+ std::condition_variable condVar;
+
+ int64_t inputFrameId = 0;
+ int64_t outputFrameId = 0;
+
+ std::exception_ptr callbackException = nullptr;
+
+ std::unique_ptr<ModelBase> model;
+ PerformanceMetrics inferenceMetrics;
+ PerformanceMetrics preprocessMetrics;
+ PerformanceMetrics postprocessMetrics;
+};
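For orientation, here is a minimal sketch of how a caller might drive AsyncPipeline. The concrete model class (ModelSSD), the ImageInputData wrapper, the ModelConfig initialization, and the header paths for the models/utils libraries are assumptions for illustration and are not part of this change:

#include <chrono>
#include <memory>

#include <opencv2/videoio.hpp>
#include <openvino/openvino.hpp>

#include <models/detection_model_ssd.h>  // hypothetical concrete ModelBase subclass; any model from the models library works
#include <models/input_data.h>           // assumed home of ImageInputData
#include <pipelines/async_pipeline.h>
#include <pipelines/metadata.h>
#include <utils/config_factory.h>        // assumed home of ModelConfig

int main() {
    ov::Core core;
    ModelConfig config;  // assumed default-constructible; maxAsyncRequests == 0 lets the device pick the request count
    AsyncPipeline pipeline(std::make_unique<ModelSSD>("model.xml"),  // constructor arguments are illustrative only
                           config,
                           core);

    cv::VideoCapture cap(0);
    cv::Mat frame;
    while (cap.read(frame)) {
        if (pipeline.isReadyToProcess()) {
            pipeline.submitData(ImageInputData(frame),
                                std::make_shared<ImageMetaData>(frame, std::chrono::steady_clock::now()));
        }
        // Block until a result is ready or an idle request frees up, then drain every ready result in order.
        pipeline.waitForData();
        while (std::unique_ptr<ResultBase> result = pipeline.getResult()) {
            // consume *result here; the concrete result type depends on the model
        }
    }
    pipeline.waitForTotalCompletion();
    return 0;
}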
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h b/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h
new file mode 100644
index 0000000..aca18ee
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h
@@ -0,0 +1,51 @@
+/*
+// Copyright (C) 2018-2020 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+#include <utils/ocv_common.hpp>
+
+struct MetaData {
+ virtual ~MetaData() {}
+
+ template <class T>
+ T& asRef() {
+ return dynamic_cast<T&>(*this);
+ }
+
+ template <class T>
+ const T& asRef() const {
+ return dynamic_cast<const T&>(*this);
+ }
+};
+
+struct ImageMetaData : public MetaData {
+ cv::Mat img;
+ std::chrono::steady_clock::time_point timeStamp;
+
+ ImageMetaData() {}
+
+ ImageMetaData(cv::Mat img, std::chrono::steady_clock::time_point timeStamp) : img(img), timeStamp(timeStamp) {}
+};
+
+struct ClassificationImageMetaData : public ImageMetaData {
+ unsigned int groundTruthId;
+
+ ClassificationImageMetaData(cv::Mat img,
+ std::chrono::steady_clock::time_point timeStamp,
+ unsigned int groundTruthId)
+ : ImageMetaData(img, timeStamp),
+ groundTruthId(groundTruthId) {}
+};
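The asRef helpers are the intended way to recover the concrete metadata type on the consumer side; a minimal sketch using only this header and OpenCV (the frame size and ground-truth value are made up):

#include <chrono>
#include <memory>

#include <opencv2/core.hpp>

#include <pipelines/metadata.h>

void inspectMetaData() {
    cv::Mat frame(480, 640, CV_8UC3, cv::Scalar::all(0));
    std::shared_ptr<MetaData> meta =
        std::make_shared<ClassificationImageMetaData>(frame, std::chrono::steady_clock::now(), 42u);

    // asRef is a checked downcast: dynamic_cast on a reference throws std::bad_cast if the type does not match.
    const ImageMetaData& imgMeta = meta->asRef<ImageMetaData>();
    unsigned int groundTruth = meta->asRef<ClassificationImageMetaData>().groundTruthId;
    (void)imgMeta;
    (void)groundTruth;
}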
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h b/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h
new file mode 100644
index 0000000..d9b220e
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h
@@ -0,0 +1,67 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+
+#include <stddef.h>
+
+#include <mutex>
+#include <utility>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+/// This class stores the pool of infer requests used by the asynchronous pipeline
+///
+class RequestsPool {
+public:
+ RequestsPool(ov::CompiledModel& compiledModel, unsigned int size);
+ ~RequestsPool();
+
+    /// Returns an idle request from the pool. The returned request is automatically marked as in use (this status
+    /// is reset after the request completes processing). This function is thread safe as long as the request is
+    /// used only until the setRequestIdle call.
+    /// @returns a request in the idle state, or an empty (default-constructed) ov::InferRequest if all requests are in use.
+ ov::InferRequest getIdleRequest();
+
+    /// Sets the given request to the idle state
+    /// This function is thread safe as long as the provided request is not used after the call to this function
+    /// @param request - request to be returned to the idle state
+ void setRequestIdle(const ov::InferRequest& request);
+
+ /// Returns number of requests in use. This function is thread safe.
+ /// @returns number of requests in use
+ size_t getInUseRequestsCount();
+
+    /// Checks whether the pool has at least one idle request. This function is thread safe.
+    /// @returns true if an idle request is available, false otherwise
+ bool isIdleRequestAvailable();
+
+    /// Waits for completion of every non-idle request in the pool.
+    /// getIdleRequest should not be called together with this function or after it,
+    /// to avoid a race condition or leaving the pool in an invalid state.
+    ///
+ void waitForTotalCompletion();
+
+ /// Returns list of all infer requests in the pool.
+ /// @returns list of all infer requests in the pool.
+ std::vector<ov::InferRequest> getInferRequestsList();
+
+private:
+ std::vector<std::pair<ov::InferRequest, bool>> requests;
+ size_t numRequestsInUse;
+ std::mutex mtx;
+};
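A condensed sketch of the request lifecycle the pool expects; the pool size of 4 and the tensor handling are placeholders, and src/async_pipeline.cpp below follows the same pattern:

#include <openvino/openvino.hpp>

#include <pipelines/requests_pool.h>

void runOneFrame(ov::CompiledModel& compiledModel) {
    RequestsPool pool(compiledModel, 4);  // pool size is arbitrary here

    ov::InferRequest request = pool.getIdleRequest();
    if (!request) {
        return;  // every request is busy; a real caller would retry later
    }

    // ... fill the input tensors of `request` here ...

    request.set_callback([&pool, request](std::exception_ptr) mutable {
        // ... read the output tensors of `request` here ...
        pool.setRequestIdle(request);  // hand the request back only after its outputs have been consumed
    });
    request.start_async();

    pool.waitForTotalCompletion();  // block until the request submitted above has finished
}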
diff --git a/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
new file mode 100644
index 0000000..3259280
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
@@ -0,0 +1,166 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include "pipelines/async_pipeline.h"
+
+#include <chrono>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+#include <models/model_base.h>
+#include <models/results.h>
+#include <utils/config_factory.h>
+#include <utils/performance_metrics.hpp>
+#include <utils/slog.hpp>
+
+struct InputData;
+struct MetaData;
+
+AsyncPipeline::AsyncPipeline(std::unique_ptr<ModelBase>&& modelInstance, const ModelConfig& config, ov::Core& core)
+ : model(std::move(modelInstance)) {
+ compiledModel = model->compileModel(config, core);
+ // --------------------------- Create infer requests ------------------------------------------------
+ unsigned int nireq = config.maxAsyncRequests;
+ if (nireq == 0) {
+ try {
+ nireq = compiledModel.get_property(ov::optimal_number_of_infer_requests);
+ } catch (const ov::Exception& ex) {
+ throw std::runtime_error(
+ std::string("Every device used with the demo should support compiled model's property "
+ "'OPTIMAL_NUMBER_OF_INFER_REQUESTS'. Failed to query the property with error: ") +
+ ex.what());
+ }
+ }
+ slog::info << "\tNumber of inference requests: " << nireq << slog::endl;
+ requestsPool.reset(new RequestsPool(compiledModel, nireq));
+ // --------------------------- Call onLoadCompleted to complete initialization of model -------------
+ model->onLoadCompleted(requestsPool->getInferRequestsList());
+}
+
+AsyncPipeline::~AsyncPipeline() {
+ waitForTotalCompletion();
+}
+
+void AsyncPipeline::waitForData(bool shouldKeepOrder) {
+ std::unique_lock<std::mutex> lock(mtx);
+
+ condVar.wait(lock, [&]() {
+ return callbackException != nullptr || requestsPool->isIdleRequestAvailable() ||
+ (shouldKeepOrder ? completedInferenceResults.find(outputFrameId) != completedInferenceResults.end()
+ : !completedInferenceResults.empty());
+ });
+
+ if (callbackException) {
+ std::rethrow_exception(callbackException);
+ }
+}
+
+int64_t AsyncPipeline::submitData(const InputData& inputData, const std::shared_ptr<MetaData>& metaData) {
+ auto frameID = inputFrameId;
+
+ auto request = requestsPool->getIdleRequest();
+ if (!request) {
+ return -1;
+ }
+
+ auto startTime = std::chrono::steady_clock::now();
+ auto internalModelData = model->preprocess(inputData, request);
+ preprocessMetrics.update(startTime);
+
+ request.set_callback(
+ [this, request, frameID, internalModelData, metaData, startTime](std::exception_ptr ex) mutable {
+ {
+ const std::lock_guard<std::mutex> lock(mtx);
+ inferenceMetrics.update(startTime);
+ try {
+ if (ex) {
+ std::rethrow_exception(ex);
+ }
+ InferenceResult result;
+
+ result.frameId = frameID;
+ result.metaData = std::move(metaData);
+ result.internalModelData = std::move(internalModelData);
+
+ for (const auto& outName : model->getOutputsNames()) {
+ auto tensor = request.get_tensor(outName);
+ result.outputsData.emplace(outName, tensor);
+ }
+
+ completedInferenceResults.emplace(frameID, result);
+ requestsPool->setRequestIdle(request);
+ } catch (...) {
+ if (!callbackException) {
+ callbackException = std::current_exception();
+ }
+ }
+ }
+ condVar.notify_one();
+ });
+
+ inputFrameId++;
+ if (inputFrameId < 0)
+ inputFrameId = 0;
+
+ request.start_async();
+
+ return frameID;
+}
+
+std::unique_ptr<ResultBase> AsyncPipeline::getResult(bool shouldKeepOrder) {
+ auto infResult = AsyncPipeline::getInferenceResult(shouldKeepOrder);
+ if (infResult.IsEmpty()) {
+ return std::unique_ptr<ResultBase>();
+ }
+ auto startTime = std::chrono::steady_clock::now();
+ auto result = model->postprocess(infResult);
+ postprocessMetrics.update(startTime);
+
+ *result = static_cast<ResultBase&>(infResult);
+ return result;
+}
+
+InferenceResult AsyncPipeline::getInferenceResult(bool shouldKeepOrder) {
+ InferenceResult retVal;
+ {
+ const std::lock_guard<std::mutex> lock(mtx);
+
+ const auto& it =
+ shouldKeepOrder ? completedInferenceResults.find(outputFrameId) : completedInferenceResults.begin();
+
+ if (it != completedInferenceResults.end()) {
+ retVal = std::move(it->second);
+ completedInferenceResults.erase(it);
+ }
+ }
+
+ if (!retVal.IsEmpty()) {
+ outputFrameId = retVal.frameId;
+ outputFrameId++;
+ if (outputFrameId < 0) {
+ outputFrameId = 0;
+ }
+ }
+
+ return retVal;
+}
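The in-order draining performed by getInferenceResult is the usual pattern for a completion map keyed by frame ID; a stand-alone illustration of the same logic with plain integers (frame-ID wraparound is omitted for brevity):

#include <cstdint>
#include <optional>
#include <unordered_map>

// Mimics getInferenceResult(shouldKeepOrder = true): hand out a frame only when its ID matches
// the next expected one, even if later frames have already completed.
std::optional<int> popNextInOrder(std::unordered_map<int64_t, int>& completed, int64_t& nextFrameId) {
    auto it = completed.find(nextFrameId);
    if (it == completed.end()) {
        return std::nullopt;  // the next sequential frame is not ready yet, even if others are
    }
    int value = it->second;
    completed.erase(it);
    ++nextFrameId;
    return value;
}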
diff --git a/python/openvino/runtime/common/pipelines/src/requests_pool.cpp b/python/openvino/runtime/common/pipelines/src/requests_pool.cpp
new file mode 100644
index 0000000..93230c9
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/src/requests_pool.cpp
@@ -0,0 +1,94 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include "pipelines/requests_pool.h"
+
+#include <algorithm>
+#include <exception>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+RequestsPool::RequestsPool(ov::CompiledModel& compiledModel, unsigned int size) : numRequestsInUse(0) {
+ for (unsigned int infReqId = 0; infReqId < size; ++infReqId) {
+ requests.emplace_back(compiledModel.create_infer_request(), false);
+ }
+}
+
+RequestsPool::~RequestsPool() {
+ // Setting empty callback to free resources allocated for previously assigned lambdas
+ for (auto& pair : requests) {
+ pair.first.set_callback([](std::exception_ptr) {});
+ }
+}
+
+ov::InferRequest RequestsPool::getIdleRequest() {
+ std::lock_guard<std::mutex> lock(mtx);
+
+ const auto& it = std::find_if(requests.begin(), requests.end(), [](const std::pair<ov::InferRequest, bool>& x) {
+ return !x.second;
+ });
+ if (it == requests.end()) {
+ return ov::InferRequest();
+ } else {
+ it->second = true;
+ numRequestsInUse++;
+ return it->first;
+ }
+}
+
+void RequestsPool::setRequestIdle(const ov::InferRequest& request) {
+ std::lock_guard<std::mutex> lock(mtx);
+ const auto& it = std::find_if(this->requests.begin(),
+ this->requests.end(),
+ [&request](const std::pair<ov::InferRequest, bool>& x) {
+ return x.first == request;
+ });
+ it->second = false;
+ numRequestsInUse--;
+}
+
+size_t RequestsPool::getInUseRequestsCount() {
+ std::lock_guard<std::mutex> lock(mtx);
+ return numRequestsInUse;
+}
+
+bool RequestsPool::isIdleRequestAvailable() {
+ std::lock_guard<std::mutex> lock(mtx);
+ return numRequestsInUse < requests.size();
+}
+
+void RequestsPool::waitForTotalCompletion() {
+    // Do not lock the mutex here, unlike in the other functions, to avoid a deadlock:
+    // the status of the request we are waiting for is changed to idle in its completion callback,
+    // and the necessary synchronization is applied there.
+ for (auto pair : requests) {
+ if (pair.second) {
+ pair.first.wait();
+ }
+ }
+}
+
+std::vector<ov::InferRequest> RequestsPool::getInferRequestsList() {
+ std::lock_guard<std::mutex> lock(mtx);
+ std::vector<ov::InferRequest> retVal;
+ retVal.reserve(requests.size());
+ for (auto& pair : requests) {
+ retVal.push_back(pair.first);
+ }
+
+ return retVal;
+}
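One way to read the no-locking comment in waitForTotalCompletion: the completion callback calls setRequestIdle, which takes the pool mutex, so blocking in ov::InferRequest::wait() while holding that mutex could deadlock. An equally valid shape, sketched here purely for illustration, is to snapshot the busy requests under the lock and then wait on them with the lock released:

#include <mutex>
#include <utility>
#include <vector>

#include <openvino/openvino.hpp>

void waitForAll(std::vector<std::pair<ov::InferRequest, bool>>& requests, std::mutex& mtx) {
    std::vector<ov::InferRequest> busy;
    {
        std::lock_guard<std::mutex> lock(mtx);
        for (const auto& pair : requests) {
            if (pair.second) {
                busy.push_back(pair.first);  // ov::InferRequest copies share the same underlying request
            }
        }
    }
    for (auto& request : busy) {
        request.wait();  // the lock is released, so completion callbacks can still mark requests idle
    }
}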