file_storage.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. from __future__ import annotations
  2. import collections.abc as cabc
  3. import mimetypes
  4. import os
  5. import typing as t
  6. from io import BytesIO
  7. from os import fsdecode
  8. from os import fspath
  9. from .._internal import _plain_int
  10. from .headers import Headers
  11. from .structures import MultiDict
  12. class FileStorage:
  13. """The :class:`FileStorage` class is a thin wrapper over incoming files.
  14. It is used by the request object to represent uploaded files. All the
  15. attributes of the wrapper stream are proxied by the file storage so
  16. it's possible to do ``storage.read()`` instead of the long form
  17. ``storage.stream.read()``.
  18. """
  19. def __init__(
  20. self,
  21. stream: t.IO[bytes] | None = None,
  22. filename: str | None = None,
  23. name: str | None = None,
  24. content_type: str | None = None,
  25. content_length: int | None = None,
  26. headers: Headers | None = None,
  27. ):
  28. self.name = name
  29. self.stream = stream or BytesIO()
  30. # If no filename is provided, attempt to get the filename from
  31. # the stream object. Python names special streams like
  32. # ``<stderr>`` with angular brackets, skip these streams.
  33. if filename is None:
  34. filename = getattr(stream, "name", None)
  35. if filename is not None:
  36. filename = fsdecode(filename)
  37. if filename and filename[0] == "<" and filename[-1] == ">":
  38. filename = None
  39. else:
  40. filename = fsdecode(filename)
  41. self.filename = filename
  42. if headers is None:
  43. headers = Headers()
  44. self.headers = headers
  45. if content_type is not None:
  46. headers["Content-Type"] = content_type
  47. if content_length is not None:
  48. headers["Content-Length"] = str(content_length)
  49. def _parse_content_type(self) -> None:
  50. if not hasattr(self, "_parsed_content_type"):
  51. self._parsed_content_type = http.parse_options_header(self.content_type)
  52. @property
  53. def content_type(self) -> str | None:
  54. """The content-type sent in the header. Usually not available"""
  55. return self.headers.get("content-type")
  56. @property
  57. def content_length(self) -> int:
  58. """The content-length sent in the header. Usually not available"""
  59. if "content-length" in self.headers:
  60. try:
  61. return _plain_int(self.headers["content-length"])
  62. except ValueError:
  63. pass
  64. return 0
  65. @property
  66. def mimetype(self) -> str:
  67. """Like :attr:`content_type`, but without parameters (eg, without
  68. charset, type etc.) and always lowercase. For example if the content
  69. type is ``text/HTML; charset=utf-8`` the mimetype would be
  70. ``'text/html'``.
  71. .. versionadded:: 0.7
  72. """
  73. self._parse_content_type()
  74. return self._parsed_content_type[0].lower()
  75. @property
  76. def mimetype_params(self) -> dict[str, str]:
  77. """The mimetype parameters as dict. For example if the content
  78. type is ``text/html; charset=utf-8`` the params would be
  79. ``{'charset': 'utf-8'}``.
  80. .. versionadded:: 0.7
  81. """
  82. self._parse_content_type()
  83. return self._parsed_content_type[1]
  84. def save(
  85. self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
  86. ) -> None:
  87. """Save the file to a destination path or file object. If the
  88. destination is a file object you have to close it yourself after the
  89. call. The buffer size is the number of bytes held in memory during
  90. the copy process. It defaults to 16KB.
  91. For secure file saving also have a look at :func:`secure_filename`.
  92. :param dst: a filename, :class:`os.PathLike`, or open file
  93. object to write to.
  94. :param buffer_size: Passed as the ``length`` parameter of
  95. :func:`shutil.copyfileobj`.
  96. .. versionchanged:: 1.0
  97. Supports :mod:`pathlib`.
  98. """
  99. from shutil import copyfileobj
  100. close_dst = False
  101. if hasattr(dst, "__fspath__"):
  102. dst = fspath(dst)
  103. if isinstance(dst, str):
  104. dst = open(dst, "wb")
  105. close_dst = True
  106. try:
  107. copyfileobj(self.stream, dst, buffer_size)
  108. finally:
  109. if close_dst:
  110. dst.close()
  111. def close(self) -> None:
  112. """Close the underlying file if possible."""
  113. try:
  114. self.stream.close()
  115. except Exception:
  116. pass
  117. def __bool__(self) -> bool:
  118. return bool(self.filename)
  119. def __getattr__(self, name: str) -> t.Any:
  120. try:
  121. return getattr(self.stream, name)
  122. except AttributeError:
  123. # SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
  124. # get the attribute from its backing file instead.
  125. if hasattr(self.stream, "_file"):
  126. return getattr(self.stream._file, name)
  127. raise
  128. def __iter__(self) -> cabc.Iterator[bytes]:
  129. return iter(self.stream)
  130. def __repr__(self) -> str:
  131. return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
  132. class FileMultiDict(MultiDict[str, FileStorage]):
  133. """A special :class:`MultiDict` that has convenience methods to add
  134. files to it. This is used for :class:`EnvironBuilder` and generally
  135. useful for unittesting.
  136. .. versionadded:: 0.5
  137. """
  138. def add_file(
  139. self,
  140. name: str,
  141. file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
  142. filename: str | None = None,
  143. content_type: str | None = None,
  144. ) -> None:
  145. """Adds a new file to the dict. `file` can be a file name or
  146. a :class:`file`-like or a :class:`FileStorage` object.
  147. :param name: the name of the field.
  148. :param file: a filename or :class:`file`-like object
  149. :param filename: an optional filename
  150. :param content_type: an optional content type
  151. """
  152. if isinstance(file, FileStorage):
  153. self.add(name, file)
  154. return
  155. if isinstance(file, (str, os.PathLike)):
  156. if filename is None:
  157. filename = os.fspath(file)
  158. file_obj: t.IO[bytes] = open(file, "rb")
  159. else:
  160. file_obj = file # type: ignore[assignment]
  161. if filename and content_type is None:
  162. content_type = (
  163. mimetypes.guess_type(filename)[0] or "application/octet-stream"
  164. )
  165. self.add(name, FileStorage(file_obj, filename, name, content_type))
  166. # circular dependencies
  167. from .. import http