MIRA
RPCManager.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 by
3  * MetraLabs GmbH (MLAB), GERMANY
4  * and
5  * Neuroinformatics and Cognitive Robotics Labs (NICR) at TU Ilmenau, GERMANY
6  * All rights reserved.
7  *
8  * Contact: info@mira-project.org
9  *
10  * Commercial Usage:
11  * Licensees holding valid commercial licenses may use this file in
12  * accordance with the commercial license agreement provided with the
13  * software or, alternatively, in accordance with the terms contained in
14  * a written agreement between you and MLAB or NICR.
15  *
16  * GNU General Public License Usage:
17  * Alternatively, this file may be used under the terms of the GNU
18  * General Public License version 3.0 as published by the Free Software
19  * Foundation and appearing in the file LICENSE.GPL3 included in the
20  * packaging of this file. Please review the following information to
21  * ensure the GNU General Public License version 3.0 requirements will be
22  * met: http://www.gnu.org/copyleft/gpl.html.
23  * Alternatively you may (at your option) use any later version of the GNU
24  * General Public License if such license has been publicly approved by
25  * MLAB and NICR (or its successors, if any).
26  *
27  * IN NO EVENT SHALL "MLAB" OR "NICR" BE LIABLE TO ANY PARTY FOR DIRECT,
28  * INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF
29  * THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF "MLAB" OR
30  * "NICR" HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  * "MLAB" AND "NICR" SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING,
33  * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
34  * FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
35  * ON AN "AS IS" BASIS, AND "MLAB" AND "NICR" HAVE NO OBLIGATION TO
36  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS OR MODIFICATIONS.
37  */
38 
47 #ifndef _MIRA_RPCMANAGER_H_
48 #define _MIRA_RPCMANAGER_H_
49 
50 #include <set>
51 
52 #include <platform/Platform.h>
53 
54 #include <rpc/RPCServer.h>
55 #include <rpc/RPCClient.h>
56 #include <rpc/RPCMacros.h>
57 #include <rpc/RPCCallDefinition.h>
59 #include <rpc/AbstractRPCHandler.h>
61 
62 namespace mira {
63 
65 
97 {
98 public:
99 
112  {
113  public:
115  virtual void onRPCrequested(Buffer<uint8>&& request) = 0;
116  };
117  typedef boost::shared_ptr<RemoteRequestHandler> RemoteRequestHandlerPtr;
118 
119 
134  {
135  public:
136  virtual ~RemoteFinishHandler() {}
137  virtual void onRPCfinished(Buffer<uint8>&& response) = 0;
138  };
139  typedef boost::shared_ptr<RemoteFinishHandler> RemoteFinishHandlerPtr;
140 
141 public:
142 
144  template<typename Reflector>
145  void reflect(Reflector& r)
146  {
147  r.interface("IRPCManager");
148  r.method("getLocalServices", &RPCManager::getLocalServices, this,
149  "Get all registered local services");
150  r.method("getLocalService", &RPCManager::getLocalService, this,
151  "Returns a reference to the local service object with the given name",
152  "name", "name of the service", "MyService");
153  r.method("getRemoteServices", &RPCManager::getRemoteServices, this,
154  "Get all registered remote services");
155  r.method("getRemoteService", &RPCManager::getRemoteService, this,
156  "Returns a reference to the remote service object with the given name",
157  "name", "name of the service", "MyService");
158  r.method("existsService", &RPCManager::existsService, this,
159  "Returns true, if a service with the given name exists, otherwise false",
160  "name", "name of the service", "MyService");
161  r.method("queryServicesForInterface", &RPCManager::queryServicesForInterface, this,
162  "Returns a string list with the names of all registered services (local"
163  "and remote), that implement the specified interface",
164  "name", "name of the interface", "MyInterface");
165  r.method("implementsInterface", &RPCManager::implementsInterface, this,
166  "Returns if the given service implements a certain interface",
167  "name", "name of the service", "MyService",
168  "interface", "name of the interface", "MyInterface");
169  }
170 
171 public: // local service registering
172 
179  template <typename Service>
180  void addLocalService(const std::string& name, Service& service,
181  AbstractRPCHandlerPtr handler)
182  {
183  boost::mutex::scoped_lock lock(mServiceManagementMutex);
184 
185  // reflect the service to extend the service object and get the new methods
186  // and interfaces
187  std::set<RPCSignature> addedMethods;
188  std::set<std::string> addedInterfaces;
189  const RPCServer::Service& s = mServer.registerServiceObject(name,
190  service, &addedMethods, &addedInterfaces);
191 
192 
193  // add the service methods and their handlers
194  foreach(const RPCSignature& s, addedMethods)
195  {
196  std::string method = name + "." + s.name;
197  auto it = mLocalServiceMethods.find(method);
198  if(it == mLocalServiceMethods.end())
199  mLocalServiceMethods.insert(std::make_pair(method, handler));
200  else {
201  if(it->second != handler)
202  MIRA_THROW(XLogical, "The service method '" << method <<
203  "' was already registered with a different handler");
204  }
205  }
206 
207  // loop through all new interfaces of the service s
208  foreach(const std::string& ifc, addedInterfaces)
209  handleCallbacks(ifc, name);
210 
211  mLocalServices.insert(name);
212  }
213 
217  void removeLocalService(const std::string& name);
218 
222  std::set<std::string> getLocalServices() const;
223 
229  const RPCServer::Service& getLocalService(const std::string& name) const {
230  return mServer.getService(name);
231  }
232 
233 public: // remote service registering
234 
236 
246  void addRemoteService(const RemoteService& serviceInfo,
247  RemoteRequestHandlerPtr handler,
248  uint8 binaryFormatVersion = BinaryBufferSerializer::getSerializerFormatVersion());
249 
253  void removeRemoteService(const std::string& name);
254 
258  std::set<std::string> getRemoteServices() const;
259 
265  const RemoteService& getRemoteService(const std::string& name) const;
266 
267 public: // interface handling
268 
275  void registerCallback(const std::string& interface,
277 
281  void unregisterCallback(const std::string& interface,
283 
287  void unregisterCallback(AbstractInterfaceCallbackHandlerPtr callback);
288 
292  bool existsService(const std::string& name) const
293  {
294  boost::mutex::scoped_lock lock(mServiceManagementMutex);
295 
296  // check local services first
297  if(mServer.existsService(name))
298  return true;
299 
300  // otherwise examine all remote services
301  for(auto it=mRemoteServices.begin(); it!=mRemoteServices.end(); ++it)
302  if(it->second.name==name)
303  return true; // found a remote service with the desired name
304 
305  return false; // nothing found
306  }
307 
315  bool waitForService(const std::string& name,
316  Duration timeout = Duration::invalid()) const;
317 
326  std::string waitForServiceInterface(const std::string& interface,
327  Duration timeout = Duration::invalid()) const;
328 
333  std::list<std::string> queryServicesForInterface(const std::string& interface) const
334  {
335  boost::mutex::scoped_lock lock(mServiceManagementMutex);
336 
337  // get the local interfaces first
338  std::list<std::string> res = mServer.queryServicesForInterface(interface);
339 
340  // now examine all remote services
341  for(auto it=mRemoteServices.begin(); it!=mRemoteServices.end(); ++it)
342  // if remote service has the interface ...
343  if(it->second.interfaces.count(interface)!=0)
344  res.push_back(it->second.name); // ... add it
345 
346  return res;
347  }
348 
352  bool implementsInterface(const std::string& name, const std::string& interface) const
353  {
354  boost::mutex::scoped_lock lock(mServiceManagementMutex);
355  // get the local interfaces first
356  if (mServer.implementsInterface(name, interface))
357  return true;
358 
359  // now examine all remote services
360  auto it = mRemoteServices.find(name);
361  if (it == mRemoteServices.end())
362  return false;
363  return it->second.interfaces.count(interface) > 0;
364  }
365 
366 public: // for handling remote requests and their corresponding responses
367 
374  void handleRemoteRequest(Buffer<uint8> buffer,
375  RemoteFinishHandlerPtr handler = RemoteFinishHandlerPtr());
376 
384  void handleRemoteRequest(Buffer<uint8> buffer,
385  RemoteFinishHandlerPtr handler, uint8 binaryFormatVersion);
386 
391  void handleRemoteResponse(Buffer<uint8> buffer);
392 
398  void handleRemoteResponse(Buffer<uint8> buffer, uint8 binaryFormatVersion);
399 
400 public:
401 
410  template<typename R, typename... ARGS>
411  RPCFuture<R> call(const std::string& service, const std::string& method, ARGS&&... args)
412  {
413  Buffer<uint8> buffer;
414  boost::mutex::scoped_lock lock(mServiceManagementMutex);
415  if (mLocalServices.count(service) > 0) {
416  lock.unlock();
417  BinaryRPCBackend::ClientRequest request(&buffer);
418  RPCFuture<R> future = mClient.call<BinaryRPCBackend, R>(
419  request, service, method, forwardRPCParams(std::forward<ARGS>(args))...);
420  handleLocalRequest<BinaryRPCBackend>(std::move(buffer), service);
421  return std::move(future);
422  }
423  else {
424  auto it = mRemoteServices.find(service);
425  if (it == mRemoteServices.end()) {
426  MIRA_THROW(XRPC, "The requested service '" << service << "' is not known.");
427  };
428 
429  lock.unlock();
430  RPCFuture<R> future;
431  if (it->second.binaryFormatVersion == BinaryBufferSerializer::getSerializerFormatVersion()) {
432  BinaryRPCBackend::ClientRequest request(&buffer);
433  future = mClient.call<BinaryRPCBackend, R>(request, service, method,
434  forwardRPCParams(std::forward<ARGS>(args))...);
435  }
436  else if (it->second.binaryFormatVersion == 0) {
437  BinaryRPCBackendLegacy::ClientRequest request(&buffer);
438  future = mClient.call<BinaryRPCBackendLegacy, R>(
439  request, service, method, forwardRPCParams(std::forward<ARGS>(args))...);
440  }
441  else {
442  MIRA_THROW(XRPC, "Requested service expects binary format version "
443  << (int)it->second.binaryFormatVersion
444  << ". RPCManager::call() only implemented for versions 0, "
445  << (int)BinaryBufferSerializer::getSerializerFormatVersion());
446  };
447  it->second.handler->onRPCrequested(std::move(buffer));
448  boost::mutex::scoped_lock lock(mPendingRequestsMutex);
449  mPendingRemoteRequests[future.callId()] = service;
450  return std::move(future);
451  }
452  }
453 
476  RPCFuture<JSONRPCResponse> callJSON(const json::Value& jsonCall);
477 
485  RPCFuture<JSONRPCResponse> callJSON(const std::string& jsonCall);
486 
498  RPCFuture<JSONRPCResponse> callJSON(const std::string& service,
499  const std::string& method,
500  const json::Value& params = json::Value());
501 
513  RPCFuture<JSONRPCResponse> callJSON(const std::string& service,
514  const std::string& method,
515  const std::string& params);
516 
525  RPCFuture<JSONRPCResponse> callJSON(const RPCCallDefinition& rpc);
526 
527 protected:
528 
532  RPCFuture<JSONRPCResponse> callJSONIntern(const std::string& service,
533  const std::string& callID,
534  const json::Value& request);
535 
545  template <typename Backend>
546  void handleLocalRequest(typename Backend::ContainerType buffer,
547  const std::string& service);
548 
549 
559  template <typename Backend>
560  void handleRequest(typename Backend::ContainerType buffer,
561  RemoteFinishHandlerPtr finishHandler, bool remote);
562 
563 private:
564 
570  void handleCallbacks(const std::string& interface,
571  const std::string& servicename);
572 
573 protected:
574 
575  // implements DeferredInvokerFinishHandler
576  virtual void onRPCfinished(AbstractDeferredInvoker* invoker);
577 
578 private:
579 
585  struct AbstractRequestContext
586  {
587  AbstractRequestContext(RemoteFinishHandlerPtr iHandler, bool iRemote) :
588  handler(iHandler), remote(iRemote){}
589 
590  virtual ~AbstractRequestContext() {}
591 
592  virtual void onFinished(RPCClient& client) = 0;
593 
594  public:
595  RemoteFinishHandlerPtr handler;
596  bool remote;
597  };
598  typedef boost::shared_ptr<AbstractRequestContext> AbstractRequestContextPtr;
599 
606  template<typename Backend>
607  struct RequestContext;
608 
609 private:
610 
611  // everything is just passed through except const char* (string literal),
612  // which is explicitly converted to std::string
613  // thus we get the correct RPC signature without the need for explicit cast as in
614  // call<void>("Service", "method", std::string("param"));
615 
616  template<typename T>
617  T&& forwardRPCParams(T&& p) { return std::forward<T>(p); }
618 
619  std::string forwardRPCParams(const char* p) { return std::string(p); }
620 
621 private:
622 
624  RPCServer mServer;
625 
627  RPCClient mClient;
628 
634  struct RemoteServiceDetail : public RemoteService
635  {
636  RemoteServiceDetail(const RemoteService& serviceInfo,
637  RemoteRequestHandlerPtr iHandler,
638  uint8 iBinaryFormatVersion = BinaryBufferSerializer::getSerializerFormatVersion()) :
639  RemoteService(serviceInfo), handler(iHandler),
640  binaryFormatVersion(iBinaryFormatVersion) {}
641 
642  RemoteRequestHandlerPtr handler;
643  uint8 binaryFormatVersion;
644  };
645 
647  mutable boost::mutex mServiceManagementMutex;
648 
653  std::map<std::string, AbstractRPCHandlerPtr> mLocalServiceMethods;
654 
656  std::set<std::string> mLocalServices;
657 
659  std::map<std::string, RemoteServiceDetail> mRemoteServices;
660 
662  mutable boost::mutex mPendingRequestsMutex;
664  std::map<AbstractDeferredInvoker*, AbstractRequestContextPtr> mPendingRequests;
666  std::map<std::string, std::string> mPendingRemoteRequests;
667 
669  mutable boost::mutex mCallbacksMutex;
671  std::multimap<std::string, AbstractInterfaceCallbackHandlerPtr> mInterfaceCallbacks;
672 };
673 
675 
676 }
677 
678 #endif
boost::shared_ptr< RemoteFinishHandler > RemoteFinishHandlerPtr
Definition: RPCManager.h:139
static Duration invalid()
Returns an invalid duration.
Definition: Time.h:252
boost::shared_ptr< AbstractRPCHandler > AbstractRPCHandlerPtr
Definition: AbstractRPCHandler.h:92
Contains all available information about a single RPC service, including the service&#39; name...
Definition: RPCServer.h:302
Abstract interface for handler(s) that get called upon new services with a special interface get avai...
Server side implementation of RPC calls.
BinaryRPCBackendTempl< 2 > BinaryRPCBackend
Definition: BinaryRPCBackend.h:437
Implementation of the client side of an RPC call.
Handler that must be implemented by the remote module to send RPC responses to a remote server which ...
Definition: RPCManager.h:133
specialize cv::DataType for our ImgPixel and inherit from cv::DataType<Vec>
Definition: IOService.h:67
virtual ~RemoteRequestHandler()
Definition: RPCManager.h:114
Abstract interface for DeferredInvoker which is a class to support different RPC backends.
Definition: AbstractDeferredInvoker.h:80
RPCCallDefinition for storing all information required to call a service method.
void reflect(Reflector &r)
Reflect method for serialization.
Definition: RPCManager.h:145
Binary client-side request.
Definition: BinaryRPCBackend.h:228
BinaryRPCBackendTempl< 0 > BinaryRPCBackendLegacy
Definition: BinaryRPCBackend.h:436
An RPCFuture is a proxy for the result of an asynchronous RPC call.
Definition: RPCFuture.h:173
#define MIRA_THROW(ex, msg)
Macro for throwing an exception.
Definition: Exception.h:81
std::list< std::string > queryServicesForInterface(const std::string &interface) const
Returns a string list with the names of all registered services (local and remote), that implement the specified interface.
Definition: RPCManager.h:333
An exception that is thrown by the RPCServer if an RPC call fails.
Definition: RPCError.h:76
bool implementsInterface(const std::string &name, const std::string &interface) const
Returns if the given service implements a certain interface.
Definition: RPCManager.h:352
This is a specialization for JSON RPC calls.
Definition: RPCFuture.h:315
Use this class to represent time durations.
Definition: Time.h:104
RPCFuture< R > call(const std::string &service, const std::string &method, ARGS &&... args)
Method that is used by local C++ clients for requesting an rpc call.
Definition: RPCManager.h:411
Abstract interface for DeferredInvoker.
Abstract interface for RPCHandler(s).
std::string name
The method&#39;s name.
Definition: RPCSignature.h:134
std::set< std::string > getLocalServices() const
Get all registered local services.
Utility macros used in generating RPC method definitions with BOOST_PP.
json_spirit::mValue Value
A value is an abstract description of data in JSON (underlying data can either be one of the JSON bas...
Definition: JSON.h:176
boost::shared_ptr< RemoteRequestHandler > RemoteRequestHandlerPtr
Definition: RPCManager.h:117
std::set< std::string > getRemoteServices() const
Get all registered remote services.
void addLocalService(const std::string &name, Service &service, AbstractRPCHandlerPtr handler)
Adds a local service.
Definition: RPCManager.h:180
bool existsService(const std::string &name) const
Returns true, if a service with the given name exists, otherwise false.
Definition: RPCManager.h:292
virtual ~RemoteFinishHandler()
Definition: RPCManager.h:136
const RemoteService & getRemoteService(const std::string &name) const
Returns a reference to the remote service info object with the given name.
const std::string & callId()
query call ID
Definition: RPCFuture.h:87
const RPCServer::Service & getLocalService(const std::string &name) const
Returns a reference to the local service object with the given name.
Definition: RPCManager.h:229
Handler that is called when a deferred RPC call was executed and finished and therefore when the resp...
Definition: AbstractDeferredInvoker.h:67
#define MIRA_BASE_EXPORT
This is required because on windows there is a macro defined called ERROR.
Definition: Platform.h:153
boost::shared_ptr< AbstractInterfaceCallbackHandler > AbstractInterfaceCallbackHandlerPtr
Definition: AbstractInterfaceCallbackHandler.h:76
Stores info required to call an RPC method - service name, method name, arguments.
Definition: RPCCallDefinition.h:62
This class is for internal use only.
Definition: RPCManager.h:96
Stores the signature of an RPC method including the methods name and its parameter types...
Definition: RPCSignature.h:68
RPCServer::ServiceInfo< RPCServer::MethodInfoSet > RemoteService
Definition: RPCManager.h:235
Platform dependent defines and macros.
Handler that must be implemented by the remote module to send RPC requests to a remote server...
Definition: RPCManager.h:111