| author | Eric Dao <eric@erickhangdao.com> | 2025-03-10 17:54:31 -0400 |
|---|---|---|
| committer | Eric Dao <eric@erickhangdao.com> | 2025-03-10 17:54:31 -0400 |
| commit | ab224e2e6ba65f5a369ec392f99cd8845ad06c98 (patch) | |
| tree | a1e757e9341863ed52b8ad4c5a1c45933aab9da4 /python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp | |
| parent | 40da1752f2c8639186b72f6838aa415e854d0b1d (diff) | |
Diffstat (limited to 'python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp')
| -rw-r--r-- | python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp | 413 |
1 file changed, 413 insertions, 0 deletions
```diff
diff --git a/python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp b/python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp
new file mode 100644
index 0000000..d0e1ed0
--- /dev/null
+++ b/python/openvino/runtime/streaming/streaming_inference_app/streaming_inference_app.cpp
@@ -0,0 +1,413 @@
```

```cpp
// Copyright 2023 Intel Corporation.
//
// This software and the related documents are Intel copyrighted materials,
// and your use of them is governed by the express license under which they
// were provided to you ("License"). Unless the License provides otherwise,
// you may not use, modify, copy, publish, distribute, disclose or transmit
// this software or the related documents without Intel's prior written
// permission.
//
// This software and the related documents are provided as is, with no express
// or implied warranties, other than those that are expressly stated in the
// License.

#include "streaming_inference_app.h"
#include <fcntl.h>
#include <signal.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <algorithm>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <thread>
#include "dla_plugin_config.hpp"

using namespace std::chrono_literals;

std::ofstream StreamingInferenceApp::_resultsStream("results.txt");
std::mutex StreamingInferenceApp::_signalMutex;
std::condition_variable StreamingInferenceApp::_signalConditionVariable;
std::chrono::time_point<std::chrono::system_clock> StreamingInferenceApp::_startTime;

int main(int numParams, char* paramValues[]) {
  StreamingInferenceApp app(numParams, paramValues);

  try {
    app.Run();
  } catch (const std::exception& ex) {
    std::cerr << ex.what() << '\n';
  }
  return 0;
}

StreamingInferenceApp::StreamingInferenceApp(int numParams, char* paramValues[])
    : _commandLine(numParams, paramValues) {
  OsStartup();
  LoadClassNames();
}

StreamingInferenceApp::~StreamingInferenceApp() {
  timespec waitTimeout = {};
  if (_pCancelSemaphore) {
    // Reset the cancel semaphore
    int r = 0;
    do {
      r = ::sem_timedwait(_pCancelSemaphore, &waitTimeout);
    } while (r == 0);
    ::sem_close(_pCancelSemaphore);
  }

  if (_pReadyForImageStreamSemaphore) {
    // Reset the ready semaphore
    int r = 0;
    do {
      r = ::sem_timedwait(_pReadyForImageStreamSemaphore, &waitTimeout);
    } while (r == 0);
    ::sem_close(_pReadyForImageStreamSemaphore);
  }
}
```
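A note on the destructor above: the `sem_timedwait` loop with a zeroed `timespec` is a non-blocking drain. Because the absolute timeout is already in the past, each call either decrements the semaphore immediately or fails with `ETIMEDOUT` once the count reaches zero. A minimal sketch of the same pattern in isolation (the helper name `DrainSemaphore` is illustrative, not from the source):

```cpp
#include <semaphore.h>
#include <ctime>

// Decrement a POSIX semaphore until its count reaches zero.
// With an already-expired (zeroed) timeout, sem_timedwait never blocks:
// it either decrements immediately or fails with ETIMEDOUT.
static void DrainSemaphore(sem_t* pSemaphore) {
  timespec expired = {};  // epoch (Jan 1, 1970) -- always in the past
  while (::sem_timedwait(pSemaphore, &expired) == 0) {
    // Keep decrementing until the count is exhausted.
  }
}
```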
```cpp
void StreamingInferenceApp::Run() {
  std::filesystem::path pluginsFilename = "plugins.xml";

  std::string deviceName;
  std::string arch;
  std::string model;

  // Get the command line options for the model, arch file, and device
  if (not _commandLine.GetOption("model", model) or not _commandLine.GetOption("arch", arch) or
      not _commandLine.GetOption("device", deviceName)) {
    return Usage();
  }

  std::filesystem::path architectureFilename = arch;
  std::filesystem::path compiledModelFilename = model;

  // Check that the provided files do in fact exist
  if (not CheckFileExists(architectureFilename, "architecture") or not CheckFileExists(pluginsFilename, "plugins") or
      not CheckFileExists(compiledModelFilename, "compiled model")) {
    return;
  }

  InferenceEngine::Core inferenceEngine(pluginsFilename);

  // Set up CoreDLA private configuration parameters
  const std::map<std::string, std::string> configParameters;
  inferenceEngine.SetConfig({{DLIAPlugin::properties::arch_path.name(), architectureFilename}}, "FPGA");

  // If dropSourceBuffers is 0, no input buffers are dropped.
  // If dropSourceBuffers is 1, then 1 buffer is processed and 1 gets dropped.
  // If dropSourceBuffers is 2, then 1 buffer is processed and 2 get dropped, etc.
  uint32_t dropSourceBuffers = 0;

  inferenceEngine.SetConfig({{DLIAPlugin::properties::streaming_drop_source_buffers.name(), std::to_string(dropSourceBuffers)},
                             {DLIAPlugin::properties::external_streaming.name(), CONFIG_VALUE(YES)}},
                            "FPGA");

  std::ifstream inputFile(compiledModelFilename, std::fstream::binary);
  if (not inputFile) {
    std::cout << "Failed to load compiled model file.\n";
    return;
  }

  // Load the model to the device
  InferenceEngine::ExecutableNetwork importedNetwork = inferenceEngine.ImportNetwork(inputFile, deviceName, {});

  // The plugin defines the number of inference requests required for streaming
  uint32_t numStreamingInferenceRequests =
      importedNetwork.GetMetric(DLIAPlugin::properties::num_streaming_inference_requests.name()).as<uint32_t>();
  const std::string cancelSemaphoreName =
      importedNetwork.GetMetric(DLIAPlugin::properties::cancel_semaphore_name.name()).as<std::string>();
  _cancelSemaphoreName = cancelSemaphoreName;

  for (uint32_t i = 0; i < numStreamingInferenceRequests; i++) {
    auto spInferenceData = std::make_shared<SingleInferenceData>(this, importedNetwork, i);
    _inferences.push_back(spInferenceData);
  }

  // Start the inference requests. Streaming inferences will reschedule
  // themselves when complete
  for (auto& inference : _inferences) {
    inference->StartAsync();
  }

  std::cout << "Ready to start image input stream.\n";

  // Signal the image streaming app that we are ready, so it can
  // begin transferring files
  SetReadyForImageStreamSemaphore();

  // Wait until Ctrl+C
  bool done = false;
  while (not done) {
    std::unique_lock<std::mutex> lock(_signalMutex);
    done = (_signalConditionVariable.wait_for(lock, 1000ms) != std::cv_status::timeout);
  }

  SetShutdownSemaphore();

  for (auto& inference : _inferences) {
    inference->Cancel();
  }

  _inferences.clear();
}


void StreamingInferenceApp::SetShutdownSemaphore() {
  _pCancelSemaphore = ::sem_open(_cancelSemaphoreName.c_str(), O_CREAT, 0644, 0);
  if (_pCancelSemaphore) {
    ::sem_post(_pCancelSemaphore);
  }
}


void StreamingInferenceApp::SetReadyForImageStreamSemaphore() {
  _pReadyForImageStreamSemaphore = ::sem_open("/CoreDLA_ready_for_streaming", O_CREAT, 0644, 0);
  if (_pReadyForImageStreamSemaphore) {
    ::sem_post(_pReadyForImageStreamSemaphore);
  }
}


/**
 * Print a help menu to the console
 */
void StreamingInferenceApp::Usage() {
  std::cout << "Usage:\n";
  std::cout << "\tstreaming_inference_app -model=<model> -arch=<arch> -device=<device>\n\n";
  std::cout << "Where:\n";
  std::cout << "\t<model> is the compiled model binary file, eg /home/root/resnet-50-tf/RN50_Performance_no_folding.bin\n";
  std::cout << "\t<arch> is the architecture file, eg /home/root/resnet-50-tf/A10_Performance.arch\n";
  std::cout << "\t<device> is the OpenVINO device ID, eg HETERO:FPGA or HETERO:FPGA,CPU\n";
}


/**
 * Check that a file exists
 *
 * @param[in] filename Filename to check
 * @param[in] message  Description of the file, displayed if it does not exist
 * @returns true if the file exists, false otherwise
 */
bool StreamingInferenceApp::CheckFileExists(const std::filesystem::path& filename, const std::string& message) {
  if (not std::filesystem::exists(filename)) {
    std::cout << "Can't find " << message << ", '" << filename.c_str() << "'\n";
    return false;
  }

  return true;
}
```
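The `/CoreDLA_ready_for_streaming` semaphore posted in `SetReadyForImageStreamSemaphore()` is the handshake with whatever process feeds the input stream. A sketch of what the consumer side might look like, assuming (hypothetically) that the image-streaming app simply blocks on the same named semaphore before sending its first frame:

```cpp
#include <fcntl.h>
#include <semaphore.h>
#include <iostream>

int main() {
  // Open (or create) the same named semaphore the inference app posts to.
  sem_t* pReady = ::sem_open("/CoreDLA_ready_for_streaming", O_CREAT, 0644, 0);
  if (pReady == SEM_FAILED) {
    std::cerr << "sem_open failed\n";
    return 1;
  }

  // Block until the inference app signals that all of its streaming
  // inference requests have been started.
  ::sem_wait(pReady);
  std::cout << "Inference app ready; starting image transfer\n";

  // ... stream images to the FPGA input here (hypothetical) ...

  ::sem_close(pReady);
  return 0;
}
```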
```cpp
////////////

std::atomic<uint32_t> SingleInferenceData::_atomic{0};
uint32_t SingleInferenceData::_numResults = 0;

SingleInferenceData::SingleInferenceData(StreamingInferenceApp* pApp,
                                         InferenceEngine::ExecutableNetwork& importedNetwork,
                                         uint32_t index)
    : _pApp(pApp), _importedNetwork(importedNetwork), _index(index), _inferenceCount(0) {
  // Set up output blob
  InferenceEngine::ConstOutputsDataMap outputsInfo = importedNetwork.GetOutputsInfo();
  std::shared_ptr<const InferenceEngine::Data> spOutputInfo = outputsInfo.begin()->second;
  std::string outputName = outputsInfo.begin()->first;

  _spOutputBlob = CreateOutputBlob(spOutputInfo);

  // Create an inference request and set its completion callback
  _inferenceRequest = importedNetwork.CreateInferRequest();
  auto inferenceRequestCompleteCB = [=]() { ProcessResult(); };
  _inferenceRequest.SetCompletionCallback(inferenceRequestCompleteCB);

  // Assign the output blob to the inference request
  _inferenceRequest.SetBlob(outputName, _spOutputBlob);
}


std::shared_ptr<InferenceEngine::Blob> SingleInferenceData::CreateOutputBlob(
    std::shared_ptr<const InferenceEngine::Data> spOutputInfo) {
  const InferenceEngine::TensorDesc& outputTensorDesc = spOutputInfo->getTensorDesc();
  std::shared_ptr<InferenceEngine::Blob> pOutputBlob = InferenceEngine::make_shared_blob<float>(outputTensorDesc);
  pOutputBlob->allocate();

  // Zero-initialize the blob through a locked write mapping
  InferenceEngine::MemoryBlob::Ptr pMemoryBlob = InferenceEngine::as<InferenceEngine::MemoryBlob>(pOutputBlob);
  if (pMemoryBlob) {
    auto lockedMemory = pMemoryBlob->wmap();
    float* pOutputBlobData = lockedMemory.as<float*>();
    if (pOutputBlobData) {
      size_t outputSize = pOutputBlob->size();
      for (size_t i = 0; i < outputSize; i++) {
        pOutputBlobData[i] = 0.0f;
      }
    }
  }

  return pOutputBlob;
}


void SingleInferenceData::StartAsync() {
  _inferenceCount = _atomic++;
  _inferenceRequest.StartAsync();
}

void SingleInferenceData::Wait() { _inferenceRequest.Wait(); }

void SingleInferenceData::Cancel() { _inferenceRequest.Cancel(); }


/**
 * Stores the result of an inference for a single category
 *
 * The index identifies the image category, and the score is the network's
 * confidence that the image belongs to that category.
 */
class ResultItem {
 public:
  uint32_t _index;
  float _score;

  // Inverted comparison so that std::sort orders by descending score
  bool operator<(const ResultItem& other) const { return (_score > other._score); }
};
```
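One observation on `ResultItem`: its `operator<` compares with `>`, so a plain `std::sort` yields descending score order. Since `ProcessResult()` below only ever reads the top five entries, `std::partial_sort` would do the same job without fully sorting all ~1000 scores. A sketch under that assumption (the `SortTopFive` helper and the mirrored struct are illustrative, not part of the source):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct ResultItem {  // mirrors the class in the diff above
  uint32_t _index;
  float _score;
};

// Order only the first five elements by descending score; the remainder is
// left unspecified. Equivalent to the full std::sort for code that reads
// only results[0] through results[4].
void SortTopFive(std::vector<ResultItem>& results) {
  size_t topCount = std::min<size_t>(5, results.size());
  std::partial_sort(results.begin(), results.begin() + topCount, results.end(),
                    [](const ResultItem& a, const ResultItem& b) { return a._score > b._score; });
}
```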
```cpp
/**
 * Called when an inference request has completed
 *
 * The inference results are floating point numbers consisting of the score
 * for each category. The scores are sorted and the highest is written to the
 * console. The top 5 scores of each of the first 1000 images are saved to
 * results.txt.
 *
 * Set as a callback in SingleInferenceData()
 */
void SingleInferenceData::ProcessResult() {
  if (_pApp and _pApp->IsCancelling()) {
    return;
  }

  // Increment the number of inference results that have returned thus far
  _numResults++;

  // If this is the first returned inference, store the current time to calculate the inference rate
  if (_numResults == 1) {
    StreamingInferenceApp::_startTime = std::chrono::system_clock::now();
  } else if (_numResults == 101) {
    // The inference rate is calculated after 100 results have been received
    auto endTime = std::chrono::system_clock::now();
    auto duration = endTime - StreamingInferenceApp::_startTime;
    double durationMS = (double)std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    // Milliseconds for 100 inferences -> seconds per single inference
    double durationSecondsOne = durationMS / 100000.0;
    double rate = 1.0 / durationSecondsOne;
    std::cout << "Inference rate = " << rate << '\n';
  }

  // Create a float pointer to the returned data
  size_t outputSize = _spOutputBlob->size();
  float* pOutputData = _spOutputBlob->buffer().as<float*>();
  if (!pOutputData) {
    return;
  }

  // Store each score as a ResultItem
  std::vector<ResultItem> results;
  for (size_t i = 0; i < outputSize; i++) {
    results.push_back({(uint32_t)i, pOutputData[i]});
  }

  // Sort the scores and set up the output streams
  std::sort(results.begin(), results.end());
  std::stringstream fileString;
  std::stringstream outString;
  bool flushFile = false;

  // Store the top 5 results of the first 1000 images to be written to a file
  if (_numResults <= 1000) {
    fileString << "Result: image[" << _numResults << "]\n";
    fileString << std::fixed << std::setprecision(1);

    for (size_t i = 0; i < 5; i++) {
      std::string className = _pApp->_imageNetClasses[results[i]._index];
      float score = results[i]._score * 100.0f;
      fileString << (i + 1) << ". " << className << ", score = " << score << '\n';
    }

    fileString << '\n';
  }

  if (_numResults == 1001) {
    fileString << "End of results capture\n";
    flushFile = true;
  }

  // Store the top score to write to the console
  outString << std::fixed << std::setprecision(1);
  std::string className = _pApp->_imageNetClasses[results[0]._index];
  float score = results[0]._score * 100.0f;
  outString << _numResults << " - " << className << ", score = " << score << '\n';

  // Write the results to the file
  std::string writeFileString = fileString.str();
  if (not writeFileString.empty()) {
    StreamingInferenceApp::_resultsStream << writeFileString;
    if (flushFile) {
      StreamingInferenceApp::_resultsStream << std::endl;
    }
  }

  // Write the top score to the console
  std::cout << outString.str();

  // Start again
  StartAsync();
}
```
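Note that `_numResults` is a plain static incremented from the completion callback. That is safe only if the plugin serializes completion callbacks; if callbacks could fire concurrently across the streaming inference requests, the counter and the `== 1` / `== 101` checks would race. A defensive variant, assuming concurrent callbacks are possible (the `OnInferenceComplete` name is hypothetical):

```cpp
#include <atomic>
#include <cstdint>

// Shared across all inference-request completion callbacks.
static std::atomic<uint32_t> s_numResults{0};

void OnInferenceComplete() {
  // fetch_add returns the previous value, so "count" is unique per callback
  // even when callbacks run concurrently on different threads.
  uint32_t count = s_numResults.fetch_add(1) + 1;

  if (count == 1) {
    // record the start time
  } else if (count == 101) {
    // compute and print the inference rate
  }
}
```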
```cpp
/**
 * Load the categories and store them in _imageNetClasses
 */
void StreamingInferenceApp::LoadClassNames() {
  _imageNetClasses.resize(1001);

  bool validClassFile = false;
  std::filesystem::path classNameFilePath = "categories.txt";

  if (std::filesystem::exists(classNameFilePath)) {
    size_t classIndex = 0;
    std::ifstream classNameStream(classNameFilePath);

    if (classNameStream) {
      std::string className;
      while (std::getline(classNameStream, className)) {
        if (classIndex < 1001) _imageNetClasses[classIndex] = className;

        classIndex++;
      }

      // The file must supply all 1001 entries (background class plus 1000 categories)
      validClassFile = (classIndex == 1001);
      if (not validClassFile) {
        std::cout << "Ignoring the categories.txt file. The file is expected to be a text file "
                     "with 1001 lines.\n";
      }
    }
  } else {
    std::cout << "No categories.txt file found. This file should contain 1001\n"
                 "lines, with the name of each category on each line.\n";
  }

  if (not validClassFile) {
    _imageNetClasses[0] = "NONE";
    for (size_t i = 1; i <= 1000; i++) {
      _imageNetClasses[i] = "Image class #" + std::to_string(i);
    }
  }
}

static void SigIntHandler(int) {
  std::cout << "\nCtrl+C detected. Shutting down application\n";
  std::lock_guard<std::mutex> lock(StreamingInferenceApp::_signalMutex);
  StreamingInferenceApp::_signalConditionVariable.notify_one();
}

void StreamingInferenceApp::OsStartup() {
  // Ctrl+C will exit the application
  signal(SIGINT, SigIntHandler);
}
```
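A final caution on `SigIntHandler`: locking a mutex, notifying a condition variable, and writing to `std::cout` are not async-signal-safe, so doing them inside a signal handler is undefined behavior even when it appears to work. The conventional pattern is to set a `volatile std::sig_atomic_t` flag in the handler and let the main loop poll it. A minimal sketch of that alternative, reusing the 1-second cadence of the wait loop in `Run()`:

```cpp
#include <csignal>
#include <chrono>
#include <thread>
#include <iostream>

// The only thing an async-signal-safe handler should do: set a flag.
static volatile std::sig_atomic_t s_stopRequested = 0;

static void SigIntHandler(int) { s_stopRequested = 1; }

int main() {
  std::signal(SIGINT, SigIntHandler);

  // Poll the flag on the same 1 s cadence the app already uses for its
  // condition-variable wait; no locks or streams are touched in the handler.
  while (!s_stopRequested) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  std::cout << "\nCtrl+C detected. Shutting down application\n";
  return 0;
}
```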
