// Copyright 2023 Intel Corporation.
//
// This software and the related documents are Intel copyrighted materials,
// and your use of them is governed by the express license under which they
// were provided to you ("License"). Unless the License provides otherwise,
// you may not use, modify, copy, publish, distribute, disclose or transmit
// this software or the related documents without Intel's prior written
// permission.
//
// This software and the related documents are provided as is, with no express
// or implied warranties, other than those that are expressly stated in the
// License.

#include "streaming_inference_app.h"

// System headers inferred from the usage in this file.
#include <fcntl.h>
#include <semaphore.h>

#include <algorithm>
#include <chrono>
#include <csignal>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <vector>

#include "dla_plugin_config.hpp"

using namespace std::chrono_literals;

std::ofstream StreamingInferenceApp::_resultsStream("results.txt");
std::mutex StreamingInferenceApp::_signalMutex;
std::condition_variable StreamingInferenceApp::_signalConditionVariable;
std::chrono::time_point<std::chrono::system_clock> StreamingInferenceApp::_startTime;

int main(int numParams, char* paramValues[]) {
  StreamingInferenceApp app(numParams, paramValues);
  try {
    app.Run();
  } catch (const std::exception& ex) {
    std::cerr << ex.what() << '\n';
  }
  return 0;
}

StreamingInferenceApp::StreamingInferenceApp(int numParams, char* paramValues[])
    : _commandLine(numParams, paramValues) {
  OsStartup();
  LoadClassNames();
}

StreamingInferenceApp::~StreamingInferenceApp() {
  timespec waitTimeout = {};
  if (_pCancelSemaphore) {
    // Reset (drain) the cancel semaphore before closing it
    int r = 0;
    do {
      r = ::sem_timedwait(_pCancelSemaphore, &waitTimeout);
    } while (r == 0);
    ::sem_close(_pCancelSemaphore);
  }
  if (_pReadyForImageStreamSemaphore) {
    // Reset (drain) the ready semaphore before closing it
    int r = 0;
    do {
      r = ::sem_timedwait(_pReadyForImageStreamSemaphore, &waitTimeout);
    } while (r == 0);
    ::sem_close(_pReadyForImageStreamSemaphore);
  }
}
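/**
 * Top-level application flow
 *
 * Loads the architecture file and compiled model, configures the FPGA plugin
 * for external streaming, creates the streaming inference requests, signals
 * the image streaming application that it may start sending frames, and then
 * runs until Ctrl+C is pressed.
 */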
void StreamingInferenceApp::Run() {
  std::filesystem::path pluginsFilename = "plugins.xml";
  std::string deviceName;
  std::string arch;
  std::string model;

  // Get the command line options for the model, arch file, and device
  if (not _commandLine.GetOption("model", model) or not _commandLine.GetOption("arch", arch) or
      not _commandLine.GetOption("device", deviceName)) {
    return Usage();
  }

  std::filesystem::path architectureFilename = arch;
  std::filesystem::path compiledModelFilename = model;

  // Check that the provided files do in fact exist
  if (not CheckFileExists(architectureFilename, "architecture") or not CheckFileExists(pluginsFilename, "plugins") or
      not CheckFileExists(compiledModelFilename, "compiled model")) {
    return;
  }

  InferenceEngine::Core inferenceEngine(pluginsFilename);

  // Set up the CoreDLA private configuration parameters
  const std::map<std::string, std::string> configParameters;
  inferenceEngine.SetConfig({{DLIAPlugin::properties::arch_path.name(), architectureFilename}}, "FPGA");

  // If dropSourceBuffers is 0, no input buffers are dropped
  // If dropSourceBuffers is 1, then 1 buffer is processed, 1 gets dropped
  // If dropSourceBuffers is 2, then 1 buffer is processed, 2 get dropped, etc.
  uint32_t dropSourceBuffers = 0;
  inferenceEngine.SetConfig(
      {{DLIAPlugin::properties::streaming_drop_source_buffers.name(), std::to_string(dropSourceBuffers)},
       {DLIAPlugin::properties::external_streaming.name(), CONFIG_VALUE(YES)}},
      "FPGA");

  std::ifstream inputFile(compiledModelFilename, std::fstream::binary);
  if (not inputFile) {
    std::cout << "Failed to load compiled model file.\n";
    return;
  }

  // Load the model to the device
  InferenceEngine::ExecutableNetwork importedNetwork = inferenceEngine.ImportNetwork(inputFile, deviceName, {});

  // The plugin defines the number of inference requests required for streaming
  uint32_t numStreamingInferenceRequests =
      importedNetwork.GetMetric(DLIAPlugin::properties::num_streaming_inference_requests.name()).as<uint32_t>();

  const std::string cancelSemaphoreName =
      importedNetwork.GetMetric(DLIAPlugin::properties::cancel_semaphore_name.name()).as<std::string>();
  _cancelSemaphoreName = cancelSemaphoreName;

  for (uint32_t i = 0; i < numStreamingInferenceRequests; i++) {
    auto spInferenceData = std::make_shared<SingleInferenceData>(this, importedNetwork, i);
    _inferences.push_back(spInferenceData);
  }

  // Start the inference requests. Streaming inferences will reschedule
  // themselves when complete
  for (auto& inference : _inferences) {
    inference->StartAsync();
  }

  std::cout << "Ready to start image input stream.\n";

  // Signal the image streaming app that we are ready, so it can
  // begin transferring files
  SetReadyForImageStreamSemaphore();

  // Wait until Ctrl+C
  bool done = false;
  while (not done) {
    std::unique_lock<std::mutex> lock(_signalMutex);
    done = (_signalConditionVariable.wait_for(lock, 1000ms) != std::cv_status::timeout);
  }

  SetShutdownSemaphore();

  for (auto& inference : _inferences) {
    inference->Cancel();
  }
  _inferences.clear();
}

void StreamingInferenceApp::SetShutdownSemaphore() {
  _pCancelSemaphore = ::sem_open(_cancelSemaphoreName.c_str(), O_CREAT, 0644, 0);
  if (_pCancelSemaphore) {
    ::sem_post(_pCancelSemaphore);
  }
}

void StreamingInferenceApp::SetReadyForImageStreamSemaphore() {
  _pReadyForImageStreamSemaphore = ::sem_open("/CoreDLA_ready_for_streaming", O_CREAT, 0644, 0);
  if (_pReadyForImageStreamSemaphore) {
    ::sem_post(_pReadyForImageStreamSemaphore);
  }
}
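// For reference, the companion image source would typically block on the ready
// semaphore before it starts streaming frames. A minimal sketch of that side of
// the handshake (illustrative only, not part of this application; error
// handling omitted):
//
//   sem_t* pReady = ::sem_open("/CoreDLA_ready_for_streaming", O_CREAT, 0644, 0);
//   if (pReady) {
//     ::sem_wait(pReady);  // returns once this application posts the semaphore
//     // ... begin sending images ...
//   }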
/**
 * Print a help message to the console
 */
void StreamingInferenceApp::Usage() {
  std::cout << "Usage:\n";
  std::cout << "\tstreaming_inference_app -model=<model file> -arch=<arch file> -device=<device>\n\n";
  std::cout << "Where:\n";
  std::cout << "\t<model file> is the compiled model binary file, eg /home/root/resnet-50-tf/RN50_Performance_no_folding.bin\n";
  std::cout << "\t<arch file> is the architecture file, eg /home/root/resnet-50-tf/A10_Performance.arch\n";
  std::cout << "\t<device> is the OpenVINO device ID, eg HETERO:FPGA or HETERO:FPGA,CPU\n";
}

/**
 * Check that a file exists
 *
 * @param[in] filename Filename to check
 * @param[in] message Description of the file, displayed if it does not exist
 * @returns true if the file exists, false otherwise
 */
bool StreamingInferenceApp::CheckFileExists(const std::filesystem::path& filename, const std::string& message) {
  if (not std::filesystem::exists(filename)) {
    std::cout << "Can't find " << message << ", '" << filename.c_str() << "'\n";
    return false;
  }
  return true;
}

////////////

std::atomic<uint32_t> SingleInferenceData::_atomic{0};
uint32_t SingleInferenceData::_numResults = 0;

SingleInferenceData::SingleInferenceData(StreamingInferenceApp* pApp,
                                         InferenceEngine::ExecutableNetwork& importedNetwork,
                                         uint32_t index)
    : _pApp(pApp), _importedNetwork(importedNetwork), _index(index), _inferenceCount(0) {
  // Set up the output blob
  InferenceEngine::ConstOutputsDataMap outputsInfo = importedNetwork.GetOutputsInfo();
  std::shared_ptr<const InferenceEngine::Data> spOutputInfo = outputsInfo.begin()->second;
  std::string outputName = outputsInfo.begin()->first;
  _spOutputBlob = CreateOutputBlob(spOutputInfo);

  // Create an inference request and set its completion callback
  _inferenceRequest = importedNetwork.CreateInferRequest();
  auto inferenceRequestCompleteCB = [=]() { ProcessResult(); };
  _inferenceRequest.SetCompletionCallback(inferenceRequestCompleteCB);

  // Assign the output blob to the inference request
  _inferenceRequest.SetBlob(outputName, _spOutputBlob);
}
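/**
 * Allocate a zero-filled output blob matching the network's output tensor
 *
 * The blob is attached to the inference request in the constructor, so each
 * inference result is written directly into this buffer.
 */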
std::shared_ptr<InferenceEngine::Blob> SingleInferenceData::CreateOutputBlob(
    std::shared_ptr<const InferenceEngine::Data> spOutputInfo) {
  const InferenceEngine::TensorDesc& outputTensorDesc = spOutputInfo->getTensorDesc();
  std::shared_ptr<InferenceEngine::Blob> pOutputBlob = InferenceEngine::make_shared_blob<float>(outputTensorDesc);
  pOutputBlob->allocate();

  InferenceEngine::MemoryBlob::Ptr pMemoryBlob = InferenceEngine::as<InferenceEngine::MemoryBlob>(pOutputBlob);
  if (pMemoryBlob) {
    auto lockedMemory = pMemoryBlob->wmap();
    float* pOutputBlobData = lockedMemory.as<float*>();
    if (pOutputBlobData) {
      size_t outputSize = pOutputBlob->size();
      for (size_t i = 0; i < outputSize; i++) {
        pOutputBlobData[i] = 0.0f;
      }
    }
  }
  return pOutputBlob;
}

void SingleInferenceData::StartAsync() {
  _inferenceCount = _atomic++;
  _inferenceRequest.StartAsync();
}

void SingleInferenceData::Wait() { _inferenceRequest.Wait(); }

void SingleInferenceData::Cancel() { _inferenceRequest.Cancel(); }

/**
 * Stores the result of an inference for one category
 *
 * The index corresponds to the category of the image, and the score is the
 * confidence for that category.
 */
class ResultItem {
 public:
  uint32_t _index;
  float _score;

  // Sorts in descending score order
  bool operator<(const ResultItem& other) const { return (_score > other._score); }
};

/**
 * Called when an inference request has completed
 *
 * The inference results are floating point numbers consisting of the score for
 * each category. The scores are sorted and the highest is written to the
 * console. The top 5 scores of each of the first 1000 images are saved to
 * results.txt.
 *
 * Set as a callback in SingleInferenceData()
 */
void SingleInferenceData::ProcessResult() {
  if (_pApp and _pApp->IsCancelling()) {
    return;
  }

  // Increment the number of inference results that have been returned so far
  _numResults++;

  // If this is the first returned inference, store the current time to calculate the inference rate
  if (_numResults == 1) {
    StreamingInferenceApp::_startTime = std::chrono::system_clock::now();
  } else if (_numResults == 101) {
    // The inference rate is calculated after 100 results have been received:
    // durationMS / 100 is the average milliseconds per inference, dividing by
    // 1000 again converts to seconds, so rate is in inferences per second
    auto endTime = std::chrono::system_clock::now();
    auto duration = endTime - StreamingInferenceApp::_startTime;
    double durationMS = (double)std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    double durationSecondsOne = durationMS / 100000.0;
    double rate = 1.0 / durationSecondsOne;
    std::cout << "Inference rate = " << rate << '\n';
  }

  // Create a float pointer to the returned data
  size_t outputSize = _spOutputBlob->size();
  float* pOutputData = _spOutputBlob->buffer().as<float*>();
  if (!pOutputData) {
    return;
  }

  // Store each score as a ResultItem
  std::vector<ResultItem> results;
  for (size_t i = 0; i < outputSize; i++) {
    results.push_back({(uint32_t)i, pOutputData[i]});
  }

  // Sort the scores and set up the output streams
  std::sort(results.begin(), results.end());
  std::stringstream fileString;
  std::stringstream outString;
  bool flushFile = false;

  // Store the top 5 results of the first 1000 images to be written to a file
  if (_numResults <= 1000) {
    fileString << "Result: image[" << _numResults << "]\n";
    fileString << std::fixed << std::setprecision(1);
    for (size_t i = 0; i < 5; i++) {
      std::string className = _pApp->_imageNetClasses[results[i]._index];
      float score = results[i]._score * 100.0f;
      fileString << (i + 1) << ". " << className << ", score = " << score << '\n';
    }
    fileString << '\n';
  }
  if (_numResults == 1001) {
    fileString << "End of results capture\n";
    flushFile = true;
  }

  // Store the top score to write to the console
  outString << std::fixed << std::setprecision(1);
  std::string className = _pApp->_imageNetClasses[results[0]._index];
  float score = results[0]._score * 100.0f;
  outString << _numResults << " - " << className << ", score = " << score << '\n';

  // Write the results to the file
  std::string writeFileString = fileString.str();
  if (not writeFileString.empty()) {
    StreamingInferenceApp::_resultsStream << writeFileString;
    if (flushFile) {
      StreamingInferenceApp::_resultsStream << std::endl;
    }
  }

  // Write the top score to the console
  std::cout << outString.str();

  // Start again
  StartAsync();
}
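// For reference, a categories.txt that satisfies the check in LoadClassNames()
// below contains 1001 lines, one class name per line, for example:
//
//   <background / none class>
//   <name of class 1>
//   <name of class 2>
//   ...
//
// Treating the first entry as a background/none class is an assumption,
// mirroring the fallback labels used when no valid file is found.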
/**
 * Load the category names and store them in _imageNetClasses
 */
void StreamingInferenceApp::LoadClassNames() {
  _imageNetClasses.resize(1001);
  bool validClassFile = false;
  std::filesystem::path classNameFilePath = "categories.txt";
  if (std::filesystem::exists(classNameFilePath)) {
    size_t classIndex = 0;
    std::ifstream classNameStream(classNameFilePath);
    if (classNameStream) {
      std::string className;
      while (std::getline(classNameStream, className)) {
        if (classIndex < 1001) _imageNetClasses[classIndex] = className;
        classIndex++;
      }
      validClassFile = (classIndex == 1001);
      if (not validClassFile) {
        std::cout << "Ignoring the categories.txt file. The file is expected to be a text file "
                     "with 1001 lines.\n";
      }
    }
  } else {
    std::cout << "No categories.txt file found. This file should contain 1001\n"
                 "lines, with the name of each category on each line.\n";
  }

  if (not validClassFile) {
    _imageNetClasses[0] = "NONE";
    for (size_t i = 1; i <= 1000; i++) {
      _imageNetClasses[i] = "Image class #" + std::to_string(i);
    }
  }
}

static void SigIntHandler(int) {
  std::cout << "\nCtrl+C detected. Shutting down application\n";
  std::lock_guard<std::mutex> lock(StreamingInferenceApp::_signalMutex);
  StreamingInferenceApp::_signalConditionVariable.notify_one();
}

void StreamingInferenceApp::OsStartup() {
  // Ctrl+C will exit the application
  signal(SIGINT, SigIntHandler);
}