MIRA
Writing a Unit



In this tutorial we walk through the process of creating units. As we proceed, we add functionality and complexity to the units, introducing new concepts and ways of using MIRA along the way. During this tutorial you will learn the following things:

We will start with a unit that only consumes data from channels and is therefore operating in a passive, reactive way. Units that are reactive (that have no worker thread and do not perform cyclic operations) can be implemented using MicroUnit as base class. Units that are active (that do have a worker thread and perform cyclic operations) can be implemented using the Unit base class. We will see the internal differences later.

The specification of our first unit would be as follows:

Since we do not actively produce new data we can use the MicroUnit class as base class.

Step 1 - Create the micro unit

You can either create a domain and a micro unit manually or use MIRAWizard (see the QuickStart for a how-to). The latter approach is strongly advised. Let's call our micro unit "MeanCalculator". After you have created your micro unit, you should have a source file that contains something like:

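#include <fw/MicroUnit.h>

using namespace mira;

class MeanCalculator : public MicroUnit
{
    MIRA_OBJECT(MeanCalculator)
public:
    virtual void initialize()
    {
    }
};

// Declares MeanCalculator as derived from MicroUnit (described in the text below)
MIRA_CLASS_SERIALIZATION(MeanCalculator, mira::MicroUnit);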

If your code looks different, don't worry. You can see that we only need a single include and derive our class from MicroUnit. The MIRA_OBJECT macro must be used in all classes that directly or indirectly derive from mira::Object and that are to be created using the class factory. The macro creates some static code in your class that is used by that factory. For now we need only one method, initialize(); we will get to it shortly. The last macro, MIRA_CLASS_SERIALIZATION, tells the serialization framework as well as the class factory that our MeanCalculator is derived from MicroUnit.
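
To make the idea of "static code used by the factory" more concrete, here is a minimal, self-contained sketch of a self-registering class factory. It is not MIRA's implementation: ToyObject, ToyFactory, TOY_OBJECT and TOY_REGISTER are hypothetical names invented for this illustration, and the real macros may work quite differently.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for mira::Object (not the real class).
struct ToyObject {
    virtual ~ToyObject() = default;
    virtual std::string className() const = 0;
};

// Hypothetical factory that maps class names to creator functions.
class ToyFactory {
public:
    using Creator = std::function<std::unique_ptr<ToyObject>()>;

    static ToyFactory& instance() {
        static ToyFactory factory;  // created on first use
        return factory;
    }
    // Returns false if a class with this name was already registered.
    bool registerClass(const std::string& name, Creator creator) {
        return mCreators.emplace(name, std::move(creator)).second;
    }
    // Returns nullptr if no class was registered under this name.
    std::unique_ptr<ToyObject> create(const std::string& name) const {
        auto it = mCreators.find(name);
        if (it == mCreators.end())
            return nullptr;
        return it->second();
    }
private:
    std::map<std::string, Creator> mCreators;
};

// In-class macro (hypothetical): adds the per-class code the factory relies on.
#define TOY_OBJECT(CLASS)                                          \
public:                                                            \
    std::string className() const override { return #CLASS; }     \
    static std::unique_ptr<ToyObject> createInstance() {           \
        return std::unique_ptr<ToyObject>(new CLASS);              \
    }                                                              \
private:

// Namespace-scope macro (hypothetical): registers the class during
// static initialization of this translation unit.
#define TOY_REGISTER(CLASS)                                        \
    static const bool CLASS##_registered =                         \
        ToyFactory::instance().registerClass(#CLASS, &CLASS::createInstance);

class MeanCalculator : public ToyObject {
    TOY_OBJECT(MeanCalculator)
public:
    void initialize() {}
};

TOY_REGISTER(MeanCalculator)
```

With this in place, ToyFactory::instance().create("MeanCalculator") returns a new object although the caller never names the C++ type. This is the kind of lookup a configuration file needs when it refers to a class by its name.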

If you have used MIRAWizard to create your unit, you will already have a CMakeLists.txt for your domain. It should look like:

MIRA_REQUIRE_PACKAGE(MIRAFramework)

MIRA_ADD_LIBRARY(MeanCalculator
    SHARED
    SOURCE
        src/MeanCalculator.C
    LINK_LIBS
        MIRAFramework
)

This is all you need to build a library called 'MeanCalculator' that contains your unit.

Step 2 - Subscribe to a channel

In order to receive data updates from a channel, we need to subscribe to that channel with a callback that gets called every time new data is available. We do that in the initialize() function because it is called after the unit was created and before it gets started.

virtual void initialize()
{
    subscribe<float>("FloatChannel", &MeanCalculator::onNewData);
}

We also need to add the onNewData callback.

void onNewData(ChannelRead<float> data)
{
}

The onNewData method is now called every time new data is available in "FloatChannel". The parameter to this function contains the read locked data of the latest update.
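
The reactive pattern described above can be sketched without MIRA. The following toy example assumes nothing about MIRA's internals: ToyChannel and ToyConsumer are hypothetical names, there is no locking, and the callbacks run directly in the thread of the caller of post().

```cpp
#include <functional>
#include <utility>
#include <vector>

// Toy stand-in for a typed channel; not MIRA's Channel class.
template <typename T>
class ToyChannel {
public:
    using Callback = std::function<void(const T&)>;

    // Register a callback that runs on every new value.
    void subscribe(Callback callback) {
        mSubscribers.push_back(std::move(callback));
    }
    // Store the new value and notify all subscribers.
    void post(const T& value) {
        mLatest = value;
        for (const auto& callback : mSubscribers)
            callback(mLatest);
    }
private:
    T mLatest{};
    std::vector<Callback> mSubscribers;
};

// Reactive consumer: it owns no thread and only runs inside the callback.
// The consumer must outlive the channel because the lambda captures `this`.
class ToyConsumer {
public:
    void initialize(ToyChannel<float>& channel) {
        channel.subscribe([this](const float& value) { onNewData(value); });
    }
    void onNewData(float value) {
        mLast = value;
        ++mCount;
    }
    int count() const { return mCount; }
    float last() const { return mLast; }
private:
    int mCount = 0;
    float mLast = 0.0f;
};
```

Every call to post() triggers exactly one call to onNewData(), which mirrors how our micro unit only becomes active when "FloatChannel" receives an update.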

Step 3 - Calculate the mean

Now we can calculate the mean value. We will do this by using a queue that collects up to 10 values. Each time a new value arrives we append it, throw away the oldest value once the queue holds more than 10, and calculate the mean of the remaining values.

#include <deque>
...
void onNewData(ChannelRead<float> data)
{
    // std::deque (unlike std::queue) provides push_back/pop_front and iteration
    mQueue.push_back(data->value());
    if (mQueue.size() > 10)
        mQueue.pop_front();
    double sum = 0.0;
    foreach(float f, mQueue)
        sum += f;
    float mean = (float)(sum / mQueue.size());
}

std::deque<float> mQueue;

Note that we accumulate the sum in a double. This avoids an overflow and limits the rounding error that builds up when many float values are added.
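
The effect of the accumulator type is easiest to see with a large number of values. The following sketch is independent of MIRA and assumes IEEE 754 arithmetic where float expressions are evaluated in single precision (the usual case on x86-64). The function names are made up for this example.

```cpp
#include <cstddef>

// Adds 1.0f `count` times using a float accumulator.
float sumInFloat(std::size_t count) {
    float sum = 0.0f;
    for (std::size_t i = 0; i < count; ++i)
        sum += 1.0f;
    return sum;
}

// Adds 1.0f `count` times using a double accumulator.
double sumInDouble(std::size_t count) {
    double sum = 0.0;
    for (std::size_t i = 0; i < count; ++i)
        sum += 1.0f;
    return sum;
}
```

Under the stated assumption, sumInFloat(20000000) stops growing at 16777216 (2^24), because a float has a 24-bit significand and adding 1 no longer changes the value, while sumInDouble(20000000) returns exactly 20000000. For a window of 10 values the difference is negligible, but the double accumulator costs nothing and stays correct when the window grows.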

Step 4 - Publish the mean

Now that we have used this rather stupid way of calculating the mean we can publish it in a channel for all the other modules interested in the mean of the 10 last float values.

virtual void initialize()
{
    subscribe<float>("FloatChannel", &MeanCalculator::onNewData);
    mMeanChannel = publish<float>("MeanChannel");
}

void onNewData(ChannelRead<float> data)
{
    // mQueue is a std::deque, which provides push_back/pop_front and iteration
    mQueue.push_back(data->value());
    if (mQueue.size() > 10)
        mQueue.pop_front();
    double sum = 0.0;
    foreach(float f, mQueue)
        sum += f;
    float mean = (float)(sum / mQueue.size());
    // Obtain write access to the channel and store the new mean
    ChannelWrite<float> w = mMeanChannel.write();
    w->value() = mean;
}

std::deque<float> mQueue;
Channel<float> mMeanChannel;

Now every time we have computed a new mean we will write it to the channel "MeanChannel". We could use the much shorter

mMeanChannel.post(mean);

but note that this should only be used for data where copying is cheap.

Now the first end user uses our unit and comes to us and says: "It only uses the last 10 values for computing the mean. Can't we make this configurable somehow?"

And you reply with: "YES"

Step 5 - Adding parameters

First we need to decide if we only want to configure the size of our mean window once in a configuration file or if the user should be able to alter the value during runtime using a property editor. In our case the second case would be exactly what the user requested. All we need to do is to add a reflect method (if it's not already contained in your code) to our unit and use the new parameter "WindowSize" in our computation.

class MeanCalculator : public MicroUnit
{
    MIRA_OBJECT(MeanCalculator)
public:
    template<typename Reflector>
    void reflect(Reflector& r)
    {
        r.property("WindowSize", mWindowSize,
                   "The size of the window for mean computation",
                   10);
    }

    void onNewData(ChannelRead<float> data)
    {
        mQueue.push_back(data->value());
        // The cast avoids a signed/unsigned comparison; assumes mWindowSize >= 1
        while (mQueue.size() > (size_t)mWindowSize)
            mQueue.pop_front();
        double sum = 0.0;
        foreach(float f, mQueue)
            sum += f;
        float mean = (float)(sum / mQueue.size());
        mMeanChannel.post(mean);
    }
    ...
    int mWindowSize;
};

Note that we have also replaced the if with a while to keep the size of the queue consistent with the window size: if the user shrinks the window at runtime, more than one old value may have to be removed. Now the user can alter the size of the window on the fly during runtime. The window size is 10 by default, as before.
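
The following standalone sketch shows why the loop matters when the window shrinks. It does not use MIRA; WindowMean is a hypothetical helper class, and clamping the window size to at least 1 is an addition of this sketch (it avoids dividing by zero), not something the unit above does.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>

// Hypothetical helper: mean over the last `windowSize` values.
class WindowMean {
public:
    explicit WindowMean(std::size_t windowSize) { setWindowSize(windowSize); }

    // A window smaller than 1 is clamped to 1 so the queue never runs empty.
    void setWindowSize(std::size_t windowSize) {
        mWindowSize = std::max<std::size_t>(1, windowSize);
    }
    // Adds a value, trims the queue to the window size and returns the mean.
    float add(float value) {
        mQueue.push_back(value);
        // `while` instead of `if`: after the window has shrunk, several
        // old values must be dropped at once.
        while (mQueue.size() > mWindowSize)
            mQueue.pop_front();
        double sum = 0.0;
        for (float f : mQueue)
            sum += f;
        return static_cast<float>(sum / mQueue.size());
    }
    std::size_t size() const { return mQueue.size(); }
private:
    std::size_t mWindowSize = 1;
    std::deque<float> mQueue;
};
```

If the values 1 to 10 are added with a window of 10, the window is then reduced to 3 and the value 11 is added, the queue holds 9, 10 and 11 and the mean is 10. With a plain if, only one element would be removed and the queue would still hold 10 values.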

The next thing that bothers us is the use of the queue. Imagine an application that uses 1000 instances of our mean computing unit. Every unit would keep its own copies of the last N float values. For float values this is not a resource killer, but what about other data like images? Luckily, the channel provides a feature that is exactly what we are looking for: a history.

Step 6 - Using the channel history

To use the channel history we have to change the type of our window size parameter from an int to a time duration.

template<typename Reflector>
void reflect(Reflector& r)
{
    r.property("WindowSize", mWindowSize,
               "The size of the window for mean computation",
               Duration::seconds(1));
}
...
Duration mWindowSize;

Note that we also had to change the default value of our property. Next we alter the call to subscribe by adding our window size as a parameter. This tells the channel to keep a history of the data up to the requested time duration. We also store a reference to the float channel for later use.

virtual void initialize()
{
    mFloatChannel = subscribe<float>("FloatChannel", &MeanCalculator::onNewData, mWindowSize);
    ...
}
...
Channel<float> mFloatChannel;

The next change would be in the onNewData function. Here we will not only use the latest value of the channel but read the whole history up to a length of our window size.

void onNewData(ChannelRead<float> data)
{
    ChannelReadInterval<float> interval = mFloatChannel.readInterval(data->timestamp - mWindowSize,
                                                                     data->timestamp);
    double sum = 0.0;
    foreach(float f, interval)
        sum += f;
    float mean = (float)(sum / interval.size());
    mMeanChannel.post(mean);
}

That was easy, huh? We read a history from our float channel, ranging from the newest value back to the one that was put into the channel mWindowSize before the latest update. The interval contains read-locked data from the channel. The call to readInterval ensures that, if mWindowSize increases, the history of preserved data in the channel increases as well. Now we can get rid of mQueue.
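
The time-based window can be illustrated with a small standalone model. The sketch below does not use MIRA: the deque plays the role of the channel history, timestamps are plain milliseconds, and both ends of the interval are treated as inclusive, which is an assumption of this sketch and not a statement about readInterval. TimeWindowMean and Sample are hypothetical names.

```cpp
#include <cstdint>
#include <deque>

// One timestamped value, standing in for an entry of the channel history.
struct Sample {
    std::int64_t timestampMs;
    float value;
};

// Hypothetical model: mean over all samples of the last `windowMs` milliseconds.
// Assumes windowMs >= 0 and timestamps that never decrease.
class TimeWindowMean {
public:
    explicit TimeWindowMean(std::int64_t windowMs) : mWindowMs(windowMs) {}

    // Stores the new sample and returns the mean over
    // [timestampMs - windowMs, timestampMs].
    float onNewData(std::int64_t timestampMs, float value) {
        mHistory.push_back({timestampMs, value});
        // Drop everything that is older than the window. The newest sample
        // always stays, so the history is never empty here.
        while (mHistory.front().timestampMs < timestampMs - mWindowMs)
            mHistory.pop_front();
        double sum = 0.0;
        for (const Sample& s : mHistory)
            sum += s.value;
        return static_cast<float>(sum / mHistory.size());
    }
private:
    std::int64_t mWindowMs;
    std::deque<Sample> mHistory;
};
```

With a window of 200 ms and the values 1, 2, 3, 4 arriving at 0, 100, 200 and 300 ms, the returned means are 1, 1.5, 2 and 3. Note that the number of samples in the window now depends on the update rate of the channel rather than on a fixed count.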

Again a user of our unit approaches. He criticises that when he heard of our unit named MeanCalculator, he thought it would provide a service that can compute the mean of a vector of floats. Looking at the code, he was shocked to find no service interface for doing this. He demands that we add this functionality. We could argue that the functionality of our unit is stupid enough already, but hey, this is a tutorial!

Step 7 - Turn your unit into a service

So we would like to provide a service method, callable by any module, that computes the mean of a vector of float values. First we need to add a method that does exactly that:

float calculateMean(const std::vector<float>& values)
{
    if (values.size() == 0)
        MIRA_THROW(XInvalidParameter, "The vector contains no data");
    double sum = 0.0;
    foreach(float f, values)
        sum += f;
    return (float)(sum / values.size());
}

Nothing special here. We again use a double to prevent an overflow and throw an exception if the vector is empty. Now let's expose this method as a service function that can be called by other modules using RPC. To do so, we need to extend our reflect method and add an include that provides the reflect mechanism for std::vector.

#include <serialization/adapters/std/vector>
...
template<typename Reflector>
void reflect(Reflector& r)
{
    ...
    r.method("calculateMean", &MeanCalculator::calculateMean,
             "This method calculates the mean of a vector of floats",
             "values", "container with values", std::vector<float>({3,4,5}));
}

Finally we need to publish our unit as a service:

virtual void initialize()
{
    ...
    publishService(*this);
}

And we are done. Now other modules can call our "calculateMean" method via RPC and will be presented with a nice and clean mean, or with an exception if they pass an empty vector.

Up to now we have never tested our unit; we relied on our coding skills. But now it's time to run our unit in an application.

Step 8 - Write a configuration file

In order to use our unit in an application, we need to write a configuration file. A separate tutorial covers that in more detail. We name our config file "MeanTest.xml", and it should look like:

<root>
    <unit id="MyMeanCalculator">
        <instance class="MeanCalculator">
            <WindowSize>5000</WindowSize>
        </instance>
    </unit>
</root>

This config instantiates our unit with the name "MyMeanCalculator" and sets the window size to 5 seconds (the value 5000 is given in milliseconds).

We can launch this config using either mira or miracenter. We will use miracenter here because it will give us some debugging and visualization features.

miracenter MeanTest.xml

This launches our config and we should be presented with the graphical user interface of miracenter.

Since no one provides data in the "FloatChannel" yet, the only thing we can test right now is the call to "calculateMean". Add the RPCView if it is not already open and search for "MyMeanCalculator". Double-click on "calculateMean" and enter the following JSON code in the window below the "call" button:

[1, 2, 3, 4, 5]

and click "call". You should see the JSON response containing something like (or exactly):

3
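
The expected response can be double-checked outside of MIRA. The following sketch repeats the logic of calculateMean as a free function; std::invalid_argument is used here as a stand-in for the MIRA_THROW(XInvalidParameter, ...) call of the unit, so the exception type is an assumption of this example only.

```cpp
#include <stdexcept>
#include <vector>

// Standalone version of the service method from Step 7.
float calculateMean(const std::vector<float>& values) {
    if (values.empty())
        throw std::invalid_argument("The vector contains no data");
    double sum = 0.0;  // double accumulator, as in the unit
    for (float f : values)
        sum += f;
    return static_cast<float>(sum / values.size());
}
```

For the input [1, 2, 3, 4, 5] the sum is 15 and the function returns 3, matching the JSON response above. An empty vector raises the exception instead of dividing by zero.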

Now it is time to also test the original functionality - calculating the mean value on the "FloatChannel". Therefore we need to create another unit. This time the unit will actively produce float values so we can use Unit as a base class.

Step 9 - Create a unit

We should again use the wizard, this time to create a Unit, but we can put the unit in the same domain. Let's call our unit "FloatProducer". The code should look like:

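#include <fw/Unit.h>

using namespace mira;

class FloatProducer : public Unit
{
    MIRA_OBJECT(FloatProducer)
public:
    // The cycle time of the unit is passed to the Unit base class
    FloatProducer() :
        Unit(Duration::milliseconds(100))
    {
    }

    virtual void initialize()
    {
    }

    // Called cyclically by the timer of the Unit base class
    virtual void process(const Timer& timer)
    {
    }
};

// Declares FloatProducer as derived from Unit (same pattern as for MeanCalculator)
MIRA_CLASS_SERIALIZATION(FloatProducer, mira::Unit);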

The code differs slightly from that of our MeanCalculator class. Let's explain the differences. This time we derive from Unit. The Unit class provides a cyclic timer that calls the process() method at a given interval. The interval is specified in the constructor: 100 milliseconds in our case. We also have the initialize method and the macros for the class factory.
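
As a rough mental model of such a cyclic timer, consider the following sketch. It is not how the Unit class is implemented: ToyUnit and CountingUnit are hypothetical names, the loop runs in the calling thread instead of a worker thread, and it stops after a fixed number of cycles so that the example terminates.

```cpp
#include <chrono>
#include <thread>

// Hypothetical active unit: calls process() once per cycle.
class ToyUnit {
public:
    explicit ToyUnit(std::chrono::milliseconds cycleTime) : mCycleTime(cycleTime) {}
    virtual ~ToyUnit() = default;

    // Runs `cycles` iterations; a real unit would loop until it is stopped.
    void run(int cycles) {
        auto next = std::chrono::steady_clock::now();
        for (int i = 0; i < cycles; ++i) {
            process();
            // Sleep until the next scheduled time point, so that the time
            // spent in process() does not accumulate as drift.
            next += mCycleTime;
            std::this_thread::sleep_until(next);
        }
    }
protected:
    virtual void process() = 0;
private:
    std::chrono::milliseconds mCycleTime;
};

// Example unit that only counts how often process() was called.
class CountingUnit : public ToyUnit {
public:
    CountingUnit() : ToyUnit(std::chrono::milliseconds(10)) {}
    int calls() const { return mCalls; }
protected:
    void process() override { ++mCalls; }
private:
    int mCalls = 0;
};
```

The contrast to the micro unit is that nothing has to arrive on a channel for process() to run; the unit itself determines when work happens.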

Add the source file to your already created library or put the Unit in a separate library. If you decide to put both into a single library your CMakeLists.txt should look like:

MIRA_REQUIRE_PACKAGE(MIRAFramework)

MIRA_ADD_LIBRARY(MeanMachine
    SHARED
    SOURCE
        src/MeanCalculator.C
        src/FloatProducer.C
    LINK_LIBS
        MIRAFramework
)

Now both Units are built and linked into the same library named 'MeanMachine'.

We start by publishing our "FloatChannel":

virtual void initialize()
{
    mFloatChannel = publish<float>("FloatChannel");
}
...
Channel<float> mFloatChannel;

For the value of that channel we start with 0.0f and increase the value each time process() is called.

virtual void initialize()
{
    ...
    mFloat = 0.0f;
}

virtual void process(const Timer& timer)
{
    mFloat += 1.0f;
    mFloatChannel.post(mFloat);
}
...
float mFloat;

The value of "FloatChannel" is now updated every 100 milliseconds with a value that is increasing by 1.0f.

To test the combination of our two units compile everything and extend our configuration file as follows:

<root>
    <unit id="MyFloatProducer">
        <instance class="FloatProducer" />
    </unit>
    <unit id="MyMeanCalculator">
        <instance class="MeanCalculator">
            <WindowSize>5000</WindowSize>
        </instance>
    </unit>
</root>

and launch the config again using miracenter. In the ChannelView you should see two channels "FloatChannel" and "MeanChannel". Open a Text View and drag both channels to it. You should see the content of both channels as JSON.
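
To judge whether the numbers in the Text View are plausible, the expected mean can be estimated by hand. The sketch below assumes ideal timing (one sample exactly every period, starting with the value 1.0) and a window that includes both of its end points; real timestamps jitter, so the values you observe may deviate slightly. The function expectedMean is a hypothetical helper and not part of MIRA.

```cpp
#include <algorithm>
#include <cstdint>

// Expected value on "MeanChannel" after the n-th producer cycle (n >= 1),
// for a producer that posts 1, 2, 3, ... every `periodMs` milliseconds.
double expectedMean(std::int64_t n, std::int64_t periodMs, std::int64_t windowMs) {
    // Number of samples inside [t - window, t], both ends included (assumption).
    std::int64_t inWindow = std::min(n, windowMs / periodMs + 1);
    // The window holds the values n - inWindow + 1 ... n; the mean of such a
    // run of consecutive numbers is the midpoint between first and last.
    return static_cast<double>(n) - (inWindow - 1) / 2.0;
}
```

For the configuration above (100 ms cycle, 5000 ms window) the window holds at most 51 samples. After the third update the mean is 2, and once the window is full the mean trails the latest value by 25, for example 75 when "FloatChannel" shows 100.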