1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
// Copyright 2023 Intel Corporation.
//
// This software and the related documents are Intel copyrighted materials,
// and your use of them is governed by the express license under which they
// were provided to you ("License"). Unless the License provides otherwise,
// you may not use, modify, copy, publish, distribute, disclose or transmit
// this software or the related documents without Intel's prior written
// permission.
//
// This software and the related documents are provided as is, with no express
// or implied warranties, other than those that are expressly stated in the
// License.
#include "stream_controller_comms.h"
#include <chrono>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <sstream>
#include <thread>
// StreamControllerComms provides an interface to the Stream Controller
// microcode running in the NIOS-V

// Marker value written into the magic-number field of a message header to
// flag a message as ready; ReceiveMessage polls for this same value.
static const uint32_t messageReadyMagicNumber = 0x55225522;
// Size of the mailbox RAM (presumably in bytes - TODO confirm). SendMessage
// writes starting at offset 0 and ReceiveMessage reads starting at half of
// this size.
static constexpr uint32_t mailboxRamSize = 0x1000;
// Nothing to set up here: the constructor body is empty, so members are
// initialized by their own default/in-class initializers.
StreamControllerComms::StreamControllerComms() = default;
// Report whether a stream controller is available: the MMD wrapper must
// expose a valid stream controller interface AND the controller must answer
// a ping. The ping is only attempted when the interface is valid.
bool StreamControllerComms::IsPresent() {
  const bool hasInterface = _mmdWrapper.bIsStreamControllerValid(_streamControllerInstance);
  return hasInterface && Ping();
}
// Ask the stream controller for its current status.
//
// Returns the most recently received status payload when the GetStatus
// request was sent and a Status reply came back; otherwise (comms busy,
// send failure, or any other reply type) returns a default-constructed
// payload.
Payload<StatusMessagePayload> StreamControllerComms::GetStatus() {
  BusyCheck guard(_busyFlag);
  if (!guard) {
    return {};
  }

  // The reply is only read when the request was sent successfully
  const bool gotStatus = SendMessage(MessageType_GetStatus) && (ReceiveMessage() == MessageType_Status);
  if (!gotStatus) {
    return {};
  }

  return _receivedStatusMessage;
}
// Schedule inference requests with the stream controller.
//
// Every item is sent as its own ScheduleItem message and must be answered
// with a NoOperation reply. A failing item does not stop the loop: the
// remaining items are still sent. Returns true only when every item
// succeeded, and false immediately if the comms are already busy.
bool StreamControllerComms::ScheduleItems(std::vector<Payload<CoreDlaJobPayload>> items) {
  BusyCheck guard(_busyFlag);
  if (!guard) {
    return false;
  }

  bool allAccepted = true;
  for (auto& item : items) {
    // The reply is only read when the send itself succeeded
    const bool accepted = SendMessage(MessageType_ScheduleItem, item.GetPayload(), item.GetSize()) &&
                          (ReceiveMessage() == MessageType_NoOperation);
    allAccepted = allAccepted && accepted;
  }

  return allAccepted;
}
// Send a ping command to the stream controller and wait for a pong
// response.
bool StreamControllerComms::Ping() {
BusyCheck busyCheck(_busyFlag);
if (!busyCheck) {
return false;
}
if (SendMessage(MessageType_Ping)) {
return (ReceiveMessage() == MessageType_Pong);
}
return false;
}
// Initialize and reset the stream controller
//
// sourceBufferSize:
// The size of the MSGDMA buffers that the stream
// controller will receive from the layout transform
// dropSourceBuffers:
// How many source buffers to drop between each
// processed one. 0 by default unless set in the configuration
// by the app with DLIAPlugin::properties::streaming_drop_source_buffers.name()
// numInferenceRequest:
// A constant value set in the executable network. The
// stream controller will start executing once it has
// received this number of inference requests from OpenVINO
bool StreamControllerComms::Initialize(uint32_t sourceBufferSize,
uint32_t dropSourceBuffers,
uint32_t numInferenceRequests) {
BusyCheck busyCheck(_busyFlag);
if (!busyCheck) {
return false;
}
Payload<InitializeStreamControllerPayload> initializePayload{};
initializePayload._sourceBufferSize = sourceBufferSize;
initializePayload._dropSourceBuffers = dropSourceBuffers;
initializePayload._numInferenceRequests = numInferenceRequests;
if (SendMessage(
MessageType_InitializeStreamController, initializePayload.GetPayload(), initializePayload.GetSize())) {
if (ReceiveMessage() == MessageType_NoOperation) {
return true;
}
}
return false;
}
// Receive a message from the stream controller by reading from the
// mailbox memory until the magic number is set to indicate a message is ready.
// Only the Status return message has a payload.
//
// The receive region starts half way through the mailbox RAM. The header is
// polled up to 100 times with a 1ms sleep between attempts. When a message
// is found, it is acknowledged by writing its sequence ID over the magic
// number position, and its message type is returned. Returns
// MessageType_Invalid if nothing arrived within the polling window.
MessageType StreamControllerComms::ReceiveMessage() {
  constexpr uint32_t maxPollAttempts = 100;
  constexpr uint32_t receiveMessageOffset = mailboxRamSize / 2;
  // As in the header read below, the magic number is taken to sit at the
  // very start of the message header
  constexpr uint32_t messageReadyMagicNumberOffset = receiveMessageOffset;
  // offsetof replaces computing the member address through a null pointer
  const uint32_t payloadOffset = receiveMessageOffset + static_cast<uint32_t>(offsetof(MessageHeader, _payload));

  for (uint32_t attempt = 0; attempt < maxPollAttempts; ++attempt) {
    // Value-initialize so that a read which leaves the buffer untouched is
    // never mistaken for a ready message (zero is not the magic number)
    MessageHeader messageHeader{};
    _mmdWrapper.ReadFromStreamController(
        _streamControllerInstance, receiveMessageOffset, sizeof(messageHeader), &messageHeader);

    if (messageHeader._messageReadyMagicNumber == messageReadyMagicNumber) {
      const MessageType messageType = static_cast<MessageType>(messageHeader._messageType);
      uint32_t sequenceId = messageHeader._sequenceID;

      // NOTE(review): only Status and Pong are treated as good messages, so
      // NoOperation replies (which ScheduleItems and Initialize expect) also
      // increment _numBadMessages - confirm whether that is intended.
      bool ok = false;
      if (messageType == MessageType_Status) {
        ok = StatusMessageHandler(payloadOffset);
      } else if (messageType == MessageType_Pong) {
        ok = true;
      }

      if (!ok) {
        _numBadMessages++;
      }

      // Acknowledge the message by replacing the magic number with the
      // sequence ID of the message that was just consumed
      _mmdWrapper.WriteToStreamController(
          _streamControllerInstance, messageReadyMagicNumberOffset, sizeof(sequenceId), &sequenceId);
      _lastReceiveSequenceID = sequenceId;
      return messageType;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  return MessageType_Invalid;
}
// Send a message to the stream controller by writing to the mailbox memory,
// and wait for the message to be received/processed
//
// messageType:
//      The type of message, written into the header's message type field
// pPayload, payloadSize:
//      Optional payload bytes written at the header's payload position.
//      Nothing is written when payloadSize is 0.
//
// Returns true once the current send sequence ID has been observed in the
// magic number position (the sequence ID is then incremented). Returns false
// if the payload is null or does not fit in the send half of the mailbox, or
// if the message was not consumed within 100 polls spaced 1ms apart.
bool StreamControllerComms::SendMessage(MessageType messageType, void* pPayload, size_t payloadSize) {
  constexpr uint32_t maxPollAttempts = 100;
  constexpr uint32_t sendMessageOffset = 0;
  // Replies are read from mailboxRamSize / 2 onwards, so outgoing messages
  // must stay below that offset
  constexpr uint32_t sendRegionSize = mailboxRamSize / 2;
  constexpr uint32_t messageReadyMagicNumberOffset = sendMessageOffset;

  // offsetof replaces computing member addresses through a null pointer
  const uint32_t messageTypeOffset = sendMessageOffset + static_cast<uint32_t>(offsetof(MessageHeader, _messageType));
  const uint32_t sequenceIDOffset = sendMessageOffset + static_cast<uint32_t>(offsetof(MessageHeader, _sequenceID));
  const uint32_t payloadOffset = sendMessageOffset + static_cast<uint32_t>(offsetof(MessageHeader, _payload));

  // Reject a payload that cannot be written safely before touching the mailbox
  if (payloadSize > 0) {
    const bool payloadFits = (payloadOffset < sendRegionSize) && (payloadSize <= sendRegionSize - payloadOffset);
    if (pPayload == nullptr || !payloadFits) {
      return false;
    }
  }

  uint32_t uintMessageType = static_cast<uint32_t>(messageType);
  _mmdWrapper.WriteToStreamController(
      _streamControllerInstance, messageTypeOffset, sizeof(uintMessageType), &uintMessageType);
  _mmdWrapper.WriteToStreamController(
      _streamControllerInstance, sequenceIDOffset, sizeof(_sendSequenceID), &_sendSequenceID);

  if (payloadSize > 0) {
    _mmdWrapper.WriteToStreamController(_streamControllerInstance, payloadOffset, payloadSize, pPayload);
  }

  // Signal the message as ready. This is written last so that the rest of
  // the message is already in place when the magic number appears.
  _mmdWrapper.WriteToStreamController(_streamControllerInstance,
                                      messageReadyMagicNumberOffset,
                                      sizeof(messageReadyMagicNumber),
                                      &messageReadyMagicNumber);

  // Wait until the message has been processed by looking for the sequence ID
  // in the magic number position
  for (uint32_t attempt = 0; attempt < maxPollAttempts; ++attempt) {
    // Seed the field with the "still pending" value so that a read which
    // leaves the buffer untouched is treated as not yet processed instead of
    // comparing indeterminate memory
    MessageHeader messageHeader{};
    messageHeader._messageReadyMagicNumber = messageReadyMagicNumber;
    _mmdWrapper.ReadFromStreamController(
        _streamControllerInstance, sendMessageOffset, sizeof(messageHeader), &messageHeader);

    if (messageHeader._messageReadyMagicNumber == _sendSequenceID) {
      _sendSequenceID++;
      return true;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  return false;
}
// Fetch the payload of a Status message.
//
// Reads sizeof(_receivedStatusMessage) bytes from the stream controller at
// payloadOffset straight into _receivedStatusMessage. Always returns true.
bool StreamControllerComms::StatusMessageHandler(uint32_t payloadOffset) {
  const auto statusSize = sizeof(_receivedStatusMessage);
  _mmdWrapper.ReadFromStreamController(_streamControllerInstance, payloadOffset, statusSize, &_receivedStatusMessage);
  return true;
}
// Format a status payload as a comma-separated string with the fields in
// this order: status (as an unsigned integer), status line number, number of
// received source buffers, number of scheduled inferences, number of
// executed jobs.
std::string StreamControllerComms::GetStatusString(Payload<StatusMessagePayload>& statusPayload) {
  const char* const separator = ",";

  std::ostringstream formatted;
  formatted << static_cast<uint32_t>(statusPayload._status);
  formatted << separator << statusPayload._statusLineNumber;
  formatted << separator << statusPayload._numReceivedSourceBuffers;
  formatted << separator << statusPayload._numScheduledInferences;
  formatted << separator << statusPayload._numExecutedJobs;
  return formatted.str();
}
///////////////////////////////////////////////////////////////////////////////
// BusyFlag is used to prevent concurrent access to the stream controller,
// without holding a mutex when sending/receiving commands
//
// The mutex is only held while the busy flag itself is read or updated
// (see BusyFlag::Lock and BusyFlag::Release).

// Scoped lock type used by the BusyFlag methods to guard the flag
using LockGuard = std::lock_guard<std::recursive_mutex>;
// Try to claim the busy flag. Returns true and marks the flag busy when it
// was free; returns false, leaving the flag untouched, when it was already
// busy. The mutex is held only for the duration of this call.
bool BusyFlag::Lock() {
  LockGuard guard(_mutex);
  const bool acquired = !_busy;
  if (acquired) {
    _busy = true;
  }
  return acquired;
}
// Clear the busy flag under the mutex. The flag is cleared unconditionally;
// this method does not check who set it.
void BusyFlag::Release() {
  LockGuard guard(_mutex);
  _busy = false;
}
// Bind this checker to a busy flag. Construction does not try to take the
// flag; that happens when the object is converted to bool.
BusyCheck::BusyCheck(BusyFlag& busyFlag)
    : _busyFlag(busyFlag),
      _haveLocked(false) {
}
// Release the busy flag on destruction, but only if this object is the one
// that locked it.
BusyCheck::~BusyCheck() {
  if (!_haveLocked) {
    return;
  }
  _busyFlag.Release();
}
// Attempt to take the busy flag and report whether that succeeded. Each
// conversion calls BusyFlag::Lock() again; on success the object remembers
// that it holds the flag so the destructor can release it.
BusyCheck::operator bool() {
  if (!_busyFlag.Lock()) {
    return false;
  }
  _haveLocked = true;
  return true;
}
|