MIRA
Tutorial: Creating a Unit that Subscribes to Data


Prerequisites

This tutorial assumes that you have created a domain and that you know how to create, compile and launch a unit as described in the previous tutorials. To run the Unit from this tutorial you need the "FloatPublisher" Unit of the previous tutorial. If you did not complete that tutorial, you can download the code of that Unit here.

Introduction

In this tutorial we will get to know MicroUnits, a special type of Units that are reactive only and therefore have no worker thread to perform cyclic operations. The MicroUnits concept is suitable for Units that react on incoming data only. In this tutorial, we will create such a MicroUnit, that subscribes on the data that is produced by our "FloatPublisher". Our new Unit will compute the moving average on that data within a certain window. This mean value will be published again to be used by other units.

Creating the Unit

First, we create the MicroUnit using the mirawizard similar to the previous tutorial. This time we select "MicroUnit" as component type. Let's call our MicroUnit "MeanCalculator" and place it into the "tutorials" namespace within our Tutorials domain. After you have created the MicroUnit you should have a source file that contains something like:

#include <fw/MicroUnit.h>
using namespace mira;
namespace tutorials {
class MeanCalculator : public MicroUnit
{
MIRA_OBJECT(MeanCalculator)
public:
MeanCalculator();
template<typename Reflector>
void reflect(Reflector& r)
{
}
protected:
virtual void initialize();
};
MeanCalculator::MeanCalculator()
{
}
void MeanCalculator::initialize()
{
}
}
MIRA_CLASS_SERIALIZATION(tutorials::MeanCalculator, mira::MicroUnit );

Subscribe to a channel

In order to receive the data from our "FloatPublisher" that it writes to the "FloatChannel", we need to subscribe on that channel. Again, we use the initialize() function to do that because this function is called after the unit was created and before it gets started.

void MeanCalculator::initialize()
{
subscribe<float>("FloatChannel", &MeanCalculator::onNewData);
}

Similar to publishing a channel, we need to specify the data type of the channel and its name that we want to subscribe. Moreover, we can specify a callback method that is called whenever new data becomes available.

We also need to add the onNewData callback method to our class:

class MeanCalculator
{
...
private:
void onNewData(ChannelRead<float> data);
};
...
void MeanCalculator::onNewData(ChannelRead<float> data)
{
}

This function will automatically be called when new data is published to the channel, with the read-locked data of the latest update as parameter.

Now we can calculate the mean value. We will do this by using a queue collecting up to 10 values. When we have reached 10 values, we will throw away the oldest and calculate the mean.

#include <deque>
class MeanCalculator
{
...
// class members
private:
std::deque<float> mQueue;
};
...
void MeanCalculator::onNewData(ChannelRead<float> data)
{
mQueue.push_back(data->value());
if (mQueue.size() > 10)
mQueue.pop_front();
float sum = 0.0;
foreach(float f, mQueue)
sum += f;
float mean = sum / mQueue.size();
std::cout << "MeanCalculator: " << mean << std::endl;
}

For demonstration purposes, we write out the computed average on the console.

Publish the mean

Now that we have used this naive but simple way for calculating the moving average, we can publish it in a channel for all the other modules interested in the mean of the 10 last float values. As we have learned in the previous tutorial, we can do this by adding the following lines:

class MeanCalculator
{
...
// class members
private:
std::deque<float> mQueue;
Channel<float> mMeanChannel;
};
...
void MeanCalculator::initialize()
{
subscribe<float>("FloatChannel", &MeanCalculator::onNewData);
mMeanChannel = publish<float>("MeanChannel");
}
void MeanCalculator::onNewData(ChannelRead<float> data)
{
mQueue.push_back(data->value());
if (mQueue.size() > 10)
mQueue.pop_front();
float sum = 0.0;
foreach(float f, mQueue)
sum += f;
float mean = sum / mQueue.size();
std::cout << "MeanCalculator: " << mean << std::endl;
ChannelWrite<float> w = mMeanChannel.write();
w->value() = mean;
}

Now every time we have computed a new mean, we will write it to the channel "MeanChannel".

Instead of

ChannelWrite<float> w = mMeanChannel.write();
w->value() = mean;

we could use the much shorter

mMeanChannel.post(mean);

However, please note that this should only be used for data where copying is cheap, since the post() method will result in copying of the data, while the ChannelWrite version can be used without additional unnecessary data copying.

Compile and Launch the Unit

We are already done, and the code of our Unit should look like this.

Now we can compile and test it. The mirawizard has already added the following lines to our domain's CMakeLists.txt, that build a shared library for our Unit:

MIRA_ADD_LIBRARY(MeanCalculator
SHARED
SOURCE
src/MeanCalculator.C
LINK_LIBS
MIRABase
MIRAFramework
)

As in the previous tutorial, we do not need to make any changes here and can build the unit by typing:

> make

within our project's root directory.

To start our unit, we have to adapt our configuration XML file. Therefore, we open the file domains/Tutorials/etc/Tutorials.xml and modify it as follows:

<root>
<unit id="MyFloatProducer" class="tutorials::FloatProducer"/>
<unit id="MyMeanCalculator" class="tutorials::MeanCalculator"/>
</root>

The second <unit>-line now additionally starts our new "MeanCalculator" Unit beside our "FloatProducer"

Now we can start that configuration file using the mira tool, and should see the outputs of our two Units:

/home/user/myproject> mira domains/Tutorials/etc/Tutorial.xml
FloatProducer: 1
MeanCalculator: 1
FloatProducer: 2
MeanCalculator: 1.5
FloatProducer: 3
MeanCalculator: 2
FloatProducer: 4
MeanCalculator: 2.5
FloatProducer: 5
MeanCalculator: 3
...

As you can see, whenever the "FloatProducer" generates a value, the "MeanCalculator" is activated by its onNewData() callback and computes a new mean value.