Skip to content

Commit 8b9dc1d

Browse files
authored
Add anonymous delegating clients / generators (#5650)
1 parent d551bb1 commit 8b9dc1d

11 files changed

Lines changed: 743 additions & 1 deletion

File tree

eng/Versions.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<SystemTextEncodingsWebVersion>9.0.0</SystemTextEncodingsWebVersion>
6464
<SystemNumericsTensorsVersion>9.0.0</SystemNumericsTensorsVersion>
6565
<SystemTextJsonVersion>9.0.0</SystemTextJsonVersion>
66+
<SystemThreadingChannelsVersion>9.0.0</SystemThreadingChannelsVersion>
6667
<!-- Dependencies from https://github.com/aspnet/AspNetCore -->
6768
<MicrosoftAspNetCoreAppRefVersion>9.0.1</MicrosoftAspNetCoreAppRefVersion>
6869
<MicrosoftAspNetCoreAppRuntimewinx64Version>9.0.1</MicrosoftAspNetCoreAppRuntimewinx64Version>
@@ -112,6 +113,7 @@
112113
<SystemTextEncodingsWebLTSVersion>8.0.0</SystemTextEncodingsWebLTSVersion>
113114
<SystemNumericsTensorsLTSVersion>8.0.0</SystemNumericsTensorsLTSVersion>
114115
<SystemTextJsonLTSVersion>8.0.5</SystemTextJsonLTSVersion>
116+
<SystemThreadingChannelsLTSVersion>8.0.0</SystemThreadingChannelsLTSVersion>
115117
<!-- Dependencies from https://github.com/aspnet/AspNetCore -->
116118
<MicrosoftAspNetCoreAppRefLTSVersion>8.0.11</MicrosoftAspNetCoreAppRefLTSVersion>
117119
<MicrosoftAspNetCoreAppRuntimewinx64LTSVersion>8.0.11</MicrosoftAspNetCoreAppRuntimewinx64LTSVersion>

eng/packages/General-LTS.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<PackageVersion Include="System.Net.Http.Json" Version="$(SystemNetHttpJsonLTSVersion)" />
3737
<PackageVersion Include="System.Text.Encodings.Web" Version="$(SystemTextEncodingsWebLTSVersion)" />
3838
<PackageVersion Include="System.Text.Json" Version="$(SystemTextJsonLTSVersion)" />
39+
<PackageVersion Include="System.Threading.Channels" Version="$(SystemThreadingChannelsLTSVersion)" />
3940
</ItemGroup>
4041

4142
</Project>

eng/packages/General-net9.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<PackageVersion Include="System.Net.Http.Json" Version="$(SystemNetHttpJsonVersion)" />
3737
<PackageVersion Include="System.Text.Encodings.Web" Version="$(SystemTextEncodingsWebVersion)" />
3838
<PackageVersion Include="System.Text.Json" Version="$(SystemTextJsonVersion)" />
39+
<PackageVersion Include="System.Threading.Channels" Version="$(SystemThreadingChannelsVersion)" />
3940
</ItemGroup>
4041

4142
</Project>

src/Libraries/Microsoft.Extensions.AI.Abstractions/README.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,80 @@ var client = new RateLimitingChatClient(
329329
await client.CompleteAsync("What color is the sky?");
330330
```
331331

332+
To make it easier to compose such components with others, the author of the component is recommended to create a "Use" extension method for registering this component into a pipeline, e.g.
333+
```csharp
334+
public static class RateLimitingChatClientExtensions
335+
{
336+
public static ChatClientBuilder UseRateLimiting(this ChatClientBuilder builder, RateLimiter rateLimiter) =>
337+
builder.Use(innerClient => new RateLimitingChatClient(innerClient, rateLimiter));
338+
}
339+
```
340+
341+
Such extensions may also query for relevant services from the DI container; the `IServiceProvider` used by the pipeline is passed in as an optional parameter:
342+
```csharp
343+
public static class RateLimitingChatClientExtensions
344+
{
345+
public static ChatClientBuilder UseRateLimiting(this ChatClientBuilder builder, RateLimiter? rateLimiter = null) =>
346+
builder.Use((innerClient, services) => new RateLimitingChatClient(innerClient, services.GetRequiredService<RateLimiter>()));
347+
}
348+
```
349+
350+
The consumer can then easily use this in their pipeline, e.g.
351+
```csharp
352+
var client = new SampleChatClient(new Uri("http://localhost"), "test")
353+
.AsBuilder()
354+
.UseDistributedCache()
355+
.UseRateLimiting()
356+
.UseOpenTelemetry()
357+
.Build(services);
358+
```
359+
360+
The above extension methods demonstrate using a `Use` method on `ChatClientBuilder`. `ChatClientBuilder` also provides `Use` overloads that make it easier to
361+
write such delegating handlers. For example, in the earlier `RateLimitingChatClient` example, the overrides of `CompleteAsync` and `CompleteStreamingAsync` only
362+
need to do work before and after delegating to the next client in the pipeline. To achieve the same thing without writing a custom class, an overload of `Use` may
363+
be used that accepts a delegate which is used for both `CompleteAsync` and `CompleteStreamingAsync`, reducing the boilderplate required:
364+
```csharp
365+
RateLimiter rateLimiter = ...;
366+
var client = new SampleChatClient(new Uri("http://localhost"), "test")
367+
.AsBuilder()
368+
.UseDistributedCache()
369+
.Use(async (chatMessages, options, nextAsync, cancellationToken) =>
370+
{
371+
using var lease = await rateLimiter.AcquireAsync(permitCount: 1, cancellationToken).ConfigureAwait(false);
372+
if (!lease.IsAcquired)
373+
throw new InvalidOperationException("Unable to acquire lease.");
374+
375+
await nextAsync(chatMessages, options, cancellationToken);
376+
})
377+
.UseOpenTelemetry()
378+
.Build();
379+
```
380+
This overload internally uses a public `AnonymousDelegatingChatClient`, which enables more complicated patterns with only a little additional code.
381+
For example, to achieve the same as above but with the `RateLimiter` retrieved from DI:
382+
```csharp
383+
var client = new SampleChatClient(new Uri("http://localhost"), "test")
384+
.AsBuilder()
385+
.UseDistributedCache()
386+
.Use((innerClient, services) =>
387+
{
388+
RateLimiter rateLimiter = services.GetRequiredService<RateLimiter>();
389+
return new AnonymousDelegatingChatClient(innerClient, async (chatMessages, options, next, cancellationToken) =>
390+
{
391+
using var lease = await rateLimiter.AcquireAsync(permitCount: 1, cancellationToken).ConfigureAwait(false);
392+
if (!lease.IsAcquired)
393+
throw new InvalidOperationException("Unable to acquire lease.");
394+
395+
await next(chatMessages, options, cancellationToken);
396+
});
397+
})
398+
.UseOpenTelemetry()
399+
.Build();
400+
```
401+
402+
For scenarios where the developer would like to specify delegating implementations of `CompleteAsync` and `CompleteStreamingAsync` inline,
403+
and where it's important to be able to write a different implementation for each in order to handle their unique return types specially,
404+
another overload of `Use` exists that accepts a delegate for each.
405+
332406
#### Dependency Injection
333407

334408
`IChatClient` implementations will typically be provided to an application via dependency injection (DI). In this example, an `IDistributedCache` is added into the DI container, as is an `IChatClient`. The registration for the `IChatClient` employs a builder that creates a pipeline containing a caching client (which will then use an `IDistributedCache` retrieved from DI) and the sample client. Elsewhere in the app, the injected `IChatClient` may be retrieved and used.
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.Threading;
8+
using System.Threading.Channels;
9+
using System.Threading.Tasks;
10+
using Microsoft.Shared.Diagnostics;
11+
12+
#pragma warning disable VSTHRD003 // Avoid awaiting foreign Tasks
13+
14+
namespace Microsoft.Extensions.AI;
15+
16+
/// <summary>A delegating chat client that wraps an inner client with implementations provided by delegates.</summary>
17+
public sealed class AnonymousDelegatingChatClient : DelegatingChatClient
18+
{
19+
/// <summary>The delegate to use as the implementation of <see cref="CompleteAsync"/>.</summary>
20+
private readonly Func<IList<ChatMessage>, ChatOptions?, IChatClient, CancellationToken, Task<ChatCompletion>>? _completeFunc;
21+
22+
/// <summary>The delegate to use as the implementation of <see cref="CompleteStreamingAsync"/>.</summary>
23+
/// <remarks>
24+
/// When non-<see langword="null"/>, this delegate is used as the implementation of <see cref="CompleteStreamingAsync"/> and
25+
/// will be invoked with the same arguments as the method itself, along with a reference to the inner client.
26+
/// When <see langword="null"/>, <see cref="CompleteStreamingAsync"/> will delegate directly to the inner client.
27+
/// </remarks>
28+
private readonly Func<IList<ChatMessage>, ChatOptions?, IChatClient, CancellationToken, IAsyncEnumerable<StreamingChatCompletionUpdate>>? _completeStreamingFunc;
29+
30+
/// <summary>The delegate to use as the implementation of both <see cref="CompleteAsync"/> and <see cref="CompleteStreamingAsync"/>.</summary>
31+
private readonly CompleteSharedFunc? _sharedFunc;
32+
33+
/// <summary>
34+
/// Initializes a new instance of the <see cref="AnonymousDelegatingChatClient"/> class.
35+
/// </summary>
36+
/// <param name="innerClient">The inner client.</param>
37+
/// <param name="sharedFunc">
38+
/// A delegate that provides the implementation for both <see cref="CompleteAsync"/> and <see cref="CompleteStreamingAsync"/>.
39+
/// In addition to the arguments for the operation, it's provided with a delegate to the inner client that should be
40+
/// used to perform the operation on the inner client. It will handle both the non-streaming and streaming cases.
41+
/// </param>
42+
/// <remarks>
43+
/// This overload may be used when the anonymous implementation needs to provide pre- and/or post-processing, but doesn't
44+
/// need to interact with the results of the operation, which will come from the inner client.
45+
/// </remarks>
46+
/// <exception cref="ArgumentNullException"><paramref name="innerClient"/> is <see langword="null"/>.</exception>
47+
/// <exception cref="ArgumentNullException"><paramref name="sharedFunc"/> is <see langword="null"/>.</exception>
48+
public AnonymousDelegatingChatClient(IChatClient innerClient, CompleteSharedFunc sharedFunc)
49+
: base(innerClient)
50+
{
51+
_ = Throw.IfNull(sharedFunc);
52+
53+
_sharedFunc = sharedFunc;
54+
}
55+
56+
/// <summary>
57+
/// Initializes a new instance of the <see cref="AnonymousDelegatingChatClient"/> class.
58+
/// </summary>
59+
/// <param name="innerClient">The inner client.</param>
60+
/// <param name="completeFunc">
61+
/// A delegate that provides the implementation for <see cref="CompleteAsync"/>. When <see langword="null"/>,
62+
/// <paramref name="completeStreamingFunc"/> must be non-null, and the implementation of <see cref="CompleteAsync"/>
63+
/// will use <paramref name="completeStreamingFunc"/> for the implementation.
64+
/// </param>
65+
/// <param name="completeStreamingFunc">
66+
/// A delegate that provides the implementation for <see cref="CompleteStreamingAsync"/>. When <see langword="null"/>,
67+
/// <paramref name="completeFunc"/> must be non-null, and the implementation of <see cref="CompleteStreamingAsync"/>
68+
/// will use <paramref name="completeFunc"/> for the implementation.
69+
/// </param>
70+
/// <exception cref="ArgumentNullException"><paramref name="innerClient"/> is <see langword="null"/>.</exception>
71+
/// <exception cref="ArgumentNullException">Both <paramref name="completeFunc"/> and <paramref name="completeStreamingFunc"/> are <see langword="null"/>.</exception>
72+
public AnonymousDelegatingChatClient(
73+
IChatClient innerClient,
74+
Func<IList<ChatMessage>, ChatOptions?, IChatClient, CancellationToken, Task<ChatCompletion>>? completeFunc,
75+
Func<IList<ChatMessage>, ChatOptions?, IChatClient, CancellationToken, IAsyncEnumerable<StreamingChatCompletionUpdate>>? completeStreamingFunc)
76+
: base(innerClient)
77+
{
78+
ThrowIfBothDelegatesNull(completeFunc, completeStreamingFunc);
79+
80+
_completeFunc = completeFunc;
81+
_completeStreamingFunc = completeStreamingFunc;
82+
}
83+
84+
/// <inheritdoc/>
85+
public override Task<ChatCompletion> CompleteAsync(
86+
IList<ChatMessage> chatMessages, ChatOptions? options = null, CancellationToken cancellationToken = default)
87+
{
88+
_ = Throw.IfNull(chatMessages);
89+
90+
if (_sharedFunc is not null)
91+
{
92+
return CompleteViaSharedAsync(chatMessages, options, cancellationToken);
93+
94+
async Task<ChatCompletion> CompleteViaSharedAsync(IList<ChatMessage> chatMessages, ChatOptions? options, CancellationToken cancellationToken)
95+
{
96+
ChatCompletion? completion = null;
97+
await _sharedFunc(chatMessages, options, async (chatMessages, options, cancellationToken) =>
98+
{
99+
completion = await InnerClient.CompleteAsync(chatMessages, options, cancellationToken).ConfigureAwait(false);
100+
}, cancellationToken).ConfigureAwait(false);
101+
102+
if (completion is null)
103+
{
104+
throw new InvalidOperationException("The wrapper completed successfully without producing a ChatCompletion.");
105+
}
106+
107+
return completion;
108+
}
109+
}
110+
else if (_completeFunc is not null)
111+
{
112+
return _completeFunc(chatMessages, options, InnerClient, cancellationToken);
113+
}
114+
else
115+
{
116+
Debug.Assert(_completeStreamingFunc is not null, "Expected non-null streaming delegate.");
117+
return _completeStreamingFunc!(chatMessages, options, InnerClient, cancellationToken)
118+
.ToChatCompletionAsync(coalesceContent: true, cancellationToken);
119+
}
120+
}
121+
122+
/// <inheritdoc/>
123+
public override IAsyncEnumerable<StreamingChatCompletionUpdate> CompleteStreamingAsync(
124+
IList<ChatMessage> chatMessages, ChatOptions? options = null, CancellationToken cancellationToken = default)
125+
{
126+
_ = Throw.IfNull(chatMessages);
127+
128+
if (_sharedFunc is not null)
129+
{
130+
var updates = Channel.CreateBounded<StreamingChatCompletionUpdate>(1);
131+
132+
#pragma warning disable CA2016 // explicitly not forwarding the cancellation token, as we need to ensure the channel is always completed
133+
_ = Task.Run(async () =>
134+
#pragma warning restore CA2016
135+
{
136+
Exception? error = null;
137+
try
138+
{
139+
await _sharedFunc(chatMessages, options, async (chatMessages, options, cancellationToken) =>
140+
{
141+
await foreach (var update in InnerClient.CompleteStreamingAsync(chatMessages, options, cancellationToken).ConfigureAwait(false))
142+
{
143+
await updates.Writer.WriteAsync(update, cancellationToken).ConfigureAwait(false);
144+
}
145+
}, cancellationToken).ConfigureAwait(false);
146+
}
147+
catch (Exception ex)
148+
{
149+
error = ex;
150+
throw;
151+
}
152+
finally
153+
{
154+
_ = updates.Writer.TryComplete(error);
155+
}
156+
});
157+
158+
return updates.Reader.ReadAllAsync(cancellationToken);
159+
}
160+
else if (_completeStreamingFunc is not null)
161+
{
162+
return _completeStreamingFunc(chatMessages, options, InnerClient, cancellationToken);
163+
}
164+
else
165+
{
166+
Debug.Assert(_completeFunc is not null, "Expected non-null non-streaming delegate.");
167+
return CompleteStreamingAsyncViaCompleteAsync(_completeFunc!(chatMessages, options, InnerClient, cancellationToken));
168+
169+
static async IAsyncEnumerable<StreamingChatCompletionUpdate> CompleteStreamingAsyncViaCompleteAsync(Task<ChatCompletion> task)
170+
{
171+
ChatCompletion completion = await task.ConfigureAwait(false);
172+
foreach (var update in completion.ToStreamingChatCompletionUpdates())
173+
{
174+
yield return update;
175+
}
176+
}
177+
}
178+
}
179+
180+
/// <summary>Throws an exception if both of the specified delegates are null.</summary>
181+
/// <exception cref="ArgumentNullException">Both <paramref name="completeFunc"/> and <paramref name="completeStreamingFunc"/> are <see langword="null"/>.</exception>
182+
internal static void ThrowIfBothDelegatesNull(object? completeFunc, object? completeStreamingFunc)
183+
{
184+
if (completeFunc is null && completeStreamingFunc is null)
185+
{
186+
Throw.ArgumentNullException(nameof(completeFunc), $"At least one of the {nameof(completeFunc)} or {nameof(completeStreamingFunc)} delegates must be non-null.");
187+
}
188+
}
189+
190+
// Design note:
191+
// The following delegate could juse use Func<...>, but it's defined as a custom delegate type
192+
// in order to provide better discoverability / documentation / usability around its complicated
193+
// signature with the nextAsync delegate parameter.
194+
195+
/// <summary>
196+
/// Represents a method used to call <see cref="IChatClient.CompleteAsync"/> or <see cref="IChatClient.CompleteStreamingAsync"/>.
197+
/// </summary>
198+
/// <param name="chatMessages">The chat content to send.</param>
199+
/// <param name="options">The chat options to configure the request.</param>
200+
/// <param name="nextAsync">
201+
/// A delegate that provides the implementation for the inner client's <see cref="IChatClient.CompleteAsync"/> or
202+
/// <see cref="IChatClient.CompleteStreamingAsync"/>. It should be invoked to continue the pipeline. It accepts
203+
/// the chat messages, options, and cancellation token, which are typically the same instances as provided to this method
204+
/// but need not be.
205+
/// </param>
206+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
207+
/// <returns>A <see cref="Task"/> that represents the completion of the operation.</returns>
208+
public delegate Task CompleteSharedFunc(
209+
IList<ChatMessage> chatMessages,
210+
ChatOptions? options,
211+
Func<IList<ChatMessage>, ChatOptions?, CancellationToken, Task> nextAsync,
212+
CancellationToken cancellationToken);
213+
}

0 commit comments

Comments
 (0)