"""
Helpers and wrappers for common RPyC tasks
"""
import time
from rpyc.lib import spawn
from rpyc.lib.colls import WeakValueDict
from rpyc.lib.compat import callable
from rpyc.core.consts import HANDLE_BUFFITER, HANDLE_CALL
from rpyc.core.netref import syncreq, asyncreq
[docs]def buffiter(obj, chunk=10, max_chunk=1000, factor=2):
"""Buffered iterator - reads the remote iterator in chunks starting with
*chunk*, multiplying the chunk size by *factor* every time, as an
exponential-backoff, up to a chunk of *max_chunk* size.
``buffiter`` is very useful for tight loops, where you fetch an element
from the other side with every iterator. Instead of being limited by the
network's latency after every iteration, ``buffiter`` fetches a "chunk"
of elements every time, reducing the amount of network I/Os.
:param obj: An iterable object (supports ``iter()``)
:param chunk: the initial chunk size
:param max_chunk: the maximal chunk size
:param factor: the factor by which to multiply the chunk size after every
iterator (up to *max_chunk*). Must be >= 1.
:returns: an iterator
Example::
cursor = db.get_cursor()
for id, name, dob in buffiter(cursor.select("Id", "Name", "DoB")):
print id, name, dob
"""
if factor < 1:
raise ValueError(f"factor must be >= 1, got {factor!r}")
it = iter(obj)
count = chunk
while True:
items = syncreq(it, HANDLE_BUFFITER, count)
count = min(count * factor, max_chunk)
if not items:
break
for elem in items:
yield elem
[docs]def restricted(obj, attrs, wattrs=None):
"""Returns a 'restricted' version of an object, i.e., allowing access only to a subset of its
attributes. This is useful when returning a "broad" or "dangerous" object, where you don't
want the other party to have access to all of its attributes.
.. versionadded:: 3.2
:param obj: any object
:param attrs: the set of attributes exposed for reading (``getattr``) or writing (``setattr``).
The same set will serve both for reading and writing, unless wattrs is explicitly
given.
:param wattrs: the set of attributes exposed for writing (``setattr``). If ``None``,
``wattrs`` will default to ``attrs``. To disable setting attributes completely,
set to an empty tuple ``()``.
:returns: a restricted view of the object
Example::
class MyService(rpyc.Service):
def exposed_open(self, filename):
f = open(filename, "r")
return rpyc.restricted(f, {"read", "close"}) # disallow access to `seek` or `write`
"""
if wattrs is None:
wattrs = attrs
class Restricted(object):
def _rpyc_getattr(self, name):
if name not in attrs:
raise AttributeError(name)
return getattr(obj, name)
__getattr__ = _rpyc_getattr
def _rpyc_setattr(self, name, value):
if name not in wattrs:
raise AttributeError(name)
setattr(obj, name, value)
__setattr__ = _rpyc_setattr
return Restricted()
class _Async(object):
"""Creates an async proxy wrapper over an existing proxy. Async proxies
are cached. Invoking an async proxy will return an AsyncResult instead of
blocking"""
__slots__ = ("proxy", "__weakref__")
def __init__(self, proxy):
self.proxy = proxy
def __call__(self, *args, **kwargs):
return asyncreq(self.proxy, HANDLE_CALL, args, tuple(kwargs.items()))
def __repr__(self):
return f"async_({self.proxy!r})"
_async_proxies_cache = WeakValueDict()
[docs]def async_(proxy):
"""
Returns an asynchronous "version" of the given proxy. Invoking the returned
proxy will not block; instead it will return an
:class:`rpyc.core.async_.AsyncResult` object that you can test for completion
:param proxy: any **callable** RPyC proxy
:returns: the proxy, wrapped by an asynchronous wrapper
Example::
async_sleep = rpyc.async_(conn.modules.time.sleep)
res = async_sleep(5)
.. _async_note:
.. note::
In order to avoid overloading the GC, the returned asynchronous wrapper is
cached as a weak reference. Therefore, do not use::
rpyc.async_(foo)(5)
Always store the returned asynchronous wrapper in a variable, e.g. ::
a_foo = rpyc.async_(foo)
a_foo(5)
.. note::
Furthermore, async requests provide **no guarantee on execution
order**. In particular, multiple subsequent async requests may be
executed in reverse order.
"""
pid = id(proxy)
if pid in _async_proxies_cache:
return _async_proxies_cache[pid]
if not hasattr(proxy, "____conn__") or not hasattr(proxy, "____id_pack__"):
raise TypeError(f"'proxy' must be a Netref: {proxy!r}")
if not callable(proxy):
raise TypeError(f"'proxy' must be callable: {proxy!r}")
caller = _Async(proxy)
_async_proxies_cache[id(caller)] = _async_proxies_cache[pid] = caller
return caller
async_.__doc__ = _Async.__doc__
globals()['async'] = async_ # backward compatibility alias
[docs]class timed(object):
"""Creates a timed asynchronous proxy. Invoking the timed proxy will
run in the background and will raise an :class:`rpyc.core.async_.AsyncResultTimeout`
exception if the computation does not terminate within the given time frame
:param proxy: any **callable** RPyC proxy
:param timeout: the maximal number of seconds to allow the operation to run
:returns: a ``timed`` wrapped proxy
Example::
t_sleep = rpyc.timed(conn.modules.time.sleep, 6) # allow up to 6 seconds
t_sleep(4) # okay
t_sleep(8) # will time out and raise AsyncResultTimeout
"""
__slots__ = ("__weakref__", "proxy", "timeout")
def __init__(self, proxy, timeout):
self.proxy = async_(proxy)
self.timeout = timeout
def __call__(self, *args, **kwargs):
res = self.proxy(*args, **kwargs)
res.set_expiry(self.timeout)
return res
def __repr__(self):
return f"timed({self.proxy.proxy!r}, {self.timeout!r})"
[docs]class BgServingThread(object):
"""Runs an RPyC server in the background to serve all requests and replies
that arrive on the given RPyC connection. The thread is started upon the
the instantiation of the ``BgServingThread`` object; you can use the
:meth:`stop` method to stop the server thread
Example::
conn = rpyc.connect(...)
bg_server = BgServingThread(conn)
...
bg_server.stop()
.. note::
For a more detailed explanation of asynchronous operation and the role of the
``BgServingThread``, see :ref:`tut5`
"""
# these numbers are magical...
SERVE_INTERVAL = 0.0
SLEEP_INTERVAL = 0.1
def __init__(self, conn, callback=None, serve_interval=SERVE_INTERVAL, sleep_interval=SLEEP_INTERVAL):
self._conn = conn
self._active = True
self._callback = callback
self._serve_interval = serve_interval
self._sleep_interval = sleep_interval
self._thread = spawn(self._bg_server)
def __del__(self):
if self._active:
self.stop()
def _bg_server(self):
try:
while self._active:
self._conn.serve(self._serve_interval)
time.sleep(self._sleep_interval) # to reduce contention
except Exception:
if self._active:
self._active = False
if self._callback is None:
raise
self._callback()
[docs] def stop(self):
"""stop the server thread. once stopped, it cannot be resumed. you will
have to create a new BgServingThread object later."""
assert self._active
self._active = False
self._thread.join()
self._conn = None
[docs]def classpartial(*args, **kwargs):
"""Bind arguments to a class's __init__."""
cls, args = args[0], args[1:]
class Partial(cls):
__doc__ = cls.__doc__
def __new__(self):
return cls(*args, **kwargs)
Partial.__name__ = cls.__name__
return Partial