# Copyright (c) 2026 Tobias Erbsland - https://erbsland.dev
# SPDX-License-Identifier: Apache-2.0
"""
Public terminal emulator façade.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from ._converter import AnsiConverter, HtmlConverter, TextConverter
from ._model import Cursor, char_display_width
from ._parser import ANSIParser
from ._screen import ScreenBuffer, ScreenSnapshot
@dataclass(slots=True)
class _TerminalState:
cursor: Cursor
snapshot: ScreenSnapshot
[docs]
class Terminal:
"""ANSI terminal emulator with scroll-back history and converters."""
def __init__(
self,
width: int = 120,
height: int = 40,
back_buffer_height: int = 2000,
warn_unknown_sequences: bool = False,
) -> None:
"""
Create a new terminal emulator instance.
:param width: Visible terminal columns.
:param height: Visible terminal rows.
:param back_buffer_height: Maximum number of history rows.
:param warn_unknown_sequences: Emit `RuntimeWarning` for unsupported ANSI
sequences when true.
"""
self.width = width
self.height = height
self.back_buffer_height = back_buffer_height
self._main_buffer = ScreenBuffer(width, height, back_buffer_height)
self._alternate_buffer = ScreenBuffer(width, height, 0)
self._active_buffer = self._main_buffer
self.cursor = Cursor()
self._parser = ANSIParser(self, warn_unknown=warn_unknown_sequences)
self.responses: list[str] = []
self._saved_cursor_dec: Cursor | None = None
self._saved_cursor_sco: Cursor | None = None
self._saved_main_state: _TerminalState | None = None
@property
def screen(self) -> ScreenBuffer:
"""Return the currently active screen buffer."""
return self._active_buffer
[docs]
def write(self, data: str, collapse_capture_updates: bool = False) -> None:
"""Process terminal output text including ANSI escape sequences.
:param data: Terminal output text.
:param collapse_capture_updates: Apply a heuristic for text captures
where carriage returns were normalized to newlines.
"""
self._parser.feed(data, collapse_capture_updates=collapse_capture_updates)
[docs]
def writeFile(
self,
capture_file: str | Path,
encoding: str = "utf-8",
errors: str = "replace",
collapse_capture_updates: bool | None = None,
) -> None:
"""Read and process terminal output from a capture file.
The file is read as bytes to preserve raw carriage returns. If the
capture appears to be a transcript with `CR`-only line endings (for
example from `script`), it is normalized to newlines automatically.
:param capture_file: Path to the capture file.
:param encoding: Encoding used to decode bytes into text.
:param errors: Decoder error handling strategy.
:param collapse_capture_updates: Optional heuristic to collapse
progress update lines when CR was normalized before parsing.
If omitted, auto-detection is used for transcript-style captures.
"""
capture_path = Path(capture_file)
data = capture_path.read_bytes().decode(encoding=encoding, errors=errors)
looks_like_cr_only_transcript = "\n" not in data and "\r" in data and "\x1b" in data and data.count("\r") > 1
if looks_like_cr_only_transcript:
data = data.replace("\r", "\n")
if collapse_capture_updates is None:
collapse_capture_updates = True
if collapse_capture_updates is None:
collapse_capture_updates = False
self.write(data, collapse_capture_updates=collapse_capture_updates)
[docs]
def to_text(self) -> str:
"""Convert current screen + history to plain text."""
return TextConverter().convert(self._active_buffer)
[docs]
def to_ansi(self, esc_char: str = "\x1b") -> str:
"""Convert current screen + history to compact ANSI encoded text."""
return AnsiConverter(esc_char=esc_char).convert(self._active_buffer)
[docs]
def to_html(self, class_prefix: str = "erbsland-ansi") -> str:
"""Convert current screen + history to compact HTML."""
return HtmlConverter(class_prefix=class_prefix).convert(self._active_buffer)
[docs]
def clear(self) -> None:
"""Reset the active screen and cursor position."""
self._active_buffer.clear()
self.cursor.x = 0
self.cursor.y = 0
[docs]
def move_cursor_to(self, x: int, y: int) -> None:
"""Move cursor to an absolute position within visible screen bounds."""
self.cursor.x = max(0, min(self.width - 1, x))
self.cursor.y = max(0, min(self.height - 1, y))
[docs]
def move_cursor_to_column(self, x: int) -> None:
"""Move cursor to a column in the current row."""
self.cursor.x = max(0, min(self.width - 1, x))
[docs]
def cursor_up(self, lines: int) -> None:
"""Move cursor up by a number of rows."""
self.cursor.y = max(0, self.cursor.y - max(0, lines))
[docs]
def cursor_down(self, lines: int) -> None:
"""Move cursor down by a number of rows."""
self.cursor.y = min(self.height - 1, self.cursor.y + max(0, lines))
[docs]
def cursor_right(self, columns: int) -> None:
"""Move cursor right by a number of columns."""
self.cursor.x = min(self.width - 1, self.cursor.x + max(0, columns))
[docs]
def cursor_left(self, columns: int) -> None:
"""Move cursor left by a number of columns."""
self.cursor.x = max(0, self.cursor.x - max(0, columns))
[docs]
def carriage_return(self) -> None:
"""Move cursor to beginning of current line."""
self.cursor.x = 0
[docs]
def line_feed(self) -> None:
"""Move cursor down one line and scroll if needed."""
self.cursor.x = 0
if self.cursor.y >= self.height - 1:
self._active_buffer.scroll_up(1)
else:
self.cursor.y += 1
[docs]
def reverse_index(self) -> None:
"""ESC M: move cursor one line up, scrolling the screen down if needed."""
if self.cursor.y > 0:
self.cursor.y -= 1
return
self._active_buffer.reverse_index()
[docs]
def horizontal_tab(self) -> None:
"""Move cursor to next 8-column tab stop."""
self.cursor.x = min(self.width - 1, self.cursor.x)
next_tab = ((self.cursor.x // 8) + 1) * 8
self.cursor.x = min(self.width - 1, next_tab)
[docs]
def put_character(self, character: str) -> None:
"""Write one printable character at the cursor position."""
if self.cursor.x >= self.width:
self._wrap_line()
width = char_display_width(character)
if width == 0:
self._append_combining_mark(character)
return
if width == 2 and self.cursor.x >= self.width - 1:
self._wrap_line()
self._active_buffer.set_character(
x=self.cursor.x,
y=self.cursor.y,
character=character,
style=self.cursor.style,
width=2 if width == 2 else 1,
)
self.cursor.x += width
[docs]
def erase_in_display(self, mode: int) -> None:
"""Execute CSI J with the active cursor position."""
self._active_buffer.erase_in_display(mode, self.cursor.x, self.cursor.y)
[docs]
def erase_in_line(self, mode: int) -> None:
"""Execute CSI K with the active cursor position."""
self._active_buffer.erase_in_line(mode, self.cursor.x, self.cursor.y)
[docs]
def save_cursor_dec(self) -> None:
"""ESC 7: save cursor and style state."""
self._saved_cursor_dec = self.cursor.copy()
[docs]
def restore_cursor_dec(self) -> None:
"""ESC 8: restore cursor and style state."""
if self._saved_cursor_dec is not None:
self.cursor = self._saved_cursor_dec.copy()
[docs]
def save_cursor_sco(self) -> None:
"""CSI s: save cursor and style state."""
self._saved_cursor_sco = self.cursor.copy()
[docs]
def restore_cursor_sco(self) -> None:
"""CSI u: restore cursor and style state."""
if self._saved_cursor_sco is not None:
self.cursor = self._saved_cursor_sco.copy()
[docs]
def report_cursor_position(self) -> None:
"""CSI 6n response emulation stored in `responses`."""
column = min(self.width - 1, self.cursor.x) + 1
self.responses.append(f"\x1b[{self.cursor.y + 1};{column}R")
[docs]
def enable_alternate_buffer(self, save_cursor: bool = True) -> None:
"""Enable alternate screen mode (`?47h` / `?1049h`)."""
if self._active_buffer is self._alternate_buffer:
return
saved_cursor = self.cursor.copy() if save_cursor else Cursor()
self._saved_main_state = _TerminalState(
cursor=saved_cursor,
snapshot=self._main_buffer.snapshot(),
)
self._alternate_buffer = ScreenBuffer(self.width, self.height, 0)
self._active_buffer = self._alternate_buffer
self.cursor.x = 0
self.cursor.y = 0
[docs]
def disable_alternate_buffer(self, restore_cursor: bool = True) -> None:
"""Disable alternate screen mode (`?47l` / `?1049l`)."""
if self._active_buffer is self._main_buffer:
return
self._active_buffer = self._main_buffer
if self._saved_main_state is not None:
self._main_buffer.restore(self._saved_main_state.snapshot)
if restore_cursor:
self.cursor = self._saved_main_state.cursor.copy()
self._saved_main_state = None
def _append_combining_mark(self, combining_mark: str) -> None:
if self.cursor.x > 0:
self._active_buffer.append_combining(self.cursor.x - 1, self.cursor.y, combining_mark)
return
if self.cursor.y > 0:
self._active_buffer.append_combining(self.width - 1, self.cursor.y - 1, combining_mark)
def _wrap_line(self) -> None:
self.cursor.x = 0
if self.cursor.y >= self.height - 1:
self._active_buffer.scroll_up(1)
else:
self.cursor.y += 1