Source code for erbsland.ansi_convert._parser

#  Copyright (c) 2026 Tobias Erbsland - https://erbsland.dev
#  SPDX-License-Identifier: Apache-2.0


"""
ANSI escape parser implementation.
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from ._model import DEFAULT_STYLE, SGR_CODE_TO_BACKGROUND, SGR_CODE_TO_FOREGROUND

if TYPE_CHECKING:
    from ._terminal import Terminal


[docs] class ANSIParser: """Parser for ANSI control characters and escape sequences.""" def __init__(self, terminal: Terminal, warn_unknown: bool = False) -> None: self._terminal = terminal self._warn_unknown = warn_unknown self._line_cleared_with_2k = False self._collapse_capture_updates = False
[docs] def feed(self, data: str, collapse_capture_updates: bool = False) -> None: """Parse and apply terminal updates for the provided text.""" self._collapse_capture_updates = collapse_capture_updates index = 0 data_length = len(data) while index < data_length: character = data[index] if character == "\x1b": index = self._parse_escape(data, index) continue index = self._handle_regular_character(data, index)
def _parse_escape(self, data: str, start_index: int) -> int: if start_index + 1 >= len(data): return len(data) marker = data[start_index + 1] if marker == "[": sequence_end = start_index + 2 while sequence_end < len(data): code = ord(data[sequence_end]) if 0x40 <= code <= 0x7E: parameters = data[start_index + 2 : sequence_end] final = data[sequence_end] self._handle_csi(parameters, final) return sequence_end + 1 sequence_end += 1 return len(data) if marker == "M": self._terminal.reverse_index() return start_index + 2 if marker == "7": self._terminal.save_cursor_dec() return start_index + 2 if marker == "8": self._terminal.restore_cursor_dec() return start_index + 2 self._warn(f"unsupported ESC sequence: ESC {marker!r}") return start_index + 2 def _handle_regular_character(self, data: str, index: int) -> int: character = data[index] code = ord(character) if code == 0x07 or code == 0x7F: return index + 1 if code == 0x08: self._terminal.cursor_left(1) return index + 1 if code == 0x09: self._terminal.horizontal_tab() return index + 1 if code in (0x0A, 0x0B, 0x0C): if self._collapse_capture_updates and self._line_cleared_with_2k and data.startswith("\n", index + 1): self._terminal.carriage_return() elif ( self._collapse_capture_updates and self._line_cleared_with_2k and self._next_starts_with_csi_2k(data, index + 1) ): # Heuristic for captured TTY output where carriage returns were # normalized to newlines: keep in-place progress updates on one line. self._terminal.carriage_return() else: self._terminal.line_feed() self._line_cleared_with_2k = False return index + 1 if code == 0x0D: self._terminal.carriage_return() self._line_cleared_with_2k = False return index + 1 if code < 0x20: return index + 1 self._terminal.put_character(character) return index + 1 def _handle_csi(self, parameters: str, final: str) -> None: if parameters.startswith("?"): self._handle_private_mode(parameters[1:], final) return values = self._parse_parameters(parameters) if final == "A": self._terminal.cursor_up(self._first_or_default(values, 1)) return if final == "B": self._terminal.cursor_down(self._first_or_default(values, 1)) return if final == "C": self._terminal.cursor_right(self._first_or_default(values, 1)) return if final == "D": self._terminal.cursor_left(self._first_or_default(values, 1)) return if final == "E": self._terminal.cursor_down(self._first_or_default(values, 1)) self._terminal.carriage_return() return if final == "F": self._terminal.cursor_up(self._first_or_default(values, 1)) self._terminal.carriage_return() return if final == "G": self._terminal.move_cursor_to_column(self._first_or_default(values, 1) - 1) return if final in ("H", "f"): row = values[0] if len(values) > 0 and values[0] is not None else 1 column = values[1] if len(values) > 1 and values[1] is not None else 1 self._terminal.move_cursor_to(column - 1, row - 1) return if final == "J": self._terminal.erase_in_display(self._first_or_default(values, 0)) return if final == "K": mode = self._first_or_default(values, 0) self._terminal.erase_in_line(mode) self._line_cleared_with_2k = mode == 2 and self._terminal.cursor.x == 0 return if final == "m": self._apply_sgr(values or [0]) return if final == "n": if self._first_or_default(values, 0) == 6: self._terminal.report_cursor_position() return self._warn(f"unsupported CSI n query: {parameters!r}") return if final == "s": self._terminal.save_cursor_sco() return if final == "u": self._terminal.restore_cursor_sco() return self._warn(f"unsupported CSI sequence: ESC[{parameters}{final}") def _handle_private_mode(self, parameters: str, final: str) -> None: values = self._parse_parameters(parameters) enabled = final == "h" if final not in ("h", "l"): self._warn(f"unsupported private mode operation: ESC[?{parameters}{final}") return for value in values: if value is None: continue if value == 25: self._terminal.cursor.visible = enabled continue if value in (47, 1049): if enabled: self._terminal.enable_alternate_buffer(save_cursor=value == 1049) else: self._terminal.disable_alternate_buffer(restore_cursor=value == 1049) continue self._warn(f"unsupported private mode: ESC[?{value}{final}") def _apply_sgr(self, values: list[int | None]) -> None: if not values: values = [0] style = self._terminal.cursor.style for value in values: code = 0 if value is None else value if code == 0: style = DEFAULT_STYLE elif code == 1: style = style.with_updates(bold=True) elif code == 2: style = style.with_updates(dim=True) elif code == 3: style = style.with_updates(italic=True) elif code == 4: style = style.with_updates(underline=True) elif code == 5: style = style.with_updates(blink=True) elif code == 7: style = style.with_updates(reverse=True) elif code == 8: style = style.with_updates(hidden=True) elif code == 9: style = style.with_updates(strike=True) elif code == 22: style = style.with_updates(bold=False, dim=False) elif code == 23: style = style.with_updates(italic=False) elif code == 24: style = style.with_updates(underline=False) elif code == 25: style = style.with_updates(blink=False) elif code == 27: style = style.with_updates(reverse=False) elif code == 28: style = style.with_updates(hidden=False) elif code == 29: style = style.with_updates(strike=False) elif code == 39: style = style.with_updates(foreground=None) elif code == 49: style = style.with_updates(background=None) elif code in SGR_CODE_TO_FOREGROUND: style = style.with_updates(foreground=SGR_CODE_TO_FOREGROUND[code]) elif code in SGR_CODE_TO_BACKGROUND: style = style.with_updates(background=SGR_CODE_TO_BACKGROUND[code]) else: self._warn(f"unsupported SGR code: {code}") self._terminal.cursor.style = style @staticmethod def _parse_parameters(parameters: str) -> list[int | None]: if not parameters: return [] values: list[int | None] = [] for part in parameters.split(";"): if part == "": values.append(None) continue try: values.append(int(part)) except ValueError: values.append(None) return values @staticmethod def _first_or_default(values: list[int | None], default: int) -> int: if not values or values[0] is None: return default return values[0] def _warn(self, message: str) -> None: if self._warn_unknown: warnings.warn(message, RuntimeWarning, stacklevel=2) @staticmethod def _next_starts_with_csi_2k(data: str, index: int) -> bool: return data.startswith("\x1b[2K", index)