Skip to content

The PR add support for blocking RM_Call.#333

Merged
MeirShpilraien merged 7 commits into
masterfrom
blocking_rm_call
Jun 6, 2023
Merged

The PR add support for blocking RM_Call.#333
MeirShpilraien merged 7 commits into
masterfrom
blocking_rm_call

Conversation

@MeirShpilraien
Copy link
Copy Markdown

@MeirShpilraien MeirShpilraien commented May 18, 2023

The blocking RM_Call was introduce on Redis 7.2. The idea is to give a module writer the ability to perform blocking commands like BLPOP using RM_Call. This PR adds this functionality to redismodule-rs.

In order to be able to invoke blocking commands, the user need to use call_blocking instead of call or call_ext. call_blocking will return an enum that can either be a regular reply (like call_ext returns) or it can be a FutureCallReply.

The FutureCallReply can be used to set on_done_handler that will be called when the command gets unblock. The on_done_handler gets the command reply as an input. The FutureCallReply not outlive the Context that was used to invoke the blocking command. This is because the on_done_handler must be set before the Redis GIL is released. This restriction forces it.

After setting the unblock handler, the user will get a FutureHandler object that can be use to abort the command invocation. The abort is done base on a best effort approach. And important details is that the FutureHandler must be freed when Redis GIL is held, this is why we introduce a dispose function (that gets a lock indicator) and not counting on Drop implementation.

A simple usage example:

fn call_blocking(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
    // create blocking call options
    let call_options = CallOptionsBuilder::new().build_blocking();

    // call the blocking command
    let res = ctx.call_blocking("blpop", &call_options, &["list", "1"]);

    // check the reply, if its a future, block the client until the
    // future is resolved.
    match res {
        PromiseCallReply::Resolved(r) => r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())),
        PromiseCallReply::Future(f) => {
            let blocked_client = ctx.block_client();
            f.set_unblock_handler(move |_ctx, reply| {
                let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client);
                thread_ctx.reply(reply.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())));
            });
            Ok(RedisValue::NoReply)
        }
    }
}

Notice a nice possible improvement would be to integrate this feature with rust async await. We leave this for future PR.

Notice The PR also updates the redismodule.h file.

The [blocking `RM_Call`](redis/redis#11568) was introduce on Redis 7.2. The idea is to give a module writter the ability to perform blocking commands like `BLPOP` using `RM_Call`. This PR adds this functionallity to `redismodule-rs`.

In order to be able to invoke blocking commands, the user need to use `call_blocking` instead of `call` or `call_ext`. `call_blocking` will return an enum that can either be a regular reply (like `call_ext` returns) or it can be a `FutureCallReply`.

The `FutureCallReply` can be used to set `on_done_handler` that will be called when the command gets unblock. The `on_done_handler` gets the command reply as an intput. The `FutureCallReply` not outlive the `Context` that was used to invoke the  blocking command. This is because the `on_done_handler` must be set before the Redis GIL is released. This restriction forces it.

After setting the unblock handler, the user will get a `FutureHandler` object that can be use to abort the command invocation. The abort is done base on a best effort approach. And important details is that the `FutureHandler` must be freed when Redis GIL is held, this is why we introduce a `dispose` function (that gets a lock indicator) and not counting on `Drop` implementation.

A simple usage example:

```rust
fn call_blocking(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
    // create blocking call options
    let call_options = CallOptionsBuilder::new().build_blocking();

    // call the blocking command
    let res = ctx.call_blocking("blpop", &call_options, &["list", "1"]);

    // check the reply, if its a future, block the client until the
    // future is resolved.
    match res {
        PromiseCallReply::Resolved(r) => r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())),
        PromiseCallReply::Future(f) => {
            let blocked_client = ctx.block_client();
            f.set_unblock_handler(move |_ctx, reply| {
                let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client);
                thread_ctx.reply(reply.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())));
            });
            Ok(RedisValue::NoReply)
        }
    }
}
```

**Notice** a nice possible improvement would be to integrate this feature with rust async await. We leave this for future PR.
@MeirShpilraien MeirShpilraien requested a review from iddm May 18, 2023 13:20
iddm
iddm previously approved these changes Jun 6, 2023
Comment thread src/context/call_reply.rs Outdated
Comment thread src/context/call_reply.rs Outdated
Comment thread src/context/call_reply.rs Outdated
@MeirShpilraien MeirShpilraien merged commit 4eb77dc into master Jun 6, 2023
@MeirShpilraien MeirShpilraien deleted the blocking_rm_call branch June 6, 2023 10:25
MeirShpilraien pushed a commit to RedisGears/RedisGears that referenced this pull request Jun 20, 2023
Fix #937

The PR adds support for to call Redis commands that might block the client like [`blpop`](https://redis.io/commands/blpop/).
The new API added on this PR:

1. `client.callAsync` - run poterntially blocking Redis command, returns a promise object that will be resolved when the command finished.
2. `client.callAsyncRaw` - just like `client.callAsync` but do not perform utf8 decoding.

In order to support calling blocking commands, we use the new [`redismodule-rs`](https://github.com/RedisLabsModules/redismodule-rs) API, [`call_blocking`](RedisLabsModules/redismodule-rs#333). The new API allows to call a blocking command and get a future CallReply which will be resolved when the command finished. This future CallReply must be aborted if the instance become replica. To supported that we maintain a new list per library that holds all the created future object (we call it `future_handlers`). In case the instance become replica, we can abort all future CallReplies and return an error to the v8 plugin which will raise the error on the user code (see `testCallAsyncBecomeReplica` test for demonstration).

Notice that the list hold a weak reference to the `future_handlers`, so if it was already resolved it will just be ignored. We use Redis cron job to clean those weak refernces.

From V8 plugin POV, when exeucting a blocking command, the V8 plugin will create a JS promise object that will be resolved by the callback that it set on the future CallReply.

Usage example:

```js
#!js api_version=1.0 name=lib

redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
    var res = null
    do {
        res = await client.block((c) => {
            return c.callAsync('blpop', key, '0');
        })
    } while (res[1] != expected_val);
    return res;
});
```

The following function will continue popping elements from the requested list up until it will encounter the requested value. In case the list is empty it will wait until elements will be added to the list.
MeirShpilraien pushed a commit to RedisGears/RedisGears that referenced this pull request Jun 26, 2023
Added support for `callAsync`.

Fix #937

The PR adds support for to call Redis commands that might block the client like [`blpop`](https://redis.io/commands/blpop/).
The new API added on this PR:

1. `client.callAsync` - run poterntially blocking Redis command, returns a promise object that will be resolved when the command finished.
2. `client.callAsyncRaw` - just like `client.callAsync` but do not perform utf8 decoding.

In order to support calling blocking commands, we use the new [`redismodule-rs`](https://github.com/RedisLabsModules/redismodule-rs) API, [`call_blocking`](RedisLabsModules/redismodule-rs#333). The new API allows to call a blocking command and get a future CallReply which will be resolved when the command finished. This future CallReply must be aborted if the instance become replica. To supported that we maintain a new list per library that holds all the created future object (we call it `future_handlers`). In case the instance become replica, we can abort all future CallReplies and return an error to the v8 plugin which will raise the error on the user code (see `testCallAsyncBecomeReplica` test for demonstration).

Notice that the list hold a weak reference to the `future_handlers`, so if it was already resolved it will just be ignored. We use Redis cron job to clean those weak refernces.

From V8 plugin POV, when exeucting a blocking command, the V8 plugin will create a JS promise object that will be resolved by the callback that it set on the future CallReply.

Usage example:

```js
#!js api_version=1.0 name=lib

redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
    var res = null
    do {
        res = await client.block((c) => {
            return c.callAsync('blpop', key, '0');
        })
    } while (res[1] != expected_val);
    return res;
});
```

The following function will continue popping elements from the requested list up until it will encounter the requested value. In case the list is empty it will wait until elements will be added to the list.
@github-actions github-actions Bot mentioned this pull request May 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants