MIRA
DispatcherThread.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
48 #ifndef _MIRA_DISPATCHERTHREAD_H_
49 #define _MIRA_DISPATCHERTHREAD_H_
50 
51 #include <set>
52 #include <deque>
53 
54 #ifndef Q_MOC_RUN
55 #include <boost/function.hpp>
56 #include <boost/thread/condition_variable.hpp>
57 #endif
58 
59 #include <error/LoggingAux.h>
60 #include <utils/Time.h>
61 #include <utils/StringAlgorithms.h>
62 #include <thread/Thread.h>
63 #include <fw/Status.h>
64 #include <fw/Runnable.h>
65 
66 namespace mira {
67 
68 
69 template<typename T, typename Sequence = std::vector<T>,
70  typename Compare = std::less<typename Sequence::value_type> >
71 class IteratablePriorityQueue : public std::priority_queue<T,Sequence,Compare>
72 {
73  typedef std::priority_queue<T,Sequence,Compare> Base;
74 public:
75 
76  // types
77  typedef typename Sequence::iterator iterator;
78  typedef typename Sequence::const_iterator const_iterator;
79 
80 public:
81  explicit IteratablePriorityQueue(const Compare& x, const Sequence& s) :
82  Base(x,s) {}
83 
84  explicit IteratablePriorityQueue(const Compare& x = Compare(), Sequence&& s = Sequence()) :
85  Base(x,std::move(s)) {}
86 
87 
88  template<typename InputIterator>
89  IteratablePriorityQueue(InputIterator first, InputIterator last, const Compare& x, const Sequence& s) :
90  Base(first,last,x,s) {}
91 
92  template<typename InputIterator>
93  IteratablePriorityQueue(InputIterator first, InputIterator last, const Compare& x, Sequence&& s) :
94  Base(first,last,x,std::move(s)) {}
95 
96 public:
97 
98  iterator begin() { return this->c.begin(); }
99  iterator end() { return this->c.end(); }
100 
101  const_iterator begin() const { return this->c.begin(); }
102  const_iterator end() const { return this->c.end(); }
103 
104 
105 };
106 
107 
108 
109 
110 
112 
131 {
132 public:
133 
135  class Timer;
137  typedef boost::function<void(const Timer&)> TimerCallback;
138 
146  class Timer : public IRunnable, boost::noncopyable
147  {
148  protected:
150  friend class DispatcherThread;
151 
157  Timer(TimerCallback callback, Duration period,
158  Duration tolerance = Duration::invalid());
159 
165  Timer(TimerCallback callback, Time invokationTime);
166 
167  protected:
168 
173  currentExpected += mPeriod;
174  }
175 
176  void run(DispatcherThread* dispatcher) override;
177 
178  public:
179 
183  bool isActive() const { return mActive; }
184 
189  void setPeriod(Duration period) { mPeriod = period; }
190 
194  Duration getPeriod() const { return mPeriod; }
195 
201  void setTolerance(Duration tolerance) { mTolerance = tolerance; }
202 
207  Duration getTolerance() const { return mTolerance; }
208 
216  {
217  if (mPeriod > Duration::seconds(0))
218  {
220  if (dt > mTolerance)
221  return dt;
222  }
223  return Duration::seconds(0);
224  }
225 
226 
227  public:
228 
232  void start();
233 
238  void stop();
243  return currentExpected;
244  }
245 
246  public:
247 
250 
253 
256 
259 
260  private:
261 
262  TimerCallback mCallback;
263  Duration mPeriod;
264  Duration mTolerance;
265  bool mOneShot;
266  bool mActive;
267  };
268 
269  typedef boost::shared_ptr<Timer> TimerPtr;
270 
271 public:
272 
277  DispatcherThread(const std::string& name = "");
278 
279  virtual ~DispatcherThread();
280 
281  void setName(const std::string& name);
282 
283  bool insertRunnable(IRunnablePtr runnable, bool singleton = false, Time time=Time::now());
284  void removeRunnable(IRunnablePtr runnable);
285 
286 
294  void addImmediateHandler(IRunnablePtr runnable);
295 
303  template <typename F>
304  void addImmediateHandlerFunction(F&& fn, DiagnosticsModulePtr errorModule=nullptr) {
305  IRunnablePtr r(new FunctionRunnable<F>(static_cast<F&&>(fn),errorModule));
307  }
308 
312  void addFinalizeHandler(IRunnablePtr runnable);
313 
319  template <typename F>
320  void addFinalizeHandlerFunction(F&& fn, DiagnosticsModulePtr errorModule=nullptr) {
321  IRunnablePtr r(new FunctionRunnable<F>(static_cast<F&&>(fn),errorModule));
323  }
324 
331  TimerPtr createTimer(Duration period, Duration periodTolerance,
332  TimerCallback callback, bool oneshot=false);
333 
339  TimerPtr createTimer(Duration period, TimerCallback callback,
340  bool oneshot=false);
341 
346  TimerPtr createTimer(Time time, TimerCallback callback);
347 
353  void removeTimer(TimerPtr timer);
354 
358  bool hasTimer(TimerPtr timer);
359 
377  void start(bool startThread = true);
378 
383  void stop();
384 
391  bool hasUnrecoverableFailure() const;
392 
398 
400  bool isRunning() const;
401 
416  bool spin(Duration maxWait = Duration::infinity());
417 
425  boost::thread::id getThreadID() const {
426  return mThreadId;
427  }
428 
440  [[nodiscard]] bool hasWork(Time horizon = Time::now()) const;
441 
442 protected:
450  void run();
451 
452 private:
453 
454  void runThread();
455 
456 private:
457 
459  bool processImmediateHandlers();
460  bool processFinalizeHandlers();
461  bool processSpin(const Duration& maxWait);
462 
463 private:
464 
465  bool isInterruptionRequested();
466 
467 protected:
468 
470  struct QueueItem
471  {
474 
475 
479 
480  bool operator<(const QueueItem& other) const { return time>other.time; }
481  };
482 
484 
485 
487  std::set<IRunnablePtr> mPendingRunnables;
488 
489  mutable boost::mutex mConditionMutex;
490  boost::condition_variable mCondition;
491 
493 
494  void postProcessTimer(TimerPtr timer);
495 
497  std::string mName;
498 
499  std::list<IRunnablePtr> mImmediateHandlers;
500  mutable boost::mutex mImmediateHandlerMutex;
501 
502  std::list<IRunnablePtr> mFinalizeHandlers;
503  boost::mutex mFinalizeHandlerMutex;
504 
505  std::set<TimerPtr> mTimers;
506  boost::mutex mTimerMutex; // protects above handler sets
507 
510 
511  // unfortunatelly, we need our own interruption requested flag, since
512  // boost::this_thread::interruption_requested() is cleared after a
513  // boost::thread_interrupted exception is thrown, however, we need this
514  // flag to be true, until we really have terminated
516 
517  boost::thread mThread;
518  boost::thread::id mThreadId;
520 
521 };
522 
526 
528 
529 } // namespace
530 
531 #endif /* _MIRA_DISPATCHERTHREAD_H_ */
bool toBeRemoved
Definition: DispatcherThread.h:478
boost::mutex mConditionMutex
Definition: DispatcherThread.h:489
void start(bool startThread=true)
Starts the dispatcher.
std::set< TimerPtr > mTimers
Definition: DispatcherThread.h:505
static Duration invalid()
Returns an invalid duration.
Definition: Time.h:249
Time currentExpected
time the current callback should be happening
Definition: DispatcherThread.h:252
Sequence::iterator iterator
Definition: DispatcherThread.h:77
bool mInterruptionRequested
Definition: DispatcherThread.h:515
void run(DispatcherThread *dispatcher) override
Called from dispatcher thread this runnable is attached to, whenever the runnable should be executed...
boost::condition_variable mCondition
Definition: DispatcherThread.h:490
DispatcherThread::Timer Timer
Definition: DispatcherThread.h:523
DispatcherThread::TimerPtr TimerPtr
Definition: DispatcherThread.h:524
bool mUnrecoverableFailure
Definition: DispatcherThread.h:508
TimerPtr createTimer(Duration period, Duration periodTolerance, TimerCallback callback, bool oneshot=false)
Creates and adds a timer that gets called cyclic in a given period and has a tolerance for exceeding ...
Class representing timers and tasks that can be registered and executed by the dispatcher thread...
Definition: DispatcherThread.h:146
TODO Add description.
DispatcherThread(const std::string &name="")
Construct a dispatcher thread with an optional name.
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
void setName(const std::string &name)
boost::function< void(const Timer &)> TimerCallback
Signature of a timer/task callback function.
Definition: DispatcherThread.h:135
boost::mutex mFinalizeHandlerMutex
Definition: DispatcherThread.h:503
bool mIsRunning
Definition: DispatcherThread.h:509
IteratablePriorityQueue(const Compare &x, const Sequence &s)
Definition: DispatcherThread.h:81
bool spin(Duration maxWait=Duration::infinity())
Invoke the dispatcher manually (instead of starting a separate thread via the start() method)...
Time and Duration wrapper class.
STL namespace.
void addImmediateHandlerFunction(F &&fn, DiagnosticsModulePtr errorModule=nullptr)
Adds a function that is executed once as soon as possible within the DispatcherThreads main thread...
Definition: DispatcherThread.h:304
Runnable that wraps any function pointer.
Definition: Runnable.h:115
QueueItem(IRunnablePtr r, Time t)
Definition: DispatcherThread.h:473
bool operator<(const QueueItem &other) const
Definition: DispatcherThread.h:480
void postProcessTimer(TimerPtr timer)
IteratablePriorityQueue(InputIterator first, InputIterator last, const Compare &x, Sequence &&s)
Definition: DispatcherThread.h:93
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
IteratablePriorityQueue(InputIterator first, InputIterator last, const Compare &x, const Sequence &s)
Definition: DispatcherThread.h:89
IteratablePriorityQueue< QueueItem, std::deque< QueueItem > > Queue
Definition: DispatcherThread.h:483
boost::mutex mTimerMutex
Definition: DispatcherThread.h:506
void setTolerance(Duration tolerance)
Set the tolerance that is allowed to exceed the next invocation time before issuing a warning and res...
Definition: DispatcherThread.h:201
boost::shared_ptr< Timer > TimerPtr
Definition: DispatcherThread.h:269
std::list< IRunnablePtr > mImmediateHandlers
Definition: DispatcherThread.h:499
void run()
Runs this thread dispatchers main loop.
Base for all runnable classes that can be signaled when they need to run again.
Definition: Runnable.h:72
Auxiliary logging macros for special entities like exceptions, etc.
Includes, defines and functions for threads.
Definition: DispatcherThread.h:71
Duration getExceedance() const
Checks and returns the amount of time the timer has exceeded the expected invocation time...
Definition: DispatcherThread.h:215
DispatcherThread::TimerCallback TimerCallback
Definition: DispatcherThread.h:525
Time getNextInvocationTime() const
Return next time of execution.
Definition: DispatcherThread.h:242
void setPeriod(Duration period)
Changes the period of the timer.
Definition: DispatcherThread.h:189
void start()
Start the timer (activates it)
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Time last
time the last callback happened
Definition: DispatcherThread.h:249
Use this class to represent time durations.
Definition: Time.h:106
bool insertRunnable(IRunnablePtr runnable, bool singleton=false, Time time=Time::now())
void stop()
Stops the dispatcher, if it is running.
iterator end()
Definition: DispatcherThread.h:99
bool isRunning() const
Returns true, if the dispatcher currently is running.
static Time now()
Returns the current utc based time.
Definition: Time.h:481
boost::thread::id getThreadID() const
Returns the thread id of the thread that is used to process the dispatcher.
Definition: DispatcherThread.h:425
boost::shared_ptr< IRunnable > IRunnablePtr
a runnable pointer
Definition: Runnable.h:63
void addImmediateHandler(IRunnablePtr runnable)
Adds a runnable that is executed once as soon as possible within the DispatcherThreads main thread...
void stop()
Stops a timer (deactivates it)
XXX.
Definition: DispatcherThread.h:470
Class that can be used whenever you want to have ONE thread where several handlers are assigned to...
Definition: DispatcherThread.h:130
boost::mutex mImmediateHandlerMutex
Definition: DispatcherThread.h:500
void updateInvocationTime()
Called by dispatcher when timer is rescheduled.
Definition: DispatcherThread.h:172
static Duration infinity()
Returns a special duration time representing positive infinity.
Definition: Time.h:242
Duration getTolerance() const
Get the tolerance for exceed the next invocation time.
Definition: DispatcherThread.h:207
bool isActive() const
Returns if the timer is active.
Definition: DispatcherThread.h:183
IteratablePriorityQueue(const Compare &x=Compare(), Sequence &&s=Sequence())
Definition: DispatcherThread.h:84
Duration getPeriod() const
Returns the current period of the timer.
Definition: DispatcherThread.h:194
void removeTimer(TimerPtr timer)
Removes the given timer (timer callback will not be called again and timer will not be rescheduled)...
QueueItem()
Definition: DispatcherThread.h:472
Sequence::const_iterator const_iterator
Definition: DispatcherThread.h:78
Time time
Definition: DispatcherThread.h:477
bool hasTimer(TimerPtr timer)
Returns true, if the given timer.
boost::thread mThread
Definition: DispatcherThread.h:517
std::set< IRunnablePtr > mPendingRunnables
Definition: DispatcherThread.h:487
Queue mQueue
Definition: DispatcherThread.h:486
bool mPendingSignal
Definition: DispatcherThread.h:519
IRunnablePtr runnable
Definition: DispatcherThread.h:476
std::string mName
Definition: DispatcherThread.h:497
Exception that can be thrown in every handler to indicate an unrecoverable failure.
Definition: DispatcherThread.h:397
bool hasWork(Time horizon=Time::now()) const
Checks if there is a work item (either an immediate handler or an item in the execution queue with an...
const_iterator end() const
Definition: DispatcherThread.h:102
bool hasUnrecoverableFailure() const
Returns true, if there was a failure while processing the immediate handlers.
const_iterator begin() const
Definition: DispatcherThread.h:101
Include file for including all string algorithm headers.
boost::thread::id mThreadId
Definition: DispatcherThread.h:518
iterator begin()
Definition: DispatcherThread.h:98
void addFinalizeHandler(IRunnablePtr runnable)
Adds a runnable that is executed once just before the thread terminates.
void removeRunnable(IRunnablePtr runnable)
Base class for modules that want to use diagnostics and set the current status.
Definition: Status.h:136
void addFinalizeHandlerFunction(F &&fn, DiagnosticsModulePtr errorModule=nullptr)
Adds a function that is executed once just before the thread terminates.
Definition: DispatcherThread.h:320
std::list< IRunnablePtr > mFinalizeHandlers
Definition: DispatcherThread.h:502
Status and status management classes used for diagnostics.
Timer(TimerCallback callback, Duration period, Duration tolerance=Duration::invalid())
Constructs a timer with a given callback that is called whenever the timers invocation time is due an...
Duration lastDuration
How long the last callback ran for.
Definition: DispatcherThread.h:258
Time current
time the current callback was actually called (Time::now() as of the beginning of the callback) ...
Definition: DispatcherThread.h:255