MIRA
Publishing / Subscribing


Overview

Communication via Channels uses the publish/subscribe pattern. This is a messaging pattern where senders (publishers) of messages do not program the messages to be sent directly to specific receivers (subscribers). Instead, published messages are written to the Channels, without knowledge of which, if any, subscribers there might be. Subscribers express interest in one or more Channels, and only receive messages that are of interest, without knowledge of what, if any, publishers there are. This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology. (cf. Wikipedia)
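The decoupling described above can be illustrated with a small stand-alone sketch. It is not MIRA code: `ToyBus`, its `subscribe()` and `publish()` methods, and the restriction to `int` payloads are hypothetical simplifications, chosen only to show that publishers and subscribers share nothing but a channel name.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy bus, NOT part of MIRA: channels are identified by name only.
class ToyBus {
public:
    using Callback = std::function<void(int)>;

    // A subscriber registers interest in a channel name, never in a publisher.
    void subscribe(const std::string& channel, Callback callback) {
        assert(static_cast<bool>(callback));  // an empty callback is a usage error
        mSubscribers[channel].push_back(std::move(callback));
    }

    // A publisher writes to a channel name without knowing who listens.
    // Returns how many subscribers were notified (possibly none).
    std::size_t publish(const std::string& channel, int value) const {
        auto it = mSubscribers.find(channel);
        if (it == mSubscribers.end())
            return 0;
        for (const Callback& callback : it->second)
            callback(value);
        return it->second.size();
    }

private:
    std::map<std::string, std::vector<Callback>> mSubscribers;
};
```

Publishing to a name that nobody has subscribed to is not an error in this sketch; the value simply reaches no one, which mirrors the "if any" wording of the pattern.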

Publishing a Channel

Before writing data to a channel, your Unit/Authority needs to be a publisher of this channel. You can publish a channel by calling the publish() method, specifying the name and the type of the channel.

Channel<Pose2> poseChannel = publish<Pose2>("Pose");

The call returns a Channel object you can store and use for writing data to the channel.

In the code examples given on this page, we assume that you are implementing a Unit. However, as you will learn on the Authority page, the whole functionality of the framework can also be used outside of units, e.g. when you are implementing a stand-alone application. In that case you need an Authority object to call the methods on. The above example then needs to be slightly modified as follows:

// assume you have an Authority object created and checked in at the framework
Channel<Pose2> poseChannel = authority.publish<Pose2>("Pose");

Subscribing to Channels

To be able to read data from a channel, you need to subscribe to that channel first. To do so, call the subscribe() method and specify the type, the name of the channel and, optionally, the length of the history as a time span. If a time span is given, the channel creates as many slots as are needed to provide a history of the desired length, provided that the maximum allowed number of slots is not exceeded. The maximum number of slots can be changed in the configuration file.

Channel<Pose2> poseChannel = subscribe<Pose2>("Pose");
// or
Channel<Pose2> poseChannel = subscribe<Pose2>("Pose", Duration::seconds(2));
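How a history length given as a time span interacts with a maximum slot count can be pictured with the following stand-alone sketch. It is only a toy model under stated assumptions, not MIRA's slot management: `ToyHistory`, `ToySlot`, millisecond integer timestamps and the drop-oldest policy are all hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a stamped slot; timestamps are plain milliseconds.
struct ToySlot {
    std::int64_t timestampMs;
    double value;
};

// Toy history buffer, NOT MIRA's implementation.
class ToyHistory {
public:
    ToyHistory(std::int64_t spanMs, std::size_t maxSlots)
        : mSpanMs(spanMs), mMaxSlots(maxSlots) {
        assert(spanMs >= 0 && maxSlots > 0);
    }

    // Appends a slot; timestamps are assumed to be non-decreasing.
    void push(std::int64_t timestampMs, double value) {
        mSlots.push_back(ToySlot{timestampMs, value});
        // Drop slots that fell out of the requested time span ...
        while (mSlots.front().timestampMs < timestampMs - mSpanMs)
            mSlots.pop_front();
        // ... and enforce the hard upper bound on the number of slots.
        while (mSlots.size() > mMaxSlots)
            mSlots.pop_front();
    }

    std::size_t size() const { return mSlots.size(); }

    std::int64_t oldestMs() const {
        assert(!mSlots.empty());
        return mSlots.front().timestampMs;
    }

private:
    std::int64_t mSpanMs;
    std::size_t mMaxSlots;
    std::deque<ToySlot> mSlots;
};
```

With a span of 2000 ms and data arriving every 500 ms, the sketch keeps five slots; if the slot limit is smaller than that, the history becomes shorter than requested.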

Additionally, you can specify a callback method that is called each time the data in the channel changes.

void poseCallback(ChannelRead<Pose2> pose)
{
// do something with pose
}
...
// subscribes with a global function as callback
Channel<Pose2> poseChannel = subscribe<Pose2>("Pose", poseCallback);
// subscribes with a member function of the unit as callback
Channel<Pose2> poseChannel = subscribe<Pose2>("Pose", &MyUnit::poseCallback);
// subscribes with a bound function
Channel<Pose2> poseChannel = subscribe<Pose2>("Pose", boost::bind(&MyUnit::poseCallback, this, _1, some bound parameters));

As callback method you can specify a global function, a member function or a function bound with boost::bind.

When calling the specified callback, the framework passes a ChannelRead object pointing to the latest data of the channel. Internally, the framework automatically creates a thread that calls the subscriber callbacks. This is a major advantage of the framework: complex and time-consuming operations can be performed within a subscriber callback without blocking the framework or other Units or Authorities (provided that the queue size is greater than 1).

Threading hints

By default, all channel subscriber callbacks of a Unit/Authority share the main dispatcher thread of that Unit/Authority. This means that if one of the callbacks, handlers or timers blocks, the other callbacks will not be called until the blocking callback returns. If you want to ensure that a certain callback gets its own notification thread and is always called, even if other callbacks of the same Unit/Authority block, you can subscribe the callback using an independent thread by setting the optional parameter independentThread:

Channel<Pose2> poseChannel = subscribe<Pose2>("Pose", poseCallback, true);
...

The returned Channel object has the same read and write access rights as the Unit/Authority has at the time immediately after the call to publish(). This means that if the Unit/Authority was not a subscriber to the channel when calling publish(), the returned channel object is only allowed to write. To avoid mistakes when subscribing to and publishing a channel at the same time, first call publish(), then subscribe(), and store the channel object returned by the last call to subscribe(), which will then have both read and write access rights.

Writing to Channels

Before reading or writing data from or to the channel you first need to obtain a channel object from the framework. You can do this by either storing the channel object returned by the publish() or subscribe() method as shown above, or you can obtain the channel object by calling getChannel():

Channel<Pose2> poseChannel = getChannel<Pose2>("Pose");

In the call to getChannel() you need to specify the data type as template parameter and the name of the channel.

In order to write data into the channel you can obtain a write accessor by calling the channel's write() method. This method returns a ChannelWrite object. This ChannelWrite object allows direct access to the data element using the -> operator (similar to an STL iterator) and automatically takes care of locking.

// pose will be a locked reference to a slot
ChannelWrite<Pose2> pose = poseChannel.write();
// we can start writing immediately and don't have to worry if we block longer
pose->timestamp = Time::now();
pose->value() = Pose2(1,2,0);
// when pose goes out of scope it will be unlocked and the framework will notify all subscribers
// one can also call pose.finish() or pose.discard();

If the write accessor object goes out of scope, the data is unlocked automatically and the channel and all subscribers are informed about the new data. You can also call finish() to unlock the data manually. Calling finish() before leaving the scope has the advantage that it can throw an exception if the written data was invalid (in particular, if timestamp is an invalid time). Exceptions cannot be thrown from the write accessor's destructor. There they have to be caught internally and only result in error messages, so the writer cannot handle them and logical errors in the code may be harder to spot.
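The difference between an explicit finish() and relying on the destructor can be sketched in plain C++. The class below is a hypothetical toy accessor, not MIRA's ChannelWrite: `ToyWrite`, the error log vector, and the convention that a negative timestamp means "invalid time" are assumptions made for illustration only.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical toy accessor, NOT MIRA's ChannelWrite.
// A negative timestamp stands in for an invalid time.
class ToyWrite {
public:
    explicit ToyWrite(std::vector<std::string>& errorLog) : mErrorLog(errorLog) {}
    ToyWrite(const ToyWrite&) = delete;
    ToyWrite& operator=(const ToyWrite&) = delete;

    // Destructors must not throw, so a failure can only be logged here.
    ~ToyWrite() {
        if (mFinished)
            return;
        try {
            commit();
        } catch (const std::exception& e) {
            mErrorLog.push_back(e.what());
        }
    }

    // Explicit finish(): a failure reaches the caller as an exception.
    void finish() {
        mFinished = true;  // the destructor must not commit a second time
        commit();
    }

    long timestampMs = 0;

private:
    void commit() const {
        if (timestampMs < 0)
            throw std::invalid_argument("invalid timestamp");
    }

    std::vector<std::string>& mErrorLog;
    bool mFinished = false;
};

// The caller can react to the error because finish() is called explicitly.
inline bool writeWithFinish(std::vector<std::string>& log) {
    ToyWrite w(log);
    w.timestampMs = -1;
    try {
        w.finish();
    } catch (const std::invalid_argument&) {
        return true;  // handled by the writer itself
    }
    return false;
}

// The error only ends up in the log; the caller never sees it.
inline void writeWithoutFinish(std::vector<std::string>& log) {
    ToyWrite w(log);
    w.timestampMs = -1;
}
```

In the first function the writer learns about the invalid timestamp and the log stays empty; in the second the only trace is a log entry.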

If you want to discard the written data you can call discard() to unlock the data without informing the channel and the subscribers about the data.

The write() method also takes an optional parameter that specifies whether the channel should copy the latest available data into the ChannelWrite slot. This is useful whenever the caller wants to modify the latest data only partially, since it avoids an additional read call.

// obtain a write slot and make sure that it already contains the last data
ChannelWrite<Pose2> pose = poseChannel.write(true);
// and we can start writing immediately and don't have to worry if we block longer
pose->timestamp = Time::now();
pose->x() = 10;
...

When using the copy option of the write() method, the Channel object must also have read access rights, i.e. the writer must be subscribed to the channel.

Reading Data from Channels

Reading works in the same way. The channel's read() method returns a ChannelRead object that allows direct access to the data element using the -> operator and automatically takes care of locking. When the object goes out of scope, the data is unlocked automatically. You can also call finish() to unlock the data manually.

// pose will be a locked reference to the slot containing the newest data
ChannelRead<Pose2> pose = poseChannel.read();
// start reading our data and don't worry if we block
computeSomeThingWithPose(pose);
...
// when pose goes out of scope it will be unlocked,
// one can also call pose.finish();

Reading Data from the Past

Normally you will be interested in the newest data of the channel. But sometimes it can be necessary to access older data. This is possible since each channel is able to store a history of data. Reading older data can be achieved by specifying a time in the read function.

ChannelRead<Pose2> oldPose = poseChannel.read(Time::now()-Duration::milliseconds(500));

Now oldPose contains data that is about 500 milliseconds old.

The main application of this mechanism is to synchronize the data of different channels, e.g:

// get the latest image
ChannelRead<Image> currentImage = imageChannel.read();
// due to delays in hardware etc. the image may be a few ms old, hence don't
// use the latest pose. Instead use the pose that was captured at the same time
// as the image
ChannelRead<Pose2> poseForImage = poseChannel.read(currentImage->timestamp);

By default, the above read() method returns the slot whose timestamp is nearest to the given time. If you want to make sure that either older or newer data is used, you can change this default behavior by specifying an optional second parameter. See the documentation of the read() method for more details.

When reading data at a certain timestamp you can also specify a tolerance. If the data whose timestamp is closest to the specified one is still farther away in time than the specified tolerance, an exception is thrown that tells you that no data was found. This spares you doing this check manually.

// read the matching pose to the image, but make sure that the difference
// in the timestamps is never larger than 10 ms, otherwise throw an exception
ChannelRead<Pose2> poseForImage = poseChannel.read(currentImage->timestamp, Duration::milliseconds(10));
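The nearest-neighbor lookup with a tolerance can be sketched independently of the framework. The function below is a hypothetical illustration, not MIRA's read() implementation: `nearestSlot`, the millisecond integer timestamps, the use of std::runtime_error, and the rule that ties go to the older slot are all assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Returns the index of the timestamp nearest to 'queryMs'.
// 'stampsMs' must be sorted in ascending order.
// Throws if the channel is empty or nothing lies within 'toleranceMs'.
// ASSUMPTION: on a tie, the older slot wins.
inline std::size_t nearestSlot(const std::vector<std::int64_t>& stampsMs,
                               std::int64_t queryMs,
                               std::int64_t toleranceMs) {
    if (stampsMs.empty())
        throw std::runtime_error("no data in channel");

    // First slot that is not older than the query time.
    auto it = std::lower_bound(stampsMs.begin(), stampsMs.end(), queryMs);

    std::size_t best = 0;
    if (it == stampsMs.end()) {
        best = stampsMs.size() - 1;          // everything is older
    } else if (it != stampsMs.begin()) {
        std::size_t newer = static_cast<std::size_t>(it - stampsMs.begin());
        std::size_t older = newer - 1;
        best = (queryMs - stampsMs[older] <= stampsMs[newer] - queryMs) ? older : newer;
    }                                        // else: everything is newer, best stays 0

    std::int64_t diff = stampsMs[best] > queryMs ? stampsMs[best] - queryMs
                                                 : queryMs - stampsMs[best];
    if (diff > toleranceMs)
        throw std::runtime_error("no data within tolerance");
    return best;
}
```

Letting the lookup throw keeps the calling code free of manual distance checks, which is the convenience the tolerance parameter is described as providing.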

Reading Intervals of Data

The data obtained using the read mechanism described above only represents single snapshots or chunks of data. For streaming data (such as audio) it is necessary to obtain a range of data frames. This can be achieved by reading an interval of data that represents a data stream over time. An interval can be obtained using the readInterval() method. As parameters, this method takes two time stamps that specify the beginning and the end of the desired interval. The second parameter is optional and defaults to Time::eternity(). The returned interval follows these rules:

Using this mechanism you can make sure that you never miss any data (as long as the channel's queue size is large enough). To achieve this, the time stamp that specified the end of the interval in the previous call must be used as the beginning of the interval in the next call to readInterval(), as shown in the following example, which reads an interval of audio frames that are stored within the channel:

Time lastTime = Time::now();
while(...)
{
    // read the data that became available since 'lastTime'
    ChannelReadInterval<AudioFrame> interval = audioChannel.readInterval(lastTime);
    // if the interval is empty, keep the previous time stamp
    Time currentTime = lastTime;
    // do a long processing on the interval ...
    // ... iterate over the interval ...
    foreach(const Stamped<AudioFrame>& frame, interval)
    {
        // do something with frame ...
        // update currentTime to the time of the frame
        currentTime = frame.timestamp;
    }
    // store the time stamp up to which we have processed the data
    lastTime = currentTime;
}
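Why chaining the interval boundaries avoids both gaps and duplicates can be checked with a small stand-alone model. This is only a sketch: `toyReadInterval` and `pollTwice` are hypothetical, timestamps are plain integers, and the half-open interval (from, to] with an exclusive lower bound is an assumption of the model, not a statement about readInterval().

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Returns all timestamps in the half-open interval (fromMs, toMs].
// ASSUMPTION: the lower bound is exclusive, so a frame that was already
// processed is not handed out again when its time stamp is reused as 'fromMs'.
inline std::vector<std::int64_t> toyReadInterval(const std::vector<std::int64_t>& stampsMs,
                                                 std::int64_t fromMs,
                                                 std::int64_t toMs) {
    std::vector<std::int64_t> result;
    for (std::int64_t t : stampsMs)
        if (t > fromMs && t <= toMs)
            result.push_back(t);
    return result;
}

// Polls the toy channel twice and chains the intervals.
// Returns every frame time stamp that was handed out, in order.
inline std::vector<std::int64_t> pollTwice() {
    std::vector<std::int64_t> channel = {10, 20, 30};
    std::vector<std::int64_t> seen;
    std::int64_t lastMs = 0;

    for (int round = 0; round < 2; ++round) {
        std::int64_t nowMs = channel.back();
        for (std::int64_t t : toyReadInterval(channel, lastMs, nowMs)) {
            seen.push_back(t);
            lastMs = t;  // remember how far we got
        }
        // New frames arrive while the first batch is being processed.
        if (round == 0) {
            channel.push_back(40);
            channel.push_back(50);
        }
    }
    return seen;
}
```

Each frame shows up exactly once across the two polls, even though two frames arrived between them.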

As shown in this example, you can treat the returned interval as a read-only STL container. It provides iterators and other STL-conformant methods and types.

Like the ChannelRead and ChannelWrite objects described above, the ChannelReadInterval also takes care of locking and unlocking the underlying data. Hence, all requested slots are unlocked automatically when the interval goes out of scope, when the finish() method is called, or when a different interval is assigned to the variable.

Moreover, you can assign a single slot of the interval to a ChannelRead object as shown below:

...
// get a single read accessor to one slot of the interval
ChannelRead<AudioFrame> singleFrame = interval.begin();
// unlocks all other unreferenced slots of the interval
interval.finish();
// singleFrame is still locked and can be accessed for reading

A second variant of the readInterval() method takes four parameters. It can be used for requesting an interval by specifying a time, the total number of slots, and the number of desired slots before and after that time:

// requesting 6 slots where 3 should be older than the timestamp and 2 should be newer.
ChannelReadInterval<AudioFrame> interval =
audioChannel.readInterval(Time::now()-Duration::milliseconds(500), 6, 3, 2);
for(ChannelReadInterval<AudioFrame>::iterator it=interval.begin(); it!=interval.end(); ++it)
{
// do something with frame ...
}

In the above example 6 slots were requested but only 5 slots were specified (3 before and 2 after). The remaining slot will be chosen according to the optional parameter. By default newer slots will be added to the interval until the number of requested slots is reached. If there are not enough new slots available, older slots will be added. This behavior can be changed by specifying PREFER_OLDER as last parameter.
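The default "prefer newer" selection can be modeled on a sorted list of time stamps. The sketch below is a guess at the described behavior, not MIRA's algorithm: `selectSlots`, the integer timestamps, and the choice to count a slot whose time stamp equals the requested time as "older" are assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Returns the index range [first, last) of the selected slots.
// 'stampsMs' must be sorted in ascending order.
// ASSUMPTION: slots with a time stamp <= timeMs count as "older".
inline std::pair<std::size_t, std::size_t>
selectSlots(const std::vector<std::int64_t>& stampsMs, std::int64_t timeMs,
            std::size_t total, std::size_t older, std::size_t newer) {
    assert(older + newer <= total);

    // Index of the first slot that is newer than the requested time.
    std::size_t split = static_cast<std::size_t>(
        std::upper_bound(stampsMs.begin(), stampsMs.end(), timeMs) - stampsMs.begin());
    std::size_t availableOlder = split;
    std::size_t availableNewer = stampsMs.size() - split;

    // Prefer newer: the newer side gets its share plus all unassigned slots.
    std::size_t takeNewer = std::min(total - older, availableNewer);
    // Older slots fill whatever is still missing.
    std::size_t takeOlder = std::min(total - takeNewer, availableOlder);

    return {split - takeOlder, split + takeNewer};
}
```

For ten slots stamped 10, 20, ..., 100 and a request of 6 slots (3 older, 2 newer) around time 55, the sketch selects the slots stamped 30 to 80; around time 85 only two newer slots exist, so four older ones are taken instead.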

Subscribing to Channels with zero data loss

As mentioned above, using intervals allows you to read from channels without missing any data. You can also use this feature when subscribing to a channel: instead of the subscribe() methods described above, use the subscribeInterval() method. With this method you can specify a callback that takes a ChannelReadInterval object as parameter:

void poseCallback(ChannelReadInterval<Pose2> interval)
{
// iterate through all values
foreach(const Stamped<Pose2>& value, interval) {
...
}
}
...
// subscribes with a member function of the unit as callback
subscribeInterval<Pose2>("Pose", &MyUnit::poseCallback, Duration::seconds(5));
// subscribes with a bound function
subscribeInterval<Pose2>("Pose", boost::bind(&MyUnit::poseCallback, this, _1, some bound parameters), Duration::seconds(5));

In the call to subscribeInterval() it is important to pass a duration as the third parameter. The duration specifies how long data is buffered within the channel. It should be longer than the maximum expected blocking time of the callback, so that no data is lost before the callback can be called the next time.

Alternatively, you can use subscribeIntervalByElements() and pass a function that takes a ChannelRead<T> instead of a ChannelReadInterval<T> object. The following code shows an example:

void poseCallback(ChannelRead<Pose2> pose)
{
// no iteration needed
...
}
...
// subscribe with a member function
subscribeIntervalByElements<Pose2>("Pose", &MyUnit::poseCallback, Duration::seconds(5));
// subscribe with a bound function
subscribeIntervalByElements<Pose2>("Pose", boost::bind(&MyUnit::poseCallback, this, _1), Duration::seconds(5));

The subscribeIntervalByElements() function essentially does what was described above: it wraps the ChannelRead<T> callback in a function that iterates through the interval with a foreach loop, and then passes this wrapper function to subscribeInterval().
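The wrapping idea can be sketched with standard C++ only. Nothing below is MIRA API: `ToyInterval` (a plain vector standing in for an interval) and `wrapByElements` are hypothetical names used to show how a per-element callback is turned into a per-interval callback.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in: an "interval" is just a vector of values here.
template <typename T>
using ToyInterval = std::vector<T>;

// Wraps a per-element callback into a callback that accepts a whole interval.
template <typename T>
std::function<void(const ToyInterval<T>&)>
wrapByElements(std::function<void(const T&)> elementCallback) {
    assert(static_cast<bool>(elementCallback));  // an empty callback is a usage error
    return [elementCallback](const ToyInterval<T>& interval) {
        // One call per element, in the order stored in the interval.
        for (const T& element : interval)
            elementCallback(element);
    };
}
```

The user-facing callback stays as simple as a normal subscriber callback, while the wrapper still receives every element of the interval.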

Accessing Channels using post() and get()

Besides the mechanisms for reading and writing data described above, you can also access a channel using its post() and get() methods.

The post() method lets you write or "post" a value directly to the channel without acquiring a ChannelWrite object:

floatChannel.post(123.4f, Time::now());

Similarly, the get() method lets you read a value directly from the channel:

float value = floatChannel.get();

Like the read() method, the get() method also takes an optional parameter that specifies a time in the past at which to obtain the data:

// obtain value 50 ms ago.
float value = floatChannel.get(Time::now()-Duration::milliseconds(50));

Using the post() and get() methods for writing or reading data leads to unnecessary copying of data, so they should be used for lightweight objects and built-in data types only. For large objects like range scans or images you MUST use the write() and read() methods to obtain read and write access without any performance penalties.

Reading Interpolated Data

When reading data from a past timestamp, the get() method also allows you to interpolate between the data values that are stored around the desired timestamp.

This can be achieved by specifying a "Filter" as the second parameter, in addition to the timestamp. The following example reads a value from the "floatChannel" 50 milliseconds ago and linearly interpolates between the two values that are stored before and after that moment in time:

// obtain value 50 ms ago.
float value = floatChannel.get(Time::now()-Duration::milliseconds(50), LinearInterpolator());

Besides the LinearInterpolator you can also use other interpolators or implement your own. You could, e.g., implement a filter that computes the average of the values that are stored around the desired timestamp...
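The arithmetic behind linear interpolation, and behind a simple averaging filter, can be written down without the framework. This is a sketch only: `ToyStamped`, `interpolateLinear` and `averageNeighbors` are hypothetical names and do not reflect the interface a MIRA filter has to implement.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stamped value; timestamps are plain milliseconds.
struct ToyStamped {
    std::int64_t timestampMs;
    double value;
};

// Linear interpolation between the sample before and the sample after 'queryMs'.
inline double interpolateLinear(const ToyStamped& before, const ToyStamped& after,
                                std::int64_t queryMs) {
    assert(before.timestampMs <= queryMs && queryMs <= after.timestampMs);
    if (after.timestampMs == before.timestampMs)
        return before.value;  // both samples share one time stamp
    // alpha is 0 at 'before' and 1 at 'after'
    double alpha = static_cast<double>(queryMs - before.timestampMs) /
                   static_cast<double>(after.timestampMs - before.timestampMs);
    return before.value + alpha * (after.value - before.value);
}

// A custom filter in the same spirit: the plain average of both neighbors,
// regardless of how close each one is to the query time.
inline double averageNeighbors(const ToyStamped& before, const ToyStamped& after) {
    return 0.5 * (before.value + after.value);
}
```

For samples 1.0 at 100 ms and 3.0 at 200 ms, interpolating at 125 ms yields 1.5, whereas the averaging filter yields 2.0 for any query time between the two samples.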

Waiting for Data in Channels to Become Available

Since authorities in a framework start up in no specific order, and remote frameworks containing other channels may connect later, it is sometimes necessary to wait until a certain channel contains data. Channels provide the waitForData() method for this purpose. It blocks until data becomes available in the channel, either for a given time span or forever (if no time is given), and returns the acquired ChannelRead object.

Channel<int> channel = subscribe<int>("IntChannel");
ChannelRead<int> r = channel.waitForData(Duration::seconds(5));
if (r.isValid())
{
    // ... do something with it
}
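Blocking with a timeout until data arrives is commonly built on a condition variable. The following stand-alone sketch shows that general technique; it is not how MIRA implements waitForData(). `ToyIntChannel`, its `post()` method and the bool-plus-output-parameter return style (standing in for an invalid ChannelRead) are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical single-value channel, NOT part of MIRA.
class ToyIntChannel {
public:
    // Stores a value and wakes up everyone waiting for data.
    void post(int value) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mValue = value;
            mHasData = true;
        }
        mCondition.notify_all();
    }

    // Blocks until data is available or 'timeout' has expired.
    // Returns false on timeout; 'out' is only written on success.
    bool waitForData(std::chrono::milliseconds timeout, int& out) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mMutex);
        // The predicate guards against spurious wake-ups.
        bool available = mCondition.wait_for(lock, timeout, [this] { return mHasData; });
        if (available)
            out = mValue;
        return available;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCondition;
    int mValue = 0;
    bool mHasData = false;
};
```

A publisher running in another thread would call post(); the waiting side checks the return value in the same way the example above checks isValid().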