-
Notifications
You must be signed in to change notification settings - Fork 851
Expand file tree
/
Copy path__init__.py
More file actions
425 lines (366 loc) · 16.6 KB
/
__init__.py
File metadata and controls
425 lines (366 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
from __future__ import annotations
import asyncio
import shlex
from collections.abc import Awaitable, Coroutine
from dataclasses import dataclass
from enum import Enum
from typing import Any
from kosong.chat_provider import APIStatusError, ChatProviderError
from loguru import logger
from rich.console import Group, RenderableType
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from kimi_cli.soul import LLMNotSet, LLMNotSupported, MaxStepsReached, RunCancelled, Soul, run_soul
from kimi_cli.soul.kimisoul import KimiSoul
from kimi_cli.ui.shell import update as _update_mod
from kimi_cli.ui.shell.console import console
from kimi_cli.ui.shell.echo import render_user_echo_text
from kimi_cli.ui.shell.prompt import (
CustomPromptSession,
PromptMode,
UserInput,
toast,
)
from kimi_cli.ui.shell.replay import replay_recent_history
from kimi_cli.ui.shell.slash import registry as shell_slash_registry
from kimi_cli.ui.shell.slash import shell_mode_registry
from kimi_cli.ui.shell.update import LATEST_VERSION_FILE, UpdateResult, do_update, semver_tuple
from kimi_cli.ui.shell.visualize import visualize
from kimi_cli.utils.envvar import get_env_bool
from kimi_cli.utils.logging import open_original_stderr
from kimi_cli.utils.signals import install_sigint_handler
from kimi_cli.utils.slashcmd import SlashCommand, SlashCommandCall, parse_slash_command_call
from kimi_cli.utils.subprocess_env import get_clean_env
from kimi_cli.utils.term import ensure_new_line, ensure_tty_sane, maybe_disable_kitty_keyboard_protocol
from kimi_cli.wire.types import ContentPart, StatusUpdate
class Shell:
    """Interactive terminal front-end that reads user input and drives a ``Soul``.

    The shell either runs one command and returns (see :meth:`run` with a
    ``command``), or enters a prompt loop that dispatches each input to one of
    three places: a foreground shell subprocess, a slash command, or the soul.
    """

    def __init__(self, soul: Soul, welcome_info: list[WelcomeInfoItem] | None = None):
        """Create a shell bound to ``soul``.

        Args:
            soul: The soul that receives agent-mode input.
            welcome_info: Optional items shown in the welcome panel; copied so
                later mutation of the caller's list has no effect here.
        """
        self.soul = soul
        self._welcome_info = list(welcome_info or [])
        # Keeps strong references to running background tasks; entries are
        # removed by the done-callback installed in `_start_background_task`.
        self._background_tasks: set[asyncio.Task[Any]] = set()
        # Only set while the interactive loop in `run` is active.
        self._prompt_session: CustomPromptSession | None = None
        # Name -> command mapping of soul-level plus shell-level slash commands.
        # Shell-level entries are merged last, so they win on a name clash.
        self._available_slash_commands: dict[str, SlashCommand[Any]] = {
            **{cmd.name: cmd for cmd in soul.available_slash_commands},
            **{cmd.name: cmd for cmd in shell_slash_registry.list_commands()},
        }

    @property
    def available_slash_commands(self) -> dict[str, SlashCommand[Any]]:
        """Get all available slash commands, including shell-level and soul-level commands."""
        return self._available_slash_commands

    @staticmethod
    def _should_exit_input(user_input: UserInput) -> bool:
        """Return True when the stripped command is one of the exit keywords."""
        return user_input.command.strip() in {"exit", "quit", "/exit", "/quit"}

    @staticmethod
    def _agent_slash_command_call(user_input: UserInput) -> SlashCommandCall | None:
        """Extract the slash command call from an agent-mode input, if any.

        Returns ``None`` when the input is not in agent mode or when the
        displayed command does not parse as a slash command. Otherwise the call
        parsed from ``resolved_command`` is preferred, but only if it parses and
        names the same command as the displayed text; else the displayed call
        is returned.
        """
        if user_input.mode != PromptMode.AGENT:
            return None
        display_call = parse_slash_command_call(user_input.command)
        if display_call is None:
            return None
        resolved_call = parse_slash_command_call(user_input.resolved_command)
        if resolved_call is None or resolved_call.name != display_call.name:
            return display_call
        return resolved_call

    @staticmethod
    def _should_echo_agent_input(user_input: UserInput) -> bool:
        """Return True for agent-mode input that is neither an exit keyword nor a slash command."""
        if user_input.mode != PromptMode.AGENT:
            return False
        if Shell._should_exit_input(user_input):
            return False
        return Shell._agent_slash_command_call(user_input) is None

    @staticmethod
    def _echo_agent_input(user_input: UserInput) -> None:
        """Print the user's command back to the console in its echo rendering."""
        console.print(render_user_echo_text(user_input.command))

    async def run(self, command: str | None = None) -> bool:
        """Run the shell.

        Args:
            command: When given, it is sent to the soul once and the method
                returns without showing the welcome panel or prompting.

        Returns:
            In single-command mode, the result of :meth:`run_soul_command`.
            In interactive mode, ``True`` once the prompt loop has ended.
        """
        if command is not None:
            # run single command and exit
            logger.info("Running agent with command: {command}", command=command)
            return await self.run_soul_command(command)

        # Start auto-update background task if not disabled
        if get_env_bool("KIMI_CLI_NO_AUTO_UPDATE"):
            logger.info("Auto-update disabled by KIMI_CLI_NO_AUTO_UPDATE environment variable")
        else:
            self._start_background_task(self._auto_update())

        _print_welcome_info(self.soul.name or "Kimi Code CLI", self._welcome_info)

        if isinstance(self.soul, KimiSoul):
            await replay_recent_history(
                self.soul.context.history,
                wire_file=self.soul.wire_file,
            )

        async def _plan_mode_toggle() -> bool:
            # Only a KimiSoul exposes plan mode; for any other soul report False.
            if isinstance(self.soul, KimiSoul):
                return await self.soul.toggle_plan_mode_from_manual()
            return False

        maybe_disable_kitty_keyboard_protocol()

        with CustomPromptSession(
            status_provider=lambda: self.soul.status,
            model_capabilities=self.soul.model_capabilities or set(),
            model_name=self.soul.model_name,
            thinking=self.soul.thinking or False,
            agent_mode_slash_commands=list(self._available_slash_commands.values()),
            shell_mode_slash_commands=shell_mode_registry.list_commands(),
            editor_command_provider=lambda: (
                self.soul.runtime.config.default_editor if isinstance(self.soul, KimiSoul) else ""
            ),
            plan_mode_toggle_callback=_plan_mode_toggle,
        ) as prompt_session:
            # Published on the instance so `run_soul_command` can hand it to the visualizer.
            self._prompt_session = prompt_session
            try:
                while True:
                    ensure_tty_sane()
                    try:
                        ensure_new_line()
                        user_input = await prompt_session.prompt()
                    except KeyboardInterrupt:
                        # Ctrl-C at the prompt does not quit; show a hint and prompt again.
                        logger.debug("Exiting by KeyboardInterrupt")
                        console.print("[grey50]Tip: press Ctrl-D or send 'exit' to quit[/grey50]")
                        continue
                    except EOFError:
                        logger.debug("Exiting by EOF")
                        console.print("Bye!")
                        break

                    if not user_input:
                        logger.debug("Got empty input, skipping")
                        continue
                    logger.debug("Got user input: {user_input}", user_input=user_input)

                    if self._should_echo_agent_input(user_input):
                        self._echo_agent_input(user_input)

                    if self._should_exit_input(user_input):
                        logger.debug("Exiting by slash command")
                        console.print("Bye!")
                        break

                    if user_input.mode == PromptMode.SHELL:
                        await self._run_shell_command(user_input.command)
                        continue

                    if slash_cmd_call := self._agent_slash_command_call(user_input):
                        await self._run_slash_command(slash_cmd_call)
                        continue

                    await self.run_soul_command(user_input.content)
                    console.print()
            finally:
                self._prompt_session = None
                ensure_tty_sane()

        return True

    async def _run_shell_command(self, command: str) -> None:
        """Run a shell command in foreground.

        Blank input is ignored. Input that parses as a slash command is routed
        to :meth:`_run_slash_command` when the shell-mode registry knows it and
        rejected with a hint otherwise. A bare ``cd <dir>`` is not executed;
        a warning is printed instead. Anything else is run through
        ``asyncio.create_subprocess_shell`` and awaited; failures are logged and
        printed rather than raised.
        """
        if not command.strip():
            return

        # Check if it's an allowed slash command in shell mode
        if slash_cmd_call := parse_slash_command_call(command):
            if shell_mode_registry.find_command(slash_cmd_call.name):
                await self._run_slash_command(slash_cmd_call)
                return
            else:
                console.print(
                    f'[yellow]"/{slash_cmd_call.name}" is not available in shell mode. '
                    "Press Ctrl-X to switch to agent mode.[/yellow]"
                )
                return

        # Check if user is trying to use 'cd' command
        stripped_cmd = command.strip()
        split_cmd: list[str] | None = None
        try:
            split_cmd = shlex.split(stripped_cmd)
        except ValueError as exc:
            # Unbalanced quotes etc.: skip the cd check and let the shell report the error.
            logger.debug("Failed to parse shell command for cd check: {error}", error=exc)
        if split_cmd and len(split_cmd) == 2 and split_cmd[0] == "cd":
            console.print(
                "[yellow]Warning: Directory changes are not preserved across command executions."
                "[/yellow]"
            )
            return

        logger.info("Running shell command: {cmd}", cmd=command)

        proc: asyncio.subprocess.Process | None = None

        def _handler():
            logger.debug("SIGINT received.")
            # Nothing to stop before the subprocess exists or after it has been reaped.
            if proc is None or proc.returncode is not None:
                return
            try:
                proc.terminate()
            except ProcessLookupError:
                # The child exited between the returncode check and the signal;
                # raising here would only surface as an error in the event loop callback.
                logger.debug("Shell command already exited before it could be terminated.")

        loop = asyncio.get_running_loop()
        remove_sigint = install_sigint_handler(loop, _handler)
        try:
            # TODO: For the sake of simplicity, we now use `create_subprocess_shell`.
            # Later we should consider making this behave like a real shell.
            with open_original_stderr() as stderr:
                kwargs: dict[str, Any] = {}
                if stderr is not None:
                    kwargs["stderr"] = stderr
                proc = await asyncio.create_subprocess_shell(command, env=get_clean_env(), **kwargs)
                await proc.wait()
        except Exception as e:
            logger.exception("Failed to run shell command:")
            console.print(f"[red]Failed to run shell command: {e}[/red]")
        finally:
            remove_sigint()

    async def _run_slash_command(self, command_call: SlashCommandCall) -> None:
        """Dispatch a parsed slash command.

        Unknown names are reported and ignored. Names not found in the
        shell-level registry are forwarded to the soul as raw input. Shell-level
        commands are called with ``(self, args)`` and awaited if they return an
        awaitable.

        Raises:
            Reload, SwitchToWeb: Propagated unchanged from the command.
            Exception: Any other error is logged, printed and re-raised.
                Cancellation and ``KeyboardInterrupt`` are reported as a user
                interrupt and not re-raised.
        """
        # Imported here rather than at module level, presumably to avoid an import cycle.
        from kimi_cli.cli import Reload, SwitchToWeb

        if command_call.name not in self._available_slash_commands:
            logger.info("Unknown slash command /{command}", command=command_call.name)
            console.print(
                f'[red]Unknown slash command "/{command_call.name}", '
                'type "/" for all available commands[/red]'
            )
            return

        command = shell_slash_registry.find_command(command_call.name)
        if command is None:
            # the input is a soul-level slash command call
            await self.run_soul_command(command_call.raw_input)
            return

        logger.debug(
            "Running shell-level slash command: /{command} with args: {args}",
            command=command_call.name,
            args=command_call.args,
        )

        try:
            ret = command.func(self, command_call.args)
            if isinstance(ret, Awaitable):
                await ret
        except (Reload, SwitchToWeb):
            # just propagate
            raise
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Handle Ctrl-C during slash command execution, return to shell prompt
            logger.debug("Slash command interrupted by KeyboardInterrupt")
            console.print("[red]Interrupted by user[/red]")
        except Exception as e:
            logger.exception("Unknown error:")
            console.print(f"[red]Unknown error: {e}[/red]")
            raise  # re-raise unknown error

    async def run_soul_command(self, user_input: str | list[ContentPart]) -> bool:
        """
        Run the soul and handle any known exceptions.

        While the soul runs, SIGINT sets a cancel event that is passed to both
        the soul run and the visualizer; the handler is removed afterwards.

        Returns:
            bool: Whether the run is successful. ``False`` is returned for every
            handled failure (LLM not set/unsupported, provider error, max steps
            reached, cancellation).

        Raises:
            Exception: Any error not listed above is logged, printed and re-raised.
        """
        logger.info("Running soul with user input: {user_input}", user_input=user_input)

        cancel_event = asyncio.Event()

        def _handler():
            logger.debug("SIGINT received.")
            cancel_event.set()

        loop = asyncio.get_running_loop()
        remove_sigint = install_sigint_handler(loop, _handler)

        try:
            # Status captured before the run starts, used to seed the visualizer.
            snap = self.soul.status
            await run_soul(
                self.soul,
                user_input,
                lambda wire: visualize(
                    wire.ui_side(merge=False),  # shell UI maintain its own merge buffer
                    initial_status=StatusUpdate(
                        context_usage=snap.context_usage,
                        context_tokens=snap.context_tokens,
                        max_context_tokens=snap.max_context_tokens,
                    ),
                    cancel_event=cancel_event,
                    prompt_session=self._prompt_session,
                    steer=self.soul.steer if isinstance(self.soul, KimiSoul) else None,
                ),
                cancel_event,
                self.soul.wire_file if isinstance(self.soul, KimiSoul) else None,
            )
            return True
        except LLMNotSet:
            logger.exception("LLM not set:")
            console.print('[red]LLM not set, send "/login" to login[/red]')
        except LLMNotSupported as e:
            # actually unsupported input/mode should already be blocked by prompt session
            logger.exception("LLM not supported:")
            console.print(f"[red]{e}[/red]")
        except ChatProviderError as e:
            logger.exception("LLM provider error:")
            # Selected HTTP status codes get a dedicated message.
            if isinstance(e, APIStatusError) and e.status_code == 401:
                console.print("[red]Authorization failed, please check your login status[/red]")
            elif isinstance(e, APIStatusError) and e.status_code == 402:
                console.print("[red]Membership expired, please renew your plan[/red]")
            elif isinstance(e, APIStatusError) and e.status_code == 403:
                console.print("[red]Quota exceeded, please upgrade your plan or retry later[/red]")
            else:
                console.print(f"[red]LLM provider error: {e}[/red]")
        except MaxStepsReached as e:
            logger.warning("Max steps reached: {n_steps}", n_steps=e.n_steps)
            console.print(f"[yellow]{e}[/yellow]")
        except RunCancelled:
            logger.info("Cancelled by user")
            console.print("[red]Interrupted by user[/red]")
        except Exception as e:
            logger.exception("Unexpected error:")
            console.print(f"[red]Unexpected error: {e}[/red]")
            raise  # re-raise unknown error
        finally:
            remove_sigint()
        return False

    async def _auto_update(self) -> None:
        """Check for updates and surface the result as toasts.

        When an update is available the reminder toast is re-issued every
        60 seconds and this coroutine never returns on its own.
        """
        toast("checking for updates...", topic="update", duration=2.0)
        result = await do_update(print=False, check_only=True)
        if result == UpdateResult.UPDATE_AVAILABLE:
            while True:
                toast(
                    f"new version found, run `{_update_mod.UPGRADE_COMMAND}` to upgrade",
                    topic="update",
                    duration=30.0,
                )
                await asyncio.sleep(60.0)
        elif result == UpdateResult.UPDATED:
            toast("auto updated, restart to use the new version", topic="update", duration=5.0)

    def _start_background_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        """Schedule ``coro`` as a task and track it until it finishes.

        The task is held in ``self._background_tasks`` while running. When it
        completes it is removed from the set; a failure is logged and
        cancellation is ignored.

        Returns:
            The created task.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)

        def _cleanup(t: asyncio.Task[Any]) -> None:
            self._background_tasks.discard(t)
            try:
                # Retrieve the result so a stored exception is observed and logged here.
                t.result()
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception("Background task failed:")

        task.add_done_callback(_cleanup)
        return task
# Rich color name used for the logo and for the welcome panel border.
_KIMI_BLUE = "dodger_blue1"

# Two-line block-character logo written as Rich console markup. The trailing
# backslashes join the tags to the adjacent lines so the resulting string has
# exactly one newline (between the two logo rows). The style is closed with a
# matching `[/...]` tag instead of being opened a second time.
_LOGO = f"""\
[{_KIMI_BLUE}]\
▐█▛█▛█▌
▐█████▌\
[/{_KIMI_BLUE}]\
"""
@dataclass(slots=True)
class WelcomeInfoItem:
class Level(Enum):
INFO = "grey50"
WARN = "yellow"
ERROR = "red"
name: str
value: str
level: Level = Level.INFO
def _print_welcome_info(name: str, info_items: list[WelcomeInfoItem]) -> None:
    """Print the bordered welcome panel to the console.

    The panel shows the logo beside a greeting and a help hint, followed by one
    line per entry of ``info_items`` (styled by the entry's level). If the
    latest-version file exists and holds a version greater than the running
    one, an upgrade notice is appended.

    Args:
        name: Display name supplied by the caller. NOTE(review): the body does
            not reference it; the greeting text is fixed.
        info_items: Entries rendered as ``name: value`` lines; may be empty.
    """
    greeting = Group(
        Text.from_markup("Welcome to Kimi Code CLI!"),
        Text.from_markup("[grey50]Send /help for help information.[/grey50]"),
    )

    # A borderless two-column table gives precise width control for logo + greeting.
    header = Table(show_header=False, show_edge=False, box=None, padding=(0, 1), expand=False)
    header.add_column(justify="left")
    header.add_column(justify="left")
    header.add_row(Text.from_markup(_LOGO), greeting)

    body: list[RenderableType] = [header]
    if info_items:
        body.append(Text(""))  # blank separator line
        body.extend(
            Text(f"{entry.name}: {entry.value}", style=entry.level.value) for entry in info_items
        )

    if LATEST_VERSION_FILE.exists():
        from kimi_cli.constant import VERSION as running_version

        newest = LATEST_VERSION_FILE.read_text(encoding="utf-8").strip()
        if semver_tuple(newest) > semver_tuple(running_version):
            notice = Text.from_markup(
                f"\n[yellow]New version available: {newest}. "
                f"Please run `{_update_mod.UPGRADE_COMMAND}` to upgrade.[/yellow]"
            )
            body.append(notice)

    panel = Panel(
        Group(*body),
        border_style=_KIMI_BLUE,
        expand=False,
        padding=(1, 2),
    )
    console.print(panel)