# -*- coding=utf-8 -*-
import functools
import os
import signal
import sys
import threading
import time
import typing
from io import StringIO
import colorama
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if typing.TYPE_CHECKING:
from typing import (
Any,
Callable,
ContextManager,
Dict,
IO,
Optional,
Text,
Type,
TypeVar,
Union,
)
TSignalMap = Dict[
Type[signal.SIGINT],
Callable[..., int, str, Union["DummySpinner", "VistirSpinner"]],
]
_T = TypeVar("_T", covariant=True)
try:
import yaspin
import yaspin.spinners
import yaspin.core
Spinners = yaspin.spinners.Spinners
SpinBase = yaspin.core.Yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
if os.name == "nt": # pragma: no cover
def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.fail()
spinner.stop()
else: # pragma: no cover
[docs] def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.red.fail("✘")
spinner.stop()
CLEAR_LINE = chr(27) + "[K"
TRANSLATION_MAP = {10004: u"OK", 10008: u"x"}
decode_output = functools.partial(decode_for_output, translation_map=TRANSLATION_MAP)
[docs]class DummySpinner(object):
def __init__(self, text="", **kwargs):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = decode_output(text) if text else ""
self.stdout = kwargs.get("stdout", sys.stdout)
self.stderr = kwargs.get("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = kwargs.get("write_to_stdout", False)
super(DummySpinner, self).__init__()
def __enter__(self):
if self.text and self.text != "None":
if self.write_to_stdout:
self.write(self.text)
return self
def __exit__(self, exc_type, exc_val, tb):
if exc_type:
import traceback
formatted_tb = traceback.format_exception(exc_type, exc_val, tb)
self.write_err("".join(formatted_tb))
self._close_output_buffer()
return False
def __getattr__(self, k): # pragma: no cover
try:
retval = super(DummySpinner, self).__getattribute__(k)
except AttributeError:
if k in COLOR_MAP.keys() or k.upper() in COLORS:
return self
raise
else:
return retval
def _close_output_buffer(self):
if self.out_buff and not self.out_buff.closed:
try:
self.out_buff.close()
except Exception:
pass
[docs] def fail(self, exitcode=1, text="FAIL"):
# type: (int, str) -> None
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
[docs] def ok(self, text="OK"):
# type: (str) -> int
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
return 0
[docs] def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, str) and text == "None":
pass
target.write(decode_output(u"\r", target_stream=target))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text), target_stream=target))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
[docs] def write(self, text=None):
# type: (Optional[str]) -> None
if not self.write_to_stdout:
return self.write_err(text)
if text is None or isinstance(text, str) and text == "None":
pass
if not self.stdout.closed:
stdout = self.stdout
else:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
text = to_text(text)
line = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(line)
stdout.write(CLEAR_LINE)
[docs] def write_err(self, text=None):
# type: (Optional[str]) -> None
if text is None or isinstance(text, str) and text == "None":
pass
text = to_text(text)
if not self.stderr.closed:
stderr = self.stderr
else:
if sys.stderr.closed:
print(text)
return
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
line = decode_output(u"{0}\n".format(text), target_stream=stderr)
stderr.write(line)
stderr.write(CLEAR_LINE)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
pass
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
pass
if SpinBase is None:
SpinBase = DummySpinner
[docs]class VistirSpinner(SpinBase):
"A spinner class for handling spinners on windows and posix."
def __init__(self, *args, **kwargs):
# type: (Any, Any)
"""
Get a spinner object or a dummy spinner to wrap a context.
Keyword Arguments:
:param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"})
:param str start_text: Text to start off the spinner with (default: {None})
:param dict handler_map: Handler map for signals to be handled gracefully (default: {None})
:param bool nospin: If true, use the dummy spinner (default: {False})
:param bool write_to_stdout: Writes to stdout if true, otherwise writes to stderr (default: True)
"""
self.handler = handler
colorama.init()
sigmap = {} # type: TSignalMap
if handler:
sigmap.update({signal.SIGINT: handler, signal.SIGTERM: handler})
handler_map = kwargs.pop("handler_map", {})
if os.name == "nt":
sigmap[signal.SIGBREAK] = handler
else:
sigmap[signal.SIGALRM] = handler
if handler_map:
sigmap.update(handler_map)
spinner_name = kwargs.pop("spinner_name", "bouncingBar")
start_text = kwargs.pop("start_text", None)
_text = kwargs.pop("text", "Running...")
kwargs["text"] = start_text if start_text is not None else _text
kwargs["sigmap"] = sigmap
kwargs["spinner"] = getattr(Spinners, spinner_name, "")
write_to_stdout = kwargs.pop("write_to_stdout", True)
self.stdout = kwargs.pop("stdout", sys.stdout)
self.stderr = kwargs.pop("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = write_to_stdout
self.is_dummy = bool(yaspin is None)
self._stop_spin = None # type: Optional[threading.Event]
self._hide_spin = None # type: Optional[threading.Event]
self._spin_thread = None # type: Optional[threading.Thread]
super(VistirSpinner, self).__init__(*args, **kwargs)
if DISABLE_COLORS:
colorama.deinit()
[docs] def ok(self, text=u"OK", err=False):
# type: (str, bool) -> None
"""Set Ok (success) finalizer to a spinner."""
# Do not display spin text for ok state
self._text = None
_text = to_text(text) if text else u"OK"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
[docs] def fail(self, text=u"FAIL", err=False):
# type: (str, bool) -> None
"""Set fail finalizer to a spinner."""
# Do not display spin text for fail state
self._text = None
_text = text if text else u"FAIL"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
[docs] def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, str) and text == u"None":
pass
target.write(decode_output(u"\r"))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text)))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
[docs] def write(self, text): # pragma: no cover
# type: (str) -> None
if not self.write_to_stdout:
return self.write_err(text)
stdout = self.stdout
if self.stdout.closed:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
stdout.write(decode_output(CLEAR_LINE, target_stream=stdout))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(text)
self.out_buff.write(text)
[docs] def write_err(self, text): # pragma: no cover
# type: (str) -> None
"""Write error text in the terminal without breaking the spinner."""
stderr = self.stderr
if self.stderr.closed:
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
stderr.write(decode_output(CLEAR_LINE, target_stream=stderr))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stderr)
self.stderr.write(text)
self.out_buff.write(decode_output(text, target_stream=self.out_buff))
[docs] def start(self):
# type: () -> None
if self._sigmap:
self._register_signal_handlers()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
self._hide_cursor(target=target)
self._stop_spin = threading.Event()
self._hide_spin = threading.Event()
self._spin_thread = threading.Thread(target=self._spin)
self._spin_thread.start()
[docs] def stop(self):
# type: () -> None
if self._dfl_sigmap:
# Reset registered signal handlers to default ones
self._reset_signal_handlers()
if self._spin_thread:
self._stop_spin.set()
self._spin_thread.join()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
target.write("\r")
if self.write_to_stdout:
self._clear_line()
else:
self._clear_err()
if target.isatty():
self._show_cursor(target=target)
self.out_buff.close()
def _freeze(self, final_text, err=False):
# type: (str, bool) -> None
"""Stop spinner, compose last frame and 'freeze' it."""
if not final_text:
final_text = ""
target = self.stderr if err else self.stdout
if target.closed:
target = sys.stderr if err else sys.stdout
text = to_text(final_text)
last_frame = self._compose_out(text, mode="last")
self._last_frame = decode_output(last_frame, target_stream=target)
# Should be stopped here, otherwise prints after
# self._freeze call will mess up the spinner
self.stop()
target.write(self._last_frame)
def _compose_color_func(self):
# type: () -> Callable[..., str]
fn = functools.partial(
colored, color=self._color, on_color=self._on_color, attrs=list(self._attrs)
)
return fn
def _compose_out(self, frame, mode=None):
# type: (str, Optional[str]) -> Text
# Ensure Unicode input
frame = to_text(frame)
if self._text is None:
self._text = u""
text = to_text(self._text)
if self._color_func is not None:
frame = self._color_func(frame)
if self._side == "right":
frame, text = text, frame
# Mode
frame = to_text(frame)
if not mode:
out = u"\r{0} {1}".format(frame, text)
else:
out = u"{0} {1}\n".format(frame, text)
return out
def _spin(self):
# type: () -> None
target = self.stdout if self.write_to_stdout else self.stderr
clear_fn = self._clear_line if self.write_to_stdout else self._clear_err
while not self._stop_spin.is_set():
if self._hide_spin.is_set():
# Wait a bit to avoid wasting cycles
time.sleep(self._interval)
continue
# Compose output
spin_phase = next(self._cycle)
out = self._compose_out(spin_phase)
out = decode_output(out, target)
# Write
target.write(out)
clear_fn()
target.flush()
# Wait
time.sleep(self._interval)
target.write("\b")
def _register_signal_handlers(self):
# type: () -> None
# SIGKILL cannot be caught or ignored, and the receiving
# process cannot perform any clean-up upon receiving this
# signal.
try:
if signal.SIGKILL in self._sigmap.keys():
raise ValueError(
"Trying to set handler for SIGKILL signal. "
"SIGKILL cannot be cought or ignored in POSIX systems."
)
except AttributeError:
pass
for sig, sig_handler in self._sigmap.items():
# A handler for a particular signal, once set, remains
# installed until it is explicitly reset. Store default
# signal handlers for subsequent reset at cleanup phase.
dfl_handler = signal.getsignal(sig)
self._dfl_sigmap[sig] = dfl_handler
# ``signal.SIG_DFL`` and ``signal.SIG_IGN`` are also valid
# signal handlers and are not callables.
if callable(sig_handler):
# ``signal.signal`` accepts handler function which is
# called with two arguments: signal number and the
# interrupted stack frame. ``functools.partial`` solves
# the problem of passing spinner instance into the handler
# function.
sig_handler = functools.partial(sig_handler, spinner=self)
signal.signal(sig, sig_handler)
def _reset_signal_handlers(self):
# type: () -> None
for sig, sig_handler in self._dfl_sigmap.items():
signal.signal(sig, sig_handler)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
hide_cursor(stream=target)
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
show_cursor(stream=target)
@staticmethod
def _clear_err():
# type: () -> None
sys.stderr.write(CLEAR_LINE)
@staticmethod
def _clear_line():
# type: () -> None
sys.stdout.write(CLEAR_LINE)