Skip to content

Commit 9710df8

Browse files
authored
Merge pull request sipeed#1390 from kiannidev/fix/1323-telegram-endless-typing
fix(telegram): stop typing indicator when LLM fails or hangs
2 parents a378d68 + d2a5411 commit 9710df8

4 files changed

Lines changed: 114 additions & 48 deletions

File tree

pkg/agent/loop.go

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -278,58 +278,64 @@ func (al *AgentLoop) Run(ctx context.Context) error {
278278
return nil
279279
}
280280
// Process message
281-
// TODO: Re-enable media cleanup after inbound media is properly consumed by the agent.
282-
// Currently disabled because files are deleted before the LLM can access their content.
283-
// defer func() {
284-
// if al.mediaStore != nil && msg.MediaScope != "" {
285-
// if releaseErr := al.mediaStore.ReleaseAll(msg.MediaScope); releaseErr != nil {
286-
// logger.WarnCF("agent", "Failed to release media", map[string]any{
287-
// "scope": msg.MediaScope,
288-
// "error": releaseErr.Error(),
289-
// })
290-
// }
291-
// }
292-
// }()
293-
294-
response, err := al.processMessage(ctx, msg)
295-
if err != nil {
296-
response = fmt.Sprintf("Error processing message: %v", err)
297-
}
298-
299-
if response != "" {
300-
// Check if the message tool already sent a response during this round.
301-
// If so, skip publishing to avoid duplicate messages to the user.
302-
// Use default agent's tools to check (message tool is shared).
303-
alreadySent := false
304-
defaultAgent := al.GetRegistry().GetDefaultAgent()
305-
if defaultAgent != nil {
306-
if tool, ok := defaultAgent.Tools.Get("message"); ok {
307-
if mt, ok := tool.(*tools.MessageTool); ok {
308-
alreadySent = mt.HasSentInRound()
309-
}
281+
func() {
282+
defer func() {
283+
if al.channelManager != nil {
284+
al.channelManager.InvokeTypingStop(msg.Channel, msg.ChatID)
310285
}
286+
}()
287+
// TODO: Re-enable media cleanup after inbound media is properly consumed by the agent.
288+
// Currently disabled because files are deleted before the LLM can access their content.
289+
// defer func() {
290+
// if al.mediaStore != nil && msg.MediaScope != "" {
291+
// if releaseErr := al.mediaStore.ReleaseAll(msg.MediaScope); releaseErr != nil {
292+
// logger.WarnCF("agent", "Failed to release media", map[string]any{
293+
// "scope": msg.MediaScope,
294+
// "error": releaseErr.Error(),
295+
// })
296+
// }
297+
// }
298+
// }()
299+
300+
response, err := al.processMessage(ctx, msg)
301+
if err != nil {
302+
response = fmt.Sprintf("Error processing message: %v", err)
311303
}
312304

313-
if !alreadySent {
314-
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
315-
Channel: msg.Channel,
316-
ChatID: msg.ChatID,
317-
Content: response,
318-
})
319-
logger.InfoCF("agent", "Published outbound response",
320-
map[string]any{
321-
"channel": msg.Channel,
322-
"chat_id": msg.ChatID,
323-
"content_len": len(response),
305+
if response != "" {
306+
// Check if the message tool already sent a response during this round.
307+
// If so, skip publishing to avoid duplicate messages to the user.
308+
// Use default agent's tools to check (message tool is shared).
309+
alreadySent := false
310+
defaultAgent := al.GetRegistry().GetDefaultAgent()
311+
if defaultAgent != nil {
312+
if tool, ok := defaultAgent.Tools.Get("message"); ok {
313+
if mt, ok := tool.(*tools.MessageTool); ok {
314+
alreadySent = mt.HasSentInRound()
315+
}
316+
}
317+
}
318+
if !alreadySent {
319+
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
320+
Channel: msg.Channel,
321+
ChatID: msg.ChatID,
322+
Content: response,
324323
})
325-
} else {
326-
logger.DebugCF(
327-
"agent",
328-
"Skipped outbound (message tool already sent)",
329-
map[string]any{"channel": msg.Channel},
330-
)
324+
logger.InfoCF("agent", "Published outbound response",
325+
map[string]any{
326+
"channel": msg.Channel,
327+
"chat_id": msg.ChatID,
328+
"content_len": len(response),
329+
})
330+
} else {
331+
logger.DebugCF(
332+
"agent",
333+
"Skipped outbound (message tool already sent)",
334+
map[string]any{"channel": msg.Channel},
335+
)
336+
}
331337
}
332-
}
338+
}()
333339
default:
334340
time.Sleep(time.Microsecond * 200)
335341
}

pkg/channels/manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,19 @@ func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
136136
}
137137
}
138138

139+
// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID.
140+
// It is safe to call even when no typing indicator is active (no-op).
141+
// Used by the agent loop to stop typing when processing completes (success, error, or panic),
142+
// regardless of whether an outbound message is published.
143+
func (m *Manager) InvokeTypingStop(channel, chatID string) {
144+
key := channel + ":" + chatID
145+
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
146+
if entry, ok := v.(typingEntry); ok {
147+
entry.stop()
148+
}
149+
}
150+
}
151+
139152
// RecordReactionUndo registers a reaction undo function for later invocation.
140153
// Implements PlaceholderRecorder.
141154
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {

pkg/channels/manager_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,43 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) {
511511
}
512512
}
513513

514+
func TestInvokeTypingStop_CallsRegisteredStop(t *testing.T) {
515+
m := newTestManager()
516+
var stopCalled bool
517+
518+
m.RecordTypingStop("telegram", "chat123", func() {
519+
stopCalled = true
520+
})
521+
522+
m.InvokeTypingStop("telegram", "chat123")
523+
524+
if !stopCalled {
525+
t.Fatal("expected typing stop func to be called")
526+
}
527+
}
528+
529+
func TestInvokeTypingStop_NoOpWhenNoEntry(t *testing.T) {
530+
m := newTestManager()
531+
// Should not panic
532+
m.InvokeTypingStop("telegram", "nonexistent")
533+
}
534+
535+
func TestInvokeTypingStop_Idempotent(t *testing.T) {
536+
m := newTestManager()
537+
var callCount int
538+
539+
m.RecordTypingStop("telegram", "chat123", func() {
540+
callCount++
541+
})
542+
543+
m.InvokeTypingStop("telegram", "chat123")
544+
m.InvokeTypingStop("telegram", "chat123") // Second call: entry already removed, no-op
545+
546+
if callCount != 1 {
547+
t.Fatalf("expected stop to be called once, got %d", callCount)
548+
}
549+
}
550+
514551
func TestPreSend_TypingStopCalled(t *testing.T) {
515552
m := newTestManager()
516553
var stopCalled bool

pkg/channels/telegram/telegram.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,17 @@ func (c *TelegramChannel) sendChunk(
302302
return nil
303303
}
304304

305+
// maxTypingDuration limits how long the typing indicator can run.
306+
// Prevents endless typing when the LLM fails/hangs and preSend never invokes cancel.
307+
// Matches channels.Manager's typingStopTTL (5 min) so behavior is consistent.
308+
const maxTypingDuration = 5 * time.Minute
309+
305310
// StartTyping implements channels.TypingCapable.
306311
// It sends ChatAction(typing) immediately and then repeats every 4 seconds
307312
// (Telegram's typing indicator expires after ~5s) in a background goroutine.
308313
// The returned stop function is idempotent and cancels the goroutine.
314+
// The goroutine also exits automatically after maxTypingDuration if cancel is
315+
// never called (e.g. when the LLM fails or times out without publishing).
309316
func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
310317
cid, threadID, err := parseTelegramChatID(chatID)
311318
if err != nil {
@@ -319,12 +326,15 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
319326
_ = c.bot.SendChatAction(ctx, action)
320327

321328
typingCtx, cancel := context.WithCancel(ctx)
329+
// Cap the lifetime so the goroutine cannot run indefinitely if cancel is never called.
330+
maxCtx, maxCancel := context.WithTimeout(typingCtx, maxTypingDuration)
322331
go func() {
332+
defer maxCancel()
323333
ticker := time.NewTicker(4 * time.Second)
324334
defer ticker.Stop()
325335
for {
326336
select {
327-
case <-typingCtx.Done():
337+
case <-maxCtx.Done():
328338
return
329339
case <-ticker.C:
330340
a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)

0 commit comments

Comments
 (0)