range.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. from __future__ import annotations
  2. import collections.abc as cabc
  3. import typing as t
  4. from datetime import datetime
  5. if t.TYPE_CHECKING:
  6. import typing_extensions as te
  7. T = t.TypeVar("T")
  8. class IfRange:
  9. """Very simple object that represents the `If-Range` header in parsed
  10. form. It will either have neither a etag or date or one of either but
  11. never both.
  12. .. versionadded:: 0.7
  13. """
  14. def __init__(self, etag: str | None = None, date: datetime | None = None):
  15. #: The etag parsed and unquoted. Ranges always operate on strong
  16. #: etags so the weakness information is not necessary.
  17. self.etag = etag
  18. #: The date in parsed format or `None`.
  19. self.date = date
  20. def to_header(self) -> str:
  21. """Converts the object back into an HTTP header."""
  22. if self.date is not None:
  23. return http.http_date(self.date)
  24. if self.etag is not None:
  25. return http.quote_etag(self.etag)
  26. return ""
  27. def __str__(self) -> str:
  28. return self.to_header()
  29. def __repr__(self) -> str:
  30. return f"<{type(self).__name__} {str(self)!r}>"
  31. class Range:
  32. """Represents a ``Range`` header. All methods only support only
  33. bytes as the unit. Stores a list of ranges if given, but the methods
  34. only work if only one range is provided.
  35. :raise ValueError: If the ranges provided are invalid.
  36. .. versionchanged:: 0.15
  37. The ranges passed in are validated.
  38. .. versionadded:: 0.7
  39. """
  40. def __init__(
  41. self, units: str, ranges: cabc.Sequence[tuple[int, int | None]]
  42. ) -> None:
  43. #: The units of this range. Usually "bytes".
  44. self.units = units
  45. #: A list of ``(begin, end)`` tuples for the range header provided.
  46. #: The ranges are non-inclusive.
  47. self.ranges = ranges
  48. for start, end in ranges:
  49. if start is None or (end is not None and (start < 0 or start >= end)):
  50. raise ValueError(f"{(start, end)} is not a valid range.")
  51. def range_for_length(self, length: int | None) -> tuple[int, int] | None:
  52. """If the range is for bytes, the length is not None and there is
  53. exactly one range and it is satisfiable it returns a ``(start, stop)``
  54. tuple, otherwise `None`.
  55. """
  56. if self.units != "bytes" or length is None or len(self.ranges) != 1:
  57. return None
  58. start, end = self.ranges[0]
  59. if end is None:
  60. end = length
  61. if start < 0:
  62. start += length
  63. if http.is_byte_range_valid(start, end, length):
  64. return start, min(end, length)
  65. return None
  66. def make_content_range(self, length: int | None) -> ContentRange | None:
  67. """Creates a :class:`~werkzeug.datastructures.ContentRange` object
  68. from the current range and given content length.
  69. """
  70. rng = self.range_for_length(length)
  71. if rng is not None:
  72. return ContentRange(self.units, rng[0], rng[1], length)
  73. return None
  74. def to_header(self) -> str:
  75. """Converts the object back into an HTTP header."""
  76. ranges = []
  77. for begin, end in self.ranges:
  78. if end is None:
  79. ranges.append(f"{begin}-" if begin >= 0 else str(begin))
  80. else:
  81. ranges.append(f"{begin}-{end - 1}")
  82. return f"{self.units}={','.join(ranges)}"
  83. def to_content_range_header(self, length: int | None) -> str | None:
  84. """Converts the object into `Content-Range` HTTP header,
  85. based on given length
  86. """
  87. range = self.range_for_length(length)
  88. if range is not None:
  89. return f"{self.units} {range[0]}-{range[1] - 1}/{length}"
  90. return None
  91. def __str__(self) -> str:
  92. return self.to_header()
  93. def __repr__(self) -> str:
  94. return f"<{type(self).__name__} {str(self)!r}>"
  95. class _CallbackProperty(t.Generic[T]):
  96. def __set_name__(self, owner: type[ContentRange], name: str) -> None:
  97. self.attr = f"_{name}"
  98. @t.overload
  99. def __get__(self, instance: None, owner: None) -> te.Self: ...
  100. @t.overload
  101. def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ...
  102. def __get__(
  103. self, instance: ContentRange | None, owner: type[ContentRange] | None
  104. ) -> te.Self | T:
  105. if instance is None:
  106. return self
  107. return instance.__dict__[self.attr] # type: ignore[no-any-return]
  108. def __set__(self, instance: ContentRange, value: T) -> None:
  109. instance.__dict__[self.attr] = value
  110. if instance.on_update is not None:
  111. instance.on_update(instance)
  112. class ContentRange:
  113. """Represents the content range header.
  114. .. versionadded:: 0.7
  115. """
  116. def __init__(
  117. self,
  118. units: str | None,
  119. start: int | None,
  120. stop: int | None,
  121. length: int | None = None,
  122. on_update: cabc.Callable[[ContentRange], None] | None = None,
  123. ) -> None:
  124. self.on_update = on_update
  125. self.set(start, stop, length, units)
  126. #: The units to use, usually "bytes"
  127. units: str | None = _CallbackProperty() # type: ignore[assignment]
  128. #: The start point of the range or `None`.
  129. start: int | None = _CallbackProperty() # type: ignore[assignment]
  130. #: The stop point of the range (non-inclusive) or `None`. Can only be
  131. #: `None` if also start is `None`.
  132. stop: int | None = _CallbackProperty() # type: ignore[assignment]
  133. #: The length of the range or `None`.
  134. length: int | None = _CallbackProperty() # type: ignore[assignment]
  135. def set(
  136. self,
  137. start: int | None,
  138. stop: int | None,
  139. length: int | None = None,
  140. units: str | None = "bytes",
  141. ) -> None:
  142. """Simple method to update the ranges."""
  143. assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
  144. self._units: str | None = units
  145. self._start: int | None = start
  146. self._stop: int | None = stop
  147. self._length: int | None = length
  148. if self.on_update is not None:
  149. self.on_update(self)
  150. def unset(self) -> None:
  151. """Sets the units to `None` which indicates that the header should
  152. no longer be used.
  153. """
  154. self.set(None, None, units=None)
  155. def to_header(self) -> str:
  156. if self._units is None:
  157. return ""
  158. if self._length is None:
  159. length: str | int = "*"
  160. else:
  161. length = self._length
  162. if self._start is None:
  163. return f"{self._units} */{length}"
  164. return f"{self._units} {self._start}-{self._stop - 1}/{length}" # type: ignore[operator]
  165. def __bool__(self) -> bool:
  166. return self._units is not None
  167. def __str__(self) -> str:
  168. return self.to_header()
  169. def __repr__(self) -> str:
  170. return f"<{type(self).__name__} {str(self)!r}>"
  171. # circular dependencies
  172. from .. import http