Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license.workspace = true

[dependencies]
chrono = "0.4"
common-error = { path = "../error" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
Expand Down
24 changes: 23 additions & 1 deletion src/common/time/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use chrono::ParseError;
use snafu::{Backtrace, Snafu};
use common_error::ext::ErrorExt;
use common_error::prelude::StatusCode;
use snafu::{Backtrace, ErrorCompat, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
Expand All @@ -24,6 +28,24 @@ pub enum Error {
ParseTimestamp { raw: String, backtrace: Backtrace },
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. } | Error::ParseTimestamp { .. } => {
StatusCode::InvalidArguments
}
}
}

fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}

fn as_any(&self) -> &dyn Any {
self
}
}

pub type Result<T> = std::result::Result<T, Error>;

#[cfg(test)]
Expand Down
164 changes: 140 additions & 24 deletions src/common/time/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::Timestamp;

/// A half-open time range.
///
/// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is
/// empty if `start >= end`.
/// The range contains values that `value >= start` and `val < end`.
///
/// The range is empty iff `start == end == "the default value of T"`
Comment thread
killme2008 marked this conversation as resolved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GenericRange<T> {
start: Option<T>,
Expand All @@ -28,8 +29,9 @@ pub struct GenericRange<T> {

impl<T> GenericRange<T>
where
T: Copy + PartialOrd,
T: Copy + PartialOrd + Default,
{
/// Computes the AND'ed range with other.
pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
let start = match (self.start(), other.start()) {
(Some(l), Some(r)) => {
Expand Down Expand Up @@ -57,7 +59,7 @@ where
(None, None) => None,
};

Self { start, end }
Self::from_optional(start, end)
}

/// Compute the OR'ed range of two ranges.
Expand Down Expand Up @@ -98,12 +100,44 @@ where
(None, None) => None,
};

Self { start, end }
Self::from_optional(start, end)
}

/// Checks if current range intersect with target.
pub fn intersects(&self, target: &GenericRange<T>) -> bool {
!self.and(target).is_empty()
}

/// Create an empty range.
pub fn empty() -> GenericRange<T> {
GenericRange {
start: Some(T::default()),
end: Some(T::default()),
}
}

/// Create GenericRange from optional start and end.
/// If the present value of start >= the present value of end, it will return an empty range
/// with the default value of `T`.
fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
match (start, end) {
(Some(start_val), Some(end_val)) => {
if start_val < end_val {
Self {
start: Some(start_val),
end: Some(end_val),
}
} else {
Self::empty()
}
}
(s, e) => Self { start: s, end: e },
}
}
}

impl<T> GenericRange<T> {
/// Creates a new range that contains timestamp in `[start, end)`.
/// Creates a new range that contains values in `[start, end)`.
///
/// Returns `None` if `start` > `end`.
pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
Expand All @@ -115,14 +149,7 @@ impl<T> GenericRange<T> {
}
}

/// Given a value, creates an empty time range that `start == end == value`.
pub fn empty_with_value<U: Clone + Into<T>>(value: U) -> GenericRange<T> {
GenericRange {
start: Some(value.clone().into()),
end: Some(value.into()),
}
}

/// Return a range containing all possible values.
pub fn min_to_max() -> GenericRange<T> {
Self {
start: None,
Expand All @@ -143,11 +170,11 @@ impl<T> GenericRange<T> {
}

/// Returns true if `timestamp` is contained in the range.
pub fn contains<U: PartialOrd<T>>(&self, timestamp: &U) -> bool {
pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
match (&self.start, &self.end) {
(Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end,
(Some(start), None) => *timestamp >= *start,
(None, Some(end)) => *timestamp < *end,
(Some(start), Some(end)) => *target >= *start && *target < *end,
(Some(start), None) => *target >= *start,
(None, Some(end)) => *target < *end,
(None, None) => true,
}
}
Expand All @@ -158,19 +185,70 @@ impl<T: PartialOrd> GenericRange<T> {
#[inline]
pub fn is_empty(&self) -> bool {
match (&self.start, &self.end) {
(Some(start), Some(end)) => start >= end,
(Some(start), Some(end)) => start == end,
_ => false,
}
}
}

pub type TimestampRange = GenericRange<Timestamp>;

impl TimestampRange {
pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
let end = if let Some(end) = end {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
None
};
Self::from_optional(start, end)
}

/// Shortcut method to create a timestamp range with given start/end value and time unit.
pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
let start = Timestamp::new(start, unit);
let end = Timestamp::new(end, unit);
Self::new(start, end)
}

/// Create a range that containing only given `ts`.
/// ### Notice:
/// Left-close right-open range cannot properly represent range with a single value.
/// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead.
pub fn single(ts: Timestamp) -> Self {
let unit = ts.unit();
let start = Some(ts);
let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit));

Self::from_optional(start, end)
}

/// Create a range `[start, INF)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
/// You may resort to `[start-1, ...)` instead.
pub fn from_start(start: Timestamp) -> Self {
Self {
start: Some(start),
end: None,
}
}

/// Create a range `[-INF, end)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
/// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
pub fn until_end(end: Timestamp, inclusive: bool) -> Self {
let end = if inclusive {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
Some(end)
};
Self { start: None, end }
}
}

/// Time range in milliseconds.
Expand All @@ -197,9 +275,9 @@ mod tests {

assert_eq!(None, RangeMillis::new(1, 0));

let range = RangeMillis::empty_with_value(1024);
let range = RangeMillis::empty();
assert_eq!(range.start(), range.end());
assert_eq!(Some(TimestampMillis::new(1024)), *range.start());
assert_eq!(Some(TimestampMillis::new(0)), *range.start());
}

#[test]
Expand Down Expand Up @@ -295,9 +373,7 @@ mod tests {
TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
);

let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or(
&TimestampRange::empty_with_value(Timestamp::new_millisecond(2)),
);
let empty = TimestampRange::empty().or(&TimestampRange::empty());
assert!(empty.is_empty());

let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
Expand All @@ -310,4 +386,44 @@ mod tests {
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert_eq!(t1, t1.or(&t1));
}

#[test]
fn test_intersect() {
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
assert!(!t1.intersects(&t2));

let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap();
assert!(t1.intersects(&t2));

let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert!(!t1.intersects(&t2));

let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap();
assert!(t1.intersects(&t2));

let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
assert!(t1.intersects(&t2));

let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
assert!(t1.intersects(&t1));

let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
let t2 = TimestampRange::empty();
assert!(!t1.intersects(&t2));

// empty range does not intersect with empty range
let empty = TimestampRange::empty();
assert!(!empty.intersects(&empty));

// full range intersects with full range
let full = TimestampRange::min_to_max();
assert!(full.intersects(&full));

assert!(!full.intersects(&empty));
}
}
2 changes: 1 addition & 1 deletion src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ mod tests {
}
}

/// Generate timestamp less than or equal to `threshold`
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
Expand Down
2 changes: 1 addition & 1 deletion src/common/time/src/timestamp_millis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::cmp::Ordering;
/// Unix timestamp in millisecond resolution.
///
/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TimestampMillis(i64);

impl TimestampMillis {
Expand Down
1 change: 1 addition & 0 deletions src/datatypes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
common-telemetry = { path = "../common/telemetry" }
datafusion-common.workspace = true
enum_dispatch = "0.3"
num = "0.4"
Expand Down
25 changes: 24 additions & 1 deletion src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

use arrow::datatypes::{DataType as ArrowDataType, Field};
use common_base::bytes::{Bytes, StringBytes};
use common_telemetry::logging;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::{TimeUnit, Timestamp};
Expand All @@ -25,7 +27,8 @@ pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use snafu::ensure;

use crate::error::{self, Result};
use crate::error;
use crate::error::Result;
use crate::prelude::*;
use crate::type_id::LogicalTypeId;
use crate::types::ListType;
Expand Down Expand Up @@ -286,6 +289,26 @@ fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
}
}

/// Convert [ScalarValue] to [Timestamp].
/// Return `None` if given scalar value cannot be converted to a valid timestamp.
pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option<Timestamp> {
match scalar {
ScalarValue::Int64(val) => val.map(Timestamp::new_millisecond),
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) {
Ok(t) => Some(t),
Err(e) => {
logging::error!(e;"Failed to convert string literal {s} to timestamp");
None
}
},
ScalarValue::TimestampSecond(v, _) => v.map(Timestamp::new_second),
ScalarValue::TimestampMillisecond(v, _) => v.map(Timestamp::new_millisecond),
ScalarValue::TimestampMicrosecond(v, _) => v.map(Timestamp::new_microsecond),
ScalarValue::TimestampNanosecond(v, _) => v.map(Timestamp::new_nanosecond),
_ => None,
}
}

macro_rules! impl_ord_for_value_like {
($Type: ident, $left: ident, $right: ident) => {
if $left.is_null() && !$right.is_null() {
Expand Down
Loading