48 #ifndef _MIRA_CHANNEL_H_
49 #define _MIRA_CHANNEL_H_
51 #ifndef _MIRA_FRAMEWORK_H_
52 # error "Channel.h must be included via the Framework.h. You must not include it directly. Use #include <fw/Framework.h> instead"
56 #include <boost/shared_ptr.hpp>
57 #include <boost/type_traits/is_polymorphic.hpp>
104 typedef typename Buffer::ValueType ValueType;
105 typedef typename Buffer::Slot Slot;
108 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
119 "ConcreteChannel must have the same size as AbstractChannel");
121 static_assert(boost::is_polymorphic<ConcreteChannel>::value==
false,
122 "ConcreateChannel and AbstractChannel must not be polymorphic");
133 boost::mutex::scoped_lock lock(mSubscribersMutex);
134 mNrOfSubscribersWithoutChannelSubscriber++;
141 void addSubscriber(SubscriberPtr subscriber)
143 boost::mutex::scoped_lock lock(mSubscribersMutex);
144 mSubscribers.push_back(subscriber);
145 subscriber->attachChannel(
this);
152 void removeSubscriber()
154 boost::mutex::scoped_lock lock(mSubscribersMutex);
155 if (mNrOfSubscribersWithoutChannelSubscriber>0)
156 --mNrOfSubscribersWithoutChannelSubscriber;
163 void removeSubscriber(SubscriberPtr subscriber)
216 std::size_t olderSlots, std::size_t newerSlots,
238 const Time& to = Time::eternity());
254 friend class Channel<T>;
267 struct ChannelCaster {
272 promote_channel<T>(p);
280 <<
" to typed channel of type '" << typeName<T>() <<
"'");
292 struct ChannelCaster<void> {
293 static ConcreteChannel<void>* cast(AbstractChannel* p)
297 return static_cast<ConcreteChannel<void>*
>(p);
305 struct ChannelCaster<T*> {
306 static ConcreteChannel<T*>* cast(AbstractChannel* p)
308 static_assert(std::is_base_of<mira::Object,T>::value,
309 "Pointers that are used in channels must be derived from "
310 "mira::Object. Pointers to other classes cannot be used.");
314 promote_channel<T*>(p);
317 assert(p->getBuffer()->isPolymorphic());
321 return static_cast<ConcreteChannel<T*>*
>(p);
328 inline ConcreteChannel<T>* channel_cast(AbstractChannel* p)
330 return ChannelCaster<T>::cast(p);
338 template <
typename T>
343 inline Channel<T> channel_cast(Channel<void> channel);
361 template <
typename T>
367 struct ParamHelper<void> {
421 template <
typename T>
435 mChannel(channel), mAccessFlags(accessFlags) {
436 assert(mChannel!=NULL);
452 return mChannel!=NULL;
462 MIRA_THROW(XAccessViolation,
"No channel assigned to the channel proxy. "
463 "You can obtain a channel using Authority::getChannel()!");
477 return mChannel->getID();
487 return mChannel->getTypeId();
501 return mChannel->getTypename();
510 mChannel->setTypename(type);
518 return mChannel->getTypeMeta();
526 mChannel->setTypeMeta(meta);
544 return mChannel->getNrOfSlots() == 0;
552 return mChannel->hasSubscriber();
560 return mChannel->hasPublisher();
568 return mChannel->getBuffer()->getMaxSlots() == 1;
576 return mChannel->getBuffer()->getMaxSlots();
584 return mChannel->getBuffer()->getMinSlots();
593 return mChannel->getBuffer()->getStorageDuration();
601 return mChannel->getBuffer()->isAutoIncreasingStorageDuration();
609 return mChannel->getBuffer()->getSize();
635 validateReadAccess();
638 if(!timeout.isValid() || timeout.isInfinity())
639 end = Time::eternity();
643 while(!boost::this_thread::interruption_requested())
646 return mChannel->read();
647 }
catch(XInvalidRead& ex) {}
667 validateReadAccess();
670 if(!timeout.isValid() || timeout.isInfinity())
671 end = Time::eternity();
675 while(!boost::this_thread::interruption_requested())
696 validateReadAccess();
697 return mChannel->read();
722 validateReadAccess();
723 return mChannel->read(timestamp, mode, tolerance);
745 validateReadAccess();
746 Time newestSlotTime = mChannel->read()->timestamp;
749 for(; iter != interval.
end(); ++iter){
750 if(iter->sequenceID == sequenceID)
753 MIRA_THROW(XInvalidRead,
"No slot with sequenceID "<<sequenceID<<
754 " found within searchInterval of channel");
763 validateReadAccess();
764 return mChannel->read(timestamp,
NEAREST_SLOT, tolerance);
797 std::size_t olderSlots,
798 std::size_t newerSlots,
800 validateReadAccess();
801 return mChannel->readInterval(timestamp, nrSlots, olderSlots,
802 newerSlots, fillMode);
821 const Time& to=Time::eternity()) {
822 validateReadAccess();
823 return mChannel->readInterval(from, to);
842 validateWriteAccess();
843 return mChannel->write();
872 validateWriteAccess();
876 && (mChannel->getBuffer()->getMaxSlots()>1)
877 && (mChannel->getBuffer()->getSize()>0))
887 return mChannel->write();
904 validateReadAccess();
924 validateReadAccess();
925 ChannelRead<T> value = mChannel->read(timestamp, mode, tolerance);
935 validateReadAccess();
947 GetTime(
const Time& iT0) : t0(iT0) {}
950 typedef float result_type;
951 result_type operator()(
const Stamped<T>& p)
const {
953 return (p.timestamp-t0).totalMilliseconds();
962 typedef const T& result_type;
963 result_type operator()(
const Stamped<T>& p)
const {
990 template <
typename Filter>
992 validateReadAccess();
998 filter.samplesBefore(),
999 filter.samplesAfter());
1001 if (!filter.canExtrapolate())
1003 int samplesBefore = filter.samplesBefore();
1004 int samplesAfter = filter.samplesAfter();
1006 for (
auto it = data.
begin(); it != data.
end(); ++it)
1008 if (it->timestamp <= timestamp)
1010 else if (it->timestamp >= timestamp)
1014 if (samplesBefore > 0)
1015 MIRA_THROW(XRuntime,
"Filter::samplesBefore: Requirements not met. Missing " << samplesBefore <<
" items.");
1016 if (samplesAfter > 0)
1017 MIRA_THROW(XRuntime,
"Filter::samplesAfter: Requirements not met. Missing " << samplesAfter <<
" items.");
1025 if (mChannel->getNrOfSlots() == 1)
1032 const Time t0 = data.
begin()->timestamp;
1035 typedef boost::transform_iterator<GetTime, const_iterator> GetTimeIterator;
1036 GetTimeIterator begin1 = GetTimeIterator(data.
begin(), GetTime(t0));
1037 GetTimeIterator end1 = GetTimeIterator(data.
end() , GetTime(t0));
1039 typedef boost::transform_iterator<GetValue, const_iterator> GetValueIterator;
1041 GetValueIterator begin2=GetValueIterator(data.
begin(), GetValue());
1042 GetValueIterator end2 =GetValueIterator(data.
end() , GetValue());
1046 TimeContainer timeContainer(begin1, end1);
1047 ValueContainer valueContainer(begin2, end2);
1049 return makeStamped(filter.template apply<float, T>(timeContainer,
1051 (timestamp-t0).totalMilliseconds()),
1064 template <
typename U =
void>
1066 validateWriteAccess();
1073 template <
typename U =
void>
1075 validateWriteAccess();
1077 *writeSlot = std::move(value);
1090 template <
typename U>
1092 validateWriteAccess();
1094 *writeSlot =
makeStamped(std::forward<U>(value), timestamp);
1107 template <
typename U = T>
1108 typename std::enable_if<!std::is_void<U>::value>
::type
1110 validateWriteAccess();
1117 template <
typename U = T>
1118 typename std::enable_if<!std::is_void<U>::value>
::type
1120 validateWriteAccess();
1122 *writeSlot =
makeStamped(std::move(value), timestamp);
1131 template<
typename U>
1138 void dbgDump(
bool brief=
true) { mChannel->dbgDump(brief); }
1143 void validateReadAccess()
const
1147 MIRA_THROW(XAccessViolation,
"You are not allowed to read from channel '"
1148 << mChannel->getID() <<
"'. Forgot to subscribe?");
1151 void validateWriteAccess()
const
1155 MIRA_THROW(XAccessViolation,
"You are not allowed to write to channel '"
1156 << mChannel->getID() <<
"'. Forgot to publish?");
1161 ConcreteChannel<T>* mChannel;
1168 template<
typename U>
1169 inline Channel<U> channel_cast(Channel<void> channel)
1172 return Channel<U>(channel_cast<U>(channel.mChannel), channel.mAccessFlags);
1180 #include "impl/ChannelReadWrite.hpp"
1181 #include "impl/ChannelReadInterval.hpp"
void post(U &&value, const Time ×tamp=Time::now())
Writes the specified data with the specified time stamp into the channel.
Definition: Channel.h:1091
TypeMetaPtr getTypeMeta() const
Return the type meta information for this channel.
Definition: Channel.h:516
An object that allows read access to a whole interval of channel data.
Definition: ChannelReadInterval.h:72
Read access (Authority is a subscriber)
Definition: Channel.h:353
void finish()
Releases the lock explicitly.
Definition: ChannelReadWrite.h:543
void post(const Stamped< T > &value)
Writes the specified data into the channel.
Definition: Channel.h:1065
An exception that occurs whenever a channel has no data.
Definition: Channel.h:88
void post(Stamped< T > &&value)
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1074
Write access (Authority is a publisher)
Definition: Channel.h:354
std::size_t getNrOfSlots() const
Returns how many slots (i.e.
Definition: Channel.h:607
std::size_t getMinSlots() const
Returns the number slots that are guaranteed to be kept in the channel buffer.
Definition: Channel.h:582
Macros for generating logical operators for using enum values as flags.
ChannelRead< T > read(uint32 sequenceID, const Duration &searchInterval=Duration::seconds(1))
Obtains read access to the element with the specified sequenceID within the given search interval If ...
Definition: Channel.h:743
bool isValid() const
Returns true if the proxy object is valid.
Definition: Channel.h:451
Typename getTypename() const
Returns the platform independent typename of the items that are stored in the slots of the underlying...
Definition: AbstractChannel.h:107
#define MIRA_ENUM_TO_FLAGS(EnumType)
Macro that can be used with enums that contain flags.
Definition: EnumToFlags.h:80
int getTypeId() const
Returns the non-portable typeid of the items that are stored in the slots of the underlying channel b...
Definition: AbstractChannel.h:99
Subscriber classes used to subscribe on channels for automatic notification.
Channel(ConcreteChannel< T > *channel, ChannelAccessFlags accessFlags)
Is used by Framework to create a channel proxy object.
Definition: Channel.h:434
std::string Typename
Definition: Typename.h:59
void setTypeMeta(TypeMetaPtr meta)
Set the type meta information for this channel.
Definition: Channel.h:524
const_iterator begin() const
Definition: ChannelReadInterval.h:163
An object that allows exclusive write access to data of a channel.
Definition: ChannelReadWrite.h:594
bool isTyped() const
Returns true, if the channel is typed and false, if it is untyped.
Definition: Channel.h:493
const_iterator end() const
Definition: ChannelReadInterval.h:164
#define MIRA_DEFINE_SERIALIZABLE_EXCEPTION(Ex, Base)
Macro for easily defining a new serializable exception class.
Definition: Exceptions.h:66
Definition: ChannelReadWrite.h:65
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:435
ChannelReadInterval< T > readInterval(const Time ×tamp, std::size_t nrSlots, std::size_t olderSlots, std::size_t newerSlots, IntervalFillMode fillMode=PREFER_NEWER)
Obtains read access to an interval of elements before and after the requested timestamp.
Definition: Channel.h:795
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:91
void finish()
Releases the lock explicitly and informs the Channel to signal all Subscribers that new data is avail...
Definition: ChannelReadWrite.h:704
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:421
ChannelRead< T > read(const Time ×tamp, SlotQueryMode mode=NEAREST_SLOT, const Duration &tolerance=Duration::infinity())
Obtains read access to the element at the specified timestamp.
Definition: Channel.h:720
Duration getStorageDuration() const
Returns the timeframe that specifies how long a slot is guaranteed to be kept in the channel buffer...
Definition: Channel.h:591
Classes for automatic locking/unlocking when reading and writing to channels.
MIRA_BASE_EXPORT void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
bool isTyped() const
Returns true, if the channel is typed and false, if it is untyped.
Definition: AbstractChannel.h:143
Commonly used exception classes.
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Definition: AbstractChannel.h:70
ChannelRead< T > waitForData(const Duration &timeout=Duration::infinity()) const
Waits until data in this channel becomes available.
Definition: Channel.h:633
Use this class to represent time durations.
Definition: Time.h:104
ChannelRead< T > read(const Time ×tamp, const Duration &tolerance)
Same as above, but always takes the nearest slot.
Definition: Channel.h:762
bool isAutoIncreasingStorageDuration() const
Returns whether the channel buffer is automatically increasing the storage duration.
Definition: Channel.h:599
ChannelRead< T > read()
Obtains read access to the latest data element of this channel.
Definition: Channel.h:695
void reset()
Reset the proxy object so that it becomes invalid.
Definition: Channel.h:442
bool isEmpty() const
Returns if the channel is empty (no data has been published)
Definition: Channel.h:542
MIRA_BASE_EXPORT void read(const std::string &s, Value &oValue)
Read a json::Value from a string that contains JSON format.
Typed ChannelBuffer.
Definition: ChannelBuffer.h:860
The slot with smallest time difference to the requested will be chosen.
Definition: ChannelBuffer.h:103
Const iterator for iterating over the interval.
Definition: ChannelReadInterval.h:85
void setTypename(const Typename &type)
Set the typename of this channel.
Definition: Channel.h:508
Mix in for adding a time stamp, an optional frame id and an optional sequence id to data types like P...
Definition: Stamped.h:149
boost::shared_ptr< TypeMeta > TypeMetaPtr
Definition: MetaSerializer.h:309
std::enable_if<!std::is_void< U >::value >::type post(typename ParamHelper< T >::type &&value, const Time ×tamp=Time::now())
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1119
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:283
std::enable_if<!std::is_void< U >::value >::type post(const typename ParamHelper< T >::type &value, const Time ×tamp=Time::now())
This allows post({}, Time()); to deduce we want to post an object of the channel's type...
Definition: Channel.h:1109
Stamped< typename std::decay< T >::type > makeStamped(T &&value, const Time ×tamp=Time::now(), const std::string &frameID=std::string(), uint32 sequenceID=0)
Declare stamped classes for all standard data types including std::string.
Definition: Stamped.h:471
Base class for exceptions.
Definition: Exception.h:204
Generic buffer class that can be used as a replacement for std::vector whenever copying and reallocat...
Definition: Buffer.h:84
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:245
Channel()
Create channel proxy that is not assigned to a channel.
Definition: Channel.h:429
bool hasPublisher() const
Returns true, if this channel has at least one publisher.
Definition: Channel.h:558
const std::string & getID() const
Returns the channel id of this channel.
Definition: AbstractChannel.h:89
ChannelWrite< T > write()
Obtains exclusive write access to the next free data slot of this channel.
Definition: Channel.h:841
No access at all (Authority is not a publisher nor a subscriber)
Definition: Channel.h:352
ChannelWrite< T > write(bool copyLatestData)
Same as write above with an additional parameter copyLatestData that allows you to specify whether th...
Definition: Channel.h:871
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:484
const std::string & getID() const
Return the channel ID, its name.
Definition: Channel.h:475
Typename getTypename() const
Return the typename of the channel.
Definition: Channel.h:499
Prefer filling the interval with slots with timestamp > requested.
Definition: ChannelBuffer.h:115
void validate() const
Checks if the channel is valid otherwise throws an exception.
Definition: Channel.h:460
std::size_t getMaxSlots() const
Returns the upper limit of slots that are allowed for this channel.
Definition: Channel.h:574
IntervalFillMode
Mode that is used to determine what slots should be added to the interval when not enough slots are a...
Definition: ChannelBuffer.h:110
SlotQueryMode
Mode that is used to determine the slot obtained from a channel when no slot exists at the exact time...
Definition: ChannelBuffer.h:94
bool hasSubscriber() const
Returns true, if this channel has at least one subscriber.
Definition: Channel.h:550
Base class for all framework channels.
#define MIRA_SLEEP(ms)
Sleeps for ms milliseconds This is a thread interruption point - if interruption of the current threa...
Definition: Thread.h:95
Implements AbstractChannelSubscriber for a concrete data type.
Definition: ChannelSubscriber.h:82
void removeSubscriber(AbstractChannelSubscriberPtr subscriber)
Removes the specified subscriber from this channel and calls its detach() method. ...
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
Provides method for generating a unique id for any type.
bool hasSoloSlot() const
Returns true, if this channel has one solo slot only.
Definition: Channel.h:566
int getTypeId() const
Returns the type id of the channel.
Definition: Channel.h:485
Wraps an STL conform container around a range of values within another container. ...
Definition: IteratorRangeContainer.h:64
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Waits until this channel has at least one publisher.
Definition: Channel.h:665
Classes for reading whole intervals of data from channels.
ChannelReadInterval< T > readInterval(const Time &from, const Time &to=Time::eternity())
Obtains read access to an interval of elements in a given time span.
Definition: Channel.h:820
ChannelAccessFlags
Flags specifying the access rights of an authority to a channel Can be combined.
Definition: Channel.h:351