48 #ifndef _MIRA_CHANNEL_H_ 49 #define _MIRA_CHANNEL_H_ 51 #ifndef _MIRA_FRAMEWORK_H_ 52 # error "Channel.h must be included via the Framework.h. You must not include it directly. Use #include <fw/Framework.h> instead" 56 #include <boost/shared_ptr.hpp> 57 #include <boost/type_traits/is_polymorphic.hpp> 104 typedef typename Buffer::ValueType ValueType;
105 typedef typename Buffer::Slot Slot;
108 typedef boost::shared_ptr<Subscriber> SubscriberPtr;
119 "ConcreteChannel must have the same size as AbstractChannel");
121 static_assert(boost::is_polymorphic<ConcreteChannel>::value==
false,
122 "ConcreateChannel and AbstractChannel must not be polymorphic");
133 boost::mutex::scoped_lock lock(mSubscribersMutex);
134 mNrOfSubscribersWithoutChannelSubscriber++;
141 void addSubscriber(SubscriberPtr subscriber)
143 boost::mutex::scoped_lock lock(mSubscribersMutex);
144 mSubscribers.push_back(subscriber);
145 subscriber->attachChannel(
this);
157 ChannelRead<T>
read();
195 ChannelReadInterval<T> readInterval(
const Time& timestamp, std::size_t nrSlots,
196 std::size_t olderSlots, std::size_t newerSlots,
217 ChannelReadInterval<T> readInterval(
const Time& from,
232 ChannelWrite<T>
write();
235 friend class Channel<T>;
248 struct ChannelCaster {
249 static ConcreteChannel<T>* cast(AbstractChannel* p)
253 promote_channel<T>(p);
255 assert(p->isTyped());
258 if(typeId<T>()!=p->getTypeId())
259 MIRA_THROW(XBadCast,
"Cannot cast channel '" << p->getID()
260 <<
"' (type '" << p->getTypename() <<
"') " 261 <<
" to typed channel of type '" << typeName<T>() <<
"'");
265 return static_cast<ConcreteChannel<T>*
>(p);
273 struct ChannelCaster<void> {
274 static ConcreteChannel<void>* cast(AbstractChannel* p)
278 return static_cast<ConcreteChannel<void>*
>(p);
286 struct ChannelCaster<T*> {
287 static ConcreteChannel<T*>* cast(AbstractChannel* p)
289 static_assert(std::is_base_of<mira::Object,T>::value,
290 "Pointers that are used in channels must be derived from " 291 "mira::Object. Pointers to other classes cannot be used.");
295 promote_channel<T*>(p);
298 assert(p->getBuffer()->isPolymorphic());
302 return static_cast<ConcreteChannel<T*>*
>(p);
309 inline ConcreteChannel<T>* channel_cast(AbstractChannel* p)
311 return ChannelCaster<T>::cast(p);
319 template <
typename T>
324 inline Channel<T> channel_cast(Channel<void> channel);
342 template <
typename T>
348 struct ParamHelper<void> {
402 template <
typename T>
416 mChannel(channel), mAccessFlags(accessFlags) {
417 assert(mChannel!=NULL);
433 return mChannel!=NULL;
444 "No channel assigned to the channel proxy (of type " <<
445 typeName<T>() <<
"). " 446 "You can obtain a channel using Authority::getChannel()!");
460 return mChannel->getID();
470 return mChannel->getTypeId();
484 return mChannel->getTypename();
493 mChannel->setTypename(
type);
501 return mChannel->getTypeMeta();
509 mChannel->setTypeMeta(meta);
527 return mChannel->getNrOfSlots() == 0;
535 return mChannel->hasSubscriber();
543 return mChannel->hasPublisher();
551 return mChannel->getBuffer()->getMaxSlots() == 1;
559 return mChannel->getBuffer()->getMaxSlots();
567 return mChannel->getBuffer()->getMinSlots();
576 return mChannel->getBuffer()->getStorageDuration();
584 return mChannel->getBuffer()->isAutoIncreasingStorageDuration();
592 return mChannel->getBuffer()->getSize();
618 validateReadAccess();
621 if(!timeout.isValid() || timeout.isInfinity())
626 while(!boost::this_thread::interruption_requested())
629 return mChannel->read();
630 }
catch(XInvalidRead& ex) {}
650 validateReadAccess();
653 if(!timeout.isValid() || timeout.isInfinity())
658 while(!boost::this_thread::interruption_requested())
679 validateReadAccess();
680 return mChannel->read();
705 validateReadAccess();
706 return mChannel->read(timestamp, mode, tolerance);
728 validateReadAccess();
729 Time newestSlotTime = mChannel->read()->timestamp;
732 for(; iter != interval.
end(); ++iter){
733 if(iter->sequenceID == sequenceID)
736 MIRA_THROW(XInvalidRead,
"No slot with sequenceID "<<sequenceID<<
737 " found within searchInterval of channel");
746 validateReadAccess();
747 return mChannel->read(timestamp,
NEAREST_SLOT, tolerance);
780 std::size_t olderSlots,
781 std::size_t newerSlots,
783 validateReadAccess();
784 return mChannel->readInterval(timestamp, nrSlots, olderSlots,
785 newerSlots, fillMode);
805 validateReadAccess();
806 return mChannel->readInterval(from, to);
826 validateWriteAccess();
827 return mChannel->write();
862 validateWriteAccess();
866 && (mChannel->getBuffer()->getMaxSlots()>1)
867 && (mChannel->getBuffer()->getSize()>0))
877 return mChannel->write();
894 validateReadAccess();
914 validateReadAccess();
915 ChannelRead<T> value = mChannel->read(timestamp, mode, tolerance);
925 validateReadAccess();
937 GetTime(
const Time& iT0) : t0(iT0) {}
940 typedef float result_type;
941 result_type operator()(
const Stamped<T>& p)
const {
943 return (p.timestamp-t0).totalMilliseconds();
952 typedef const T& result_type;
953 result_type operator()(
const Stamped<T>& p)
const {
980 template <
typename Filter>
982 validateReadAccess();
988 filter.samplesBefore(),
989 filter.samplesAfter());
991 if (!filter.canExtrapolate())
993 int samplesBefore = filter.samplesBefore();
994 int samplesAfter = filter.samplesAfter();
996 for (
auto it = data.
begin(); it != data.
end(); ++it)
998 if (it->timestamp <= timestamp)
1000 else if (it->timestamp >= timestamp)
1004 if (samplesBefore > 0)
1005 MIRA_THROW(XRuntime,
"Filter::samplesBefore: Requirements not met. Missing " << samplesBefore <<
" items.");
1006 if (samplesAfter > 0)
1007 MIRA_THROW(XRuntime,
"Filter::samplesAfter: Requirements not met. Missing " << samplesAfter <<
" items.");
1015 if (mChannel->getNrOfSlots() == 1)
1022 const Time t0 = data.
begin()->timestamp;
1025 typedef boost::transform_iterator<GetTime, const_iterator> GetTimeIterator;
1026 GetTimeIterator begin1 = GetTimeIterator(data.
begin(), GetTime(t0));
1027 GetTimeIterator end1 = GetTimeIterator(data.
end() , GetTime(t0));
1029 typedef boost::transform_iterator<GetValue, const_iterator> GetValueIterator;
1031 GetValueIterator begin2=GetValueIterator(data.
begin(), GetValue());
1032 GetValueIterator end2 =GetValueIterator(data.
end() , GetValue());
1036 TimeContainer timeContainer(begin1, end1);
1037 ValueContainer valueContainer(begin2, end2);
1039 return makeStamped(filter.template apply<float, T>(timeContainer,
1041 (timestamp-t0).totalMilliseconds()),
1054 template <
typename U =
void>
1056 validateWriteAccess();
1063 template <
typename U =
void>
1065 validateWriteAccess();
1067 *writeSlot = std::move(value);
1080 template <
typename U>
1081 typename std::enable_if<!std::is_base_of_v<StampedHeader, std::decay_t<U>>>
::type 1083 validateWriteAccess();
1085 *writeSlot =
makeStamped(std::forward<U>(value), timestamp);
1098 template <
typename U = T>
1099 typename std::enable_if<!std::is_void<U>::value>
::type 1101 validateWriteAccess();
1108 template <
typename U = T>
1109 typename std::enable_if<!std::is_void<U>::value>
::type 1111 validateWriteAccess();
1113 *writeSlot =
makeStamped(std::move(value), timestamp);
1122 template<
typename U>
1129 void dbgDump(
bool brief=
true) { mChannel->dbgDump(brief); }
1134 void validateReadAccess()
const 1138 MIRA_THROW(XAccessViolation,
"You are not allowed to read from channel '" 1139 << mChannel->getID() <<
"'. Forgot to subscribe?");
1142 void validateWriteAccess()
const 1146 MIRA_THROW(XAccessViolation,
"You are not allowed to write to channel '" 1147 << mChannel->getID() <<
"'. Forgot to publish?");
1152 ConcreteChannel<T>* mChannel;
1159 template<
typename U>
1160 inline Channel<U> channel_cast(Channel<void> channel)
1163 return Channel<U>(channel_cast<U>(channel.mChannel), channel.mAccessFlags);
1171 #include "impl/ChannelReadWrite.hpp" 1172 #include "impl/ChannelReadInterval.hpp" TypeMetaPtr getTypeMeta() const
Return the type meta information for this channel.
Definition: Channel.h:499
void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
std::enable_if<!std::is_base_of_v< StampedHeader, std::decay_t< U > > >::type post(U &&value, const Time &timestamp=Time::now())
Writes the specified data with the specified time stamp into the channel.
Definition: Channel.h:1082
An object that allows read access to a whole interval of channel data.
Definition: ChannelReadInterval.h:72
const std::string & getID() const
Return the channel ID, its name.
Definition: Channel.h:458
Read access (Authority is a subscriber)
Definition: Channel.h:334
bool hasPublisher() const
Returns true, if this channel has at least one publisher.
Definition: Channel.h:541
void finish()
Releases the lock explicitly.
Definition: ChannelReadWrite.h:601
std::size_t getMaxSlots() const
Returns the upper limit of slots that are allowed for this channel.
Definition: Channel.h:557
void post(const Stamped< T > &value)
Writes the specified data into the channel.
Definition: Channel.h:1055
An exception that occurs whenever a channel has no data.
Definition: Channel.h:88
void post(Stamped< T > &&value)
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1064
Write access (Authority is a publisher)
Definition: Channel.h:335
Macros for generating logical operators for using enum values as flags.
ChannelRead< T > read(uint32 sequenceID, const Duration &searchInterval=Duration::seconds(1))
Obtains read access to the element with the specified sequenceID within the given search interval. If ...
Definition: Channel.h:726
void validate() const
Checks if the channel is valid otherwise throws an exception.
Definition: Channel.h:441
#define MIRA_ENUM_TO_FLAGS(EnumType)
Macro that can be used with enums that contain flags.
Definition: EnumToFlags.h:80
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
Subscriber classes used to subscribe on channels for automatic notification.
Channel(ConcreteChannel< T > *channel, ChannelAccessFlags accessFlags)
Is used by Framework to create a channel proxy object.
Definition: Channel.h:415
std::string Typename
Definition: Typename.h:60
void setTypeMeta(TypeMetaPtr meta)
Set the type meta information for this channel.
Definition: Channel.h:507
bool isTyped() const
Returns true, if the channel is typed and false, if it is untyped.
Definition: Channel.h:476
An object that allows exclusive write access to data of a channel.
Definition: ChannelReadWrite.h:652
void read(const std::string &s, Value &oValue)
Read a json::Value from a string that contains JSON format.
#define MIRA_DEFINE_SERIALIZABLE_EXCEPTION(Ex, Base)
Macro for easily defining a new serializable exception class.
Definition: Exceptions.h:66
Definition: ChannelReadWrite.h:67
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:494
ChannelReadInterval< T > readInterval(const Time &timestamp, std::size_t nrSlots, std::size_t olderSlots, std::size_t newerSlots, IntervalFillMode fillMode=PREFER_NEWER)
Obtains read access to an interval of elements before and after the requested timestamp.
Definition: Channel.h:778
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:78
Typename getTypename() const
Return the typename of the channel.
Definition: Channel.h:482
void finish()
Releases the lock explicitly and informs the Channel to signal all Subscribers that new data is avail...
Definition: ChannelReadWrite.h:767
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
ChannelRead< T > read(const Time &timestamp, SlotQueryMode mode=NEAREST_SLOT, const Duration &tolerance=Duration::infinity())
Obtains read access to the element at the specified timestamp.
Definition: Channel.h:703
const_iterator end() const
Definition: ChannelReadInterval.h:164
const_iterator begin() const
Definition: ChannelReadInterval.h:163
Classes for automatic locking/unlocking when reading and writing to channels.
Commonly used exception classes.
std::size_t getNrOfSlots() const
Returns how many slots (i.e.
Definition: Channel.h:590
PropertyHint type(const std::string &t)
Sets the attribute "type" to the specified value.
Definition: PropertyHint.h:295
Definition: AbstractChannel.h:70
bool hasSoloSlot() const
Returns true, if this channel has one solo slot only.
Definition: Channel.h:549
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Use this class to represent time durations.
Definition: Time.h:106
ChannelRead< T > read(const Time &timestamp, const Duration &tolerance)
Same as above, but always takes the nearest slot.
Definition: Channel.h:745
ChannelRead< T > read()
Obtains read access to the latest data element of this channel.
Definition: Channel.h:678
void reset()
Reset the proxy object so that it becomes invalid.
Definition: Channel.h:423
Typed ChannelBuffer.
Definition: ChannelBuffer.h:890
The slot with smallest time difference to the requested will be chosen.
Definition: ChannelBuffer.h:102
Const iterator for iterating over the interval.
Definition: ChannelReadInterval.h:85
void setTypename(const Typename &type)
Set the typename of this channel.
Definition: Channel.h:491
static Time now()
Returns the current utc based time.
Definition: Time.h:481
Mix in for adding a time stamp, an optional frame id and an optional sequence id to data types like P...
Definition: Stamped.h:149
boost::shared_ptr< TypeMeta > TypeMetaPtr
Definition: MetaSerializer.h:309
std::enable_if<!std::is_void< U >::value >::type post(typename ParamHelper< T >::type &&value, const Time &timestamp=Time::now())
Same as above, for rvalue reference (move semantics)
Definition: Channel.h:1110
bool isAutoIncreasingStorageDuration() const
Returns whether the channel buffer is automatically increasing the storage duration.
Definition: Channel.h:582
std::enable_if<!std::is_void< U >::value >::type post(const typename ParamHelper< T >::type &value, const Time &timestamp=Time::now())
This allows post({}, Time()); to deduce we want to post an object of the channel's type...
Definition: Channel.h:1100
Duration getStorageDuration() const
Returns the timeframe that specifies how long a slot is guaranteed to be kept in the channel buffer...
Definition: Channel.h:574
Stamped< typename std::decay< T >::type > makeStamped(T &&value, const Time &timestamp=Time::now(), const std::string &frameID=std::string(), uint32 sequenceID=0)
Declare stamped classes for all standard data types including std::string.
Definition: Stamped.h:471
static Time eternity()
Returns 9999/12/31 23:59:59.999999.
Definition: Time.h:489
Base class for exceptions.
Definition: Exception.h:195
Generic buffer class that can be used as a replacement for std::vector whenever copying and reallocat...
Definition: Buffer.h:84
bool hasSubscriber() const
Returns true, if this channel has at least one subscriber.
Definition: Channel.h:533
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
ChannelRead< T > waitForData(const Duration &timeout=Duration::infinity()) const
Waits until data in this channel becomes available.
Definition: Channel.h:616
Channel()
Create channel proxy that is not assigned to a channel.
Definition: Channel.h:410
ChannelWrite< T > write()
Obtains exclusive write access to the next free data slot of this channel.
Definition: Channel.h:825
No access at all (Authority is not a publisher nor a subscriber)
Definition: Channel.h:333
ChannelWrite< T > write(bool copyLatestData)
Same as write above with an additional parameter copyLatestData that allows you to specify whether th...
Definition: Channel.h:861
std::size_t getMinSlots() const
Returns the number slots that are guaranteed to be kept in the channel buffer.
Definition: Channel.h:565
int getTypeId() const
Returns the type id of the channel.
Definition: Channel.h:468
Prefer filling the interval with slots with timestamp > requested.
Definition: ChannelBuffer.h:114
bool isValid() const
Returns true if the proxy object is valid.
Definition: Channel.h:432
IntervalFillMode
Mode that is used to determine what slots should be added to the interval when not enough slots are a...
Definition: ChannelBuffer.h:109
SlotQueryMode
Mode that is used to determine the slot obtained from a channel when no slot exists at the exact time...
Definition: ChannelBuffer.h:93
Base class for all framework channels.
#define MIRA_SLEEP(ms)
Sleeps for ms milliseconds. This is a thread interruption point - if interruption of the current threa...
Definition: Thread.h:97
Implements AbstractChannelSubscriber for a concrete data type.
Definition: ChannelSubscriber.h:82
The framework that holds all manager classes and provides startup and shutdown of all framework relat...
Provides method for generating a unique id for any type.
bool isEmpty() const
Returns if the channel is empty (no data has been published)
Definition: Channel.h:525
bool waitForPublisher(const Duration &timeout=Duration::infinity()) const
Waits until this channel has at least one publisher.
Definition: Channel.h:648
Wraps an STL conform container around a range of values within another container. ...
Definition: IteratorRangeContainer.h:64
Classes for reading whole intervals of data from channels.
ChannelReadInterval< T > readInterval(const Time &from, const Time &to=Time::eternity())
Obtains read access to an interval of elements in a given time span.
Definition: Channel.h:803
ChannelAccessFlags
Flags specifying the access rights of an authority to a channel Can be combined.
Definition: Channel.h:332