Skip to content

Commit 26848f9

Browse files
feat: Replace SelectResult with FlightData (#776)
* feat: replace SelectResult with FlightData * Update tests/runner/src/env.rs Co-authored-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
1 parent 9099058 commit 26848f9

21 files changed

Lines changed: 161 additions & 750 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/api/build.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ fn main() {
2020
.file_descriptor_set_path(default_out_dir.join("greptime_fd.bin"))
2121
.compile(
2222
&[
23-
"greptime/v1/select.proto",
2423
"greptime/v1/greptime.proto",
2524
"greptime/v1/meta/common.proto",
2625
"greptime/v1/meta/heartbeat.proto",

src/api/greptime/v1/database.proto

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,11 @@ message InsertExpr {
4848
message ObjectResult {
4949
ResultHeader header = 1;
5050
oneof result {
51-
SelectResult select = 2;
52-
MutateResult mutate = 3;
53-
FlightDataRaw flight_data = 4;
51+
MutateResult mutate = 2;
52+
FlightDataRaw flight_data = 3;
5453
}
5554
}
5655

57-
// TODO(LFC): replace with flight data
58-
message SelectResult {
59-
bytes raw_data = 1;
60-
}
61-
6256
message FlightDataRaw {
6357
repeated bytes raw_data = 1;
6458
}

src/api/greptime/v1/select.proto

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/api/src/result.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414

1515
use common_error::prelude::ErrorExt;
1616

17-
use crate::v1::codec::SelectResult;
1817
use crate::v1::{
1918
admin_result, object_result, AdminResult, MutateResult, ObjectResult, ResultHeader,
20-
SelectResult as SelectResultRaw,
2119
};
2220

2321
pub const PROTOCOL_VERSION: u32 = 1;
@@ -36,7 +34,6 @@ pub struct ObjectResultBuilder {
3634

3735
pub enum Body {
3836
Mutate((Success, Failure)),
39-
Select(SelectResult),
4037
FlightDataRaw(FlightDataRaw),
4138
}
4239

@@ -69,11 +66,6 @@ impl ObjectResultBuilder {
6966
self
7067
}
7168

72-
pub fn select_result(mut self, select_result: SelectResult) -> Self {
73-
self.result = Some(Body::Select(select_result));
74-
self
75-
}
76-
7769
pub fn flight_data(mut self, flight_data: FlightDataRaw) -> Self {
7870
self.result = Some(Body::FlightDataRaw(flight_data));
7971
self
@@ -93,9 +85,6 @@ impl ObjectResultBuilder {
9385
failure,
9486
}))
9587
}
96-
Some(Body::Select(select)) => Some(object_result::Result::Select(SelectResultRaw {
97-
raw_data: select.into(),
98-
})),
9988
Some(Body::FlightDataRaw(raw_data)) => Some(object_result::Result::FlightData(
10089
crate::v1::FlightDataRaw { raw_data },
10190
)),

src/api/src/serde.rs

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
pub use prost::DecodeError;
1616
use prost::Message;
1717

18-
use crate::v1::codec::SelectResult;
1918
use crate::v1::meta::TableRouteValue;
2019

2120
macro_rules! impl_convert_with_bytes {
@@ -36,80 +35,4 @@ macro_rules! impl_convert_with_bytes {
3635
};
3736
}
3837

39-
impl_convert_with_bytes!(SelectResult);
4038
impl_convert_with_bytes!(TableRouteValue);
41-
42-
#[cfg(test)]
43-
mod tests {
44-
use std::ops::Deref;
45-
46-
use crate::v1::codec::*;
47-
use crate::v1::{column, Column};
48-
49-
const SEMANTIC_TAG: i32 = 0;
50-
51-
#[test]
52-
fn test_convert_select_result() {
53-
let select_result = mock_select_result();
54-
55-
let bytes: Vec<u8> = select_result.into();
56-
let result: SelectResult = bytes.deref().try_into().unwrap();
57-
58-
assert_eq!(8, result.row_count);
59-
assert_eq!(1, result.columns.len());
60-
61-
let column = &result.columns[0];
62-
assert_eq!("foo", column.column_name);
63-
assert_eq!(SEMANTIC_TAG, column.semantic_type);
64-
assert_eq!(vec![1], column.null_mask);
65-
assert_eq!(
66-
vec![2, 3, 4, 5, 6, 7, 8],
67-
column.values.as_ref().unwrap().i32_values
68-
);
69-
}
70-
71-
#[should_panic]
72-
#[test]
73-
fn test_convert_select_result_wrong() {
74-
let select_result = mock_select_result();
75-
76-
let mut bytes: Vec<u8> = select_result.into();
77-
78-
// modify some bytes
79-
bytes[0] = 0b1;
80-
bytes[1] = 0b1;
81-
82-
let result: SelectResult = bytes.deref().try_into().unwrap();
83-
84-
assert_eq!(8, result.row_count);
85-
assert_eq!(1, result.columns.len());
86-
87-
let column = &result.columns[0];
88-
assert_eq!("foo", column.column_name);
89-
assert_eq!(SEMANTIC_TAG, column.semantic_type);
90-
assert_eq!(vec![1], column.null_mask);
91-
assert_eq!(
92-
vec![2, 3, 4, 5, 6, 7, 8],
93-
column.values.as_ref().unwrap().i32_values
94-
);
95-
}
96-
97-
fn mock_select_result() -> SelectResult {
98-
let values = column::Values {
99-
i32_values: vec![2, 3, 4, 5, 6, 7, 8],
100-
..Default::default()
101-
};
102-
let null_mask = vec![1];
103-
let column = Column {
104-
column_name: "foo".to_string(),
105-
semantic_type: SEMANTIC_TAG,
106-
values: Some(values),
107-
null_mask,
108-
..Default::default()
109-
};
110-
SelectResult {
111-
columns: vec![column],
112-
row_count: 8,
113-
}
114-
}
115-
}

src/api/src/v1.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,5 @@ tonic::include_proto!("greptime.v1");
1717

1818
pub const GREPTIME_FD_SET: &[u8] = tonic::include_file_descriptor_set!("greptime_fd");
1919

20-
pub mod codec {
21-
tonic::include_proto!("greptime.v1.codec");
22-
}
23-
2420
mod column_def;
2521
pub mod meta;

src/client/src/database.rs

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
17-
use api::v1::codec::SelectResult as GrpcSelectResult;
18-
use api::v1::column::SemanticType;
1915
use api::v1::{
2016
object_expr, object_result, query_request, DatabaseRequest, ExprHeader, InsertExpr,
2117
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest,
2218
};
2319
use common_error::status_code::StatusCode;
2420
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
25-
use common_grpc_expr::column_to_vector;
2621
use common_query::Output;
27-
use common_recordbatch::{RecordBatch, RecordBatches};
28-
use datatypes::prelude::*;
29-
use datatypes::schema::{ColumnSchema, Schema};
3022
use snafu::{ensure, OptionExt, ResultExt};
3123

32-
use crate::error::{ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu};
24+
use crate::error::DatanodeSnafu;
3325
use crate::{error, Client, Result};
3426

3527
pub const PROTOCOL_VERSION: u32 = 1;
@@ -141,7 +133,6 @@ impl Database {
141133

142134
#[derive(Debug)]
143135
pub enum ObjectResult {
144-
Select(GrpcSelectResult),
145136
FlightData(Vec<FlightMessage>),
146137
Mutate(GrpcMutateResult),
147138
}
@@ -165,10 +156,6 @@ impl TryFrom<api::v1::ObjectResult> for ObjectResult {
165156
actual: 0_usize,
166157
})?;
167158
Ok(match obj_result {
168-
object_result::Result::Select(select) => {
169-
let result = (*select.raw_data).try_into().context(DecodeSelectSnafu)?;
170-
ObjectResult::Select(result)
171-
}
172159
object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate),
173160
object_result::Result::FlightData(flight_data) => {
174161
let flight_messages = raw_flight_data_to_message(flight_data.raw_data)
@@ -188,41 +175,6 @@ impl TryFrom<ObjectResult> for Output {
188175

189176
fn try_from(value: ObjectResult) -> Result<Self> {
190177
let output = match value {
191-
ObjectResult::Select(select) => {
192-
let vectors = select
193-
.columns
194-
.iter()
195-
.map(|column| {
196-
column_to_vector(column, select.row_count).context(ColumnToVectorSnafu)
197-
})
198-
.collect::<Result<Vec<VectorRef>>>()?;
199-
200-
let column_schemas = select
201-
.columns
202-
.iter()
203-
.zip(vectors.iter())
204-
.map(|(column, vector)| {
205-
let datatype = vector.data_type();
206-
// nullable or not, does not affect the output
207-
let mut column_schema =
208-
ColumnSchema::new(&column.column_name, datatype, true);
209-
if column.semantic_type == SemanticType::Timestamp as i32 {
210-
column_schema = column_schema.with_time_index(true);
211-
}
212-
column_schema
213-
})
214-
.collect::<Vec<ColumnSchema>>();
215-
216-
let schema = Arc::new(Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?);
217-
let recordbatches = if vectors.is_empty() {
218-
RecordBatches::try_new(schema, vec![])
219-
} else {
220-
RecordBatch::new(schema, vectors)
221-
.and_then(|batch| RecordBatches::try_new(batch.schema.clone(), vec![batch]))
222-
}
223-
.context(error::CreateRecordBatchesSnafu)?;
224-
Output::RecordBatches(recordbatches)
225-
}
226178
ObjectResult::Mutate(mutate) => {
227179
if mutate.failure != 0 {
228180
return error::MutateFailureSnafu {
@@ -240,17 +192,19 @@ impl TryFrom<ObjectResult> for Output {
240192

241193
#[cfg(test)]
242194
mod tests {
195+
use std::sync::Arc;
196+
243197
use api::helper::ColumnDataTypeWrapper;
244198
use api::v1::Column;
245199
use common_grpc::select::{null_mask, values};
200+
use common_grpc_expr::column_to_vector;
201+
use datatypes::prelude::{Vector, VectorRef};
246202
use datatypes::vectors::{
247203
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
248204
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector,
249205
UInt32Vector, UInt64Vector, UInt8Vector,
250206
};
251207

252-
use super::*;
253-
254208
#[test]
255209
fn test_column_to_vector() {
256210
let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true])));

0 commit comments

Comments
 (0)