Skip to content

Commit d4565c0

Browse files
authored
feat(mito): Defines the read Batch struct for mito2 (#2174)
* feat: define batch * feat: define Batch struct * feat: stream writer takes arrow's types * feat: make it compile * feat: use uint64vector and uint8vector * feat: add timestamps and primary key
1 parent 2168970 commit d4565c0

4 files changed

Lines changed: 180 additions & 61 deletions

File tree

src/mito2/src/error.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,17 @@ pub enum Error {
294294
},
295295

296296
#[snafu(display(
297-
"Failed to deserialize field, source: {} location: {}",
297+
"Failed to deserialize field, source: {}, location: {}",
298298
source,
299299
location
300300
))]
301301
DeserializeField {
302302
source: memcomparable::Error,
303303
location: Location,
304304
},
305+
306+
#[snafu(display("Invalid batch, {}, location: {}", reason, location))]
307+
InvalidBatch { reason: String, location: Location },
305308
}
306309

307310
pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +354,7 @@ impl ErrorExt for Error {
351354
SerializeField { .. } => StatusCode::Internal,
352355
NotSupportedField { .. } => StatusCode::Unsupported,
353356
DeserializeField { .. } => StatusCode::Unexpected,
357+
InvalidBatch { .. } => StatusCode::InvalidArguments,
354358
}
355359
}
356360

src/mito2/src/read.rs

Lines changed: 152 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,68 +14,182 @@
1414

1515
//! Common structs and utilities for reading data.
1616
17+
use std::sync::Arc;
18+
1719
use async_trait::async_trait;
1820
use common_time::Timestamp;
19-
use datatypes::vectors::VectorRef;
21+
use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
22+
use snafu::ensure;
23+
use store_api::storage::ColumnId;
2024

21-
use crate::error::Result;
25+
use crate::error::{InvalidBatchSnafu, Result};
2226
use crate::metadata::RegionMetadataRef;
2327

24-
/// Storage internal representation of a batch of rows.
28+
/// Storage internal representation of a batch of rows
29+
/// for a primary key (time series).
2530
///
26-
/// Now the structure of [Batch] is still unstable, all pub fields may be changed.
27-
#[derive(Debug, Default, PartialEq, Eq, Clone)]
31+
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
32+
#[derive(Debug, PartialEq, Clone)]
2833
pub struct Batch {
29-
/// Rows organized in columnar format.
30-
pub columns: Vec<VectorRef>,
34+
/// Primary key encoded in a comparable form.
35+
primary_key: Vec<u8>,
36+
/// Timestamps of rows, should be sorted and not null.
37+
timestamps: VectorRef,
38+
/// Sequences of rows
39+
///
40+
/// UInt64 type, not null.
41+
sequences: Arc<UInt64Vector>,
42+
/// Op types of rows
43+
///
44+
/// UInt8 type, not null.
45+
op_types: Arc<UInt8Vector>,
46+
/// Fields organized in columnar format.
47+
fields: Vec<BatchColumn>,
3148
}
3249

3350
impl Batch {
34-
/// Create a new `Batch` from `columns`.
35-
///
36-
/// # Panics
37-
/// Panics if vectors in `columns` have different length.
38-
pub fn new(columns: Vec<VectorRef>) -> Batch {
39-
Self::assert_columns(&columns);
51+
/// Creates a new batch.
52+
pub fn new(
53+
primary_key: Vec<u8>,
54+
timestamps: VectorRef,
55+
sequences: Arc<UInt64Vector>,
56+
op_types: Arc<UInt8Vector>,
57+
fields: Vec<BatchColumn>,
58+
) -> Result<Batch> {
59+
BatchBuilder::new(primary_key, timestamps, sequences, op_types)
60+
.with_fields(fields)
61+
.build()
62+
}
63+
64+
/// Returns primary key of the batch.
65+
pub fn primary_key(&self) -> &[u8] {
66+
&self.primary_key
67+
}
68+
69+
/// Returns fields in the batch.
70+
pub fn fields(&self) -> &[BatchColumn] {
71+
&self.fields
72+
}
4073

41-
Batch { columns }
74+
/// Returns timestamps of the batch.
75+
pub fn timestamps(&self) -> &VectorRef {
76+
&self.timestamps
4277
}
4378

44-
/// Returns number of columns in the batch.
45-
pub fn num_columns(&self) -> usize {
46-
self.columns.len()
79+
/// Returns sequences of the batch.
80+
pub fn sequences(&self) -> &Arc<UInt64Vector> {
81+
&self.sequences
4782
}
4883

49-
/// Returns number of rows in the batch.
84+
/// Returns op types of the batch.
85+
pub fn op_types(&self) -> &Arc<UInt8Vector> {
86+
&self.op_types
87+
}
88+
89+
/// Returns the number of rows in the batch.
5090
pub fn num_rows(&self) -> usize {
51-
self.columns.get(0).map(|v| v.len()).unwrap_or(0)
91+
// All vectors have the same length so we use
92+
// the length of timestamps vector.
93+
self.timestamps.len()
5294
}
5395

5496
/// Returns true if the number of rows in the batch is 0.
5597
pub fn is_empty(&self) -> bool {
5698
self.num_rows() == 0
5799
}
100+
}
101+
102+
/// A column in a [Batch].
103+
#[derive(Debug, PartialEq, Eq, Clone)]
104+
pub struct BatchColumn {
105+
/// Id of the column.
106+
pub column_id: ColumnId,
107+
/// Data of the column.
108+
pub data: VectorRef,
109+
}
110+
111+
/// Builder to build [Batch].
112+
pub struct BatchBuilder {
113+
primary_key: Vec<u8>,
114+
timestamps: VectorRef,
115+
sequences: Arc<UInt64Vector>,
116+
op_types: Arc<UInt8Vector>,
117+
fields: Vec<BatchColumn>,
118+
}
119+
120+
impl BatchBuilder {
121+
/// Creates a new [BatchBuilder].
122+
pub fn new(
123+
primary_key: Vec<u8>,
124+
timestamps: VectorRef,
125+
sequences: Arc<UInt64Vector>,
126+
op_types: Arc<UInt8Vector>,
127+
) -> BatchBuilder {
128+
BatchBuilder {
129+
primary_key,
130+
timestamps,
131+
sequences,
132+
op_types,
133+
fields: Vec::new(),
134+
}
135+
}
58136

59-
/// Slice the batch, returning a new batch.
60-
///
61-
/// # Panics
62-
/// Panics if `offset + length > self.num_rows()`.
63-
pub fn slice(&self, offset: usize, length: usize) -> Batch {
64-
let columns = self
65-
.columns
66-
.iter()
67-
.map(|v| v.slice(offset, length))
68-
.collect();
69-
Batch { columns }
70-
}
71-
72-
fn assert_columns(columns: &[VectorRef]) {
73-
if columns.is_empty() {
74-
return;
137+
/// Set all field columns.
138+
pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
139+
self.fields = fields;
140+
self
141+
}
142+
143+
/// Push a field column.
144+
pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
145+
self.fields.push(column);
146+
self
147+
}
148+
149+
/// Builds the [Batch].
150+
pub fn build(self) -> Result<Batch> {
151+
let ts_len = self.timestamps.len();
152+
ensure!(
153+
self.sequences.len() == ts_len,
154+
InvalidBatchSnafu {
155+
reason: format!(
156+
"sequence have different len {} != {}",
157+
self.sequences.len(),
158+
ts_len
159+
),
160+
}
161+
);
162+
ensure!(
163+
self.op_types.len() == ts_len,
164+
InvalidBatchSnafu {
165+
reason: format!(
166+
"op type have different len {} != {}",
167+
self.op_types.len(),
168+
ts_len
169+
),
170+
}
171+
);
172+
for column in &self.fields {
173+
ensure!(
174+
column.data.len() == ts_len,
175+
InvalidBatchSnafu {
176+
reason: format!(
177+
"column {} has different len {} != {}",
178+
column.column_id,
179+
column.data.len(),
180+
ts_len
181+
),
182+
}
183+
);
75184
}
76185

77-
let length = columns[0].len();
78-
assert!(columns.iter().all(|col| col.len() == length));
186+
Ok(Batch {
187+
primary_key: self.primary_key,
188+
timestamps: self.timestamps,
189+
sequences: self.sequences,
190+
op_types: self.op_types,
191+
fields: self.fields,
192+
})
79193
}
80194
}
81195

@@ -110,6 +224,7 @@ impl Source {
110224
unimplemented!()
111225
}
112226

227+
// TODO(yingwen): Maybe remove this method.
113228
/// Returns statisics of fetched batches.
114229
pub(crate) fn stats(&self) -> SourceStats {
115230
unimplemented!()

src/mito2/src/sst/parquet/writer.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
//! Parquet writer.
1616
1717
use common_telemetry::debug;
18+
use datatypes::arrow::record_batch::RecordBatch;
1819
use object_store::ObjectStore;
1920
use parquet::basic::{Compression, Encoding, ZstdLevel};
2021
use parquet::file::metadata::KeyValue;
2122
use parquet::file::properties::WriterProperties;
23+
use snafu::ResultExt;
2224

23-
use crate::error::Result;
25+
use crate::error::{NewRecordBatchSnafu, Result};
2426
use crate::read::Source;
2527
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
2628
use crate::sst::stream_writer::BufferedWriter;
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {
6466

6567
let writer_props = props_builder.build();
6668

69+
let arrow_schema = metadata.schema.arrow_schema();
6770
let mut buffered_writer = BufferedWriter::try_new(
6871
self.file_path.to_string(),
6972
self.object_store.clone(),
70-
&metadata.schema,
73+
arrow_schema.clone(),
7174
Some(writer_props),
7275
opts.write_buffer_size.as_bytes() as usize,
7376
)
7477
.await?;
7578

7679
while let Some(batch) = self.source.next_batch().await? {
77-
buffered_writer.write(&batch).await?;
80+
let arrow_batch = RecordBatch::try_new(
81+
arrow_schema.clone(),
82+
batch
83+
.fields()
84+
.iter()
85+
.map(|v| v.data.to_arrow_array())
86+
.collect::<Vec<_>>(),
87+
)
88+
.context(NewRecordBatchSnafu)?;
89+
90+
buffered_writer.write(&arrow_batch).await?;
7891
}
7992
// Get stats from the source.
8093
let stats = self.source.stats();

src/mito2/src/sst/stream_writer.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,22 @@ use std::pin::Pin;
1717

1818
use common_datasource::buffered_writer::LazyBufferedWriter;
1919
use common_datasource::share_buffer::SharedBuffer;
20-
use datatypes::arrow;
20+
use datatypes::arrow::datatypes::SchemaRef;
2121
use datatypes::arrow::record_batch::RecordBatch;
22-
use datatypes::schema::SchemaRef;
2322
use object_store::ObjectStore;
2423
use parquet::arrow::ArrowWriter;
2524
use parquet::file::properties::WriterProperties;
2625
use parquet::format::FileMetaData;
2726
use snafu::ResultExt;
2827

2928
use crate::error;
30-
use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
31-
use crate::read::Batch;
29+
use crate::error::WriteParquetSnafu;
3230

3331
/// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
3432
/// storage by chunks to reduce memory consumption.
3533
pub struct BufferedWriter {
3634
inner: InnerBufferedWriter,
37-
arrow_schema: arrow::datatypes::SchemaRef,
35+
arrow_schema: SchemaRef,
3836
}
3937

4038
type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
5654
pub async fn try_new(
5755
path: String,
5856
store: ObjectStore,
59-
schema: &SchemaRef,
57+
arrow_schema: SchemaRef,
6058
props: Option<WriterProperties>,
6159
buffer_threshold: usize,
6260
) -> error::Result<Self> {
63-
let arrow_schema = schema.arrow_schema();
6461
let buffer = SharedBuffer::with_capacity(buffer_threshold);
6562

6663
let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@ impl BufferedWriter {
8279
})
8380
}),
8481
),
85-
arrow_schema: arrow_schema.clone(),
82+
arrow_schema,
8683
})
8784
}
8885

8986
/// Write a record batch to stream writer.
90-
pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
91-
let arrow_batch = RecordBatch::try_new(
92-
self.arrow_schema.clone(),
93-
batch
94-
.columns
95-
.iter()
96-
.map(|v| v.to_arrow_array())
97-
.collect::<Vec<_>>(),
98-
)
99-
.context(NewRecordBatchSnafu)?;
100-
87+
pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
10188
self.inner
102-
.write(&arrow_batch)
89+
.write(arrow_batch)
10390
.await
10491
.context(error::WriteBufferSnafu)?;
10592
self.inner

0 commit comments

Comments
 (0)