Skip to content

Commit b3bbef7

Browse files
authored
feat: add duplicate tool result protection and find_tool_call/2 (issue-553 PR 4/17) (#571)
1 parent 9301cdd commit b3bbef7

File tree

4 files changed

+104
-293
lines changed

4 files changed

+104
-293
lines changed

apps/frontman_server/lib/frontman_server/tasks.ex

Lines changed: 20 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,6 @@ defmodule FrontmanServer.Tasks do
4949
{:ok, tasks}
5050
end
5151

52-
@doc """
53-
Checks if a task exists for the given scope.
54-
"""
55-
@spec task_exists?(Scope.t(), String.t()) :: boolean()
56-
def task_exists?(%Scope{} = scope, task_id) do
57-
case get_task_by_id(scope, task_id) do
58-
{:ok, _schema} -> true
59-
{:error, :not_found} -> false
60-
end
61-
end
62-
6352
@doc """
6453
Gets a task by ID. Returns the task with interactions loaded.
6554
@@ -141,14 +130,6 @@ defmodule FrontmanServer.Tasks do
141130
@spec topic(String.t()) :: String.t()
142131
def topic(task_id), do: "task:#{task_id}"
143132

144-
@doc """
145-
Subscribes the calling process to task events.
146-
"""
147-
@spec subscribe(atom(), String.t()) :: :ok | {:error, term()}
148-
def subscribe(pubsub, task_id) do
149-
Phoenix.PubSub.subscribe(pubsub, topic(task_id))
150-
end
151-
152133
@doc """
153134
Creates a new task and stores it.
154135
@@ -171,37 +152,6 @@ defmodule FrontmanServer.Tasks do
171152
end
172153
end
173154

174-
@doc """
175-
Gets all interactions for a task.
176-
177-
Requires authorization - scope.user.id must match task.user_id.
178-
"""
179-
@spec get_interactions(Scope.t(), String.t()) ::
180-
{:ok, [Interaction.t()]} | {:error, :not_found}
181-
def get_interactions(%Scope{} = scope, task_id) do
182-
with {:ok, _schema} <- get_task_by_id(scope, task_id) do
183-
{:ok, load_interactions(task_id)}
184-
end
185-
end
186-
187-
@doc """
188-
Gets LLM-formatted messages for a task.
189-
190-
Requires authorization - scope.user.id must match task.user_id.
191-
192-
Note: Project rules (AGENTS.md, etc.) are now appended to the system prompt
193-
in Prompts.build/1 rather than prepended to user messages. This ensures they
194-
appear after the base prompt and before context-specific guidance.
195-
"""
196-
@spec get_llm_messages(Scope.t(), String.t()) ::
197-
{:ok, list(map())} | {:error, :not_found}
198-
def get_llm_messages(%Scope{} = scope, task_id) do
199-
with {:ok, interactions} <- get_interactions(scope, task_id) do
200-
messages = Interaction.to_llm_messages(interactions)
201-
{:ok, messages}
202-
end
203-
end
204-
205155
@doc """
206156
Adds a discovered project rule to the task.
207157
@@ -212,7 +162,9 @@ defmodule FrontmanServer.Tasks do
212162
| {:error, :not_found}
213163
def add_discovered_project_rule(%Scope{} = scope, task_id, path, content) do
214164
with {:ok, schema} <- get_task_by_id(scope, task_id) do
215-
if discovered_project_rule_loaded?(scope, task_id, path) do
165+
interactions = load_interactions(task_id)
166+
167+
if rule_loaded?(interactions, path) do
216168
{:ok, :already_loaded}
217169
else
218170
interaction = Interaction.DiscoveredProjectRule.new(path, content)
@@ -221,27 +173,12 @@ defmodule FrontmanServer.Tasks do
221173
end
222174
end
223175

224-
@doc """
225-
Gets all discovered project rules for a task.
226-
"""
227-
@spec get_discovered_project_rules(Scope.t(), String.t()) ::
228-
{:ok, [Interaction.DiscoveredProjectRule.t()]} | {:error, :not_found}
229-
def get_discovered_project_rules(%Scope{} = scope, task_id) do
230-
with {:ok, interactions} <- get_interactions(scope, task_id) do
231-
rules = Enum.filter(interactions, &match?(%Interaction.DiscoveredProjectRule{}, &1))
232-
{:ok, rules}
233-
end
234-
end
235-
236-
@doc """
237-
Checks if a project rule with the given path has already been loaded.
238-
"""
239-
@spec discovered_project_rule_loaded?(Scope.t(), String.t(), String.t()) :: boolean()
240-
def discovered_project_rule_loaded?(%Scope{} = scope, task_id, path) do
241-
case get_discovered_project_rules(scope, task_id) do
242-
{:ok, rules} -> Enum.any?(rules, &(&1.path == path))
243-
{:error, _} -> false
244-
end
176+
@spec rule_loaded?([Interaction.t()], String.t()) :: boolean()
177+
defp rule_loaded?(interactions, path) do
178+
Enum.any?(interactions, fn
179+
%Interaction.DiscoveredProjectRule{path: p} -> p == path
180+
_ -> false
181+
end)
245182
end
246183

247184
@doc """
@@ -254,51 +191,17 @@ defmodule FrontmanServer.Tasks do
254191
| {:error, :not_found}
255192
def add_discovered_project_structure(%Scope{} = scope, task_id, summary) do
256193
with {:ok, schema} <- get_task_by_id(scope, task_id) do
257-
case get_discovered_project_structure(scope, task_id) do
258-
{:ok, nil} ->
259-
interaction = Interaction.DiscoveredProjectStructure.new(summary)
260-
append_interaction(schema, interaction)
261-
262-
{:ok, _existing} ->
263-
{:ok, :already_loaded}
194+
interactions = load_interactions(task_id)
264195

265-
{:error, _} ->
266-
interaction = Interaction.DiscoveredProjectStructure.new(summary)
267-
append_interaction(schema, interaction)
196+
if Enum.any?(interactions, &match?(%Interaction.DiscoveredProjectStructure{}, &1)) do
197+
{:ok, :already_loaded}
198+
else
199+
interaction = Interaction.DiscoveredProjectStructure.new(summary)
200+
append_interaction(schema, interaction)
268201
end
269202
end
270203
end
271204

272-
@doc """
273-
Gets the discovered project structure summary for a task, if any.
274-
"""
275-
@spec get_discovered_project_structure(Scope.t(), String.t()) ::
276-
{:ok, String.t() | nil} | {:error, :not_found}
277-
def get_discovered_project_structure(%Scope{} = scope, task_id) do
278-
with {:ok, interactions} <- get_interactions(scope, task_id) do
279-
summary =
280-
interactions
281-
|> Enum.find(&match?(%Interaction.DiscoveredProjectStructure{}, &1))
282-
|> case do
283-
nil -> nil
284-
struct -> struct.summary
285-
end
286-
287-
{:ok, summary}
288-
end
289-
end
290-
291-
@doc """
292-
Checks if any user messages in the task contain annotations.
293-
"""
294-
@spec has_annotations?(Scope.t(), String.t()) :: boolean()
295-
def has_annotations?(%Scope{} = scope, task_id) do
296-
case get_interactions(scope, task_id) do
297-
{:ok, interactions} -> Interaction.has_annotations?(interactions)
298-
{:error, _} -> false
299-
end
300-
end
301-
302205
@spec append_interaction(TaskSchema.t(), Interaction.t()) ::
303206
{:ok, Interaction.t()} | {:error, Ecto.Changeset.t()}
304207
defp append_interaction(%TaskSchema{id: task_id}, interaction) do
@@ -316,9 +219,8 @@ defmodule FrontmanServer.Tasks do
316219

317220
# Bump the task's updated_at so it sorts to the top of the sessions list
318221
defp touch_task(task_id) do
319-
import Ecto.Query
320-
321-
from(t in TaskSchema, where: t.id == ^task_id)
222+
TaskSchema
223+
|> TaskSchema.by_id(task_id)
322224
|> Repo.update_all(set: [updated_at: DateTime.utc_now(:second)])
323225
end
324226

@@ -356,18 +258,6 @@ defmodule FrontmanServer.Tasks do
356258
end
357259
end
358260

359-
@doc """
360-
Creates and appends an AgentSpawned interaction.
361-
"""
362-
@spec add_agent_spawned(Scope.t(), String.t(), map()) ::
363-
{:ok, Interaction.AgentSpawned.t()} | {:error, :not_found}
364-
def add_agent_spawned(%Scope{} = scope, task_id, config \\ %{}) do
365-
with {:ok, schema} <- get_task_by_id(scope, task_id) do
366-
interaction = Interaction.AgentSpawned.new(config)
367-
append_interaction(schema, interaction)
368-
end
369-
end
370-
371261
@doc """
372262
Creates and appends an AgentCompleted interaction.
373263
"""
@@ -396,9 +286,11 @@ defmodule FrontmanServer.Tasks do
396286
Creates and appends a ToolResult interaction.
397287
398288
Routes the result to the waiting executor so the agent can continue.
289+
Duplicate tool results for the same tool_call_id are prevented by a
290+
unique partial index on the interactions table.
399291
"""
400292
@spec add_tool_result(Scope.t(), String.t(), map(), term(), boolean()) ::
401-
{:ok, Interaction.ToolResult.t()} | {:error, :not_found}
293+
{:ok, Interaction.ToolResult.t()} | {:error, :not_found | Ecto.Changeset.t()}
402294
def add_tool_result(
403295
%Scope{} = scope,
404296
task_id,

apps/frontman_server/lib/frontman_server/tasks/interaction_schema.ex

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ defmodule FrontmanServer.Tasks.InteractionSchema do
4949
|> validate_required([:task_id, :type, :data, :sequence])
5050
|> validate_inclusion(:type, Interaction.known_type_strings())
5151
|> foreign_key_constraint(:task_id)
52+
|> unique_constraint([:task_id, :data],
53+
name: :interactions_tool_result_uniqueness,
54+
message: "duplicate tool result for this tool_call_id"
55+
)
5256
end
5357

5458
# Reserve 6 decimal digits for the tiebreaker (0–999_999).
@@ -84,6 +88,27 @@ defmodule FrontmanServer.Tasks.InteractionSchema do
8488
from(i in query, where: i.task_id == ^task_id)
8589
end
8690

91+
@doc """
92+
Filters interactions by their type discriminator (e.g. "tool_call", "tool_result").
93+
"""
94+
@spec by_type(Ecto.Queryable.t(), String.t()) :: Ecto.Query.t()
95+
def by_type(query \\ __MODULE__, type) do
96+
from(i in query, where: i.type == ^type)
97+
end
98+
99+
@doc """
100+
Filters interactions by a JSONB data field value (e.g. `tool_call_id`, `tool_name`).
101+
"""
102+
@spec by_data_field(Ecto.Queryable.t(), String.t(), String.t()) :: Ecto.Query.t()
103+
def by_data_field(query \\ __MODULE__, field, value) do
104+
from(i in query, where: fragment("?->>? = ?", i.data, ^field, ^value))
105+
end
106+
107+
@spec limited(Ecto.Queryable.t(), pos_integer()) :: Ecto.Query.t()
108+
def limited(query \\ __MODULE__, count) do
109+
from(i in query, limit: ^count)
110+
end
111+
87112
@doc """
88113
Orders interactions by sequence number for deterministic ordering.
89114
Falls back to inserted_at for legacy rows without sequence (during migration period).
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule FrontmanServer.Repo.Migrations.AddUniqueIndexToolResultDedup do
2+
use Ecto.Migration
3+
4+
def up do
5+
# Prevent duplicate tool_result interactions for the same tool_call_id within a task.
6+
# Uses raw SQL because Ecto's unique_index DSL does not support expression columns.
7+
execute("""
8+
CREATE UNIQUE INDEX interactions_tool_result_uniqueness
9+
ON interactions (task_id, (data->>'tool_call_id'))
10+
WHERE type = 'tool_result'
11+
""")
12+
end
13+
14+
def down do
15+
execute("DROP INDEX IF EXISTS interactions_tool_result_uniqueness")
16+
end
17+
end

0 commit comments

Comments
 (0)