"""
This module contains implementations for the termui module. To keep the
import time of Click down, some infrequently used functionality is
placed in this module and only imported as needed.
"""
  6. import contextlib
  7. import math
  8. import os
  9. import sys
  10. import time
  11. import typing as t
  12. from gettext import gettext as _
  13. from io import StringIO
  14. from shutil import which
  15. from types import TracebackType
  16. from ._compat import _default_text_stdout
  17. from ._compat import CYGWIN
  18. from ._compat import get_best_encoding
  19. from ._compat import isatty
  20. from ._compat import open_stream
  21. from ._compat import strip_ansi
  22. from ._compat import term_len
  23. from ._compat import WIN
  24. from .exceptions import ClickException
  25. from .utils import echo
  26. V = t.TypeVar("V")
  27. if os.name == "nt":
  28. BEFORE_BAR = "\r"
  29. AFTER_BAR = "\n"
  30. else:
  31. BEFORE_BAR = "\r\033[?25l"
  32. AFTER_BAR = "\033[?25h\n"
  33. class ProgressBar(t.Generic[V]):
  34. def __init__(
  35. self,
  36. iterable: t.Optional[t.Iterable[V]],
  37. length: t.Optional[int] = None,
  38. fill_char: str = "#",
  39. empty_char: str = " ",
  40. bar_template: str = "%(bar)s",
  41. info_sep: str = " ",
  42. show_eta: bool = True,
  43. show_percent: t.Optional[bool] = None,
  44. show_pos: bool = False,
  45. item_show_func: t.Optional[t.Callable[[t.Optional[V]], t.Optional[str]]] = None,
  46. label: t.Optional[str] = None,
  47. file: t.Optional[t.TextIO] = None,
  48. color: t.Optional[bool] = None,
  49. update_min_steps: int = 1,
  50. width: int = 30,
  51. ) -> None:
  52. self.fill_char = fill_char
  53. self.empty_char = empty_char
  54. self.bar_template = bar_template
  55. self.info_sep = info_sep
  56. self.show_eta = show_eta
  57. self.show_percent = show_percent
  58. self.show_pos = show_pos
  59. self.item_show_func = item_show_func
  60. self.label: str = label or ""
  61. if file is None:
  62. file = _default_text_stdout()
  63. # There are no standard streams attached to write to. For example,
  64. # pythonw on Windows.
  65. if file is None:
  66. file = StringIO()
  67. self.file = file
  68. self.color = color
  69. self.update_min_steps = update_min_steps
  70. self._completed_intervals = 0
  71. self.width: int = width
  72. self.autowidth: bool = width == 0
  73. if length is None:
  74. from operator import length_hint
  75. length = length_hint(iterable, -1)
  76. if length == -1:
  77. length = None
  78. if iterable is None:
  79. if length is None:
  80. raise TypeError("iterable or length is required")
  81. iterable = t.cast(t.Iterable[V], range(length))
  82. self.iter: t.Iterable[V] = iter(iterable)
  83. self.length = length
  84. self.pos = 0
  85. self.avg: t.List[float] = []
  86. self.last_eta: float
  87. self.start: float
  88. self.start = self.last_eta = time.time()
  89. self.eta_known: bool = False
  90. self.finished: bool = False
  91. self.max_width: t.Optional[int] = None
  92. self.entered: bool = False
  93. self.current_item: t.Optional[V] = None
  94. self.is_hidden: bool = not isatty(self.file)
  95. self._last_line: t.Optional[str] = None
  96. def __enter__(self) -> "ProgressBar[V]":
  97. self.entered = True
  98. self.render_progress()
  99. return self
  100. def __exit__(
  101. self,
  102. exc_type: t.Optional[t.Type[BaseException]],
  103. exc_value: t.Optional[BaseException],
  104. tb: t.Optional[TracebackType],
  105. ) -> None:
  106. self.render_finish()
  107. def __iter__(self) -> t.Iterator[V]:
  108. if not self.entered:
  109. raise RuntimeError("You need to use progress bars in a with block.")
  110. self.render_progress()
  111. return self.generator()
  112. def __next__(self) -> V:
  113. # Iteration is defined in terms of a generator function,
  114. # returned by iter(self); use that to define next(). This works
  115. # because `self.iter` is an iterable consumed by that generator,
  116. # so it is re-entry safe. Calling `next(self.generator())`
  117. # twice works and does "what you want".
  118. return next(iter(self))
  119. def render_finish(self) -> None:
  120. if self.is_hidden:
  121. return
  122. self.file.write(AFTER_BAR)
  123. self.file.flush()
  124. @property
  125. def pct(self) -> float:
  126. if self.finished:
  127. return 1.0
  128. return min(self.pos / (float(self.length or 1) or 1), 1.0)
  129. @property
  130. def time_per_iteration(self) -> float:
  131. if not self.avg:
  132. return 0.0
  133. return sum(self.avg) / float(len(self.avg))
  134. @property
  135. def eta(self) -> float:
  136. if self.length is not None and not self.finished:
  137. return self.time_per_iteration * (self.length - self.pos)
  138. return 0.0
  139. def format_eta(self) -> str:
  140. if self.eta_known:
  141. t = int(self.eta)
  142. seconds = t % 60
  143. t //= 60
  144. minutes = t % 60
  145. t //= 60
  146. hours = t % 24
  147. t //= 24
  148. if t > 0:
  149. return f"{t}d {hours:02}:{minutes:02}:{seconds:02}"
  150. else:
  151. return f"{hours:02}:{minutes:02}:{seconds:02}"
  152. return ""
  153. def format_pos(self) -> str:
  154. pos = str(self.pos)
  155. if self.length is not None:
  156. pos += f"/{self.length}"
  157. return pos
  158. def format_pct(self) -> str:
  159. return f"{int(self.pct * 100): 4}%"[1:]
  160. def format_bar(self) -> str:
  161. if self.length is not None:
  162. bar_length = int(self.pct * self.width)
  163. bar = self.fill_char * bar_length
  164. bar += self.empty_char * (self.width - bar_length)
  165. elif self.finished:
  166. bar = self.fill_char * self.width
  167. else:
  168. chars = list(self.empty_char * (self.width or 1))
  169. if self.time_per_iteration != 0:
  170. chars[
  171. int(
  172. (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
  173. * self.width
  174. )
  175. ] = self.fill_char
  176. bar = "".join(chars)
  177. return bar
  178. def format_progress_line(self) -> str:
  179. show_percent = self.show_percent
  180. info_bits = []
  181. if self.length is not None and show_percent is None:
  182. show_percent = not self.show_pos
  183. if self.show_pos:
  184. info_bits.append(self.format_pos())
  185. if show_percent:
  186. info_bits.append(self.format_pct())
  187. if self.show_eta and self.eta_known and not self.finished:
  188. info_bits.append(self.format_eta())
  189. if self.item_show_func is not None:
  190. item_info = self.item_show_func(self.current_item)
  191. if item_info is not None:
  192. info_bits.append(item_info)
  193. return (
  194. self.bar_template
  195. % {
  196. "label": self.label,
  197. "bar": self.format_bar(),
  198. "info": self.info_sep.join(info_bits),
  199. }
  200. ).rstrip()
  201. def render_progress(self) -> None:
  202. import shutil
  203. if self.is_hidden:
  204. # Only output the label as it changes if the output is not a
  205. # TTY. Use file=stderr if you expect to be piping stdout.
  206. if self._last_line != self.label:
  207. self._last_line = self.label
  208. echo(self.label, file=self.file, color=self.color)
  209. return
  210. buf = []
  211. # Update width in case the terminal has been resized
  212. if self.autowidth:
  213. old_width = self.width
  214. self.width = 0
  215. clutter_length = term_len(self.format_progress_line())
  216. new_width = max(0, shutil.get_terminal_size().columns - clutter_length)
  217. if new_width < old_width:
  218. buf.append(BEFORE_BAR)
  219. buf.append(" " * self.max_width) # type: ignore
  220. self.max_width = new_width
  221. self.width = new_width
  222. clear_width = self.width
  223. if self.max_width is not None:
  224. clear_width = self.max_width
  225. buf.append(BEFORE_BAR)
  226. line = self.format_progress_line()
  227. line_len = term_len(line)
  228. if self.max_width is None or self.max_width < line_len:
  229. self.max_width = line_len
  230. buf.append(line)
  231. buf.append(" " * (clear_width - line_len))
  232. line = "".join(buf)
  233. # Render the line only if it changed.
  234. if line != self._last_line:
  235. self._last_line = line
  236. echo(line, file=self.file, color=self.color, nl=False)
  237. self.file.flush()
  238. def make_step(self, n_steps: int) -> None:
  239. self.pos += n_steps
  240. if self.length is not None and self.pos >= self.length:
  241. self.finished = True
  242. if (time.time() - self.last_eta) < 1.0:
  243. return
  244. self.last_eta = time.time()
  245. # self.avg is a rolling list of length <= 7 of steps where steps are
  246. # defined as time elapsed divided by the total progress through
  247. # self.length.
  248. if self.pos:
  249. step = (time.time() - self.start) / self.pos
  250. else:
  251. step = time.time() - self.start
  252. self.avg = self.avg[-6:] + [step]
  253. self.eta_known = self.length is not None
  254. def update(self, n_steps: int, current_item: t.Optional[V] = None) -> None:
  255. """Update the progress bar by advancing a specified number of
  256. steps, and optionally set the ``current_item`` for this new
  257. position.
  258. :param n_steps: Number of steps to advance.
  259. :param current_item: Optional item to set as ``current_item``
  260. for the updated position.
  261. .. versionchanged:: 8.0
  262. Added the ``current_item`` optional parameter.
  263. .. versionchanged:: 8.0
  264. Only render when the number of steps meets the
  265. ``update_min_steps`` threshold.
  266. """
  267. if current_item is not None:
  268. self.current_item = current_item
  269. self._completed_intervals += n_steps
  270. if self._completed_intervals >= self.update_min_steps:
  271. self.make_step(self._completed_intervals)
  272. self.render_progress()
  273. self._completed_intervals = 0
  274. def finish(self) -> None:
  275. self.eta_known = False
  276. self.current_item = None
  277. self.finished = True
  278. def generator(self) -> t.Iterator[V]:
  279. """Return a generator which yields the items added to the bar
  280. during construction, and updates the progress bar *after* the
  281. yielded block returns.
  282. """
  283. # WARNING: the iterator interface for `ProgressBar` relies on
  284. # this and only works because this is a simple generator which
  285. # doesn't create or manage additional state. If this function
  286. # changes, the impact should be evaluated both against
  287. # `iter(bar)` and `next(bar)`. `next()` in particular may call
  288. # `self.generator()` repeatedly, and this must remain safe in
  289. # order for that interface to work.
  290. if not self.entered:
  291. raise RuntimeError("You need to use progress bars in a with block.")
  292. if self.is_hidden:
  293. yield from self.iter
  294. else:
  295. for rv in self.iter:
  296. self.current_item = rv
  297. # This allows show_item_func to be updated before the
  298. # item is processed. Only trigger at the beginning of
  299. # the update interval.
  300. if self._completed_intervals == 0:
  301. self.render_progress()
  302. yield rv
  303. self.update(1)
  304. self.finish()
  305. self.render_progress()
  306. def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None:
  307. """Decide what method to use for paging through text."""
  308. stdout = _default_text_stdout()
  309. # There are no standard streams attached to write to. For example,
  310. # pythonw on Windows.
  311. if stdout is None:
  312. stdout = StringIO()
  313. if not isatty(sys.stdin) or not isatty(stdout):
  314. return _nullpager(stdout, generator, color)
  315. pager_cmd = (os.environ.get("PAGER", None) or "").strip()
  316. if pager_cmd:
  317. if WIN:
  318. if _tempfilepager(generator, pager_cmd, color):
  319. return
  320. elif _pipepager(generator, pager_cmd, color):
  321. return
  322. if os.environ.get("TERM") in ("dumb", "emacs"):
  323. return _nullpager(stdout, generator, color)
  324. if (WIN or sys.platform.startswith("os2")) and _tempfilepager(
  325. generator, "more", color
  326. ):
  327. return
  328. if _pipepager(generator, "less", color):
  329. return
  330. import tempfile
  331. fd, filename = tempfile.mkstemp()
  332. os.close(fd)
  333. try:
  334. if _pipepager(generator, "more", color):
  335. return
  336. return _nullpager(stdout, generator, color)
  337. finally:
  338. os.unlink(filename)
  339. def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) -> bool:
  340. """Page through text by feeding it to another program. Invoking a
  341. pager through this might support colors.
  342. Returns True if the command was found, False otherwise and thus another
  343. pager should be attempted.
  344. """
  345. cmd_absolute = which(cmd)
  346. if cmd_absolute is None:
  347. return False
  348. import subprocess
  349. env = dict(os.environ)
  350. # If we're piping to less we might support colors under the
  351. # condition that
  352. cmd_detail = cmd.rsplit("/", 1)[-1].split()
  353. if color is None and cmd_detail[0] == "less":
  354. less_flags = f"{os.environ.get('LESS', '')}{' '.join(cmd_detail[1:])}"
  355. if not less_flags:
  356. env["LESS"] = "-R"
  357. color = True
  358. elif "r" in less_flags or "R" in less_flags:
  359. color = True
  360. c = subprocess.Popen(
  361. [cmd_absolute],
  362. shell=True,
  363. stdin=subprocess.PIPE,
  364. env=env,
  365. errors="replace",
  366. text=True,
  367. )
  368. assert c.stdin is not None
  369. try:
  370. for text in generator:
  371. if not color:
  372. text = strip_ansi(text)
  373. c.stdin.write(text)
  374. except (OSError, KeyboardInterrupt):
  375. pass
  376. else:
  377. c.stdin.close()
  378. # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
  379. # search or other commands inside less).
  380. #
  381. # That means when the user hits ^C, the parent process (click) terminates,
  382. # but less is still alive, paging the output and messing up the terminal.
  383. #
  384. # If the user wants to make the pager exit on ^C, they should set
  385. # `LESS='-K'`. It's not our decision to make.
  386. while True:
  387. try:
  388. c.wait()
  389. except KeyboardInterrupt:
  390. pass
  391. else:
  392. break
  393. return True
  394. def _tempfilepager(
  395. generator: t.Iterable[str],
  396. cmd: str,
  397. color: t.Optional[bool],
  398. ) -> bool:
  399. """Page through text by invoking a program on a temporary file.
  400. Returns True if the command was found, False otherwise and thus another
  401. pager should be attempted.
  402. """
  403. # Which is necessary for Windows, it is also recommended in the Popen docs.
  404. cmd_absolute = which(cmd)
  405. if cmd_absolute is None:
  406. return False
  407. import subprocess
  408. import tempfile
  409. fd, filename = tempfile.mkstemp()
  410. # TODO: This never terminates if the passed generator never terminates.
  411. text = "".join(generator)
  412. if not color:
  413. text = strip_ansi(text)
  414. encoding = get_best_encoding(sys.stdout)
  415. with open_stream(filename, "wb")[0] as f:
  416. f.write(text.encode(encoding))
  417. try:
  418. subprocess.call([cmd_absolute, filename])
  419. except OSError:
  420. # Command not found
  421. pass
  422. finally:
  423. os.close(fd)
  424. os.unlink(filename)
  425. return True
  426. def _nullpager(
  427. stream: t.TextIO, generator: t.Iterable[str], color: t.Optional[bool]
  428. ) -> None:
  429. """Simply print unformatted text. This is the ultimate fallback."""
  430. for text in generator:
  431. if not color:
  432. text = strip_ansi(text)
  433. stream.write(text)
  434. class Editor:
  435. def __init__(
  436. self,
  437. editor: t.Optional[str] = None,
  438. env: t.Optional[t.Mapping[str, str]] = None,
  439. require_save: bool = True,
  440. extension: str = ".txt",
  441. ) -> None:
  442. self.editor = editor
  443. self.env = env
  444. self.require_save = require_save
  445. self.extension = extension
  446. def get_editor(self) -> str:
  447. if self.editor is not None:
  448. return self.editor
  449. for key in "VISUAL", "EDITOR":
  450. rv = os.environ.get(key)
  451. if rv:
  452. return rv
  453. if WIN:
  454. return "notepad"
  455. for editor in "sensible-editor", "vim", "nano":
  456. if which(editor) is not None:
  457. return editor
  458. return "vi"
  459. def edit_file(self, filename: str) -> None:
  460. import subprocess
  461. editor = self.get_editor()
  462. environ: t.Optional[t.Dict[str, str]] = None
  463. if self.env:
  464. environ = os.environ.copy()
  465. environ.update(self.env)
  466. try:
  467. c = subprocess.Popen(f'{editor} "{filename}"', env=environ, shell=True)
  468. exit_code = c.wait()
  469. if exit_code != 0:
  470. raise ClickException(
  471. _("{editor}: Editing failed").format(editor=editor)
  472. )
  473. except OSError as e:
  474. raise ClickException(
  475. _("{editor}: Editing failed: {e}").format(editor=editor, e=e)
  476. ) from e
  477. def edit(self, text: t.Optional[t.AnyStr]) -> t.Optional[t.AnyStr]:
  478. import tempfile
  479. if not text:
  480. data = b""
  481. elif isinstance(text, (bytes, bytearray)):
  482. data = text
  483. else:
  484. if text and not text.endswith("\n"):
  485. text += "\n"
  486. if WIN:
  487. data = text.replace("\n", "\r\n").encode("utf-8-sig")
  488. else:
  489. data = text.encode("utf-8")
  490. fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
  491. f: t.BinaryIO
  492. try:
  493. with os.fdopen(fd, "wb") as f:
  494. f.write(data)
  495. # If the filesystem resolution is 1 second, like Mac OS
  496. # 10.12 Extended, or 2 seconds, like FAT32, and the editor
  497. # closes very fast, require_save can fail. Set the modified
  498. # time to be 2 seconds in the past to work around this.
  499. os.utime(name, (os.path.getatime(name), os.path.getmtime(name) - 2))
  500. # Depending on the resolution, the exact value might not be
  501. # recorded, so get the new recorded value.
  502. timestamp = os.path.getmtime(name)
  503. self.edit_file(name)
  504. if self.require_save and os.path.getmtime(name) == timestamp:
  505. return None
  506. with open(name, "rb") as f:
  507. rv = f.read()
  508. if isinstance(text, (bytes, bytearray)):
  509. return rv
  510. return rv.decode("utf-8-sig").replace("\r\n", "\n") # type: ignore
  511. finally:
  512. os.unlink(name)
  513. def open_url(url: str, wait: bool = False, locate: bool = False) -> int:
  514. import subprocess
  515. def _unquote_file(url: str) -> str:
  516. from urllib.parse import unquote
  517. if url.startswith("file://"):
  518. url = unquote(url[7:])
  519. return url
  520. if sys.platform == "darwin":
  521. args = ["open"]
  522. if wait:
  523. args.append("-W")
  524. if locate:
  525. args.append("-R")
  526. args.append(_unquote_file(url))
  527. null = open("/dev/null", "w")
  528. try:
  529. return subprocess.Popen(args, stderr=null).wait()
  530. finally:
  531. null.close()
  532. elif WIN:
  533. if locate:
  534. url = _unquote_file(url)
  535. args = ["explorer", f"/select,{url}"]
  536. else:
  537. args = ["start"]
  538. if wait:
  539. args.append("/WAIT")
  540. args.append("")
  541. args.append(url)
  542. try:
  543. return subprocess.call(args)
  544. except OSError:
  545. # Command not found
  546. return 127
  547. elif CYGWIN:
  548. if locate:
  549. url = _unquote_file(url)
  550. args = ["cygstart", os.path.dirname(url)]
  551. else:
  552. args = ["cygstart"]
  553. if wait:
  554. args.append("-w")
  555. args.append(url)
  556. try:
  557. return subprocess.call(args)
  558. except OSError:
  559. # Command not found
  560. return 127
  561. try:
  562. if locate:
  563. url = os.path.dirname(_unquote_file(url)) or "."
  564. else:
  565. url = _unquote_file(url)
  566. c = subprocess.Popen(["xdg-open", url])
  567. if wait:
  568. return c.wait()
  569. return 0
  570. except OSError:
  571. if url.startswith(("http://", "https://")) and not locate and not wait:
  572. import webbrowser
  573. webbrowser.open(url)
  574. return 0
  575. return 1
  576. def _translate_ch_to_exc(ch: str) -> t.Optional[BaseException]:
  577. if ch == "\x03":
  578. raise KeyboardInterrupt()
  579. if ch == "\x04" and not WIN: # Unix-like, Ctrl+D
  580. raise EOFError()
  581. if ch == "\x1a" and WIN: # Windows, Ctrl+Z
  582. raise EOFError()
  583. return None
  584. if WIN:
  585. import msvcrt
  586. @contextlib.contextmanager
  587. def raw_terminal() -> t.Iterator[int]:
  588. yield -1
  589. def getchar(echo: bool) -> str:
  590. # The function `getch` will return a bytes object corresponding to
  591. # the pressed character. Since Windows 10 build 1803, it will also
  592. # return \x00 when called a second time after pressing a regular key.
  593. #
  594. # `getwch` does not share this probably-bugged behavior. Moreover, it
  595. # returns a Unicode object by default, which is what we want.
  596. #
  597. # Either of these functions will return \x00 or \xe0 to indicate
  598. # a special key, and you need to call the same function again to get
  599. # the "rest" of the code. The fun part is that \u00e0 is
  600. # "latin small letter a with grave", so if you type that on a French
  601. # keyboard, you _also_ get a \xe0.
  602. # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
  603. # resulting Unicode string reads as "a with grave" + "capital H".
  604. # This is indistinguishable from when the user actually types
  605. # "a with grave" and then "capital H".
  606. #
  607. # When \xe0 is returned, we assume it's part of a special-key sequence
  608. # and call `getwch` again, but that means that when the user types
  609. # the \u00e0 character, `getchar` doesn't return until a second
  610. # character is typed.
  611. # The alternative is returning immediately, but that would mess up
  612. # cross-platform handling of arrow keys and others that start with
  613. # \xe0. Another option is using `getch`, but then we can't reliably
  614. # read non-ASCII characters, because return values of `getch` are
  615. # limited to the current 8-bit codepage.
  616. #
  617. # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
  618. # is doing the right thing in more situations than with `getch`.
  619. func: t.Callable[[], str]
  620. if echo:
  621. func = msvcrt.getwche # type: ignore
  622. else:
  623. func = msvcrt.getwch # type: ignore
  624. rv = func()
  625. if rv in ("\x00", "\xe0"):
  626. # \x00 and \xe0 are control characters that indicate special key,
  627. # see above.
  628. rv += func()
  629. _translate_ch_to_exc(rv)
  630. return rv
  631. else:
  632. import termios
  633. import tty
  634. @contextlib.contextmanager
  635. def raw_terminal() -> t.Iterator[int]:
  636. f: t.Optional[t.TextIO]
  637. fd: int
  638. if not isatty(sys.stdin):
  639. f = open("/dev/tty")
  640. fd = f.fileno()
  641. else:
  642. fd = sys.stdin.fileno()
  643. f = None
  644. try:
  645. old_settings = termios.tcgetattr(fd)
  646. try:
  647. tty.setraw(fd)
  648. yield fd
  649. finally:
  650. termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
  651. sys.stdout.flush()
  652. if f is not None:
  653. f.close()
  654. except termios.error:
  655. pass
  656. def getchar(echo: bool) -> str:
  657. with raw_terminal() as fd:
  658. ch = os.read(fd, 32).decode(get_best_encoding(sys.stdin), "replace")
  659. if echo and isatty(sys.stdout):
  660. sys.stdout.write(ch)
  661. _translate_ch_to_exc(ch)
  662. return ch