Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/beige-forks-judge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chat": patch
---

Fix Slack structured streaming when `thread.post(stream)` is called from a handler created by an interactive (`block_actions`) payload.
The team ID is now resolved from `team.id` in addition to `team_id` / `team`.
5 changes: 1 addition & 4 deletions packages/adapter-slack/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,7 @@ settings:
When streaming in an assistant thread, you can attach Block Kit elements to the final message:

```typescript
const raw = message.raw as { team_id?: string; team?: string };
await thread.adapter.stream(thread.id, textStream, {
recipientUserId: message.author.userId,
recipientTeamId: raw.team_id ?? raw.team,
await thread.post(textStream, {
stopBlocks: [
{ type: "actions", elements: [{ type: "button", text: { type: "plain_text", text: "Retry" }, action_id: "retry" }] },
],
Expand Down
1 change: 1 addition & 0 deletions packages/adapter-slack/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2632,6 +2632,7 @@ interface MockableClient {
update: ReturnType<typeof vi.fn>;
delete: ReturnType<typeof vi.fn>;
};
chatStream: ReturnType<typeof vi.fn>;
conversations: {
open: ReturnType<typeof vi.fn>;
replies: ReturnType<typeof vi.fn>;
Expand Down
102 changes: 89 additions & 13 deletions packages/chat/src/thread.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { Card } from "./cards";
import type { Message } from "./message";
import {
createMockAdapter,
createMockState,
Expand All @@ -9,7 +10,7 @@ import {
import { Plan } from "./plan";
import { StreamingPlan } from "./streaming-plan";
import { ThreadImpl } from "./thread";
import type { Adapter, Message, ScheduledMessage, StreamChunk } from "./types";
import type { Adapter, ScheduledMessage, StreamChunk } from "./types";
import { NotImplementedError } from "./types";

describe("ThreadImpl", () => {
Expand Down Expand Up @@ -595,36 +596,56 @@ describe("ThreadImpl", () => {
}
});

it("should pass stream options from current message context", async () => {
it.each([
{
expectedTeamId: "T123",
label: "team_id",
raw: { team_id: "T123", type: "app_mention" },
},
{
expectedTeamId: "T234",
label: "team string",
raw: { team: "T234", type: "message" },
},
{
expectedTeamId: "T345",
label: "team.id",
raw: { team: { id: "T345" }, type: "block_actions" },
},
{
expectedTeamId: "T456",
label: "user.team_id fallback",
raw: {
type: "block_actions",
user: { team_id: "T456" },
},
},
])("should pass stream options from Slack current message context via $label", async ({
raw,
expectedTeamId,
}) => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

// Create thread with current message context
const threadWithContext = new ThreadImpl({
id: "slack:C123:1234.5678",
adapter: mockAdapter,
channelId: "C123",
stateAdapter: mockState,
currentMessage: {
id: "original-msg",
threadId: "slack:C123:1234.5678",
text: "test",
formatted: { type: "root", children: [] },
raw: { team_id: "T123" },
currentMessage: createTestMessage("original-msg", "test", {
raw,
author: {
userId: "U456",
userName: "user",
fullName: "Test User",
isBot: false,
isMe: false,
},
metadata: { dateSent: new Date(), edited: false },
attachments: [],
},
}),
});

const textStream = createTextStream(["Hello"]);
Expand All @@ -635,11 +656,66 @@ describe("ThreadImpl", () => {
expect.any(Object),
expect.objectContaining({
recipientUserId: "U456",
recipientTeamId: "T123",
recipientTeamId: expectedTeamId,
})
);
});

it("should forward structured stream chunks to adapter.stream from an action-created thread", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const threadWithActionContext = new ThreadImpl({
id: "slack:C123:1234.5678",
adapter: mockAdapter,
channelId: "C123",
stateAdapter: mockState,
currentMessage: createTestMessage("action-msg", "", {
raw: {
team: { domain: "workspace", id: "T123" },
type: "block_actions",
},
author: {
userId: "U456",
userName: "user",
fullName: "Test User",
isBot: false,
isMe: false,
},
}),
});

const taskChunk: StreamChunk = {
id: "task-1",
status: "pending",
title: "Thinking",
type: "task_update",
};
async function* structuredStream(): AsyncIterable<string | StreamChunk> {
yield "Picking option...";
yield taskChunk;
}

await threadWithActionContext.post(
structuredStream() as unknown as AsyncIterable<string>
);

expect(mockStream).toHaveBeenCalledTimes(1);
const [, passedStream] = mockStream.mock.calls[0];
const collected: Array<string | StreamChunk> = [];
for await (const chunk of passedStream as AsyncIterable<
string | StreamChunk
>) {
collected.push(chunk);
}
expect(collected).toContain("Picking option...");
expect(collected).toContainEqual(taskChunk);
});

it("should pass StreamingPlan PostableObject options to adapter.stream", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
Expand Down
53 changes: 47 additions & 6 deletions packages/chat/src/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,12 +574,12 @@ export class ThreadImpl<TState = Record<string, unknown>>
const options: StreamOptions = { ...callerOptions };
if (this._currentMessage) {
options.recipientUserId = this._currentMessage.author.userId;
// Extract teamId from raw Slack payload
const raw = this._currentMessage.raw as {
team_id?: string;
team?: string;
};
options.recipientTeamId = raw?.team_id ?? raw?.team;
// recipientTeamId is only consumed by the Slack adapter; other adapters
// ignore it. Derivation is Slack-specific because `currentMessage.raw`
// shape varies across Slack webhook types (message events vs block_actions).
options.recipientTeamId = this.extractSlackRecipientTeamId(
this._currentMessage.raw
);
}

// Use native streaming if adapter supports it
Expand Down Expand Up @@ -650,6 +650,47 @@ export class ThreadImpl<TState = Record<string, unknown>>
return this.fallbackStream(textOnlyStream, options);
}

/**
* Slack payloads carry the workspace ID in a few different shapes depending on
* the webhook type:
* - Message events: `team_id` or `team` as a string
* - `block_actions` payloads: `team.id` (object), with `user.team_id` as a fallback
*/
private extractSlackRecipientTeamId(raw: unknown): string | undefined {
if (!raw || typeof raw !== "object") {
return undefined;
}

const payload = raw as {
team?: { id?: unknown } | string;
team_id?: unknown;
user?: { team_id?: unknown };
};

if (typeof payload.team_id === "string" && payload.team_id) {
return payload.team_id;
}

if (typeof payload.team === "string" && payload.team) {
return payload.team;
}

if (
payload.team &&
typeof payload.team === "object" &&
typeof payload.team.id === "string" &&
payload.team.id
) {
return payload.team.id;
}

if (typeof payload.user?.team_id === "string" && payload.user.team_id) {
return payload.user.team_id;
}

return undefined;
}

async startTyping(status?: string): Promise<void> {
await this.adapter.startTyping(this.id, status);
}
Expand Down
Loading