Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "442348b2518c0bf187fb1ad011ba370c38b96cc4" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/common/test-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ workspace = true

[dependencies]
client = { workspace = true, features = ["testing"] }
common-grpc.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
once_cell.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions src/common/test-util/src/flight.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::DfRecordBatch;

/// Encodes record batch to a Schema message and a RecordBatch message.
pub fn encode_to_flight_data(rb: DfRecordBatch) -> (FlightData, FlightData) {
let mut encoder = FlightEncoder::default();
(
encoder.encode(FlightMessage::Schema(rb.schema())),
encoder.encode(FlightMessage::RecordBatch(rb)),
)
}
1 change: 1 addition & 0 deletions src/common/test-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;

pub mod flight;
pub mod ports;
pub mod recordbatch;
pub mod temp_dir;
Expand Down
18 changes: 9 additions & 9 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl KafkaLogStore {
}

fn build_entry(
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
Expand All @@ -109,10 +109,10 @@ fn build_entry(
provider: provider.clone(),
region_id,
entry_id,
data: std::mem::take(data),
data,
})
} else {
let parts = std::mem::take(data)
let parts = data
.chunks(max_data_size)
.map(|s| s.into())
.collect::<Vec<_>>();
Expand Down Expand Up @@ -140,7 +140,7 @@ impl LogStore for KafkaLogStore {
/// Creates an [Entry].
fn entry(
&self,
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
Expand Down Expand Up @@ -479,7 +479,7 @@ mod tests {
fn test_build_naive_entry() {
let provider = Provider::kafka_provider("my_topic".to_string());
let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 120);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 120);

assert_eq!(
entry.into_naive_entry().unwrap(),
Expand All @@ -496,7 +496,7 @@ mod tests {
fn test_build_into_multiple_part_entry() {
let provider = Provider::kafka_provider("my_topic".to_string());
let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 50);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 50);

assert_eq!(
entry.into_multiple_part_entry().unwrap(),
Expand All @@ -510,7 +510,7 @@ mod tests {
);

let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 21);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 21);

assert_eq!(
entry.into_multiple_part_entry().unwrap(),
Expand Down Expand Up @@ -545,9 +545,9 @@ mod tests {
) -> Vec<Entry> {
(0..num_entries)
.map(|_| {
let mut data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
let data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
// Always set `entry_id` to 0, the real entry_id will be set during the read.
logstore.entry(&mut data, 0, region_id, provider).unwrap()
logstore.entry(data, 0, region_id, provider).unwrap()
})
.collect()
}
Expand Down
4 changes: 2 additions & 2 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl LogStore for RaftEngineLogStore {

fn entry(
&self,
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
Expand All @@ -455,7 +455,7 @@ impl LogStore for RaftEngineLogStore {
provider: provider.clone(),
region_id,
entry_id,
data: std::mem::take(data),
data,
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ common-config.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
Expand Down
5 changes: 2 additions & 3 deletions src/mito2/benches/bench_filter_time_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ fn random_array(num: usize) -> BulkPart {
.unwrap();
BulkPart {
batch,
num_rows: num,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: 0,
raw_data: None,
}
}

Expand All @@ -76,7 +76,6 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
return None;
}

let num_rows_filtered = ts_filtered.len();
let i64array = ts_filtered
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
Expand All @@ -87,11 +86,11 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
let batch = arrow::compute::filter_record_batch(&part.batch, &predicate).unwrap();
Some(BulkPart {
batch,
num_rows: num_rows_filtered,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: part.timestamp_index,
raw_data: None,
})
}

Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to decode bulk wal entry"))]
ConvertBulkWalEntry {
#[snafu(implicit)]
location: Location,
source: common_grpc::Error,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -1192,6 +1199,7 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),

ScanMultiTimes { .. } => StatusCode::InvalidArguments,
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
}
}

Expand Down
67 changes: 64 additions & 3 deletions src/mito2/src/memtable/bulk/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{Mutation, OpType};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
Expand Down Expand Up @@ -59,11 +61,66 @@ use crate::sst::to_sst_arrow_schema;
#[derive(Clone)]
pub struct BulkPart {
pub batch: RecordBatch,
pub num_rows: usize,
pub max_ts: i64,
pub min_ts: i64,
pub sequence: u64,
pub timestamp_index: usize,
pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
type Error = error::Error;

fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
match value.body.expect("Entry payload should be present") {
Body::ArrowIpc(ipc) => {
let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
.context(error::ConvertBulkWalEntrySnafu)?;
let batch = decoder
.try_decode_record_batch(&ipc.data_header, &ipc.payload)
.context(error::ConvertBulkWalEntrySnafu)?;
Ok(Self {
batch,
max_ts: value.max_ts,
min_ts: value.min_ts,
sequence: value.sequence,
timestamp_index: value.timestamp_index as usize,
raw_data: Some(ipc),
})
}
}
}
}

impl From<&BulkPart> for BulkWalEntry {
fn from(value: &BulkPart) -> Self {
if let Some(ipc) = &value.raw_data {
BulkWalEntry {
sequence: value.sequence,
max_ts: value.max_ts,
min_ts: value.min_ts,
timestamp_index: value.timestamp_index as u32,
body: Some(Body::ArrowIpc(ipc.clone())),
}
} else {
let mut encoder = FlightEncoder::default();
let schema_bytes = encoder
.encode(FlightMessage::Schema(value.batch.schema()))
.data_header;
let rb_data = encoder.encode(FlightMessage::RecordBatch(value.batch.clone()));
BulkWalEntry {
sequence: value.sequence,
max_ts: value.max_ts,
min_ts: value.min_ts,
timestamp_index: value.timestamp_index as u32,
body: Some(Body::ArrowIpc(ArrowIpc {
schema: schema_bytes,
data_header: rb_data.data_header,
payload: rb_data.data_body,
})),
}
}
}
}

impl BulkPart {
Expand All @@ -84,7 +141,7 @@ impl BulkPart {
.collect::<datatypes::error::Result<Vec<_>>>()
.context(error::ComputeVectorSnafu)?;

let rows = (0..self.num_rows)
let rows = (0..self.num_rows())
.map(|row_idx| {
let values = (0..self.batch.num_columns())
.map(|col_idx| {
Expand Down Expand Up @@ -130,6 +187,10 @@ impl BulkPart {
pub fn timestamps(&self) -> &ArrayRef {
self.batch.column(self.timestamp_index)
}

pub fn num_rows(&self) -> usize {
self.batch.num_rows()
}
}

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/simple_bulk_memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl Memtable for SimpleBulkMemtable {
value_bytes: part.estimated_size(),
min_ts: part.min_ts,
max_ts: part.max_ts,
num_rows: part.num_rows,
num_rows: part.num_rows(),
max_sequence: sequence,
});
Ok(())
Expand Down Expand Up @@ -563,8 +563,8 @@ mod tests {
sequence: 1,
min_ts: 1,
max_ts: 2,
num_rows: 2,
timestamp_index: 0,
raw_data: None,
};
memtable.write_bulk(part).unwrap();

Expand Down
Loading
Loading