Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7836e03
feat: add json type and vector
CookiePieWw Aug 22, 2024
ed2f07d
fix: allow to create and insert json data
CookiePieWw Aug 23, 2024
f910b37
feat: udf to query json as string
CookiePieWw Aug 26, 2024
c334919
refactor: remove JsonbValue and JsonVector
CookiePieWw Aug 26, 2024
b2585ac
feat: show json value as strings
CookiePieWw Aug 27, 2024
45c853b
chore: make ci happy
CookiePieWw Aug 27, 2024
5e9e417
test: adunit test and sqlness test
CookiePieWw Aug 28, 2024
092ee0c
refactor: use binary as grpc value of json
CookiePieWw Aug 28, 2024
6d5241d
Merge branch 'main' into json
CookiePieWw Aug 28, 2024
cb88312
fix: use non-preserve-order jsonb
CookiePieWw Aug 29, 2024
7ceddd0
test: revert changed test
CookiePieWw Aug 29, 2024
26e8d1a
refactor: change udf get_by_path to jq
CookiePieWw Aug 29, 2024
d3752b3
chore: make ci happy
CookiePieWw Aug 29, 2024
601f647
fix: distinguish binary and json in proto
CookiePieWw Aug 30, 2024
13c370c
chore: delete udf for future pr
CookiePieWw Aug 30, 2024
619f490
refactor: remove Value(Json)
CookiePieWw Aug 30, 2024
9f07158
chore: follow review comments
CookiePieWw Aug 30, 2024
1c279e4
test: some tests and checks
CookiePieWw Aug 30, 2024
ae85538
test: fix unit tests
CookiePieWw Aug 31, 2024
411cf8f
chore: follow review comments
CookiePieWw Sep 2, 2024
3ca3d42
chore: corresponding changes to proto
CookiePieWw Sep 4, 2024
1f2d751
fix: change grpc and pgsql server behavior alongside with sqlness/cru…
CookiePieWw Sep 4, 2024
5645c63
chore: follow review comments
CookiePieWw Sep 5, 2024
d908a46
feat: udf of conversions between json and strings, used for grpc server
CookiePieWw Sep 5, 2024
34c15b0
refactor: rename to_string to json_to_string
CookiePieWw Sep 5, 2024
94e57be
chore: resolve merge conflict
CookiePieWw Sep 5, 2024
315b1e6
test: add more sqlness test for json
CookiePieWw Sep 5, 2024
ac3f9e1
chore: thanks for review :)
CookiePieWw Sep 5, 2024
65b71ce
chore: merge main into json
CookiePieWw Sep 9, 2024
9141eaa
Apply suggestions from code review
WenyXu Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
jsonb = { git = "https://github.com/CookiePieWw/jsonb.git", branch = "trigger-preserve-order-feature" }
Comment thread
WenyXu marked this conversation as resolved.
Outdated
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" }
mockall = "0.11.4"
Expand Down
21 changes: 16 additions & 5 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
Expand Down Expand Up @@ -443,7 +443,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
.push(convert_i128_to_interval(val.to_i128())),
},
Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)),
Value::List(_) | Value::Duration(_) => unreachable!(),
Value::List(_) | Value::Duration(_) | Value::Json(_) => unreachable!(),
Comment thread
WenyXu marked this conversation as resolved.
Outdated
});
column.null_mask = null_mask.into_vec();
}
Expand Down Expand Up @@ -649,7 +649,8 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
| ConcreteDataType::Duration(_)
| ConcreteDataType::Json(_) => {
Comment thread
WenyXu marked this conversation as resolved.
unreachable!()
}
}
Expand Down Expand Up @@ -813,7 +814,8 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
| ConcreteDataType::Duration(_)
| ConcreteDataType::Json(_) => {
Comment thread
WenyXu marked this conversation as resolved.
unreachable!()
}
}
Expand All @@ -831,7 +833,12 @@ pub fn is_column_type_value_eq(
expect_type: &ConcreteDataType,
) -> bool {
ColumnDataTypeWrapper::try_new(type_value, type_extension)
.map(|wrapper| ConcreteDataType::from(wrapper) == *expect_type)
.map(|wrapper| {
let datatype = ConcreteDataType::from(wrapper);
(datatype == *expect_type)
|| (datatype == ConcreteDataType::binary_datatype()
&& *expect_type == ConcreteDataType::json_datatype())
Comment thread
WenyXu marked this conversation as resolved.
})
.unwrap_or(false)
}

Expand Down Expand Up @@ -928,6 +935,9 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
Value::Decimal128(v) => v1::Value {
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
},
Value::Json(v) => v1::Value {
value_data: Some(ValueData::BinaryValue(v.to_vec())),
},
Value::List(_) | Value::Duration(_) => return None,
};

Expand Down Expand Up @@ -1023,6 +1033,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
}
}),
Value::Decimal128(v) => Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
Value::Json(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::List(_) | Value::Duration(_) => unreachable!(),
},
}
Expand Down
1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
jsonb.workspace = true
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
Expand Down Expand Up @@ -116,6 +117,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);

// JSON functions
JsonFunction::register(&function_registry);

Arc::new(function_registry)
});

Expand Down
2 changes: 2 additions & 0 deletions src/common/function/src/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
pub mod json;
pub mod matches;
pub mod math;
pub mod numpy;

#[cfg(test)]
pub(crate) mod test;
pub(crate) mod timestamp;
Expand Down
28 changes: 28 additions & 0 deletions src/common/function/src/scalars/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
mod jq;

use jq::JqFunction;

use crate::function_registry::FunctionRegistry;

pub(crate) struct JsonFunction;

impl JsonFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(JqFunction));
}
}
174 changes: 174 additions & 0 deletions src/common/function/src/scalars/json/jq.rs
Comment thread
WenyXu marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Display};

use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder};

use crate::function::{Function, FunctionContext};

#[derive(Clone, Debug, Default)]
Comment thread
WenyXu marked this conversation as resolved.
pub struct JqFunction;

const NAME: &str = "jq";

impl Function for JqFunction {
fn name(&self) -> &str {
"jq"
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}

fn signature(&self) -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::json_datatype(),
],
Volatility::Immutable,
)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
let jsons = &columns[1];
let paths = &columns[0];

let size = jsons.len();
let datatype = jsons.data_type();
let mut results = StringVectorBuilder::with_capacity(size);

match datatype {
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => {
for i in 0..size {
let json = jsons.get_ref(i);
Comment thread
WenyXu marked this conversation as resolved.
let json = json.as_binary().unwrap();
let path = paths.get_ref(i);
let path = path.as_string().unwrap();
let result = match (json, path) {
(Some(json), Some(path)) => {
let json_path = match jsonb::jsonpath::parse_json_path(path.as_bytes())
{
Ok(json_path) => json_path,
Err(_) => {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid JSON path: {}", path),
}
.fail();
}
};
let mut sub_jsonb = Vec::new();
let mut sub_offsets = Vec::new();
match jsonb::get_by_path(
json,
json_path,
&mut sub_jsonb,
&mut sub_offsets,
) {
Ok(_) => Some(jsonb::to_string(&sub_jsonb)),
Err(_) => None,
}
}
_ => None,
};

results.push(result.as_deref());
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
}

Ok(results.to_vector())
}
}

impl Display for JqFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JQ")
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_query::prelude::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{BinaryVector, StringVector};

use super::*;

#[test]
fn test_jq_function() {
let jq = JqFunction;

assert_eq!("jq", jq.name());
assert_eq!(
ConcreteDataType::string_datatype(),
jq.return_type(&[
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap()
);

assert!(matches!(jq.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()]
));

let json_strings = [
r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
];
let paths = vec!["a", "b", "c"];
let results = [r#"{"b":2}"#, r#"{"c":6}"#, r#"{"a":7}"#];

let jsonbs = json_strings
.iter()
.map(|s| {
let value = jsonb::parse_value(s.as_bytes()).unwrap();
value.to_vec()
})
.collect::<Vec<_>>();

let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = jq.eval(FunctionContext::default(), &args).unwrap();

assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_string().unwrap().unwrap();
assert_eq!(*gt, result);
}
}
}
2 changes: 1 addition & 1 deletion src/common/grpc/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ macro_rules! convert_arrow_array_to_grpc_vals {
return Ok(vals);
},
)+
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
}
}};
}
Expand Down
Loading