file_proxy.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import io
  2. from typing import IO, TYPE_CHECKING, Any, List
  3. from .ansi import AnsiDecoder
  4. from .text import Text
  5. if TYPE_CHECKING:
  6. from .console import Console
  7. class FileProxy(io.TextIOBase):
  8. """Wraps a file (e.g. sys.stdout) and redirects writes to a console."""
  9. def __init__(self, console: "Console", file: IO[str]) -> None:
  10. self.__console = console
  11. self.__file = file
  12. self.__buffer: List[str] = []
  13. self.__ansi_decoder = AnsiDecoder()
  14. @property
  15. def rich_proxied_file(self) -> IO[str]:
  16. """Get proxied file."""
  17. return self.__file
  18. def __getattr__(self, name: str) -> Any:
  19. return getattr(self.__file, name)
  20. def write(self, text: str) -> int:
  21. if not isinstance(text, str):
  22. raise TypeError(f"write() argument must be str, not {type(text).__name__}")
  23. buffer = self.__buffer
  24. lines: List[str] = []
  25. while text:
  26. line, new_line, text = text.partition("\n")
  27. if new_line:
  28. lines.append("".join(buffer) + line)
  29. del buffer[:]
  30. else:
  31. buffer.append(line)
  32. break
  33. if lines:
  34. console = self.__console
  35. with console:
  36. output = Text("\n").join(
  37. self.__ansi_decoder.decode_line(line) for line in lines
  38. )
  39. console.print(output)
  40. return len(text)
  41. def flush(self) -> None:
  42. output = "".join(self.__buffer)
  43. if output:
  44. self.__console.print(output)
  45. del self.__buffer[:]