-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathFunctionTriggers.cs
More file actions
229 lines (198 loc) · 9.54 KB
/
FunctionTriggers.cs
File metadata and controls
229 lines (198 loc) · 9.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Copyright (c) Microsoft. All rights reserved.
using System.Net;
using System.Text.Json;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.DurableTask;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
namespace AgentOrchestration_HITL;
public static class FunctionTriggers
{
[Function(nameof(RunOrchestrationAsync))]
public static async Task<object> RunOrchestrationAsync(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get the input from the orchestration
ContentGenerationInput input = context.GetInput<ContentGenerationInput>()
?? throw new InvalidOperationException("Content generation input is required");
// Get the writer agent
DurableAIAgent writerAgent = context.GetAgent("WriterAgent");
AgentSession writerSession = await writerAgent.GetNewSessionAsync();
// Set initial status
context.SetCustomStatus($"Starting content generation for topic: {input.Topic}");
// Step 1: Generate initial content
AgentResponse<GeneratedContent> writerResponse = await writerAgent.RunAsync<GeneratedContent>(
message: $"Write a short article about '{input.Topic}'.",
session: writerSession);
GeneratedContent content = writerResponse.Result;
// Human-in-the-loop iteration - we set a maximum number of attempts to avoid infinite loops
int iterationCount = 0;
while (iterationCount++ < input.MaxReviewAttempts)
{
context.SetCustomStatus(
$"Requesting human feedback. Iteration #{iterationCount}. Timeout: {input.ApprovalTimeoutHours} hour(s).");
// Step 2: Notify user to review the content
await context.CallActivityAsync(nameof(NotifyUserForApproval), content);
// Step 3: Wait for human feedback with configurable timeout
HumanApprovalResponse humanResponse;
try
{
humanResponse = await context.WaitForExternalEvent<HumanApprovalResponse>(
eventName: "HumanApproval",
timeout: TimeSpan.FromHours(input.ApprovalTimeoutHours));
}
catch (OperationCanceledException)
{
// Timeout occurred - treat as rejection
context.SetCustomStatus(
$"Human approval timed out after {input.ApprovalTimeoutHours} hour(s). Treating as rejection.");
throw new TimeoutException($"Human approval timed out after {input.ApprovalTimeoutHours} hour(s).");
}
if (humanResponse.Approved)
{
context.SetCustomStatus("Content approved by human reviewer. Publishing content...");
// Step 4: Publish the approved content
await context.CallActivityAsync(nameof(PublishContent), content);
context.SetCustomStatus($"Content published successfully at {context.CurrentUtcDateTime:s}");
return new { content = content.Content };
}
context.SetCustomStatus("Content rejected by human reviewer. Incorporating feedback and regenerating...");
// Incorporate human feedback and regenerate
writerResponse = await writerAgent.RunAsync<GeneratedContent>(
message: $"""
The content was rejected by a human reviewer. Please rewrite the article incorporating their feedback.
Human Feedback: {humanResponse.Feedback}
""",
session: writerSession);
content = writerResponse.Result;
}
// If we reach here, it means we exhausted the maximum number of iterations
throw new InvalidOperationException(
$"Content could not be approved after {input.MaxReviewAttempts} iterations.");
}
// POST /hitl/run
[Function(nameof(StartOrchestrationAsync))]
public static async Task<HttpResponseData> StartOrchestrationAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "hitl/run")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
// Read the input from the request body
ContentGenerationInput? input = await req.ReadFromJsonAsync<ContentGenerationInput>();
if (input is null || string.IsNullOrWhiteSpace(input.Topic))
{
HttpResponseData badRequestResponse = req.CreateResponse(HttpStatusCode.BadRequest);
await badRequestResponse.WriteAsJsonAsync(new { error = "Topic is required" });
return badRequestResponse;
}
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
orchestratorName: nameof(RunOrchestrationAsync),
input: input);
HttpResponseData response = req.CreateResponse(HttpStatusCode.Accepted);
await response.WriteAsJsonAsync(new
{
message = "HITL content generation orchestration started.",
topic = input.Topic,
instanceId,
statusQueryGetUri = GetStatusQueryGetUri(req, instanceId),
});
return response;
}
// POST /hitl/approve/{instanceId}
[Function(nameof(SendHumanApprovalAsync))]
public static async Task<HttpResponseData> SendHumanApprovalAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "hitl/approve/{instanceId}")] HttpRequestData req,
string instanceId,
[DurableClient] DurableTaskClient client)
{
// Read the approval response from the request body
HumanApprovalResponse? approvalResponse = await req.ReadFromJsonAsync<HumanApprovalResponse>();
if (approvalResponse is null)
{
HttpResponseData badRequestResponse = req.CreateResponse(HttpStatusCode.BadRequest);
await badRequestResponse.WriteAsJsonAsync(new { error = "Approval response is required" });
return badRequestResponse;
}
// Send the approval event to the orchestration
await client.RaiseEventAsync(instanceId, "HumanApproval", approvalResponse);
HttpResponseData response = req.CreateResponse(HttpStatusCode.OK);
await response.WriteAsJsonAsync(new
{
message = "Human approval sent to orchestration.",
instanceId,
approved = approvalResponse.Approved
});
return response;
}
// GET /hitl/status/{instanceId}
[Function(nameof(GetOrchestrationStatusAsync))]
public static async Task<HttpResponseData> GetOrchestrationStatusAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "hitl/status/{instanceId}")] HttpRequestData req,
string instanceId,
[DurableClient] DurableTaskClient client)
{
OrchestrationMetadata? status = await client.GetInstanceAsync(
instanceId,
getInputsAndOutputs: true,
req.FunctionContext.CancellationToken);
if (status is null)
{
HttpResponseData notFound = req.CreateResponse(HttpStatusCode.NotFound);
await notFound.WriteAsJsonAsync(new { error = "Instance not found" });
return notFound;
}
HttpResponseData response = req.CreateResponse(HttpStatusCode.OK);
await response.WriteAsJsonAsync(new
{
instanceId = status.InstanceId,
runtimeStatus = status.RuntimeStatus.ToString(),
workflowStatus = status.SerializedCustomStatus is not null ? (object)status.ReadCustomStatusAs<JsonElement>() : null,
input = status.SerializedInput is not null ? (object)status.ReadInputAs<JsonElement>() : null,
output = status.SerializedOutput is not null ? (object)status.ReadOutputAs<JsonElement>() : null,
failureDetails = status.FailureDetails
});
return response;
}
[Function(nameof(NotifyUserForApproval))]
public static void NotifyUserForApproval(
[ActivityTrigger] GeneratedContent content,
FunctionContext functionContext)
{
ILogger logger = functionContext.GetLogger(nameof(NotifyUserForApproval));
// In a real implementation, this would send notifications via email, SMS, etc.
logger.LogInformation(
"""
NOTIFICATION: Please review the following content for approval:
Title: {Title}
Content: {Content}
Use the approval endpoint to approve or reject this content.
""",
content.Title,
content.Content);
}
[Function(nameof(PublishContent))]
public static void PublishContent(
[ActivityTrigger] GeneratedContent content,
FunctionContext functionContext)
{
ILogger logger = functionContext.GetLogger(nameof(PublishContent));
// In a real implementation, this would publish to a CMS, website, etc.
logger.LogInformation(
"""
PUBLISHING: Content has been published successfully.
Title: {Title}
Content: {Content}
""",
content.Title,
content.Content);
}
private static string GetStatusQueryGetUri(HttpRequestData req, string instanceId)
{
// NOTE: This can be made more robust by considering the value of
// request headers like "X-Forwarded-Host" and "X-Forwarded-Proto".
string authority = $"{req.Url.Scheme}://{req.Url.Authority}";
return $"{authority}/api/hitl/status/{instanceId}";
}
}