Source code for rpyc.core.channel

"""*Channel* is an abstraction layer over streams that works with *packets of data*,
rather than an endless stream of bytes, and adds support for compression.
"""
from rpyc.lib import safe_import
from rpyc.lib.compat import Struct, BYTES_LITERAL
zlib = safe_import("zlib")

# * 64 bit length field?
# * separate \n into a FlushingChannel subclass?
# * add thread safety as a subclass?


[docs] class Channel(object): """Channel implementation. Note: In order to avoid problems with all sorts of line-buffered transports, we deliberately add ``\\n`` at the end of each frame. """ COMPRESSION_THRESHOLD = 3000 COMPRESSION_LEVEL = 1 FRAME_HEADER = Struct("!LB") FLUSHER = BYTES_LITERAL("\n") # cause any line-buffered layers below us to flush __slots__ = ["stream", "compress"] def __init__(self, stream, compress=True): self.stream = stream if not zlib: compress = False self.compress = compress
[docs] def close(self): """closes the channel and underlying stream""" self.stream.close()
@property def closed(self): """indicates whether the underlying stream has been closed""" return self.stream.closed
[docs] def fileno(self): """returns the file descriptor of the underlying stream""" return self.stream.fileno()
[docs] def poll(self, timeout): """polls the underlying steam for data, waiting up to *timeout* seconds""" return self.stream.poll(timeout)
[docs] def recv(self): """Receives the next packet (or *frame*) from the underlying stream. This method will block until the packet has been read completely :returns: string of data """ header = self.stream.read(self.FRAME_HEADER.size) length, compressed = self.FRAME_HEADER.unpack(header) data = self.stream.read(length + len(self.FLUSHER))[:-len(self.FLUSHER)] if compressed: data = zlib.decompress(data) return data
[docs] def send(self, data): """Sends the given string of data as a packet over the underlying stream. Blocks until the packet has been sent. :param data: the byte string to send as a packet """ if self.compress and len(data) > self.COMPRESSION_THRESHOLD: compressed = 1 data = zlib.compress(data, self.COMPRESSION_LEVEL) else: compressed = 0 data_size = len(data) header = self.FRAME_HEADER.pack(data_size, compressed) flush_size = len(self.FLUSHER) if self.FRAME_HEADER.size + data_size + flush_size <= self.stream.MAX_IO_CHUNK: # avoid overhead from socket writes requiring GIL to be held self.stream.write(header + data + self.FLUSHER) else: # Data larger than 64KB, the extra writes are negligible part1 = self.stream.MAX_IO_CHUNK - self.FRAME_HEADER.size self.stream.write(header + data[:part1]) self.stream.write(data[part1:]) self.stream.write(self.FLUSHER)