Skip to content

Commit e0625b3

Browse files
ion-elgrecotustvoldalamb
authored
feat: Add SpawnService and SpawnedReqwestConnector for running requests on a different runtime (#332)
* feat: spawn service * feat: add SpawnedReqwestConnector * chore: add zed settings to .gitignore * chore: add docs * Tweak main page documentation * Update documentation and fix example * Fix ci * Add end to end integration test * cleanup test * use separate bucket for test * try and fix wasm build --------- Co-authored-by: Raphael Taylor-Davies <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent edc01d9 commit e0625b3

File tree

12 files changed

+345
-19
lines changed

12 files changed

+345
-19
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ jobs:
129129
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV
130130
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
131131
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
132+
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
132133
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
133134
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
134135
@@ -150,8 +151,9 @@ jobs:
150151
- name: Run object_store tests
151152
run: cargo test --features=aws,azure,gcp,http
152153

154+
# Don't rerun doc tests (some of them rely on features other than aws)
153155
- name: Run object_store tests (AWS native conditional put)
154-
run: cargo test --features=aws
156+
run: cargo test --lib --tests --features=aws
155157
env:
156158
AWS_CONDITIONAL_PUT: etag
157159
AWS_COPY_IF_NOT_EXISTS: multipart

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ rusty-tags.vi
55
.flatbuffers/
66
.idea/
77
.vscode
8+
.zed
89
.devcontainer
910
venv/*
1011
# created by doctests

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ Or directly with:
6666

6767
```shell
6868
aws s3 mb s3://test-bucket --endpoint-url=http://localhost:4566
69+
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
6970
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
7071
```
7172

src/aws/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ impl MultipartStore for AmazonS3 {
497497
mod tests {
498498
use super::*;
499499
use crate::client::get::GetClient;
500+
use crate::client::SpawnedReqwestConnector;
500501
use crate::integration::*;
501502
use crate::tests::*;
502503
use crate::ClientOptions;
@@ -820,4 +821,57 @@ mod tests {
820821
store.delete(location).await.unwrap();
821822
}
822823
}
824+
825+
/// Integration test that ensures I/O is done on an alternate threadpool
826+
/// when using the `SpawnedReqwestConnector`.
827+
#[test]
828+
fn s3_alternate_threadpool_spawned_request_connector() {
829+
maybe_skip_integration!();
830+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
831+
832+
// Runtime with I/O enabled
833+
let io_runtime = tokio::runtime::Builder::new_current_thread()
834+
.enable_all() // <-- turns on IO
835+
.build()
836+
.unwrap();
837+
838+
// Runtime without I/O enabled
839+
let non_io_runtime = tokio::runtime::Builder::new_current_thread()
840+
// note: no call to enable_all
841+
.build()
842+
.unwrap();
843+
844+
// run the io runtime in a different thread
845+
let io_handle = io_runtime.handle().clone();
846+
let thread_handle = std::thread::spawn(move || {
847+
io_runtime.block_on(async move {
848+
shutdown_rx.await.unwrap();
849+
});
850+
});
851+
852+
let store = AmazonS3Builder::from_env()
853+
// use different bucket to avoid collisions with other tests
854+
.with_bucket_name("test-bucket-for-spawn")
855+
.with_http_connector(SpawnedReqwestConnector::new(io_handle))
856+
.build()
857+
.unwrap();
858+
859+
// run a request on the non io runtime -- will fail if the connector
860+
// does not spawn the request to the io runtime
861+
non_io_runtime
862+
.block_on(async move {
863+
let path = Path::from("alternate_threadpool/test.txt");
864+
store.delete(&path).await.ok(); // remove the file if it exists from prior runs
865+
store.put(&path, "foo".into()).await?;
866+
let res = store.get(&path).await?.bytes().await?;
867+
assert_eq!(res.as_ref(), b"foo");
868+
store.delete(&path).await?; // cleanup
869+
Ok(()) as Result<()>
870+
})
871+
.expect("failed to run request on non io runtime");
872+
873+
// shutdown the io runtime and thread
874+
shutdown_tx.send(()).ok();
875+
thread_handle.join().expect("runtime thread panicked");
876+
}
823877
}

src/client/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::client::connection::HttpErrorKind;
19-
use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody};
18+
use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpRequestBody};
2019
use http::header::{InvalidHeaderName, InvalidHeaderValue};
2120
use http::uri::InvalidUri;
2221
use http::{HeaderName, HeaderValue, Method, Uri};

src/client/body.rs renamed to src/client/http/body.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::client::connection::{HttpError, HttpErrorKind};
18+
use crate::client::{HttpError, HttpErrorKind};
1919
use crate::{collect_bytes, PutPayload};
2020
use bytes::Bytes;
2121
use futures::stream::BoxStream;
@@ -203,6 +203,18 @@ impl HttpResponseBody {
203203
}
204204
}
205205

206+
impl Body for HttpResponseBody {
207+
type Data = Bytes;
208+
type Error = HttpError;
209+
210+
fn poll_frame(
211+
mut self: Pin<&mut Self>,
212+
cx: &mut Context<'_>,
213+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
214+
Pin::new(&mut self.0).poll_frame(cx)
215+
}
216+
}
217+
206218
impl From<Bytes> for HttpResponseBody {
207219
fn from(value: Bytes) -> Self {
208220
Self::new(Full::new(value).map_err(|e| match e {}))

src/client/connection.rs renamed to src/client/http/connection.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::client::body::{HttpRequest, HttpResponse};
1918
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
20-
use crate::client::HttpResponseBody;
19+
use crate::client::{HttpRequest, HttpResponse, HttpResponseBody};
2120
use crate::ClientOptions;
2221
use async_trait::async_trait;
2322
use http::{Method, Uri};
2423
use http_body_util::BodyExt;
2524
use std::error::Error;
2625
use std::sync::Arc;
26+
use tokio::runtime::Handle;
2727

2828
/// An HTTP protocol error
2929
///
@@ -298,6 +298,67 @@ impl HttpConnector for ReqwestConnector {
298298
}
299299
}
300300

301+
/// [`reqwest::Client`] connector that performs all I/O on the provided tokio
302+
/// [`Runtime`] (thread pool).
303+
///
304+
/// This adapter is most useful when you wish to segregate I/O from CPU bound
305+
/// work that may be happening on the [`Runtime`].
306+
///
307+
/// [`Runtime`]: tokio::runtime::Runtime
308+
///
309+
/// # Example: Spawning requests on separate runtime
310+
///
311+
/// ```
312+
/// # use std::sync::Arc;
313+
/// # use tokio::runtime::Runtime;
314+
/// # use object_store::azure::MicrosoftAzureBuilder;
315+
/// # use object_store::client::SpawnedReqwestConnector;
316+
/// # use object_store::ObjectStore;
317+
/// # fn get_io_runtime() -> Runtime {
318+
/// # tokio::runtime::Builder::new_current_thread().build().unwrap()
319+
/// # }
320+
/// # fn main() -> Result<(), object_store::Error> {
321+
/// // create a tokio runtime for I/O.
322+
/// let io_runtime: Runtime = get_io_runtime();
323+
/// // configure a store using the runtime.
324+
/// let handle = io_runtime.handle().clone(); // get a handle to the same runtime
325+
/// let store: Arc<dyn ObjectStore> = Arc::new(
326+
/// MicrosoftAzureBuilder::new()
327+
/// .with_http_connector(SpawnedReqwestConnector::new(handle))
328+
/// .with_container_name("my_container")
329+
/// .with_account("my_account")
330+
/// .build()?
331+
/// );
332+
/// // any requests made using store will be spawned on the io_runtime
333+
/// # Ok(())
334+
/// # }
335+
/// ```
336+
#[derive(Debug)]
337+
#[allow(missing_copy_implementations)]
338+
#[cfg(not(target_arch = "wasm32"))]
339+
pub struct SpawnedReqwestConnector {
340+
runtime: Handle,
341+
}
342+
343+
#[cfg(not(target_arch = "wasm32"))]
344+
impl SpawnedReqwestConnector {
345+
/// Create a new [`SpawnedReqwestConnector`] with the provided [`Handle`] to
346+
/// a tokio [`Runtime`]
347+
///
348+
/// [`Runtime`]: tokio::runtime::Runtime
349+
pub fn new(runtime: Handle) -> Self {
350+
Self { runtime }
351+
}
352+
}
353+
354+
#[cfg(not(target_arch = "wasm32"))]
355+
impl HttpConnector for SpawnedReqwestConnector {
356+
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
357+
let spawn_service = super::SpawnService::new(options.client()?, self.runtime.clone());
358+
Ok(HttpClient::new(spawn_service))
359+
}
360+
}
361+
301362
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
302363
pub(crate) fn http_connector(
303364
custom: Option<Arc<dyn HttpConnector>>,

src/client/http/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! HTTP client abstraction
19+
20+
mod body;
21+
pub use body::*;
22+
23+
mod connection;
24+
pub use connection::*;
25+
26+
mod spawn;
27+
pub use spawn::*;

0 commit comments

Comments
 (0)