Source code for erbsland.ansi_convert._terminal

#  Copyright (c) 2026 Tobias Erbsland - https://erbsland.dev
#  SPDX-License-Identifier: Apache-2.0


"""
Public terminal emulator façade.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

from ._converter import AnsiConverter, HtmlConverter, TextConverter
from ._model import Cursor, char_display_width
from ._parser import ANSIParser
from ._screen import ScreenBuffer, ScreenSnapshot


@dataclass(slots=True)
class _TerminalState:
    cursor: Cursor
    snapshot: ScreenSnapshot


[docs] class Terminal: """ANSI terminal emulator with scroll-back history and converters.""" def __init__( self, width: int = 120, height: int = 40, back_buffer_height: int = 2000, warn_unknown_sequences: bool = False, ) -> None: """ Create a new terminal emulator instance. :param width: Visible terminal columns. :param height: Visible terminal rows. :param back_buffer_height: Maximum number of history rows. :param warn_unknown_sequences: Emit `RuntimeWarning` for unsupported ANSI sequences when true. """ self.width = width self.height = height self.back_buffer_height = back_buffer_height self._main_buffer = ScreenBuffer(width, height, back_buffer_height) self._alternate_buffer = ScreenBuffer(width, height, 0) self._active_buffer = self._main_buffer self.cursor = Cursor() self._parser = ANSIParser(self, warn_unknown=warn_unknown_sequences) self.responses: list[str] = [] self._saved_cursor_dec: Cursor | None = None self._saved_cursor_sco: Cursor | None = None self._saved_main_state: _TerminalState | None = None @property def screen(self) -> ScreenBuffer: """Return the currently active screen buffer.""" return self._active_buffer
[docs] def write(self, data: str, collapse_capture_updates: bool = False) -> None: """Process terminal output text including ANSI escape sequences. :param data: Terminal output text. :param collapse_capture_updates: Apply a heuristic for text captures where carriage returns were normalized to newlines. """ self._parser.feed(data, collapse_capture_updates=collapse_capture_updates)
[docs] def writeFile( self, capture_file: str | Path, encoding: str = "utf-8", errors: str = "replace", collapse_capture_updates: bool | None = None, ) -> None: """Read and process terminal output from a capture file. The file is read as bytes to preserve raw carriage returns. If the capture appears to be a transcript with `CR`-only line endings (for example from `script`), it is normalized to newlines automatically. :param capture_file: Path to the capture file. :param encoding: Encoding used to decode bytes into text. :param errors: Decoder error handling strategy. :param collapse_capture_updates: Optional heuristic to collapse progress update lines when CR was normalized before parsing. If omitted, auto-detection is used for transcript-style captures. """ capture_path = Path(capture_file) data = capture_path.read_bytes().decode(encoding=encoding, errors=errors) looks_like_cr_only_transcript = "\n" not in data and "\r" in data and "\x1b" in data and data.count("\r") > 1 if looks_like_cr_only_transcript: data = data.replace("\r", "\n") if collapse_capture_updates is None: collapse_capture_updates = True if collapse_capture_updates is None: collapse_capture_updates = False self.write(data, collapse_capture_updates=collapse_capture_updates)
[docs] def to_text(self) -> str: """Convert current screen + history to plain text.""" return TextConverter().convert(self._active_buffer)
[docs] def to_ansi(self, esc_char: str = "\x1b") -> str: """Convert current screen + history to compact ANSI encoded text.""" return AnsiConverter(esc_char=esc_char).convert(self._active_buffer)
[docs] def to_html(self, class_prefix: str = "erbsland-ansi") -> str: """Convert current screen + history to compact HTML.""" return HtmlConverter(class_prefix=class_prefix).convert(self._active_buffer)
[docs] def clear(self) -> None: """Reset the active screen and cursor position.""" self._active_buffer.clear() self.cursor.x = 0 self.cursor.y = 0
[docs] def move_cursor_to(self, x: int, y: int) -> None: """Move cursor to an absolute position within visible screen bounds.""" self.cursor.x = max(0, min(self.width - 1, x)) self.cursor.y = max(0, min(self.height - 1, y))
[docs] def move_cursor_to_column(self, x: int) -> None: """Move cursor to a column in the current row.""" self.cursor.x = max(0, min(self.width - 1, x))
[docs] def cursor_up(self, lines: int) -> None: """Move cursor up by a number of rows.""" self.cursor.y = max(0, self.cursor.y - max(0, lines))
[docs] def cursor_down(self, lines: int) -> None: """Move cursor down by a number of rows.""" self.cursor.y = min(self.height - 1, self.cursor.y + max(0, lines))
[docs] def cursor_right(self, columns: int) -> None: """Move cursor right by a number of columns.""" self.cursor.x = min(self.width - 1, self.cursor.x + max(0, columns))
[docs] def cursor_left(self, columns: int) -> None: """Move cursor left by a number of columns.""" self.cursor.x = max(0, self.cursor.x - max(0, columns))
[docs] def carriage_return(self) -> None: """Move cursor to beginning of current line.""" self.cursor.x = 0
[docs] def line_feed(self) -> None: """Move cursor down one line and scroll if needed.""" self.cursor.x = 0 if self.cursor.y >= self.height - 1: self._active_buffer.scroll_up(1) else: self.cursor.y += 1
[docs] def reverse_index(self) -> None: """ESC M: move cursor one line up, scrolling the screen down if needed.""" if self.cursor.y > 0: self.cursor.y -= 1 return self._active_buffer.reverse_index()
[docs] def horizontal_tab(self) -> None: """Move cursor to next 8-column tab stop.""" self.cursor.x = min(self.width - 1, self.cursor.x) next_tab = ((self.cursor.x // 8) + 1) * 8 self.cursor.x = min(self.width - 1, next_tab)
[docs] def put_character(self, character: str) -> None: """Write one printable character at the cursor position.""" if self.cursor.x >= self.width: self._wrap_line() width = char_display_width(character) if width == 0: self._append_combining_mark(character) return if width == 2 and self.cursor.x >= self.width - 1: self._wrap_line() self._active_buffer.set_character( x=self.cursor.x, y=self.cursor.y, character=character, style=self.cursor.style, width=2 if width == 2 else 1, ) self.cursor.x += width
[docs] def erase_in_display(self, mode: int) -> None: """Execute CSI J with the active cursor position.""" self._active_buffer.erase_in_display(mode, self.cursor.x, self.cursor.y)
[docs] def erase_in_line(self, mode: int) -> None: """Execute CSI K with the active cursor position.""" self._active_buffer.erase_in_line(mode, self.cursor.x, self.cursor.y)
[docs] def save_cursor_dec(self) -> None: """ESC 7: save cursor and style state.""" self._saved_cursor_dec = self.cursor.copy()
[docs] def restore_cursor_dec(self) -> None: """ESC 8: restore cursor and style state.""" if self._saved_cursor_dec is not None: self.cursor = self._saved_cursor_dec.copy()
[docs] def save_cursor_sco(self) -> None: """CSI s: save cursor and style state.""" self._saved_cursor_sco = self.cursor.copy()
[docs] def restore_cursor_sco(self) -> None: """CSI u: restore cursor and style state.""" if self._saved_cursor_sco is not None: self.cursor = self._saved_cursor_sco.copy()
[docs] def report_cursor_position(self) -> None: """CSI 6n response emulation stored in `responses`.""" column = min(self.width - 1, self.cursor.x) + 1 self.responses.append(f"\x1b[{self.cursor.y + 1};{column}R")
[docs] def enable_alternate_buffer(self, save_cursor: bool = True) -> None: """Enable alternate screen mode (`?47h` / `?1049h`).""" if self._active_buffer is self._alternate_buffer: return saved_cursor = self.cursor.copy() if save_cursor else Cursor() self._saved_main_state = _TerminalState( cursor=saved_cursor, snapshot=self._main_buffer.snapshot(), ) self._alternate_buffer = ScreenBuffer(self.width, self.height, 0) self._active_buffer = self._alternate_buffer self.cursor.x = 0 self.cursor.y = 0
[docs] def disable_alternate_buffer(self, restore_cursor: bool = True) -> None: """Disable alternate screen mode (`?47l` / `?1049l`).""" if self._active_buffer is self._main_buffer: return self._active_buffer = self._main_buffer if self._saved_main_state is not None: self._main_buffer.restore(self._saved_main_state.snapshot) if restore_cursor: self.cursor = self._saved_main_state.cursor.copy() self._saved_main_state = None
def _append_combining_mark(self, combining_mark: str) -> None: if self.cursor.x > 0: self._active_buffer.append_combining(self.cursor.x - 1, self.cursor.y, combining_mark) return if self.cursor.y > 0: self._active_buffer.append_combining(self.width - 1, self.cursor.y - 1, combining_mark) def _wrap_line(self) -> None: self.cursor.x = 0 if self.cursor.y >= self.height - 1: self._active_buffer.scroll_up(1) else: self.cursor.y += 1