Support asynchronous/reactive function calling #1778


Open
PeatBoy opened this issue Nov 20, 2024 · 7 comments

Comments

@PeatBoy

PeatBoy commented Nov 20, 2024


Expected Behavior

I would like a FunctionCallback that can be implemented with Reactor's streaming style, for example a BiFunction<Request, ToolContext, Mono<Response>>.

Current Behavior

The current FunctionCallback only supports blocking implementations. If the underlying function callback is built on Reactor's reactive APIs, its result can only be obtained via block() or by offloading to an asynchronous thread pool.

Context
A reactive callback would be much friendlier for applications built entirely on WebFlux; using a blocking FunctionCallback inside WebFlux causes many hard-to-diagnose problems.
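For illustration, the requested shape could look roughly like the sketch below. This is purely hypothetical — none of these names exist in Spring AI, and java.util.concurrent.CompletableFuture stands in for Reactor's Mono so the sketch is self-contained:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class AsyncCallbackSketch {
    public static void main(String[] args) {
        // A BiFunction<Request, ToolContext, CompletableFuture<Response>>, echoing the
        // BiFunction<Request, ToolContext, Mono<Response>> shape proposed above.
        // String/Object are placeholder types for Request/ToolContext.
        BiFunction<String, Object, CompletableFuture<String>> callback =
            (request, toolContext) -> CompletableFuture.supplyAsync(() -> "echo: " + request);

        // A reactive caller would compose on the returned future/Mono instead of
        // blocking; join() here is only to print the demo result.
        String result = callback.apply("hello", null).join();
        System.out.println(result); // echo: hello
    }
}
```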


@tzolov
Contributor

tzolov commented Nov 22, 2024

@PeatBoy - Thanks for flagging this. A reactive FunctionCallback isn't currently supported.
While we (@chemicL) may consider adding such a feature in the future, for now you'll need to use a blocking call instead of .subscribe(res::add).

@chemicL
Member

chemicL commented Nov 22, 2024

Hey, as @tzolov mentioned, reactive/asynchronous signatures for functions are currently not supported, but it looks like desirable functionality, especially with streaming scenarios in mind.

I suggest closing the discussion #1757 and keeping this issue and related discussions here.

As you noticed:

    ArrayList<Object> res = new ArrayList<>();
    // ...
        .subscribe(res::add);
    return new Response(res);

this construct immediately returns an object that is incomplete; another thread will later mutate it (potentially without the result ever becoming visible to the consuming thread), with no synchronization whatsoever. That is undesired.
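The hazard can be reproduced with plain java.util.concurrent primitives (a hypothetical sketch; the latch stands in for a subscription that completes some time after the method has returned):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IncompleteResultDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch dataReady = new CountDownLatch(1);
        List<Object> res = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Simulates .subscribe(res::add): the add happens later, on another thread.
        pool.submit(() -> {
            try {
                dataReady.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            res.add("row-1");
        });

        // The "Response" would be returned here, before any data has arrived.
        System.out.println("size at return: " + res.size()); // 0 - incomplete

        dataReady.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("size later: " + res.size()); // 1, mutated after the return
    }
}
```

In the real code there is not even a latch or awaitTermination to establish a happens-before edge, so the consumer may never observe the added elements at all.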

As a solution for the time being you don't need to introduce CompletableFuture at all:

List<Map<String, Object>> data = CompletableFuture.supplyAsync(
    () -> this.dataQueryRepository.query()
        // ...
        .collectList()
        .block())
    .join();

Instead,

this.dataQueryRepository.query()
    // ...
    .collectList()
    .block();

should get you the same result.

For Spring AI's internal implementation, it's worth keeping in mind that when an imperative user function is executed on an event loop, the blocking call should be offloaded to the bounded-elastic Scheduler, on which blocking is allowed, e.g. via

        return Mono.fromCallable(() -> userBlockingFunction())
            .subscribeOn(Schedulers.boundedElastic());

regardless of whether the function is using reactive APIs or not, since performing blocking calls directly in imperative code will degrade performance and stall the event loop.

@chemicL chemicL changed the title Can function callback be supported to call reactor reaction method? Support asynchronous/reactive function calling. Nov 22, 2024
@chemicL chemicL changed the title Support asynchronous/reactive function calling. Support asynchronous/reactive function calling Nov 22, 2024
@PeatBoy
Author

PeatBoy commented Nov 24, 2024

OK, thank you for your reply @chemicL. Please post an update on this issue if there is any progress on asynchronous/reactive function calling.

@martypitt

martypitt commented Feb 26, 2025

Hey folks - keen to see this implemented as well. I spent some time looking into this myself, and quickly found that the entire stack needs to become reactive.

Not necessarily a bad idea (e.g., calls to the actual LLM are typically quite long, and would benefit from being non-blocking), but it's not a trivial amount of effort, hence I appreciate it might take a while to arrive.

I also wonder if the JVM's new virtual threads could deliver a similar benefit here, without the same level of refactoring effort? Unsure.
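For context on the virtual-thread idea: a blocking tool call parked on a virtual thread does not pin an OS carrier thread, which is why it might deliver some of the non-blocking benefit without a reactive rewrite. A speculative stdlib-only sketch (JDK 21+; blockingToolCall is a hypothetical stand-in, not Spring AI API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadToolCall {
    // Hypothetical blocking call standing in for a tool invocation
    // that hits a repository or remote service.
    static String blockingToolCall() throws InterruptedException {
        Thread.sleep(100); // parks the virtual thread; the carrier thread stays free
        return "tool-result";
    }

    public static void main(String[] args) throws Exception {
        // One cheap virtual thread per task (JDK 21+), no reactive rewrite required.
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> f = exec.submit(VirtualThreadToolCall::blockingToolCall);
            System.out.println(f.get()); // tool-result
        }
    }
}
```

Whether this composes cleanly with WebFlux's event loop is a separate question; it mainly helps the thread-per-request style.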

In the interim, I thought I'd share how we've worked around this, in case anyone else hits this issue.
We wanted to support human-in-the-loop with our tool calling, and this can't be blocking for obvious reasons.

Here's a sketch of the implementation we've used. I've tried to simplify it down to the key parts:

// This is our wrapper around ChatClient
// to help make testing easier - unrelated
fun sendToAi(
    // inputs..
): Mono<ChatResponse> {
    // TODO : This could/should move to reactive as well...
    val chatResponse: ChatResponse = chatClient
        .prompt(/* inputs */)
        .call()
        .chatResponse()!!
    // Execute tools if necessary..
    return if (chatResponse.hasToolCalls()) {
        executeTools(/** inputs */)
    } else {
        Mono.just(chatResponse)
    }
}

// Execute tools. Returns Mono<ChatResponse>
// to allow tool-calling to be async
private fun executeTools(
    // inputs...
): Mono<ChatResponse> {
    val toolCallingManager = ToolCallingManager.builder().build()

    return Mono.just(prompt to chatResponse)
        .flatMap {
            invokeTool(toolCallingManager, prompt, chatResponse, session)
        }.flatMap { toolResponsePrompt ->
            // TODO : Make async
            // Send the result of the tool back to the main agent to interpret the result
            // and decide what to do next.
            // It might complete, or it might decide to invoke another tool
            val toolCallingResponse = chatClient
                .prompt(toolResponsePrompt)
                .call()
                .chatResponse()!!
            // If there's more work to do, recurse...
            if (toolCallingResponse.hasToolCalls()) {
                executeTools(toolParams, toolResponsePrompt, toolCallingResponse, spec, session)
            } else {
                Mono.just(toolCallingResponse)
            }
        }
}

// This is the key function - we may route to a human for intervention,
// before routing back into the function calling loop
private fun invokeTool(
 // inputs...
): Mono<Prompt> {
    val toolExecutionResult = toolCallingManager.executeToolCalls(prompt, chatResponse)
    return if (requiresUserIntervention(toolExecutionResult)) {
        routeToUser(toolExecutionResult, session, prompt)
    } else {
        Mono.just(Prompt(toolExecutionResult.conversationHistory(), prompt.options))
    }
}

private fun routeToUser( /* inputs */): Mono<Prompt> {
    // send to the user, process the response, and return the next prompt.
    TODO("omitted for brevity")
}

Hope that helps.

And as always, huge thanks to the maintainers for their efforts!

@lovelinessmoe

> [quotes @chemicL's reply of Nov 22, 2024 in full]

Hello, when I use block() in a tool call I get:

org.springframework.ai.tool.execution.ToolExecutionException: block()/blockFirst()/blockLast() are blocking

[screenshot]

Here is my code; how can I call this method successfully?

[screenshot]
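That exception comes from Reactor refusing block() on a thread it has marked non-blocking (the WebFlux event loop the tool executes on). Until async tool signatures exist, one interim approach (my assumption, not an official Spring AI recommendation) is to run the blocking wait on a thread where blocking is permitted, which is the same idea as chemicL's Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()). A stdlib-only sketch of that offloading pattern, with slowQuery as a hypothetical stand-in for the repository call:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadBlockingCall {
    // Hypothetical stand-in for the repository call behind the tool.
    static String slowQuery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "buyer: Meow Meow";
    }

    public static void main(String[] args) {
        // Analogous to Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()):
        // the blocking work runs on a pool where blocking is allowed,
        // never on the event loop itself.
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        try {
            CompletableFuture<String> result =
                CompletableFuture.supplyAsync(OffloadBlockingCall::slowQuery, blockingPool);
            System.out.println(result.join());
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

Note that a synchronous tool contract still has to wait for the result somewhere, so this relocates the blocking rather than eliminating it; a truly async contract (the subject of this issue) is the real fix.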

@lovelinessmoe

When I use

[screenshot]

the AI gives me:

Information about Meow Meow has been obtained. For information protection reasons, I will provide a general description:

- **Buyer's Name**: Meow Meow

If you need more detailed information (such as address, contact details, etc.), please confirm that you have permission to view this data or provide more background information to proceed with your inquiry. If you have any other specific needs, please feel free to let us know!

[screenshot]

@eugene-kamenev

I did some work to implement non-blocking tool calls on top of Spring AI:
https://github.com/eugene-kamenev/spring-ai-reactive-tools
