MIRA
RemoteConnection.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_REMOTECONNECTION_H_
48 #define _MIRA_REMOTECONNECTION_H_
49 
50 #ifndef Q_MOC_RUN
51 #include <boost/array.hpp>
52 #include <boost/make_shared.hpp>
53 #endif
54 
55 #include <utils/UUID.h>
57 #include <error/Exceptions.h>
58 
59 #include <rpc/RPCManager.h>
60 
61 #include <fw/RemoteAuthority.h>
63 
65 #include <fw/Channel.h>
66 #include <fw/FrameworkMessage.h>
67 #include <fw/ServiceLevel.h>
68 
69 namespace mira {
70 
72 
76 
81 {
83  address(""),
84  keep(true),
85  forcePTP(false),
86  binaryFormatVersion(BinaryBufferSerializer::getSerializerFormatVersion()),
87  monitorOnly(false),
88  lastConnectionTry(Time::unixEpoch()) {}
89 
91  template<typename Reflector>
92  void reflect(Reflector& r)
93  {
94  r.member("Address", address, "");
95  r.roproperty("Address", address, "The address IP:port");
96 
97  r.member("KeepConnected", keep, "", true);
98  r.roproperty("KeepConnected", keep, "Whether to reconnect after a lost connection");
99 
100  r.member("ForcePTP", forcePTP, "", false);
101  r.roproperty("ForcePTP", forcePTP, "Whether to force PTP time sync for this connection");
102 
103  r.member("BinaryFormatVersion", binaryFormatVersion, "",
104  BinaryBufferSerializer::getSerializerFormatVersion());
105  r.roproperty("BinaryFormatVersion", binaryFormatVersion,
106  "Binary format version used by the remote framework");
107 
108  r.member("MonitorOnly", monitorOnly, "", false);
109  r.roproperty("MonitorOnly", monitorOnly,
110  "Monitor-only connections do not publish local channels, "
111  "services and authorities to remote framework");
112 
113  r.member("LastConnectionTry", lastConnectionTry, "",
115  r.roproperty("LastConnectionTry", lastConnectionTry,
116  "Last time tried to establish connection (UTC)");
117  }
118 
119  void validate() const;
120  boost::tuple<std::string, uint16> getHostPort() const;
121 
123  std::string address;
124 
129  bool keep;
130 
132  bool forcePTP;
133 
139 
145 
151 };
152 
153 template <typename SerializerTag>
154 class IsTransparentSerializable<KnownFramework, SerializerTag> : public std::true_type {};
155 
157 
159 {
160 public:
164  mStartOffset(other.mStartOffset),
165  mStartTime(other.mStartTime) {}
166 
167  explicit TimeOffsetCompensation(const Duration& compensationInterval = Duration::seconds(10))
168  : mCompensationInterval(compensationInterval.totalMicroseconds()),
169  mTargetOffset(Duration::invalid()),
170  mStartOffset(Duration::invalid()),
171  mStartTime(Time::invalid()) {}
172 
174  template<typename Reflector>
175  void reflect(Reflector& r)
176  {
177  r.property("CompensationInterval",
180  "Interval to reach target offset");
181  r.roproperty("TargetOffset", mTargetOffset, "Target offset");
182  r.roproperty("StartOffset", mStartOffset, "Start offset");
183  r.roproperty("StartTime", mStartTime, "Time when start offset was valid");
184  r.roproperty("CurrentOffset",
186  "Current offset (interpolated between start and target offset)");
187  }
188 
189 public:
190  bool isInitialized()const { return mTargetOffset.isValid(); }
191 
192 public:
194  void setCompensationInterval(const Duration& interval);
195 
196  void setTargetOffset(const Duration& target, const Time& ts = Time::now());
197 
198 public:
199  Duration queryOffset(const Time& ts) const;
200  Duration queryOffset() const { return queryOffset(Time::now()); } // for use in getter
201 
202 public:
203 
204  // assignment operator uses copy constructor (through by-value argument)
206  using std::swap;
208  swap(mTargetOffset, other.mTargetOffset);
209  swap(mStartOffset, other.mStartOffset);
210  swap(mStartTime, other.mStartTime);
211  return *this;
212  }
213 
214  // "Interface adaptors" to support assign from and query as Duration
216  setTargetOffset(target);
217  return *this;
218  }
219 
220  operator Duration() const { return queryOffset(); }
221 
222 protected:
223 
224  Duration offset(const Time& ts) const;
225 
226 protected:
227 
228  int mCompensationInterval; // total microseconds
232 
233  mutable boost::mutex mMutex;
234 };
235 
237 
276 class RemoteConnection : public Object
277 {
279 public:
280 
281  typedef std::map<std::string, Typename> ChannelTypeMap;
282  typedef std::set<std::string> StringSet;
283 
284  struct SendData {
285  uint8 metaVersion;
287  };
288  typedef std::map<std::string, SendData> ChannelSendMap;
289 
290  typedef std::set<std::string> MetaSet;
291  typedef std::list<AuthorityDescription> AuthorityDescriptions;
292 
294 
302  {
303  public:
306  mConnection(iConnection)
307  {}
308 
314  virtual void onRPCfinished(Buffer<uint8>&& answer);
315 
317  void setConnection(RemoteConnection* iConnection);
318 
319  protected:
320 
321  boost::mutex mConnectionMutex;
323  };
324 
326 
333  {
334  public:
337  mConnection(iConnection)
338  {}
339 
345  virtual void onRPCrequested(Buffer<uint8>&& request);
346 
348  void setConnection(RemoteConnection* iConnection);
349 
350  protected:
351 
352  boost::mutex mConnectionMutex;
354  };
355 
357 
358 protected:
364 
369  RemoteConnection(boost::asio::io_service& service);
370 
371 public:
373  virtual ~RemoteConnection();
374 
375  template<typename Reflector>
376  void reflect(Reflector& r)
377  {
378  r.roproperty("RemoteFrameworkName", frameworkID, "ID/Name of the remote framework");
379  r.roproperty("RemoteProtocolVersion",
380  getter<std::string>( [&]()->std::string {
381  return MakeString() << MIRA_MAJOR_VERSION(remoteVersion) << "."
383  "Protocol version of the remote framework");
384  r.roproperty("Connected", synchronizedTime, "Time when the connection was established");
385  r.roproperty("PTPSync", getter(&RemoteConnection::isPTPSyncEnabled, this),
386  "Synchronization of clocks via PTP enabled?");
387  r.roproperty("PingTimeout", getter(&RemoteConnection::isPingTimeoutEnabled, this),
388  "Ping timeout enabled?");
389  r.property("TimeSynch", clockOffset, "PTP time synchronization");
390  }
391 
393  boost::asio::ip::tcp::socket& getSocket()
394  {
395  return mSocket;
396  }
397 
399  virtual void start();
400 
402  void stop();
403 
409  void onConnect(bool enablePTPTimeSync, bool enablePingTimeout);
410 
413  {
414  return mEnablePTPSync;
415  }
416 
419  {
420  return mEnablePingTimeout;
421  }
422 
429  void startTimeSync();
430 
431 
438  bool isSynchronized() const
439  {
440  return synchronizedTime.isValid();
441  }
442 
447  virtual void publishChannels(const ChannelTypeMap& channels) = 0;
448 
453  void unpublishChannel(const std::string& channel);
454 
459  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel) = 0;
460 
465  virtual void publishAuthorities(const AuthorityDescriptions& authorities) = 0;
466 
471  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities) = 0;
472 
477  void migrateUnit(const std::string& id);
478 
483  bool hasAuthority(const std::string& id) const;
484 
489  virtual void publishServices(const StringSet& services) = 0;
490 
495  virtual void unpublishServices(const StringSet& services) = 0;
496 
501  template <typename BufferSequence>
502  void write(const BufferSequence& buffers)
503  {
504  boost::mutex::scoped_lock lock(mWriteMutex);
505  if (mStopped)
506  return;
507  try
508  {
509  std::size_t bytes = boost::asio::write(mSocket, buffers, boost::asio::transfer_all());
510  updateOutgoingStats(bytes);
511  }
512  catch (boost::system::system_error& e)
513  {
514  // let the derived class decide what to do with the error
515  onWriteError(e);
516  }
517  }
518 
520  {
521  return remoteID;
522  }
523 
524  std::string getGlobalID() const
525  {
526  return authority->getGlobalID();
527  }
528 
529  void setAuthority(std::unique_ptr<Authority> auth)
530  {
531  authority = std::move(auth);
532  }
533 
534  const std::string& getFrameworkID() const
535  {
536  return frameworkID;
537  }
538 
539  const KnownFramework& getAddress() const
540  {
541  return address;
542  }
543 
544  void setAddress(const KnownFramework& addr)
545  {
546  address = addr;
547  }
548 
549  void setAutoReconnect(bool autoReconnect)
550  {
551  address.keep = autoReconnect;
552  }
553 
554 protected:
555 
560  virtual void onDisconnect() {}
561 
563  virtual void onWriteError(boost::system::system_error& e) {}
564 
571  void syncTime();
572 
576  void sendPTP();
577 
582  void ping();
583 
587  bool hasPingTimeout() const;
588 
595  FrameworkMessageHeader header(0, msg);
596  boost::array<boost::asio::const_buffer, 1> buffers =
597  {{
598  header.asBuffer(),
599  }};
600  write(buffers);
601  }
602 
604 
609  #define GEN_WRITEMESSAGE_WRITE(z,n,_) os << p##n;
610  #define GEN_WRITEMESSAGE_PARAM_DECL(z,n,_) , const P##n & p##n
611  #define GEN_WRITEMESSAGE(z,n,_) \
612  template<BOOST_PP_ENUM_PARAMS_Z(z,BOOST_PP_INC(n),typename P)> \
613  void writeMessage(FrameworkMessageType msg \
614  BOOST_PP_REPEAT_ ## z(BOOST_PP_INC(n),GEN_WRITEMESSAGE_PARAM_DECL,nil)) {\
615  BinaryBufferOstream::buffer_type buffer; \
616  BinaryBufferOstream os(&buffer); \
617  BOOST_PP_REPEAT_ ## z(BOOST_PP_INC(n),GEN_WRITEMESSAGE_WRITE,nil) \
618  FrameworkMessageHeader header(buffer.size(), msg); \
619  boost::array<boost::asio::const_buffer, 2> buffers = \
620  {{ \
621  header.asBuffer(), \
622  boost::asio::buffer(buffer.data(), buffer.size()) \
623  }}; \
624  write(buffers); \
625  }
626  BOOST_PP_REPEAT(BOOST_PP_INC(8),GEN_WRITEMESSAGE, nil);
627  #undef GEN_WRITEMESSAGE
628  #undef GEN_WRITEMESSAGE_WRITE
629  #undef GEN_WRITEMESSAGE_PARAM_DECL
630 
632 
638  template <typename BufferType>
639  void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType& buffer) {
640  FrameworkMessageHeader header(buffer.size(), msg);
641  boost::array<boost::asio::const_buffer, 2> buffers =
642  {{
643  header.asBuffer(),
644  boost::asio::buffer(buffer.data(), buffer.size())
645  }};
646  write(buffers);
647  }
648 
654  template <typename DataType>
655  void writeMessageFromData(FrameworkMessageType msg, const DataType& data) {
656  FrameworkMessageHeader header(sizeof(DataType), msg);
657  boost::array<boost::asio::const_buffer, 2> buffers =
658  {{
659  header.asBuffer(),
660  boost::asio::buffer((void*)&data, sizeof(DataType))
661  }};
662  write(buffers);
663  }
664 
671 
677  void valueChanged(ChannelRead<void> value, ServiceLevel& serviceLevel);
678 
683  void parseMessage();
684 
687  std::string frameworkID;
688  uint32 remoteVersion;
689  std::unique_ptr<Authority> authority;
693 
698 
699 private:
700 
701  void init(); // called by constructors to initialize members
702 
704  bool isLocal() const;
705 
706 protected:
707 
708  void receivedPTPFollowUp(uint64 timestamp);
709  void receivedPTPDelayResponse(uint64 timestamp);
710  void receivedPTPDelayRequest(uint64 timestamp);
711  void receivedPTPFinish();
713  void receivedUnsubscribeChannelRequest(const std::string& channelID);
718  virtual void receivedPublishServiceMsg() = 0;
721  virtual void receivedRPCRequestMsg() = 0;
722  virtual void receivedRPCResponseMsg() = 0;
724  void receivedMigrationMsg();
728  void receivedTypeMetaMsg();
729  void receivedChannelMetaMsg();
730  void receivedPingMsg();
731 
732  // This method must not be pure virtual: When a remote incoming connection
733  // times out and will be destroyed, some still pending valueChanged()
734  // callbacks will be executed, which will call sendData. At this time, the
735  // RemoteIncomingConnection is already destroyed (or will be destroyed) and
736  // then sendData will be a pure-virtual function.
737  // TODO: that still applies after r9735?
738  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel) {}
739 
740  virtual int addBinaryFormatVersion(Buffer<uint8>& data) = 0;
741  void synchronizeFrameworks();
742  void updateOutgoingStats(std::size_t size);
743 
749  bool checkMessageHeader() const;
750 
751  void sendConnectDenied(const std::string& msg);
752 
753  void sendRPCMessagesThread();
754  void processPingThread();
755  void checkPingTimeoutThread();
757 
759  boost::asio::ip::tcp::socket mSocket;
760 
761  boost::condition_variable mRPCMessagesCondition;
762 
763  boost::mutex mWriteMutex;
764  boost::mutex mStopMutex;
765 
767 
768  boost::thread mSendRPCMessagesThread;
769  boost::thread mProcessPingThread;
770  boost::thread mCheckPingTimeoutThread;
772 
776 
777  enum AuthState {
783  };
785  std::string mAuthSignMsg; // random message that is used for strong authentication
786 
793 
796 
797  boost::shared_ptr<MicroUnit> mMigrationUnit;
798  std::string mMigrationNS;
799  std::string mMigrationID;
800  bool mStopped;
801 
802  boost::shared_ptr<RPCRemoteFinishHandler> mRPCFinishHandler;
803  boost::shared_ptr<RPCRemoteRequestHandler> mRPCRequestHandler;
804  std::map<std::string, boost::shared_ptr<RemoteAuthority>> mRemoteAuthorities;
805 
806  typedef std::pair<FrameworkMessageType, Buffer<uint8>> RPCMessage;
807  std::list<RPCMessage> mOutgoingRPCMessages;
808  boost::mutex mRPCMessagesMutex;
809 
810  std::unordered_map<std::string, ServiceLevel> mPendingChannelUpdates;
811  boost::mutex mChannelUpdatesMutex;
812 };
813 
814 template <uint8 BinaryFormatVersion>
816 {
817 protected:
820 
822  ConcreteRemoteConnection(boost::asio::io_service& service) : RemoteConnection(service) {}
823 
825 
826 public:
829 
830 protected:
831 
832  virtual void publishChannels(const ChannelTypeMap& channels);
833  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel);
834  virtual void publishAuthorities(const AuthorityDescriptions& authorities);
835  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities);
836  virtual void publishServicesFiltered(const StringSet& services);
837  virtual void publishServices(const StringSet& services);
838  virtual void unpublishServicesFiltered(const StringSet& services);
839  virtual void unpublishServices(const StringSet& services);
840  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel);
841  virtual int addBinaryFormatVersion(Buffer<uint8>& data);
842 
843  virtual void receivedPublishServiceMsg();
844  virtual void receivedRPCRequestMsg();
845  virtual void receivedRPCResponseMsg();
846 
847 private:
848 
850  boost::shared_ptr<BinarySerializer> createBinarySerializer(Buffer<uint8>* buffer)
851  {
852  return boost::make_shared<BinarySerializer>(buffer);
853  }
854 };
855 
857 
862 {
864 
865 protected:
867  friend class RemoteConnectionPool;
869 public:
870  template<typename Reflector>
871  void reflect(Reflector& r)
872  {
873  static const std::string incoming = "Incoming";
874  r.roproperty("ConnectDirection", incoming, "Incoming or outgoing connection?");
875  r.roproperty("Address", address.address, "Origin address of the connection");
877  }
878 
880  virtual void start();
881 
883  boost::asio::ip::tcp::endpoint& getEndpoint() {
884  return mPeerEndpoint;
885  }
886 
887 protected:
888 
890  virtual void onDisconnect();
891 
892 protected:
893 
894  void handleReadHeader(const boost::system::error_code& error);
895 
896  void handleReadMessage(const boost::system::error_code& error);
897 
898 protected:
899 
901  boost::asio::ip::tcp::endpoint mPeerEndpoint;
902 };
903 
905 
910 {
912 protected:
914 
915 public:
916  template<typename Reflector>
917  void reflect(Reflector& r)
918  {
919  static const std::string outgoing = "Outgoing";
920  r.roproperty("ConnectDirection", outgoing, "Incoming or outgoing connection?");
921  r.roproperty("ConnectDetails", address, "Active connect details");
923  }
924 
926  virtual void start();
927 
928 protected:
930  virtual void onDisconnect();
931 
933  virtual void onWriteError(boost::system::system_error& e);
934 
935 protected:
936 
937  void handleConnect(const boost::system::error_code& error,
938  boost::asio::ip::tcp::resolver::iterator iterator);
939 
940  void handleReadHeader(const boost::system::error_code& error);
941 
942  void handleReadMessage(const boost::system::error_code& error);
943 
944  std::string mHostName;
945  uint16 mPort;
947 };
948 
949 template <uint8 BinaryFormatVersion>
951  public ConcreteRemoteConnection<BinaryFormatVersion>
952 {
953 protected:
956  friend class RemoteConnectionPool;
958 };
959 
962 
964 
965 }
966 
967 #endif
ConcreteRemoteConnection()
Constructs a remote connection that uses its own io service.
Definition: RemoteConnection.h:819
Information and settings for a known remote framework.
Definition: RemoteConnection.h:80
Duration mTargetOffset
Definition: RemoteConnection.h:229
void handleConnect(const boost::system::error_code &error, boost::asio::ip::tcp::resolver::iterator iterator)
virtual void publishChannels(const ChannelTypeMap &channels)
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
ConcreteRemoteOutgoingConnection(const KnownFramework &address)
Definition: RemoteConnection.h:954
uint32 remoteVersion
The protocol version of the connected framework.
Definition: RemoteConnection.h:688
QoS management informations.
boost::thread mCheckPingTimeoutThread
Definition: RemoteConnection.h:770
std::unordered_map< std::string, ServiceLevel > mPendingChannelUpdates
Definition: RemoteConnection.h:810
std::map< std::string, boost::shared_ptr< RemoteAuthority > > mRemoteAuthorities
Definition: RemoteConnection.h:804
Definition: BinarySerializer.h:324
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
bool hasPingTimeout() const
Check if the connection incoming ping&#39;s are still alive.
IOService mService
Definition: RemoteConnection.h:758
Definition: RemoteConnection.h:780
Type trait that indicates whether a type should be serialized "transparently", i.e.
Definition: IsTransparentSerializable.h:81
Duration mStartOffset
Definition: RemoteConnection.h:230
std::list< RPCMessage > mOutgoingRPCMessages
Definition: RemoteConnection.h:807
void receivedPTPDelayRequest(uint64 timestamp)
boost::thread mSendChannelUpdatesThread
Definition: RemoteConnection.h:771
Time mPTPDelayLocal
Definition: RemoteConnection.h:790
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
boost::asio::ip::tcp::endpoint & getEndpoint()
Returns a reference to the endpoint of the remote peer.
Definition: RemoteConnection.h:883
DispatcherThread::TimerPtr TimerPtr
Definition: DispatcherThread.h:515
void queueRPCMessage(FrameworkMessageType msg, Buffer< uint8 > &&answer)
Queue an outgoing RPC request or RPC response to be transmitted in a separate thread.
void receivedMigrationSinkSuccessMsg()
Definition: RemoteConnection.h:815
virtual void start()
Implementation of RemoteConnection.
bool isPTPSyncEnabled()
Is synchronization of clocks via PTP enabled?
Definition: RemoteConnection.h:412
ServiceLevel by channel name.
Definition: ServiceLevel.h:102
RPCRemoteRequestHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:336
void setTargetOffset(const Duration &target, const Time &ts=Time::now())
std::string mMigrationNS
Definition: RemoteConnection.h:798
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:352
void receivedSubscribeChannelRequest()
void syncTime()
Time synchronization between frameworks.
StringSet publishedServices
List of services of the connected framework.
Definition: RemoteConnection.h:692
void receivedMigrationFinishedMsg()
ConcreteRemoteConnection(boost::asio::io_service &service)
Constructs a remote connection that uses a given io service.
Definition: RemoteConnection.h:822
Handler that must be implemented by the remote module to send RPC responses to a remote server which ...
Definition: RPCManager.h:133
int mCompensationInterval
Definition: RemoteConnection.h:228
RPCRemoteFinishHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:305
Connection class for incoming connections.
Definition: RemoteConnection.h:861
void validate() const
Descriptive informations about an authority.
bool isValid() const
Checks if this duration is invalid.
Definition: Time.h:260
Duration queryOffset() const
Definition: RemoteConnection.h:200
std::unique_ptr< Authority > authority
Our authority used for subscribing to data.
Definition: RemoteConnection.h:689
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map do not longer exist in o...
Buffer< uint8 > mMessage
Definition: RemoteConnection.h:775
virtual void start()
Starts the connection. Can be implemented in derived classes.
TimeOffsetCompensation & operator=(TimeOffsetCompensation other)
Definition: RemoteConnection.h:205
UUID getRemoteID() const
Definition: RemoteConnection.h:519
RPCHandler for sending a rpc call to the server side.
Definition: RemoteConnection.h:332
boost::shared_ptr< RPCRemoteRequestHandler > mRPCRequestHandler
Definition: RemoteConnection.h:803
bool isValid() const
Returns true if this contains a valid time.
Definition: Time.h:583
Duration getCompensationInterval() const
std::set< std::string > MetaSet
Definition: RemoteConnection.h:290
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:321
void valueChanged(ChannelRead< void > value, ServiceLevel &serviceLevel)
Channel callback method that gets registered on each channel the connected framework subscribes...
Time synchronizedTime
Time when the connection was fully established (e.g. PTP synchronized)
Definition: RemoteConnection.h:694
void receivedUnpublishAuthorityMsg()
#define MIRA_REFLECT_BASE(reflector, BaseClass)
Macro that can be used to reflect the base class easily.
Definition: ReflectorInterface.h:956
Time mPTPDelayRemote
Definition: RemoteConnection.h:791
virtual void publishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set exist in our framework...
RemoteConnection * mConnection
Definition: RemoteConnection.h:322
boost::shared_ptr< MicroUnit > mMigrationUnit
Definition: RemoteConnection.h:797
Definition: RemoteConnection.h:778
Setter< T > setter(void(*f)(const T &))
Creates a Setter for global or static class methods taking the argument by const reference.
Definition: GetterSetter.h:443
std::map< std::string, Typename > ChannelTypeMap
Definition: RemoteConnection.h:281
Time mPTPSyncRemote
Definition: RemoteConnection.h:789
Connection class for outgoing connections.
Definition: RemoteConnection.h:909
std::string frameworkID
The ID/Name of the connected framework.
Definition: RemoteConnection.h:687
void sendConnectDenied(const std::string &msg)
virtual void publishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map exist in our framework...
boost::mutex mWriteMutex
Definition: RemoteConnection.h:763
virtual void unpublishServices(const StringSet &services)
Notifies the connected framework that the services in the services set do not longer exist in our fra...
AuthState mAuthState
Definition: RemoteConnection.h:784
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map do not longer exist in o...
void receivedMigrationSinkFailureMsg()
RemoteConnection * mConnection
Definition: RemoteConnection.h:353
void receivedUnsubscribeChannelRequest(const std::string &channelID)
std::string getGlobalID() const
Definition: RemoteConnection.h:524
boost::mutex mChannelUpdatesMutex
Definition: RemoteConnection.h:811
virtual void receivedPublishServiceMsg()
TimeOffsetCompensation & operator=(const Duration &target)
Definition: RemoteConnection.h:215
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:435
std::string mHostName
Definition: RemoteConnection.h:944
void parseMessage()
Parses an incoming message (stored in mMessage) and calls the respective receivedXXX method...
virtual void receivedRPCResponseMsg()=0
boost::mutex mMutex
Definition: RemoteConnection.h:233
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
boost::asio::ip::tcp::endpoint mPeerEndpoint
endpoint of the remote peer
Definition: RemoteConnection.h:901
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:421
#define MIRA_ABSTRACT_OBJECT(classIdentifier)
Use this MACRO instead of MIRA_OBJECT to declare the class as abstract.
Definition: FactoryMacros.h:239
std::set< std::string > StringSet
Definition: RemoteConnection.h:282
void setAddress(const KnownFramework &addr)
Definition: RemoteConnection.h:544
void migrateUnit(const std::string &id)
Send a request to the connected framework to transfer ownership of a unit to this framework...
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:957
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void publishServicesFiltered(const StringSet &services)
virtual void unpublishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set do not longer exist in our fra...
void setAutoReconnect(bool autoReconnect)
Definition: RemoteConnection.h:549
Marker for indicating parameters that should be ignored if they are missing in the config file...
Definition: IgnoreMissing.h:73
Definition: RemoteConnection.h:950
boost::thread mSendRPCMessagesThread
Definition: RemoteConnection.h:768
void receivedUnpublishChannelMsg()
MIRA_BASE_EXPORT void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
Commonly used exception classes.
void writeMessageFromData(FrameworkMessageType msg, const DataType &data)
Writes a message to the other framework.
Definition: RemoteConnection.h:655
TimerPtr mSyncTimeTimer
Definition: RemoteConnection.h:766
void updateOutgoingStats(std::size_t size)
boost::shared_ptr< RPCRemoteFinishHandler > mRPCFinishHandler
Definition: RemoteConnection.h:802
virtual void onWriteError(boost::system::system_error &e)
Implementation of RemoteConnection.
virtual void publishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map exist in our framework...
Message types exchanged between remote frameworks.
const std::string & getFrameworkID() const
Definition: RemoteConnection.h:534
#define MIRA_FRAMEWORK_EXPORT
Definition: FrameworkExports.h:61
void write(const BufferSequence &buffers)
Send data in the buffers to the connected framework.
Definition: RemoteConnection.h:502
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:283
Use this class to represent time durations.
Definition: Time.h:104
void reflect(Reflector &r)
Definition: RemoteConnection.h:376
void receivedPublishAuthorityMsg()
std::pair< FrameworkMessageType, Buffer< uint8 > > RPCMessage
Definition: RemoteConnection.h:806
void reflect(Reflector &r)
Definition: RemoteConnection.h:871
void receivedPTPDelayResponse(uint64 timestamp)
The object class acts as a generic base class for classes which should be used with the classFactory...
Definition: Object.h:144
boost::asio::ip::tcp::socket & getSocket()
Returns the network socket of this connection.
Definition: RemoteConnection.h:393
ChannelSendMap subscriptions
List of channels the connected framework is subscribed to + what was sent to them.
Definition: RemoteConnection.h:690
KnownFramework()
Definition: RemoteConnection.h:82
bool mPTPOutgoing
Definition: RemoteConnection.h:787
void handleReadMessage(const boost::system::error_code &error)
Authorities act as a facade to the framework.
Definition: Authority.h:93
Getter< T > getter(T(*f)())
Creates a Getter for global or static class methods returning the result by value.
Definition: GetterSetter.h:136
bool isInitialized() const
Definition: RemoteConnection.h:190
ConcreteRemoteOutgoingConnection< 0 > RemoteOutgoingConnectionLegacy
Definition: RemoteConnection.h:961
void handleReadHeader(const boost::system::error_code &error)
ConcreteRemoteOutgoingConnection< 2 > RemoteOutgoingConnection
Definition: RemoteConnection.h:960
bool forcePTP
force PTP time sync
Definition: RemoteConnection.h:132
Definition: RemoteConnection.h:782
uint16 mPort
Definition: RemoteConnection.h:945
virtual ~ConcreteRemoteConnection()
Destructor.
Definition: RemoteConnection.h:828
virtual void onRPCrequested(Buffer< uint8 > &&request)
Implementation of RPCManager::RemoteRequestHandler Will send request to the framework that will proce...
Time mPingLastReceived
Definition: RemoteConnection.h:795
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:175
void receivedRequestMigrationMsg()
#define MIRA_OBJECT(classIdentifier)
Use this MACRO if you like the factory to automatically extract the class name from the given identif...
Definition: FactoryMacros.h:183
void setAuthority(std::unique_ptr< Authority > auth)
Definition: RemoteConnection.h:529
Framework channel classes.
KnownFramework address
The address of the connected framework.
Definition: RemoteConnection.h:685
AuthState
Definition: RemoteConnection.h:777
TimeOffsetCompensation(const TimeOffsetCompensation &other)
Definition: RemoteConnection.h:161
boost::tuple< std::string, uint16 > getHostPort() const
void handleReadMessage(const boost::system::error_code &error)
RPCHandler for getting notified when an rpc call on server side is finished and the response is ready...
Definition: RemoteConnection.h:301
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)=0
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
bool isPingTimeoutEnabled()
Is ping timeout enabled?
Definition: RemoteConnection.h:418
std::map< std::string, SendData > ChannelSendMap
Definition: RemoteConnection.h:288
Base class of connections between frameworks.
Definition: RemoteConnection.h:276
virtual void start()
Implementation of RemoteConnection.
virtual void onRPCfinished(Buffer< uint8 > &&answer)
Implementation of RPCManager::RemoteFinishHandler Will send answer back to calling framework using th...
virtual void publishServices(const StringSet &services)
Notifies the connected framework that the services in the services set exist in our framework...
boost::mutex mStopMutex
Definition: RemoteConnection.h:764
void receivedPTPFollowUp(uint64 timestamp)
Time mPingLastSend
Definition: RemoteConnection.h:794
#define MIRA_MINOR_VERSION(v)
Calculate the minor version of v.
Definition: FrameworkDefines.h:55
#define MIRA_MAJOR_VERSION(v)
Calculate the major version of v.
Definition: FrameworkDefines.h:53
RemoteOutgoingConnectionBase(const KnownFramework &address)
Contains internal RPCManager class.
void stop()
Close the socket.
bool keep
if true the information is stored in list of frameworks that we try to reconnect to after disconnect ...
Definition: RemoteConnection.h:129
Typedefs and serialization support for uuids.
boost::asio::mutable_buffers_1 asBuffer()
Returns this message as boost asio buffer.
Definition: FrameworkMessage.h:117
std::list< AuthorityDescription > AuthorityDescriptions
Definition: RemoteConnection.h:291
virtual void receivedPublishServiceMsg()=0
UUID remoteID
The UUID of the connected framework.
Definition: RemoteConnection.h:686
bool mEnablePingTimeout
Ping timeout enabled for this connection?
Definition: RemoteConnection.h:696
virtual void unpublishServicesFiltered(const StringSet &services)
static Time now() static Time eternity()
Returns the current utc based time.
Definition: Time.h:484
Definition: BinarySerializer.h:257
Definition: RemoteConnection.h:158
void sendPTP()
Sends a PTP command used for time synchronization between frameworks.
uint8 metaVersion
Definition: RemoteConnection.h:285
virtual void onWriteError(boost::system::system_error &e)
Called when writing to the socket failed. Can be implemented in derived classes.
Definition: RemoteConnection.h:563
void receivedUnpublishServiceMsg()
virtual void onDisconnect()
Called in stop() when connection is about to be stopped.
Definition: RemoteConnection.h:560
void unpublishChannel(const std::string &channel)
Notifies the connected framework that we no longer have a publisher for the given channel...
bool checkMessageHeader() const
Returns true, if the message (header) is valid, i.e.
Wrapper class for boost::asio::io_service.
Definition: IOService.h:75
virtual void receivedRPCRequestMsg()
bool hasAuthority(const std::string &id) const
Check if a authority with given full id exists in the connected framework.
bool mStopScheduled
Definition: RemoteConnection.h:946
uint8 binaryFormatVersion
The binary format used by the framework (to enable connecting to legacy framework).
Definition: RemoteConnection.h:138
boost::uuids::uuid UUID
Shorter name for boost uuid.
Definition: UUID.h:66
TimeOffsetCompensation clockOffset
The clock offset between us and the connected framework.
Definition: RemoteConnection.h:697
Time mPTPSyncLocal
Definition: RemoteConnection.h:788
void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType &buffer)
Writes a message to the other framework.
Definition: RemoteConnection.h:639
const KnownFramework & getAddress() const
Definition: RemoteConnection.h:539
boost::condition_variable mRPCMessagesCondition
Definition: RemoteConnection.h:761
bool monitorOnly
Local channels, services and authorities are not published to the remote side if monitor-only is true...
Definition: RemoteConnection.h:144
Duration offset(const Time &ts) const
void startTimeSync()
Create a timer to frequently call syncTime.
MetaSet sentMetaInformation
Set of type meta information already sent.
Definition: RemoteConnection.h:691
Time mHeaderReceived
Definition: RemoteConnection.h:773
virtual void receivedRPCResponseMsg()
boost::asio::ip::tcp::socket mSocket
Definition: RemoteConnection.h:759
Definition: RemoteConnection.h:284
An authority class that represents a remote authority that is located in a connected framework...
boost::mutex mRPCMessagesMutex
Definition: RemoteConnection.h:808
std::string mMigrationID
Definition: RemoteConnection.h:799
RemoteConnection()
Constructs a remote connection that uses its own io service.
bool isSynchronized() const
synchronizeFrameworks() was executed.
Definition: RemoteConnection.h:438
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:868
bool mEnablePTPSync
PTP Sync enabled for this connection?
Definition: RemoteConnection.h:695
void writeMessage(FrameworkMessageType msg)
Writes a message to the other framework.
Definition: RemoteConnection.h:594
Time lastData
Definition: RemoteConnection.h:286
friend void createConcreteRemoteConnectionInstances()
std::string mAuthSignMsg
Definition: RemoteConnection.h:785
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:92
FrameworkMessageType
Remote framework message types.
Definition: FrameworkMessage.h:63
virtual void receivedRPCRequestMsg()=0
bool mStopped
Definition: RemoteConnection.h:800
boost::thread mProcessPingThread
Definition: RemoteConnection.h:769
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)=0
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
void ping()
Sends a ping command.
void setCompensationInterval(const Duration &interval)
FrameworkMessageHeader mHeader
Definition: RemoteConnection.h:774
TimeOffsetCompensation(const Duration &compensationInterval=Duration::seconds(10))
Definition: RemoteConnection.h:167
virtual void publishChannels(const ChannelTypeMap &channels)=0
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
void handleReadHeader(const boost::system::error_code &error)
Wrapper for boost::asio::io_service.
std::string address
address in the form of host:port
Definition: RemoteConnection.h:123
Definition: RemoteConnection.h:781
void onConnect(bool enablePTPTimeSync, bool enablePingTimeout)
Called by RemoteModule::onIncomingConnected/onOutgoingConnected.
Connection pool that holds the ownership for RemoteConnections.
Owner of every RemoteConnection.
Definition: RemoteConnectionPool.h:115
void reflect(Reflector &r)
Definition: RemoteConnection.h:917
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
Definition: RemoteConnection.h:738
Class for in-place stream formatting Used for constructs like:
Definition: MakeString.h:63
Definition: RemoteConnection.h:779
Time mStartTime
Definition: RemoteConnection.h:231
Units are basic modules of a complex application.
Definition: MicroUnit.h:69
Time lastConnectionTry
The last time we tried to connect to that address.
Definition: RemoteConnection.h:150
Data that is sent as header in each message between remote frameworks.
Definition: FrameworkMessage.h:101
virtual ~RemoteConnection()
Destructor.
Handler that must be implemented by the remote module to send RPC requests to a remote server...
Definition: RPCManager.h:111
Time mLastPTP
Definition: RemoteConnection.h:792