headers.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. from __future__ import annotations
  2. import collections.abc as cabc
  3. import re
  4. import typing as t
  5. from .._internal import _missing
  6. from ..exceptions import BadRequestKeyError
  7. from .mixins import ImmutableHeadersMixin
  8. from .structures import iter_multi_items
  9. from .structures import MultiDict
  10. if t.TYPE_CHECKING:
  11. import typing_extensions as te
  12. from _typeshed.wsgi import WSGIEnvironment
  13. T = t.TypeVar("T")
  14. class Headers:
  15. """An object that stores some headers. It has a dict-like interface,
  16. but is ordered, can store the same key multiple times, and iterating
  17. yields ``(key, value)`` pairs instead of only keys.
  18. This data structure is useful if you want a nicer way to handle WSGI
  19. headers which are stored as tuples in a list.
  20. From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
  21. also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
  22. and will render a page for a ``400 BAD REQUEST`` if caught in a
  23. catch-all for HTTP exceptions.
  24. Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
  25. class, with the exception of `__getitem__`. :mod:`wsgiref` will return
  26. `None` for ``headers['missing']``, whereas :class:`Headers` will raise
  27. a :class:`KeyError`.
  28. To create a new ``Headers`` object, pass it a list, dict, or
  29. other ``Headers`` object with default values. These values are
  30. validated the same way values added later are.
  31. :param defaults: The list of default values for the :class:`Headers`.
  32. .. versionchanged:: 3.1
  33. Implement ``|`` and ``|=`` operators.
  34. .. versionchanged:: 2.1.0
  35. Default values are validated the same as values added later.
  36. .. versionchanged:: 0.9
  37. This data structure now stores unicode values similar to how the
  38. multi dicts do it. The main difference is that bytes can be set as
  39. well which will automatically be latin1 decoded.
  40. .. versionchanged:: 0.9
  41. The :meth:`linked` function was removed without replacement as it
  42. was an API that does not support the changes to the encoding model.
  43. """
  44. def __init__(
  45. self,
  46. defaults: (
  47. Headers
  48. | MultiDict[str, t.Any]
  49. | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
  50. | cabc.Iterable[tuple[str, t.Any]]
  51. | None
  52. ) = None,
  53. ) -> None:
  54. self._list: list[tuple[str, str]] = []
  55. if defaults is not None:
  56. self.extend(defaults)
  57. @t.overload
  58. def __getitem__(self, key: str) -> str: ...
  59. @t.overload
  60. def __getitem__(self, key: int) -> tuple[str, str]: ...
  61. @t.overload
  62. def __getitem__(self, key: slice) -> te.Self: ...
  63. def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
  64. if isinstance(key, str):
  65. return self._get_key(key)
  66. if isinstance(key, int):
  67. return self._list[key]
  68. return self.__class__(self._list[key])
  69. def _get_key(self, key: str) -> str:
  70. ikey = key.lower()
  71. for k, v in self._list:
  72. if k.lower() == ikey:
  73. return v
  74. raise BadRequestKeyError(key)
  75. def __eq__(self, other: object) -> bool:
  76. if other.__class__ is not self.__class__:
  77. return NotImplemented
  78. def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
  79. return item[0].lower(), *item[1:]
  80. return set(map(lowered, other._list)) == set(map(lowered, self._list)) # type: ignore[attr-defined]
  81. __hash__ = None # type: ignore[assignment]
  82. @t.overload
  83. def get(self, key: str) -> str | None: ...
  84. @t.overload
  85. def get(self, key: str, default: str) -> str: ...
  86. @t.overload
  87. def get(self, key: str, default: T) -> str | T: ...
  88. @t.overload
  89. def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
  90. @t.overload
  91. def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
  92. def get( # type: ignore[misc]
  93. self,
  94. key: str,
  95. default: str | T | None = None,
  96. type: cabc.Callable[[str], T] | None = None,
  97. ) -> str | T | None:
  98. """Return the default value if the requested data doesn't exist.
  99. If `type` is provided and is a callable it should convert the value,
  100. return it or raise a :exc:`ValueError` if that is not possible. In
  101. this case the function will return the default as if the value was not
  102. found:
  103. >>> d = Headers([('Content-Length', '42')])
  104. >>> d.get('Content-Length', type=int)
  105. 42
  106. :param key: The key to be looked up.
  107. :param default: The default value to be returned if the key can't
  108. be looked up. If not further specified `None` is
  109. returned.
  110. :param type: A callable that is used to cast the value in the
  111. :class:`Headers`. If a :exc:`ValueError` is raised
  112. by this callable the default value is returned.
  113. .. versionchanged:: 3.0
  114. The ``as_bytes`` parameter was removed.
  115. .. versionchanged:: 0.9
  116. The ``as_bytes`` parameter was added.
  117. """
  118. try:
  119. rv = self._get_key(key)
  120. except KeyError:
  121. return default
  122. if type is None:
  123. return rv
  124. try:
  125. return type(rv)
  126. except ValueError:
  127. return default
  128. @t.overload
  129. def getlist(self, key: str) -> list[str]: ...
  130. @t.overload
  131. def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
  132. def getlist(
  133. self, key: str, type: cabc.Callable[[str], T] | None = None
  134. ) -> list[str] | list[T]:
  135. """Return the list of items for a given key. If that key is not in the
  136. :class:`Headers`, the return value will be an empty list. Just like
  137. :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
  138. be converted with the callable defined there.
  139. :param key: The key to be looked up.
  140. :param type: A callable that is used to cast the value in the
  141. :class:`Headers`. If a :exc:`ValueError` is raised
  142. by this callable the value will be removed from the list.
  143. :return: a :class:`list` of all the values for the key.
  144. .. versionchanged:: 3.0
  145. The ``as_bytes`` parameter was removed.
  146. .. versionchanged:: 0.9
  147. The ``as_bytes`` parameter was added.
  148. """
  149. ikey = key.lower()
  150. if type is not None:
  151. result = []
  152. for k, v in self:
  153. if k.lower() == ikey:
  154. try:
  155. result.append(type(v))
  156. except ValueError:
  157. continue
  158. return result
  159. return [v for k, v in self if k.lower() == ikey]
  160. def get_all(self, name: str) -> list[str]:
  161. """Return a list of all the values for the named field.
  162. This method is compatible with the :mod:`wsgiref`
  163. :meth:`~wsgiref.headers.Headers.get_all` method.
  164. """
  165. return self.getlist(name)
  166. def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
  167. for key, value in self:
  168. if lower:
  169. key = key.lower()
  170. yield key, value
  171. def keys(self, lower: bool = False) -> t.Iterable[str]:
  172. for key, _ in self.items(lower):
  173. yield key
  174. def values(self) -> t.Iterable[str]:
  175. for _, value in self.items():
  176. yield value
  177. def extend(
  178. self,
  179. arg: (
  180. Headers
  181. | MultiDict[str, t.Any]
  182. | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
  183. | cabc.Iterable[tuple[str, t.Any]]
  184. | None
  185. ) = None,
  186. /,
  187. **kwargs: str,
  188. ) -> None:
  189. """Extend headers in this object with items from another object
  190. containing header items as well as keyword arguments.
  191. To replace existing keys instead of extending, use
  192. :meth:`update` instead.
  193. If provided, the first argument can be another :class:`Headers`
  194. object, a :class:`MultiDict`, :class:`dict`, or iterable of
  195. pairs.
  196. .. versionchanged:: 1.0
  197. Support :class:`MultiDict`. Allow passing ``kwargs``.
  198. """
  199. if arg is not None:
  200. for key, value in iter_multi_items(arg):
  201. self.add(key, value)
  202. for key, value in iter_multi_items(kwargs):
  203. self.add(key, value)
  204. def __delitem__(self, key: str | int | slice) -> None:
  205. if isinstance(key, str):
  206. self._del_key(key)
  207. return
  208. del self._list[key]
  209. def _del_key(self, key: str) -> None:
  210. key = key.lower()
  211. new = []
  212. for k, v in self._list:
  213. if k.lower() != key:
  214. new.append((k, v))
  215. self._list[:] = new
  216. def remove(self, key: str) -> None:
  217. """Remove a key.
  218. :param key: The key to be removed.
  219. """
  220. return self._del_key(key)
  221. @t.overload
  222. def pop(self) -> tuple[str, str]: ...
  223. @t.overload
  224. def pop(self, key: str) -> str: ...
  225. @t.overload
  226. def pop(self, key: int | None = ...) -> tuple[str, str]: ...
  227. @t.overload
  228. def pop(self, key: str, default: str) -> str: ...
  229. @t.overload
  230. def pop(self, key: str, default: T) -> str | T: ...
  231. def pop(
  232. self,
  233. key: str | int | None = None,
  234. default: str | T = _missing, # type: ignore[assignment]
  235. ) -> str | tuple[str, str] | T:
  236. """Removes and returns a key or index.
  237. :param key: The key to be popped. If this is an integer the item at
  238. that position is removed, if it's a string the value for
  239. that key is. If the key is omitted or `None` the last
  240. item is removed.
  241. :return: an item.
  242. """
  243. if key is None:
  244. return self._list.pop()
  245. if isinstance(key, int):
  246. return self._list.pop(key)
  247. try:
  248. rv = self._get_key(key)
  249. except KeyError:
  250. if default is not _missing:
  251. return default
  252. raise
  253. self.remove(key)
  254. return rv
  255. def popitem(self) -> tuple[str, str]:
  256. """Removes a key or index and returns a (key, value) item."""
  257. return self._list.pop()
  258. def __contains__(self, key: str) -> bool:
  259. """Check if a key is present."""
  260. try:
  261. self._get_key(key)
  262. except KeyError:
  263. return False
  264. return True
  265. def __iter__(self) -> t.Iterator[tuple[str, str]]:
  266. """Yield ``(key, value)`` tuples."""
  267. return iter(self._list)
  268. def __len__(self) -> int:
  269. return len(self._list)
  270. def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
  271. """Add a new header tuple to the list.
  272. Keyword arguments can specify additional parameters for the header
  273. value, with underscores converted to dashes::
  274. >>> d = Headers()
  275. >>> d.add('Content-Type', 'text/plain')
  276. >>> d.add('Content-Disposition', 'attachment', filename='foo.png')
  277. The keyword argument dumping uses :func:`dump_options_header`
  278. behind the scenes.
  279. .. versionchanged:: 0.4.1
  280. keyword arguments were added for :mod:`wsgiref` compatibility.
  281. """
  282. if kwargs:
  283. value = _options_header_vkw(value, kwargs)
  284. value_str = _str_header_value(value)
  285. self._list.append((key, value_str))
  286. def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
  287. """Add a new header tuple to the list.
  288. An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
  289. :meth:`~wsgiref.headers.Headers.add_header` method.
  290. """
  291. self.add(key, value, **kwargs)
  292. def clear(self) -> None:
  293. """Clears all headers."""
  294. self._list.clear()
  295. def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
  296. """Remove all header tuples for `key` and add a new one. The newly
  297. added key either appears at the end of the list if there was no
  298. entry or replaces the first one.
  299. Keyword arguments can specify additional parameters for the header
  300. value, with underscores converted to dashes. See :meth:`add` for
  301. more information.
  302. .. versionchanged:: 0.6.1
  303. :meth:`set` now accepts the same arguments as :meth:`add`.
  304. :param key: The key to be inserted.
  305. :param value: The value to be inserted.
  306. """
  307. if kwargs:
  308. value = _options_header_vkw(value, kwargs)
  309. value_str = _str_header_value(value)
  310. if not self._list:
  311. self._list.append((key, value_str))
  312. return
  313. iter_list = iter(self._list)
  314. ikey = key.lower()
  315. for idx, (old_key, _) in enumerate(iter_list):
  316. if old_key.lower() == ikey:
  317. # replace first occurrence
  318. self._list[idx] = (key, value_str)
  319. break
  320. else:
  321. # no existing occurrences
  322. self._list.append((key, value_str))
  323. return
  324. # remove remaining occurrences
  325. self._list[idx + 1 :] = [t for t in iter_list if t[0].lower() != ikey]
  326. def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
  327. """Remove any existing values for a header and add new ones.
  328. :param key: The header key to set.
  329. :param values: An iterable of values to set for the key.
  330. .. versionadded:: 1.0
  331. """
  332. if values:
  333. values_iter = iter(values)
  334. self.set(key, next(values_iter))
  335. for value in values_iter:
  336. self.add(key, value)
  337. else:
  338. self.remove(key)
  339. def setdefault(self, key: str, default: t.Any) -> str:
  340. """Return the first value for the key if it is in the headers,
  341. otherwise set the header to the value given by ``default`` and
  342. return that.
  343. :param key: The header key to get.
  344. :param default: The value to set for the key if it is not in the
  345. headers.
  346. """
  347. try:
  348. return self._get_key(key)
  349. except KeyError:
  350. pass
  351. self.set(key, default)
  352. return self._get_key(key)
  353. def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
  354. """Return the list of values for the key if it is in the
  355. headers, otherwise set the header to the list of values given
  356. by ``default`` and return that.
  357. Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
  358. list will not affect the headers.
  359. :param key: The header key to get.
  360. :param default: An iterable of values to set for the key if it
  361. is not in the headers.
  362. .. versionadded:: 1.0
  363. """
  364. if key not in self:
  365. self.setlist(key, default)
  366. return self.getlist(key)
  367. @t.overload
  368. def __setitem__(self, key: str, value: t.Any) -> None: ...
  369. @t.overload
  370. def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
  371. @t.overload
  372. def __setitem__(
  373. self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
  374. ) -> None: ...
  375. def __setitem__(
  376. self,
  377. key: str | int | slice,
  378. value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
  379. ) -> None:
  380. """Like :meth:`set` but also supports index/slice based setting."""
  381. if isinstance(key, str):
  382. self.set(key, value)
  383. elif isinstance(key, int):
  384. self._list[key] = value[0], _str_header_value(value[1]) # type: ignore[index]
  385. else:
  386. self._list[key] = [(k, _str_header_value(v)) for k, v in value] # type: ignore[misc]
  387. def update(
  388. self,
  389. arg: (
  390. Headers
  391. | MultiDict[str, t.Any]
  392. | cabc.Mapping[
  393. str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
  394. ]
  395. | cabc.Iterable[tuple[str, t.Any]]
  396. | None
  397. ) = None,
  398. /,
  399. **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
  400. ) -> None:
  401. """Replace headers in this object with items from another
  402. headers object and keyword arguments.
  403. To extend existing keys instead of replacing, use :meth:`extend`
  404. instead.
  405. If provided, the first argument can be another :class:`Headers`
  406. object, a :class:`MultiDict`, :class:`dict`, or iterable of
  407. pairs.
  408. .. versionadded:: 1.0
  409. """
  410. if arg is not None:
  411. if isinstance(arg, (Headers, MultiDict)):
  412. for key in arg.keys():
  413. self.setlist(key, arg.getlist(key))
  414. elif isinstance(arg, cabc.Mapping):
  415. for key, value in arg.items():
  416. if isinstance(value, (list, tuple, set)):
  417. self.setlist(key, value)
  418. else:
  419. self.set(key, value)
  420. else:
  421. for key, value in arg:
  422. self.set(key, value)
  423. for key, value in kwargs.items():
  424. if isinstance(value, (list, tuple, set)):
  425. self.setlist(key, value)
  426. else:
  427. self.set(key, value)
  428. def __or__(
  429. self,
  430. other: cabc.Mapping[
  431. str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
  432. ],
  433. ) -> te.Self:
  434. if not isinstance(other, cabc.Mapping):
  435. return NotImplemented
  436. rv = self.copy()
  437. rv.update(other)
  438. return rv
  439. def __ior__(
  440. self,
  441. other: (
  442. cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
  443. | cabc.Iterable[tuple[str, t.Any]]
  444. ),
  445. ) -> te.Self:
  446. if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
  447. return NotImplemented
  448. self.update(other)
  449. return self
  450. def to_wsgi_list(self) -> list[tuple[str, str]]:
  451. """Convert the headers into a list suitable for WSGI.
  452. :return: list
  453. """
  454. return list(self)
  455. def copy(self) -> te.Self:
  456. return self.__class__(self._list)
  457. def __copy__(self) -> te.Self:
  458. return self.copy()
  459. def __str__(self) -> str:
  460. """Returns formatted headers suitable for HTTP transmission."""
  461. strs = []
  462. for key, value in self.to_wsgi_list():
  463. strs.append(f"{key}: {value}")
  464. strs.append("\r\n")
  465. return "\r\n".join(strs)
  466. def __repr__(self) -> str:
  467. return f"{type(self).__name__}({list(self)!r})"
  468. def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
  469. return http.dump_options_header(
  470. value, {k.replace("_", "-"): v for k, v in kw.items()}
  471. )
  472. _newline_re = re.compile(r"[\r\n]")
  473. def _str_header_value(value: t.Any) -> str:
  474. if not isinstance(value, str):
  475. value = str(value)
  476. if _newline_re.search(value) is not None:
  477. raise ValueError("Header values must not contain newline characters.")
  478. return value # type: ignore[no-any-return]
  479. class EnvironHeaders(ImmutableHeadersMixin, Headers): # type: ignore[misc]
  480. """Read only version of the headers from a WSGI environment. This
  481. provides the same interface as `Headers` and is constructed from
  482. a WSGI environment.
  483. From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
  484. subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
  485. render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
  486. HTTP exceptions.
  487. """
  488. def __init__(self, environ: WSGIEnvironment) -> None:
  489. super().__init__()
  490. self.environ = environ
  491. def __eq__(self, other: object) -> bool:
  492. if not isinstance(other, EnvironHeaders):
  493. return NotImplemented
  494. return self.environ is other.environ
  495. __hash__ = None # type: ignore[assignment]
  496. def __getitem__(self, key: str) -> str: # type: ignore[override]
  497. return self._get_key(key)
  498. def _get_key(self, key: str) -> str:
  499. if not isinstance(key, str):
  500. raise BadRequestKeyError(key)
  501. key = key.upper().replace("-", "_")
  502. if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
  503. return self.environ[key] # type: ignore[no-any-return]
  504. return self.environ[f"HTTP_{key}"] # type: ignore[no-any-return]
  505. def __len__(self) -> int:
  506. return sum(1 for _ in self)
  507. def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
  508. for key, value in self.environ.items():
  509. if key.startswith("HTTP_") and key not in {
  510. "HTTP_CONTENT_TYPE",
  511. "HTTP_CONTENT_LENGTH",
  512. }:
  513. yield key[5:].replace("_", "-").title(), value
  514. elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
  515. yield key.replace("_", "-").title(), value
  516. def copy(self) -> t.NoReturn:
  517. raise TypeError(f"cannot create {type(self).__name__!r} copies")
  518. def __or__(self, other: t.Any) -> t.NoReturn:
  519. raise TypeError(f"cannot create {type(self).__name__!r} copies")
  520. # circular dependencies
  521. from .. import http