Diffstat (limited to 'python/openvino/runtime/coredla_device/src')
-rw-r--r--  python/openvino/runtime/coredla_device/src/coredla_batch_job.cpp        125
-rw-r--r--  python/openvino/runtime/coredla_device/src/coredla_device.cpp           574
-rw-r--r--  python/openvino/runtime/coredla_device/src/coredla_graph_job.cpp        279
-rw-r--r--  python/openvino/runtime/coredla_device/src/device_memory_allocator.cpp   80
-rw-r--r--  python/openvino/runtime/coredla_device/src/mmd_wrapper.cpp              172
-rw-r--r--  python/openvino/runtime/coredla_device/src/stream_controller_comms.cpp  274
6 files changed, 1504 insertions, 0 deletions
diff --git a/python/openvino/runtime/coredla_device/src/coredla_batch_job.cpp b/python/openvino/runtime/coredla_device/src/coredla_batch_job.cpp
new file mode 100644
index 0000000..9ac7598
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/coredla_batch_job.cpp
@@ -0,0 +1,125 @@
+// Copyright 2020-2023 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "coredla_batch_job.h" //CoreDlaBatchJob
+#include "dla_dma_constants.h" //DLA_DMA_CSR_OFFSET_***
+#include "stream_controller_comms.h"
+
+static constexpr int CONFIG_READER_DATA_BYTES = 8;
+
+std::unique_ptr<BatchJob> CoreDlaBatchJob::MakeUnique(MmdWrapper* mmdWrapper,
+ uint64_t totalConfigWords,
+ uint64_t configBaseAddrDDR,
+ uint64_t inputAddrDDR,
+ uint64_t outputAddrDDR,
+ uint64_t inputSizeDDR,
+ uint64_t outputSizeDDR,
+ const bool enableIstream,
+ const bool enableOstream,
+ int instance,
+ std::shared_ptr<StreamControllerComms> spStreamControllerComms) {
+ return std::unique_ptr<BatchJob>(new CoreDlaBatchJob(mmdWrapper,
+ totalConfigWords,
+ configBaseAddrDDR,
+ inputAddrDDR,
+ outputAddrDDR,
+ inputSizeDDR,
+ outputSizeDDR,
+ enableIstream,
+ enableOstream,
+ instance,
+ spStreamControllerComms));
+}
+CoreDlaBatchJob::CoreDlaBatchJob(MmdWrapper* mmdWrapper,
+ uint64_t totalConfigWords,
+ uint64_t configBaseAddrDDR,
+ uint64_t inputAddrDDR,
+ uint64_t outputAddrDDR,
+ uint64_t inputSizeDDR,
+ uint64_t outputSizeDDR,
+ const bool enableIstream,
+ const bool enableOstream,
+ int instance,
+ std::shared_ptr<StreamControllerComms> spStreamControllerComms)
+: mmdWrapper_(mmdWrapper)
+, instance_(instance)
+, totalConfigWords_(totalConfigWords)
+, configBaseAddrDDR_(configBaseAddrDDR)
+, inputAddrDDR_(inputAddrDDR)
+, outputAddrDDR_(outputAddrDDR)
+, inputSizeDDR_(inputSizeDDR)
+, outputSizeDDR_(outputSizeDDR)
+, enableIstream_(enableIstream)
+, enableOstream_(enableOstream)
+, lastJobQueueNumber_(0)
+, spStreamControllerComms_(spStreamControllerComms) {
+}
+
+// This function must be called by a single thread
+// It can be called on a different thread than StartDla or WaitForDla
+void CoreDlaBatchJob::LoadInputFeatureToDDR(void* inputArray) {
+ mmdWrapper_->WriteToDDR(instance_, inputAddrDDR_, inputSizeDDR_, inputArray);
+ StartDla();
+}
+
+void CoreDlaBatchJob::ScheduleInputFeature() const {
+ if (spStreamControllerComms_) {
+ // Send message to NIOS-V
+ uint64_t configurationSize64 = (totalConfigWords_ / CONFIG_READER_DATA_BYTES) - 2;
+ uint32_t configurationBaseAddressDDR = static_cast<uint32_t>(configBaseAddrDDR_);
+ uint32_t configurationSize = static_cast<uint32_t>(configurationSize64);
+ uint32_t inputAddressDDR = static_cast<uint32_t>(inputAddrDDR_);
+ uint32_t outputAddressDDR = static_cast<uint32_t>(outputAddrDDR_);
+
+ Payload<CoreDlaJobPayload> item;
+ item._configurationBaseAddressDDR = configurationBaseAddressDDR;
+ item._configurationSize = configurationSize;
+ item._inputAddressDDR = inputAddressDDR;
+ item._outputAddressDDR = outputAddressDDR;
+
+ spStreamControllerComms_->ScheduleItems( { item } );
+ }
+}
+
+// This function must be called by a single thread
+// It can be called on a different thread than WaitForDla or LoadInputFeatureToDDR
+void CoreDlaBatchJob::StartDla() {
+ //////////////////////////////////////
+ // Write to CSR to start the FPGA //
+ //////////////////////////////////////
+
+ // interrupt mask was already enabled in the DlaDevice constructor
+
+ // intermediate buffer address was already set when the graph was loaded
+
+ // base address for config reader
+ mmdWrapper_->WriteToCsr(instance_, DLA_DMA_CSR_OFFSET_CONFIG_BASE_ADDR, configBaseAddrDDR_);
+
+  // how many words the config reader should read
+  // hardware wants the number of words minus 2: the implementation is a down counter that
+  // ends at -1, and the sign bit is used to denote the end of the counter range
+ mmdWrapper_->WriteToCsr(instance_, DLA_DMA_CSR_OFFSET_CONFIG_RANGE_MINUS_TWO, (totalConfigWords_ / CONFIG_READER_DATA_BYTES) - 2);
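+  // Worked example (assuming totalConfigWords_ is a byte count, as passed in from
+  // CoreDlaGraphJob): a 4096-byte config is 4096 / 8 = 512 config-reader words, so the
+  // CSR is programmed with 510; the down counter then steps 510, 509, ..., 1, 0, -1,
+  // and the sign bit of -1 marks the final word.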
+
+ if (enableIstream_ && enableOstream_) {
+ // Arm the streaming interface. Will continuously load configs.
+ const unsigned int enable = 1;
+ mmdWrapper_->WriteToCsr(instance_, DLA_CSR_OFFSET_READY_STREAMING_IFACE, enable);
+ } else {
+ // base address for feature reader -- this will trigger one run of DLA
+ mmdWrapper_->WriteToCsr(instance_, DLA_DMA_CSR_OFFSET_INPUT_OUTPUT_BASE_ADDR, inputAddrDDR_);
+ }
+}
+
+void CoreDlaBatchJob::ReadOutputFeatureFromDDR(void* outputArray) const {
+ mmdWrapper_->ReadFromDDR(instance_, outputAddrDDR_, outputSizeDDR_, outputArray);
+}
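
A minimal usage sketch of the flow above, assuming a BatchJob* obtained from
CoreDlaGraphJob::GetBatchJob() and a Device exposing WaitForDla() as declared in
coredla_device.cpp; the buffer pointers and instance/thread IDs are hypothetical:

    BatchJob* job = graphJob->GetBatchJob();
    job->LoadInputFeatureToDDR(inputBuffer);                  // WriteToDDR + StartDla()
    device->WaitForDla(/*instance=*/0, /*threadId=*/0, {});   // block until the job completes
    job->ReadOutputFeatureFromDDR(outputBuffer);              // copy the result back to the host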
diff --git a/python/openvino/runtime/coredla_device/src/coredla_device.cpp b/python/openvino/runtime/coredla_device/src/coredla_device.cpp
new file mode 100644
index 0000000..b28d8a2
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/coredla_device.cpp
@@ -0,0 +1,574 @@
+// Copyright 2020-2023 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "coredla_device.h" //CoreDlaDevice
+#include "coredla_batch_job.h" //CoreDlaBatchJob
+#include "coredla_graph_job.h" //CoreDlaBatchJob
+#include "dla_dma_constants.h" //DLA_DMA_CSR_OFFSET_***
+#include "stream_controller_comms.h"
+
+#include <algorithm> //std::count
+#include <cassert> //assert
+#include <chrono> //std::chrono::seconds
+#include <cstddef> //size_t
+#include <cstdlib> //std::getenv
+#include <mutex>     //std::mutex
+#include <stdexcept> //std::runtime_error
+#include <string>    //std::string
+#include <iostream>  //std::cerr
+#include <stdint.h>  //fixed-width integer types
+#include <thread>
+#include <cinttypes> //printf formatters, e.g. PRIu64
+
+std::unique_ptr<Device> Device::MakeUnique(const arch_params* archParams,
+ uint32_t waitForDlaTimeoutSeconds) {
+ return std::unique_ptr<Device>(new CoreDlaDevice(waitForDlaTimeoutSeconds));
+}
+
+void InterruptServiceRoutine(int handle, void* data) {
+ InterruptServiceRoutineData* isrData = static_cast<InterruptServiceRoutineData*>(data);
+  // clear interrupt status -- writing 1 to a status bit clears it;
+  // 3 clears both the done and error interrupt bits
+  constexpr int writeDataToClearInterruptStatus = 3;
+ const int numInstances = static_cast<int>(isrData->jobsFinished.size());
+ for (int i = 0; i < numInstances; i++) {
+ isrData->mmdWrapper->WriteToCsr(i, DLA_DMA_CSR_OFFSET_INTERRUPT_CONTROL, writeDataToClearInterruptStatus);
+ }
+ for (int i = 0; i < numInstances; i++) {
+ isrData->desc_queue_diag[i] = isrData->mmdWrapper->ReadFromCsr(i, DLA_DMA_CSR_OFFSET_DESC_DIAGNOSTICS);
+ // ask the csr how many jobs have finished
+ uint32_t completionCount = isrData->mmdWrapper->ReadFromCsr(i, DLA_DMA_CSR_OFFSET_COMPLETION_COUNT);
+    // check whether completionCount has wrapped around (overflow detection) and record it
+    if (isrData->prevCount[i] > completionCount)
+      isrData->base_multiplier[i]++;
+    isrData->prevCount[i] = completionCount;
+    // extend the 32-bit counter to 64 bits: each wraparound contributes UINT32_MAX + 1 = 2^32,
+    // hence the extra base_multiplier term added below
+    std::unique_lock<std::mutex> isrMutexLock(isrData->isrMutex[i]);
+    isrData->jobsFinished[i] = (uint64_t) isrData->base_multiplier[i] * UINT32_MAX + completionCount + isrData->base_multiplier[i];
+ isrData->isrCondVar[i].notify_all();
+ }
+}
+
+CoreDlaDevice::CoreDlaDevice(uint32_t waitForDlaTimeoutSeconds)
+: waitForDlaTimeoutSeconds_(waitForDlaTimeoutSeconds) {
+#ifdef COREDLA_RUNTIME_POLLING
+ runtimePolling_ = true;
+#else
+ runtimePolling_ = false;
+#endif
+  // mmdWrapper_ ctor runs first, which opens a handle to the MMD. Now determine the number of
+  // hardware instances by writing a nonzero value to a CSR offset and reading it back: while
+  // enabling the interrupt mask for each candidate instance, check that the write sticks.
+ numInstances_ = 0;
+ for (int i = 0; i < mmdWrapper_.GetMaxInstances(); i++) {
+ constexpr uint32_t allInterruptsMask = (1<<DLA_DMA_CSR_INTERRUPT_ERROR_BIT) | (1<<DLA_DMA_CSR_INTERRUPT_DONE_BIT);
+ // clear any pending interrupts (there may be pending interrupts from last run), then enable mask for instance count
+ mmdWrapper_.WriteToCsr(i, DLA_DMA_CSR_OFFSET_INTERRUPT_CONTROL, allInterruptsMask);
+ mmdWrapper_.WriteToCsr(i, DLA_DMA_CSR_OFFSET_INTERRUPT_MASK, allInterruptsMask);
+ uint32_t readData = mmdWrapper_.ReadFromCsr(i, DLA_DMA_CSR_OFFSET_INTERRUPT_MASK);
+ if (allInterruptsMask == readData) numInstances_ = i + 1;
+ }
+ LOG_AND_PRINT(Logger::INFO, "numInstances_: %d\n", numInstances_);
+ assert(numInstances_ >= 1);
+ jobsWaited_.resize(numInstances_, 0);
+
+ uint32_t license = mmdWrapper_.ReadFromCsr(0, DLA_DMA_CSR_OFFSET_LICENSE_FLAG);
+ if (license == 0) {
+ DLA_LOG("Using unlicensed IP\n");
+ }
+ else if (license == 1) {
+ DLA_LOG("Using licensed IP\n");
+ }
+ else {
+ throw std::runtime_error("Unrecongnized license flag");
+ }
+#ifndef USE_OLD_COREDLA_DEVICE
+ startClocksActive.resize(numInstances_, 0);
+ startClockAllJobs.resize(numInstances_, 0);
+#endif
+ startNumInputFeatureMemoryReads.resize(numInstances_, 0);
+ startNumFilterMemoryReads.resize(numInstances_, 0);
+ startNumOutputFeatureMemoryWrites.resize(numInstances_, 0);
+
+ // Package up the data that interrupt service routine needs
+ isrData_.mmdWrapper = &mmdWrapper_;
+ isrData_.jobsFinished = std::vector<uint64_t>(numInstances_, 0);
+ isrData_.base_multiplier = std::vector<uint32_t>(numInstances_, 0);
+ isrData_.prevCount = std::vector<uint32_t>(numInstances_, 0);
+ isrData_.desc_queue_diag = std::vector<uint32_t>(numInstances_, 0);
+ isrData_.isrMutex = std::vector<std::mutex>(numInstances_);
+ isrData_.isrCondVar = std::vector<std::condition_variable>(numInstances_);
+
+ if (runtimePolling_) {
+ // disable the interrupt mask -- it was originally enabled to determine how many instances were present
+ for (int i = 0; i < mmdWrapper_.GetMaxInstances(); i++) {
+ constexpr uint32_t disableInterruptMaskValue = 0;
+ mmdWrapper_.WriteToCsr(i, DLA_DMA_CSR_OFFSET_INTERRUPT_MASK, disableInterruptMaskValue);
+ }
+ }
+ else {
+ // register an interrupt handler
+ mmdWrapper_.RegisterISR(&InterruptServiceRoutine, &isrData_);
+ }
+
+ // Record the current counters
+ for(int i=0; i < numInstances_; i++) {
+#ifndef USE_OLD_COREDLA_DEVICE
+ jobsWaited_[i] = mmdWrapper_.ReadFromCsr(i, DLA_DMA_CSR_OFFSET_COMPLETION_COUNT);
+ isrData_.jobsFinished[i] = jobsWaited_[i];
+
+ startClocksActive[i] = GetClocksActive(i);
+ startClockAllJobs[i] = GetClocksAllJobs(i);
+#endif
+ startNumInputFeatureMemoryReads.at(i) = GetNumInputFeatureMemoryReadsTotal(i);
+ startNumFilterMemoryReads.at(i) = GetNumFilterMemoryReadsTotal(i);
+ startNumOutputFeatureMemoryWrites.at(i) = GetNumOutputFeatureMemoryWritesTotal(i);
+ }
+
+ // Allocator needs access to mmd to write to CSR the start address of the shared intermediate buffer allocated in DDR
+ ddrAllocator_ = std::unique_ptr<DeviceMemoryAllocator[]>(new DeviceMemoryAllocator[numInstances_]);
+ for (int i = 0; i < numInstances_; i++) {
+ ddrAllocator_[i].Initialize(mmdWrapper_.GetDDRSizePerInstance(), &mmdWrapper_);
+ }
+
+// Choose which data pattern you want; all zeros or all ones can also be useful for IP debug purposes
+#define DEBUG_RUNTIME_MEMORY_TEST_PATTERN(ADDR, INDEX) ((ADDR * 12345) + (INDEX * 6789))
+ //#define DEBUG_RUNTIME_MEMORY_TEST_PATTERN(ADDR,INDEX) (0)
+ //#define DEBUG_RUNTIME_MEMORY_TEST_PATTERN(ADDR,INDEX) (0xffffffffffffffffULL)
+ bool run_memory_test = getenv("COREDLA_RUNTIME_MEMORY_TEST") != nullptr;
+ if (run_memory_test) {
+ // Ensure host can access all of the device memory that is accessible by all CoreDLA instances
+ // This is not necessarily the total device memory e.g. only 1 CoreDLA instance but 2 DDR banks
+ DLA_LOG("starting memory test with %d instances\n", numInstances_);
+    constexpr uint64_t CHUNK_SIZE = 1ULL << 20;  // memory is written and checked in 1 MB chunks
+ const uint64_t ADDR_LIMIT = mmdWrapper_.GetDDRSizePerInstance();
+ int mismatch = 0;
+ uint64_t expected;
+ uint64_t* data = new uint64_t[CHUNK_SIZE / sizeof(uint64_t)];
+
+ for (int inst = 0; inst < numInstances_; ++inst) {
+ // write to entire fpga ddr
+ for (uint64_t addr = 0; addr < ADDR_LIMIT; addr += CHUNK_SIZE) {
+ for (uint64_t index = 0; index < CHUNK_SIZE / sizeof(uint64_t); index++)
+ data[index] = DEBUG_RUNTIME_MEMORY_TEST_PATTERN(addr, index);
+ mmdWrapper_.WriteToDDR(inst, addr, CHUNK_SIZE, static_cast<const void*>(data));
+ }
+ // read back entire fpga ddr and compare to expected
+ for (uint64_t addr = 0; addr < ADDR_LIMIT; addr += CHUNK_SIZE) {
+ mmdWrapper_.ReadFromDDR(inst, addr, CHUNK_SIZE, data);
+ for (uint64_t index = 0; index < CHUNK_SIZE / sizeof(uint64_t); index++) {
+ expected = DEBUG_RUNTIME_MEMORY_TEST_PATTERN(addr, index);
+ if (data[index] != expected) {
+ if (mismatch < 10) {
+#if (!defined(USE_OLD_COREDLA_DEVICE) || defined(_WIN32))
+ DLA_LOG("memory test mismatch, addr %" PRIu64 ", index %" PRIu64 ", got %" PRIu64 ", expected %" PRIu64
+ "\n",
+ addr,
+ index,
+ data[index],
+ expected);
+#else
+ DLA_LOG("memory test mismatch, addr %lu, index %lu, got %lu, expected %lu\n",
+ addr,
+ index,
+ data[index],
+ expected);
+#endif
+ }
+ mismatch++;
+ }
+ }
+ }
+ }
+ delete[] data;
+ DLA_LOG("finished memory test ");
+ if (mismatch == 0) {
+ DLA_LOG("SUCCESS\n");
+ } else {
+ DLA_LOG("FAILURE (%d mismatches)\n", mismatch);
+ }
+ }
+}
+
+CoreDlaDevice::~CoreDlaDevice() {
+ // Avoid the scenario where some CoreDLA job has been started but something goes wrong
+ // in the runtime which causes it to exit, e.g. assertion failure or uncaught exception.
+ // CoreDLA will still raise an interrupt when the job finishes, yet the runtime will no
+  // longer be able to deal with it. Better to shut off interrupts.
+ for (int instance = 0; instance < numInstances_; instance++) {
+    // MmdWrapper::WriteToCsr might throw an exception, and a destructor must not let
+    // exceptions escape, so handle them internally
+ try {
+ mmdWrapper_.WriteToCsr(instance, DLA_DMA_CSR_OFFSET_INTERRUPT_MASK, 0);
+ } catch (const std::exception& e) {
+ std::cerr << "Failed to shut off the DMA CSR interrupt mask due to " << e.what() << std::endl;
+ }
+ }
+}
+
+GraphJob* CoreDlaDevice::CreateGraphJob(const dla::CompiledResult* compiledResult,
+#ifndef USE_OLD_COREDLA_DEVICE
+ size_t numPipelines,
+#else
+ uint64_t numPipelines,
+#endif
+ int instance,
+ std::string AES_key,
+ std::string IV_key,
+ bool encryption_enabled,
+ const std::string export_dir,
+ const std::string parameter_rom_export_dir) {
+ assert(instance < numInstances_);
+  (void) export_dir; // unused in the HW runtime: CoreDLA uses base pointers, and only the SW emulator uses this variable
+  allGraphJobs_.push_back(std::move(
+ CoreDlaGraphJob::MakeUnique(&ddrAllocator_[instance], &mmdWrapper_, compiledResult, numPipelines, instance, spStreamControllerComms_)));
+ return (allGraphJobs_.back()).get();
+}
+
+// This function must be called by a single thread
+void CoreDlaDevice::WaitForDla(int instance, size_t threadId, std::function<bool()> isCancelledPredicate) {
+  // The ISR updates jobsFinished; if not enough jobs have finished, sleep until the ISR runs
+  // again. Several hardware jobs can finish around the same time: by the time software handles
+  // the first interrupt, hardware may already report that 2 jobs have finished. In that case,
+  // the next wait finds that software has already tracked the second job as finished and
+  // therefore does not need to sleep waiting for the ISR.
+ std::unique_lock<std::mutex> isrMutexLock(isrData_.isrMutex[instance]);
+ uint32_t completionCount = 0;
+ bool timedOut = false;
+ auto timeoutDuration = std::chrono::seconds(waitForDlaTimeoutSeconds_);
+
+ if (runtimePolling_) {
+ std::chrono::time_point<std::chrono::system_clock> pollingEndingTime =
+ std::chrono::system_clock::now() + timeoutDuration;
+
+ while (isrData_.jobsFinished[instance] == jobsWaited_[instance]) {
+ // Update isrData_.jobsFinished[instance] here (polling)
+    if (isCancelledPredicate && isCancelledPredicate()) {
+ break;
+ }
+
+ completionCount = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_COMPLETION_COUNT);
+ isrData_.jobsFinished[instance] = completionCount;
+ if (std::chrono::system_clock::now() > pollingEndingTime) {
+ timedOut = true;
+ break;
+ }
+ }
+ } else {
+ while (isrData_.jobsFinished[instance] == jobsWaited_[instance]) {
+ // isrData_.jobsFinished[instance] is updated in the ISR
+ if (std::cv_status::timeout == isrData_.isrCondVar[instance].wait_for(isrMutexLock, timeoutDuration)) {
+ timedOut = true;
+ break;
+ }
+ }
+ }
+
+ if (timedOut) {
+ std::string str_poll_vs_int = "interrupt";
+ if (runtimePolling_) {
+ str_poll_vs_int = "polling";
+ }
+ std::string timeoutMsg = "WaitForDla " + str_poll_vs_int + " timeout with threadId_" + std::to_string(threadId) + "\n";
+
+ // Timeout has happened if we get here
+ timeoutMsg += "If inference on one batch is expected to take more than " +
+ std::to_string(waitForDlaTimeoutSeconds_) +
+ " seconds, then increase WAIT_FOR_DLA_TIMEOUT in dlia_plugin.cpp and "
+ "recompile the runtime.\n";
+ DLA_LOG("%s", timeoutMsg.c_str()); // this should always print, even if logging
+ // verbosity is too low
+ LOG(Logger::WARNING, "%s", timeoutMsg.c_str());
+ std::string exceptionMsg = "FATAL ERROR: inference on FPGA did not complete";
+ exceptionMsg += ", jobs finished " + std::to_string(isrData_.jobsFinished[instance]);
+ exceptionMsg += ", jobs waited " + std::to_string(jobsWaited_[instance]);
+ throw std::runtime_error(exceptionMsg);
+ }
+
+ if ((isrData_.desc_queue_diag[instance] >> DLA_DMA_CSR_DESC_DIAGNOSTICS_OUT_OF_INFERENCES_BIT) & 0x01) {
+ std::cerr << "ERROR: Out of free inferences on this IP. " <<
+ "The Intel FPGA AI suite cannot continue without a license!" << std::endl;
+ std::string exceptionMsg = "Inference on FPGA exited with a license error";
+ exceptionMsg += ", jobs finished " + std::to_string(isrData_.jobsFinished[instance]);
+ exceptionMsg += ", jobs waited " + std::to_string(jobsWaited_[instance]);
+ exceptionMsg += "\nPlease check your license. The Intel FPGA AI suite cannot continue without a license!";
+ throw std::runtime_error(exceptionMsg);
+ }
+
+ jobsWaited_[instance]++;
+}
+
+#ifndef USE_OLD_COREDLA_DEVICE
+uint64_t CoreDlaDevice::GetClocksActive(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t clocksActiveLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ACTIVE_LO);
+ uint32_t clocksActiveHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ACTIVE_HI);
+ return (((uint64_t)clocksActiveHi) << 32) | clocksActiveLo;
+}
+
+double CoreDlaDevice::GetActiveHWTimeMs(int instance) const {
+ uint64_t clocksActive = GetClocksActive(instance) - startClocksActive[instance];
+ // DDR clock freq is in MHz, so dividing by that would give microseconds, multiply by 1000 to get milliseconds
+ return clocksActive / (1000.0 * mmdWrapper_.GetDDRClockFreq());
+}
+
+uint64_t CoreDlaDevice::GetClocksAllJobs(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t clocksAllJobsLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ALL_JOBS_LO);
+ uint32_t clocksAllJobsHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ALL_JOBS_HI);
+ return (((uint64_t)clocksAllJobsHi) << 32) | clocksAllJobsLo;
+}
+
+double CoreDlaDevice::GetAvgHWTimePerJobMs(uint64_t num_jobs, int instance) const {
+ uint64_t clocksAllJobs = GetClocksAllJobs(instance) - startClockAllJobs[instance];
+ // DDR clock freq is in MHz, so dividing by that would give microseconds, multiply by 1000 to get milliseconds
+ return clocksAllJobs / (1000.0 * mmdWrapper_.GetDDRClockFreq() * num_jobs);
+}
+#else
+double CoreDlaDevice::GetActiveHWTimeMs(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t clocksActiveLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ACTIVE_LO);
+ uint32_t clocksActiveHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ACTIVE_HI);
+ uint64_t clocksActive = (((uint64_t)clocksActiveHi) << 32) | clocksActiveLo;
+ // DDR clock freq is in MHz, so dividing by that would give microseconds, multiply by 1000 to get milliseconds
+ return clocksActive / (1000.0 * mmdWrapper_.GetDDRClockFreq());
+}
+
+double CoreDlaDevice::GetAvgHWTimePerJobMs(uint64_t num_jobs, int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t clocksAllJobsLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ALL_JOBS_LO);
+ uint32_t clocksAllJobsHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_CLOCKS_ALL_JOBS_HI);
+ uint64_t clocksAllJobs = (((uint64_t)clocksAllJobsHi) << 32) | clocksAllJobsLo;
+ // DDR clock freq is in MHz, so dividing by that would give microseconds, multiply by 1000 to get milliseconds
+ return clocksAllJobs / (1000.0 * mmdWrapper_.GetDDRClockFreq() * num_jobs);
+}
+#endif
+
+uint64_t CoreDlaDevice::GetNumInputFeatureMemoryReads(int instance) const {
+ return GetNumInputFeatureMemoryReadsTotal(instance) - startNumInputFeatureMemoryReads.at(instance);
+}
+
+uint64_t CoreDlaDevice::GetNumFilterMemoryReads(int instance) const {
+ return GetNumFilterMemoryReadsTotal(instance) - startNumFilterMemoryReads.at(instance);
+}
+
+uint64_t CoreDlaDevice::GetNumOutputFeatureMemoryWrites(int instance) const {
+ return GetNumOutputFeatureMemoryWritesTotal(instance) - startNumOutputFeatureMemoryWrites.at(instance);
+}
+
+uint64_t CoreDlaDevice::GetNumInputFeatureMemoryReadsTotal(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t numIFReadsLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_INPUT_FEATURE_READ_COUNT_LO);
+ uint32_t numIFReadsHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_INPUT_FEATURE_READ_COUNT_HI);
+ uint64_t numIFReads = (((uint64_t) numIFReadsHi) << 32) | ((uint64_t) numIFReadsLo);
+ return numIFReads;
+}
+
+uint64_t CoreDlaDevice::GetNumFilterMemoryReadsTotal(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t numWeightReadsLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_INPUT_FILTER_READ_COUNT_LO);
+ uint32_t numWeightReadsHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_INPUT_FILTER_READ_COUNT_HI);
+ uint64_t numWeightReads = (((uint64_t) numWeightReadsHi) << 32) | ((uint64_t) numWeightReadsLo);
+ return numWeightReads;
+}
+
+uint64_t CoreDlaDevice::GetNumOutputFeatureMemoryWritesTotal(int instance) const {
+ //Important: To satisfy the anti-rollover feature of the 64-bit counters in the DMA CSR
+ //the host must first read the lower 32-bit of the counter,
+ //then immediately read the higher 32-bit of the counter
+ uint32_t numOFReadsLo = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_OUTPUT_FEATURE_WRITE_COUNT_LO);
+ uint32_t numOFReadsHi = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_OUTPUT_FEATURE_WRITE_COUNT_HI);
+ uint64_t numOFReads = (((uint64_t) numOFReadsHi) << 32) | ((uint64_t) numOFReadsLo);
+ return numOFReads;
+}
+
+// Read one 32-bit value from the debug network, return value indicates whether read was successful. A read can fail if
+// the module number and address have not been implemented. The debug network is fault tolerant to both read requests
+// never being accepted as well as read responses never being produced.
+bool CoreDlaDevice::ReadDebugCsr(
+ uint32_t moduleNum, uint32_t address, int instance, uint32_t& readData, bool verbose) const {
+ assert(moduleNum <= 0xff);
+ assert(address <= 0xffffff);
+ uint32_t addr = ((moduleNum & 0xff) << 24) | (address & 0xffffff);
+
+ // Step 1: send the address that the debug network will use to issue a read request. Writing once to this CSR offset
+ // will cause the debug network to issue one read request.
+ mmdWrapper_.WriteToCsr(instance, DLA_DMA_CSR_OFFSET_DEBUG_NETWORK_ADDR, addr);
+
+ // Optional step: read back the value sent to CSR, sanity check that it is correct. Note this is all handled
+ // internally to the CSR, e.g. the CSR does not go ask the debug network what address it sent.
+ uint32_t addrCheck = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_DEBUG_NETWORK_ADDR);
+ if (addr != addrCheck) {
+ if (verbose) DLA_LOG("ReadDebugCsr addr read back check failed, expected %u, got %u\n", addr, addrCheck);
+ return false;
+ }
+
+ // Step 2: the debug network should produce a read response which is cached by the CSR. Poll the corresponding status
+ // register inside the CSR until this happens, or until the runtime decides to give up and declare the read a failure.
+  // Do not throw an exception if the read fails; a failure is allowed while the runtime is
+  // probing which external debug-capable modules are attached to the debug network. Once the
+  // runtime has determined that a module is attached, only then should a read failure cause
+  // an exception.
+ uint32_t isValid = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_DEBUG_NETWORK_VALID);
+ int retry = 5;
+ while (!isValid && retry) {
+ --retry;
+ isValid = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_DEBUG_NETWORK_VALID);
+ }
+ if (!isValid) {
+ if (verbose) DLA_LOG("ReadDebugCsr failed to read at addr %u\n", addr);
+ return false;
+ }
+
+  // Step 3: runtime has confirmed that the CSR has cached the read response from the debug network, now go and get the value.
+ readData = mmdWrapper_.ReadFromCsr(instance, DLA_DMA_CSR_OFFSET_DEBUG_NETWORK_DATA);
+ if (verbose) DLA_LOG("ReadDebugCsr, addr %u, data %u\n", addr, readData);
+ return true;
+}
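+
+// Example of the three-step read protocol above (module number and address are
+// hypothetical; a false return simply means nothing responded at that location):
+//   uint32_t value = 0;
+//   if (device.ReadDebugCsr(/*moduleNum=*/2, /*address=*/0, /*instance=*/0, value))
+//     DLA_LOG("debug reg = %u\n", value);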
+
+// This is a helper function that throws an exception if runtime fails to read from the debug network. This should only
+// be called if the runtime has already confirmed that a module is attached to the debug network i.e. a previous read to
+// this module number had succeeded.
+void ReadDebugNetworkError(int moduleNum, int address, int instance) {
+ std::string msg = "ReadDebugNetwork failure, instance " + std::to_string(instance) +
+ ", failed to read at module number " + std::to_string(moduleNum) + " address " +
+ std::to_string(address);
+ throw std::runtime_error(msg);
+}
+
+// Modules attached to the debug network have a ROM to specify the offset and description of the registers. Traverse
+// this ROM, then return a map of key/value pairs, where the key is a human readable string describing what kind of
+// information the debug register contains, and the value is the data of the debug register. Note that the runtime must
+// completely traverse the ROM before reading any of the debug register values, and the runtime must read the debug
+// register values in the order that they occur inside the ROM. Usually profiling counters are 64-bit values, and since
+// there is only a 32-bit read available, it takes more than one read to get all the data. The counters could still be
+// updating when the runtime wants to read them, so typically there is a freeze register which can be activated by
+// reading from a special address (hardware will see an incoming read request to this address, that is how it knows to
+// freeze the counters). The offset for the freeze register will typically go first in the ROM, even if it is not the
+// first offset in the address space.
+DebugNetworkData CoreDlaDevice::ReadDebugNetwork(int instance) const {
+ DebugNetworkData result;
+ for (uint32_t moduleNum = 0; moduleNum < 256; moduleNum++) {
+ // Read the ROM to get the offsets and descriptions
+ std::vector<uint32_t> offset;
+ std::vector<std::string> description;
+ uint32_t address = 0, readData = 0;
+ bool first = true, success = false;
+ while (1) {
+ // Parse the offset
+ success = ReadDebugCsr(moduleNum, address, instance, readData);
+ if (!success) {
+ // Failure to read is allowed on the very first time, it is assumed that no external debug-capable module is
+ // attached to the debug network at this moduleNum
+ if (first)
+ break;
+ else
+ ReadDebugNetworkError(moduleNum, address, instance);
+ }
+ if (!readData) break; // end of list is indicated with offset = 0
+ first = false;
+ address += 4;
+ offset.push_back(readData);
+
+ // Parse the description string
+ std::string str;
+ bool endOfStringSeen = false;
+ while (!endOfStringSeen) {
+ success = ReadDebugCsr(moduleNum, address, instance, readData);
+ if (!success) ReadDebugNetworkError(moduleNum, address, instance);
+ address += 4;
+ for (int i = 0; i < 4; i++) {
+ if (readData & 0xff) {
+ str += ((char)(readData & 0xff));
+ readData >>= 8;
+ } else {
+ endOfStringSeen = true;
+ break;
+ }
+ }
+ }
+ description.push_back(str);
+ }
+
+ assert(offset.size() == description.size());
+
+ // Read the profiling counters
+ for (size_t i = 0; i < offset.size(); i++) {
+ address = offset[i];
+ success = ReadDebugCsr(moduleNum, address, instance, readData);
+ if (!success) ReadDebugNetworkError(moduleNum, address, instance);
+
+      int descriptionOccurenceCnt = result.count(description[i]);
+      // the same description should show up at most twice
+      if (descriptionOccurenceCnt == 2) {
+        throw std::runtime_error("More than 2 profiling counter descriptions are the same.");
+      } else if (descriptionOccurenceCnt && (address - offset[i - 1] != 4)) {
+        // the same description existed before; check that the two addresses associated
+        // with it are consecutive (offset by 4)
+        throw std::runtime_error("Profiling counter addresses with name: " + description[i] + " are not consecutive");
+      } else if (std::count(offset.begin(), offset.end(), address) > 1) {
+        // the same address shows up more than once
+        throw std::runtime_error("Duplicate profiling counter address: " + std::to_string(address));
+      }
+
+ // Avoid printing special stuff like _Freeze and _Unfreeze
+ if (description[i].at(0) != '_') {
+ if (descriptionOccurenceCnt) {
+ // This key has existed before, concatenate 2 uint32_t into uint64_t
+ result[description[i]] |= (((uint64_t)readData) << 32);
+ } else {
+ result[description[i]] = readData;
+ }
+ }
+ }
+ }
+ return result;
+}
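+
+// Example use of the map returned above, assuming DebugNetworkData maps each ROM
+// description string to its (up to 64-bit) counter value:
+//   for (const auto& kv : device.ReadDebugNetwork(/*instance=*/0))
+//     DLA_LOG("%s = %" PRIu64 "\n", kv.first.c_str(), kv.second);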
+
+int CoreDlaDevice::GetSizeCsrDescriptorQueue() const { return DLA_DMA_CSR_DESCRIPTOR_QUEUE_LOGICAL_SIZE; }
+
+double CoreDlaDevice::GetCoreDlaClockFreq() const { return mmdWrapper_.GetCoreDlaClockFreq(); }
+
+std::string CoreDlaDevice::SchedulerGetStatus() const {
+ if (!spStreamControllerComms_) return "";
+
+ Payload<StatusMessagePayload> statusPayload = spStreamControllerComms_->GetStatus();
+ return spStreamControllerComms_->GetStatusString(statusPayload);
+}
+
+bool CoreDlaDevice::InitializeScheduler(uint32_t sourceBufferSize,
+ uint32_t dropSourceBuffers,
+ uint32_t numInferenceRequests,
+ const std::string source_fifo_file) {
+ spStreamControllerComms_ = std::make_shared<StreamControllerComms>();
+ if (spStreamControllerComms_->IsPresent()) {
+ bool initOK = spStreamControllerComms_->Initialize(sourceBufferSize, dropSourceBuffers, numInferenceRequests);
+ return initOK;
+ } else {
+ spStreamControllerComms_.reset();
+ return false;
+ }
+}
diff --git a/python/openvino/runtime/coredla_device/src/coredla_graph_job.cpp b/python/openvino/runtime/coredla_device/src/coredla_graph_job.cpp
new file mode 100644
index 0000000..c1f349f
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/coredla_graph_job.cpp
@@ -0,0 +1,279 @@
+// Copyright 2020-2023 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "coredla_graph_job.h" //CoreDlaGraphJob
+
+#include <cinttypes>
+#include <cstdlib> //std::getenv
+#include <iomanip> //std::hex
+#include <iostream> //std::cerr
+#include <sstream> //std::stringstream
+#include <string> //std::string
+
+#define BUILD_VERSION_CSR_OFFSET (ARCH_HASH_SIZE)
+#define ARCH_NAME_CSR_OFFSET (ARCH_HASH_SIZE + BUILD_VERSION_SIZE)
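+
+// Assumed discovery-ROM layout implied by the offsets above:
+//   [0, ARCH_HASH_SIZE)                          arch hash words
+//   [ARCH_HASH_SIZE, +BUILD_VERSION_SIZE)        build version string, NUL terminated
+//   [ARCH_HASH_SIZE + BUILD_VERSION_SIZE, ...)   arch name string, NUL terminated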
+
+#define FLAG_DISABLE_ARCH_CHECK "DLA_DISABLE_ARCH_CHECK"
+#define FLAG_DISABLE_VERSION_CHECK "DLA_DISABLE_VERSION_CHECK"
+
+std::unique_ptr<GraphJob> CoreDlaGraphJob::MakeUnique(DeviceMemoryAllocator *ddrBufferAllocator,
+ MmdWrapper *mmdWrapper,
+ const dla::CompiledResult *compiledResult,
+ uint64_t numPipelines,
+ int instance,
+ std::shared_ptr<StreamControllerComms> spStreamControllerComms) {
+ return std::unique_ptr<GraphJob>(new CoreDlaGraphJob(
+ ddrBufferAllocator, mmdWrapper, compiledResult, numPipelines, instance, spStreamControllerComms));
+}
+
+std::string get_env_var_wrapper(const std::string &env_var) {
+ const char *env_var_ptr = std::getenv(env_var.c_str());
+ if (env_var_ptr == nullptr) {
+ return "";
+ }
+
+ return std::string(env_var_ptr);
+}
+
+std::string arch_hash_to_string(const std::vector<int> &arch_hash) {
+ std::stringstream s;
+ for (size_t i = 0; i < ARCH_HASH_WORD_SIZE; ++i) {
+ s << std::setfill('0') << std::setw(8) << std::hex << std::right << arch_hash[i] << " ";
+ }
+
+ return s.str();
+}
+
+std::string read_string_from_bitstream_rom(MmdWrapper *mmdWrapper,
+                                           const int instance,
+                                           const uint32_t str_size_in_words,
+                                           const uint32_t str_offset_in_rom) {
+  std::string str_from_rom;
+  bool done = false;
+  for (uint32_t i = 0; i < str_size_in_words && !done; ++i) {
+ int chunk = mmdWrapper->ReadFromCsr(instance, str_offset_in_rom + i * 4);
+ // Parse the int word into chars. Stops at any NUL char.
+ for (int j = 0; j < 4; ++j) {
+ char rom_char = (chunk >> (j * 8)) & 0xFF;
+ if (rom_char == 0) {
+ done = true;
+ break;
+ } else {
+ str_from_rom.push_back(rom_char);
+ }
+ }
+ }
+ return str_from_rom;
+}
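+
+// Byte-order example for the decode above: a ROM word of 0x44434241 yields the
+// characters 'A', 'B', 'C', 'D' (least significant byte first), so strings are
+// packed little-endian and end at the first NUL byte.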
+
+CoreDlaGraphJob::CoreDlaGraphJob(DeviceMemoryAllocator *ddrBufferAllocator,
+ MmdWrapper *mmdWrapper,
+ const dla::CompiledResult *compiledResult,
+ uint64_t numPipelines,
+ int instance,
+ std::shared_ptr<StreamControllerComms> spStreamControllerComms)
+ : configFilterBiasBufferSizeDDR_(0),
+ intermediateBufferSizeDDR_(0),
+ ddrBufferAllocator_(ddrBufferAllocator),
+ mmdWrapper_(mmdWrapper),
+ batchJobsRequested_(0),
+ instance_(instance) {
+ // First read the arch_md5, build_version_string and arch_name string from
+ // the metadata stored in the bitstream discovery ROM, then compare them
+  // against the information present in the compiled result. Fail if they do not match.
+
+ // ARCH_HASH_SIZE bytes for the arch hash.
+ std::vector<int> bitstream_arch_hash;
+ DLA_LOG("Read hash from bitstream ROM...\n");
+ for (size_t i = 0; i < ARCH_HASH_WORD_SIZE; ++i) {
+ bitstream_arch_hash.push_back(mmdWrapper_->ReadFromCsr(instance_, i * 4));
+ }
+
+ // Next BUILD_VERSION_SIZE bytes are for the build version string
+ DLA_LOG("Read build version string from bitstream ROM...\n");
+ std::string bitstream_build_version =
+ read_string_from_bitstream_rom(mmdWrapper_, instance_, BUILD_VERSION_WORD_SIZE, BUILD_VERSION_CSR_OFFSET);
+
+ // Next ARCH_NAME_SIZE bytes are for the arch name string
+ DLA_LOG("Read arch name string from bitstream ROM...\n");
+ std::string bitstream_arch_name =
+ read_string_from_bitstream_rom(mmdWrapper_, instance_, ARCH_NAME_WORD_SIZE, ARCH_NAME_CSR_OFFSET);
+
+ // ************************ Perform all checks *******************************
+ // ***************************************************************************
+ if (get_env_var_wrapper(FLAG_DISABLE_ARCH_CHECK) != "1") {
+ DLA_LOG("Runtime arch check is enabled. Check started...\n");
+
+ for (size_t i = 0; i < ARCH_HASH_WORD_SIZE; ++i) {
+ if (compiledResult->get_arch_hash()[i] != bitstream_arch_hash[i]) {
+ std::cerr << "Arch check failed: "
+ << "compiledResult arch hash is " << arch_hash_to_string(compiledResult->get_arch_hash())
+ << ", compiledResult arch is " << compiledResult->get_arch_name() << ", bitstream arch_hash is "
+ << arch_hash_to_string(bitstream_arch_hash) << ", bitstream arch is " << bitstream_arch_name
+ << std::endl;
+
+ std::cerr << "This check can be disabled by setting environment variable " << FLAG_DISABLE_ARCH_CHECK << "=1."
+ << std::endl;
+ std::exit(1);
+ }
+ }
+ DLA_LOG("Runtime arch check passed.\n");
+ } else {
+ DLA_ERROR(
+ "Environment variable %s is set to 1; "
+ "architecture check will be skipped. "
+ "This might cause undefined behavior including hanging, "
+ "and the user should only disable the check if "
+ "they understand the potential consequences.\n",
+ FLAG_DISABLE_ARCH_CHECK);
+ }
+
+ if (get_env_var_wrapper(FLAG_DISABLE_VERSION_CHECK) != "1") {
+ DLA_LOG(
+ "Runtime build version check is enabled. "
+ "Check started...\n");
+ if (bitstream_build_version != compiledResult->get_build_version_string()) {
+ std::cerr << "Build version check failed:"
+ << "compiledResult build version is " << compiledResult->get_build_version_string()
+ << ", bitstream build version is " << bitstream_build_version << std::endl;
+
+ std::cerr << "This check can be disabled by setting environment variable " << FLAG_DISABLE_VERSION_CHECK << "=1."
+ << std::endl;
+
+ std::exit(1);
+ }
+ DLA_LOG("Runtime build version check passed.\n");
+ } else {
+ DLA_ERROR(
+ "Environment variable %s is set to 1; "
+ "build version check will be skipped. "
+ "This might cause undefined behavior including hanging, "
+ "and the user should only disable the check if "
+ "they understand the potential consequences.\n",
+ FLAG_DISABLE_VERSION_CHECK);
+ }
+
+ // Checks completed. Allocate buffers and write to DDR
+ intermediateBufferSizeDDR_ = compiledResult->get_conv_intermediate_size_in_bytes();
+ uint64_t totalConfigBytes = compiledResult->get_ddrfree_header().enable_parameter_rom ?
+ 0 :
+ compiledResult->get_config_size_in_bytes();
+ auto &config_fbs_array = compiledResult->get_config_filter_bias_scale_array();
+ auto config_fbs_raw_array = compiledResult->get_ddrfree_header().enable_parameter_rom ?
+ nullptr :
+ config_fbs_array[0].data();
+ configFilterBiasBufferSizeDDR_ = compiledResult->get_ddrfree_header().enable_parameter_rom ?
+ 0 :
+ config_fbs_array[0].size();
+
+ // TODO: uncomment when buffer_t object is added
+ // assert(config_filter_bias_graph_buffer_size_ddr == config_filter_bias_buffer->size_in_bytes());
+ // Allocate graph buffer (config, filter, bias, io) in DDR
+ uint64_t inputSizeDDR = compiledResult->get_conv_input_size_in_bytes();
+ uint64_t outputSizeDDR = compiledResult->get_conv_output_size_in_bytes();
+
+ // DMA data path width in bytes for feature and filter data
+ // TODO: move this into the arch
+ constexpr uint64_t featureWordSize = 32;
+ constexpr uint64_t filterWordSize = 64;
+
+ // Sanity check that buffer sizes are sufficiently aligned to ensure address alignment.
+ // Input, output, and intermediate buffers contain feature words.
+ assert(inputSizeDDR % featureWordSize == 0);
+ assert(outputSizeDDR % featureWordSize == 0);
+ assert(intermediateBufferSizeDDR_ % featureWordSize == 0);
+ // filter contains filter words, and config must be padded to a filter word size
+ assert(totalConfigBytes % filterWordSize == 0);
+ assert(configFilterBiasBufferSizeDDR_ % filterWordSize == 0);
+
+ // Allocate the intermediate buffer.
+ ddrBufferAllocator_->AllocateSharedBuffer(intermediateBufferSizeDDR_, instance_);
+
+ // Allocate the input/output buffer.
+ // Output buffer must come immediately after the input buffer, so from an allocation perspective this is one buffer.
+ // Note there is an input/output buffer pair allocated for each pipeline. The input/output pair must be contiguous for
+ // each pipeline, but input/output pairs from different pipelines are allowed to have a gap. We could call the
+ // allocator for each input/output buffer pair, however because everything is sized and aligned to the feature word
+ // size, we won't get gaps between them due to alignment. Calling the allocator once per pipeline would result in the
+ // same allocation as calling the allocator just once and using offsets within this big buffer for each pipeline.
+ uint64_t inputOutputBufferSize = numPipelines * (inputSizeDDR + outputSizeDDR); // how much space to allocate
+ uint64_t inputOutputBufferAlignment = featureWordSize; // starting address must be aligned to this
+ uint64_t inputOutputBufferAddr; // where did the allocator place this buffer
+ ddrBufferAllocator_->AllocatePrivateBuffer(inputOutputBufferSize, inputOutputBufferAlignment, inputOutputBufferAddr);
+
+ // Allocate the config/filter buffer.
+ // Filter buffer must come immediately after the config buffer, so from an allocation perspective this is one buffer.
+ uint64_t configFilterBufferSize = configFilterBiasBufferSizeDDR_;
+ uint64_t configFilterBufferAlignment = filterWordSize;
+ uint64_t configFilterBufferAddr;
+ ddrBufferAllocator_->AllocatePrivateBuffer(
+ configFilterBufferSize, configFilterBufferAlignment, configFilterBufferAddr);
+
+ // Print the allocation results
+ bool print_allocation_result = getenv("COREDLA_RUNTIME_DEBUG") != nullptr;
+  std::ios_base::fmtflags coutFlags = std::cout.flags();  // printing in both decimal and hex, save cout state to undo it later
+ if (print_allocation_result) {
+ DLA_LOG("FPGA DDR allocation results\n");
+ // Intermediate buffer address is hardcoded to 0 in device_memory_allocator.cpp, don't bother printing this
+ DLA_LOG(" Config buffer is at address %" PRIu64, configFilterBufferAddr);
+ DLA_LOG(" (%#" PRIx64 ")\n", configFilterBufferAddr);
+ const uint64_t filter_buffer_address = configFilterBufferAddr + totalConfigBytes;
+ DLA_LOG(" Filter/bias/scale buffer is at address %" PRIu64, filter_buffer_address);
+ DLA_LOG(" (%#" PRIx64 ")\n", filter_buffer_address);
+ }
+
+ const bool enable_istream = compiledResult->get_input_configuration().begin()->second.enable_input_streaming;
+ const bool enable_ostream = compiledResult->get_output_configuration().output_streaming_enabled;
+
+ // Write graph buffer to DDR
+ if (!compiledResult->get_ddrfree_header().enable_parameter_rom) {
+ mmdWrapper_->WriteToDDR(instance_, configFilterBufferAddr, configFilterBiasBufferSizeDDR_, config_fbs_raw_array);
+ } else {
+ DLA_LOG(" Ddrfree graph constants are not written to DDR.\n");
+ }
+
+ for (uint64_t i = 0; i < numPipelines; i++) {
+ uint64_t inputAddrDDR = inputOutputBufferAddr + i * (inputSizeDDR + outputSizeDDR);
+ uint64_t outputAddrDDR = inputAddrDDR + inputSizeDDR;
+ if (print_allocation_result) {
+ DLA_LOG(" Input buffer %" PRIu64 " is at address %" PRIu64, i, inputAddrDDR);
+ DLA_LOG(" (%#" PRIx64 ")\n", inputAddrDDR);
+ DLA_LOG(" Output buffer %" PRIu64 " is at address %" PRIu64, i, outputAddrDDR);
+ DLA_LOG(" (%#" PRIx64 ")\n", outputAddrDDR);
+ }
+    batchJobs_.push_back(std::move(CoreDlaBatchJob::MakeUnique(mmdWrapper_,
+ totalConfigBytes,
+ configFilterBufferAddr,
+ inputAddrDDR,
+ outputAddrDDR,
+ inputSizeDDR,
+ outputSizeDDR,
+ enable_istream,
+ enable_ostream,
+ instance_,
+ spStreamControllerComms)));
+ }
+  std::cout.flags(coutFlags);  // restore the state of cout
+}
+
+BatchJob *CoreDlaGraphJob::GetBatchJob() {
+  std::lock_guard<std::mutex> lock(graphJobMutex);
+  if (batchJobsRequested_ >= batchJobs_.size()) {
+    return nullptr;
+  }
+  return batchJobs_[batchJobsRequested_++].get();
+}
diff --git a/python/openvino/runtime/coredla_device/src/device_memory_allocator.cpp b/python/openvino/runtime/coredla_device/src/device_memory_allocator.cpp
new file mode 100644
index 0000000..48844f4
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/device_memory_allocator.cpp
@@ -0,0 +1,80 @@
+// Copyright 2020 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "device_memory_allocator.h" //DeviceMemoryAllocator
+#include "dla_dma_constants.h" //DLA_DMA_CSR_OFFSET_***
+
+#include <stdexcept> //std::runtime_error
+#include <string> //std::string
+
+void DeviceMemoryAllocator::Initialize(uint64_t totalSize, MmdWrapper* mmdWrapper) {
+ totalGlobalMemSize_ = totalSize;
+ mmdWrapper_ = mmdWrapper;
+ currentIntermediateMaxBufferSizeAllocated_ = 0;
+ currentStartAddressGraphBufferSpace_ = totalSize;
+}
+
+// The intermediate buffer is shared among all graphs. It gets placed at the lowest address
+// and grows upwards (if a new graph is added which needs a bigger intermediate buffer).
+void DeviceMemoryAllocator::AllocateSharedBuffer(uint64_t bufferSize, int instance) {
+ if (bufferSize > currentIntermediateMaxBufferSizeAllocated_) {
+ currentIntermediateMaxBufferSizeAllocated_ = bufferSize;
+
+    // error: the intermediate buffer grew into the region of memory used for private buffers
+ if (currentIntermediateMaxBufferSizeAllocated_ > currentStartAddressGraphBufferSpace_) {
+ std::string msg = "FPGA DDR allocation failed, intermediate buffer grew upwards to " +
+ std::to_string(currentIntermediateMaxBufferSizeAllocated_) +
+ ", remaining unallocated space is limited to " +
+ std::to_string(currentStartAddressGraphBufferSpace_);
+ throw std::runtime_error(msg);
+ }
+
+    // tell the fpga where the intermediate buffer is located; it is at address 0 for now,
+    // which will change in the future with multiple pe_arrays
+ mmdWrapper_->WriteToCsr(instance, DLA_DMA_CSR_OFFSET_INTERMEDIATE_BASE_ADDR, 0);
+ }
+}
+
+// The config, filter, input, and output buffers are specific to a graph and therefore require
+// their own space in device memory. Note that filter must come immediately after config, so the
+// allocator allocates both of these together as one buffer. Likewise output must come immediately
+// after input. Private buffers are allocated from the highest to lowest address since the size is
+// known at allocation time. Hardware requires the address to have some alignment, which is
+// specified by the bufferAlignment argument.
+void DeviceMemoryAllocator::AllocatePrivateBuffer(uint64_t bufferSize, uint64_t bufferAlignment, uint64_t& bufferAddr) {
+  uint64_t maxInflatedBufferSize = bufferSize + bufferAlignment;  // be conservative about how much space the buffer may take
+
+ // error if the graph does not fit in fpga ddr
+ if (currentIntermediateMaxBufferSizeAllocated_ + maxInflatedBufferSize > currentStartAddressGraphBufferSpace_) {
+ std::string msg =
+ "FPGA DDR allocation failed, allocating buffer of size " + std::to_string(maxInflatedBufferSize) +
+ " exceeds the remaining space available of size " +
+ std::to_string(currentStartAddressGraphBufferSpace_ - currentIntermediateMaxBufferSizeAllocated_) +
+ ". This could be caused by the graph being too large or splitting the graph into too many subgraphs. " +
+ "Memory requirements for large graphs can be reduced by selecting different folding options, " +
+ "reducing batch size or selecting architectures with less padding.";
+ throw std::runtime_error(msg);
+ }
+
+ currentStartAddressGraphBufferSpace_ -= bufferSize; // allocate from highest to lowest address
+ currentStartAddressGraphBufferSpace_ -=
+ (currentStartAddressGraphBufferSpace_ % bufferAlignment); // correct for alignment
+ bufferAddr = currentStartAddressGraphBufferSpace_;
+}
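+
+// Worked example with hypothetical sizes: given 1 GiB (0x40000000) of device DDR,
+// a private-buffer request of 64 MiB with 64-byte alignment moves
+// currentStartAddressGraphBufferSpace_ down to 0x3C000000, which is already a
+// multiple of 64, so the alignment correction subtracts nothing and that value is
+// returned as the buffer address. The shared intermediate buffer meanwhile grows
+// upward from address 0 toward this shrinking boundary.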
+
+void DeviceMemoryAllocator::Clear() {
+ currentIntermediateMaxBufferSizeAllocated_ = 0;
+ currentStartAddressGraphBufferSpace_ = totalGlobalMemSize_;
+}
+
+DeviceMemoryAllocator::~DeviceMemoryAllocator() { Clear(); }
diff --git a/python/openvino/runtime/coredla_device/src/mmd_wrapper.cpp b/python/openvino/runtime/coredla_device/src/mmd_wrapper.cpp
new file mode 100644
index 0000000..bbb052a
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/mmd_wrapper.cpp
@@ -0,0 +1,172 @@
+// Copyright 2020-2023 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "mmd_wrapper.h"
+#include "aocl_mmd.h" // aocl_mmd_***
+#include "dla_dma_constants.h" // DLA_DMA_CSR_OFFSET_***
+
+#include <cassert> // assert
+#include <cstddef> // size_t
+#include <iostream> // std::cerr
+#include <stdexcept> // std::runtime_error
+#include <string> // std::string
+
+// All board variants must obey the CoreDLA CSR spec, which says that all access must be
+// - 32 bits in size
+// - address must be 4 byte aligned
+// - within the address range, CSR size is 2048 bytes
+constexpr uint64_t DLA_CSR_ALIGNMENT = 4;
+constexpr uint64_t DLA_CSR_SIZE = 2048;
+
+// assert(status == 0) is compiled out when NDEBUG is defined (release mode);
+// this is a handy workaround for suppressing the compiler warning about an unused variable
+template <class T>
+void suppress_warning_unused_varible(const T &) {}
+
+MmdWrapper::MmdWrapper() {
+ // Open the MMD
+ constexpr size_t MAX_BOARD_NAMES_LEN = 4096;
+ char name[MAX_BOARD_NAMES_LEN];
+ size_t sz;
+ int status = aocl_mmd_get_offline_info(AOCL_MMD_BOARD_NAMES, MAX_BOARD_NAMES_LEN, name, &sz);
+ if (status) {
+ std::string msg = "Failed to query a board name from MMD. Perhaps no FPGA device is available?";
+ throw std::runtime_error(msg);
+ }
+ int handle = aocl_mmd_open(name);
+ if (handle < 0) {
+ std::string msg = "Failed to open MMD";
+ throw std::runtime_error(msg);
+ }
+ handle_ = handle;
+
+ // Query some board-specific information from the MMD. Some values can be hardcoded constants
+ // where different boards have different constants, e.g. capacity of FPGA DDR. Others values may
+ // be determined experimentally e.g. start and stop a counter with a known duration in between to
+ // measure the clk_dla frequency.
+ maxInstances_ = dla_mmd_get_max_num_instances();
+ ddrSizePerInstance_ = dla_mmd_get_ddr_size_per_instance();
+ coreDlaClockFreq_ = dla_mmd_get_coredla_clock_freq(handle_);
+
+  // On DE10 Agilex boards with GCC 8.3.0, we noticed the clock frequency being read as 0
+  // around 50% of the time, and around 10% of the time with GCC 9.2.0, causing failures in
+  // perf_est tests. This retry loop calls the function again until coreDlaClockFreq_ is
+  // nonzero, or until it exhausts 10 retries. We do not currently know why this happens,
+  // but it typically passes on the second try.
+ int clockFreqRetries = 10;
+ while (coreDlaClockFreq_ == 0 && clockFreqRetries > 0) {
+ coreDlaClockFreq_ = dla_mmd_get_coredla_clock_freq(handle_);
+ clockFreqRetries--;
+ }
+ ddrClockFreq_ = dla_mmd_get_ddr_clock_freq();
+}
+
+MmdWrapper::~MmdWrapper() {
+ // Close the MMD
+ int status = aocl_mmd_close(handle_);
+ if (status) {
+    // Avoid throwing an exception from a destructor. We are ultimately
+ // part of a (virtual) OpenVINO destructor, so we should follow the
+ // noexcept(true) that it advertises. Perhaps we can close the mmd
+ // as a separate step prior to destruction to make signaling errors
+ // easier?
+ std::cerr << "Failed to close MMD" << std::endl;
+ std::cerr << "Error status " << status << std::endl;
+ std::exit(1);
+ }
+}
+
+void MmdWrapper::RegisterISR(interrupt_service_routine_signature func, void *data) const {
+ // register an interrupt handler
+ int status = aocl_mmd_set_interrupt_handler(handle_, func, data);
+ if (status) {
+ std::string msg = "Failed to register an interrupt handler with MMD";
+ throw std::runtime_error(msg);
+ }
+}
+
+void MmdWrapper::WriteToCsr(int instance, uint32_t addr, uint32_t data) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr + sizeof(uint32_t) <= DLA_CSR_SIZE);
+ assert(addr % DLA_CSR_ALIGNMENT == 0);
+ int status = dla_mmd_csr_write(handle_, instance, addr, &data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+}
+
+uint32_t MmdWrapper::ReadFromCsr(int instance, uint32_t addr) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr + sizeof(uint32_t) <= DLA_CSR_SIZE);
+ assert(addr % DLA_CSR_ALIGNMENT == 0);
+ uint32_t data;
+ int status = dla_mmd_csr_read(handle_, instance, addr, &data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+ return data;
+}
+
+void MmdWrapper::WriteToDDR(int instance, uint64_t addr, uint64_t length, const void *data) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr + length <= ddrSizePerInstance_);
+ int status = dla_mmd_ddr_write(handle_, instance, addr, length, data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+}
+
+void MmdWrapper::ReadFromDDR(int instance, uint64_t addr, uint64_t length, void *data) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr + length <= ddrSizePerInstance_);
+ int status = dla_mmd_ddr_read(handle_, instance, addr, length, data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+}
+
+#ifndef STREAM_CONTROLLER_ACCESS
+// Stream controller access is not supported by the platform abstraction
+bool MmdWrapper::bIsStreamControllerValid(int instance) const { return false; }
+
+// 32-bit handshake with each Stream Controller CSR
+void MmdWrapper::WriteToStreamController(int instance, uint32_t addr, uint64_t length, const void *data) const {
+ assert(false);
+}
+
+void MmdWrapper::ReadFromStreamController(int instance, uint32_t addr, uint64_t length, void *data) const {
+ assert(false);
+}
+#else
+// If the mmd layer supports accesses to the Stream Controller
+bool MmdWrapper::bIsStreamControllerValid(int instance) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ bool status = dla_is_stream_controller_valid(handle_, instance);
+ return status;
+}
+
+// 32-bit handshake with each Stream Controller CSR
+void MmdWrapper::WriteToStreamController(int instance, uint32_t addr, uint64_t length, const void *data) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr % sizeof(uint32_t) == 0);
+ assert(length % sizeof(uint32_t) == 0);
+ int status = dla_mmd_stream_controller_write(handle_, instance, addr, length, data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+}
+
+void MmdWrapper::ReadFromStreamController(int instance, uint32_t addr, uint64_t length, void *data) const {
+ assert(instance >= 0 && instance < maxInstances_);
+ assert(addr % sizeof(uint32_t) == 0);
+ assert(length % sizeof(uint32_t) == 0);
+ int status = dla_mmd_stream_controller_read(handle_, instance, addr, length, data);
+ assert(status == 0);
+ suppress_warning_unused_varible(status);
+}
+#endif
diff --git a/python/openvino/runtime/coredla_device/src/stream_controller_comms.cpp b/python/openvino/runtime/coredla_device/src/stream_controller_comms.cpp
new file mode 100644
index 0000000..677f6e4
--- /dev/null
+++ b/python/openvino/runtime/coredla_device/src/stream_controller_comms.cpp
@@ -0,0 +1,274 @@
+// Copyright 2023 Intel Corporation.
+//
+// This software and the related documents are Intel copyrighted materials,
+// and your use of them is governed by the express license under which they
+// were provided to you ("License"). Unless the License provides otherwise,
+// you may not use, modify, copy, publish, distribute, disclose or transmit
+// this software or the related documents without Intel's prior written
+// permission.
+//
+// This software and the related documents are provided as is, with no express
+// or implied warranties, other than those that are expressly stated in the
+// License.
+
+#include "stream_controller_comms.h"
+#include <chrono>
+#include <cstring>
+#include <iostream>
+#include <sstream>
+#include <thread>
+
+// StreamControllerComms provides an interface to the Stream Controller
+// microcode running in the NIOS-V
+
+static constexpr uint32_t messageReadyMagicNumber = 0x55225522;
+static constexpr uint32_t mailboxRamSize = 0x1000;
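+
+// Mailbox layout as used below (inferred from the offsets in SendMessage and
+// ReceiveMessage): host-to-device messages start at offset 0 and
+// device-to-host messages at mailboxRamSize / 2. Each half begins with a
+// MessageHeader whose first word doubles as the ready flag: the sender stores
+// messageReadyMagicNumber there to publish a message, and the receiver
+// acknowledges by overwriting that word with the message's sequence ID.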
+
+StreamControllerComms::StreamControllerComms() {}
+
+bool StreamControllerComms::IsPresent() {
+ // Check there is an interface to the stream controller
+ if (!_mmdWrapper.bIsStreamControllerValid(_streamControllerInstance)) {
+ return false;
+ }
+
+ // Check that the stream controller responds
+ bool isPresent = Ping();
+ return isPresent;
+}
+
+// Query for the current status
+Payload<StatusMessagePayload> StreamControllerComms::GetStatus() {
+ BusyCheck busyCheck(_busyFlag);
+ if (!busyCheck) {
+ return {};
+ }
+
+ if (SendMessage(MessageType_GetStatus)) {
+ if (ReceiveMessage() == MessageType_Status) {
+ return _receivedStatusMessage;
+ }
+ }
+
+ return {};
+}
+
+// Schedule one or more inference jobs with the stream controller
+bool StreamControllerComms::ScheduleItems(std::vector<Payload<CoreDlaJobPayload>> items) {
+ BusyCheck busyCheck(_busyFlag);
+ if (!busyCheck) {
+ return false;
+ }
+
+ bool status = true;
+
+ for (auto& job : items) {
+ bool thisJobStatus = false;
+
+ if (SendMessage(MessageType_ScheduleItem, job.GetPayload(), job.GetSize())) {
+ if (ReceiveMessage() == MessageType_NoOperation) {
+ thisJobStatus = true;
+ }
+ }
+
+ if (!thisJobStatus) {
+ status = false;
+ }
+ }
+
+ return status;
+}
+
+// Send a ping command to the stream controller and wait for a pong
+// response.
+bool StreamControllerComms::Ping() {
+ BusyCheck busyCheck(_busyFlag);
+ if (!busyCheck) {
+ return false;
+ }
+
+ if (SendMessage(MessageType_Ping)) {
+ return (ReceiveMessage() == MessageType_Pong);
+ }
+
+ return false;
+}
+
+// Initialize and reset the stream controller
+//
+// sourceBufferSize:
+// The size of the MSGDMA buffers that the stream
+// controller will receive from the layout transform
+// dropSourceBuffers:
+// How many source buffers to drop between each
+// processed one. 0 by default unless set in the configuration
+// by the app with DLIAPlugin::properties::streaming_drop_source_buffers.name()
+// numInferenceRequests:
+// A constant value set in the executable network. The
+// stream controller will start executing once it has
+// received this number of inference requests from OpenVINO
+bool StreamControllerComms::Initialize(uint32_t sourceBufferSize,
+ uint32_t dropSourceBuffers,
+ uint32_t numInferenceRequests) {
+ BusyCheck busyCheck(_busyFlag);
+ if (!busyCheck) {
+ return false;
+ }
+
+ Payload<InitializeStreamControllerPayload> initializePayload{};
+ initializePayload._sourceBufferSize = sourceBufferSize;
+ initializePayload._dropSourceBuffers = dropSourceBuffers;
+ initializePayload._numInferenceRequests = numInferenceRequests;
+
+ if (SendMessage(
+ MessageType_InitializeStreamController, initializePayload.GetPayload(), initializePayload.GetSize())) {
+ if (ReceiveMessage() == MessageType_NoOperation) {
+ return true;
+ }
+ }
+
+ return false;
+}
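+
+// A hedged usage sketch of the initialization handshake; the argument values
+// are illustrative only:
+//
+// StreamControllerComms comms;
+// if (comms.IsPresent() &&
+//     comms.Initialize(/*sourceBufferSize=*/0x100000, /*dropSourceBuffers=*/0, /*numInferenceRequests=*/4)) {
+//   Payload<StatusMessagePayload> status = comms.GetStatus();
+//   std::cout << comms.GetStatusString(status) << std::endl;
+// }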
+
+// Receive a message from the stream controller by polling the mailbox memory
+// until the magic number indicates a message is ready, then acknowledge it by
+// writing the sequence ID over the magic number. Of the reply messages, only
+// Status carries a payload
+MessageType StreamControllerComms::ReceiveMessage() {
+ uint32_t receiveMessageOffset = mailboxRamSize / 2;
+ uint32_t messageReadyMagicNumberOffset = receiveMessageOffset;
+ // offsetof avoids the undefined behavior of taking a member's address through a null pointer
+ uint32_t payloadOffset = static_cast<uint32_t>(receiveMessageOffset + offsetof(MessageHeader, _payload));
+ uint32_t waitCount = 0;
+
+ while (waitCount < 100) {
+ MessageHeader messageHeader;
+ _mmdWrapper.ReadFromStreamController(
+ _streamControllerInstance, receiveMessageOffset, sizeof(messageHeader), &messageHeader);
+ if (messageHeader._messageReadyMagicNumber == messageReadyMagicNumber) {
+ MessageType messageType = static_cast<MessageType>(messageHeader._messageType);
+ uint32_t sequenceId = messageHeader._sequenceID;
+
+ bool ok = false;
+
+ if (messageType == MessageType_Status) {
+ ok = StatusMessageHandler(payloadOffset);
+ } else if (messageType == MessageType_Pong) {
+ ok = true;
+ }
+
+ if (!ok) {
+ _numBadMessages++;
+ }
+
+ _mmdWrapper.WriteToStreamController(
+ _streamControllerInstance, messageReadyMagicNumberOffset, sizeof(sequenceId), &sequenceId);
+ _lastReceiveSequenceID = sequenceId;
+ return messageType;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ waitCount++;
+ }
+
+ return MessageType_Invalid;
+}
+
+// Send a message to the stream controller by writing it to the mailbox memory,
+// then wait for the stream controller to confirm it has been received/processed
+bool StreamControllerComms::SendMessage(MessageType messageType, void* pPayload, size_t payloadSize) {
+ uint32_t sendMessageOffset = 0;
+ uint32_t messageReadyMagicNumberOffset = 0;
+ // offsetof avoids the undefined behavior of taking a member's address through a null pointer
+ uint32_t messageTypeOffset = static_cast<uint32_t>(offsetof(MessageHeader, _messageType));
+ uint32_t sequenceIDOffset = static_cast<uint32_t>(offsetof(MessageHeader, _sequenceID));
+ uint32_t payloadOffset = static_cast<uint32_t>(offsetof(MessageHeader, _payload));
+
+ uint32_t uintMessageType = static_cast<uint32_t>(messageType);
+
+ _mmdWrapper.WriteToStreamController(
+ _streamControllerInstance, messageTypeOffset, sizeof(uintMessageType), &uintMessageType);
+ _mmdWrapper.WriteToStreamController(
+ _streamControllerInstance, sequenceIDOffset, sizeof(_sendSequenceID), &_sendSequenceID);
+
+ if (payloadSize > 0) {
+ _mmdWrapper.WriteToStreamController(_streamControllerInstance, payloadOffset, payloadSize, pPayload);
+ }
+
+ // Signal the message as ready
+ _mmdWrapper.WriteToStreamController(_streamControllerInstance,
+ messageReadyMagicNumberOffset,
+ sizeof(messageReadyMagicNumber),
+ &messageReadyMagicNumber);
+
+ // Wait until the message has been processed by looking for the sequence ID
+ // in the magic number position
+ uint32_t waitCount = 0;
+ while (waitCount < 100) {
+ MessageHeader messageHeader;
+ _mmdWrapper.ReadFromStreamController(
+ _streamControllerInstance, sendMessageOffset, sizeof(messageHeader), &messageHeader);
+
+ if (messageHeader._messageReadyMagicNumber == _sendSequenceID) {
+ _sendSequenceID++;
+ return true;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ waitCount++;
+ }
+
+ return false;
+}
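+
+// Handshake timeline implemented by SendMessage above:
+//   host:   write _messageType, _sequenceID and any payload into the send half
+//   host:   write messageReadyMagicNumber to publish the message
+//   device: process the message, then overwrite the magic word with _sequenceID
+//   host:   poll the header; on seeing _sequenceID, bump it and report success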
+
+// Read the status message payload
+bool StreamControllerComms::StatusMessageHandler(uint32_t payloadOffset) {
+ _mmdWrapper.ReadFromStreamController(
+ _streamControllerInstance, payloadOffset, sizeof(_receivedStatusMessage), &_receivedStatusMessage);
+ return true;
+}
+
+// Format the status message payload as a comma-separated string:
+// status,statusLineNumber,numReceivedSourceBuffers,numScheduledInferences,numExecutedJobs
+std::string StreamControllerComms::GetStatusString(Payload<StatusMessagePayload>& statusPayload) {
+ std::ostringstream stringStream;
+ stringStream << static_cast<uint32_t>(statusPayload._status) << "," << statusPayload._statusLineNumber << ","
+ << statusPayload._numReceivedSourceBuffers << "," << statusPayload._numScheduledInferences << ","
+ << statusPayload._numExecutedJobs;
+ return stringStream.str();
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+// BusyFlag prevents concurrent access to the stream controller without
+// holding a mutex while commands are being sent/received: the mutex guards
+// only the test-and-set of the busy flag itself
+using LockGuard = std::lock_guard<std::recursive_mutex>;
+
+bool BusyFlag::Lock() {
+ LockGuard lock(_mutex);
+ if (_busy) {
+ return false;
+ }
+
+ _busy = true;
+ return true;
+}
+
+void BusyFlag::Release() {
+ LockGuard lock(_mutex);
+ _busy = false;
+}
+
+BusyCheck::BusyCheck(BusyFlag& busyFlag) : _busyFlag(busyFlag), _haveLocked(false) {}
+
+BusyCheck::~BusyCheck() {
+ if (_haveLocked) {
+ _busyFlag.Release();
+ }
+}
+
+BusyCheck::operator bool() {
+ bool locked = _busyFlag.Lock();
+ if (locked) {
+ _haveLocked = true;
+ }
+ return locked;
+}
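+
+// A minimal sketch of the intended RAII pattern; Example is a hypothetical
+// caller. BusyCheck tries to take the flag in operator bool() and releases it
+// in its destructor, so callers fail fast instead of blocking while another
+// thread is mid-transaction:
+//
+// bool Example(BusyFlag& flag) {
+//   BusyCheck busyCheck(flag);
+//   if (!busyCheck) {
+//     return false;  // another thread owns the stream controller
+//   }
+//   // ... exclusive mailbox access here ...
+//   return true;  // released by ~BusyCheck
+// }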