Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/time/src/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl Date {
pub fn val(&self) -> i32 {
self.0
}

pub fn to_chrono_date(&self) -> Option<NaiveDate> {
NaiveDate::from_num_days_from_ce_opt(UNIX_EPOCH_FROM_CE + self.0)
}
}

#[cfg(test)]
Expand Down
6 changes: 5 additions & 1 deletion src/common/time/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::{Error, ParseDateStrSnafu, Result};

const DATETIME_FORMAT: &str = "%F %T";

/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**.
/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
Expand Down Expand Up @@ -69,6 +69,10 @@ impl DateTime {
pub fn val(&self) -> i64 {
self.0
}

pub fn to_chrono_datetime(&self) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_millis(self.0)
}
}

#[cfg(test)]
Expand Down
15 changes: 9 additions & 6 deletions src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,23 @@ impl Timestamp {
/// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can
/// represent, this function simply print the timestamp unit and value in plain string.
pub fn to_iso8601_string(&self) -> String {
let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor();
if let LocalResult::Single(datetime) = self.to_chrono_datetime() {
format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z"))
} else {
format!("[Timestamp{}: {}]", self.unit, self.value)
}
}

pub fn to_chrono_datetime(&self) -> LocalResult<DateTime<Utc>> {
let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor();
let (mut secs, mut nsecs) = self.split();

if nsecs < 0 {
secs -= 1;
nsecs += nano_factor;
}

if let LocalResult::Single(datetime) = Utc.timestamp_opt(secs, nsecs as u32) {
format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z"))
} else {
format!("[Timestamp{}: {}]", self.unit, self.value)
}
Utc.timestamp_opt(secs, nsecs as u32)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ axum-macros = "0.3"
base64 = "0.13"
bytes = "1.2"
catalog = { path = "../catalog" }
chrono.workspace = true
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
Expand All @@ -40,6 +41,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d1360da297b305abf33aecfa94888e1554c" }
pgwire = "0.10"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
prost.workspace = true
query = { path = "../query" }
rand = "0.8"
Expand Down
58 changes: 51 additions & 7 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::LocalResult;
use common_query::Output;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::RecordBatch;
Expand Down Expand Up @@ -171,9 +172,37 @@ fn encode_text_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResul
Value::Float64(v) => builder.encode_text_format_field(Some(&v.0)),
Value::String(v) => builder.encode_text_format_field(Some(&v.as_utf8())),
Value::Binary(v) => builder.encode_text_format_field(Some(&hex::encode(v.deref()))),
Value::Date(v) => builder.encode_text_format_field(Some(&v.to_string())),
Value::DateTime(v) => builder.encode_text_format_field(Some(&v.to_string())),
Value::Timestamp(v) => builder.encode_text_format_field(Some(&v.to_iso8601_string())),
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
builder.encode_text_format_field(Some(&date.format("%Y-%m-%d").to_string()))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::DateTime(v) => {
if let Some(datetime) = v.to_chrono_datetime() {
builder.encode_text_format_field(Some(
&datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::Timestamp(v) => {
if let LocalResult::Single(datetime) = v.to_chrono_datetime() {
builder.encode_text_format_field(Some(
&datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::List(_) => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!(
"cannot write value {:?} in postgres protocol: unimplemented",
Expand Down Expand Up @@ -203,17 +232,32 @@ fn encode_binary_value(
Value::Float64(v) => builder.encode_binary_format_field(&v.0, datatype),
Value::String(v) => builder.encode_binary_format_field(&v.as_utf8(), datatype),
Value::Binary(v) => builder.encode_binary_format_field(&v.deref(), datatype),
// TODO(sunng87): correct date/time types encoding
Value::Date(v) => builder.encode_binary_format_field(&v.to_string(), datatype),
Value::DateTime(v) => builder.encode_binary_format_field(&v.to_string(), datatype),
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
builder.encode_binary_format_field(&date, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::DateTime(v) => {
if let Some(datetime) = v.to_chrono_datetime() {
builder.encode_binary_format_field(&datetime, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert datetime to postgres type {v:?}",),
})))
}
}
Value::Timestamp(v) => {
// convert timestamp to SystemTime
if let Some(ts) = v.convert_to(TimeUnit::Microsecond) {
let sys_time = std::time::UNIX_EPOCH + Duration::from_micros(ts.value() as u64);
builder.encode_binary_format_field(&sys_time, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to conver timestamp to postgres type {v:?}",),
err_msg: format!("Failed to convert timestamp to postgres type {v:?}",),
})))
}
}
Expand Down