# Copyright (c) 2026 Tobias Erbsland - https://erbsland.dev
# SPDX-License-Identifier: Apache-2.0
"""
ANSI escape parser implementation.
"""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
from ._model import DEFAULT_STYLE, SGR_CODE_TO_BACKGROUND, SGR_CODE_TO_FOREGROUND
if TYPE_CHECKING:
from ._terminal import Terminal
[docs]
class ANSIParser:
"""Parser for ANSI control characters and escape sequences."""
def __init__(self, terminal: Terminal, warn_unknown: bool = False) -> None:
self._terminal = terminal
self._warn_unknown = warn_unknown
self._line_cleared_with_2k = False
self._collapse_capture_updates = False
[docs]
def feed(self, data: str, collapse_capture_updates: bool = False) -> None:
"""Parse and apply terminal updates for the provided text."""
self._collapse_capture_updates = collapse_capture_updates
index = 0
data_length = len(data)
while index < data_length:
character = data[index]
if character == "\x1b":
index = self._parse_escape(data, index)
continue
index = self._handle_regular_character(data, index)
def _parse_escape(self, data: str, start_index: int) -> int:
if start_index + 1 >= len(data):
return len(data)
marker = data[start_index + 1]
if marker == "[":
sequence_end = start_index + 2
while sequence_end < len(data):
code = ord(data[sequence_end])
if 0x40 <= code <= 0x7E:
parameters = data[start_index + 2 : sequence_end]
final = data[sequence_end]
self._handle_csi(parameters, final)
return sequence_end + 1
sequence_end += 1
return len(data)
if marker == "M":
self._terminal.reverse_index()
return start_index + 2
if marker == "7":
self._terminal.save_cursor_dec()
return start_index + 2
if marker == "8":
self._terminal.restore_cursor_dec()
return start_index + 2
self._warn(f"unsupported ESC sequence: ESC {marker!r}")
return start_index + 2
def _handle_regular_character(self, data: str, index: int) -> int:
character = data[index]
code = ord(character)
if code == 0x07 or code == 0x7F:
return index + 1
if code == 0x08:
self._terminal.cursor_left(1)
return index + 1
if code == 0x09:
self._terminal.horizontal_tab()
return index + 1
if code in (0x0A, 0x0B, 0x0C):
if self._collapse_capture_updates and self._line_cleared_with_2k and data.startswith("\n", index + 1):
self._terminal.carriage_return()
elif (
self._collapse_capture_updates
and self._line_cleared_with_2k
and self._next_starts_with_csi_2k(data, index + 1)
):
# Heuristic for captured TTY output where carriage returns were
# normalized to newlines: keep in-place progress updates on one line.
self._terminal.carriage_return()
else:
self._terminal.line_feed()
self._line_cleared_with_2k = False
return index + 1
if code == 0x0D:
self._terminal.carriage_return()
self._line_cleared_with_2k = False
return index + 1
if code < 0x20:
return index + 1
self._terminal.put_character(character)
return index + 1
def _handle_csi(self, parameters: str, final: str) -> None:
if parameters.startswith("?"):
self._handle_private_mode(parameters[1:], final)
return
values = self._parse_parameters(parameters)
if final == "A":
self._terminal.cursor_up(self._first_or_default(values, 1))
return
if final == "B":
self._terminal.cursor_down(self._first_or_default(values, 1))
return
if final == "C":
self._terminal.cursor_right(self._first_or_default(values, 1))
return
if final == "D":
self._terminal.cursor_left(self._first_or_default(values, 1))
return
if final == "E":
self._terminal.cursor_down(self._first_or_default(values, 1))
self._terminal.carriage_return()
return
if final == "F":
self._terminal.cursor_up(self._first_or_default(values, 1))
self._terminal.carriage_return()
return
if final == "G":
self._terminal.move_cursor_to_column(self._first_or_default(values, 1) - 1)
return
if final in ("H", "f"):
row = values[0] if len(values) > 0 and values[0] is not None else 1
column = values[1] if len(values) > 1 and values[1] is not None else 1
self._terminal.move_cursor_to(column - 1, row - 1)
return
if final == "J":
self._terminal.erase_in_display(self._first_or_default(values, 0))
return
if final == "K":
mode = self._first_or_default(values, 0)
self._terminal.erase_in_line(mode)
self._line_cleared_with_2k = mode == 2 and self._terminal.cursor.x == 0
return
if final == "m":
self._apply_sgr(values or [0])
return
if final == "n":
if self._first_or_default(values, 0) == 6:
self._terminal.report_cursor_position()
return
self._warn(f"unsupported CSI n query: {parameters!r}")
return
if final == "s":
self._terminal.save_cursor_sco()
return
if final == "u":
self._terminal.restore_cursor_sco()
return
self._warn(f"unsupported CSI sequence: ESC[{parameters}{final}")
def _handle_private_mode(self, parameters: str, final: str) -> None:
values = self._parse_parameters(parameters)
enabled = final == "h"
if final not in ("h", "l"):
self._warn(f"unsupported private mode operation: ESC[?{parameters}{final}")
return
for value in values:
if value is None:
continue
if value == 25:
self._terminal.cursor.visible = enabled
continue
if value in (47, 1049):
if enabled:
self._terminal.enable_alternate_buffer(save_cursor=value == 1049)
else:
self._terminal.disable_alternate_buffer(restore_cursor=value == 1049)
continue
self._warn(f"unsupported private mode: ESC[?{value}{final}")
def _apply_sgr(self, values: list[int | None]) -> None:
if not values:
values = [0]
style = self._terminal.cursor.style
for value in values:
code = 0 if value is None else value
if code == 0:
style = DEFAULT_STYLE
elif code == 1:
style = style.with_updates(bold=True)
elif code == 2:
style = style.with_updates(dim=True)
elif code == 3:
style = style.with_updates(italic=True)
elif code == 4:
style = style.with_updates(underline=True)
elif code == 5:
style = style.with_updates(blink=True)
elif code == 7:
style = style.with_updates(reverse=True)
elif code == 8:
style = style.with_updates(hidden=True)
elif code == 9:
style = style.with_updates(strike=True)
elif code == 22:
style = style.with_updates(bold=False, dim=False)
elif code == 23:
style = style.with_updates(italic=False)
elif code == 24:
style = style.with_updates(underline=False)
elif code == 25:
style = style.with_updates(blink=False)
elif code == 27:
style = style.with_updates(reverse=False)
elif code == 28:
style = style.with_updates(hidden=False)
elif code == 29:
style = style.with_updates(strike=False)
elif code == 39:
style = style.with_updates(foreground=None)
elif code == 49:
style = style.with_updates(background=None)
elif code in SGR_CODE_TO_FOREGROUND:
style = style.with_updates(foreground=SGR_CODE_TO_FOREGROUND[code])
elif code in SGR_CODE_TO_BACKGROUND:
style = style.with_updates(background=SGR_CODE_TO_BACKGROUND[code])
else:
self._warn(f"unsupported SGR code: {code}")
self._terminal.cursor.style = style
@staticmethod
def _parse_parameters(parameters: str) -> list[int | None]:
if not parameters:
return []
values: list[int | None] = []
for part in parameters.split(";"):
if part == "":
values.append(None)
continue
try:
values.append(int(part))
except ValueError:
values.append(None)
return values
@staticmethod
def _first_or_default(values: list[int | None], default: int) -> int:
if not values or values[0] is None:
return default
return values[0]
def _warn(self, message: str) -> None:
if self._warn_unknown:
warnings.warn(message, RuntimeWarning, stacklevel=2)
@staticmethod
def _next_starts_with_csi_2k(data: str, index: int) -> bool:
return data.startswith("\x1b[2K", index)