MIRA
RemoteConnection.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_REMOTECONNECTION_H_
48 #define _MIRA_REMOTECONNECTION_H_
49 
50 #ifndef Q_MOC_RUN
51 #include <boost/array.hpp>
52 #include <boost/make_shared.hpp>
53 #endif
54 
55 #include <utils/UUID.h>
57 #include <error/Exceptions.h>
58 
59 #include <rpc/RPCManager.h>
60 
61 #include <fw/RemoteAuthority.h>
63 
65 #include <fw/Channel.h>
66 #include <fw/FrameworkMessage.h>
67 #include <fw/ServiceLevel.h>
68 
69 namespace mira {
70 
72 
74 class Authority;
75 class MicroUnit;
76 
81 {
83  address(""),
84  keep(true),
85  forcePTP(false),
86  binaryFormatVersion(BinaryBufferSerializer::getSerializerFormatVersion()),
87  monitorOnly(false),
88  lastConnectionTry(Time::unixEpoch()) {}
89 
91  template<typename Reflector>
92  void reflect(Reflector& r)
93  {
94  r.member("Address", address, "");
95  r.roproperty("Address", address, "The address IP:port");
96 
97  r.member("KeepConnected", keep, "", true);
98  r.roproperty("KeepConnected", keep, "Whether to reconnect after a lost connection");
99 
100  r.member("ForcePTP", forcePTP, "", false);
101  r.roproperty("ForcePTP", forcePTP, "Whether to force PTP time sync for this connection");
102 
103  r.member("BinaryFormatVersion", binaryFormatVersion, "",
104  BinaryBufferSerializer::getSerializerFormatVersion());
105  r.roproperty("BinaryFormatVersion", binaryFormatVersion,
106  "Binary format version used by the remote framework");
107 
108  r.member("MonitorOnly", monitorOnly, "", false);
109  r.roproperty("MonitorOnly", monitorOnly,
110  "Monitor-only connections do not publish local channels, "
111  "services and authorities to remote framework");
112 
113  r.member("LastConnectionTry", lastConnectionTry, "",
115  r.roproperty("LastConnectionTry", lastConnectionTry,
116  "Last time tried to establish connection (UTC)");
117  }
118 
119  void validate() const;
120  boost::tuple<std::string, uint16> getHostPort() const;
121 
123  std::string address;
124 
129  bool keep;
130 
132  bool forcePTP;
133 
139 
145 
151 };
152 
153 template <typename SerializerTag>
154 class IsTransparentSerializable<KnownFramework, SerializerTag> : public std::true_type {};
155 
157 
159 {
160 public:
164  mStartOffset(other.mStartOffset),
165  mStartTime(other.mStartTime) {}
166 
167  explicit TimeOffsetCompensation(const Duration& compensationInterval = Duration::seconds(10))
168  : mCompensationInterval(compensationInterval.totalMicroseconds()),
169  mTargetOffset(Duration::invalid()),
170  mStartOffset(Duration::invalid()),
171  mStartTime(Time::invalid()) {}
172 
174  template<typename Reflector>
175  void reflect(Reflector& r)
176  {
177  r.property("CompensationInterval",
180  "Interval to reach target offset");
181  r.roproperty("TargetOffset", mTargetOffset, "Target offset");
182  r.roproperty("StartOffset", mStartOffset, "Start offset");
183  r.roproperty("StartTime", mStartTime, "Time when start offset was valid");
184  r.roproperty("CurrentOffset",
186  "Current offset (interpolated between start and target offset)");
187  }
188 
189 public:
190  bool isInitialized()const { return mTargetOffset.isValid(); }
191 
192 public:
194  void setCompensationInterval(const Duration& interval);
195 
201  void setTargetOffset(const Duration& target, const Time& localTs);
202 
203 public:
205  Duration queryLocalOffset(const Time& localTs) const;
206 
208  Duration queryRemoteOffset(const Time& remoteTs) const;
209 
210 public:
211  // assignment operator uses copy constructor (through by-value argument)
213  using std::swap;
215  swap(mTargetOffset, other.mTargetOffset);
216  swap(mStartOffset, other.mStartOffset);
217  swap(mStartTime, other.mStartTime);
218  return *this;
219  }
220 
221 protected:
224 
226  Duration offset(const Time& ts, bool local) const;
227 
228 public:
229  int mCompensationInterval; // total microseconds
233 
234  mutable boost::mutex mMutex;
235 };
236 
238 
277 class RemoteConnection : public Object
278 {
280 public:
281 
282  typedef std::map<std::string, Typename> ChannelTypeMap;
283  typedef std::set<std::string> StringSet;
284 
285  struct SendData {
288  };
289  typedef std::map<std::string, SendData> ChannelSendMap;
290 
291  typedef std::set<std::string> MetaSet;
292  typedef std::list<AuthorityDescription> AuthorityDescriptions;
293 
295 
303  {
304  public:
307  mConnection(iConnection)
308  {}
309 
315  virtual void onRPCfinished(Buffer<uint8>&& answer);
316 
318  void setConnection(RemoteConnection* iConnection);
319 
320  protected:
321 
322  boost::mutex mConnectionMutex;
324  };
325 
327 
334  {
335  public:
338  mConnection(iConnection)
339  {}
340 
346  virtual void onRPCrequested(Buffer<uint8>&& request);
347 
349  void setConnection(RemoteConnection* iConnection);
350 
351  protected:
352 
353  boost::mutex mConnectionMutex;
355  };
356 
358 
359 protected:
365 
370  RemoteConnection(boost::asio::io_service& service);
371 
372 public:
374  virtual ~RemoteConnection();
375 
376  template<typename Reflector>
377  void reflect(Reflector& r)
378  {
379  r.roproperty("RemoteFrameworkName", frameworkID, "ID/Name of the remote framework");
380  r.roproperty("RemoteProtocolVersion",
381  getter<std::string>( [&]()->std::string {
382  return MakeString() << MIRA_MAJOR_VERSION(remoteVersion) << "."
384  "Protocol version of the remote framework");
385  r.roproperty("Connected", synchronizedTime, "Time when the connection was established");
386  r.roproperty("PTPSync", getter(&RemoteConnection::isPTPSyncEnabled, this),
387  "Synchronization of clocks via PTP enabled?");
388  r.roproperty("PingTimeout", getter(&RemoteConnection::isPingTimeoutEnabled, this),
389  "Ping timeout enabled?");
390  r.property("TimeSynch", clockOffset, "PTP time synchronization");
391  }
392 
394  boost::asio::ip::tcp::socket& getSocket()
395  {
396  return mSocket;
397  }
398 
400  virtual void start();
401 
403  void stop();
404 
410  void onConnect(bool enablePTPTimeSync, bool enablePingTimeout);
411 
414  {
415  return mEnablePTPSync;
416  }
417 
420  {
421  return mEnablePingTimeout;
422  }
423 
430  void startTimeSync();
431 
432 
439  bool isSynchronized() const
440  {
441  return synchronizedTime.isValid();
442  }
443 
448  virtual void publishChannels(const ChannelTypeMap& channels) = 0;
449 
454  void unpublishChannel(const std::string& channel);
455 
460  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel) = 0;
461 
466  virtual void publishAuthorities(const AuthorityDescriptions& authorities) = 0;
467 
472  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities) = 0;
473 
478  void migrateUnit(const std::string& id);
479 
484  bool hasAuthority(const std::string& id) const;
485 
490  virtual void publishServices(const StringSet& services) = 0;
491 
496  virtual void unpublishServices(const StringSet& services) = 0;
497 
502  template <typename BufferSequence>
503  void write(const BufferSequence& buffers)
504  {
505  boost::mutex::scoped_lock lock(mWriteMutex);
506  if (mStopped)
507  return;
508  try
509  {
510  std::size_t bytes = boost::asio::write(mSocket, buffers, boost::asio::transfer_all());
511  updateOutgoingStats(bytes);
512  }
513  catch (boost::system::system_error& e)
514  {
515  // let the derived class decide what to do with the error
516  onWriteError(e);
517  }
518  }
519 
521  {
522  return remoteID;
523  }
524 
525  std::string getGlobalID() const
526  {
527  return authority->getGlobalID();
528  }
529 
530  void setAuthority(std::unique_ptr<Authority> auth)
531  {
532  authority = std::move(auth);
533  }
534 
535  const std::string& getFrameworkID() const
536  {
537  return frameworkID;
538  }
539 
540  const KnownFramework& getAddress() const
541  {
542  return address;
543  }
544 
545  void setAddress(const KnownFramework& addr)
546  {
547  address = addr;
548  }
549 
550  void setAutoReconnect(bool autoReconnect)
551  {
552  address.keep = autoReconnect;
553  }
554 
555 protected:
556 
561  virtual void onDisconnect() {}
562 
564  virtual void onWriteError(boost::system::system_error& e) {}
565 
572  void syncTime();
573 
577  void sendPTP();
578 
583  void ping();
584 
588  bool hasPingTimeout() const;
589 
595  FrameworkMessageHeader header(0, msg);
596  boost::array<boost::asio::const_buffer, 1> buffers =
597  {{
598  header.asBuffer(),
599  }};
600  write(buffers);
601  }
602 
608  template <typename Arg, typename... Args>
609  void writeMessage(FrameworkMessageType msg, Arg&& arg, Args&&... args)
610  {
612  BinaryBufferOstream os(&buffer);
613  writeArgsToStream(os, std::forward<Arg>(arg), std::forward<Args>(args)...);
614  writeMessageFromBuffer(msg, buffer);
615  }
616 
622  template <typename BufferType>
623  void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType& buffer) {
624  FrameworkMessageHeader header(buffer.size(), msg);
625  boost::array<boost::asio::const_buffer, 2> buffers =
626  {{
627  header.asBuffer(),
628  boost::asio::buffer(buffer.data(), buffer.size())
629  }};
630  write(buffers);
631  }
632 
638  template <typename DataType>
639  void writeMessageFromData(FrameworkMessageType msg, const DataType& data) {
640  FrameworkMessageHeader header(sizeof(DataType), msg);
641  boost::array<boost::asio::const_buffer, 2> buffers =
642  {{
643  header.asBuffer(),
644  boost::asio::buffer((void*)&data, sizeof(DataType))
645  }};
646  write(buffers);
647  }
648 
655 
661  void valueChanged(ChannelRead<void> value, ServiceLevel& serviceLevel);
662 
667  void parseMessage();
668 
671  std::string frameworkID;
673  std::unique_ptr<Authority> authority;
677 
682 
683 private:
684 
685  void init(); // called by constructors to initialize members
686 
688  bool isLocal() const;
689 
690  static void writeArgsToStream(BinaryBufferOstream& stream) {}
691 
692  template <typename Arg, typename... Args>
693  static void writeArgsToStream(BinaryBufferOstream& stream, Arg&& arg, Args&&... args)
694  {
695  stream << std::forward<Arg>(arg);
696  writeArgsToStream(stream, std::forward<Args>(args)...);
697  }
698 
699 protected:
700 
701  void receivedPTPFollowUp(uint64 timestamp);
702  void receivedPTPDelayResponse(uint64 timestamp);
703  void receivedPTPDelayRequest(uint64 timestamp);
704  void receivedPTPFinish();
706  void receivedUnsubscribeChannelRequest(const std::string& channelID);
711  virtual void receivedPublishServiceMsg() = 0;
714  virtual void receivedRPCRequestMsg() = 0;
715  virtual void receivedRPCResponseMsg() = 0;
717  void receivedMigrationMsg();
721  void receivedTypeMetaMsg();
722  void receivedChannelMetaMsg();
723  void receivedPingMsg();
724 
725  // This method must not be pure virtual: When a remote incoming connection
726  // times out and will be destroyed, some still pending valueChanged()
727  // callbacks will be executed, which will call sendData. At this time, the
728  // RemoteIncomingConnection is already destroyed (or will be destroyed) and
729  // then sendData will be a pure-virtual function.
730  // TODO: that still applies after r9735?
731  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel) {}
732 
733  virtual int addBinaryFormatVersion(Buffer<uint8>& data) = 0;
734  void synchronizeFrameworks();
735  void updateOutgoingStats(std::size_t size);
736 
742  bool checkMessageHeader() const;
743 
744  void sendConnectDenied(const std::string& msg);
745 
746  void sendRPCMessagesThread();
747  void processPingThread();
748  void checkPingTimeoutThread();
750 
752  boost::asio::ip::tcp::socket mSocket;
753 
754  boost::condition_variable mRPCMessagesCondition;
755 
756  boost::mutex mWriteMutex;
757  boost::mutex mStopMutex;
758 
760 
761  boost::thread mSendRPCMessagesThread;
762  boost::thread mProcessPingThread;
763  boost::thread mCheckPingTimeoutThread;
765 
769 
770  enum AuthState {
776  };
778  std::string mAuthSignMsg; // random message that is used for strong authentication
779 
786 
789 
790  boost::shared_ptr<MicroUnit> mMigrationUnit;
791  std::string mMigrationNS;
792  std::string mMigrationID;
793  bool mStopped;
794 
795  boost::shared_ptr<RPCRemoteFinishHandler> mRPCFinishHandler;
796  boost::shared_ptr<RPCRemoteRequestHandler> mRPCRequestHandler;
797  std::map<std::string, boost::shared_ptr<RemoteAuthority>> mRemoteAuthorities;
798 
799  typedef std::pair<FrameworkMessageType, Buffer<uint8>> RPCMessage;
800  std::list<RPCMessage> mOutgoingRPCMessages;
801  boost::mutex mRPCMessagesMutex;
802 
803  std::unordered_map<std::string, ServiceLevel> mPendingChannelUpdates;
804  boost::mutex mChannelUpdatesMutex;
805 };
806 
807 template <uint8 BinaryFormatVersion>
809 {
810 protected:
813 
815  ConcreteRemoteConnection(boost::asio::io_service& service) : RemoteConnection(service) {}
816 
818 
819 public:
822 
823 protected:
824 
825  virtual void publishChannels(const ChannelTypeMap& channels);
826  virtual void subscribeChannel(const std::string& channelID, const ServiceLevel& serviceLevel);
827  virtual void publishAuthorities(const AuthorityDescriptions& authorities);
828  virtual void unpublishAuthorities(const AuthorityDescriptions& authorities);
829  virtual void publishServicesFiltered(const StringSet& services);
830  virtual void publishServices(const StringSet& services);
831  virtual void unpublishServicesFiltered(const StringSet& services);
832  virtual void unpublishServices(const StringSet& services);
833  virtual void sendData(ChannelRead<void> value, ServiceLevel& serviceLevel);
834  virtual int addBinaryFormatVersion(Buffer<uint8>& data);
835 
836  virtual void receivedPublishServiceMsg();
837  virtual void receivedRPCRequestMsg();
838  virtual void receivedRPCResponseMsg();
839 
840 private:
841 
843  boost::shared_ptr<BinarySerializer> createBinarySerializer(Buffer<uint8>* buffer)
844  {
845  return boost::make_shared<BinarySerializer>(buffer);
846  }
847 };
848 
850 
855 {
857 
858 protected:
860  friend class RemoteConnectionPool;
862 public:
863  template<typename Reflector>
864  void reflect(Reflector& r)
865  {
866  static const std::string incoming = "Incoming";
867  r.roproperty("ConnectDirection", incoming, "Incoming or outgoing connection?");
868  r.roproperty("Address", address.address, "Origin address of the connection");
870  }
871 
873  virtual void start();
874 
876  boost::asio::ip::tcp::endpoint& getEndpoint() {
877  return mPeerEndpoint;
878  }
879 
880 protected:
881 
883  virtual void onDisconnect();
884 
885 protected:
886 
887  void handleReadHeader(const boost::system::error_code& error);
888 
889  void handleReadMessage(const boost::system::error_code& error);
890 
891 protected:
892 
894  boost::asio::ip::tcp::endpoint mPeerEndpoint;
895 };
896 
898 
903 {
905 protected:
907 
908 public:
909  template<typename Reflector>
910  void reflect(Reflector& r)
911  {
912  static const std::string outgoing = "Outgoing";
913  r.roproperty("ConnectDirection", outgoing, "Incoming or outgoing connection?");
914  r.roproperty("ConnectDetails", address, "Active connect details");
916  }
917 
919  virtual void start();
920 
921 protected:
923  virtual void onDisconnect();
924 
926  virtual void onWriteError(boost::system::system_error& e);
927 
928 protected:
929 
930  void handleConnect(const boost::system::error_code& error,
931  boost::asio::ip::tcp::resolver::iterator iterator);
932 
933  void handleReadHeader(const boost::system::error_code& error);
934 
935  void handleReadMessage(const boost::system::error_code& error);
936 
937  std::string mHostName;
940 };
941 
942 template <uint8 BinaryFormatVersion>
944  public ConcreteRemoteConnection<BinaryFormatVersion>
945 {
946 protected:
949  friend class RemoteConnectionPool;
951 };
952 
955 
957 
958 }
959 
960 #endif
ConcreteRemoteConnection()
Constructs a remote connection that uses its own io service.
Definition: RemoteConnection.h:812
void write(const Value &value, std::ostream &ioStream, bool formatted=false, int precision=-1)
Writes a json::Value into a given stream using the JSON format.
Information and settings for a known remote framework.
Definition: RemoteConnection.h:80
Duration mTargetOffset
Definition: RemoteConnection.h:230
void handleConnect(const boost::system::error_code &error, boost::asio::ip::tcp::resolver::iterator iterator)
virtual void publishChannels(const ChannelTypeMap &channels)
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
ConcreteRemoteOutgoingConnection(const KnownFramework &address)
Definition: RemoteConnection.h:947
uint32 remoteVersion
The protocol version of the connected framework.
Definition: RemoteConnection.h:672
QoS management informations.
boost::thread mCheckPingTimeoutThread
Definition: RemoteConnection.h:763
std::unordered_map< std::string, ServiceLevel > mPendingChannelUpdates
Definition: RemoteConnection.h:803
std::map< std::string, boost::shared_ptr< RemoteAuthority > > mRemoteAuthorities
Definition: RemoteConnection.h:797
Definition: BinarySerializer.h:316
uint64_t uint64
Definition: Types.h:65
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
bool hasPingTimeout() const
Check if the connection incoming ping&#39;s are still alive.
IOService mService
Definition: RemoteConnection.h:751
Definition: RemoteConnection.h:773
Type trait that indicates whether a type should be serialized "transparently", i.e.
Definition: IsTransparentSerializable.h:81
Duration queryRemoteOffset(const Time &remoteTs) const
Query offset to apply on remote timestamp.
Duration mStartOffset
Definition: RemoteConnection.h:231
std::list< RPCMessage > mOutgoingRPCMessages
Definition: RemoteConnection.h:800
void receivedPTPDelayRequest(uint64 timestamp)
boost::thread mSendChannelUpdatesThread
Definition: RemoteConnection.h:764
Time mPTPDelayLocal
Definition: RemoteConnection.h:783
Duration currentOffset() const
For use in readonly property getter (info on current offset)
Definition: RemoteConnection.h:223
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
boost::asio::ip::tcp::endpoint & getEndpoint()
Returns a reference to the endpoint of the remote peer.
Definition: RemoteConnection.h:876
DispatcherThread::TimerPtr TimerPtr
Definition: DispatcherThread.h:524
void queueRPCMessage(FrameworkMessageType msg, Buffer< uint8 > &&answer)
Queue an outgoing RPC request or RPC response to be transmitted in a separate thread.
void receivedMigrationSinkSuccessMsg()
Definition: RemoteConnection.h:808
virtual void start()
Implementation of RemoteConnection.
bool isPTPSyncEnabled()
Is synchronization of clocks via PTP enabled?
Definition: RemoteConnection.h:413
ServiceLevel by channel name.
Definition: ServiceLevel.h:102
RPCRemoteRequestHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:337
std::string mMigrationNS
Definition: RemoteConnection.h:791
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:353
void receivedSubscribeChannelRequest()
void syncTime()
Time synchronization between frameworks.
StringSet publishedServices
List of services of the connected framework.
Definition: RemoteConnection.h:676
void receivedMigrationFinishedMsg()
ConcreteRemoteConnection(boost::asio::io_service &service)
Constructs a remote connection that uses a given io service.
Definition: RemoteConnection.h:815
Handler that must be implemented by the remote module to send RPC responses to a remote server which ...
Definition: RPCManager.h:133
int mCompensationInterval
Definition: RemoteConnection.h:229
RPCRemoteFinishHandler(RemoteConnection *iConnection)
Constructor taking the connection pointer.
Definition: RemoteConnection.h:306
Connection class for incoming connections.
Definition: RemoteConnection.h:854
void validate() const
Descriptive informations about an authority.
bool isValid() const
Checks if this duration is invalid.
Definition: Time.h:257
std::unique_ptr< Authority > authority
Our authority used for subscribing to data.
Definition: RemoteConnection.h:673
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map do not longer exist in o...
Buffer< uint8 > mMessage
Definition: RemoteConnection.h:768
virtual void start()
Starts the connection. Can be implemented in derived classes.
TimeOffsetCompensation & operator=(TimeOffsetCompensation other)
Definition: RemoteConnection.h:212
UUID getRemoteID() const
Definition: RemoteConnection.h:520
RPCHandler for sending a rpc call to the server side.
Definition: RemoteConnection.h:333
boost::shared_ptr< RPCRemoteRequestHandler > mRPCRequestHandler
Definition: RemoteConnection.h:796
bool isValid() const
Returns true if this contains a valid time.
Definition: Time.h:575
Duration getCompensationInterval() const
std::set< std::string > MetaSet
Definition: RemoteConnection.h:291
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)
boost::mutex mConnectionMutex
Definition: RemoteConnection.h:322
void valueChanged(ChannelRead< void > value, ServiceLevel &serviceLevel)
Channel callback method that gets registered on each channel the connected framework subscribes...
Time synchronizedTime
Time when the connection was fully established (e.g. PTP synchronized)
Definition: RemoteConnection.h:678
void receivedUnpublishAuthorityMsg()
#define MIRA_REFLECT_BASE(reflector, BaseClass)
Macro that can be used to reflect the base class easily.
Definition: ReflectorInterface.h:912
Time mPTPDelayRemote
Definition: RemoteConnection.h:784
virtual void publishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set exist in our framework...
RemoteConnection * mConnection
Definition: RemoteConnection.h:323
boost::shared_ptr< MicroUnit > mMigrationUnit
Definition: RemoteConnection.h:790
Definition: RemoteConnection.h:771
uint32_t uint32
Definition: Types.h:64
Setter< T > setter(void(*f)(const T &))
Creates a Setter for global or static class methods taking the argument by const reference.
Definition: GetterSetter.h:443
std::map< std::string, Typename > ChannelTypeMap
Definition: RemoteConnection.h:282
Time mPTPSyncRemote
Definition: RemoteConnection.h:782
Connection class for outgoing connections.
Definition: RemoteConnection.h:902
std::string frameworkID
The ID/Name of the connected framework.
Definition: RemoteConnection.h:671
void sendConnectDenied(const std::string &msg)
virtual void publishAuthorities(const AuthorityDescriptions &authorities)=0
Notifies the connected framework that the authorities in the authorities map exist in our framework...
boost::mutex mWriteMutex
Definition: RemoteConnection.h:756
virtual void unpublishServices(const StringSet &services)
Notifies the connected framework that the services in the services set do not longer exist in our fra...
AuthState mAuthState
Definition: RemoteConnection.h:777
virtual void unpublishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map do not longer exist in o...
void receivedMigrationSinkFailureMsg()
RemoteConnection * mConnection
Definition: RemoteConnection.h:354
void receivedUnsubscribeChannelRequest(const std::string &channelID)
std::string getGlobalID() const
Definition: RemoteConnection.h:525
boost::mutex mChannelUpdatesMutex
Definition: RemoteConnection.h:804
virtual void receivedPublishServiceMsg()
An object that allows read access to data of a channel.
Definition: ChannelReadWrite.h:494
std::string mHostName
Definition: RemoteConnection.h:937
void parseMessage()
Parses an incoming message (stored in mMessage) and calls the respective receivedXXX method...
virtual void receivedRPCResponseMsg()=0
boost::mutex mMutex
Definition: RemoteConnection.h:234
void setConnection(RemoteConnection *iConnection)
Set the connection pointer. Used to reset the connection on disconnect.
boost::asio::ip::tcp::endpoint mPeerEndpoint
endpoint of the remote peer
Definition: RemoteConnection.h:894
Wrapper class for boost::posix_time::ptime for adding more functionality to it.
Definition: Time.h:418
#define MIRA_ABSTRACT_OBJECT(classIdentifier)
Use this MACRO instead of MIRA_OBJECT to declare the class as abstract.
Definition: FactoryMacros.h:235
std::set< std::string > StringSet
Definition: RemoteConnection.h:283
void setAddress(const KnownFramework &addr)
Definition: RemoteConnection.h:545
void migrateUnit(const std::string &id)
Send a request to the connected framework to transfer ownership of a unit to this framework...
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:950
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void publishServicesFiltered(const StringSet &services)
virtual void unpublishServices(const StringSet &services)=0
Notifies the connected framework that the services in the services set do not longer exist in our fra...
void setAutoReconnect(bool autoReconnect)
Definition: RemoteConnection.h:550
Marker for indicating parameters that should be ignored if they are missing in the config file...
Definition: IgnoreMissing.h:73
Definition: RemoteConnection.h:943
boost::thread mSendRPCMessagesThread
Definition: RemoteConnection.h:761
void receivedUnpublishChannelMsg()
Commonly used exception classes.
void writeMessageFromData(FrameworkMessageType msg, const DataType &data)
Writes a message to the other framework.
Definition: RemoteConnection.h:639
TimerPtr mSyncTimeTimer
Definition: RemoteConnection.h:759
void updateOutgoingStats(std::size_t size)
boost::shared_ptr< RPCRemoteFinishHandler > mRPCFinishHandler
Definition: RemoteConnection.h:795
virtual void onWriteError(boost::system::system_error &e)
Implementation of RemoteConnection.
virtual void publishAuthorities(const AuthorityDescriptions &authorities)
Notifies the connected framework that the authorities in the authorities map exist in our framework...
void setTargetOffset(const Duration &target, const Time &localTs)
Set a new target offset.
Message types exchanged between remote frameworks.
const std::string & getFrameworkID() const
Definition: RemoteConnection.h:535
void writeMessage(FrameworkMessageType msg, Arg &&arg, Args &&... args)
Writes a message to the other framework.
Definition: RemoteConnection.h:609
void write(const BufferSequence &buffers)
Send data in the buffers to the connected framework.
Definition: RemoteConnection.h:503
sec_type seconds() const
Returns normalized number of seconds (0..59)
Definition: Time.h:280
Use this class to represent time durations.
Definition: Time.h:106
void reflect(Reflector &r)
Definition: RemoteConnection.h:377
void receivedPublishAuthorityMsg()
std::pair< FrameworkMessageType, Buffer< uint8 > > RPCMessage
Definition: RemoteConnection.h:799
void reflect(Reflector &r)
Definition: RemoteConnection.h:864
void receivedPTPDelayResponse(uint64 timestamp)
The object class acts as a generic base class for classes which should be used with the classFactory...
Definition: Object.h:144
uint8_t uint8
Definition: Types.h:62
boost::asio::ip::tcp::socket & getSocket()
Returns the network socket of this connection.
Definition: RemoteConnection.h:394
ChannelSendMap subscriptions
List of channels the connected framework is subscribed to + what was sent to them.
Definition: RemoteConnection.h:674
KnownFramework()
Definition: RemoteConnection.h:82
bool mPTPOutgoing
Definition: RemoteConnection.h:780
void handleReadMessage(const boost::system::error_code &error)
Getter< T > getter(T(*f)())
Creates a Getter for global or static class methods returning the result by value.
Definition: GetterSetter.h:136
bool isInitialized() const
Definition: RemoteConnection.h:190
ConcreteRemoteOutgoingConnection< 0 > RemoteOutgoingConnectionLegacy
Definition: RemoteConnection.h:954
void handleReadHeader(const boost::system::error_code &error)
ConcreteRemoteOutgoingConnection< 2 > RemoteOutgoingConnection
Definition: RemoteConnection.h:953
static Time now()
Returns the current utc based time.
Definition: Time.h:481
bool forcePTP
force PTP time sync
Definition: RemoteConnection.h:132
Definition: RemoteConnection.h:775
uint16 mPort
Definition: RemoteConnection.h:938
virtual ~ConcreteRemoteConnection()
Destructor.
Definition: RemoteConnection.h:821
virtual void onRPCrequested(Buffer< uint8 > &&request)
Implementation of RPCManager::RemoteRequestHandler Will send request to the framework that will proce...
Time mPingLastReceived
Definition: RemoteConnection.h:788
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:175
void receivedRequestMigrationMsg()
#define MIRA_OBJECT(classIdentifier)
Use this MACRO if you like the factory to automatically extract the class name from the given identif...
Definition: FactoryMacros.h:179
void setAuthority(std::unique_ptr< Authority > auth)
Definition: RemoteConnection.h:530
Framework channel classes.
KnownFramework address
The address of the connected framework.
Definition: RemoteConnection.h:669
uint16_t uint16
Definition: Types.h:63
AuthState
Definition: RemoteConnection.h:770
TimeOffsetCompensation(const TimeOffsetCompensation &other)
Definition: RemoteConnection.h:161
boost::tuple< std::string, uint16 > getHostPort() const
void handleReadMessage(const boost::system::error_code &error)
RPCHandler for getting notified when an rpc call on server side is finished and the response is ready...
Definition: RemoteConnection.h:302
virtual int addBinaryFormatVersion(Buffer< uint8 > &data)=0
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
bool isPingTimeoutEnabled()
Is ping timeout enabled?
Definition: RemoteConnection.h:419
std::map< std::string, SendData > ChannelSendMap
Definition: RemoteConnection.h:289
Base class of connections between frameworks.
Definition: RemoteConnection.h:277
virtual void start()
Implementation of RemoteConnection.
virtual void onRPCfinished(Buffer< uint8 > &&answer)
Implementation of RPCManager::RemoteFinishHandler Will send answer back to calling framework using th...
virtual void publishServices(const StringSet &services)
Notifies the connected framework that the services in the services set exist in our framework...
boost::mutex mStopMutex
Definition: RemoteConnection.h:757
void receivedPTPFollowUp(uint64 timestamp)
Time mPingLastSend
Definition: RemoteConnection.h:787
#define MIRA_MINOR_VERSION(v)
Calculate the minor version of v.
Definition: FrameworkDefines.h:55
#define MIRA_MAJOR_VERSION(v)
Calculate the major version of v.
Definition: FrameworkDefines.h:53
RemoteOutgoingConnectionBase(const KnownFramework &address)
Contains internal RPCManager class.
void stop()
Close the socket.
bool keep
if true the information is stored in list of frameworks that we try to reconnect to after disconnect ...
Definition: RemoteConnection.h:129
Typedefs and serialization support for uuids.
boost::asio::mutable_buffers_1 asBuffer()
Returns this message as boost asio buffer.
Definition: FrameworkMessage.h:117
std::list< AuthorityDescription > AuthorityDescriptions
Definition: RemoteConnection.h:292
virtual void receivedPublishServiceMsg()=0
UUID remoteID
The UUID of the connected framework.
Definition: RemoteConnection.h:670
bool mEnablePingTimeout
Ping timeout enabled for this connection?
Definition: RemoteConnection.h:680
virtual void unpublishServicesFiltered(const StringSet &services)
Definition: BinarySerializer.h:257
Definition: RemoteConnection.h:158
void sendPTP()
Sends a PTP command used for time synchronization between frameworks.
uint8 metaVersion
Definition: RemoteConnection.h:286
virtual void onWriteError(boost::system::system_error &e)
Called when writing to the socket failed. Can be implemented in derived classes.
Definition: RemoteConnection.h:564
void receivedUnpublishServiceMsg()
virtual void onDisconnect()
Called in stop() when connection is about to be stopped.
Definition: RemoteConnection.h:561
void unpublishChannel(const std::string &channel)
Notifies the connected framework that we no longer have a publisher for the given channel...
bool checkMessageHeader() const
Returns true, if the message (header) is valid, i.e.
Wrapper class for boost::asio::io_service.
Definition: IOService.h:75
virtual void receivedRPCRequestMsg()
bool hasAuthority(const std::string &id) const
Check if a authority with given full id exists in the connected framework.
bool mStopScheduled
Definition: RemoteConnection.h:939
uint8 binaryFormatVersion
The binary format used by the framework (to enable connecting to legacy framework).
Definition: RemoteConnection.h:138
boost::uuids::uuid UUID
Shorter name for boost uuid.
Definition: UUID.h:69
TimeOffsetCompensation clockOffset
The clock offset between us and the connected framework.
Definition: RemoteConnection.h:681
Time mPTPSyncLocal
Definition: RemoteConnection.h:781
void writeMessageFromBuffer(FrameworkMessageType msg, const BufferType &buffer)
Writes a message to the other framework.
Definition: RemoteConnection.h:623
const KnownFramework & getAddress() const
Definition: RemoteConnection.h:540
boost::condition_variable mRPCMessagesCondition
Definition: RemoteConnection.h:754
bool monitorOnly
Local channels, services and authorities are not published to the remote side if monitor-only is true...
Definition: RemoteConnection.h:144
void startTimeSync()
Create a timer to frequently call syncTime.
MetaSet sentMetaInformation
Set of type meta information already sent.
Definition: RemoteConnection.h:675
Time mHeaderReceived
Definition: RemoteConnection.h:766
virtual void receivedRPCResponseMsg()
boost::asio::ip::tcp::socket mSocket
Definition: RemoteConnection.h:752
Definition: RemoteConnection.h:285
An authority class that represents a remote authority that is located in a connected framework...
boost::mutex mRPCMessagesMutex
Definition: RemoteConnection.h:801
std::string mMigrationID
Definition: RemoteConnection.h:792
RemoteConnection()
Constructs a remote connection that uses its own io service.
bool isSynchronized() const
synchronizeFrameworks() was executed.
Definition: RemoteConnection.h:439
friend class ClassFactoryDefaultConstClassBuilder
Definition: RemoteConnection.h:861
bool mEnablePTPSync
PTP Sync enabled for this connection?
Definition: RemoteConnection.h:679
void writeMessage(FrameworkMessageType msg)
Writes a message to the other framework.
Definition: RemoteConnection.h:594
Time lastData
Definition: RemoteConnection.h:287
friend void createConcreteRemoteConnectionInstances()
std::string mAuthSignMsg
Definition: RemoteConnection.h:778
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RemoteConnection.h:92
FrameworkMessageType
Remote framework message types.
Definition: FrameworkMessage.h:63
virtual void receivedRPCRequestMsg()=0
bool mStopped
Definition: RemoteConnection.h:793
boost::thread mProcessPingThread
Definition: RemoteConnection.h:762
Duration queryLocalOffset(const Time &localTs) const
Query offset to apply on local timestamp.
Duration offset(const Time &ts, bool local) const
The actual offset calculation (interpolation between start and target offset)
virtual void onDisconnect()
Implementation of RemoteConnection.
virtual void subscribeChannel(const std::string &channelID, const ServiceLevel &serviceLevel)=0
Notify the connected remote frameworks that we have a subscriber for the given channel (assuming it h...
void ping()
Sends a ping command.
void setCompensationInterval(const Duration &interval)
FrameworkMessageHeader mHeader
Definition: RemoteConnection.h:767
TimeOffsetCompensation(const Duration &compensationInterval=Duration::seconds(10))
Definition: RemoteConnection.h:167
virtual void publishChannels(const ChannelTypeMap &channels)=0
Notifies the connected framework that we have at least one publisher for each of the channels in the ...
void handleReadHeader(const boost::system::error_code &error)
Wrapper for boost::asio::io_service.
std::string address
address in the form of host:port
Definition: RemoteConnection.h:123
Definition: RemoteConnection.h:774
void onConnect(bool enablePTPTimeSync, bool enablePingTimeout)
Called by RemoteModule::onIncomingConnected/onOutgoingConnected.
Connection pool that holds the ownership for RemoteConnections.
Owner of every RemoteConnection.
Definition: RemoteConnectionPool.h:115
void reflect(Reflector &r)
Definition: RemoteConnection.h:910
virtual void sendData(ChannelRead< void > value, ServiceLevel &serviceLevel)
Definition: RemoteConnection.h:731
Class for in-place stream formatting Used for constructs like:
Definition: MakeString.h:63
Definition: RemoteConnection.h:772
Time mStartTime
Definition: RemoteConnection.h:232
Time lastConnectionTry
The last time we tried to connect to that address.
Definition: RemoteConnection.h:150
Data that is sent as header in each message between remote frameworks.
Definition: FrameworkMessage.h:101
virtual ~RemoteConnection()
Destructor.
Handler that must be implemented by the remote module to send RPC requests to a remote server...
Definition: RPCManager.h:111
Time mLastPTP
Definition: RemoteConnection.h:785