48 #ifndef _MIRA_CHANNEL_H_ 49 #define _MIRA_CHANNEL_H_ 51 #ifndef _MIRA_FRAMEWORK_H_ 52 # error "Channel.h must be included via the Framework.h. You must not include it directly. Use #include <fw/Framework.h> instead" 56 #include <boost/shared_ptr.hpp> 57 #include <boost/type_traits/is_polymorphic.hpp> 104 typedef typename Buffer::ValueType ValueType;
105 typedef typename Buffer::Slot Slot;
108 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
119 "ConcreteChannel must have the same size as AbstractChannel");
121 static_assert(boost::is_polymorphic<ConcreteChannel>::value==
false,
122 "ConcreateChannel and AbstractChannel must not be polymorphic");
133 boost::mutex::scoped_lock lock(mSubscribersMutex);
134 mNrOfSubscribersWithoutChannelSubscriber++;
141 void addSubscriber(SubscriberPtr subscriber)
143 boost::mutex::scoped_lock lock(mSubscribersMutex);
144 mSubscribers.push_back(subscriber);
145 subscriber->attachChannel(
this);
152 void removeSubscriber()
154 boost::mutex::scoped_lock lock(mSubscribersMutex);
155 if (mNrOfSubscribersWithoutChannelSubscriber>0)
156 --mNrOfSubscribersWithoutChannelSubscriber;
163 void removeSubscriber(SubscriberPtr subscriber)
177 ChannelRead<T>
read();
215 ChannelReadInterval<T> readInterval(
const Time& timestamp, std::size_t nrSlots,
216 std::size_t olderSlots, std::size_t newerSlots,
237 ChannelReadInterval<T> readInterval(
const Time& from,
238 const Time& to = Time::eternity());
251 ChannelWrite<T>
write();
254 friend class Channel<T>;
267 struct ChannelCaster {
268 static ConcreteChannel<T>* cast(AbstractChannel* p)
272 promote_channel<T>(p);
274 assert(p->isTyped());
277 if(typeId<T>()!=p->getTypeId())
278 MIRA_THROW(XBadCast,
"Cannot cast channel '" << p->getID()
279 <<
"' (type '" << p->getTypename() <<
"') " 280 <<
" to typed channel of type '" << typeName<T>() <<
"'");
284 return static_cast<ConcreteChannel<T>*
>(p);
292 struct ChannelCaster<void> {
293 static ConcreteChannel<void>* cast(AbstractChannel* p)
297 return static_cast<ConcreteChannel<void>*
>(p);
305 struct ChannelCaster<T*> {
306 static ConcreteChannel<T*>* cast(AbstractChannel* p)
308 static_assert(std::is_base_of<mira::Object,T>::value,
309 "Pointers that are used in channels must be derived from " 310 "mira::Object. Pointers to other classes cannot be used.");
314 promote_channel<T*>(p);
317 assert(p->getBuffer()->isPolymorphic());
321 return static_cast<ConcreteChannel<T*>*
>(p);
328 inline ConcreteChannel<T>* channel_cast(AbstractChannel* p)
330 return ChannelCaster<T>::cast(p);
338 template <
typename T>
343 inline Channel<T> channel_cast(Channel<void> channel);
361 template <
typename T>
367 struct ParamHelper<void> {
421 template <
typename T>
435 mChannel(channel), mAccessFlags(accessFlags) {
436 assert(mChannel!=NULL);
452 return mChannel!=NULL;
463 "No channel assigned to the channel proxy (of type " <<
464 typeName<T>() <<
"). " 465 "You can obtain a channel using Authority::getChannel()!");
479 return mChannel->getID();
489 return mChannel->getTypeId();
503 return mChannel->getTypename();
512 mChannel->setTypename(
type);
520 return mChannel->getTypeMeta();
528 mChannel->setTypeMeta(meta);
546 return mChannel->getNrOfSlots() == 0;
554 return mChannel->hasSubscriber();
562 return mChannel->hasPublisher();
570 return mChannel->getBuffer()->getMaxSlots() == 1;
578 return mChannel->getBuffer()->getMaxSlots();
586 return mChannel->getBuffer()->getMinSlots();
595 return mChannel->getBuffer()->getStorageDuration();
603 return mChannel->getBuffer()->isAutoIncreasingStorageDuration();
611 return mChannel->getBuffer()->getSize();
637 validateReadAccess();
640 if(!timeout.isValid() || timeout.isInfinity())
641 end = Time::eternity();
645 while(!boost::this_thread::interruption_requested())
648 return mChannel->read();
649 }
catch(XInvalidRead& ex) {}
669 validateReadAccess();
672 if(!timeout.isValid() || timeout.isInfinity())
673 end = Time::eternity();
677 while(!boost::this_thread::interruption_requested())
698 validateReadAccess();
699 return mChannel->read();
724 validateReadAccess();
725 return mChannel->read(timestamp, mode, tolerance);
747 validateReadAccess();
748 Time newestSlotTime = mChannel->read()->timestamp;
751 for(; iter != interval.
end(); ++iter){
752 if(iter->sequenceID == sequenceID)
755 MIRA_THROW(XInvalidRead,
"No slot with sequenceID "<<sequenceID<<
756 " found within searchInterval of channel");
765 validateReadAccess();
766 return mChannel->read(timestamp,
NEAREST_SLOT, tolerance);
799 std::size_t olderSlots,
800 std::size_t newerSlots,
802 validateReadAccess();
803 return mChannel->readInterval(timestamp, nrSlots, olderSlots,
804 newerSlots, fillMode);
823 const Time& to=Time::eternity()) {
824 validateReadAccess();
825 return mChannel->readInterval(from, to);
844 validateWriteAccess();
845 return mChannel->write();
874 validateWriteAccess();
878 && (mChannel->getBuffer()->getMaxSlots()>1)
879 && (mChannel->getBuffer()->getSize()>0))
889 return mChannel->write();
906 validateReadAccess();
926 validateReadAccess();
927 ChannelRead<T> value = mChannel->read(timestamp, mode, tolerance);
937 validateReadAccess();
949 GetTime(
const Time& iT0) : t0(iT0) {}
952 typedef float result_type;
953 result_type operator()(
const Stamped<T>& p)
const {
955 return (p.timestamp-t0).totalMilliseconds();
964 typedef const T& result_type;
965 result_type operator()(
const Stamped<T>& p)
const {
992 template <
typename Filter>
994 validateReadAccess();
1000 filter.samplesBefore(),
1001 filter.samplesAfter());
1003 if (!filter.canExtrapolate())
1005 int samplesBefore = filter.samplesBefore();
1006 int samplesAfter = filter.samplesAfter();
1008 for (
auto it = data.
begin(); it != data.
end(); ++it)
1010 if (it->timestamp <= timestamp)
1012 else if (it->timestamp >= timestamp)
1016 if (samplesBefore > 0)
1017 MIRA_THROW(XRuntime,
"Filter::samplesBefore: Requirements not met. Missing " << samplesBefore <<
" items.");
1018 if (samplesAfter > 0)
1019 MIRA_THROW(XRuntime,
"Filter::samplesAfter: Requirements not met. Missing " << samplesAfter <<
" items.");
1027 if (mChannel->getNrOfSlots() == 1)
1034 const Time t0 = data.
begin()->timestamp;
1037 typedef boost::transform_iterator<GetTime, const_iterator> GetTimeIterator;
1038 GetTimeIterator begin1 = GetTimeIterator(data.
begin(), GetTime(t0));
1039 GetTimeIterator end1 = GetTimeIterator(data.
end() , GetTime(t0));
1041 typedef boost::transform_iterator<GetValue, const_iterator> GetValueIterator;
1043 GetValueIterator begin2=GetValueIterator(data.
begin(), GetValue());
1044 GetValueIterator end2 =GetValueIterator(data.
end() , GetValue());
1048 TimeContainer timeContainer(begin1, end1);
1049 ValueContainer valueContainer(begin2, end2);
1051 return makeStamped(filter.template apply<float, T>(timeContainer,
1053 (timestamp-t0).totalMilliseconds()),
1066 template <
typename U =
void>
1068 validateWriteAccess();
1075 template <
typename U =
void>
1077 validateWriteAccess();
1079 *writeSlot = std::move(value);
1092 template <
typename U>
1094 validateWriteAccess();
1096 *writeSlot =
makeStamped(std::forward<U>(value), timestamp);
1109 template <
typename U = T>
1110 typename std::enable_if<!std::is_void<U>::value>
::type 1112 validateWriteAccess();
1119 template <
typename U = T>
1120 typename std::enable_if<!std::is_void<U>::value>
::type 1122 validateWriteAccess();
1124 *writeSlot =
makeStamped(std::move(value), timestamp);
1133 template<
typename U>
1140 void dbgDump(
bool brief=
true) { mChannel->dbgDump(brief); }
1145 void validateReadAccess()
const 1149 MIRA_THROW(XAccessViolation,
"You are not allowed to read from channel '" 1150 << mChannel->getID() <<
"'. Forgot to subscribe?");
1153 void validateWriteAccess()
const 1157 MIRA_THROW(XAccessViolation,
"You are not allowed to write to channel '" 1158 << mChannel->getID() <<
"'. Forgot to publish?");
1163 ConcreteChannel<T>* mChannel;
1170 template<
typename U>
1171 inline Channel<U> channel_cast(Channel<void> channel)
1174 return Channel<U>(channel_cast<U>(channel.mChannel), channel.mAccessFlags);
1182 #include "impl/ChannelReadWrite.hpp" 1183 #include "impl/ChannelReadInterval.hpp" TypeMetaPtr getTypeMeta() const
Return the type meta information for this channel.
Definition: Channel.h:518
void post(U &&value, const Time &timestamp=Time::now())
Writes the specified data with the specified time stamp into the channel.
Definition: Channel.h:1093
An object that allows read access to a whole interval of channel data.
Definition: ChannelReadInterval.h:72
const std::string & getID() const
Return the channel ID, its name.
Definition: Channel.h:477
Read access (Authority is a subscriber)
Definition: Channel.h:353
bool hasPublisher() const
Returns true, if this channel has at least one publisher.
Definition: Channel.h:560
void finish()
Releases the lock explicitly.
Definition: ChannelReadWrite.h:543
std::size_t getMaxSlots() const
Returns the upper limit of slots that are allowed for this channel.
Definition: Channel.h:576
void post(const Stamped< T > &value)
Writes the specified data into the channel.
Definition: Channel.h:1067
An exception that occurs whenever a channel has no data.
Definition: Channel.h:88
void post(Stamped< T > &&value)
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1076
Write access (Authority is a publisher)
Definition: Channel.h:354
Macros for generating logical operators for using enum values as flags.
ChannelRead< T > read(uint32 sequenceID, const Duration &searchInterval=Duration::seconds(1))
Obtains read access to the element with the specified sequenceID within the given search interval If ...
Definition: Channel.h:745
void validate() const
Checks if the channel is valid otherwise throws an exception.
Definition: Channel.h:460
#define MIRA_ENUM_TO_FLAGS(EnumType)
Macro that can be used with enums that contain flags.
Definition: EnumToFlags.h:80
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
Subscriber classes used to subscribe on channels for automatic notification.
Channel(ConcreteChannel< T > *channel, ChannelAccessFlags accessFlags)
Is used by Framework to create a channel proxy object.
Definition: Channel.h:434
std::string Typename
Definition: Typename.h:60
void setTypeMeta(TypeMetaPtr meta)
Set the type meta information for this channel.
Definition: Channel.h:526
bool isTyped() const
Returns true, if the channel is typed and false, if it is untyped.
Definition: Channel.h:495
An object that allows exclusive write access to data of a channel.
Definition: ChannelReadWrite.h:594
#define MIRA_DEFINE_SERIALIZABLE_EXCEPTION(Ex, Base)
Macro for easily defining a new serializable exception class.
Definition: Exceptions.h:66
Definition: ChannelReadWrite.h:65
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:435
ChannelReadInterval< T > readInterval(const Time &timestamp, std::size_t nrSlots, std::size_t olderSlots, std::size_t newerSlots, IntervalFillMode fillMode=PREFER_NEWER)
Obtains read access to an interval of elements before and after the requested timestamp.
Definition: Channel.h:797
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:81
Typename getTypename() const
Return the typename of the channel.
Definition: Channel.h:501
void finish()
Releases the lock explicitly and informs the Channel to signal all Subscribers that new data is avail...
Definition: ChannelReadWrite.h:704
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:421
ChannelRead< T > read(const Time &timestamp, SlotQueryMode mode=NEAREST_SLOT, const Duration &tolerance=Duration::infinity())
Obtains read access to the element at the specified timestamp.
Definition: Channel.h:722
const_iterator end() const
Definition: ChannelReadInterval.h:164
const_iterator begin() const
Definition: ChannelReadInterval.h:163
Classes for automatic locking/unlocking when reading and writing to channels.
MIRA_BASE_EXPORT void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
Commonly used exception classes.
std::size_t getNrOfSlots() const
Returns how many slots (i.e.
Definition: Channel.h:609
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Definition: AbstractChannel.h:70
bool hasSoloSlot() const
Returns true, if this channel has one solo slot only.
Definition: Channel.h:568
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:283
Use this class to represent time durations.
Definition: Time.h:104
ChannelRead< T > read(const Time &timestamp, const Duration &tolerance)
Same as above, but always takes the nearest slot.
Definition: Channel.h:764
ChannelRead< T > read()
Obtains read access to the latest data element of this channel.
Definition: Channel.h:697
void reset()
Reset the proxy object so that it becomes invalid.
Definition: Channel.h:442
MIRA_BASE_EXPORT void read(const std::string &s, Value &oValue)
Read a json::Value from a string that contains JSON format.
Typed ChannelBuffer.
Definition: ChannelBuffer.h:860
The slot with smallest time difference to the requested will be chosen.
Definition: ChannelBuffer.h:103
Const iterator for iterating over the interval.
Definition: ChannelReadInterval.h:85
void setTypename(const Typename &type)
Set the typename of this channel.
Definition: Channel.h:510
Mix in for adding a time stamp, an optional frame id and an optional sequence id to data types like P...
Definition: Stamped.h:149
boost::shared_ptr< TypeMeta > TypeMetaPtr
Definition: MetaSerializer.h:309
std::enable_if<!std::is_void< U >::value >::type post(typename ParamHelper< T >::type &&value, const Time &timestamp=Time::now())
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1121
bool isAutoIncreasingStorageDuration() const
Returns whether the channel buffer is automatically increasing the storage duration.
Definition: Channel.h:601
std::enable_if<!std::is_void< U >::value >::type post(const typename ParamHelper< T >::type &value, const Time &timestamp=Time::now())
This allows post({}, Time()); to deduce we want to post an object of the channel's type...
Definition: Channel.h:1111
Duration getStorageDuration() const
Returns the timeframe that specifies how long a slot is guaranteed to be kept in the channel buffer...
Definition: Channel.h:593
Stamped< typename std::decay< T >::type > makeStamped(T &&value, const Time &timestamp=Time::now(), const std::string &frameID=std::string(), uint32 sequenceID=0)
Declare stamped classes for all standard data types including std::string.
Definition: Stamped.h:471
Base class for exceptions.
Definition: Exception.h:194
Generic buffer class that can be used as a replacement for std::vector whenever copying and reallocat...
Definition: Buffer.h:84
bool hasSubscriber() const
Returns true, if this channel has at least one subscriber.
Definition: Channel.h:552
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:245
ChannelRead< T > waitForData(const Duration &timeout=Duration::infinity()) const
Waits until data in this channel becomes available.
Definition: Channel.h:635
Channel()
Create channel proxy that is not assigned to a channel.
Definition: Channel.h:429
ChannelWrite< T > write()
Obtains exclusive write access to the next free data slot of this channel.
Definition: Channel.h:843
No access at all (Authority is not a publisher nor a subscriber)
Definition: Channel.h:352
ChannelWrite< T > write(bool copyLatestData)
Same as write above with an additional parameter copyLatestData that allows you to specify whether th...
Definition: Channel.h:873
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:484
std::size_t getMinSlots() const
Returns the number slots that are guaranteed to be kept in the channel buffer.
Definition: Channel.h:584
int getTypeId() const
Returns the type id of the channel.
Definition: Channel.h:487
Prefer filling the interval with slots with timestamp > requested.
Definition: ChannelBuffer.h:115
bool isValid() const
Returns true if the proxy object is valid.
Definition: Channel.h:451
IntervalFillMode
Mode that is used to determine what slots should be added to the interval when not enough slots are a...
Definition: ChannelBuffer.h:110
SlotQueryMode
Mode that is used to determine the slot obtained from a channel when no slot exists at the exact time...
Definition: ChannelBuffer.h:94
Base class for all framework channels.
#define MIRA_SLEEP(ms)
Sleeps for ms milliseconds This is a thread interruption point - if interruption of the current threa...
Definition: Thread.h:95
Implements AbstractChannelSubscriber for a concrete data type.
Definition: ChannelSubscriber.h:82
void removeSubscriber(AbstractChannelSubscriberPtr subscriber)
Removes the specified subscriber from this channel and calls its detach() method. ...
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
Provides method for generating a unique id for any type.
bool isEmpty() const
Returns if the channel is empty (no data has been published)
Definition: Channel.h:544
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Waits until this channel has at least one publisher.
Definition: Channel.h:667
Wraps an STL conform container around a range of values within another container. ...
Definition: IteratorRangeContainer.h:64
Classes for reading whole intervals of data from channels.
ChannelReadInterval< T > readInterval(const Time &from, const Time &to=Time::eternity())
Obtains read access to an interval of elements in a given time span.
Definition: Channel.h:822
ChannelAccessFlags
Flags specifying the access rights of an authority to a channel Can be combined.
Definition: Channel.h:351