| author | Eric Dao <eric@erickhangdao.com> | 2025-03-10 17:54:31 -0400 |
|---|---|---|
| committer | Eric Dao <eric@erickhangdao.com> | 2025-03-10 17:54:31 -0400 |
| commit | ab224e2e6ba65f5a369ec392f99cd8845ad06c98 (patch) | |
| tree | a1e757e9341863ed52b8ad4c5a1c45933aab9da4 /python/openvino/runtime/common/pipelines/src/async_pipeline.cpp | |
| parent | 40da1752f2c8639186b72f6838aa415e854d0b1d (diff) | |
| download | thesis-master.tar.gz thesis-master.tar.bz2 thesis-master.zip | |
Diffstat (limited to 'python/openvino/runtime/common/pipelines/src/async_pipeline.cpp')
| -rw-r--r-- | python/openvino/runtime/common/pipelines/src/async_pipeline.cpp | 166 |
1 file changed, 166 insertions, 0 deletions
diff --git a/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
new file mode 100644
index 0000000..3259280
--- /dev/null
+++ b/python/openvino/runtime/common/pipelines/src/async_pipeline.cpp
@@ -0,0 +1,166 @@
/*
// Copyright (C) 2020-2022 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include "pipelines/async_pipeline.h"

#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <openvino/openvino.hpp>

#include <models/model_base.h>
#include <models/results.h>
#include <utils/config_factory.h>
#include <utils/performance_metrics.hpp>
#include <utils/slog.hpp>

struct InputData;
struct MetaData;

AsyncPipeline::AsyncPipeline(std::unique_ptr<ModelBase>&& modelInstance, const ModelConfig& config, ov::Core& core)
    : model(std::move(modelInstance)) {
    compiledModel = model->compileModel(config, core);

    // --------------------------- Create infer requests ------------------------------------------------
    unsigned int nireq = config.maxAsyncRequests;
    if (nireq == 0) {
        try {
            nireq = compiledModel.get_property(ov::optimal_number_of_infer_requests);
        } catch (const ov::Exception& ex) {
            throw std::runtime_error(
                std::string("Every device used with the demo should support compiled model's property "
                            "'OPTIMAL_NUMBER_OF_INFER_REQUESTS'. Failed to query the property with error: ") +
                ex.what());
        }
    }
    slog::info << "\tNumber of inference requests: " << nireq << slog::endl;
    requestsPool.reset(new RequestsPool(compiledModel, nireq));

    // --------------------------- Call onLoadCompleted to complete initialization of model -------------
    model->onLoadCompleted(requestsPool->getInferRequestsList());
}

AsyncPipeline::~AsyncPipeline() {
    waitForTotalCompletion();
}

void AsyncPipeline::waitForData(bool shouldKeepOrder) {
    std::unique_lock<std::mutex> lock(mtx);

    condVar.wait(lock, [&]() {
        return callbackException != nullptr || requestsPool->isIdleRequestAvailable() ||
               (shouldKeepOrder ? completedInferenceResults.find(outputFrameId) != completedInferenceResults.end()
                                : !completedInferenceResults.empty());
    });

    if (callbackException) {
        std::rethrow_exception(callbackException);
    }
}

int64_t AsyncPipeline::submitData(const InputData& inputData, const std::shared_ptr<MetaData>& metaData) {
    auto frameID = inputFrameId;

    auto request = requestsPool->getIdleRequest();
    if (!request) {
        return -1;
    }

    auto startTime = std::chrono::steady_clock::now();
    auto internalModelData = model->preprocess(inputData, request);
    preprocessMetrics.update(startTime);

    request.set_callback(
        [this, request, frameID, internalModelData, metaData, startTime](std::exception_ptr ex) mutable {
            {
                const std::lock_guard<std::mutex> lock(mtx);
                inferenceMetrics.update(startTime);
                try {
                    if (ex) {
                        std::rethrow_exception(ex);
                    }
                    InferenceResult result;

                    result.frameId = frameID;
                    result.metaData = std::move(metaData);
                    result.internalModelData = std::move(internalModelData);

                    for (const auto& outName : model->getOutputsNames()) {
                        auto tensor = request.get_tensor(outName);
                        result.outputsData.emplace(outName, tensor);
                    }

                    completedInferenceResults.emplace(frameID, result);
                    requestsPool->setRequestIdle(request);
                } catch (...) {
                    if (!callbackException) {
                        callbackException = std::current_exception();
                    }
                }
            }
            condVar.notify_one();
        });

    inputFrameId++;
    if (inputFrameId < 0)
        inputFrameId = 0;

    request.start_async();

    return frameID;
}

std::unique_ptr<ResultBase> AsyncPipeline::getResult(bool shouldKeepOrder) {
    auto infResult = AsyncPipeline::getInferenceResult(shouldKeepOrder);
    if (infResult.IsEmpty()) {
        return std::unique_ptr<ResultBase>();
    }

    auto startTime = std::chrono::steady_clock::now();
    auto result = model->postprocess(infResult);
    postprocessMetrics.update(startTime);

    *result = static_cast<ResultBase&>(infResult);
    return result;
}

InferenceResult AsyncPipeline::getInferenceResult(bool shouldKeepOrder) {
    InferenceResult retVal;
    {
        const std::lock_guard<std::mutex> lock(mtx);

        const auto& it =
            shouldKeepOrder ? completedInferenceResults.find(outputFrameId) : completedInferenceResults.begin();

        if (it != completedInferenceResults.end()) {
            retVal = std::move(it->second);
            completedInferenceResults.erase(it);
        }
    }

    if (!retVal.IsEmpty()) {
        outputFrameId = retVal.frameId;
        outputFrameId++;
        if (outputFrameId < 0) {
            outputFrameId = 0;
        }
    }

    return retVal;
}
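For context, a minimal sketch of how this pipeline might be driven from a frame loop is shown below. It is not part of this commit: it assumes the companion headers of this pipeline library (declarations from `async_pipeline.h`, `utils/config_factory.h`, the result/metadata types), a `ModelBase`-derived model instance, and the helper names `nextFrame`, `consumeResult`, `ImageInputData`, and `ImageMetaData`, which are illustrative stand-ins rather than names confirmed by the diff.

```cpp
// Hypothetical usage sketch for AsyncPipeline, NOT part of this commit.
// nextFrame() and consumeResult() are assumed application-side helpers;
// ImageInputData/ImageMetaData are assumed wrappers from the surrounding library.
#include <chrono>
#include <memory>
#include <utility>

#include <opencv2/core.hpp>
#include <openvino/openvino.hpp>

#include "pipelines/async_pipeline.h"

bool nextFrame(cv::Mat& frame);                 // assumed frame source
void consumeResult(const ResultBase& result);   // assumed result sink

void runPipeline(std::unique_ptr<ModelBase> model, const ModelConfig& config) {
    ov::Core core;
    AsyncPipeline pipeline(std::move(model), config, core);

    cv::Mat frame;
    while (nextFrame(frame)) {
        // Block until an infer request is idle or a completed result is available.
        pipeline.waitForData(true);

        // Submit the new frame; submitData() returns -1 if no request was idle.
        pipeline.submitData(ImageInputData(frame),
                            std::make_shared<ImageMetaData>(frame, std::chrono::steady_clock::now()));

        // Drain every post-processed result that is ready so far, in frame order.
        while (auto result = pipeline.getResult(true)) {
            consumeResult(*result);
        }
    }

    // Let all in-flight requests finish before the pipeline is destroyed.
    pipeline.waitForTotalCompletion();
}
```

The sketch mirrors the producer/consumer pattern the file implements: `submitData` registers a per-request callback that stores outputs into `completedInferenceResults`, while `getResult` pops and post-processes them, optionally preserving frame order via `outputFrameId`.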
