Diffstat (limited to 'python/openvino/runtime/common/pipelines')
6 files changed, 514 insertions, 0 deletions
diff --git a/python/openvino/runtime/common/pipelines/CMakeLists.txt b/python/openvino/runtime/common/pipelines/CMakeLists.txt
new file mode 100644
index 0000000..b8b128a
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright (C) 2018-2019 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+#
+
+file(GLOB SOURCES ./src/*.cpp)
+file(GLOB HEADERS ./include/pipelines/*.h)
+
+# Create named folders for the sources within the .vcproj
+# Empty name lists them directly under the .vcproj
+source_group("src" FILES ${SOURCES})
+source_group("include" FILES ${HEADERS})
+
+add_library(pipelines STATIC ${SOURCES} ${HEADERS})
+target_include_directories(pipelines PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")
+target_link_libraries(pipelines PRIVATE openvino::runtime models utils opencv_core opencv_imgproc)
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h b/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h
new file mode 100644
index 0000000..6661c00
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/async_pipeline.h
@@ -0,0 +1,121 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+#include <stdint.h>
+
+#include <condition_variable>
+#include <exception>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include <openvino/openvino.hpp>
+
+#include <models/results.h>
+#include <utils/performance_metrics.hpp>
+
+#include "pipelines/requests_pool.h"
+
+class ModelBase;
+struct InputData;
+struct MetaData;
+struct ModelConfig;
+
+/// This is the base class for asynchronous pipelines
+/// Derived classes should add functions for data submission and output processing
+class AsyncPipeline {
+public:
+    /// Loads the model and performs the required initialization
+    /// @param modelInstance pointer to the model object. The object it points to should not be destroyed manually
+    /// after passing the pointer to this function.
+    /// @param config - fine-tuning configuration for the model
+    /// @param core - reference to the ov::Core instance to use.
+    /// If it is omitted, a new instance of ov::Core will be created inside.
+    AsyncPipeline(std::unique_ptr<ModelBase>&& modelInstance, const ModelConfig& config, ov::Core& core);
+    virtual ~AsyncPipeline();
+
+    /// Waits until either output data becomes available or the pipeline allows more input data to be submitted.
+    /// @param shouldKeepOrder if true, the function treats results as ready only if the next sequential result (frame)
+    /// is ready (so results can be extracted in the same order as they were submitted). Otherwise, the function
+    /// returns as soon as any result is ready.
+    void waitForData(bool shouldKeepOrder = true);
+
+    /// @returns true if there is an idle infer request available in the pool
+    /// and the next frame can be submitted for processing, false otherwise.
+    bool isReadyToProcess() {
+        return requestsPool->isIdleRequestAvailable();
+    }
+
+    /// Waits for all currently submitted requests to be completed.
+    ///
+    void waitForTotalCompletion() {
+        if (requestsPool)
+            requestsPool->waitForTotalCompletion();
+    }
+
+    /// Submits data to the model for inference
+    /// @param inputData - input data to be submitted
+    /// @param metaData - shared pointer to the metadata container.
+    /// Might be null. This pointer is passed through the pipeline and put into the final result structure.
+    /// @returns -1 if the image cannot be scheduled for processing (no free InferRequest is available).
+    /// Otherwise returns a unique sequential frame ID for this particular request. The same frame ID will be written
+    /// into the result structure.
+    virtual int64_t submitData(const InputData& inputData, const std::shared_ptr<MetaData>& metaData);
+
+    /// Gets available data from the queue
+    /// @param shouldKeepOrder if true, the function treats results as ready only if the next sequential result (frame)
+    /// is ready (so results can be extracted in the same order as they were submitted). Otherwise, the function
+    /// returns as soon as any result is ready.
+    virtual std::unique_ptr<ResultBase> getResult(bool shouldKeepOrder = true);
+
+    PerformanceMetrics getInferenceMetircs() {
+        return inferenceMetrics;
+    }
+    PerformanceMetrics getPreprocessMetrics() {
+        return preprocessMetrics;
+    }
+    PerformanceMetrics getPostprocessMetrics() {
+        return postprocessMetrics;
+    }
+
+protected:
+    /// Returns the processed result, if available
+    /// @param shouldKeepOrder if true, the function returns processed data sequentially,
+    /// keeping the original frame order (as frames were submitted). Otherwise, processed data may be returned in
+    /// arbitrary order.
+    /// @returns InferenceResult with processed information, or an empty InferenceResult (with negative frameId) if
+    /// there are no results yet.
+    virtual InferenceResult getInferenceResult(bool shouldKeepOrder);
+
+    std::unique_ptr<RequestsPool> requestsPool;
+    std::unordered_map<int64_t, InferenceResult> completedInferenceResults;
+
+    ov::CompiledModel compiledModel;
+
+    std::mutex mtx;
+    std::condition_variable condVar;
+
+    int64_t inputFrameId = 0;
+    int64_t outputFrameId = 0;
+
+    std::exception_ptr callbackException = nullptr;
+
+    std::unique_ptr<ModelBase> model;
+    PerformanceMetrics inferenceMetrics;
+    PerformanceMetrics preprocessMetrics;
+    PerformanceMetrics postprocessMetrics;
+};
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h b/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h
new file mode 100644
index 0000000..aca18ee
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/metadata.h
@@ -0,0 +1,51 @@
+/*
+// Copyright (C) 2018-2020 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+#include <utils/ocv_common.hpp>
+
+struct MetaData {
+    virtual ~MetaData() {}
+
+    template <class T>
+    T& asRef() {
+        return dynamic_cast<T&>(*this);
+    }
+
+    template <class T>
+    const T& asRef() const {
+        return dynamic_cast<const T&>(*this);
+    }
+};
+
+struct ImageMetaData : public MetaData {
+    cv::Mat img;
+    std::chrono::steady_clock::time_point timeStamp;
+
+    ImageMetaData() {}
+
+    ImageMetaData(cv::Mat img, std::chrono::steady_clock::time_point timeStamp) : img(img), timeStamp(timeStamp) {}
+};
+
+struct ClassificationImageMetaData : public ImageMetaData {
+    unsigned int groundTruthId;
+
+    ClassificationImageMetaData(cv::Mat img,
+                                std::chrono::steady_clock::time_point timeStamp,
+                                unsigned int groundTruthId)
+        : ImageMetaData(img, timeStamp),
+          groundTruthId(groundTruthId) {}
+};
diff --git a/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h b/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h
new file mode 100644
index 0000000..d9b220e
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/include/pipelines/requests_pool.h
@@ -0,0 +1,67 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#pragma once
+
+#include <stddef.h>
+
+#include <mutex>
+#include <utility>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+/// This class stores the pool of infer requests for the asynchronous pipeline
+///
+class RequestsPool {
+public:
+    RequestsPool(ov::CompiledModel& compiledModel, unsigned int size);
+    ~RequestsPool();
+
+    /// Returns an idle request from the pool. The returned request is automatically marked as in use (this status is
+    /// reset after request processing completes). This function is thread safe as long as the request is used only
+    /// until the setRequestIdle call.
+    /// @returns an idle request, or a default-constructed (empty) ov::InferRequest if all requests are in use.
+    ov::InferRequest getIdleRequest();
+
+    /// Sets a particular request to the idle state.
+    /// This function is thread safe as long as the provided request is not used after the call to this function.
+    /// @param request - request to be returned to the idle state
+    void setRequestIdle(const ov::InferRequest& request);
+
+    /// Returns the number of requests in use. This function is thread safe.
+    /// @returns number of requests in use
+    size_t getInUseRequestsCount();
+
+    /// Checks whether an idle request is available in the pool. This function is thread safe.
+    /// @returns true if at least one idle request is available, false otherwise
+    bool isIdleRequestAvailable();
+
+    /// Waits for completion of every non-idle request in the pool.
+    /// This call blocks until all in-use requests finish.
+    /// getIdleRequest should not be called together with this function or after it,
+    /// to avoid race conditions or invalid state.
+    void waitForTotalCompletion();
+
+    /// Returns the list of all infer requests in the pool.
+    /// @returns list of all infer requests in the pool
+    std::vector<ov::InferRequest> getInferRequestsList();
+
+private:
+    std::vector<std::pair<ov::InferRequest, bool>> requests;
+    size_t numRequestsInUse;
+    std::mutex mtx;
+};
diff --git a/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
new file mode 100644
index 0000000..3259280
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
@@ -0,0 +1,166 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include "pipelines/async_pipeline.h"
+
+#include <chrono>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+#include <models/model_base.h>
+#include <models/results.h>
+#include <utils/config_factory.h>
+#include <utils/performance_metrics.hpp>
+#include <utils/slog.hpp>
+
+struct InputData;
+struct MetaData;
+
+AsyncPipeline::AsyncPipeline(std::unique_ptr<ModelBase>&& modelInstance, const ModelConfig& config, ov::Core& core)
+    : model(std::move(modelInstance)) {
+    compiledModel = model->compileModel(config, core);
+    // --------------------------- Create infer requests ------------------------------------------------
+    unsigned int nireq = config.maxAsyncRequests;
+    if (nireq == 0) {
+        try {
+            nireq = compiledModel.get_property(ov::optimal_number_of_infer_requests);
+        } catch (const ov::Exception& ex) {
+            throw std::runtime_error(
+                std::string("Every device used with the demo should support compiled model's property "
+                            "'OPTIMAL_NUMBER_OF_INFER_REQUESTS'. Failed to query the property with error: ") +
+                ex.what());
+        }
+    }
+    slog::info << "\tNumber of inference requests: " << nireq << slog::endl;
+    requestsPool.reset(new RequestsPool(compiledModel, nireq));
+    // --------------------------- Call onLoadCompleted to complete initialization of model -------------
+    model->onLoadCompleted(requestsPool->getInferRequestsList());
+}
+
+AsyncPipeline::~AsyncPipeline() {
+    waitForTotalCompletion();
+}
+
+void AsyncPipeline::waitForData(bool shouldKeepOrder) {
+    std::unique_lock<std::mutex> lock(mtx);
+
+    condVar.wait(lock, [&]() {
+        return callbackException != nullptr || requestsPool->isIdleRequestAvailable() ||
+               (shouldKeepOrder ? completedInferenceResults.find(outputFrameId) != completedInferenceResults.end()
+                                : !completedInferenceResults.empty());
+    });
+
+    if (callbackException) {
+        std::rethrow_exception(callbackException);
+    }
+}
+
+int64_t AsyncPipeline::submitData(const InputData& inputData, const std::shared_ptr<MetaData>& metaData) {
+    auto frameID = inputFrameId;
+
+    auto request = requestsPool->getIdleRequest();
+    if (!request) {
+        return -1;
+    }
+
+    auto startTime = std::chrono::steady_clock::now();
+    auto internalModelData = model->preprocess(inputData, request);
+    preprocessMetrics.update(startTime);
+
+    request.set_callback(
+        [this, request, frameID, internalModelData, metaData, startTime](std::exception_ptr ex) mutable {
+            {
+                const std::lock_guard<std::mutex> lock(mtx);
+                inferenceMetrics.update(startTime);
+                try {
+                    if (ex) {
+                        std::rethrow_exception(ex);
+                    }
+                    InferenceResult result;
+
+                    result.frameId = frameID;
+                    result.metaData = std::move(metaData);
+                    result.internalModelData = std::move(internalModelData);
+
+                    for (const auto& outName : model->getOutputsNames()) {
+                        auto tensor = request.get_tensor(outName);
+                        result.outputsData.emplace(outName, tensor);
+                    }
+
+                    completedInferenceResults.emplace(frameID, result);
+                    requestsPool->setRequestIdle(request);
+                } catch (...) {
+                    if (!callbackException) {
+                        callbackException = std::current_exception();
+                    }
+                }
+            }
+            condVar.notify_one();
+        });
+
+    inputFrameId++;
+    if (inputFrameId < 0)
+        inputFrameId = 0;
+
+    request.start_async();
+
+    return frameID;
+}
+
+std::unique_ptr<ResultBase> AsyncPipeline::getResult(bool shouldKeepOrder) {
+    auto infResult = AsyncPipeline::getInferenceResult(shouldKeepOrder);
+    if (infResult.IsEmpty()) {
+        return std::unique_ptr<ResultBase>();
+    }
+    auto startTime = std::chrono::steady_clock::now();
+    auto result = model->postprocess(infResult);
+    postprocessMetrics.update(startTime);
+
+    *result = static_cast<ResultBase&>(infResult);
+    return result;
+}
+
+InferenceResult AsyncPipeline::getInferenceResult(bool shouldKeepOrder) {
+    InferenceResult retVal;
+    {
+        const std::lock_guard<std::mutex> lock(mtx);
+
+        const auto& it =
+            shouldKeepOrder ? completedInferenceResults.find(outputFrameId) : completedInferenceResults.begin();
+
+        if (it != completedInferenceResults.end()) {
+            retVal = std::move(it->second);
+            completedInferenceResults.erase(it);
+        }
+    }
+
+    if (!retVal.IsEmpty()) {
+        outputFrameId = retVal.frameId;
+        outputFrameId++;
+        if (outputFrameId < 0) {
+            outputFrameId = 0;
+        }
+    }
+
+    return retVal;
+}
diff --git a/python/openvino/runtime/common/pipelines/src/requests_pool.cpp b/python/openvino/runtime/common/pipelines/src/requests_pool.cpp
new file mode 100644
index 0000000..93230c9
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/src/requests_pool.cpp
@@ -0,0 +1,94 @@
+/*
+// Copyright (C) 2020-2022 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include "pipelines/requests_pool.h"
+
+#include <algorithm>
+#include <exception>
+#include <vector>
+
+#include <openvino/openvino.hpp>
+
+RequestsPool::RequestsPool(ov::CompiledModel& compiledModel, unsigned int size) : numRequestsInUse(0) {
+    for (unsigned int infReqId = 0; infReqId < size; ++infReqId) {
+        requests.emplace_back(compiledModel.create_infer_request(), false);
+    }
+}
+
+RequestsPool::~RequestsPool() {
+    // Set an empty callback to free resources allocated for previously assigned lambdas
+    for (auto& pair : requests) {
+        pair.first.set_callback([](std::exception_ptr) {});
+    }
+}
+
+ov::InferRequest RequestsPool::getIdleRequest() {
+    std::lock_guard<std::mutex> lock(mtx);
+
+    const auto& it = std::find_if(requests.begin(), requests.end(), [](const std::pair<ov::InferRequest, bool>& x) {
+        return !x.second;
+    });
+    if (it == requests.end()) {
+        return ov::InferRequest();
+    } else {
+        it->second = true;
+        numRequestsInUse++;
+        return it->first;
+    }
+}
+
+void RequestsPool::setRequestIdle(const ov::InferRequest& request) {
+    std::lock_guard<std::mutex> lock(mtx);
+    const auto& it = std::find_if(this->requests.begin(),
+                                  this->requests.end(),
+                                  [&request](const std::pair<ov::InferRequest, bool>& x) {
+                                      return x.first == request;
+                                  });
+    it->second = false;
+    numRequestsInUse--;
+}
+
+size_t RequestsPool::getInUseRequestsCount() {
+    std::lock_guard<std::mutex> lock(mtx);
+    return numRequestsInUse;
+}
+
+bool RequestsPool::isIdleRequestAvailable() {
+    std::lock_guard<std::mutex> lock(mtx);
+    return numRequestsInUse < requests.size();
+}
+
+void RequestsPool::waitForTotalCompletion() {
+    // Do not synchronize here to avoid a deadlock (despite the synchronization in other functions).
+    // The request status is changed to idle in the callback,
+    // upon completion of the request we are waiting for; synchronization is applied there.
+    for (auto pair : requests) {
+        if (pair.second) {
+            pair.first.wait();
+        }
+    }
+}
+
+std::vector<ov::InferRequest> RequestsPool::getInferRequestsList() {
+    std::lock_guard<std::mutex> lock(mtx);
+    std::vector<ov::InferRequest> retVal;
+    retVal.reserve(requests.size());
+    for (auto& pair : requests) {
+        retVal.push_back(pair.first);
+    }
+
+    return retVal;
+}
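The headers above define the surface a demo interacts with: submitData feeds a frame, waitForData blocks on the condition variable until a result or an idle infer request is available, getResult drains completed frames, and waitForTotalCompletion flushes the pool on shutdown. Below is a minimal driver sketch of that flow; DetectionModel, ImageInputData, and the header paths for them are assumptions borrowed from the neighbouring models/utils libraries that CMakeLists.txt links against, and only AsyncPipeline, RequestsPool, and ImageMetaData are actually defined in this change.

    // Minimal usage sketch (not part of this change).
    #include <chrono>
    #include <memory>

    #include <opencv2/videoio.hpp>
    #include <openvino/openvino.hpp>

    #include <models/detection_model.h>  // assumed header from the "models" library
    #include <models/input_data.h>       // assumed header providing ImageInputData
    #include <utils/config_factory.h>    // ModelConfig, as included by async_pipeline.cpp

    #include <pipelines/async_pipeline.h>
    #include <pipelines/metadata.h>

    int main() {
        ov::Core core;
        ModelConfig config;           // assumed default-constructible
        config.maxAsyncRequests = 0;  // 0 -> query OPTIMAL_NUMBER_OF_INFER_REQUESTS

        AsyncPipeline pipeline(std::make_unique<DetectionModel>("model.xml"),  // assumed model ctor
                               config, core);

        cv::VideoCapture cap(0);
        cv::Mat frame;
        while (cap.read(frame)) {
            if (pipeline.isReadyToProcess()) {
                // Metadata travels with the request and comes back in the result structure.
                pipeline.submitData(ImageInputData(frame),
                                    std::make_shared<ImageMetaData>(frame, std::chrono::steady_clock::now()));
            }
            // Blocks until a result is ready or an idle infer request becomes available.
            pipeline.waitForData();
            while (std::unique_ptr<ResultBase> result = pipeline.getResult()) {
                // Consume the result here (cast to the concrete result type produced by the model).
            }
        }
        pipeline.waitForTotalCompletion();
        return 0;
    }

Because waitForData relies on the condition variable notified from the request callback, the loop never busy-waits, and getResult can drain results either in submission order (the default) or as soon as any frame completes.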