Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/time/src/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl Date {
pub fn val(&self) -> i32 {
self.0
}

pub fn to_chrono_date(&self) -> Option<NaiveDate> {
NaiveDate::from_num_days_from_ce_opt(UNIX_EPOCH_FROM_CE + self.0)
}
}

#[cfg(test)]
Expand Down
6 changes: 5 additions & 1 deletion src/common/time/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::{Error, ParseDateStrSnafu, Result};

const DATETIME_FORMAT: &str = "%F %T";

/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**.
/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
Expand Down Expand Up @@ -69,6 +69,10 @@ impl DateTime {
pub fn val(&self) -> i64 {
self.0
}

pub fn to_chrono_datetime(&self) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_millis(self.0)
}
}

#[cfg(test)]
Expand Down
15 changes: 9 additions & 6 deletions src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,23 @@ impl Timestamp {
/// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can
/// represent, this function simply print the timestamp unit and value in plain string.
pub fn to_iso8601_string(&self) -> String {
let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor();
if let LocalResult::Single(datetime) = self.to_chrono_datetime() {
format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z"))
} else {
format!("[Timestamp{}: {}]", self.unit, self.value)
}
}

pub fn to_chrono_datetime(&self) -> LocalResult<DateTime<Utc>> {
let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor();
let (mut secs, mut nsecs) = self.split();

if nsecs < 0 {
secs -= 1;
nsecs += nano_factor;
}

if let LocalResult::Single(datetime) = Utc.timestamp_opt(secs, nsecs as u32) {
format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z"))
} else {
format!("[Timestamp{}: {}]", self.unit, self.value)
}
Utc.timestamp_opt(secs, nsecs as u32)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ axum-macros = "0.3"
base64 = "0.13"
bytes = "1.2"
catalog = { path = "../catalog" }
chrono = "0.4"
Comment thread
sunng87 marked this conversation as resolved.
Outdated
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
Expand All @@ -40,6 +41,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d1360da297b305abf33aecfa94888e1554c" }
pgwire = "0.10"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
prost.workspace = true
query = { path = "../query" }
rand = "0.8"
Expand Down
58 changes: 51 additions & 7 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::LocalResult;
use common_query::Output;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::RecordBatch;
Expand Down Expand Up @@ -171,9 +172,37 @@ fn encode_text_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResul
Value::Float64(v) => builder.encode_text_format_field(Some(&v.0)),
Value::String(v) => builder.encode_text_format_field(Some(&v.as_utf8())),
Value::Binary(v) => builder.encode_text_format_field(Some(&hex::encode(v.deref()))),
Value::Date(v) => builder.encode_text_format_field(Some(&v.to_string())),
Value::DateTime(v) => builder.encode_text_format_field(Some(&v.to_string())),
Value::Timestamp(v) => builder.encode_text_format_field(Some(&v.to_iso8601_string())),
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
builder.encode_text_format_field(Some(&date.format("%Y-%m-%d").to_string()))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::DateTime(v) => {
if let Some(datetime) = v.to_chrono_datetime() {
builder.encode_text_format_field(Some(
&datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::Timestamp(v) => {
if let LocalResult::Single(datetime) = v.to_chrono_datetime() {
builder.encode_text_format_field(Some(
&datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
))
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::List(_) => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!(
"cannot write value {:?} in postgres protocol: unimplemented",
Expand Down Expand Up @@ -203,17 +232,32 @@ fn encode_binary_value(
Value::Float64(v) => builder.encode_binary_format_field(&v.0, datatype),
Value::String(v) => builder.encode_binary_format_field(&v.as_utf8(), datatype),
Value::Binary(v) => builder.encode_binary_format_field(&v.deref(), datatype),
// TODO(sunng87): correct date/time types encoding
Value::Date(v) => builder.encode_binary_format_field(&v.to_string(), datatype),
Value::DateTime(v) => builder.encode_binary_format_field(&v.to_string(), datatype),
Value::Date(v) => {
if let Some(date) = v.to_chrono_date() {
builder.encode_binary_format_field(&date, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert date to postgres type {v:?}",),
})))
}
}
Value::DateTime(v) => {
if let Some(datetime) = v.to_chrono_datetime() {
builder.encode_binary_format_field(&datetime, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert datetime to postgres type {v:?}",),
})))
}
}
Value::Timestamp(v) => {
// convert timestamp to SystemTime
if let Some(ts) = v.convert_to(TimeUnit::Microsecond) {
let sys_time = std::time::UNIX_EPOCH + Duration::from_micros(ts.value() as u64);
builder.encode_binary_format_field(&sys_time, datatype)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to conver timestamp to postgres type {v:?}",),
err_msg: format!("Failed to convert timestamp to postgres type {v:?}",),
})))
}
}
Expand Down