Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 139 additions & 23 deletions src/common/time/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::Timestamp;

/// A half-open time range.
///
/// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is
/// empty if `start >= end`.
/// The range contains every `value` such that `value >= start` and `value < end`.
///
/// The range is empty iff `start == end == "the default value of T"`
Comment thread
killme2008 marked this conversation as resolved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GenericRange<T> {
start: Option<T>,
Expand All @@ -28,8 +29,9 @@ pub struct GenericRange<T> {

impl<T> GenericRange<T>
where
T: Copy + PartialOrd,
T: Copy + PartialOrd + Default,
{
/// Computes the AND'ed range with other.
pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
let start = match (self.start(), other.start()) {
(Some(l), Some(r)) => {
Expand Down Expand Up @@ -57,7 +59,7 @@ where
(None, None) => None,
};

Self { start, end }
Self::from_optional(start, end)
}

/// Compute the OR'ed range of two ranges.
Expand Down Expand Up @@ -98,12 +100,44 @@ where
(None, None) => None,
};

Self { start, end }
Self::from_optional(start, end)
}

/// Returns `true` when this range and `target` overlap, i.e. when the
/// AND'ed range of the two is not empty.
pub fn intersects(&self, target: &GenericRange<T>) -> bool {
    let overlap = self.and(target);
    !overlap.is_empty()
}

/// Builds the canonical empty range: both `start` and `end` are set to
/// the default value of `T`.
pub fn empty() -> GenericRange<T> {
    let start = Some(T::default());
    let end = Some(T::default());
    GenericRange { start, end }
}

/// Builds a range from optional bounds.
///
/// When both bounds are present and `start < end` does not hold, the
/// result is the empty range returned by [`Self::empty`] (both bounds set
/// to the default value of `T`). In every other case the bounds are stored
/// exactly as given.
fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
    match (start, end) {
        // Both bounds present and properly ordered: keep them.
        (Some(lower), Some(upper)) if lower < upper => Self { start, end },
        // Both bounds present but not strictly increasing: normalize to empty.
        (Some(_), Some(_)) => Self::empty(),
        // At least one side is missing: nothing to validate.
        _ => Self { start, end },
    }
}
}

impl<T> GenericRange<T> {
/// Creates a new range that contains timestamp in `[start, end)`.
/// Creates a new range that contains values in `[start, end)`.
///
/// Returns `None` if `start` > `end`.
pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
Expand All @@ -115,14 +149,7 @@ impl<T> GenericRange<T> {
}
}

/// Builds an empty time range whose `start` and `end` are both `value`
/// (converted into `T`).
pub fn empty_with_value<U: Clone + Into<T>>(value: U) -> GenericRange<T> {
    // Clone for the start bound first so the original can be consumed by the end bound.
    let start = Some(value.clone().into());
    let end = Some(value.into());
    GenericRange { start, end }
}

/// Return a range containing all possible values.
pub fn min_to_max() -> GenericRange<T> {
Self {
start: None,
Expand All @@ -143,11 +170,11 @@ impl<T> GenericRange<T> {
}

/// Returns true if `target` is contained in the range.
pub fn contains<U: PartialOrd<T>>(&self, timestamp: &U) -> bool {
pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
match (&self.start, &self.end) {
(Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end,
(Some(start), None) => *timestamp >= *start,
(None, Some(end)) => *timestamp < *end,
(Some(start), Some(end)) => *target >= *start && *target < *end,
(Some(start), None) => *target >= *start,
(None, Some(end)) => *target < *end,
(None, None) => true,
}
}
Expand All @@ -165,12 +192,63 @@ impl<T: PartialOrd> GenericRange<T> {
}

pub type TimestampRange = GenericRange<Timestamp>;

impl TimestampRange {
pub fn new_inclusive_unchecked(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
Comment thread
v0y4g3r marked this conversation as resolved.
Outdated
let end = if let Some(end) = end {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
None
};
Self::from_optional(start, end)
}

/// Convenience constructor: wraps the raw `start` and `end` values into
/// timestamps of the given `unit` and passes them to [`Self::new`],
/// returning its result unchanged.
pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
    Self::new(Timestamp::new(start, unit), Timestamp::new(end, unit))
}

/// Creates a range that contains only the given `ts`.
/// ### Notice:
/// A left-closed right-open range cannot exactly represent a single value.
/// For simplicity, this implementation returns the approximation `[ts, ts+1)`,
/// where `ts+1` keeps the time unit of `ts`. If `ts+1` overflows `i64`, the
/// end bound is left unset.
pub fn single(ts: Timestamp) -> Self {
    let next = ts
        .value()
        .checked_add(1)
        .map(|v| Timestamp::new(v, ts.unit()));

    Self::from_optional(Some(ts), next)
}

/// Create a range `[start, INF)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
/// You may resort to `[start+1, ...)` instead.
pub fn from(start: Timestamp) -> Self {
Comment thread
v0y4g3r marked this conversation as resolved.
Outdated
Self {
start: Some(start),
end: None,
}
}

/// Create a range `[-INF, end)`.
/// ### Notice
/// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
/// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
pub fn until(end: Timestamp, inclusive: bool) -> Self {
Comment thread
evenyag marked this conversation as resolved.
Outdated
let end = if inclusive {
end.value()
.checked_add(1)
.map(|v| Timestamp::new(v, end.unit()))
} else {
Some(end)
};
Self { start: None, end }
}
}

/// Time range in milliseconds.
Expand All @@ -197,9 +275,9 @@ mod tests {

assert_eq!(None, RangeMillis::new(1, 0));

let range = RangeMillis::empty_with_value(1024);
let range = RangeMillis::empty();
assert_eq!(range.start(), range.end());
assert_eq!(Some(TimestampMillis::new(1024)), *range.start());
assert_eq!(Some(TimestampMillis::new(0)), *range.start());
}

#[test]
Expand Down Expand Up @@ -295,9 +373,7 @@ mod tests {
TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
);

let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or(
&TimestampRange::empty_with_value(Timestamp::new_millisecond(2)),
);
let empty = TimestampRange::empty().or(&TimestampRange::empty());
assert!(empty.is_empty());

let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
Expand All @@ -310,4 +386,44 @@ mod tests {
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert_eq!(t1, t1.or(&t1));
}

#[test]
fn test_intersect() {
    let seconds = |start, end| TimestampRange::with_unit(start, end, TimeUnit::Second).unwrap();
    let millis =
        |start, end| TimestampRange::with_unit(start, end, TimeUnit::Millisecond).unwrap();
    let empty = TimestampRange::empty();
    let full = TimestampRange::min_to_max();

    // Each case is (lhs, rhs, whether `lhs.intersects(&rhs)` is expected to hold).
    let cases = [
        // disjoint ranges
        (seconds(-10, 0), seconds(-30, -20), false),
        // one range fully inside the other
        (seconds(10, 20), seconds(0, 30), true),
        // adjacent ranges: the shared bound is excluded from the left one
        (seconds(-20, -10), seconds(-10, 0), false),
        // ranges expressed in different time units
        (seconds(0, 1), millis(999, 1000), true),
        (seconds(1, 2), millis(1000, 2000), true),
        // a non-empty range intersects with itself
        (seconds(0, 1), seconds(0, 1), true),
        // nothing intersects with the empty range
        (seconds(0, 1), empty, false),
        // empty range does not intersect with empty range
        (empty, empty, false),
        // full range intersects with full range
        (full, full, true),
        (full, empty, false),
    ];

    for (lhs, rhs, expected) in cases.iter() {
        assert_eq!(*expected, lhs.intersects(rhs));
    }
}
}
2 changes: 1 addition & 1 deletion src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ mod tests {
}
}

/// Generate timestamp less than or equal to `threshold`
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
Expand Down
2 changes: 1 addition & 1 deletion src/common/time/src/timestamp_millis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::cmp::Ordering;
/// Unix timestamp in millisecond resolution.
///
/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TimestampMillis(i64);

impl TimestampMillis {
Expand Down
1 change: 1 addition & 0 deletions src/datatypes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
common-telemetry = { path = "../common/telemetry" }
datafusion-common.workspace = true
enum_dispatch = "0.3"
num = "0.4"
Expand Down
25 changes: 24 additions & 1 deletion src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

use arrow::datatypes::{DataType as ArrowDataType, Field};
use common_base::bytes::{Bytes, StringBytes};
use common_telemetry::logging;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::{TimeUnit, Timestamp};
Expand All @@ -25,7 +27,8 @@ pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use snafu::ensure;

use crate::error::{self, Result};
use crate::error;
use crate::error::Result;
use crate::prelude::*;
use crate::type_id::LogicalTypeId;
use crate::types::ListType;
Expand Down Expand Up @@ -286,6 +289,26 @@ fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
}
}

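/// Convert [ScalarValue] to [Timestamp].
/// `Int64` values are interpreted as milliseconds.
/// Return `None` if the scalar is null, is a string that cannot be parsed as a timestamp,
/// or is of a type that cannot be converted to a timestamp.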
Comment thread
v0y4g3r marked this conversation as resolved.
Outdated
pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option<Timestamp> {
match scalar {
ScalarValue::Int64(val) => val.map(Timestamp::new_millisecond),
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) {
Ok(t) => Some(t),
Err(e) => {
logging::error!("Failed to convert string literal {s} to timestamp, error: {e:?}");
Comment thread
v0y4g3r marked this conversation as resolved.
Outdated
None
}
},
ScalarValue::TimestampSecond(v, _) => v.map(Timestamp::new_second),
ScalarValue::TimestampMillisecond(v, _) => v.map(Timestamp::new_millisecond),
ScalarValue::TimestampMicrosecond(v, _) => v.map(Timestamp::new_microsecond),
ScalarValue::TimestampNanosecond(v, _) => v.map(Timestamp::new_nanosecond),
_ => None,
}
}

macro_rules! impl_ord_for_value_like {
($Type: ident, $left: ident, $right: ident) => {
if $left.is_null() && !$right.is_null() {
Expand Down
Loading