123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662 |
- from __future__ import annotations
- import collections.abc as cabc
- import re
- import typing as t
- from .._internal import _missing
- from ..exceptions import BadRequestKeyError
- from .mixins import ImmutableHeadersMixin
- from .structures import iter_multi_items
- from .structures import MultiDict
- if t.TYPE_CHECKING:
- import typing_extensions as te
- from _typeshed.wsgi import WSGIEnvironment
- T = t.TypeVar("T")
class Headers:
    """An object that stores some headers. It has a dict-like interface,
    but is ordered, can store the same key multiple times, and iterating
    yields ``(key, value)`` pairs instead of only keys.

    This data structure is useful if you want a nicer way to handle WSGI
    headers which are stored as tuples in a list.

    From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
    also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
    and will render a page for a ``400 BAD REQUEST`` if caught in a
    catch-all for HTTP exceptions.

    Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
    class, with the exception of `__getitem__`. :mod:`wsgiref` will return
    `None` for ``headers['missing']``, whereas :class:`Headers` will raise
    a :class:`KeyError`.

    To create a new ``Headers`` object, pass it a list, dict, or
    other ``Headers`` object with default values. These values are
    validated the same way values added later are.

    :param defaults: The list of default values for the :class:`Headers`.

    .. versionchanged:: 3.1
        Implement ``|`` and ``|=`` operators.

    .. versionchanged:: 2.1.0
        Default values are validated the same as values added later.

    .. versionchanged:: 0.9
        This data structure now stores unicode values similar to how the
        multi dicts do it. The main difference is that bytes can be set as
        well which will automatically be latin1 decoded.

    .. versionchanged:: 0.9
        The :meth:`linked` function was removed without replacement as it
        was an API that does not support the changes to the encoding model.
    """

    def __init__(
        self,
        defaults: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
    ) -> None:
        # Ordered storage of ``(key, value)`` pairs. Keys keep the case they
        # were added with; lookups compare them lowercased.
        self._list: list[tuple[str, str]] = []

        if defaults is not None:
            self.extend(defaults)

    @t.overload
    def __getitem__(self, key: str) -> str: ...
    @t.overload
    def __getitem__(self, key: int) -> tuple[str, str]: ...
    @t.overload
    def __getitem__(self, key: slice) -> te.Self: ...
    def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
        """Look up by header name, list index, or slice.

        A string returns the first value stored for that header, an
        integer returns the ``(key, value)`` pair at that position, and a
        slice returns a new instance of the same class built from the
        sliced pairs.
        """
        if isinstance(key, str):
            return self._get_key(key)

        if isinstance(key, int):
            return self._list[key]

        return self.__class__(self._list[key])

    def _get_key(self, key: str) -> str:
        """Return the first value whose key matches ``key`` case-insensitively.

        :raises BadRequestKeyError: If no stored key matches.
        """
        ikey = key.lower()

        for k, v in self._list:
            if k.lower() == ikey:
                return v

        raise BadRequestKeyError(key)

    def __eq__(self, other: object) -> bool:
        """Compare with another object of exactly the same class.

        Both sides are compared as sets of ``(lowercased key, value)``
        pairs, so ordering and repeated identical pairs do not affect
        the result.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented

        def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
            return item[0].lower(), *item[1:]

        return set(map(lowered, other._list)) == set(map(lowered, self._list))  # type: ignore[attr-defined]

    # Instances are mutable and define ``__eq__``, so they are unhashable.
    __hash__ = None  # type: ignore[assignment]

    @t.overload
    def get(self, key: str) -> str | None: ...
    @t.overload
    def get(self, key: str, default: str) -> str: ...
    @t.overload
    def get(self, key: str, default: T) -> str | T: ...
    @t.overload
    def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
    @t.overload
    def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
    def get(  # type: ignore[misc]
        self,
        key: str,
        default: str | T | None = None,
        type: cabc.Callable[[str], T] | None = None,
    ) -> str | T | None:
        """Return the default value if the requested data doesn't exist.
        If `type` is provided and is a callable it should convert the value,
        return it or raise a :exc:`ValueError` if that is not possible. In
        this case the function will return the default as if the value was not
        found:

        >>> d = Headers([('Content-Length', '42')])
        >>> d.get('Content-Length', type=int)
        42

        :param key: The key to be looked up.
        :param default: The default value to be returned if the key can't
                        be looked up. If not further specified `None` is
                        returned.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the default value is returned.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        try:
            rv = self._get_key(key)
        except KeyError:
            return default

        if type is None:
            return rv

        try:
            return type(rv)
        except ValueError:
            return default

    @t.overload
    def getlist(self, key: str) -> list[str]: ...
    @t.overload
    def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
    def getlist(
        self, key: str, type: cabc.Callable[[str], T] | None = None
    ) -> list[str] | list[T]:
        """Return the list of items for a given key. If that key is not in the
        :class:`Headers`, the return value will be an empty list. Just like
        :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
        be converted with the callable defined there.

        :param key: The key to be looked up.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the value will be removed from the list.
        :return: a :class:`list` of all the values for the key.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        ikey = key.lower()

        if type is not None:
            result = []

            for k, v in self:
                if k.lower() == ikey:
                    try:
                        result.append(type(v))
                    except ValueError:
                        # values that fail conversion are skipped
                        continue

            return result

        return [v for k, v in self if k.lower() == ikey]

    def get_all(self, name: str) -> list[str]:
        """Return a list of all the values for the named field.

        This method is compatible with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.get_all` method.
        """
        return self.getlist(name)

    def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
        """Yield ``(key, value)`` pairs in order.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, value in self:
            if lower:
                key = key.lower()

            yield key, value

    def keys(self, lower: bool = False) -> t.Iterable[str]:
        """Yield the key of every stored pair, in order. A key stored
        several times is yielded several times.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, _ in self.items(lower):
            yield key

    def values(self) -> t.Iterable[str]:
        """Yield the value of every stored pair, in order."""
        for _, value in self.items():
            yield value

    def extend(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: str,
    ) -> None:
        """Extend headers in this object with items from another object
        containing header items as well as keyword arguments.

        To replace existing keys instead of extending, use
        :meth:`update` instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionchanged:: 1.0
            Support :class:`MultiDict`. Allow passing ``kwargs``.
        """
        if arg is not None:
            for key, value in iter_multi_items(arg):
                self.add(key, value)

        for key, value in iter_multi_items(kwargs):
            self.add(key, value)

    def __delitem__(self, key: str | int | slice) -> None:
        """Delete by header name, list index, or slice.

        A string removes every pair stored for that header; an integer or
        slice deletes the pairs at those positions.
        """
        if isinstance(key, str):
            self._del_key(key)
            return

        del self._list[key]

    def _del_key(self, key: str) -> None:
        """Drop every pair whose key matches ``key`` case-insensitively.

        The list is updated in place. Nothing is raised if no pair matches.
        """
        key = key.lower()
        self._list[:] = [(k, v) for k, v in self._list if k.lower() != key]

    def remove(self, key: str) -> None:
        """Remove a key.

        :param key: The key to be removed.
        """
        return self._del_key(key)

    @t.overload
    def pop(self) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str) -> str: ...
    @t.overload
    def pop(self, key: int | None = ...) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str, default: str) -> str: ...
    @t.overload
    def pop(self, key: str, default: T) -> str | T: ...
    def pop(
        self,
        key: str | int | None = None,
        default: str | T = _missing,  # type: ignore[assignment]
    ) -> str | tuple[str, str] | T:
        """Removes and returns a key or index.

        :param key: The key to be popped.  If this is an integer the item at
                    that position is removed, if it's a string the value for
                    that key is.  If the key is omitted or `None` the last
                    item is removed.
        :param default: Returned instead of raising when a string key is
                        not present.
        :return: an item.
        """
        if key is None:
            return self._list.pop()

        if isinstance(key, int):
            return self._list.pop(key)

        try:
            rv = self._get_key(key)
        except KeyError:
            if default is not _missing:
                return default

            raise

        # The first value is returned, but every pair for the key is removed.
        self.remove(key)
        return rv

    def popitem(self) -> tuple[str, str]:
        """Removes a key or index and returns a (key, value) item."""
        return self._list.pop()

    def __contains__(self, key: str) -> bool:
        """Check if a key is present."""
        try:
            self._get_key(key)
        except KeyError:
            return False

        return True

    def __iter__(self) -> t.Iterator[tuple[str, str]]:
        """Yield ``(key, value)`` tuples."""
        return iter(self._list)

    def __len__(self) -> int:
        """Return the number of stored ``(key, value)`` pairs."""
        return len(self._list)

    def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes::

        >>> d = Headers()
        >>> d.add('Content-Type', 'text/plain')
        >>> d.add('Content-Disposition', 'attachment', filename='foo.png')

        The keyword argument dumping uses :func:`dump_options_header`
        behind the scenes.

        .. versionchanged:: 0.4.1
            keyword arguments were added for :mod:`wsgiref` compatibility.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)
        self._list.append((key, value_str))

    def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.add_header` method.
        """
        self.add(key, value, **kwargs)

    def clear(self) -> None:
        """Clears all headers."""
        self._list.clear()

    def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Remove all header tuples for `key` and add a new one.  The newly
        added key either appears at the end of the list if there was no
        entry or replaces the first one.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes.  See :meth:`add` for
        more information.

        .. versionchanged:: 0.6.1
            :meth:`set` now accepts the same arguments as :meth:`add`.

        :param key: The key to be inserted.
        :param value: The value to be inserted.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)

        if not self._list:
            self._list.append((key, value_str))
            return

        # A single shared iterator: the loop consumes it up to the first
        # match, and the comprehension below continues from that point.
        iter_list = iter(self._list)
        ikey = key.lower()

        for idx, (old_key, _) in enumerate(iter_list):
            if old_key.lower() == ikey:
                # replace first occurrence
                self._list[idx] = (key, value_str)
                break
        else:
            # no existing occurrences
            self._list.append((key, value_str))
            return

        # remove remaining occurrences
        self._list[idx + 1 :] = [
            item for item in iter_list if item[0].lower() != ikey
        ]

    def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
        """Remove any existing values for a header and add new ones.

        If ``values`` is empty, including an exhausted iterator or a
        generator that yields nothing, all existing values for the key
        are removed.

        :param key: The header key to set.
        :param values: An iterable of values to set for the key.

        .. versionadded:: 1.0
        """
        if not values:
            self.remove(key)
            return

        values_iter = iter(values)

        try:
            first = next(values_iter)
        except StopIteration:
            # Iterators and generators are always truthy, so their emptiness
            # can only be detected by trying to pull the first value.
            self.remove(key)
            return

        self.set(key, first)

        for value in values_iter:
            self.add(key, value)

    def setdefault(self, key: str, default: t.Any) -> str:
        """Return the first value for the key if it is in the headers,
        otherwise set the header to the value given by ``default`` and
        return that.

        :param key: The header key to get.
        :param default: The value to set for the key if it is not in the
            headers.
        """
        try:
            return self._get_key(key)
        except KeyError:
            pass

        self.set(key, default)
        # Read the value back so the stored string form is returned.
        return self._get_key(key)

    def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
        """Return the list of values for the key if it is in the
        headers, otherwise set the header to the list of values given
        by ``default`` and return that.

        Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
        list will not affect the headers.

        :param key: The header key to get.
        :param default: An iterable of values to set for the key if it
            is not in the headers.

        .. versionadded:: 1.0
        """
        if key not in self:
            self.setlist(key, default)

        return self.getlist(key)

    @t.overload
    def __setitem__(self, key: str, value: t.Any) -> None: ...
    @t.overload
    def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
    @t.overload
    def __setitem__(
        self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
    ) -> None: ...
    def __setitem__(
        self,
        key: str | int | slice,
        value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
    ) -> None:
        """Like :meth:`set` but also supports index/slice based setting."""
        if isinstance(key, str):
            self.set(key, value)
        elif isinstance(key, int):
            self._list[key] = value[0], _str_header_value(value[1])  # type: ignore[index]
        else:
            self._list[key] = [(k, _str_header_value(v)) for k, v in value]  # type: ignore[misc]

    def update(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[
                str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
            ]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
    ) -> None:
        """Replace headers in this object with items from another
        headers object and keyword arguments.

        To extend existing keys instead of replacing, use :meth:`extend`
        instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionadded:: 1.0
        """
        if arg is not None:
            if isinstance(arg, (Headers, MultiDict)):
                for key in arg.keys():
                    self.setlist(key, arg.getlist(key))
            elif isinstance(arg, cabc.Mapping):
                for key, value in arg.items():
                    if isinstance(value, (list, tuple, set)):
                        self.setlist(key, value)
                    else:
                        self.set(key, value)
            else:
                for key, value in arg:
                    self.set(key, value)

        for key, value in kwargs.items():
            if isinstance(value, (list, tuple, set)):
                self.setlist(key, value)
            else:
                self.set(key, value)

    def __or__(
        self,
        other: cabc.Mapping[
            str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
        ],
    ) -> te.Self:
        """Return a copy of these headers updated with the mapping ``other``.

        Returns ``NotImplemented`` when ``other`` is not a mapping.
        """
        if not isinstance(other, cabc.Mapping):
            return NotImplemented

        rv = self.copy()
        rv.update(other)
        return rv

    def __ior__(
        self,
        other: (
            cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
        ),
    ) -> te.Self:
        """Update these headers in place from a mapping or iterable of pairs.

        Returns ``NotImplemented`` when ``other`` is neither.
        """
        if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
            return NotImplemented

        self.update(other)
        return self

    def to_wsgi_list(self) -> list[tuple[str, str]]:
        """Convert the headers into a list suitable for WSGI.

        :return: list
        """
        return list(self)

    def copy(self) -> te.Self:
        """Return a new instance of the same class built from the stored
        pairs.
        """
        return self.__class__(self._list)

    def __copy__(self) -> te.Self:
        """Support :func:`copy.copy` by delegating to :meth:`copy`."""
        return self.copy()

    def __str__(self) -> str:
        """Returns formatted headers suitable for HTTP transmission."""
        strs = [f"{key}: {value}" for key, value in self.to_wsgi_list()]
        # The extra element makes the joined text end with a blank line.
        strs.append("\r\n")
        return "\r\n".join(strs)

    def __repr__(self) -> str:
        """Return the class name followed by the list of stored pairs."""
        return f"{type(self).__name__}({list(self)!r})"
def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
    """Render ``value`` plus keyword options as a single header value.

    Underscores in the option names are converted to dashes before the
    value and options are handed to ``http.dump_options_header``.
    """
    options = {}

    for name, option in kw.items():
        options[name.replace("_", "-")] = option

    return http.dump_options_header(value, options)
- _newline_re = re.compile(r"[\r\n]")
- def _str_header_value(value: t.Any) -> str:
- if not isinstance(value, str):
- value = str(value)
- if _newline_re.search(value) is not None:
- raise ValueError("Header values must not contain newline characters.")
- return value # type: ignore[no-any-return]
class EnvironHeaders(ImmutableHeadersMixin, Headers):  # type: ignore[misc]
    """Read only version of the headers from a WSGI environment. This
    provides the same interface as `Headers` and is constructed from
    a WSGI environment.

    From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
    subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
    render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
    HTTP exceptions.

    :param environ: The WSGI environment to read headers from. It is
        referenced, not copied.
    """

    def __init__(self, environ: WSGIEnvironment) -> None:
        super().__init__()
        self.environ = environ

    def __eq__(self, other: object) -> bool:
        """Two instances are equal only when they wrap the very same
        environ object.
        """
        if not isinstance(other, EnvironHeaders):
            return NotImplemented

        return self.environ is other.environ

    __hash__ = None  # type: ignore[assignment]

    def __getitem__(self, key: str) -> str:  # type: ignore[override]
        """Return the value for the header named ``key``.

        :raises BadRequestKeyError: If the header is not in the environ.
        """
        return self._get_key(key)

    def _get_key(self, key: str) -> str:
        """Translate a header name to its environ key and look it up.

        The name is uppercased and dashes become underscores.
        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are read as-is; every other
        name is prefixed with ``HTTP_``.

        :raises BadRequestKeyError: If ``key`` is not a string or the
            environ has no entry for it.
        """
        if not isinstance(key, str):
            raise BadRequestKeyError(key)

        environ_key = key.upper().replace("-", "_")

        if environ_key not in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
            environ_key = f"HTTP_{environ_key}"

        try:
            return self.environ[environ_key]  # type: ignore[no-any-return]
        except KeyError:
            # Raise the documented error type, reporting the header name the
            # caller asked for rather than the internal environ key.
            raise BadRequestKeyError(key) from None

    def __len__(self) -> int:
        """Return the number of headers :meth:`__iter__` yields."""
        return sum(1 for _ in self)

    def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
        """Yield ``(key, value)`` pairs derived from the environ.

        ``HTTP_*`` entries have the prefix dropped, underscores turned into
        dashes, and the name title-cased. ``HTTP_CONTENT_TYPE`` and
        ``HTTP_CONTENT_LENGTH`` are skipped; ``CONTENT_TYPE`` and
        ``CONTENT_LENGTH`` are yielded only when their value is truthy.
        """
        for key, value in self.environ.items():
            if key.startswith("HTTP_") and key not in {
                "HTTP_CONTENT_TYPE",
                "HTTP_CONTENT_LENGTH",
            }:
                yield key[5:].replace("_", "-").title(), value
            elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
                yield key.replace("_", "-").title(), value

    def copy(self) -> t.NoReturn:
        """Copying is not supported; always raises :exc:`TypeError`."""
        raise TypeError(f"cannot create {type(self).__name__!r} copies")

    def __or__(self, other: t.Any) -> t.NoReturn:
        """The ``|`` operator is not supported; always raises
        :exc:`TypeError`.
        """
        raise TypeError(f"cannot create {type(self).__name__!r} copies")
- # circular dependencies
- from .. import http
|