-
Notifications
You must be signed in to change notification settings - Fork 851
Expand file tree
/
Copy pathterm.py
More file actions
202 lines (150 loc) · 5.48 KB
/
term.py
File metadata and controls
202 lines (150 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from __future__ import annotations
import contextlib
import os
import re
import sys
import time
from kimi_cli.utils.envvar import get_env_bool
def ensure_new_line() -> None:
"""Ensure the next prompt starts at column 0 regardless of prior command output."""
if not sys.stdout.isatty() or not sys.stdin.isatty():
return
needs_break = True
if sys.platform == "win32":
column = _cursor_column_windows()
needs_break = column not in (None, 0)
else:
column = _cursor_column_unix()
needs_break = column not in (None, 1)
if needs_break:
_write_newline()
def ensure_tty_sane() -> None:
"""Restore basic tty settings so Ctrl-C works after raw-mode operations."""
if sys.platform == "win32" or not sys.stdin.isatty():
return
try:
import termios
except Exception:
return
try:
fd = sys.stdin.fileno()
attrs = termios.tcgetattr(fd)
except Exception:
return
desired = termios.ISIG | termios.IEXTEN | termios.ICANON | termios.ECHO
if (attrs[3] & desired) == desired:
return
attrs[3] |= desired
with contextlib.suppress(OSError):
termios.tcsetattr(fd, termios.TCSADRAIN, attrs)
def maybe_disable_kitty_keyboard_protocol() -> None:
"""Disable kitty keyboard protocol in terminals that send CSI-u sequences.
This is primarily a workaround for VS Code's integrated terminal, which can
emit CSI-u key sequences that prompt_toolkit doesn't parse.
"""
if sys.platform == "win32":
return
if not sys.stdout.isatty() or not sys.stdin.isatty():
return
if not _should_disable_kitty_keyboard_protocol():
return
_write_escape("\x1b[<u")
def _should_disable_kitty_keyboard_protocol() -> bool:
env_value = os.getenv("KIMI_CLI_DISABLE_KITTY_KEYS")
if env_value is not None:
return get_env_bool("KIMI_CLI_DISABLE_KITTY_KEYS", default=False)
return _is_vscode_terminal()
def _is_vscode_terminal() -> bool:
return os.getenv("TERM_PROGRAM") == "vscode" or "VSCODE_IPC_HOOK_CLI" in os.environ
def _cursor_position_unix() -> tuple[int, int] | None:
"""Get cursor position (row, column) on Unix. Both are 1-indexed."""
assert sys.platform != "win32"
import select
import termios
import tty
_CURSOR_QUERY = "\x1b[6n"
_CURSOR_POSITION_RE = re.compile(r"\x1b\[(\d+);(\d+)R")
fd = sys.stdin.fileno()
oldterm = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
sys.stdout.write(_CURSOR_QUERY)
sys.stdout.flush()
response = ""
deadline = time.monotonic() + 0.2
while time.monotonic() < deadline:
timeout = max(0.01, deadline - time.monotonic())
ready, _, _ = select.select([sys.stdin], [], [], timeout)
if not ready:
continue
try:
chunk = os.read(fd, 32)
except OSError:
break
if not chunk:
break
response += chunk.decode(encoding="utf-8", errors="ignore")
match = _CURSOR_POSITION_RE.search(response)
if match:
return int(match.group(1)), int(match.group(2))
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, oldterm)
return None
def _cursor_column_unix() -> int | None:
pos = _cursor_position_unix()
return pos[1] if pos else None
def _cursor_position_windows() -> tuple[int, int] | None:
"""Get cursor position (row, column) on Windows. Both are 1-indexed."""
assert sys.platform == "win32"
import ctypes
from ctypes import wintypes
kernel32 = ctypes.windll.kernel32
_STD_OUTPUT_HANDLE = -11 # Windows API constant for standard output handle
handle = kernel32.GetStdHandle(_STD_OUTPUT_HANDLE)
invalid_handle_value = ctypes.c_void_p(-1).value
if handle in (0, invalid_handle_value):
return None
class COORD(ctypes.Structure):
_fields_ = [("X", wintypes.SHORT), ("Y", wintypes.SHORT)]
class SMALL_RECT(ctypes.Structure):
_fields_ = [
("Left", wintypes.SHORT),
("Top", wintypes.SHORT),
("Right", wintypes.SHORT),
("Bottom", wintypes.SHORT),
]
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
_fields_ = [
("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", wintypes.WORD),
("srWindow", SMALL_RECT),
("dwMaximumWindowSize", COORD),
]
csbi = CONSOLE_SCREEN_BUFFER_INFO()
if not kernel32.GetConsoleScreenBufferInfo(handle, ctypes.byref(csbi)):
return None
# Windows returns 0-indexed, convert to 1-indexed for consistency
return int(csbi.dwCursorPosition.Y) + 1, int(csbi.dwCursorPosition.X) + 1
def _cursor_column_windows() -> int | None:
pos = _cursor_position_windows()
return pos[1] if pos else None
def _write_newline() -> None:
sys.stdout.write("\n")
sys.stdout.flush()
def _write_escape(value: str) -> None:
sys.stdout.write(value)
sys.stdout.flush()
def get_cursor_row() -> int | None:
"""Get the current cursor row (1-indexed)."""
if not sys.stdout.isatty() or not sys.stdin.isatty():
return None
if sys.platform == "win32":
pos = _cursor_position_windows()
else:
pos = _cursor_position_unix()
return pos[0] if pos else None
if __name__ == "__main__":
print("test", end="", flush=True)
ensure_new_line()
print("next line")