# _internal.py (5.4 KB)
from __future__ import annotations

import logging
import re
import sys
import typing as t
from datetime import datetime
from datetime import timezone

if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIEnvironment

    from .wrappers.request import Request
  11. _logger: logging.Logger | None = None
  12. class _Missing:
  13. def __repr__(self) -> str:
  14. return "no value"
  15. def __reduce__(self) -> str:
  16. return "_missing"
  17. _missing = _Missing()
  18. def _wsgi_decoding_dance(s: str) -> str:
  19. return s.encode("latin1").decode(errors="replace")
  20. def _wsgi_encoding_dance(s: str) -> str:
  21. return s.encode().decode("latin1")
  22. def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
  23. env = getattr(obj, "environ", obj)
  24. assert isinstance(
  25. env, dict
  26. ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
  27. return env
  28. def _has_level_handler(logger: logging.Logger) -> bool:
  29. """Check if there is a handler in the logging chain that will handle
  30. the given logger's effective level.
  31. """
  32. level = logger.getEffectiveLevel()
  33. current = logger
  34. while current:
  35. if any(handler.level <= level for handler in current.handlers):
  36. return True
  37. if not current.propagate:
  38. break
  39. current = current.parent # type: ignore
  40. return False
  41. class _ColorStreamHandler(logging.StreamHandler): # type: ignore[type-arg]
  42. """On Windows, wrap stream with Colorama for ANSI style support."""
  43. def __init__(self) -> None:
  44. try:
  45. import colorama
  46. except ImportError:
  47. stream = None
  48. else:
  49. stream = colorama.AnsiToWin32(sys.stderr)
  50. super().__init__(stream)
  51. def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
  52. """Log a message to the 'werkzeug' logger.
  53. The logger is created the first time it is needed. If there is no
  54. level set, it is set to :data:`logging.INFO`. If there is no handler
  55. for the logger's effective level, a :class:`logging.StreamHandler`
  56. is added.
  57. """
  58. global _logger
  59. if _logger is None:
  60. _logger = logging.getLogger("werkzeug")
  61. if _logger.level == logging.NOTSET:
  62. _logger.setLevel(logging.INFO)
  63. if not _has_level_handler(_logger):
  64. _logger.addHandler(_ColorStreamHandler())
  65. getattr(_logger, type)(message.rstrip(), *args, **kwargs)
  66. @t.overload
  67. def _dt_as_utc(dt: None) -> None: ...
  68. @t.overload
  69. def _dt_as_utc(dt: datetime) -> datetime: ...
  70. def _dt_as_utc(dt: datetime | None) -> datetime | None:
  71. if dt is None:
  72. return dt
  73. if dt.tzinfo is None:
  74. return dt.replace(tzinfo=timezone.utc)
  75. elif dt.tzinfo != timezone.utc:
  76. return dt.astimezone(timezone.utc)
  77. return dt
  78. _TAccessorValue = t.TypeVar("_TAccessorValue")
  79. class _DictAccessorProperty(t.Generic[_TAccessorValue]):
  80. """Baseclass for `environ_property` and `header_property`."""
  81. read_only = False
  82. def __init__(
  83. self,
  84. name: str,
  85. default: _TAccessorValue | None = None,
  86. load_func: t.Callable[[str], _TAccessorValue] | None = None,
  87. dump_func: t.Callable[[_TAccessorValue], str] | None = None,
  88. read_only: bool | None = None,
  89. doc: str | None = None,
  90. ) -> None:
  91. self.name = name
  92. self.default = default
  93. self.load_func = load_func
  94. self.dump_func = dump_func
  95. if read_only is not None:
  96. self.read_only = read_only
  97. self.__doc__ = doc
  98. def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
  99. raise NotImplementedError
  100. @t.overload
  101. def __get__(
  102. self, instance: None, owner: type
  103. ) -> _DictAccessorProperty[_TAccessorValue]: ...
  104. @t.overload
  105. def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: ...
  106. def __get__(
  107. self, instance: t.Any | None, owner: type
  108. ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
  109. if instance is None:
  110. return self
  111. storage = self.lookup(instance)
  112. if self.name not in storage:
  113. return self.default # type: ignore
  114. value = storage[self.name]
  115. if self.load_func is not None:
  116. try:
  117. return self.load_func(value)
  118. except (ValueError, TypeError):
  119. return self.default # type: ignore
  120. return value # type: ignore
  121. def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
  122. if self.read_only:
  123. raise AttributeError("read only property")
  124. if self.dump_func is not None:
  125. self.lookup(instance)[self.name] = self.dump_func(value)
  126. else:
  127. self.lookup(instance)[self.name] = value
  128. def __delete__(self, instance: t.Any) -> None:
  129. if self.read_only:
  130. raise AttributeError("read only property")
  131. self.lookup(instance).pop(self.name, None)
  132. def __repr__(self) -> str:
  133. return f"<{type(self).__name__} {self.name}>"
  134. _plain_int_re = re.compile(r"-?\d+", re.ASCII)
  135. def _plain_int(value: str) -> int:
  136. """Parse an int only if it is only ASCII digits and ``-``.
  137. This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but
  138. are not allowed in HTTP header values.
  139. Any leading or trailing whitespace is stripped
  140. """
  141. value = value.strip()
  142. if _plain_int_re.fullmatch(value) is None:
  143. raise ValueError
  144. return int(value)