MIRA
RemoteConnection.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_REMOTECONNECTION_H_
48 #define _MIRA_REMOTECONNECTION_H_
49 
50 #ifndef Q_MOC_RUN
51 #include <boost/array.hpp>
52 #include <boost/make_shared.hpp>
53 #endif
54 
55 #include <utils/UUID.h>
57 #include <error/Exceptions.h>
58 
59 #include <rpc/RPCManager.h>
60 
61 #include <fw/RemoteAuthority.h>
63 
65 #include <fw/Channel.h>
66 #include <fw/FrameworkMessage.h>
67 #include <fw/ServiceLevel.h>
68 
69 namespace mira {
70 
72 
76 
81 {
83  address(""),
84  keep(true),
85  forcePTP(false),
86  binaryFormatVersion(BinaryBufferSerializer::getSerializerFormatVersion()),
87  monitorOnly(false),
88  lastConnectionTry(Time::unixEpoch()) {}
89 
91  template<typename Reflector>
92  void reflect(Reflector& r)
93  {
94  r.member("Address", address, "");
95  r.roproperty("Address", address, "The address IP:port");
96 
97  r.member("KeepConnected", keep, "", true);
98  r.roproperty("KeepConnected", keep, "Whether to reconnect after a lost connection");
99 
100  r.member("ForcePTP", forcePTP, "", false);
101  r.roproperty("ForcePTP", forcePTP, "Whether to force PTP time sync for this connection");
102 
103  r.member("BinaryFormatVersion", binaryFormatVersion, "",
104  BinaryBufferSerializer::getSerializerFormatVersion());
105  r.roproperty("BinaryFormatVersion", binaryFormatVersion,
106  "Binary format version used by the remote framework");
107 
108  r.member("MonitorOnly", monitorOnly, "", false);
109  r.roproperty("MonitorOnly", monitorOnly,
110  "Monitor-only connections do not publish local channels, "
111  "services and authorities to remote framework");
112 
113  r.member("LastConnectionTry", lastConnectionTry, "",
115  r.roproperty("LastConnectionTry", lastConnectionTry,
116  "Last time tried to establish connection (UTC)");
117  }
118 
119  void validate() const;
120  boost::tuple<std::string, uint16> getHostPort() const;
121 
123  std::string address;
124 
129  bool keep;
130 
132  bool forcePTP;
133 
139 
145 
151 };
152 
153 template <typename SerializerTag>
154 class IsTransparentSerializable<KnownFramework, SerializerTag> : public std::true_type {};
155 
157 
159 {
160 public:
164  mStartOffset(other.mStartOffset),
165  mStartTime(other.mStartTime) {}
166 
167  explicit TimeOffsetCompensation(const Duration& compensationInterval = Duration::seconds(10))
168  : mCompensationInterval(compensationInterval.totalMicroseconds()),
169  mTargetOffset(Duration::invalid()),
170  mStartOffset(Duration::invalid()),
171  mStartTime(Time::invalid()) {}
172 
174  template<typename Reflector>
175  void reflect(Reflector& r)
176  {
177  r.property("CompensationInterval",
180  "Interval to reach target offset");
181  r.roproperty("TargetOffset", mTargetOffset, "Target offset");
182  r.roproperty("StartOffset", mStartOffset, "Start offset");
183  r.roproperty("StartTime", mStartTime, "Time when start offset was valid");
184  r.roproperty("CurrentOffset",
186  "Current offset (interpolated between start and target offset)");
187  }
188 
189 public:
190  bool isInitialized()const { return mTargetOffset.isValid(); }
191 
192 public:
194  void setCompensationInterval(const Duration& interval);
195 
196  void setTargetOffset(const Duration& target, const Time& ts = Time::now());
197 
198 public:
199  Duration queryOffset(const Time& ts) const;
200  Duration queryOffset() const { return queryOffset(Time::now()); } // for use in getter
201 
202 public:
203 
204  // assignment operator uses copy constructor (through by-value argument)
206  using std::swap;
208  swap(mTargetOffset, other.mTargetOffset);
209  swap(mStartOffset, other.mStartOffset);
210  swap(mStartTime, other.mStartTime);
211  return *this;
212  }
213 
214  // "Interface adaptors" to support assign from and query as Duration
216  setTargetOffset(target);
217  return *this;
218  }
219 
220  operator Duration() const { return queryOffset(); }
221 
222 protected:
223 
224  Duration offset(const Time& ts) const;
225 
226 protected:
227 
228  int mCompensationInterval; // total microseconds
232 
233  mutable boost::mutex mMutex;
234 };
235 
237 
276 class RemoteConnection : public Object
277 {
279 public:
280 
281  typedef std::map<std::string, Typename> ChannelTypeMap;
282  typedef std::set<std::string> StringSet;
283 
284  struct SendData {
285  uint8 metaVersion;
287  };
288  typedef std::map<std::string, SendData> ChannelSendMap;
289 
290  typedef std::set<std::string> MetaSet;
291  typedef std::list<AuthorityDescription> AuthorityDescriptions;
292 
294 
302  {
303  public:
306  mConnection(iConnection)
307  {}
308 
314  virtual void onRPCfinished(Buffer<uint8>&& answer);
315 
317  void setConnection(RemoteConnection* iConnection);
318 
319  protected:
320 
321  boost::mutex mConnectionMutex;
323  };
324 
326 
333  {
334  public:
337  mConnection(iConnection)
338  {}
339 
345  virtual void onRPCrequested(Buffer<uint8>&& request);
346 
348  void setConnection(RemoteConnection* iConnection);
349 
350  protected:
351 
352  boost::mutex mConnectionMutex;
354  };
355 
357 
358 protected:
364 
369  RemoteConnection(boost::asio::io_service& service);
370 
371 public:
373  virtual ~RemoteConnection();
374 
375  template<typename Reflector>
376  void reflect(Reflector& r)
377  {
378  r.roproperty("RemoteFrameworkName", frameworkID, "ID/Name of the remote framework");
379  r.roproperty("RemoteProtocolVersion",
380  getter<std::string>( [&]()->std::string {
381  return MakeString() << MIRA_MAJOR_VERSION(remoteVersion) << "."
383  "Protocol version of the remote framework");
384  r.roproperty("Connected", synchronizedTime, "Time when the connection was established");
385  r.roproperty("PTPSync", getter(&RemoteConnection::isPTPSyncEnabled, this),
386  "Synchronization of clocks via PTP enabled?");
387  r.roproperty("PingTimeout", getter(&RemoteConnection::isPingTimeoutEnabled, this),
388  "Ping timeout enabled?");
389  r.property("TimeSynch", clockOffset, "PTP time synchronization");
390  }
391 
393  boost::asio::ip::tcp::socket& getSocket()
394  {
395  return mSocket;
396  }
397 
399  virtual void start();
400 
402  void stop();
403 
409  void onConnect(bool enablePTPTimeSync, bool enablePingTimeout);
410 
413  {
414  return mEnablePTPSync;
415  }
416 
419  {
420  return mEnablePingTimeout;
421  }
422 
429  void startTimeSync();
430 
431 
438  bool isSynchronized() const
439  {
440  return synchronizedTime.isValid();
441  }
442 
447  virtual void publishChannels(const ChannelTypeMap& channels) = 0;
448 
453  void unpublishChannel(const std::string& channel);
454 
459  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel) = 0;
460 
465  virtual void publishAuthorities(const AuthorityDescriptions& authorities) = 0;
466 
471  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities) = 0;
472 
477  void migrateUnit(const std::string& id);
478 
483  bool hasAuthority(const std::string& id) const;
484 
489  virtual void publishServices(const StringSet& services) = 0;
490 
495  virtual void unpublishServices(const StringSet& services) = 0;
496 
501  template <typename BufferSequence>
502  void write(const BufferSequence& buffers)
503  {
504  boost::mutex::scoped_lock lock(mWriteMutex);
505  if (mStopped)
506  return;
507  try
508  {
509  std::size_t bytes = boost::asio::write(mSocket, buffers, boost::asio::transfer_all());
510  updateOutgoingStats(bytes);
511  }
512  catch (boost::system::system_error& e)
513  {
514  // let the derived class decide what to do with the error
515  onWriteError(e);
516  }
517  }
518 
520  {
521  return remoteID;
522  }
523 
524  std::string getGlobalID() const
525  {
526  return authority->getGlobalID();
527  }
528 
529  void setAuthority(std::unique_ptr<Authority> auth)
530  {
531  authority = std::move(auth);
532  }
533 
534  const std::string& getFrameworkID() const
535  {
536  return frameworkID;
537  }
538 
539  const KnownFramework& getAddress() const
540  {
541  return address;
542  }
543 
544  void setAddress(const KnownFramework& addr)
545  {
546  address = addr;
547  }
548 
549  void setAutoReconnect(bool autoReconnect)
550  {
551  address.keep = autoReconnect;
552  }
553 
554 protected:
555 
560  virtual void onDisconnect() {}
561 
563  virtual void onWriteError(boost::system::system_error& e) {}
564 
571  void syncTime();
572 
576  void sendPTP();
577 
582  void ping();
583 
587  bool hasPingTimeout() const;
588 
594  FrameworkMessageHeader header(0, msg);
595  boost::array<boost::asio::const_buffer, 1> buffers =
596  {{
597  header.asBuffer(),
598  }};
599  write(buffers);
600  }
601 
607  template <typename Arg, typename... Args>
608  void writeMessage(FrameworkMessageType msg, Arg&& arg, Args&&... args)
609  {
611  BinaryBufferOstream os(&buffer);
612  writeArgsToStream(os, std::forward<Arg>(arg), std::forward<Args>(args)...);
613  writeMessageFromBuffer(msg, buffer);
614  }
615 
621  template <typename BufferType>
622  void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType& buffer) {
623  FrameworkMessageHeader header(buffer.size(), msg);
624  boost::array<boost::asio::const_buffer, 2> buffers =
625  {{
626  header.asBuffer(),
627  boost::asio::buffer(buffer.data(), buffer.size())
628  }};
629  write(buffers);
630  }
631 
637  template <typename DataType>
638  void writeMessageFromData(FrameworkMessageType msg, const DataType& data) {
639  FrameworkMessageHeader header(sizeof(DataType), msg);
640  boost::array<boost::asio::const_buffer, 2> buffers =
641  {{
642  header.asBuffer(),
643  boost::asio::buffer((void*)&data, sizeof(DataType))
644  }};
645  write(buffers);
646  }
647 
654 
660  void valueChanged(ChannelRead<void> value, ServiceLevel& serviceLevel);
661 
666  void parseMessage();
667 
670  std::string frameworkID;
671  uint32 remoteVersion;
672  std::unique_ptr<Authority> authority;
676 
681 
682 private:
683 
684  void init(); // called by constructors to initialize members
685 
687  bool isLocal() const;
688 
689  static void writeArgsToStream(BinaryBufferOstream& stream) {}
690 
691  template <typename Arg, typename... Args>
692  static void writeArgsToStream(BinaryBufferOstream& stream, Arg&& arg, Args&&... args)
693  {
694  stream << std::forward<Arg>(arg);
695  writeArgsToStream(stream, std::forward<Args>(args)...);
696  }
697 
698 protected:
699 
700  void receivedPTPFollowUp(uint64 timestamp);
701  void receivedPTPDelayResponse(uint64 timestamp);
702  void receivedPTPDelayRequest(uint64 timestamp);
703  void receivedPTPFinish();
705  void receivedUnsubscribeChannelRequest(const std::string& channelID);
710  virtual void receivedPublishServiceMsg() = 0;
713  virtual void receivedRPCRequestMsg() = 0;
714  virtual void receivedRPCResponseMsg() = 0;
716  void receivedMigrationMsg();
720  void receivedTypeMetaMsg();
721  void receivedChannelMetaMsg();
722  void receivedPingMsg();
723 
724  // This method must not be pure virtual: When a remote incoming connection
725  // times out and will be destroyed, some still pending valueChanged()
726  // callbacks will be executed, which will call sendData. At this time, the
727  // RemoteIncomingConnection is already destroyed (or will be destroyed) and
728  // then sendData will be a pure-virtual function.
729  // TODO: that still applies after r9735?
730  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel) {}
731 
732  virtual int addBinaryFormatVersion(Buffer<uint8>& data) = 0;
733  void synchronizeFrameworks();
734  void updateOutgoingStats(std::size_t size);
735 
741  bool checkMessageHeader() const;
742 
743  void sendConnectDenied(const std::string& msg);
744 
745  void sendRPCMessagesThread();
746  void processPingThread();
747  void checkPingTimeoutThread();
749 
751  boost::asio::ip::tcp::socket mSocket;
752 
753  boost::condition_variable mRPCMessagesCondition;
754 
755  boost::mutex mWriteMutex;
756  boost::mutex mStopMutex;
757 
759 
760  boost::thread mSendRPCMessagesThread;
761  boost::thread mProcessPingThread;
762  boost::thread mCheckPingTimeoutThread;
764 
768 
769  enum AuthState {
775  };
777  std::string mAuthSignMsg; // random message that is used for strong authentication
778 
785 
788 
789  boost::shared_ptr<MicroUnit> mMigrationUnit;
790  std::string mMigrationNS;
791  std::string mMigrationID;
792  bool mStopped;
793 
794  boost::shared_ptr<RPCRemoteFinishHandler> mRPCFinishHandler;
795  boost::shared_ptr<RPCRemoteRequestHandler> mRPCRequestHandler;
796  std::map<std::string, boost::shared_ptr<RemoteAuthority>> mRemoteAuthorities;
797 
798  typedef std::pair<FrameworkMessageType, Buffer<uint8>> RPCMessage;
799  std::list<RPCMessage> mOutgoingRPCMessages;
800  boost::mutex mRPCMessagesMutex;
801 
802  std::unordered_map<std::string, ServiceLevel> mPendingChannelUpdates;
803  boost::mutex mChannelUpdatesMutex;
804 };
805 
806 template <uint8 BinaryFormatVersion>
808 {
809 protected:
812 
814  ConcreteRemoteConnection(boost::asio::io_service& service) : RemoteConnection(service) {}
815 
817 
818 public:
821 
822 protected:
823 
824  virtual void publishChannels(const ChannelTypeMap& channels);
825  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel);
826  virtual void publishAuthorities(const AuthorityDescriptions& authorities);
827  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities);
828  virtual void publishServicesFiltered(const StringSet& services);
829  virtual void publishServices(const StringSet& services);
830  virtual void unpublishServicesFiltered(const StringSet& services);
831  virtual void unpublishServices(const StringSet& services);
832  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel);
833  virtual int addBinaryFormatVersion(Buffer<uint8>& data);
834 
835  virtual void receivedPublishServiceMsg();
836  virtual void receivedRPCRequestMsg();
837  virtual void receivedRPCResponseMsg();
838 
839 private:
840 
842  boost::shared_ptr<BinarySerializer> createBinarySerializer(Buffer<uint8>* buffer)
843  {
844  return boost::make_shared<BinarySerializer>(buffer);
845  }
846 };
847 
849 
854 {
856 
857 protected:
859  friend class RemoteConnectionPool;
861 public:
862  template<typename Reflector>
863  void reflect(Reflector& r)
864  {
865  static const std::string incoming = "Incoming";
866  r.roproperty("ConnectDirection", incoming, "Incoming or outgoing connection?");
867  r.roproperty("Address", address.address, "Origin address of the connection");
869  }
870 
872  virtual void start();
873 
875  boost::asio::ip::tcp::endpoint& getEndpoint() {
876  return mPeerEndpoint;
877  }
878 
879 protected:
880 
882  virtual void onDisconnect();
883 
884 protected:
885 
886  void handleReadHeader(const boost::system::error_code& error);
887 
888  void handleReadMessage(const boost::system::error_code& error);
889 
890 protected:
891 
893  boost::asio::ip::tcp::endpoint mPeerEndpoint;
894 };
895 
897 
902 {
904 protected:
906 
907 public:
908  template<typename Reflector>
909  void reflect(Reflector& r)
910  {
911  static const std::string outgoing = "Outgoing";
912  r.roproperty("ConnectDirection", outgoing, "Incoming or outgoing connection?");
913  r.roproperty("ConnectDetails", address, "Active connect details");
915  }
916 
918  virtual void start();
919 
920 protected:
922  virtual void onDisconnect();
923 
925  virtual void onWriteError(boost::system::system_error& e);
926 
927 protected:
928 
929  void handleConnect(const boost::system::error_code& error,
930  boost::asio::ip::tcp::resolver::iterator iterator);
931 
932  void handleReadHeader(const boost::system::error_code& error);
933 
934  void handleReadMessage(const boost::system::error_code& error);
935 
936  std::string mHostName;
937  uint16 mPort;
939 };
940 
941 template <uint8 BinaryFormatVersion>
943  public ConcreteRemoteConnection<BinaryFormatVersion>
944 {
945 protected:
948  friend class RemoteConnectionPool;
950 };
951 
954 
956 
957 }
958 
959 #endif
ConcreteRemoteConnection()
Constructs a remote connection that uses its own io service.
Definition: RemoteConnection.h:811
Information and settings for a known remote framework.
Definition: RemoteConnection.h:80
Duration mTargetOffset
Definition: RemoteConnection.h:229
void handleConnect(const boost::system::error_code &error, boost::asio::ip::tcp::resolver::iterator iterator)
virtual void publishChannels(const ChannelTypeMap &channels)
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
ConcreteRemoteOutgoingConnection(const KnownFramework &address)
Definition: RemoteConnection.h:946
uint32 remoteVersion
The protocol version of the connected framework.
Definition: RemoteConnection.h:671
QoS management information.
boost::thread mCheckPingTimeoutThread
Definition: RemoteConnection.h:762
std::unordered_map< std::string, ServiceLevel > mPendingChannelUpdates
Definition: RemoteConnection.h:802
std::map< std::string, boost::shared_ptr< RemoteAuthority > > mRemoteAuthorities
Definition: RemoteConnection.h:796
Definition: BinarySerializer.h:324
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
bool hasPingTimeout() const
Check if the connection's incoming pings are still alive.
IOService mService
Definition: RemoteConnection.h:750
Definition: RemoteConnection.h:772
Type trait that indicates whether a type should be serialized "transparently", i.e.
Definition: IsTransparentSerializable.h:81
Duration mStartOffset
Definition: RemoteConnection.h:230
std::list< RPCMessage > mOutgoingRPCMessages
Definition: RemoteConnection.h:799
void receivedPTPDelayRequest(uint64 timestamp)
boost::thread mSendChannelUpdatesThread
Definition: RemoteConnection.h:763
Time mPTPDelayLocal
Definition: RemoteConnection.h:782
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
boost::asio::ip::tcp::endpoint & getEndpoint()
Returns a reference to the endpoint of the remote peer.
Definition: RemoteConnection.h:875
DispatcherThread::TimerPtr TimerPtr
Definition: DispatcherThread.h:513
void queueRPCMessage(FrameworkMessageType msg, Buffer< uint8 > &&answer)
Queue an outgoing RPC request or RPC response to be transmitted in a separate thread.
void receivedMigrationSinkSuccessMsg()
Definition: RemoteConnection.h:807
virtual void start()
Implementation of RemoteConnection.
bool isPTPSyncEnabled()
Is synchronization of clocks via PTP enabled?
Definition: RemoteConnection.h:412
ServiceLevel by channel name.
Definition: ServiceLevel.h:102
RPCRemoteRequestHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:336
void setTargetOffset(const Duration &target, const Time &ts=Time::now())
std::string mMigrationNS
Definition: RemoteConnection.h:790
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:352
void receivedSubscribeChannelRequest()
void syncTime()
Time synchronization between frameworks.
StringSet publishedServices
List of services of the connected framework.
Definition: RemoteConnection.h:675
void receivedMigrationFinishedMsg()
ConcreteRemoteConnection(boost::asio::io_service &service)
Constructs a remote connection that uses a given io service.
Definition: RemoteConnection.h:814
Handler that must be implemented by the remote module to send RPC responses to a remote server which ...
Definition: RPCManager.h:133
int mCompensationInterval
Definition: RemoteConnection.h:228
RPCRemoteFinishHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:305
Connection class for incoming connections.
Definition: RemoteConnection.h:853
void validate() const
Descriptive information about an authority.
bool isValid() const
Checks if this duration is valid.
Definition: Time.h:260
Duration queryOffset() const
Definition: RemoteConnection.h:200
std::unique_ptr< Authority > authority
Our authority used for subscribing to data.
Definition: RemoteConnection.h:672
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map no longer exist in o...
Buffer< uint8 > mMessage
Definition: RemoteConnection.h:767
virtual void start()
Starts the connection. Can be implemented in derived classes.
TimeOffsetCompensation & operator=(TimeOffsetCompensation other)
Definition: RemoteConnection.h:205
UUID getRemoteID() const
Definition: RemoteConnection.h:519
RPCHandler for sending a rpc call to the server side.
Definition: RemoteConnection.h:332
boost::shared_ptr< RPCRemoteRequestHandler > mRPCRequestHandler
Definition: RemoteConnection.h:795
bool isValid() const
Returns true if this contains a valid time.
Definition: Time.h:583
Duration getCompensationInterval() const
std::set< std::string > MetaSet
Definition: RemoteConnection.h:290
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:321
void valueChanged(ChannelRead< void > value, ServiceLevel &serviceLevel)
Channel callback method that gets registered on each channel the connected framework subscribes...
Time synchronizedTime
Time when the connection was fully established (e.g. PTP synchronized)
Definition: RemoteConnection.h:677
void receivedUnpublishAuthorityMsg()
#define MIRA_REFLECT_BASE(reflector, BaseClass)
Macro that can be used to reflect the base class easily.
Definition: ReflectorInterface.h:912
Time mPTPDelayRemote
Definition: RemoteConnection.h:783
virtual void publishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set exist in our framework...
RemoteConnection * mConnection
Definition: RemoteConnection.h:322
boost::shared_ptr< MicroUnit > mMigrationUnit
Definition: RemoteConnection.h:789
Definition: RemoteConnection.h:770
Setter< T > setter(void(*f)(const T &))
Creates a Setter for global or static class methods taking the argument by const reference.
Definition: GetterSetter.h:443
std::map< std::string, Typename > ChannelTypeMap
Definition: RemoteConnection.h:281
Time mPTPSyncRemote
Definition: RemoteConnection.h:781
Connection class for outgoing connections.
Definition: RemoteConnection.h:901
std::string frameworkID
The ID/Name of the connected framework.
Definition: RemoteConnection.h:670
void sendConnectDenied(const std::string &msg)
virtual void publishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map exist in our framework...
boost::mutex mWriteMutex
Definition: RemoteConnection.h:755
virtual void unpublishServices(const StringSet &services)
Notifies the connected framework that the services in the services set no longer exist in our fra...
AuthState mAuthState
Definition: RemoteConnection.h:776
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map no longer exist in o...
void receivedMigrationSinkFailureMsg()
RemoteConnection * mConnection
Definition: RemoteConnection.h:353
void receivedUnsubscribeChannelRequest(const std::string &channelID)
std::string getGlobalID() const
Definition: RemoteConnection.h:524
boost::mutex mChannelUpdatesMutex
Definition: RemoteConnection.h:803
virtual void receivedPublishServiceMsg()
TimeOffsetCompensation & operator=(const Duration &target)
Definition: RemoteConnection.h:215
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:440
std::string mHostName
Definition: RemoteConnection.h:936
void parseMessage()
Parses an incoming message (stored in mMessage) and calls the respective receivedXXX method...
virtual void receivedRPCResponseMsg()=0
boost::mutex mMutex
Definition: RemoteConnection.h:233
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
boost::asio::ip::tcp::endpoint mPeerEndpoint
endpoint of the remote peer
Definition: RemoteConnection.h:893
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:421
#define MIRA_ABSTRACT_OBJECT(classIdentifier)
Use this MACRO instead of MIRA_OBJECT to declare the class as abstract.
Definition: FactoryMacros.h:239
std::set< std::string > StringSet
Definition: RemoteConnection.h:282
void setAddress(const KnownFramework &addr)
Definition: RemoteConnection.h:544
void migrateUnit(const std::string &id)
Send a request to the connected framework to transfer ownership of a unit to this framework...
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:949
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void publishServicesFiltered(const StringSet &services)
virtual void unpublishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set no longer exist in our fra...
void setAutoReconnect(bool autoReconnect)
Definition: RemoteConnection.h:549
Marker for indicating parameters that should be ignored if they are missing in the config file...
Definition: IgnoreMissing.h:73
Definition: RemoteConnection.h:942
boost::thread mSendRPCMessagesThread
Definition: RemoteConnection.h:760
void receivedUnpublishChannelMsg()
MIRA_BASE_EXPORT void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
Commonly used exception classes.
void writeMessageFromData(FrameworkMessageType msg, const DataType &data)
Writes a message to the other framework.
Definition: RemoteConnection.h:638
TimerPtr mSyncTimeTimer
Definition: RemoteConnection.h:758
void updateOutgoingStats(std::size_t size)
boost::shared_ptr< RPCRemoteFinishHandler > mRPCFinishHandler
Definition: RemoteConnection.h:794
virtual void onWriteError(boost::system::system_error &e)
Implementation of RemoteConnection.
virtual void publishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map exist in our framework...
Message types exchanged between remote frameworks.
const std::string & getFrameworkID() const
Definition: RemoteConnection.h:534
#define MIRA_FRAMEWORK_EXPORT
Definition: FrameworkExports.h:61
void writeMessage(FrameworkMessageType msg, Arg &&arg, Args &&... args)
Writes a message to the other framework.
Definition: RemoteConnection.h:608
void write(const BufferSequence &buffers)
Send data in the buffers to the connected framework.
Definition: RemoteConnection.h:502
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:283
Use this class to represent time durations.
Definition: Time.h:104
void reflect(Reflector &r)
Definition: RemoteConnection.h:376
void receivedPublishAuthorityMsg()
std::pair< FrameworkMessageType, Buffer< uint8 > > RPCMessage
Definition: RemoteConnection.h:798
void reflect(Reflector &r)
Definition: RemoteConnection.h:863
void receivedPTPDelayResponse(uint64 timestamp)
The object class acts as a generic base class for classes which should be used with the classFactory...
Definition: Object.h:144
boost::asio::ip::tcp::socket & getSocket()
Returns the network socket of this connection.
Definition: RemoteConnection.h:393
ChannelSendMap subscriptions
List of channels the connected framework is subscribed to + what was sent to them.
Definition: RemoteConnection.h:673
KnownFramework()
Definition: RemoteConnection.h:82
bool mPTPOutgoing
Definition: RemoteConnection.h:779
void handleReadMessage(const boost::system::error_code &error)
Authorities act as a facade to the framework.
Definition: Authority.h:94
Getter< T > getter(T(*f)())
Creates a Getter for global or static class methods returning the result by value.
Definition: GetterSetter.h:136
bool isInitialized() const
Definition: RemoteConnection.h:190
ConcreteRemoteOutgoingConnection< 0 > RemoteOutgoingConnectionLegacy
Definition: RemoteConnection.h:953
void handleReadHeader(const boost::system::error_code &error)
ConcreteRemoteOutgoingConnection< 2 > RemoteOutgoingConnection
Definition: RemoteConnection.h:952
bool forcePTP
force PTP time sync
Definition: RemoteConnection.h:132
Definition: RemoteConnection.h:774
uint16 mPort
Definition: RemoteConnection.h:937
virtual ~ConcreteRemoteConnection()
Destructor.
Definition: RemoteConnection.h:820
virtual void onRPCrequested(Buffer< uint8 > &&request)
Implementation of RPCManager::RemoteRequestHandler Will send request to the framework that will proce...
Time mPingLastReceived
Definition: RemoteConnection.h:787
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:175
void receivedRequestMigrationMsg()
#define MIRA_OBJECT(classIdentifier)
Use this MACRO if you like the factory to automatically extract the class name from the given identif...
Definition: FactoryMacros.h:183
void setAuthority(std::unique_ptr< Authority > auth)
Definition: RemoteConnection.h:529
Framework channel classes.
KnownFramework address
The address of the connected framework.
Definition: RemoteConnection.h:668
AuthState
Definition: RemoteConnection.h:769
TimeOffsetCompensation(const TimeOffsetCompensation &other)
Definition: RemoteConnection.h:161
boost::tuple< std::string, uint16 > getHostPort() const
void handleReadMessage(const boost::system::error_code &error)
RPCHandler for getting notified when an rpc call on server side is finished and the response is ready...
Definition: RemoteConnection.h:301
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)=0
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
bool isPingTimeoutEnabled()
Is ping timeout enabled?
Definition: RemoteConnection.h:418
std::map< std::string, SendData > ChannelSendMap
Definition: RemoteConnection.h:288
Base class of connections between frameworks.
Definition: RemoteConnection.h:276
virtual void start()
Implementation of RemoteConnection.
virtual void onRPCfinished(Buffer< uint8 > &&answer)
Implementation of RPCManager::RemoteFinishHandler. Will send answer back to calling framework using th...
virtual void publishServices(const StringSet &services)
Notifies the connected framework that the services in the services set exist in our framework...
boost::mutex mStopMutex
Definition: RemoteConnection.h:756
void receivedPTPFollowUp(uint64 timestamp)
Time mPingLastSend
Definition: RemoteConnection.h:786
#define MIRA_MINOR_VERSION(v)
Calculate the minor version of v.
Definition: FrameworkDefines.h:55
#define MIRA_MAJOR_VERSION(v)
Calculate the major version of v.
Definition: FrameworkDefines.h:53
RemoteOutgoingConnectionBase(const KnownFramework &address)
Contains internal RPCManager class.
void stop()
Close the socket.
bool keep
If true, the information is stored in the list of frameworks that we try to reconnect to after disconnect ...
Definition: RemoteConnection.h:129
Typedefs and serialization support for uuids.
boost::asio::mutable_buffers_1 asBuffer()
Returns this message as boost asio buffer.
Definition: FrameworkMessage.h:117
std::list< AuthorityDescription > AuthorityDescriptions
Definition: RemoteConnection.h:291
virtual void receivedPublishServiceMsg()=0
UUID remoteID
The UUID of the connected framework.
Definition: RemoteConnection.h:669
bool mEnablePingTimeout
Ping timeout enabled for this connection?
Definition: RemoteConnection.h:679
virtual void unpublishServicesFiltered(const StringSet &services)
static Time now() static Time eternity()
Returns the current UTC-based time.
Definition: Time.h:484
Definition: BinarySerializer.h:257
Definition: RemoteConnection.h:158
void sendPTP()
Sends a PTP command used for time synchronization between frameworks.
uint8 metaVersion
Definition: RemoteConnection.h:285
virtual void onWriteError(boost::system::system_error &e)
Called when writing to the socket failed. Can be implemented in derived classes.
Definition: RemoteConnection.h:563
void receivedUnpublishServiceMsg()
virtual void onDisconnect()
Called in stop() when connection is about to be stopped.
Definition: RemoteConnection.h:560
void unpublishChannel(const std::string &channel)
Notifies the connected framework that we no longer have a publisher for the given channel...
bool checkMessageHeader() const
Returns true, if the message (header) is valid, i.e.
Wrapper class for boost::asio::io_service.
Definition: IOService.h:75
virtual void receivedRPCRequestMsg()
bool hasAuthority(const std::string &id) const
Check if an authority with given full id exists in the connected framework.
bool mStopScheduled
Definition: RemoteConnection.h:938
uint8 binaryFormatVersion
The binary format used by the framework (to enable connecting to legacy framework).
Definition: RemoteConnection.h:138
boost::uuids::uuid UUID
Shorter name for boost uuid.
Definition: UUID.h:66
TimeOffsetCompensation clockOffset
The clock offset between us and the connected framework.
Definition: RemoteConnection.h:680
Time mPTPSyncLocal
Definition: RemoteConnection.h:780
void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType &buffer)
Writes a message to the other framework.
Definition: RemoteConnection.h:622
const KnownFramework & getAddress() const
Definition: RemoteConnection.h:539
boost::condition_variable mRPCMessagesCondition
Definition: RemoteConnection.h:753
bool monitorOnly
Local channels, services and authorities are not published to the remote side if monitor-only is true...
Definition: RemoteConnection.h:144
Duration offset(const Time &ts) const
void startTimeSync()
Create a timer to frequently call syncTime.
MetaSet sentMetaInformation
Set of type meta information already sent.
Definition: RemoteConnection.h:674
Time mHeaderReceived
Definition: RemoteConnection.h:765
virtual void receivedRPCResponseMsg()
boost::asio::ip::tcp::socket mSocket
Definition: RemoteConnection.h:751
Definition: RemoteConnection.h:284
An authority class that represents a remote authority that is located in a connected framework...
boost::mutex mRPCMessagesMutex
Definition: RemoteConnection.h:800
std::string mMigrationID
Definition: RemoteConnection.h:791
RemoteConnection()
Constructs a remote connection that uses its own io service.
bool isSynchronized() const
synchronizeFrameworks() was executed.
Definition: RemoteConnection.h:438
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:860
bool mEnablePTPSync
PTP Sync enabled for this connection?
Definition: RemoteConnection.h:678
void writeMessage(FrameworkMessageType msg)
Writes a message to the other framework.
Definition: RemoteConnection.h:593
Time lastData
Definition: RemoteConnection.h:286
friend void createConcreteRemoteConnectionInstances()
std::string mAuthSignMsg
Definition: RemoteConnection.h:777
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:92
FrameworkMessageType
Remote framework message types.
Definition: FrameworkMessage.h:63
virtual void receivedRPCRequestMsg()=0
bool mStopped
Definition: RemoteConnection.h:792
boost::thread mProcessPingThread
Definition: RemoteConnection.h:761
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)=0
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
void ping()
Sends a ping command.
void setCompensationInterval(const Duration &interval)
FrameworkMessageHeader mHeader
Definition: RemoteConnection.h:766
TimeOffsetCompensation(const Duration &compensationInterval=Duration::seconds(10))
Definition: RemoteConnection.h:167
virtual void publishChannels(const ChannelTypeMap &channels)=0
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
void handleReadHeader(const boost::system::error_code &error)
Wrapper for boost::asio::io_service.
std::string address
address in the form of host:port
Definition: RemoteConnection.h:123
Definition: RemoteConnection.h:773
void onConnect(bool enablePTPTimeSync, bool enablePingTimeout)
Called by RemoteModule::onIncomingConnected/onOutgoingConnected.
Connection pool that holds the ownership for RemoteConnections.
Owner of every RemoteConnection.
Definition: RemoteConnectionPool.h:115
void reflect(Reflector &r)
Definition: RemoteConnection.h:909
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
Definition: RemoteConnection.h:730
Class for in-place stream formatting. Used for constructs like:
Definition: MakeString.h:63
Definition: RemoteConnection.h:771
Time mStartTime
Definition: RemoteConnection.h:231
Units are basic modules of a complex application.
Definition: MicroUnit.h:69
Time lastConnectionTry
The last time we tried to connect to that address.
Definition: RemoteConnection.h:150
Data that is sent as header in each message between remote frameworks.
Definition: FrameworkMessage.h:101
virtual ~RemoteConnection()
Destructor.
Handler that must be implemented by the remote module to send RPC requests to a remote server...
Definition: RPCManager.h:111
Time mLastPTP
Definition: RemoteConnection.h:784