Source code for rpyc.utils.classic

from __future__ import with_statement
import sys
import os
import inspect
from rpyc.lib.compat import pickle, execute
from rpyc.core.service import ClassicService, Slave
from rpyc.utils import factory
from rpyc.core.service import ModuleNamespace  # noqa: F401
from rpyc.core.consts import STREAM_CHUNK
from contextlib import contextmanager


DEFAULT_SERVER_PORT = 18812
DEFAULT_SERVER_SSL_PORT = 18821

SlaveService = ClassicService   # avoid renaming SlaveService in this module for now

# ===============================================================================
# connecting
# ===============================================================================


def connect_channel(channel):
    """
    Creates an RPyC connection over the given ``channel``

    :param channel: the :class:`rpyc.core.channel.Channel` instance

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_channel(channel, SlaveService)


def connect_stream(stream):
    """
    Creates an RPyC connection over the given stream

    :param stream: the :class:`rpyc.core.stream.Stream` instance

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_stream(stream, SlaveService)


def connect_stdpipes():
    """
    Creates an RPyC connection over the standard pipes (``stdin`` and ``stdout``)

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_stdpipes(SlaveService)


def connect_pipes(input, output):
    """
    Creates an RPyC connection over two pipes

    :param input: the input pipe
    :param output: the output pipe

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_pipes(input, output, SlaveService)


def connect(host, port=DEFAULT_SERVER_PORT, ipv6=False, keepalive=False):
    """
    Creates a socket connection to the given host and port.

    :param host: the host to connect to
    :param port: the TCP port
    :param ipv6: whether to create an IPv6 socket or an IPv4 one
    :param keepalive: whether to enable TCP keepalive on the socket

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect(host, port, SlaveService, ipv6=ipv6, keepalive=keepalive)


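# Illustrative usage sketch (not part of the module): assumes a classic server
# (``rpyc_classic.py``) is already listening on the default port; the hostname
# below is a placeholder.
#
#     import rpyc
#     conn = rpyc.classic.connect("localhost")
#     print(conn.modules.sys.platform)    # evaluated in the remote interpreter
#     conn.close()

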
def unix_connect(path):
    """
    Creates a socket connection over the unix domain socket at the given path.

    :param path: the path to the unix domain socket

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.unix_connect(path, SlaveService)


def ssl_connect(host, port=DEFAULT_SERVER_SSL_PORT, keyfile=None, certfile=None,
                ca_certs=None, cert_reqs=None, ssl_version=None, ciphers=None,
                ipv6=False):
    """Creates a secure (``SSL``) socket connection to the given host and port,
    authenticating with the given certfile and CA file.

    :param host: the host to connect to
    :param port: the TCP port to use
    :param ipv6: whether to create an IPv6 socket or an IPv4 one

    The following arguments are passed to
    `ssl.SSLContext <http://docs.python.org/dev/library/ssl.html#ssl.SSLContext>`_
    and its corresponding methods:

    :param keyfile: see ``ssl.SSLContext.load_cert_chain``. May be ``None``
    :param certfile: see ``ssl.SSLContext.load_cert_chain``. May be ``None``
    :param ca_certs: see ``ssl.SSLContext.load_verify_locations``. May be ``None``
    :param cert_reqs: see ``ssl.SSLContext.verify_mode``. By default, if
                      ``ca_certs`` is specified, the requirement is set to
                      ``CERT_REQUIRED``; otherwise it is set to ``CERT_NONE``
    :param ssl_version: see ``ssl.SSLContext``. The default is defined by
                        ``ssl.create_default_context``
    :param ciphers: see ``ssl.SSLContext.set_ciphers``. May be ``None``. New in
                    Python 2.7/3.2

    :returns: an RPyC connection exposing ``SlaveService``

    .. _wrap_socket:
    """
    return factory.ssl_connect(host, port, keyfile=keyfile, certfile=certfile,
                               ssl_version=ssl_version, ca_certs=ca_certs,
                               service=SlaveService, ipv6=ipv6)


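# Illustrative sketch (not part of the module): connecting to a classic server
# started with SSL enabled. The hostname and certificate paths are placeholders.
#
#     conn = ssl_connect("server.example.com",
#                        keyfile="client.key", certfile="client.cert",
#                        ca_certs="ca.cert")
#     print(conn.modules.platform.node())

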
def ssh_connect(remote_machine, remote_port):
    """Connects to the remote server over an SSH tunnel. See
    :func:`rpyc.utils.factory.ssh_connect` for more info.

    :param remote_machine: the :class:`plumbum.remote.RemoteMachine` instance
    :param remote_port: the remote TCP port

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.ssh_connect(remote_machine, remote_port, SlaveService)


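# Illustrative sketch (assumes ``plumbum`` is installed and a classic server is
# listening on port 18812 of the remote machine; the hostname is a placeholder):
#
#     from plumbum import SshMachine
#     mach = SshMachine("server.example.com")
#     conn = ssh_connect(mach, 18812)

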
def connect_subproc(server_file=None):
    """Runs an RPyC classic server as a subprocess and returns an RPyC
    connection to it over stdio

    :param server_file: The full path to the server script
                        (``rpyc_classic.py``). If not given,
                        ``which rpyc_classic.py`` will be attempted.

    :returns: an RPyC connection exposing ``SlaveService``
    """
    if server_file is None:
        server_file = os.popen("which rpyc_classic.py").read().strip()
        if not server_file:
            raise ValueError("server_file not given and could not be inferred")
    return factory.connect_subproc(
        [sys.executable, "-u", server_file, "-q", "-m", "stdio"], SlaveService)


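# Illustrative sketch: spawning a private interpreter as a child process, served
# over its stdin/stdout so no TCP port is opened. Assumes ``rpyc_classic.py`` is
# on the PATH.
#
#     conn = connect_subproc()
#     child_pid = conn.modules.os.getpid()    # pid of the child interpreter
#     conn.close()

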
def connect_thread():
    """
    Starts a SlaveService on a thread and connects to it. Useful for testing
    purposes. See :func:`rpyc.utils.factory.connect_thread`

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_thread(SlaveService, remote_service=SlaveService)


def connect_multiprocess(args={}):
    """
    Starts a SlaveService in a separate process (via ``multiprocessing``) and
    connects to it. Useful for testing purposes and for running multicore code
    that uses shared memory. See :func:`rpyc.utils.factory.connect_multiprocess`

    :returns: an RPyC connection exposing ``SlaveService``
    """
    return factory.connect_multiprocess(SlaveService, remote_service=SlaveService, args=args)


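# Illustrative sketch for the two in-process helpers above; both spin up a
# private SlaveService purely for local experimentation and testing.
#
#     conn = connect_thread()            # slave runs in a thread of this process
#     assert conn.modules.os.getpid() == os.getpid()
#
#     conn2 = connect_multiprocess()     # slave runs in a child process
#     assert conn2.modules.os.getpid() != os.getpid()

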
# ===============================================================================
# remoting utilities
# ===============================================================================


def upload(conn, localpath, remotepath, filter=None,
           ignore_invalid=False, chunk_size=STREAM_CHUNK):
    """uploads a file or a directory to the given remote path

    :param localpath: the local file or directory
    :param remotepath: the remote path
    :param filter: a predicate that accepts the filename and determines whether
                   it should be uploaded; None means any file
    :param ignore_invalid: if ``True``, silently skip paths that are neither a
                           file nor a directory instead of raising ``ValueError``
    :param chunk_size: the IO chunk size
    """
    if os.path.isdir(localpath):
        upload_dir(conn, localpath, remotepath, filter, chunk_size)
    elif os.path.isfile(localpath):
        upload_file(conn, localpath, remotepath, chunk_size)
    else:
        if not ignore_invalid:
            raise ValueError(f"cannot upload {localpath!r}")


def upload_file(conn, localpath, remotepath, chunk_size=STREAM_CHUNK):
    with open(localpath, "rb") as lf:
        with conn.builtin.open(remotepath, "wb") as rf:
            while True:
                buf = lf.read(chunk_size)
                if not buf:
                    break
                rf.write(buf)


def upload_dir(conn, localpath, remotepath, filter=None, chunk_size=STREAM_CHUNK):
    if not conn.modules.os.path.isdir(remotepath):
        conn.modules.os.makedirs(remotepath)
    for fn in os.listdir(localpath):
        if not filter or filter(fn):
            lfn = os.path.join(localpath, fn)
            rfn = conn.modules.os.path.join(remotepath, fn)
            upload(conn, lfn, rfn, filter=filter, ignore_invalid=True, chunk_size=chunk_size)


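# Illustrative sketch (paths are placeholders): copy a single file, then a whole
# directory tree, from the local machine to the remote one.
#
#     upload(conn, "/tmp/data.bin", "/tmp/data.bin")
#     upload(conn, "./config", "/etc/myapp",
#            filter=lambda fn: not fn.endswith(".pyc"))

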
def download(conn, remotepath, localpath, filter=None,
             ignore_invalid=False, chunk_size=STREAM_CHUNK):
    """downloads a file or a directory from the given remote path to the given
    local path

    :param remotepath: the remote file or directory
    :param localpath: the local path to download to
    :param filter: a predicate that accepts the filename and determines whether
                   it should be downloaded; None means any file
    :param ignore_invalid: if ``True``, silently skip remote paths that are neither
                           a file nor a directory instead of raising ``ValueError``
    :param chunk_size: the IO chunk size
    """
    if conn.modules.os.path.isdir(remotepath):
        download_dir(conn, remotepath, localpath, filter, chunk_size)
    elif conn.modules.os.path.isfile(remotepath):
        download_file(conn, remotepath, localpath, chunk_size)
    else:
        if not ignore_invalid:
            raise ValueError(f"cannot download {remotepath!r}")


def download_file(conn, remotepath, localpath, chunk_size=STREAM_CHUNK):
    with conn.builtin.open(remotepath, "rb") as rf:
        with open(localpath, "wb") as lf:
            while True:
                buf = rf.read(chunk_size)
                if not buf:
                    break
                lf.write(buf)


def download_dir(conn, remotepath, localpath, filter=None, chunk_size=STREAM_CHUNK):
    if not os.path.isdir(localpath):
        os.makedirs(localpath)
    for fn in conn.modules.os.listdir(remotepath):
        if not filter or filter(fn):
            rfn = conn.modules.os.path.join(remotepath, fn)
            lfn = os.path.join(localpath, fn)
            download(conn, rfn, lfn, filter=filter, ignore_invalid=True, chunk_size=chunk_size)


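# Illustrative sketch (paths are placeholders): the mirror image of ``upload``;
# the remote tree is walked via ``conn.modules.os`` and written locally.
#
#     download(conn, "/var/log/myapp", "./logs",
#              filter=lambda fn: fn.endswith(".log"))

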
def upload_package(conn, module, remotepath=None, chunk_size=STREAM_CHUNK):
    """
    uploads a module or a package to the remote party

    :param conn: the RPyC connection to use
    :param module: the local module/package object to upload
    :param remotepath: the remote path (if ``None``, will default to the remote
                       system's python library, as reported by ``distutils``)
    :param chunk_size: the IO chunk size

    .. note:: ``upload_module`` is just an alias to ``upload_package``

    example::

        import foo.bar
        ...
        rpyc.classic.upload_package(conn, foo.bar)

    """
    if remotepath is None:
        site = conn.modules["distutils.sysconfig"].get_python_lib()
        remotepath = conn.modules.os.path.join(site, module.__name__)
    localpath = os.path.dirname(os.path.abspath(inspect.getsourcefile(module)))
    upload(conn, localpath, remotepath, chunk_size=chunk_size)


upload_module = upload_package
def obtain(proxy):
    """obtains (copies) a remote object from a proxy object. the object is
    ``pickled`` on the remote side and ``unpickled`` locally, thus moved
    **by value**. changes made to the local object will not reflect remotely.

    :param proxy: an RPyC proxy object

    .. note:: the remote object must be ``pickle``-able

    :returns: a copy of the remote object
    """
    return pickle.loads(pickle.dumps(proxy))


def deliver(conn, localobj):
    """delivers (recreates) a local object on the other party. the object is
    ``pickled`` locally and ``unpickled`` on the remote side, thus moved
    **by value**. changes made to the remote object will not reflect locally.

    :param conn: the RPyC connection
    :param localobj: the local object to deliver

    .. note:: the object must be ``picklable``

    :returns: a proxy to the remote object
    """
    # bytes-cast needed for IronPython-to-CPython communication, see #251
    return conn.modules["rpyc.lib.compat"].pickle.loads(
        bytes(pickle.dumps(localobj)))


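# Illustrative sketch: ``obtain`` and ``deliver`` trade proxies for copies.
# ``remote_list`` below is a netref; mutating the obtained copy does not affect it.
#
#     remote_list = conn.modules.builtins.list(range(3))   # proxy (by reference)
#     local_copy = obtain(remote_list)                      # plain local list (by value)
#     remote_dict = deliver(conn, {"answer": 42})           # proxy to a remote dict

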
@contextmanager
def redirected_stdio(conn):
    r"""
    Redirects the other party's ``stdin``, ``stdout`` and ``stderr`` to those of
    the local party, so remote IO will occur locally.

    Example usage::

        with redirected_stdio(conn):
            conn.modules.sys.stdout.write("hello\n")   # will be printed locally

    """
    orig_stdin = conn.modules.sys.stdin
    orig_stdout = conn.modules.sys.stdout
    orig_stderr = conn.modules.sys.stderr
    try:
        conn.modules.sys.stdin = sys.stdin
        conn.modules.sys.stdout = sys.stdout
        conn.modules.sys.stderr = sys.stderr
        yield
    finally:
        conn.modules.sys.stdin = orig_stdin
        conn.modules.sys.stdout = orig_stdout
        conn.modules.sys.stderr = orig_stderr


def pm(conn):
    """same as ``pdb.pm()`` but on a remote exception

    :param conn: the RPyC connection
    """
    # pdb.post_mortem(conn.root.getconn()._last_traceback)
    with redirected_stdio(conn):
        conn.modules.pdb.post_mortem(conn.root.getconn()._last_traceback)


def interact(conn, namespace=None):
    """remote interactive interpreter

    :param conn: the RPyC connection
    :param namespace: the namespace to use (a ``dict``)
    """
    if namespace is None:
        namespace = {}
    namespace["conn"] = conn
    with redirected_stdio(conn):
        conn.execute("""def _rinteract(ns):
            import code
            code.interact(local = dict(ns))""")
        conn.namespace["_rinteract"](namespace)


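# Illustrative sketch: drop into a Python prompt that executes on the server,
# with a local value injected into its namespace. ``conn`` is always available
# there as well, since it is added to the namespace above.
#
#     interact(conn, namespace={"greeting": "hello from the client"})

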
class MockClassicConnection(object):
    """Mock classic RPyC connection object. Useful when you want the same code
    to run remotely or locally.
    """
    def __init__(self):
        self.root = Slave()
        ClassicService._install(self, self.root)


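# Illustrative sketch: code written against a classic connection can be pointed
# at the local interpreter by swapping in the mock object.
#
#     def report_platform(conn):
#         return conn.modules.sys.platform
#
#     report_platform(MockClassicConnection())   # runs locally, no server needed
#     # report_platform(connect("somehost"))     # same code against a real server

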
def teleport_function(conn, func, globals=None, def_=True):
    """
    "Teleports" a function (including nested functions/closures) over the RPyC
    connection. The function is passed in bytecode form and reconstructed on
    the other side.

    The function cannot have non-brinable defaults (e.g., ``def f(x, y=[8]):``,
    since a ``list`` isn't brinable), or make use of non-builtin globals (like
    modules). You can overcome the second restriction by moving the necessary
    imports into the function body, e.g. ::

        def f(x, y):
            import os
            return (os.getpid() + y) * x

    .. note:: While it is not forbidden to "teleport" functions across different
              Python versions, it *may* result in errors due to Python bytecode
              differences. It is recommended to ensure both the client and the
              server are of the same Python version when using this function.

    :param conn: the RPyC connection
    :param func: the function object to be delivered to the other party
    """
    if globals is None:
        globals = conn.namespace
    from rpyc.utils.teleportation import export_function
    exported = export_function(func)
    return conn.modules["rpyc.utils.teleportation"].import_function(
        exported, globals, def_)


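# Illustrative sketch: ``add`` is defined locally but, once teleported,
# ``remote_add`` executes in the server's interpreter.
#
#     def add(x, y):
#         return x + y
#
#     remote_add = teleport_function(conn, add)
#     remote_add(2, 3)   # evaluated remotely, returns 5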