"""*Brine* is a simple, fast and secure object serializer for **immutable** objects.
The following types are supported: ``int``, ``bool``, ``str``, ``float``,
``unicode``, ``bytes``, ``slice``, ``complex``, ``tuple`` (of simple types),
``frozenset`` (of simple types) as well as the following singletons: ``None``,
``NotImplemented``, and ``Ellipsis``.
Example::
>>> x = ("he", 7, u"llo", 8, (), 900, None, True, Ellipsis, 18.2, 18.2j + 13,
... slice(1,2,3), frozenset([5,6,7]), NotImplemented)
>>> dumpable(x)
True
>>> y = dump(x)
>>> y.encode("hex")
'140e0b686557080c6c6c6f580216033930300003061840323333333333331b402a000000000000403233333333333319125152531a1255565705'
>>> z = load(y)
>>> x == z
True
"""
from rpyc.lib.compat import Struct, BytesIO, BYTES_LITERAL
# singletons
TAG_NONE = b"\x00"
TAG_EMPTY_STR = b"\x01"
TAG_EMPTY_TUPLE = b"\x02"
TAG_TRUE = b"\x03"
TAG_FALSE = b"\x04"
TAG_NOT_IMPLEMENTED = b"\x05"
TAG_ELLIPSIS = b"\x06"
# types
TAG_UNICODE = b"\x08"
# deprecated w/ py2 support TAG_LONG = b"\x09"
TAG_STR1 = b"\x0a"
TAG_STR2 = b"\x0b"
TAG_STR3 = b"\x0c"
TAG_STR4 = b"\x0d"
TAG_STR_L1 = b"\x0e"
TAG_STR_L4 = b"\x0f"
TAG_TUP1 = b"\x10"
TAG_TUP2 = b"\x11"
TAG_TUP3 = b"\x12"
TAG_TUP4 = b"\x13"
TAG_TUP_L1 = b"\x14"
TAG_TUP_L4 = b"\x15"
TAG_INT_L1 = b"\x16"
TAG_INT_L4 = b"\x17"
TAG_FLOAT = b"\x18"
TAG_SLICE = b"\x19"
TAG_FSET = b"\x1a"
TAG_COMPLEX = b"\x1b"
IMM_INTS = dict((i, bytes([i + 0x50])) for i in range(-0x30, 0xa0))
# Below "!" is used to set byte order as network (= big-endian). See https://docs.python.org/3/library/struct.html
F8 = Struct("!d") # Python type float w/ size [8] (ctype double)
C16 = Struct("!dd") # Successive floats (complex numbers)
I1 = Struct("!B") # Python type int w/ size [1] (ctype unsigned char)
I4 = Struct("!L") # Python type int w/ size [4] (ctype unsigned long)
# I8I8 is successive ints w/ size 8 and was introduced to pack local thread id and remote thread id. Since
# PyThread_get_thread_ident returns a type of unsigned long, a platform dependent size, we
# need 8 bytes of length to support LP64/64-bit platforms. See
# - https://unix.org/whitepapers/64bit.html
# - https://en.wikipedia.org/wiki/Integer_(computer_science)#Long_integer
# TODO: Switch to native_id when 3.7 is EOL b/c PyThread_get_thread_ident is inheritly hosed due to casting.
I8I8 = Struct("!QQ")
_dump_registry = {}
_load_registry = {}
IMM_INTS_LOADER = dict((v, k) for k, v in IMM_INTS.items())
def register(coll, key):
def deco(func):
coll[key] = func
return func
return deco
# ===============================================================================
# dumping
# ===============================================================================
@register(_dump_registry, type(None))
def _dump_none(obj, stream):
stream.append(TAG_NONE)
@register(_dump_registry, type(NotImplemented))
def _dump_notimplemeted(obj, stream):
stream.append(TAG_NOT_IMPLEMENTED)
@register(_dump_registry, type(Ellipsis))
def _dump_ellipsis(obj, stream):
stream.append(TAG_ELLIPSIS)
@register(_dump_registry, bool)
def _dump_bool(obj, stream):
if obj:
stream.append(TAG_TRUE)
else:
stream.append(TAG_FALSE)
@register(_dump_registry, slice)
def _dump_slice(obj, stream):
stream.append(TAG_SLICE)
_dump((obj.start, obj.stop, obj.step), stream)
@register(_dump_registry, frozenset)
def _dump_frozenset(obj, stream):
stream.append(TAG_FSET)
_dump(tuple(obj), stream)
@register(_dump_registry, int)
def _dump_int(obj, stream):
if obj in IMM_INTS:
stream.append(IMM_INTS[obj])
else:
obj = BYTES_LITERAL(str(obj))
lenobj = len(obj)
if lenobj < 256:
stream.append(TAG_INT_L1 + I1.pack(lenobj) + obj)
else:
stream.append(TAG_INT_L4 + I4.pack(lenobj) + obj)
@register(_dump_registry, float)
def _dump_float(obj, stream):
stream.append(TAG_FLOAT + F8.pack(obj))
@register(_dump_registry, complex)
def _dump_complex(obj, stream):
stream.append(TAG_COMPLEX + C16.pack(obj.real, obj.imag))
@register(_dump_registry, bytes)
def _dump_bytes(obj, stream):
lenobj = len(obj)
if lenobj == 0:
stream.append(TAG_EMPTY_STR)
elif lenobj == 1:
stream.append(TAG_STR1 + obj)
elif lenobj == 2:
stream.append(TAG_STR2 + obj)
elif lenobj == 3:
stream.append(TAG_STR3 + obj)
elif lenobj == 4:
stream.append(TAG_STR4 + obj)
elif lenobj < 256:
stream.append(TAG_STR_L1 + I1.pack(lenobj) + obj)
else:
stream.append(TAG_STR_L4 + I4.pack(lenobj) + obj)
@register(_dump_registry, type(u""))
def _dump_str(obj, stream):
stream.append(TAG_UNICODE)
_dump_bytes(obj.encode("utf8"), stream)
@register(_dump_registry, tuple)
def _dump_tuple(obj, stream):
lenobj = len(obj)
if lenobj == 0:
stream.append(TAG_EMPTY_TUPLE)
elif lenobj == 1:
stream.append(TAG_TUP1)
elif lenobj == 2:
stream.append(TAG_TUP2)
elif lenobj == 3:
stream.append(TAG_TUP3)
elif lenobj == 4:
stream.append(TAG_TUP4)
elif lenobj < 256:
stream.append(TAG_TUP_L1 + I1.pack(lenobj))
else:
stream.append(TAG_TUP_L4 + I4.pack(lenobj))
for item in obj:
_dump(item, stream)
def _undumpable(obj, stream):
raise TypeError(f"cannot dump {obj}")
def _dump(obj, stream):
_dump_registry.get(type(obj), _undumpable)(obj, stream)
# ===============================================================================
# loading
# ===============================================================================
@register(_load_registry, TAG_NONE)
def _load_none(stream):
return None
@register(_load_registry, TAG_NOT_IMPLEMENTED)
def _load_nonimp(stream):
return NotImplemented
@register(_load_registry, TAG_ELLIPSIS)
def _load_elipsis(stream):
return Ellipsis
@register(_load_registry, TAG_TRUE)
def _load_true(stream):
return True
@register(_load_registry, TAG_FALSE)
def _load_false(stream):
return False
@register(_load_registry, TAG_EMPTY_TUPLE)
def _load_empty_tuple(stream):
return ()
@register(_load_registry, TAG_EMPTY_STR)
def _load_empty_str(stream):
return b""
@register(_load_registry, TAG_FLOAT)
def _load_float(stream):
return F8.unpack(stream.read(8))[0]
@register(_load_registry, TAG_COMPLEX)
def _load_complex(stream):
real, imag = C16.unpack(stream.read(16))
return complex(real, imag)
@register(_load_registry, TAG_STR1)
def _load_str1(stream):
return stream.read(1)
@register(_load_registry, TAG_STR2)
def _load_str2(stream):
return stream.read(2)
@register(_load_registry, TAG_STR3)
def _load_str3(stream):
return stream.read(3)
@register(_load_registry, TAG_STR4)
def _load_str4(stream):
return stream.read(4)
@register(_load_registry, TAG_STR_L1)
def _load_str_l1(stream):
l, = I1.unpack(stream.read(1))
return stream.read(l)
@register(_load_registry, TAG_STR_L4)
def _load_str_l4(stream):
l, = I4.unpack(stream.read(4))
return stream.read(l)
@register(_load_registry, TAG_UNICODE)
def _load_unicode(stream):
obj = _load(stream)
return obj.decode("utf-8")
@register(_load_registry, TAG_TUP1)
def _load_tup1(stream):
return (_load(stream),)
@register(_load_registry, TAG_TUP2)
def _load_tup2(stream):
return (_load(stream), _load(stream))
@register(_load_registry, TAG_TUP3)
def _load_tup3(stream):
return (_load(stream), _load(stream), _load(stream))
@register(_load_registry, TAG_TUP4)
def _load_tup4(stream):
return (_load(stream), _load(stream), _load(stream), _load(stream))
@register(_load_registry, TAG_TUP_L1)
def _load_tup_l1(stream):
l, = I1.unpack(stream.read(1))
return tuple(_load(stream) for i in range(l))
@register(_load_registry, TAG_TUP_L4)
def _load_tup_l4(stream):
l, = I4.unpack(stream.read(4))
return tuple(_load(stream) for i in range(l))
@register(_load_registry, TAG_SLICE)
def _load_slice(stream):
start, stop, step = _load(stream)
return slice(start, stop, step)
@register(_load_registry, TAG_FSET)
def _load_frozenset(stream):
return frozenset(_load(stream))
@register(_load_registry, TAG_INT_L1)
def _load_int_l1(stream):
l, = I1.unpack(stream.read(1))
return int(stream.read(l))
@register(_load_registry, TAG_INT_L4)
def _load_int_l4(stream):
l, = I4.unpack(stream.read(4))
return int(stream.read(l))
def _load(stream):
tag = stream.read(1)
if tag in IMM_INTS_LOADER:
return IMM_INTS_LOADER[tag]
return _load_registry.get(tag)(stream)
# ===============================================================================
# API
# ===============================================================================
[docs]def dump(obj):
"""Converts (dumps) the given object to a byte-string representation
:param obj: any :func:`dumpable` object
:returns: a byte-string representation of the object
"""
stream = []
_dump(obj, stream)
return b"".join(stream)
[docs]def load(data):
"""Recreates (loads) an object from its byte-string representation
:param data: the byte-string representation of an object
:returns: the dumped object
"""
stream = BytesIO(data)
return _load(stream)
simple_types = frozenset([type(None), int, bool, float, bytes, str, complex, type(NotImplemented), type(Ellipsis)])
[docs]def dumpable(obj):
"""Indicates whether the given object is *dumpable* by brine
:returns: ``True`` if the object is dumpable (e.g., :func:`dump` would succeed),
``False`` otherwise
"""
if type(obj) in simple_types:
return True
if type(obj) in (tuple, frozenset):
return all(dumpable(item) for item in obj)
if type(obj) is slice:
return dumpable(obj.start) and dumpable(obj.stop) and dumpable(obj.step)
return False
if __name__ == "__main__":
import doctest
doctest.testmod()