Skip to content

Commit 91bc7f6

Browse files
authored
Use async/await in the transaction APIs (#91)
Use async/await in the transaction APIs
2 parents b7a9441 + 4ce0844 commit 91bc7f6

File tree

8 files changed

+43
-32
lines changed

8 files changed

+43
-32
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ os:
66
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
77
- osx
88
rust:
9-
# Requires nightly for now, stable can be re-enabled when 1.36 is stable.
9+
# Requires nightly for now, stable can be re-enabled when async/await is stable.
1010
# - stable
1111
- nightly
1212
env:

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F
1414

1515
## Using the client
1616

17-
The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility).
17+
The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.
18+
19+
Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
1820

1921
To use this crate in your project, add it as a dependency in your `Cargo.toml`:
2022

@@ -28,8 +30,6 @@ The client requires a Git dependency until we can [publish it](https://github.co
2830

2931
There are [examples](examples) which show how to use the client in a Rust program.
3032

31-
The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
32-
3333
## Access the documentation
3434

3535
We recommend using the cargo-generated documentation to browse and understand the API. We've done
@@ -52,7 +52,7 @@ To check what version of Rust you are using, run
5252
rustc --version
5353
```
5454

55-
You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
55+
You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
5656

5757
```bash
5858
rustup toolchain install nightly

examples/transaction.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tikv_client::{
1313
};
1414

1515
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
16-
let mut txn = client.begin();
16+
let mut txn = client.begin().await.expect("Could not begin a transaction");
1717
future::join_all(
1818
pairs
1919
.into_iter()
@@ -28,7 +28,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
2828
}
2929

3030
async fn get(client: &Client, key: Key) -> Value {
31-
let txn = client.begin();
31+
let txn = client.begin().await.expect("Could not begin a transaction");
3232
txn.get(key).await.expect("Could not get value")
3333
}
3434

@@ -37,6 +37,8 @@ async fn get(client: &Client, key: Key) -> Value {
3737
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
3838
client
3939
.begin()
40+
.await
41+
.expect("Could not begin a transaction")
4042
.scan(range)
4143
.into_stream()
4244
.take_while(move |r| {
@@ -53,7 +55,7 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
5355
}
5456

5557
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
56-
let mut txn = client.begin();
58+
let mut txn = client.begin().await.expect("Could not begin a transaction");
5759
txn.set_isolation_level(IsolationLevel::ReadCommitted);
5860
let _: Vec<()> = stream::iter(keys.into_iter())
5961
.then(|p| {

rust-toolchain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
nightly-2019-07-16
1+
nightly-2019-07-31

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Long and nested future chains can quickly result in large generic types.
44
#![type_length_limit = "16777216"]
55
#![allow(clippy::redundant_closure)]
6+
#![feature(async_await)]
67

78
//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
89
//! distributed transactional Key-Value database written in Rust.

src/rpc/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
kv::BoundRange,
2121
raw::ColumnFamily,
2222
rpc::{
23-
pd::{PdClient, Region, RegionId, RetryClient, StoreId},
23+
pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp},
2424
security::SecurityManager,
2525
tikv::KvClient,
2626
Address, RawContext, Store, TxnContext,
@@ -225,6 +225,10 @@ impl<PdC: PdClient> RpcClient<PdC> {
225225
future::err(Error::unimplemented())
226226
}
227227

228+
pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
229+
Arc::clone(&self.pd).get_timestamp()
230+
}
231+
228232
// Returns a Steam which iterates over the contexts for each region covered by range.
229233
fn regions_for_range(
230234
self: Arc<Self>,

src/transaction/client.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use super::{Snapshot, Timestamp, Transaction};
4-
use crate::{Config, Error};
4+
use crate::rpc::RpcClient;
5+
use crate::{Config, Result};
56

67
use derive_new::new;
78
use futures::prelude::*;
89
use futures::task::{Context, Poll};
910
use std::pin::Pin;
11+
use std::sync::Arc;
1012

1113
/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
12-
pub struct Client;
14+
pub struct Client {
15+
rpc: Arc<RpcClient>,
16+
}
1317

1418
impl Client {
1519
/// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves.
@@ -38,13 +42,13 @@ impl Client {
3842
/// # futures::executor::block_on(async {
3943
/// let connect = Client::connect(Config::default());
4044
/// let client = connect.await.unwrap();
41-
/// let transaction = client.begin();
45+
/// let transaction = client.begin().await.unwrap();
4246
/// // ... Issue some commands.
4347
/// let commit = transaction.commit();
4448
/// let result: () = commit.await.unwrap();
4549
/// # });
4650
/// ```
47-
pub fn begin(&self) -> Transaction {
51+
pub async fn begin(&self) -> Result<Transaction> {
4852
unimplemented!()
4953
}
5054

@@ -57,11 +61,11 @@ impl Client {
5761
/// # futures::executor::block_on(async {
5862
/// let connect = Client::connect(Config::default());
5963
/// let client = connect.await.unwrap();
60-
/// let snapshot = client.snapshot();
64+
/// let snapshot = client.snapshot().await.unwrap();
6165
/// // ... Issue some commands.
6266
/// # });
6367
/// ```
64-
pub fn snapshot(&self) -> Snapshot {
68+
pub async fn snapshot(&self) -> Result<Snapshot> {
6569
unimplemented!()
6670
}
6771

@@ -79,7 +83,7 @@ impl Client {
7983
/// // ... Issue some commands.
8084
/// # });
8185
/// ```
82-
pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot {
86+
pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result<Snapshot> {
8387
unimplemented!()
8488
}
8589

@@ -92,11 +96,11 @@ impl Client {
9296
/// # futures::executor::block_on(async {
9397
/// let connect = Client::connect(Config::default());
9498
/// let client = connect.await.unwrap();
95-
/// let timestamp = client.current_timestamp();
99+
/// let timestamp = client.current_timestamp().await.unwrap();
96100
/// # });
97101
/// ```
98-
pub fn current_timestamp(&self) -> Timestamp {
99-
unimplemented!()
102+
pub async fn current_timestamp(&self) -> Result<Timestamp> {
103+
self.rpc.clone().get_timestamp().await
100104
}
101105
}
102106

@@ -120,7 +124,7 @@ pub struct Connect {
120124
}
121125

122126
impl Future for Connect {
123-
type Output = Result<Client, Error>;
127+
type Output = Result<Client>;
124128

125129
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
126130
let _config = &self.config;

src/transaction/transaction.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::ops::RangeBounds;
2222
/// # futures::executor::block_on(async {
2323
/// let connect = Client::connect(Config::default());
2424
/// let client = connect.await.unwrap();
25-
/// let txn = client.begin();
25+
/// let txn = client.begin().await.unwrap();
2626
/// # });
2727
/// ```
2828
#[derive(new)]
@@ -42,7 +42,7 @@ impl Transaction {
4242
/// # futures::executor::block_on(async {
4343
/// # let connect = Client::connect(Config::default());
4444
/// # let connected_client = connect.await.unwrap();
45-
/// let txn = connected_client.begin();
45+
/// let txn = connected_client.begin().await.unwrap();
4646
/// // ... Do some actions.
4747
/// let req = txn.commit();
4848
/// let result: () = req.await.unwrap();
@@ -61,7 +61,7 @@ impl Transaction {
6161
/// # futures::executor::block_on(async {
6262
/// # let connect = Client::connect(Config::default());
6363
/// # let connected_client = connect.await.unwrap();
64-
/// let txn = connected_client.begin();
64+
/// let txn = connected_client.begin().await.unwrap();
6565
/// // ... Do some actions.
6666
/// let req = txn.rollback();
6767
/// let result: () = req.await.unwrap();
@@ -80,7 +80,7 @@ impl Transaction {
8080
/// # futures::executor::block_on(async {
8181
/// # let connect = Client::connect(Config::default());
8282
/// # let connected_client = connect.await.unwrap();
83-
/// let mut txn = connected_client.begin();
83+
/// let mut txn = connected_client.begin().await.unwrap();
8484
/// // ... Do some actions.
8585
/// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
8686
/// let result: () = req.await.unwrap();
@@ -103,7 +103,7 @@ impl Transaction {
103103
/// # futures::executor::block_on(async {
104104
/// # let connect = Client::connect(Config::default());
105105
/// # let connected_client = connect.await.unwrap();
106-
/// let txn = connected_client.begin();
106+
/// let txn = connected_client.begin().await.unwrap();
107107
/// // ... Do some actions.
108108
/// let ts: Timestamp = txn.start_ts();
109109
/// # });
@@ -121,7 +121,7 @@ impl Transaction {
121121
/// # futures::executor::block_on(async {
122122
/// # let connect = Client::connect(Config::default());
123123
/// # let connected_client = connect.await.unwrap();
124-
/// let txn = connected_client.begin();
124+
/// let txn = connected_client.begin().await.unwrap();
125125
/// // ... Do some actions.
126126
/// let snap: Snapshot = txn.snapshot();
127127
/// # });
@@ -139,7 +139,7 @@ impl Transaction {
139139
/// # futures::executor::block_on(async {
140140
/// # let connect = Client::connect(Config::default());
141141
/// # let connected_client = connect.await.unwrap();
142-
/// let mut txn = connected_client.begin();
142+
/// let mut txn = connected_client.begin().await.unwrap();
143143
/// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
144144
/// # });
145145
/// ```
@@ -159,7 +159,7 @@ impl Transaction {
159159
/// # futures::executor::block_on(async {
160160
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
161161
/// # let connected_client = connecting_client.await.unwrap();
162-
/// let mut txn = connected_client.begin();
162+
/// let mut txn = connected_client.begin().await.unwrap();
163163
/// let key = "TiKV".to_owned();
164164
/// let req = txn.get(key);
165165
/// let result: Value = req.await.unwrap();
@@ -183,7 +183,7 @@ impl Transaction {
183183
/// # futures::executor::block_on(async {
184184
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
185185
/// # let connected_client = connecting_client.await.unwrap();
186-
/// let mut txn = connected_client.begin();
186+
/// let mut txn = connected_client.begin().await.unwrap();
187187
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
188188
/// let req = txn.batch_get(keys);
189189
/// let result: Vec<KvPair> = req.await.unwrap();
@@ -214,7 +214,7 @@ impl Transaction {
214214
/// # futures::executor::block_on(async {
215215
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
216216
/// # let connected_client = connecting_client.await.unwrap();
217-
/// let mut txn = connected_client.begin();
217+
/// let mut txn = connected_client.begin().await.unwrap();
218218
/// let key = "TiKV".to_owned();
219219
/// let val = "TiKV".to_owned();
220220
/// let req = txn.set(key, val);
@@ -238,7 +238,7 @@ impl Transaction {
238238
/// # futures::executor::block_on(async {
239239
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
240240
/// # let connected_client = connecting_client.await.unwrap();
241-
/// let mut txn = connected_client.begin();
241+
/// let mut txn = connected_client.begin().await.unwrap();
242242
/// let key = "TiKV".to_owned();
243243
/// let req = txn.delete(key);
244244
/// let result: () = req.await.unwrap();

0 commit comments

Comments
 (0)