# Source code for sprockets.mixins.mediatype.transcoders
"""
Bundled media type transcoders.
- :class:`.JSONTranscoder` implements JSON encoding/decoding
- :class:`.MsgPackTranscoder` implements msgpack encoding/decoding
- :class:`.FormUrlEncodedTranscoder` implements the venerable form encoding
"""
from __future__ import annotations
import base64
import dataclasses
import json
import string
import typing
import urllib.parse
import uuid
import collections.abc
try:
import umsgpack
except ImportError: # pragma: no cover
umsgpack = None # type: ignore
from sprockets.mixins.mediatype import handlers, type_info
# Percent-encoding lookup tables keyed by byte value.  Every possible byte
# (0x00 through 0xFF inclusive) needs an entry because the encoder indexes
# the table directly with each byte of the encoded payload; a missing key
# would surface as a KeyError for otherwise valid input such as b'\xff'.
_FORM_URLENCODING = {c: '%{:02X}'.format(c) for c in range(256)}

# Bytes that are emitted verbatim instead of being percent-encoded:
# ASCII letters, digits, and the four punctuation characters "*-_.".
_FORM_URLENCODING.update(
    {ord(c): c for c in string.ascii_letters + string.digits + '*-_.'})

# Same table, except that the space character is written as "+".
_FORM_URLENCODING_PLUS = _FORM_URLENCODING.copy()
_FORM_URLENCODING_PLUS[ord(' ')] = '+'
class JSONTranscoder(handlers.TextContentHandler):
    """
    Content handler that reads and writes JSON documents.

    :param content_type: content type handled by this instance;
        ``application/json`` unless overridden.  Forwarded to the
        ``TextContentHandler`` initializer.
    :param default_encoding: character encoding used when none is
        specified; ``utf-8`` unless overridden.  Forwarded to the
        ``TextContentHandler`` initializer.

    Encoding is delegated to :func:`json.dumps` and decoding to
    :func:`json.loads`.  Values that the :mod:`json` module cannot
    serialize on its own are routed through :meth:`dump_object`.

    .. attribute:: dump_options

       Keyword arguments handed to :func:`json.dumps` by :meth:`.dumps`.
       Initially this sets ``default`` to :meth:`dump_object` and
       selects the compact ``(',', ':')`` separators.

    .. attribute:: load_options

       Keyword arguments handed to :func:`json.loads` by :meth:`.loads`.
       Initially empty.

    """

    dump_options: typing.Dict[str, typing.Any]
    load_options: typing.Dict[str, typing.Any]

    def __init__(self,
                 content_type: str = 'application/json',
                 default_encoding: str = 'utf-8') -> None:
        super().__init__(content_type, self.dumps, self.loads,
                         default_encoding)
        self.dump_options = dict(
            default=self.dump_object,
            separators=(',', ':'),
        )
        self.load_options = dict()

    def dumps(self, obj: type_info.Serializable) -> str:
        """Serialize `obj` to a JSON :class:`str` using `dump_options`."""
        options = self.dump_options
        return json.dumps(obj, **options)

    def loads(self, str_repr: str) -> type_info.Deserialized:
        """Parse the JSON text `str_repr` using `load_options`."""
        parsed = json.loads(str_repr, **self.load_options)
        return typing.cast(type_info.Deserialized, parsed)

    def dump_object(self, obj: type_info.Serializable) -> str:
        """
        Produce a string representation for a value :mod:`json` rejects.

        :param obj: the value to represent
        :return: the string form of `obj`
        :raises TypeError: when `obj` is not one of the handled kinds

        This method is installed as the ``default`` callback of
        :func:`json.dumps`.  The checks are applied in this order:

        - :class:`uuid.UUID` -- ``str(obj)``
        - any object exposing an ``isoformat`` attribute (for example
          :class:`datetime.datetime`) -- the result of calling
          ``obj.isoformat()``
        - :class:`bytes`, :class:`bytearray`, :class:`memoryview` --
          Base64 encoded text

        """
        if isinstance(obj, uuid.UUID):
            encoded = str(obj)
        elif hasattr(obj, 'isoformat'):
            iso_capable = typing.cast(type_info.DefinesIsoFormat, obj)
            encoded = iso_capable.isoformat()
        elif isinstance(obj, (bytes, bytearray, memoryview)):
            encoded = base64.b64encode(obj).decode('ASCII')
        else:
            raise TypeError('{!r} is not JSON serializable'.format(obj))
        return encoded
class MsgPackTranscoder(handlers.BinaryContentHandler):
    """
    Content handler that reads and writes msgpack documents.

    :param content_type: content type handled by this instance;
        ``application/msgpack`` unless overridden.  Forwarded to the
        ``BinaryContentHandler`` initializer.
    :raises RuntimeError: if the ``umsgpack`` module could not be
        imported

    Packing and unpacking are delegated to the `umsgpack`_ library,
    which implements the `msgpack format`_.

    .. _umsgpack: https://github.com/vsergeev/u-msgpack-python
    .. _msgpack format: http://msgpack.org/index.html

    """

    # Values of these types are handed to umsgpack without modification.
    PACKABLE_TYPES = (bool, int, float)

    def __init__(self, content_type: str = 'application/msgpack') -> None:
        # The module-level import is optional, so fail at construction
        # time rather than on first use.
        if umsgpack is None:
            raise RuntimeError('Cannot import MsgPackTranscoder, '
                               'umsgpack is not available')
        super().__init__(content_type, self.packb, self.unpackb)

    def packb(self, data: type_info.Serializable) -> bytes:
        """Normalize `data` and pack it into a :class:`bytes` instance."""
        normalized = self.normalize_datum(data)
        return umsgpack.packb(normalized)

    def unpackb(self, data: bytes) -> type_info.Deserialized:
        """Unpack an object from the :class:`bytes` instance `data`."""
        return umsgpack.unpackb(data)

    def normalize_datum(
            self, datum: type_info.Serializable) -> type_info.MsgPackable:
        """
        Recursively convert `datum` into a value umsgpack accepts.

        :param datum: the value about to be packed
        :return: a packable equivalent of `datum`
        :raises TypeError: if `datum` is of an unsupported type

        :meth:`.packb` calls this method before handing the value to
        :func:`umsgpack.packb`.  Values are treated as follows:

        - :data:`None` is returned unchanged (`nil byte`_, 0xC0)
        - :data:`True` / :data:`False` are returned unchanged
          (`true byte`_ 0xC3, `false byte`_ 0xC2)
        - :class:`int` is returned unchanged (`integer family`_)
        - :class:`float` is returned unchanged (`float family`_)
        - :class:`str` is returned unchanged (`str family`_)
        - :class:`bytes` is returned unchanged (`bin family`_)
        - :class:`bytearray` and :class:`memoryview` are copied into
          :class:`bytes` (`bin family`_)
        - :class:`uuid.UUID` is converted with ``str()``
        - objects exposing an ``isoformat`` attribute are replaced by
          the result of calling ``isoformat()``
        - :class:`collections.abc.Sequence` and
          :class:`collections.abc.Set` become a :class:`list` of
          normalized items (`array family`_)
        - :class:`collections.abc.Mapping` becomes a :class:`dict`
          whose values are normalized; keys are kept as they are
          (`map family`_)

        .. _nil byte: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#formats-nil
        .. _true byte: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#bool-format-family
        .. _false byte: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#bool-format-family
        .. _integer family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#int-format-family
        .. _float family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#float-format-family
        .. _str family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#str-format-family
        .. _array family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#array-format-family
        .. _map family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md
           #mapping-format-family
        .. _bin family: https://github.com/msgpack/msgpack/blob/
           0b8f5ac67cdd130f4d4d4fe6afb839b989fdb86a/spec.md#bin-format-family

        """
        # Values that need no work at all.
        if datum is None or isinstance(datum, self.PACKABLE_TYPES):
            return datum

        # Scalar-like values that are first rewritten as str or bytes.
        if isinstance(datum, uuid.UUID):
            datum = str(datum)
        elif isinstance(datum, bytearray):
            datum = bytes(datum)
        elif isinstance(datum, memoryview):
            datum = datum.tobytes()
        elif hasattr(datum, 'isoformat'):
            iso_capable = typing.cast(type_info.DefinesIsoFormat, datum)
            datum = iso_capable.isoformat()

        if isinstance(datum, (bytes, str)):
            return datum

        # Containers are rebuilt with every contained value normalized.
        # str and bytes are sequences too, which is why they were
        # returned before reaching this check.
        if isinstance(datum, (collections.abc.Sequence, collections.abc.Set)):
            return [self.normalize_datum(element) for element in datum]

        if isinstance(datum, collections.abc.Mapping):
            return {
                key: self.normalize_datum(entry)
                for key, entry in datum.items()
            }

        raise TypeError('{} is not msgpackable'.format(
            datum.__class__.__name__))
def _default_literal_mapping(
) -> dict[typing.Literal[None, True, False], str]:
    """Return a new literal-to-string table for one options instance."""
    return {None: '', True: 'true', False: 'false'}


@dataclasses.dataclass
class FormUrlEncodingOptions:
    """Settings that steer :class:`.FormUrlEncodedTranscoder`."""

    #: Character encoding used to turn character data into bytes.
    encoding: str = 'utf-8'

    #: When true, a sequence value is written as repeated
    #: ``name=value`` pairs, one per element.
    encode_sequences: bool = False

    #: Strings substituted for the literals ``None``, ``True`` and
    #: ``False``.  Each instance receives its own copy of the table.
    literal_mapping: dict[typing.Literal[None, True, False], str] = (
        dataclasses.field(default_factory=_default_literal_mapping))

    #: When true, spaces are written as ``+``; otherwise as ``%20``.
    space_as_plus: bool = False
[docs]class FormUrlEncodedTranscoder:
"""Opinionated transcoder for the venerable x-www-formurlencoded.
:param encoding_options: keyword parameters are used to initialize
:class:`FormUrlEncodingOptions`
This transcoder implements transcoding according to the current
W3C documentation. The encoding interface takes mappings or
sequences of pairs and encodes both the name and value. The
following table describes how each supported type is encoded.
+----------------------------+---------------------------------------+
| Value / Type | Encoding |
+============================+=======================================+
| character strings | UTF-8 codepoints before percent- |
| | encoding the resulting bytes |
+----------------------------+---------------------------------------+
| space character | ``%20`` or ``+`` |
+----------------------------+---------------------------------------+
| :data:`False` | ``false`` |
+----------------------------+---------------------------------------+
| :data:`True` | ``true`` |
+----------------------------+---------------------------------------+
| :data:`None` | the empty string |
+----------------------------+---------------------------------------+
| numbers | ``str(n)`` |
+----------------------------+---------------------------------------+
| byte sequences | percent-encoded bytes |
+----------------------------+---------------------------------------+
| :class:`uuid.UUID` | ``str(u)`` |
+----------------------------+---------------------------------------+
| :class:`datetime.datetime` | result of calling |
| | :meth:`~datetime.datetime.isoformat` |
+----------------------------+---------------------------------------+
https://url.spec.whatwg.org/#application/x-www-form-urlencoded
.. warning::
Types that are not explicitly mentioned above will result in
:meth:`to_bytes` simply calling ``str(value)`` and encoding
the result. This causes nested sequences to be encoded as
their ``repr``. For example, encoding ``{'a': [1, 2]}`` will
result in ``a=%5B1%2C%202%5D``. This matches what
:func:`urllib.parse.urlencode` does by default.
Better support for sequence values can be enabled by setting
the :attr:`~FormUrlEncodingOptions.encode_sequences` attribute
of :attr:`.options`. This mimics the ``doseq`` parameter of
:func:`urllib,parse.urlencode`.
.. attribute:: options
:type: FormUrlEncodingOptions
Controls the behavior of the transcoder
"""
content_type = 'application/x-www-formurlencoded'
def __init__(self, **encoding_options: typing.Any) -> None:
self.options = FormUrlEncodingOptions(**encoding_options)
[docs] def to_bytes(
self,
inst_data: type_info.Serializable,
encoding: typing.Optional[str] = None) -> typing.Tuple[str, bytes]:
"""Serialize `inst_data` into a byte stream and content type spec.
:param inst_data: the data to serialize
:param encoding: optional encoding override
Serialization is implemented as described in the W3C
`urlencoded serialization`_ algorithm. The :attr:`.options`
attribute controls the configurable details of the encoding
process.
The character encoding can be further overridden by specifying the
`encoding` parameter.
:returns: tuple of the content type and the resulting bytes
:raises: :exc:`TypeError` if a supplied value cannot be serialized
.. _urlencoded serialization: https://url.spec.whatwg.org/
#urlencoded-serializing
"""
# Select the appropriate encoding table and use the default
# character encoding if necessary. Binding these to local
# names removes branches from the inner loop.
chr_map: typing.Mapping[int, str]
chr_map = (_FORM_URLENCODING_PLUS
if self.options.space_as_plus else _FORM_URLENCODING)
if encoding is None:
encoding = self.options.encoding
# Generate a sequence of name+value tuples to encode or
# directly encode primitives
try:
tuples = self._convert_to_tuple_sequence(inst_data)
except TypeError:
# hopefully this is a primitive ... if not then the
# call to _encode will fail below
tuples = [(inst_data, None)]
prefix = '' # another micro-optimization
buf = []
for name, value in tuples:
buf.append(prefix)
buf.extend(self._encode(name, chr_map, encoding))
if value is not None:
buf.append('=')
buf.extend(self._encode(value, chr_map, encoding))
prefix = '&'
encoded = ''.join(buf)
return self.content_type, encoded.encode('ascii')
[docs] def from_bytes(
self,
data_bytes: bytes,
encoding: typing.Optional[str] = None) -> type_info.Deserialized:
"""Deserialize `bytes` into a Python object instance.
:param data_bytes: byte string to deserialize
:param encoding: optional encoding override
Deserialization is implemented according to the W3C
`urlencoded deserialization`_ algorithm. The :attr:`.options`
attribute controls the configurable details of the encoding
process.
:returns: the decoded Python object
.. _urlencoded deserialization: https://url.spec.whatwg.org/
#urlencoded-parsing
"""
dequote = (urllib.parse.unquote_plus
if self.options.space_as_plus else urllib.parse.unquote)
if encoding is None:
encoding = self.options.encoding
output = []
for part in data_bytes.decode('ascii').split('&'):
if not part:
continue
name, eq_present, value = part.partition('=')
name = dequote(name, encoding=encoding)
if eq_present:
output.append((name, dequote(value, encoding=encoding)))
else:
output.append((name, ''))
return dict(output)
def _encode(self, datum: typing.Union[bool, None, float, int, str,
type_info.DefinesIsoFormat],
char_map: typing.Mapping[int, str], encoding: str) -> str:
if isinstance(datum, str):
pass # optimization: skip additional checks for strings
elif (isinstance(datum, (float, int, str, uuid.UUID))
and not isinstance(datum, bool)):
datum = str(datum)
elif (isinstance(datum, collections.abc.Hashable)
and datum in self.options.literal_mapping):
# the isinstance Hashable check confuses mypy
datum = self.options.literal_mapping[datum] # type: ignore
elif isinstance(datum, (bytearray, bytes, memoryview)):
return ''.join(char_map[c] for c in datum)
elif isinstance(datum, type_info.DefinesIsoFormat):
datum = datum.isoformat()
else:
datum = str(datum)
return ''.join(char_map[c] for c in datum.encode(encoding))
def _convert_to_tuple_sequence(
self, value: type_info.Serializable
) -> typing.Iterable[typing.Tuple[typing.Any, typing.Any]]:
tuples: typing.Iterable[typing.Tuple[typing.Any, typing.Any]]
if isinstance(value, collections.abc.Mapping):
tuples = value.items()
else:
try:
tuples = [(a, b) for a, b in value] # type: ignore
except (TypeError, ValueError):
raise TypeError('Cannot convert value to sequence of tuples')
if self.options.encode_sequences:
out_tuples = []
for a, b in tuples:
if (not isinstance(b, (bytes, bytearray, memoryview, str))
and isinstance(b, collections.abc.Iterable)):
for value in b:
out_tuples.append((a, value))
else:
out_tuples.append((a, b))
tuples = out_tuples
return tuples