Source code for rpyc.utils.factory

RPyC connection factories: ease the creation of a connection for the common
from __future__ import with_statement
import socket
from contextlib import closing
from functools import partial
import threading
    from thread import interrupt_main
except ImportError:
        from _thread import interrupt_main
    except ImportError:
        # assume jython (#83)
        from java.lang import System
        interrupt_main = System.exit

from import Channel
from import SocketStream, TunneledSocketStream, PipeStream
from rpyc.core.service import VoidService, MasterService, SlaveService
from rpyc.utils.registry import UDPRegistryClient
from rpyc.lib import safe_import, spawn
ssl = safe_import("ssl")

[docs] class DiscoveryError(Exception): pass
[docs] class ForbiddenError(Exception): pass
# ------------------------------------------------------------------------------ # API # ------------------------------------------------------------------------------
[docs] def connect_channel(channel, service=VoidService, config={}): """creates a connection over a given channel :param channel: the channel to use :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ return service._connect(channel, config)
[docs] def connect_stream(stream, service=VoidService, config={}): """creates a connection over a given stream :param stream: the stream to use :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ return connect_channel(Channel(stream), service=service, config=config)
[docs] def connect_pipes(input, output, service=VoidService, config={}): """ creates a connection over the given input/output pipes :param input: the input pipe :param output: the output pipe :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ return connect_stream(PipeStream(input, output), service=service, config=config)
[docs] def connect_stdpipes(service=VoidService, config={}): """ creates a connection over the standard input/output pipes :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ return connect_stream(PipeStream.from_std(), service=service, config=config)
[docs] def connect(host, port, service=VoidService, config={}, ipv6=False, keepalive=False): """ creates a socket-connection to the given host and port :param host: the hostname to connect to :param port: the TCP port to use :param service: the local service to expose (defaults to Void) :param config: configuration dict :param ipv6: whether to create an IPv6 socket (defaults to ``False``) :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``) :returns: an RPyC connection """ s = SocketStream.connect(host, port, ipv6=ipv6, keepalive=keepalive) return connect_stream(s, service, config)
[docs] def unix_connect(path, service=VoidService, config={}): """ creates a socket-connection to the given unix domain socket :param path: the path to the unix domain socket :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ s = SocketStream.unix_connect(path) return connect_stream(s, service, config)
[docs] def ssl_connect(host, port, keyfile=None, certfile=None, ca_certs=None, cert_reqs=None, ssl_version=None, ciphers=None, service=VoidService, config={}, ipv6=False, keepalive=False, verify_mode=None): """ creates an SSL-wrapped connection to the given host (encrypted and authenticated). :param host: the hostname to connect to :param port: the TCP port to use :param service: the local service to expose (defaults to Void) :param config: configuration dict :param ipv6: whether to create an IPv6 socket or an IPv4 one(defaults to ``False``) :param keepalive: whether to set TCP keepalive on the socket (defaults to ``False``) :param keyfile: see ``ssl.SSLContext.load_cert_chain``. May be ``None`` :param certfile: see ``ssl.SSLContext.load_cert_chain``. May be ``None`` :param ca_certs: see ``ssl.SSLContext.load_verify_locations``. May be ``None`` :param cert_reqs: see ``ssl.SSLContext.verify_mode``. By default, if ``ca_cert`` is specified, the requirement is set to ``CERT_REQUIRED``; otherwise it is set to ``CERT_NONE`` :param ssl_version: see ``ssl.SSLContext``. The default is defined by ``ssl.create_default_context`` :param ciphers: see ``ssl.SSLContext.set_ciphers``. May be ``None``. New in Python 2.7/3.2 :param verify_mode: see ``ssl.SSLContext.verify_mode`` :returns: an RPyC connection """ ssl_kwargs = {"server_side": False} if keyfile is not None: ssl_kwargs["keyfile"] = keyfile if certfile is not None: ssl_kwargs["certfile"] = certfile if verify_mode is not None: ssl_kwargs["cert_reqs"] = verify_mode else: ssl_kwargs["cert_reqs"] = ssl.CERT_NONE if ca_certs is not None: ssl_kwargs["ca_certs"] = ca_certs ssl_kwargs["cert_reqs"] = ssl.CERT_REQUIRED if cert_reqs is not None: ssl_kwargs["cert_reqs"] = cert_reqs elif cert_reqs != ssl.CERT_NONE: ssl_kwargs["check_hostname"] = False if ssl_version is not None: ssl_kwargs["ssl_version"] = ssl_version if ciphers is not None: ssl_kwargs["ciphers"] = ciphers s = SocketStream.ssl_connect(host, port, ssl_kwargs, ipv6=ipv6, keepalive=keepalive) return connect_stream(s, service, config)
def _get_free_port(): """attempts to find a free port""" s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with closing(s): s.bind(("localhost", 0)) return s.getsockname()[1] _ssh_connect_lock = threading.Lock()
[docs] def ssh_connect(remote_machine, remote_port, service=VoidService, config={}): """ Connects to an RPyC server over an SSH tunnel (created by plumbum). See `Plumbum tunneling <>`_ for further details. .. note:: This function attempts to allocate a free TCP port for the underlying tunnel, but doing so is inherently prone to a race condition with other processes who might bind the same port before sshd does. Albeit unlikely, there is no sure way around it. :param remote_machine: an :class:`plumbum.remote.RemoteMachine` instance :param remote_port: the port of the remote server :param service: the local service to expose (defaults to Void) :param config: configuration dict :returns: an RPyC connection """ with _ssh_connect_lock: loc_port = _get_free_port() tun = remote_machine.tunnel(loc_port, remote_port) stream = TunneledSocketStream.connect("localhost", loc_port) stream.tun = tun return service._connect(Channel(stream), config=config)
[docs] def discover(service_name, host=None, registrar=None, timeout=2): """ discovers hosts running the given service :param service_name: the service to look for :param host: limit the discovery to the given host only (None means any host) :param registrar: use this registry client to discover services. if None, use the default UDPRegistryClient with the default settings. :param timeout: the number of seconds to wait for a reply from the registry if no hosts are found, raises DiscoveryError :raises: ``DiscoveryError`` if no server is found :returns: a list of (ip, port) pairs """ if registrar is None: registrar = UDPRegistryClient(timeout=timeout) addrs = if not addrs: raise DiscoveryError(f"no servers exposing {service_name!r} were found") if host: ips = socket.gethostbyname_ex(host)[2] addrs = [(h, p) for h, p in addrs if h in ips] if not addrs: raise DiscoveryError(f"no servers exposing {service_name} were found on {host}") return addrs
def list_services(registrar=None, filter_host=None, timeout=2): services = () if registrar is None: registrar = UDPRegistryClient(timeout=timeout) services = registrar.list(filter_host) if services is None: raise ForbiddenError("Registry doesn't allow listing") return services
[docs] def connect_by_service(service_name, host=None, registrar=None, timeout=2, service=VoidService, config={}): """create a connection to an arbitrary server that exposes the requested service :param service_name: the service to discover :param host: limit discovery to the given host only (None means any host) :param service: the local service to expose (defaults to Void) :param config: configuration dict :raises: ``DiscoveryError`` if no server is found :returns: an RPyC connection """ # The registry server may have multiple services registered for the same service name, # some of which could be dead. We iterate over the list returned and return the first # one we could connect to. If none of the registered servers is responsive we re-throw # the exception addrs = discover(service_name, host=host, registrar=registrar, timeout=timeout) for host, port in addrs: try: return connect(host, port, service, config=config) except socket.error: pass raise DiscoveryError(f"All services are down: {addrs}")
[docs] def connect_subproc(args, service=VoidService, config={}): """runs an rpyc server on a child process that and connects to it over the stdio pipes. uses the subprocess module. :param args: the args to Popen, e.g., ["python", "-u", ""] :param service: the local service to expose (defaults to Void) :param config: configuration dict """ from subprocess import Popen, PIPE proc = Popen(args, stdin=PIPE, stdout=PIPE) conn = connect_pipes(proc.stdout, proc.stdin, service=service, config=config) conn.proc = proc # just so you can have control over the process return conn
def _server(listener, remote_service, remote_config, args=None): try: with closing(listener): client = listener.accept()[0] conn = connect_stream(SocketStream(client), service=remote_service, config=remote_config) if isinstance(args, dict): _oldstyle = (MasterService, SlaveService) is_newstyle = isinstance(remote_service, type) and not issubclass(remote_service, _oldstyle) is_newstyle |= not isinstance(remote_service, type) and not isinstance(remote_service, _oldstyle) is_voidservice = isinstance(remote_service, type) and issubclass(remote_service, VoidService) is_voidservice |= not isinstance(remote_service, type) and isinstance(remote_service, VoidService) if is_newstyle and not is_voidservice: conn._local_root.exposed_namespace.update(args) elif not is_voidservice: conn._local_root.namespace.update(args) conn.serve_all() except KeyboardInterrupt: interrupt_main()
[docs] def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}): """starts an rpyc server on a new thread, bound to an arbitrary port, and connects to it over a socket. :param service: the local service to expose (defaults to Void) :param config: configuration dict :param remote_service: the remote service to expose (of the server; defaults to Void) :param remote_config: remote configuration dict (of the server) """ listener = socket.socket() listener.bind(("localhost", 0)) listener.listen(1) remote_server = partial(_server, listener, remote_service, remote_config) spawn(remote_server) host, port = listener.getsockname() return connect(host, port, service=service, config=config)
[docs] def connect_multiprocess(service=VoidService, config={}, remote_service=VoidService, remote_config={}, args={}): """starts an rpyc server on a new process, bound to an arbitrary port, and connects to it over a socket. Basically a copy of connect_thread(). However if args is used and if these are shared memory then changes will be bi-directional. That is we now have access to shared memory. :param service: the local service to expose (defaults to Void) :param config: configuration dict :param remote_service: the remote service to expose (of the server; defaults to Void) :param remote_config: remote configuration dict (of the server) :param args: dict of local vars to pass to new connection, form {'name':var} Contributed by *@tvanzyl* """ from multiprocessing import Process listener = socket.socket() listener.bind(("localhost", 0)) listener.listen(1) remote_server = partial(_server, listener, remote_service, remote_config, args) t = Process(target=remote_server) t.start() host, port = listener.getsockname() return connect(host, port, service=service, config=config)