"""
Content handling for Tornado.
- :func:`.install` creates a settings object and installs it into
the :class:`tornado.web.Application` instance
- :func:`.get_settings` retrieve a :class:`.ContentSettings` object
from a :class:`tornado.web.Application` instance
- :func:`.set_default_content_type` sets the content type that is
used when an ``Accept`` or ``Content-Type`` header is omitted.
- :func:`.add_binary_content_type` register transcoders for a binary
content type
- :func:`.add_text_content_type` register transcoders for a textual
content type
- :func:`.add_transcoder` register a custom transcoder instance
for a content type
- :class:`.ContentSettings` an instance of this is attached to
:class:`tornado.web.Application` to hold the content mapping
information for the application
- :class:`.ContentMixin` attaches a :class:`.ContentSettings`
instance to the application and implements request decoding &
response encoding methods
This module is the primary interface for this library. It exposes
functions for registering new content handlers and a mix-in that
adds content handling methods to :class:`~tornado.web.RequestHandler`
instances.
"""
from __future__ import annotations
import logging
import typing
import warnings
try:
from typing import Literal
except ImportError: # pragma: no cover
# "ignore" is required to avoid an incompatible import
# error due to different bindings of _SpecialForm
from typing_extensions import Literal # type: ignore
from ietfparse import algorithms, datastructures, errors, headers
from tornado import web
from sprockets.mixins.mediatype import handlers, type_info
logger = logging.getLogger(__name__)
SETTINGS_KEY = 'sprockets.mixins.mediatype.ContentSettings'
"""Key in application.settings to store the ContentSettings instance."""
_warning_issued = False
[docs]class ContentSettings:
"""
Content selection settings.
An instance of this class is stashed in ``application.settings``
under the :data:`.SETTINGS_KEY` key. Instead of creating an
instance of this class yourself, use the :func:`.install`
function to install it into the application.
The settings instance contains the list of available content
types and handlers associated with them. Each handler implements
the :class:`~sprockets.mixins.mediatype.type_info.Transcoder`
interface. Use :func:`add_transcoder` to add support for a
specific content type to the application.
This class acts as a mapping from content-type string to the
appropriate handler instance. Add new content types and find
handlers using the common ``dict`` syntax:
.. code-block:: python
class SomeHandler(web.RequestHandler):
def get(self):
settings = ContentSettings.get_settings(self.application)
response_body = settings['application/msgpack'].to_bytes(
response_dict, encoding='utf-8')
self.write(response_body)
def make_application():
app = web.Application([('/', SomeHandler)])
add_transcoder(app, transcoders.JSONTranscoder())
add_transcoder(app, transcoders.MsgPackTranscoder())
return app
Of course, that is quite tedious, so use the :class:`.ContentMixin`
instead of using the settings directly.
"""
default_encoding: typing.Union[str, None]
_available_types: typing.List[datastructures.ContentType]
_default_content_type: typing.Union[str, None]
_handlers: typing.Dict[str, type_info.Transcoder]
def __init__(self) -> None:
self._handlers = {}
self._available_types = []
self._default_content_type = None
self.default_encoding = None
def __getitem__(self, content_type: str) -> type_info.Transcoder:
parsed = headers.parse_content_type(content_type)
return self._handlers[str(parsed)]
def __setitem__(self, content_type: str,
handler: type_info.Transcoder) -> None:
parsed = headers.parse_content_type(content_type)
content_type = str(parsed)
if content_type in self._handlers:
logger.warning('handler for %s already set to %r', content_type,
self._handlers[content_type])
return
self._available_types.append(parsed)
self._handlers[content_type] = handler
[docs] def get(
self,
content_type: str,
default: typing.Union[type_info.Transcoder, None] = None
) -> typing.Union[type_info.Transcoder, None]:
"""Retrieve the handler for a specific content type."""
return self._handlers.get(content_type, default)
@property
def available_content_types(
self) -> typing.Sequence[datastructures.ContentType]:
"""
List of the content types that are registered.
This is a sequence of :class:`ietfparse.datastructures.ContentType`
instances.
"""
return self._available_types
@property
def default_content_type(self) -> typing.Union[str, None]:
return self._default_content_type
@default_content_type.setter
def default_content_type(self, new_value: typing.Union[str, None]) -> None:
if new_value is None:
warnings.warn(
DeprecationWarning(
'Using sprockets.mixins.mediatype without a default'
' content type is deprecated and will become an error'
' in a future version'))
self._default_content_type = new_value
[docs]def install(application: type_info.HasSettings,
default_content_type: typing.Optional[str],
encoding: typing.Optional[str] = None) -> ContentSettings:
"""Install the media type management settings and return it"""
try:
settings = typing.cast(ContentSettings,
application.settings[SETTINGS_KEY])
except KeyError:
settings = application.settings[SETTINGS_KEY] = ContentSettings()
settings.default_content_type = default_content_type
settings.default_encoding = encoding
return settings
@typing.overload
def get_settings(
application: type_info.HasSettings,
force_instance: Literal[False] = False
) -> typing.Union[ContentSettings, None]:
... # pragma: no cover
@typing.overload
def get_settings(application: type_info.HasSettings,
force_instance: Literal[True]) -> ContentSettings:
... # pragma: no cover
[docs]def get_settings(
application: type_info.HasSettings,
force_instance: bool = False) -> typing.Union[ContentSettings, None]:
"""
Retrieve the media type settings for a application.
:param application:
:param force_instance: if :data:`True` then create the
instance if it does not exist
:return: the content settings instance or :data:`None` if
`force_instance` is not :data:`True` and :func:`.install`
has not been called
"""
try:
return typing.cast(ContentSettings, application.settings[SETTINGS_KEY])
except KeyError:
if not force_instance:
return None
return install(application, None)
[docs]def add_binary_content_type(application: type_info.HasSettings,
content_type: str, pack: type_info.PackBFunction,
unpack: type_info.UnpackBFunction) -> None:
"""
Add handler for a binary content type.
:param application: the application to modify
:param content_type: the content type to add
:param pack: function that packs a dictionary to a byte string.
See :any:`type_info.PackBFunction`
:param unpack: function that takes a byte string and returns a
dictionary. See :any:`type_info.UnpackBFunction`
"""
add_transcoder(application,
handlers.BinaryContentHandler(content_type, pack, unpack))
[docs]def add_text_content_type(application: type_info.HasSettings,
content_type: str, default_encoding: str,
dumps: type_info.DumpSFunction,
loads: type_info.LoadSFunction) -> None:
"""
Add handler for a text content type.
:param application: the application to modify
:param content_type: the content type to add
:param default_encoding: encoding to use when one is unspecified
:param dumps: function that dumps a dictionary to a string.
See :any:`type_info.DumpSFunction`
:param loads: function that loads a dictionary from a string.
See :any:`type_info.LoadSFunction`
Note that the ``charset`` parameter is stripped from `content_type`
if it is present.
"""
parsed = headers.parse_content_type(content_type)
parsed.parameters.pop('charset', None)
normalized = str(parsed)
add_transcoder(
application,
handlers.TextContentHandler(normalized, dumps, loads,
default_encoding))
[docs]def add_transcoder(application: type_info.HasSettings,
transcoder: type_info.Transcoder,
content_type: typing.Optional[str] = None) -> None:
"""
Register a transcoder for a specific content type.
:param application: the application to modify
:param transcoder: object that translates between :class:`bytes`
and object instances
:param content_type: the content type to add. If this is
unspecified or :data:`None`, then the transcoder's
``content_type`` attribute is used.
The `transcoder` instance is required to implement the
:class:`~sprockets.mixins.mediatype.type_info.Transcoder`
protocol.
"""
settings = get_settings(application, force_instance=True)
settings[content_type or transcoder.content_type] = transcoder
[docs]def set_default_content_type(application: type_info.HasSettings,
content_type: str,
encoding: typing.Optional[str] = None) -> None:
"""
Store the default content type for an application.
:param application: the application to modify
:param content_type: the content type to default to
:param encoding: encoding to use when one is unspecified
"""
settings = get_settings(application, force_instance=True)
settings.default_content_type = content_type
settings.default_encoding = encoding
[docs]class ContentMixin(web.RequestHandler):
"""
Mix this in to add some content handling methods.
.. code-block:: python
class MyHandler(ContentMixin, web.RequestHandler):
def post(self):
body = self.get_request_body()
# do stuff --> response_dict
self.send_response(response_dict)
:meth:`get_request_body` will deserialize the request data into
a dictionary based on the :http:header:`Content-Type` request
header. Similarly, :meth:`send_response` takes a dictionary,
serializes it based on the :http:header:`Accept` request header
and the application :class:`ContentSettings`, and writes it out,
using ``self.write()``.
"""
def initialize(self) -> None:
super().initialize()
self._request_body: typing.Optional[type_info.Deserialized] = None
self._best_response_match: typing.Optional[str] = None
self._logger = getattr(self, 'logger', logger)
[docs] def get_response_content_type(self) -> typing.Union[str, None]:
"""Select the content type will be used in the response.
This method implements proactive content negotiation as
described in :rfc:`7231#section-3.4.1` using the
:http:header:`Accept` request header or the configured
default content type if the header is not present. The
selected response type is cached and returned. It will
be used when :meth:`.send_response` is called.
Note that this method is called by :meth:`.send_response`
so you will seldom need to call it directly.
"""
if self._best_response_match is None:
settings = get_settings(self.application, force_instance=True)
acceptable = headers.parse_accept(
self.request.headers.get(
'Accept', settings.default_content_type
if settings.default_content_type else '*/*'))
try:
selected, _ = algorithms.select_content_type(
acceptable, settings.available_content_types)
self._best_response_match = '/'.join(
[selected.content_type, selected.content_subtype])
if selected.content_suffix is not None:
self._best_response_match = '+'.join(
[self._best_response_match, selected.content_suffix])
except errors.NoMatch:
self._best_response_match = settings.default_content_type
return self._best_response_match
[docs] def get_request_body(self) -> type_info.Deserialized:
"""
Fetch (and cache) the request body as a dictionary.
:raise tornado.web.HTTPError:
- if the content type cannot be matched, then the status code
is set to 415 Unsupported Media Type.
- if decoding the content body fails, then the status code is
set to 400 Bad Syntax.
"""
if self._request_body is None:
settings = get_settings(self.application, force_instance=True)
content_type = self.request.headers.get(
'Content-Type', settings.default_content_type)
try:
content_type_header = headers.parse_content_type(content_type)
except ValueError:
raise web.HTTPError(400, 'failed to parse content type %s',
content_type)
content_type = '/'.join([
content_type_header.content_type,
content_type_header.content_subtype
])
if content_type_header.content_suffix is not None:
content_type = '+'.join(
[content_type, content_type_header.content_suffix])
try:
handler = settings[content_type]
except KeyError:
raise web.HTTPError(415, 'cannot decode body of type %s',
content_type)
try:
self._request_body = handler.from_bytes(self.request.body)
except Exception:
self._logger.error('failed to decode request body')
raise web.HTTPError(400, 'failed to decode request')
return self._request_body
[docs] def send_response(self,
body: type_info.Serializable,
set_content_type: typing.Optional[bool] = True) -> None:
"""
Serialize and send ``body`` in the response.
:param body: the body to serialize
:param set_content_type: should the :http:header:`Content-Type`
header be set? Defaults to :data:`True`
The transcoder for the response is selected by calling
:meth:`.get_response_content_type` which chooses an
appropriate transcoder based on the :http:header:`Accept`
header from the request.
"""
settings = get_settings(self.application, force_instance=True)
response_type = self.get_response_content_type()
if response_type is None:
self._logger.error('failed to find a suitable response '
'content type for request')
self._logger.error('please set a default content type')
raise web.HTTPError(406)
try:
handler = settings[response_type]
except KeyError:
self._logger.error(
'no transcoder for the selected response content type %s, '
'is the default content type %r correct?', response_type,
settings.default_content_type)
raise web.HTTPError(500)
else:
try:
content_type, data_bytes = handler.to_bytes(body)
except (TypeError, ValueError) as e:
self._logger.error(
'selected transcoder (%s) failed to encode response '
'body: %s', handler.__class__.__name__, e)
raise web.HTTPError(500, reason='Response Encoding Failure')
else:
if set_content_type:
self.set_header('Content-Type', content_type)
self.add_header('Vary', 'Accept')
self.write(data_bytes)