# Source code for craft_cli.printer

# Copyright 2023 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""The output (for different destinations) handler and helper functions."""

from __future__ import annotations

import itertools
import math
import os
import platform
import queue
import shutil
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Callable, TextIO

if TYPE_CHECKING:
    import pathlib

# The character used to draw the completed part of the progress bar ('FULL BLOCK').
_PROGRESS_BAR_SYMBOL = "█"

# Seconds a message must stay unchanged before the spinner starts working on it
# (used as the queue timeout in the spinner thread).
_SPINNER_THRESHOLD = 2

# Seconds between each spinner char (the queue timeout while already spinning).
_SPINNER_DELAY = 0.1

# Set to true when running *application* tests so some behaviours change (see
# craft_cli/pytest_plugin.py ); in this module it prevents the spinner thread
# from being started and stopped by the Printer.
TESTMODE = False

ANSI_CLEAR_LINE_TO_END = "\x1b[K"  # ANSI escape code to clear the rest of the line.
# Written to stderr when a Printer is created, to hide the cursor while it is in charge.
ANSI_HIDE_CURSOR = "\x1b[?25l"
# Written to stderr when the Printer is stopped, to restore the cursor.
ANSI_SHOW_CURSOR = "\x1b[?25h"


@dataclass
class _MessageInfo:
    """Comprehensive information for a message that may go to screen and log."""

    stream: TextIO | None
    text: str
    ephemeral: bool = False
    bar_progress: int | float | None = None
    bar_total: int | float | None = None
    use_timestamp: bool = False
    end_line: bool = False
    created_at: datetime = field(default_factory=datetime.now, compare=False)
    terminal_prefix: str = ""


@lru_cache
def _stream_is_terminal(stream: TextIO | None) -> bool:
    is_a_terminal = getattr(stream, "isatty", lambda: False)()
    return is_a_terminal and _get_terminal_width() > 0


def _get_terminal_width() -> int:
    """Return the number of columns of the terminal."""
    return shutil.get_terminal_size().columns


@lru_cache
def _supports_ansi_escape_sequences() -> bool:
    """Whether the current environment supports ANSI escape sequences."""
    if platform.system() != "Windows":
        return True
    return "WT_SESSION" in os.environ  # Windows Terminal supports ANSI escape sequences.


def _fill_line(text: str) -> str:
    """Extend the text so that, once printed, nothing stale remains on its line.

    With ANSI support a "clear to end of line" code is appended; otherwise the text
    is padded with spaces up to the terminal width.
    """
    if not _supports_ansi_escape_sequences():
        columns = _get_terminal_width()
        # Pad up to the end of the (last) line, keeping one column free for the cursor.
        padding = columns - len(text) % columns - 1
        return f"{text}{' ' * padding}"
    return text + ANSI_CLEAR_LINE_TO_END


def _format_term_line(previous_line_end: str, text: str, spintext: str, *, ephemeral: bool) -> str:
    """Format a line to print to the terminal.

    :param previous_line_end: what to emit before the text (``""``, ``"\\r"`` or ``"\\n"``).
    :param text: the message text, already prefixed/timestamped by the caller.
    :param spintext: the spinner text to append (empty when not spinning).
    :param ephemeral: whether the message is ephemeral; those are always cut to fit
        in a single line.
    :returns: the complete string to write, filled up to the end of the line.
    """
    # fill until the very end, on one hand to clear a possible previous message,
    # but also to always have the cursor at the very end
    width = _get_terminal_width()
    usable = width - len(spintext) - 1  # the 1 is the cursor itself
    # never let the cut position go negative (very narrow terminals), as a negative
    # slice bound would count from the end of the text instead of truncating it
    cut_at = max(usable - 1, 0)
    if len(text) > usable:
        if ephemeral:
            text = text[:cut_at] + "…"
        elif spintext:
            # we need to rewrite the message with the spintext, use only the last line for
            # multiline messages, and ensure (again) that the last real line fits
            #
            # a remainder of 0 means the last line is completely full; without the
            # fallback to `width` the slice would be `text[-0:]`, i.e. the whole text
            remaining_for_last_line = len(text) % width or width
            text = text[-remaining_for_last_line:]
            if len(text) > usable:
                text = text[:cut_at] + "…"

    return previous_line_end + _fill_line(text + spintext)


class _Spinner(threading.Thread):
    """A supervisor thread that will repeat long-standing messages with a spinner besides it.

    This will be a long-lived single thread that will supervise each message received
    through the `supervise` method, and when it stays too long, the printer's `spin`
    will be called with that message and a text to "draw" a spinner, including the elapsed
    time.

    The timing related part of the code uses two constants: _SPINNER_THRESHOLD is how
    many seconds before activating the spinner for the message, and _SPINNER_DELAY is
    the time between `spin` calls.

    When a new message arrives (or None, to indicate that there is nothing to supervise) and
    the previous message was "being spinned", a last `spin` call will be done to clean
    the spinner.
    """

    def __init__(self, printer: Printer) -> None:
        super().__init__()
        # special flag used to stop the spinner thread
        self.stop_flag = object()

        # daemon mode, so if the app crashes this thread does not holds everything
        self.daemon = True

        # communication from the printer
        self.queue: queue.Queue[Any] = queue.Queue()

        # hold the printer, to make it spin
        self.printer = printer

        # a lock to wait the spinner to stop spinning
        self.lock = threading.Lock()

        # Keep the message under supervision available for examination.
        self._under_supervision: _MessageInfo | None = None

    def run(self) -> None:
        prv_msg = None
        t_init = time.time()
        while prv_msg is not self.stop_flag:
            try:
                new_msg = self.queue.get(timeout=_SPINNER_THRESHOLD)
            except queue.Empty:
                # waited too much, start to show a spinner (if have a previous message) until
                # we have further info
                if prv_msg is None or prv_msg.end_line:
                    continue
                spinchars = itertools.cycle("-\\|/")
                with self.lock:
                    while True:
                        t_delta = time.time() - t_init
                        spintext = f" {next(spinchars)} ({t_delta:.1f}s)"
                        self.printer.spin(prv_msg, spintext)
                        try:
                            new_msg = self.queue.get(timeout=_SPINNER_DELAY)
                        except queue.Empty:
                            # still nothing! keep going
                            continue
                        # got a new message: clean the spinner and exit from the spinning state
                        self.printer.spin(prv_msg, " ")
                        break

            prv_msg = new_msg
            t_init = time.time()

    def supervise(self, message: _MessageInfo | None) -> None:
        """Supervise a message to spin it if it remains too long."""
        # Don't bother the spinner if we're repeating the same message
        if message == self._under_supervision:
            return

        self._under_supervision = message
        self.queue.put(message)
        # (maybe) wait for the spinner to exit spinning state (which does some cleaning)
        self.lock.acquire()
        self.lock.release()

    def stop(self) -> None:
        """Stop self."""
        self.queue.put(self.stop_flag)
        self.join()


class Printer:
    """Handle writing the different messages to the different outputs (out, err and log).

    If TESTMODE is True, this class changes its behaviour: the spinner is never started,
    so there is no thread polluting messages when running tests if they take too long
    to run.

    Once `stop` is called nothing else is shown through `show` or `progress_bar`.
    """

    def __init__(self, log_filepath: pathlib.Path) -> None:
        """Open the log file, start the spinner (unless in TESTMODE) and hide the cursor.

        :param log_filepath: the file to which all logged messages are appended.
        """
        self.stopped = False

        # holder of the previous message
        self.prv_msg: _MessageInfo | None = None

        # open the log file (will be closed explicitly later)
        self.log = log_filepath.open("at", encoding="utf8")

        # keep account of output terminal streams with unfinished lines
        self.unfinished_stream: TextIO | None = None

        self.terminal_prefix = ""

        self.secrets: list[str] = []

        # run the spinner supervisor
        self.spinner = _Spinner(self)
        if not TESTMODE:
            self.spinner.start()

        if _supports_ansi_escape_sequences() and _stream_is_terminal(sys.stderr):
            print(ANSI_HIDE_CURSOR, end="", file=sys.stderr, flush=True)

    def set_terminal_prefix(self, prefix: str) -> None:
        """Set the string to be prepended to every message shown to the terminal."""
        self.terminal_prefix = prefix

    def _get_prefixed_message_text(self, message: _MessageInfo) -> str:
        """Get the message's text with the proper terminal prefix, if any."""
        text = message.text
        prefix = message.terminal_prefix

        # Don't repeat text: can happen due to the spinner.
        if prefix and text != prefix:
            separator = ":: "

            # Don't duplicate the separator, which can come from multiple different
            # sources.
            if text.startswith(separator):
                separator = ""

            text = f"{prefix} {separator}{text}"

        return text

    def _get_line_end(self, spintext: str) -> str:
        """Get the end of line to use when writing a line to the terminal.

        :returns: ``"\\r"`` to overwrite the current line, ``"\\n"`` to complete it
            and move on, or ``""`` when the cursor is already on a clean line.
        """
        if spintext:
            # forced to overwrite the previous message to present the spinner
            return "\r"
        if self.prv_msg is None or self.prv_msg.end_line:
            # first message, or previous message completed the line: start clean
            return ""
        if self.prv_msg.ephemeral:
            # the last one was ephemeral, overwrite it
            return "\r"
        # The previous (permanent) message left its line unfinished; complete it.
        return "\n"

    def _write_line_terminal(self, message: _MessageInfo, *, spintext: str = "") -> None:
        """Write a simple line message to the screen.

        :param message: the message to write.
        :param spintext: the spinner text to append after the message, if spinning.
        """
        # prepare the text with (maybe) the timestamp and remove trailing spaces
        text = self._get_prefixed_message_text(message).rstrip()

        if message.use_timestamp:
            timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
            text = f"{timestamp_str} {text}"

        previous_line_end = self._get_line_end(spintext)
        if self.prv_msg and self.prv_msg.ephemeral and self.prv_msg.stream != message.stream:
            # If the last message's stream is different from this new one,
            # send a carriage return to the original stream only.
            print("\r", flush=True, file=self.prv_msg.stream, end="")
            previous_line_end = ""
        if self.prv_msg and previous_line_end == "\n":
            # the newline belongs to the stream holding the unfinished line, which
            # may not be the stream of the new message
            previous_line_end = ""
            print(flush=True, file=self.prv_msg.stream)

        # We don't need to rewrite the same ephemeral message repeatedly.
        should_overwrite = spintext or message.end_line or not message.ephemeral
        if should_overwrite or message != self.prv_msg:
            # fitting the text to the terminal width (truncation, last line of
            # multiline messages) and filling the line is done by _format_term_line
            line = _format_term_line(
                previous_line_end, text, spintext, ephemeral=message.ephemeral
            )
            print(line, end="", flush=True, file=message.stream)

        if message.end_line:
            # finish the just shown line, as we need a clean terminal for some external thing
            print(flush=True, file=message.stream)
            self.unfinished_stream = None
        else:
            self.unfinished_stream = message.stream

    def _write_line_captured(self, message: _MessageInfo) -> None:
        """Write a simple line message to a captured output."""
        # prepare the text with (maybe) the timestamp
        if message.use_timestamp:
            timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
            text = timestamp_str + " " + message.text
        else:
            text = message.text

        print(text, file=message.stream)

    def _write_bar_terminal(self, message: _MessageInfo) -> None:
        """Write a progress bar to the screen.

        :raises ValueError: if the message does not carry progress bar values.
        """
        # validate before emitting anything, so a bad message produces no output
        if message.bar_progress is None or message.bar_total is None:  # pragma: no cover
            # Should not happen as the caller checks the message
            raise ValueError("Tried to write a bar message with invalid attributes")

        # prepare the text with (maybe) the timestamp
        if message.use_timestamp:
            timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
            text = timestamp_str + " " + message.text
        else:
            text = message.text

        if self.prv_msg is None or self.prv_msg.end_line:
            # first message, or previous message completed the line: start clean
            maybe_cr = ""
        elif self.prv_msg.ephemeral:
            # the last one was ephemeral, overwrite it
            maybe_cr = "\r"
        else:
            # complete the previous line, leaving that message ok
            maybe_cr = ""
            print(flush=True, file=self.prv_msg.stream)

        numerical_progress = f"{message.bar_progress}/{message.bar_total}"
        if message.bar_total > 0:
            # keep the fraction in the 0..1 range so the bar never under/overflows
            bar_fraction = min(max(message.bar_progress / message.bar_total, 0), 1)
        else:
            # nothing to wait for (and avoid dividing by zero): show the bar complete
            bar_fraction = 1

        # terminal size minus the text and numerical progress, and 5 (the cursor at the end,
        # two spaces before and after the bar, and two surrounding brackets)
        terminal_width = _get_terminal_width()
        bar_width = terminal_width - len(text) - len(numerical_progress) - 5

        # only show the bar with progress if there is enough space, otherwise just the
        # message (truncated, if needed)
        if bar_width > 0:
            completed_width = math.floor(bar_width * bar_fraction)
            completed_bar = _PROGRESS_BAR_SYMBOL * completed_width
            empty_bar = " " * (bar_width - completed_width)
            line = f"{maybe_cr}{text} [{completed_bar}{empty_bar}] {numerical_progress}"
        else:
            text = text[: terminal_width - 1]  # space for cursor
            line = f"{maybe_cr}{text}"

        print(line, end="", flush=True, file=message.stream)
        self.unfinished_stream = message.stream

    def _write_bar_captured(self, message: _MessageInfo) -> None:
        """Do not write any progress bar to the captured output."""

    def _show(self, msg: _MessageInfo) -> None:
        """Show the composed message."""
        # show the message in one way or the other only if there is a stream
        if msg.stream is None:
            return

        # the writing functions depend on the final output: if the stream is captured or it's
        # a real terminal
        write_line: Callable[[_MessageInfo], None]
        if _stream_is_terminal(msg.stream):
            write_line = self._write_line_terminal
            write_bar = self._write_bar_terminal
        else:
            write_line = self._write_line_captured
            write_bar = self._write_bar_captured

        if msg.bar_progress is None:
            # regular message, send it to the spinner and write it
            self.spinner.supervise(msg)
            write_line(msg)
        else:
            # progress bar, send None to the spinner (as it's not a "spinnable" message)
            # and write it
            self.spinner.supervise(None)
            write_bar(msg)
        self.prv_msg = msg

    def _log(self, message: _MessageInfo) -> None:
        """Write the line message to the log file."""
        # in the log the timestamp is always included, whatever the message indicates
        timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
        self.log.write(f"{timestamp_str} {message.text}\n")

        # Flush the file: protect a bit in case of crashes, and multiprocess-based
        # parallelism.
        self.log.flush()

    def spin(self, message: _MessageInfo, spintext: str) -> None:
        """Write a line message including a spin text, only to a terminal."""
        if _stream_is_terminal(message.stream):
            self._write_line_terminal(message, spintext=spintext)

    def show(  # noqa: PLR0913 (too many parameters)
        self,
        stream: TextIO | None,
        text: str,
        *,
        ephemeral: bool = False,
        use_timestamp: bool = False,
        end_line: bool = False,
        avoid_logging: bool = False,
    ) -> None:
        """Show a text to the given stream if not stopped.

        :param stream: where to show the text; ``None`` to only log it.
        :param text: the message (secrets are masked before any use).
        :param ephemeral: whether the next message may overwrite this one.
        :param use_timestamp: whether to prepend the timestamp when showing it.
        :param end_line: whether to finish the line right after the message.
        :param avoid_logging: if true, the message is not sent to the log file.
        """
        if self.stopped:
            return

        text = self._apply_secrets(text)

        msg = _MessageInfo(
            stream=stream,
            text=text.rstrip(),
            ephemeral=ephemeral,
            use_timestamp=use_timestamp,
            end_line=end_line,
            terminal_prefix=self._apply_secrets(self.terminal_prefix),
        )
        self._show(msg)
        if not avoid_logging:
            self._log(msg)

    def progress_bar(
        self,
        stream: TextIO | None,
        text: str,
        *,
        progress: float,
        total: float,
        use_timestamp: bool,
    ) -> None:
        """Show a progress bar to the given stream if not stopped.

        Progress bars are never sent to the log file.
        """
        # same guard as in `show`: nothing must reach the screen after `stop`
        if self.stopped:
            return

        text = self._apply_secrets(text)

        msg = _MessageInfo(
            stream=stream,
            text=text.rstrip(),
            bar_progress=progress,
            bar_total=total,
            ephemeral=True,  # so it gets eventually overwritten by other message
            use_timestamp=use_timestamp,
        )
        self._show(msg)

    def stop(self) -> None:
        """Stop the printing infrastructure.

        In detail:
        - stop the spinner
        - show the cursor
        - add a new line to the screen (if needed)
        - close the log file
        """
        if not TESTMODE:
            self.spinner.stop()
        if _supports_ansi_escape_sequences() and _stream_is_terminal(sys.stderr):
            print(ANSI_SHOW_CURSOR, end="", file=sys.stderr, flush=True)
        if self.unfinished_stream is not None:
            # With unfinished_stream set, the prv_msg object is valid.
            if self.prv_msg is not None and self.prv_msg.ephemeral:
                # If the last printed message is of 'ephemeral' type, the stop
                # request must clean and reset the line.
                cleaner = " " * (_get_terminal_width() - 1)
                line = "\r" + cleaner + "\r"
                print(line, end="", flush=True, file=self.prv_msg.stream)
            else:
                # The last printed message is permanent. Leave the cursor on
                # the next clean line.
                print(flush=True, file=self.unfinished_stream)
        self.log.close()
        self.stopped = True

    def set_secrets(self, secrets: list[str]) -> None:
        """Set the list of strings that should be masked out in all outputs."""
        # Keep a copy, to protect against clients modifying the list on accident.
        self.secrets = secrets.copy()

    def _apply_secrets(self, text: str) -> str:
        """Return the text with every configured secret replaced by a fixed mask."""
        for secret in self.secrets:
            # an empty "secret" would match between every pair of characters and
            # turn the whole text into a sequence of masks
            if secret:
                text = text.replace(secret, "*****")

        return text