Source code for rpyc.utils.registry

"""
RPyC Registry Server maintains service information on RPyC services for *Service Registry and Discovery patterns*. Service Registry and Discovery patterns solve the connectivity problem for communication between services and external consumers. RPyC services will register with the server when :code:`auto_register` is :code:`True`.

Service registries such as `Avahi <http://en.wikipedia.org/wiki/Avahi_(software)>`_ and
`Bonjour <http://en.wikipedia.org/wiki/Bonjour_(software)>`_ are alternatives to the RPyC Registry Server. These alternatives do no support Windows and have more restrictive licensing.

Refer to :file:`rpyc/scripts/rpyc_registry.py` for more info.
"""
import sys
import socket
import time
import logging
from contextlib import closing
from rpyc.core import brine


DEFAULT_PRUNING_TIMEOUT = 4 * 60
MAX_DGRAM_SIZE = 1500
REGISTRY_PORT = 18811


# ------------------------------------------------------------------------------
# servers
# ------------------------------------------------------------------------------

[docs] class RegistryServer(object): """Base registry server""" def __init__(self, listenersock, pruning_timeout=None, logger=None, allow_listing=False): self.sock = listenersock self.port = self.sock.getsockname()[1] self.active = False self.services = {} if pruning_timeout is None: pruning_timeout = DEFAULT_PRUNING_TIMEOUT self.pruning_timeout = pruning_timeout if logger is None: logger = self._get_logger() self.allow_listing = allow_listing self.logger = logger def _get_logger(self): raise NotImplementedError()
[docs] def on_service_added(self, name, addrinfo): """called when a new service joins the registry (but not on keepalives). override this to add custom logic"""
[docs] def on_service_removed(self, name, addrinfo): """called when a service unregisters or is pruned. override this to add custom logic"""
def _add_service(self, name, addrinfo): """updates the service's keep-alive time stamp""" if name not in self.services: self.services[name] = {} is_new = addrinfo not in self.services[name] self.services[name][addrinfo] = time.time() if is_new: try: self.on_service_added(name, addrinfo) except Exception: self.logger.exception('error executing service add callback') def _remove_service(self, name, addrinfo): """removes a single server of the given service""" self.services[name].pop(addrinfo, None) if not self.services[name]: del self.services[name] try: self.on_service_removed(name, addrinfo) except Exception: self.logger.exception('error executing service remove callback')
[docs] def cmd_query(self, host, name): """implementation of the ``query`` command""" name = name.upper() self.logger.debug(f"querying for {name!r}") if name not in self.services: self.logger.debug("no such service") return () oldest = time.time() - self.pruning_timeout all_servers = sorted(self.services[name].items(), key=lambda x: x[1]) servers = [] for addrinfo, t in all_servers: if t < oldest: self.logger.debug(f"discarding stale {addrinfo[0]}:{addrinfo[1]}") self._remove_service(name, addrinfo) else: servers.append(addrinfo) self.logger.debug(f"replying with {servers!r}") return tuple(servers)
[docs] def cmd_list(self, host, filter_host): """implementation for the ``list`` command""" self.logger.debug("querying for services list:") if not self.allow_listing: self.logger.debug("listing is disabled") return None services = [] if filter_host[0]: for serv in self.services.keys(): known_hosts = [h[0] for h in self.services[serv].keys()] if filter_host[0] in known_hosts: services.append(serv) services = tuple(services) else: services = tuple(self.services.keys()) self.logger.debug(f"replying with {services}") return services
[docs] def cmd_register(self, host, names, port): """implementation of the ``register`` command""" self.logger.debug(f"registering {host}:{port} as {', '.join(names)}") for name in names: self._add_service(name.upper(), (host, port)) return "OK"
[docs] def cmd_unregister(self, host, port): """implementation of the ``unregister`` command""" self.logger.debug(f"unregistering {host}:{port}") for name in list(self.services.keys()): self._remove_service(name, (host, port)) return "OK"
def _recv(self): raise NotImplementedError() def _send(self, data, addrinfo): raise NotImplementedError() def _work(self): while self.active: try: data, addrinfo = self._recv() except (socket.error, socket.timeout): continue try: magic, cmd, args = brine.load(data) except Exception: continue if magic != "RPYC": self.logger.warning(f"invalid magic: {magic!r}") continue cmdfunc = getattr(self, f"cmd_{cmd.lower()}", None) if not cmdfunc: self.logger.warning(f"unknown command: {cmd!r}") continue try: reply = cmdfunc(addrinfo[0], *args) except Exception: self.logger.exception('error executing function') else: self._send(brine.dump(reply), addrinfo)
[docs] def start(self): """Starts the registry server (blocks)""" if self.active: raise ValueError("server is already running") if self.sock is None: raise ValueError("object disposed") addrinfo = self.sock.getsockname()[:2] self.logger.debug(f"server started on {addrinfo[0]}:{addrinfo[1]}") try: self.active = True self._work() except KeyboardInterrupt: self.logger.warning("User interrupt!") finally: self.active = False self.logger.debug("server closed") self.sock.close() self.sock = None
[docs] def close(self): """Closes (terminates) the registry server""" if not self.active: raise ValueError("server is not running") self.logger.debug("stopping server...") self.active = False
[docs] class UDPRegistryServer(RegistryServer): """UDP-based registry server. The server listens to UDP broadcasts and answers them. Useful in local networks, were broadcasts are allowed""" TIMEOUT = 1.0 def __init__(self, host="0.0.0.0", port=REGISTRY_PORT, pruning_timeout=None, logger=None, allow_listing=False): family, socktype, proto, _, sockaddr = socket.getaddrinfo(host, port, 0, socket.SOCK_DGRAM)[0] sock = socket.socket(family, socktype, proto) sock.bind(sockaddr) sock.settimeout(self.TIMEOUT) RegistryServer.__init__(self, sock, pruning_timeout=pruning_timeout, logger=logger, allow_listing=allow_listing) def _get_logger(self): return logging.getLogger(f"REGSRV/UDP/{self.port}") def _recv(self): return self.sock.recvfrom(MAX_DGRAM_SIZE) def _send(self, data, addrinfo): try: self.sock.sendto(data, addrinfo) except (socket.error, socket.timeout): pass
[docs] class TCPRegistryServer(RegistryServer): """TCP-based registry server. The server listens to a certain TCP port and answers requests. Useful when you need to cross routers in the network, since they block UDP broadcasts""" TIMEOUT = 3.0 def __init__(self, host="0.0.0.0", port=REGISTRY_PORT, pruning_timeout=None, logger=None, reuse_addr=True, allow_listing=False): family, socktype, proto, _, sockaddr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] sock = socket.socket(family, socktype, proto) if reuse_addr and sys.platform != "win32": # warning: reuseaddr is not what you expect on windows! sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(sockaddr) sock.listen(10) sock.settimeout(self.TIMEOUT) RegistryServer.__init__(self, sock, pruning_timeout=pruning_timeout, logger=logger, allow_listing=allow_listing) self._connected_sockets = {} def _get_logger(self): return logging.getLogger(f"REGSRV/TCP/{self.port}") def _recv(self): sock2, _ = self.sock.accept() addrinfo = sock2.getpeername() data = sock2.recv(MAX_DGRAM_SIZE) self._connected_sockets[addrinfo] = sock2 return data, addrinfo def _send(self, data, addrinfo): sock2 = self._connected_sockets.pop(addrinfo) with closing(sock2): try: sock2.send(data) except (socket.error, socket.timeout): pass
# ------------------------------------------------------------------------------ # clients (registrars) # ------------------------------------------------------------------------------
[docs] class RegistryClient(object): """Base registry client. Also known as **registrar**""" REREGISTER_INTERVAL = 60 def __init__(self, ip, port, timeout, logger=None): self.ip = ip self.port = port self.timeout = timeout if logger is None: logger = self._get_logger() self.logger = logger def _get_logger(self): raise NotImplementedError()
[docs] def discover(self, name): """Sends a query for the specified service name. :param name: the service name (or one of its aliases) :returns: a list of ``(host, port)`` tuples """ raise NotImplementedError()
[docs] def list(self, filter_host=None): """ Send a query for the full lists of exposed servers :returns: a list of `` service_name `` """ raise NotImplementedError()
[docs] def register(self, aliases, port): """Registers the given service aliases with the given TCP port. This API is intended to be called only by an RPyC server. :param aliases: the :class:`service's <rpyc.core.service.Service>` aliases :param port: the listening TCP port of the server """ raise NotImplementedError()
[docs] def unregister(self, port): """Unregisters the given RPyC server. This API is intended to be called only by an RPyC server. :param port: the listening TCP port of the RPyC server to unregister """ raise NotImplementedError()
[docs] class UDPRegistryClient(RegistryClient): """UDP-based registry clients. By default, it sends UDP broadcasts (requires special user privileges on certain OS's) and collects the replies. You can also specify the IP address to send to. Example:: registrar = UDPRegistryClient() list_of_services = registrar.list() list_of_servers = registrar.discover("foo") .. note:: Consider using :func:`rpyc.utils.factory.discover` instead """ def __init__(self, ip="255.255.255.255", port=REGISTRY_PORT, timeout=2, bcast=None, logger=None, ipv6=False): RegistryClient.__init__(self, ip=ip, port=port, timeout=timeout, logger=logger) if ipv6: self.sock_family = socket.AF_INET6 self.bcast = False else: self.sock_family = socket.AF_INET if bcast is None: bcast = "255" in ip.split(".") self.bcast = bcast def _get_logger(self): return logging.getLogger('REGCLNT/UDP')
[docs] def discover(self, name): sock = socket.socket(self.sock_family, socket.SOCK_DGRAM) with closing(sock): if self.bcast: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True) data = brine.dump(("RPYC", "QUERY", (name,))) sock.sendto(data, (self.ip, self.port)) sock.settimeout(self.timeout) try: data, _ = sock.recvfrom(MAX_DGRAM_SIZE) except (socket.error, socket.timeout): servers = () else: servers = brine.load(data) return servers
[docs] def list(self, filter_host=None): sock = socket.socket(self.sock_family, socket.SOCK_DGRAM) with closing(sock): if self.bcast: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True) data = brine.dump(("RPYC", "LIST", ((filter_host,),))) sock.sendto(data, (self.ip, self.port)) sock.settimeout(self.timeout) try: data, _ = sock.recvfrom(MAX_DGRAM_SIZE * 10) except (socket.error, socket.timeout): services = () else: services = brine.load(data) return services
[docs] def register(self, aliases, port, interface=""): self.logger.info(f"registering on {self.ip}:{self.port}") sock = socket.socket(self.sock_family, socket.SOCK_DGRAM) with closing(sock): sock.bind((interface, 0)) if self.bcast: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True) data = brine.dump(("RPYC", "REGISTER", (aliases, port))) sock.sendto(data, (self.ip, self.port)) tmax = time.time() + self.timeout while time.time() < tmax: sock.settimeout(tmax - time.time()) try: data, address = sock.recvfrom(MAX_DGRAM_SIZE) rip, rport = address[:2] except socket.timeout: self.logger.warning("no registry acknowledged") return False if rport != self.port: continue try: reply = brine.load(data) except Exception: continue if reply == "OK": self.logger.info(f"registry {rip}:{rport} acknowledged") return True else: self.logger.warning("no registry acknowledged") return False
[docs] def unregister(self, port): self.logger.info(f"unregistering from {self.ip}:{self.port}") sock = socket.socket(self.sock_family, socket.SOCK_DGRAM) with closing(sock): if self.bcast: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True) data = brine.dump(("RPYC", "UNREGISTER", (port,))) sock.sendto(data, (self.ip, self.port))
[docs] class TCPRegistryClient(RegistryClient): """TCP-based registry client. You must specify the host (registry server) to connect to. Example:: registrar = TCPRegistryClient("localhost") list_of_services = registrar.list() list_of_servers = registrar.discover("foo") .. note:: Consider using :func:`rpyc.utils.factory.discover` instead """ def __init__(self, ip, port=REGISTRY_PORT, timeout=2, logger=None): RegistryClient.__init__(self, ip=ip, port=port, timeout=timeout, logger=logger) def _get_logger(self): return logging.getLogger('REGCLNT/TCP')
[docs] def discover(self, name): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with closing(sock): sock.settimeout(self.timeout) data = brine.dump(("RPYC", "QUERY", (name,))) sock.connect((self.ip, self.port)) sock.send(data) try: data = sock.recv(MAX_DGRAM_SIZE) except (socket.error, socket.timeout): servers = () else: servers = brine.load(data) return servers
[docs] def list(self, filter_host=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with closing(sock): sock.settimeout(self.timeout) data = brine.dump(("RPYC", "LIST", ((filter_host,),))) sock.connect((self.ip, self.port)) sock.send(data) try: data = sock.recv(MAX_DGRAM_SIZE) except (socket.error, socket.timeout): servers = () else: servers = brine.load(data) return servers
[docs] def register(self, aliases, port, interface=""): self.logger.info(f"registering on {self.ip}:{self.port}") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with closing(sock): sock.bind((interface, 0)) sock.settimeout(self.timeout) data = brine.dump(("RPYC", "REGISTER", (aliases, port))) try: sock.connect((self.ip, self.port)) sock.send(data) except (socket.error, socket.timeout): self.logger.warning("could not connect to registry") return False try: data = sock.recv(MAX_DGRAM_SIZE) except socket.timeout: self.logger.warning("registry did not acknowledge") return False try: reply = brine.load(data) except Exception: self.logger.warning("received corrupted data from registry") return False if reply == "OK": self.logger.info(f"registry {self.ip}:{self.port} acknowledged") return True
[docs] def unregister(self, port): self.logger.info(f"unregistering from {self.ip}:{self.port}") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with closing(sock): sock.settimeout(self.timeout) data = brine.dump(("RPYC", "UNREGISTER", (port,))) try: sock.connect((self.ip, self.port)) sock.send(data) except (socket.error, socket.timeout): self.logger.warning("could not connect to registry")