"""
*Channel* is an abstraction layer over streams that works with *packets of data*,
rather than an endless stream of bytes, and adds support for compression.
"""
from rpyc.lib import safe_import
from rpyc.lib.compat import Struct, BYTES_LITERAL
zlib = safe_import("zlib")
# TODO:
# * 64 bit length field?
# * separate \n into a FlushingChannel subclass?
# * add thread safety as a subclass?
"""
Note: In order to avoid problems with all sorts of line-buffered transports,
we deliberately add ``\\n`` at the end of each frame.
"""
# send() compresses a payload only when compression is enabled and the
# payload is longer than this many bytes.
COMPRESSION_THRESHOLD = 3000
# Compression level handed to zlib.compress() by send().
COMPRESSION_LEVEL = 1
# Frame header packed by send() and unpacked by recv(): the payload length
# followed by the compressed flag. Assuming standard ``struct`` format
# semantics, "!LB" is network byte order, an unsigned 32-bit integer and an
# unsigned byte (5 bytes in total).
FRAME_HEADER = Struct("!LB")
FLUSHER = BYTES_LITERAL("\n") # cause any line-buffered layers below us to flush
# The two attributes that __init__ assigns on an instance.
__slots__ = ["stream", "compress"]
def __init__(self, stream, compress = True):
    """Bind the channel to *stream* and settle whether compression is used.

    :param stream: the underlying stream object this channel wraps
    :param compress: whether packets may be compressed; overridden to
                     ``False`` when ``zlib`` is falsy (it is obtained through
                     ``safe_import`` and so may be unavailable)
    """
    # Without a usable zlib there is nothing to compress with, so the
    # caller's preference is ignored in that case.
    self.compress = compress if zlib else False
    self.stream = stream
def close(self):
    """closes the channel and underlying stream

    The channel holds no resources of its own, so closing it amounts to
    closing the stream it wraps.
    """
    self.stream.close()
"""indicates whether the underlying stream has been closed"""
def fileno(self):
    """returns the file descriptor of the underlying stream

    :returns: whatever the underlying stream's ``fileno()`` reports
    """
    return self.stream.fileno()
def poll(self, timeout):
    """polls the underlying stream for data, waiting up to *timeout* seconds

    :param timeout: the maximal time to wait, in seconds; handed to the
                    stream unchanged
    :returns: the result of the underlying stream's ``poll``
    """
    return self.stream.poll(timeout)
def recv(self):
    """Receives the next packet (or *frame*) from the underlying stream.
    This method will block until the packet has been read completely

    A frame is a ``FRAME_HEADER`` (payload length and compressed flag), the
    payload itself, and a trailing ``FLUSHER``.

    :returns: string of data
    """
    header = self.stream.read(self.FRAME_HEADER.size)
    length, compressed = self.FRAME_HEADER.unpack(header)
    # The FLUSHER that terminates every frame is read together with the
    # payload and then stripped off.
    flusher_size = len(self.FLUSHER)
    data = self.stream.read(length + flusher_size)[:-flusher_size]
    # Only frames whose header flag is set were compressed by the sender;
    # decompressing a plain payload would raise zlib.error.
    if compressed:
        data = zlib.decompress(data)
    return data
def send(self, data):
    """Sends the given string of data as a packet over the underlying
    stream. Blocks until the packet has been sent.

    The payload is compressed only when compression is enabled and it is
    longer than ``COMPRESSION_THRESHOLD``; the header flag records which
    form was sent so that the receiver knows whether to decompress.

    :param data: the byte string to send as a packet
    """
    if self.compress and len(data) > self.COMPRESSION_THRESHOLD:
        compressed = 1
        data = zlib.compress(data, self.COMPRESSION_LEVEL)
    else:
        compressed = 0
    # The length in the header is that of the payload as transmitted,
    # i.e. after compression.
    header = self.FRAME_HEADER.pack(len(data), compressed)
    # Header, payload and flusher go out in a single write.
    buf = header + data + self.FLUSHER
    self.stream.write(buf)