Skip to content

Commit 3329da5

Browse files
authored
feat(puffin): add partial reader (#2741)
* feat(puffin): add partial reader Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comment Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
1 parent a24f8c9 commit 3329da5

8 files changed

Lines changed: 583 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi
9898
] }
9999
parquet = "47.0"
100100
paste = "1.0"
101+
pin-project = "1.0"
101102
prometheus = { version = "0.13.3", features = ["process"] }
102103
prost = "0.12"
103104
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }

src/puffin/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,10 @@ license.workspace = true
66

77
[dependencies]
88
derive_builder.workspace = true
9+
futures.workspace = true
10+
pin-project.workspace = true
911
serde.workspace = true
1012
serde_json.workspace = true
13+
14+
[dev-dependencies]
15+
tokio.workspace = true

src/puffin/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515
pub mod blob_metadata;
1616
pub mod file_metadata;
17+
pub mod partial_reader;

src/puffin/src/partial_reader.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod r#async;
16+
mod position;
17+
mod sync;
18+
19+
use pin_project::pin_project;
20+
21+
/// `PartialReader` to perform synchronous or asynchronous reads on a portion of a resource.
22+
#[pin_project]
23+
pub struct PartialReader<R> {
24+
/// offset of the portion in the resource
25+
offset: u64,
26+
27+
/// size of the portion in the resource
28+
size: u64,
29+
30+
/// Resource for the portion.
31+
/// The `offset` and `size` fields are used to determine the slice of `source` to read.
32+
#[pin]
33+
source: R,
34+
35+
/// The current position within the portion.
36+
///
37+
/// A `None` value indicates that no read operations have been performed yet on this portion.
38+
/// Before a read operation can be performed, the resource must be positioned at the correct offset in the portion.
39+
/// After the first read operation, this field will be set to `Some(_)`, representing the current read position in the portion.
40+
position_in_portion: Option<u64>,
41+
}
42+
43+
impl<R> PartialReader<R> {
44+
/// Creates a new `PartialReader` for the given resource.
45+
pub fn new(source: R, offset: u64, size: u64) -> Self {
46+
Self {
47+
offset,
48+
size,
49+
source,
50+
position_in_portion: None,
51+
}
52+
}
53+
54+
/// Returns the current position in the portion.
55+
pub fn position(&self) -> u64 {
56+
self.position_in_portion.unwrap_or_default()
57+
}
58+
59+
/// Returns the size of the portion in portion.
60+
pub fn size(&self) -> u64 {
61+
self.size
62+
}
63+
64+
/// Returns whether the portion is empty.
65+
pub fn is_empty(&self) -> bool {
66+
self.size == 0
67+
}
68+
69+
/// Returns whether the current position is at the end of the portion.
70+
pub fn is_eof(&self) -> bool {
71+
self.position() == self.size
72+
}
73+
}
74+
75+
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// A zero-sized portion is empty and is already at its end.
    #[test]
    fn is_empty_returns_true_for_zero_length_blob() {
        let source = Cursor::new((0..100).collect::<Vec<u8>>());
        let partial = PartialReader::new(source, 10, 0);

        assert!(partial.is_empty());
        assert!(partial.is_eof());
    }

    /// A portion with a non-zero size is not empty.
    #[test]
    fn is_empty_returns_false_for_non_zero_length_blob() {
        let source = Cursor::new((0..100).collect::<Vec<u8>>());
        let partial = PartialReader::new(source, 10, 30);

        assert!(!partial.is_empty());
    }
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::pin::Pin;
17+
use std::task::{Context, Poll};
18+
19+
use futures::{ready, AsyncRead, AsyncSeek};
20+
21+
use crate::partial_reader::position::position_after_seek;
22+
use crate::partial_reader::PartialReader;
23+
24+
impl<R: AsyncRead + AsyncSeek + Unpin> AsyncRead for PartialReader<R> {
25+
fn poll_read(
26+
mut self: Pin<&mut Self>,
27+
cx: &mut Context<'_>,
28+
buf: &mut [u8],
29+
) -> Poll<io::Result<usize>> {
30+
// past end of portion
31+
if self.position() > self.size() {
32+
return Poll::Ready(Err(io::Error::new(
33+
io::ErrorKind::InvalidInput,
34+
"invalid read past the end of the portion",
35+
)));
36+
}
37+
38+
// end of portion
39+
if self.is_eof() {
40+
return Poll::Ready(Ok(0));
41+
}
42+
43+
// first read, seek to the correct offset
44+
if self.position_in_portion.is_none() {
45+
// seek operation
46+
let seek_from = io::SeekFrom::Start(self.offset);
47+
ready!(self.as_mut().project().source.poll_seek(cx, seek_from))?;
48+
49+
self.position_in_portion = Some(0);
50+
}
51+
52+
// prevent reading over the end
53+
let max_len = (self.size() - self.position_in_portion.unwrap()) as usize;
54+
let actual_len = max_len.min(buf.len());
55+
56+
// create a limited reader
57+
let target_buf = &mut buf[..actual_len];
58+
59+
// read operation
60+
let read_bytes = ready!(self.as_mut().project().source.poll_read(cx, target_buf))?;
61+
self.position_in_portion = Some(self.position() + read_bytes as u64);
62+
63+
Poll::Ready(Ok(read_bytes))
64+
}
65+
}
66+
67+
impl<R: AsyncRead + AsyncSeek + Unpin> AsyncSeek for PartialReader<R> {
68+
fn poll_seek(
69+
mut self: Pin<&mut Self>,
70+
cx: &mut Context<'_>,
71+
pos: io::SeekFrom,
72+
) -> Poll<io::Result<u64>> {
73+
let new_position = position_after_seek(pos, self.position(), self.size())?;
74+
let pos = io::SeekFrom::Start(self.offset + new_position);
75+
ready!(self.as_mut().project().source.poll_seek(cx, pos))?;
76+
77+
self.position_in_portion = Some(new_position);
78+
Poll::Ready(Ok(new_position))
79+
}
80+
}
81+
82+
#[cfg(test)]
mod tests {
    use futures::io::Cursor;
    use futures::{AsyncReadExt as _, AsyncSeekExt as _};

    use super::*;

    /// The 100-byte fixture `0, 1, ..., 99` used as the underlying resource.
    fn sample_bytes() -> Vec<u8> {
        (0..100).collect()
    }

    /// A reader over the fixture, restricted to `size` bytes starting at `offset`.
    fn partial(offset: u64, size: u64) -> PartialReader<Cursor<Vec<u8>>> {
        PartialReader::new(Cursor::new(sample_bytes()), offset, size)
    }

    #[tokio::test]
    async fn read_all_data_in_portion() {
        let expected = sample_bytes();
        let mut reader = PartialReader::new(Cursor::new(expected.clone()), 0, 100);

        let mut out = vec![0; 100];
        let n = reader.read(&mut out).await.unwrap();

        assert_eq!(n, 100);
        assert_eq!(out, expected);
    }

    #[tokio::test]
    async fn read_part_of_data_in_portion() {
        let mut reader = partial(10, 30);

        let mut out = vec![0; 30];
        let n = reader.read(&mut out).await.unwrap();

        assert_eq!(n, 30);
        assert_eq!(out, (10..40).collect::<Vec<u8>>());
    }

    #[tokio::test]
    async fn seek_and_read_data_in_portion() {
        let mut reader = partial(10, 30);

        let landed = reader.seek(io::SeekFrom::Start(10)).await.unwrap();
        assert_eq!(landed, 10);

        let mut out = vec![0; 10];
        let n = reader.read(&mut out).await.unwrap();
        assert_eq!(n, 10);
        assert_eq!(out, (20..30).collect::<Vec<u8>>());
    }

    #[tokio::test]
    async fn read_past_end_of_portion_is_eof() {
        let mut reader = partial(10, 30);
        let mut out = vec![0; 50];

        // the buffer is larger than the portion: the first read is clamped
        assert_eq!(reader.read(&mut out).await.unwrap(), 30);
        // the next read reports EOF
        assert_eq!(reader.read(&mut out).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn seek_past_end_of_portion_returns_error() {
        let mut reader = partial(10, 30);

        // one byte past the end of the portion is rejected
        let outcome = reader.seek(io::SeekFrom::Start(31)).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn seek_to_negative_position_returns_error() {
        let mut reader = partial(10, 30);

        assert_eq!(reader.seek(io::SeekFrom::Start(10)).await.unwrap(), 10);

        // stepping back to the start of the portion is fine
        let back_to_start = reader.seek(io::SeekFrom::Current(-10)).await.unwrap();
        assert_eq!(back_to_start, 0);

        // stepping before the start of the portion is rejected
        let outcome = reader.seek(io::SeekFrom::Current(-1)).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn seek_from_end_of_portion() {
        let mut reader = partial(10, 30);
        let mut out = vec![0; 10];

        // land 10 bytes before the end of the portion
        let landed = reader.seek(io::SeekFrom::End(-10)).await.unwrap();
        assert_eq!(landed, 20);

        // read the last 10 bytes of the portion
        assert_eq!(reader.read(&mut out).await.unwrap(), 10);
        assert_eq!(out, (30..40).collect::<Vec<u8>>());
        assert!(reader.is_eof());
    }

    #[tokio::test]
    async fn seek_from_end_to_negative_position_returns_error() {
        let mut reader = partial(10, 30);

        // 31 bytes back from the end of a 30-byte portion is before its start
        let outcome = reader.seek(io::SeekFrom::End(-31)).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn zero_length_portion_returns_zero_on_read() {
        let mut reader = partial(10, 0);
        let mut out = vec![0; 10];

        // nothing can be read from a zero-sized portion
        let n = reader.read(&mut out).await.unwrap();
        assert_eq!(n, 0);
    }

    #[tokio::test]
    async fn is_eof_returns_true_at_end_of_portion() {
        let mut reader = partial(10, 30);

        // nothing consumed yet
        assert!(!reader.is_eof());

        let mut out = vec![0; 30];
        assert_eq!(reader.read(&mut out).await.unwrap(), 30);

        // the whole portion has been consumed
        assert!(reader.is_eof());
    }

    #[tokio::test]
    async fn position_resets_after_seek_to_start() {
        let mut reader = partial(10, 30);

        let forward = reader.seek(io::SeekFrom::Start(10)).await.unwrap();
        assert_eq!(forward, 10);
        assert_eq!(reader.position(), 10);

        let rewound = reader.seek(io::SeekFrom::Start(0)).await.unwrap();
        assert_eq!(rewound, 0);
        assert_eq!(reader.position(), 0);
    }
}

0 commit comments

Comments
 (0)