Skip to content

Commit da65854

Browse files
authored
Merge pull request sipeed#1291 from statxc/feat/telegram-forum-topics
feat(telegram): support forum topics with per-topic session isolation
2 parents c0e755c + af6652c commit da65854

2 files changed

Lines changed: 262 additions & 31 deletions

File tree

pkg/channels/telegram/telegram.go

Lines changed: 73 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
168168
return channels.ErrNotRunning
169169
}
170170

171-
chatID, err := parseChatID(msg.ChatID)
171+
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
172172
if err != nil {
173173
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
174174
}
@@ -200,7 +200,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
200200
continue
201201
}
202202

203-
if err := c.sendHTMLChunk(ctx, chatID, htmlContent, chunk); err != nil {
203+
if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil {
204204
return err
205205
}
206206
}
@@ -210,9 +210,12 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
210210

211211
// sendHTMLChunk sends a single HTML message, falling back to the original
212212
// markdown as plain text on parse failure so users never see raw HTML tags.
213-
func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlContent, mdFallback string) error {
213+
func (c *TelegramChannel) sendHTMLChunk(
214+
ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string,
215+
) error {
214216
tgMsg := tu.Message(tu.ID(chatID), htmlContent)
215217
tgMsg.ParseMode = telego.ModeHTML
218+
tgMsg.MessageThreadID = threadID
216219

217220
if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil {
218221
logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{
@@ -232,13 +235,16 @@ func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlC
232235
// (Telegram's typing indicator expires after ~5s) in a background goroutine.
233236
// The returned stop function is idempotent and cancels the goroutine.
234237
func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
235-
cid, err := parseChatID(chatID)
238+
cid, threadID, err := parseTelegramChatID(chatID)
236239
if err != nil {
237240
return func() {}, err
238241
}
239242

243+
action := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)
244+
action.MessageThreadID = threadID
245+
240246
// Send the first typing action immediately
241-
_ = c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
247+
_ = c.bot.SendChatAction(ctx, action)
242248

243249
typingCtx, cancel := context.WithCancel(ctx)
244250
go func() {
@@ -249,7 +255,9 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
249255
case <-typingCtx.Done():
250256
return
251257
case <-ticker.C:
252-
_ = c.bot.SendChatAction(typingCtx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
258+
a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)
259+
a.MessageThreadID = threadID
260+
_ = c.bot.SendChatAction(typingCtx, a)
253261
}
254262
}
255263
}()
@@ -259,7 +267,7 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
259267

260268
// EditMessage implements channels.MessageEditor.
261269
func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messageID string, content string) error {
262-
cid, err := parseChatID(chatID)
270+
cid, _, err := parseTelegramChatID(chatID)
263271
if err != nil {
264272
return err
265273
}
@@ -288,12 +296,14 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s
288296
text = "Thinking... 💭"
289297
}
290298

291-
cid, err := parseChatID(chatID)
299+
cid, threadID, err := parseTelegramChatID(chatID)
292300
if err != nil {
293301
return "", err
294302
}
295303

296-
pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(cid), text))
304+
phMsg := tu.Message(tu.ID(cid), text)
305+
phMsg.MessageThreadID = threadID
306+
pMsg, err := c.bot.SendMessage(ctx, phMsg)
297307
if err != nil {
298308
return "", err
299309
}
@@ -307,7 +317,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
307317
return channels.ErrNotRunning
308318
}
309319

310-
chatID, err := parseChatID(msg.ChatID)
320+
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
311321
if err != nil {
312322
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
313323
}
@@ -339,30 +349,34 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
339349
switch part.Type {
340350
case "image":
341351
params := &telego.SendPhotoParams{
342-
ChatID: tu.ID(chatID),
343-
Photo: telego.InputFile{File: file},
344-
Caption: part.Caption,
352+
ChatID: tu.ID(chatID),
353+
MessageThreadID: threadID,
354+
Photo: telego.InputFile{File: file},
355+
Caption: part.Caption,
345356
}
346357
_, err = c.bot.SendPhoto(ctx, params)
347358
case "audio":
348359
params := &telego.SendAudioParams{
349-
ChatID: tu.ID(chatID),
350-
Audio: telego.InputFile{File: file},
351-
Caption: part.Caption,
360+
ChatID: tu.ID(chatID),
361+
MessageThreadID: threadID,
362+
Audio: telego.InputFile{File: file},
363+
Caption: part.Caption,
352364
}
353365
_, err = c.bot.SendAudio(ctx, params)
354366
case "video":
355367
params := &telego.SendVideoParams{
356-
ChatID: tu.ID(chatID),
357-
Video: telego.InputFile{File: file},
358-
Caption: part.Caption,
368+
ChatID: tu.ID(chatID),
369+
MessageThreadID: threadID,
370+
Video: telego.InputFile{File: file},
371+
Caption: part.Caption,
359372
}
360373
_, err = c.bot.SendVideo(ctx, params)
361374
default: // "file" or unknown types
362375
params := &telego.SendDocumentParams{
363-
ChatID: tu.ID(chatID),
364-
Document: telego.InputFile{File: file},
365-
Caption: part.Caption,
376+
ChatID: tu.ID(chatID),
377+
MessageThreadID: threadID,
378+
Document: telego.InputFile{File: file},
379+
Caption: part.Caption,
366380
}
367381
_, err = c.bot.SendDocument(ctx, params)
368382
}
@@ -506,19 +520,28 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
506520
content = cleaned
507521
}
508522

523+
// For forum topics, embed the thread ID as "chatID/threadID" so replies
524+
// route to the correct topic and each topic gets its own session.
525+
// Only forum groups (IsForum) are handled; regular group reply threads
526+
// must share one session per group.
527+
compositeChatID := fmt.Sprintf("%d", chatID)
528+
threadID := message.MessageThreadID
529+
if message.Chat.IsForum && threadID != 0 {
530+
compositeChatID = fmt.Sprintf("%d/%d", chatID, threadID)
531+
}
532+
509533
logger.DebugCF("telegram", "Received message", map[string]any{
510534
"sender_id": sender.CanonicalID,
511-
"chat_id": fmt.Sprintf("%d", chatID),
535+
"chat_id": compositeChatID,
536+
"thread_id": threadID,
512537
"preview": utils.Truncate(content, 50),
513538
})
514539

515-
// Placeholder is now auto-triggered by BaseChannel.HandleMessage via PlaceholderCapable
516-
517540
peerKind := "direct"
518541
peerID := fmt.Sprintf("%d", user.ID)
519542
if message.Chat.Type != "private" {
520543
peerKind = "group"
521-
peerID = fmt.Sprintf("%d", chatID)
544+
peerID = compositeChatID
522545
}
523546

524547
peer := bus.Peer{Kind: peerKind, ID: peerID}
@@ -531,11 +554,17 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
531554
"is_group": fmt.Sprintf("%t", message.Chat.Type != "private"),
532555
}
533556

557+
// Set parent_peer metadata for per-topic agent binding.
558+
if message.Chat.IsForum && threadID != 0 {
559+
metadata["parent_peer_kind"] = "topic"
560+
metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID)
561+
}
562+
534563
c.HandleMessage(c.ctx,
535564
peer,
536565
messageID,
537566
platformID,
538-
fmt.Sprintf("%d", chatID),
567+
compositeChatID,
539568
content,
540569
mediaPaths,
541570
metadata,
@@ -583,10 +612,23 @@ func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string)
583612
return c.downloadFileWithInfo(file, ext)
584613
}
585614

586-
func parseChatID(chatIDStr string) (int64, error) {
587-
var id int64
588-
_, err := fmt.Sscanf(chatIDStr, "%d", &id)
589-
return id, err
615+
// parseTelegramChatID splits "chatID/threadID" into its components.
616+
// Returns threadID=0 when no "/" is present (non-forum messages).
617+
func parseTelegramChatID(chatID string) (int64, int, error) {
618+
idx := strings.Index(chatID, "/")
619+
if idx == -1 {
620+
cid, err := strconv.ParseInt(chatID, 10, 64)
621+
return cid, 0, err
622+
}
623+
cid, err := strconv.ParseInt(chatID[:idx], 10, 64)
624+
if err != nil {
625+
return 0, 0, err
626+
}
627+
tid, err := strconv.Atoi(chatID[idx+1:])
628+
if err != nil {
629+
return 0, 0, fmt.Errorf("invalid thread ID in chat ID %q: %w", chatID, err)
630+
}
631+
return cid, tid, nil
590632
}
591633

592634
func markdownToTelegramHTML(text string) string {

0 commit comments

Comments
 (0)