Skip to content

Commit 26c6c65

Browse files
authored
Merge pull request #646 from sfu-db/microsec
Adding microsecond timestamp type, fix #644 #634
2 parents 80b0a99 + f7ec750 commit 26c6c65

File tree

15 files changed

+634
-79
lines changed

15 files changed

+634
-79
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
*.swp
2-
2+
.DS_Store
33
**/target
44
.vscode
55
connectorx-python/connectorx/*.so

connectorx-python/connectorx/tests/test_arrow.py

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,101 @@ def test_arrow2(postgres_url: str) -> None:
7373
df.sort_values(by="test_int", inplace=True, ignore_index=True)
7474
assert_frame_equal(df, expected, check_names=True)
7575

76-
76+
def test_arrow_type(postgres_url: str) -> None:
    """Compare the "arrow" return type against a hand-written frame.

    Reads a fixed list of columns from ``test_types``, converts the result to
    pandas, orders the rows by ``test_int16`` and asserts that both values and
    dtypes match the expected frame built below.
    """
    query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_ltree, test_name FROM test_types"
    arrow_result = read_sql(postgres_url, query, return_type="arrow")
    df = arrow_result.to_pandas(date_as_object=False)
    df.sort_values(by="test_int16", inplace=True, ignore_index=True)

    # Raw cell values for the multi-line columns, listed in test_int16 order.
    timestamps = [
        "1970-01-01 00:00:01",
        "2000-02-28 12:00:10",
        "2038-01-18 23:59:59",
        None,
    ]
    timestamps_tz = [
        "1970-01-01 00:00:01+00:00",
        "2000-02-28 16:00:10+00:00",
        "2038-01-18 15:59:59+00:00",
        None,
    ]
    uuids = [
        "86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
        "86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
        "86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
        None,
    ]
    times = [
        datetime.time(8, 12, 40),
        None,
        datetime.time(23, 0, 10),
        datetime.time(18, 30),
    ]
    byteas = [
        None,
        b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
        b"",
        b"\xf0\x9f\x98\x9c",
    ]
    jsons = [
        '{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
        '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
        '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
        None,
    ]
    jsonbs = [
        '{"product":"Beer","qty":6}',
        '{"product":"Diaper","qty":24}',
        '{"product":"Toy Car","qty":1}',
        None,
    ]

    # Column order mirrors the SELECT list above.
    expected_columns = {
        "test_date": pd.Series(
            ["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ms]"
        ),
        "test_timestamp": pd.Series(timestamps, dtype="datetime64[us]"),
        "test_timestamptz": pd.Series(timestamps_tz, dtype="datetime64[us, UTC]"),
        "test_int16": pd.Series([0, 1, 2, 3], dtype="int64"),
        "test_int64": pd.Series(
            [-9223372036854775808, 0, 9223372036854775807, None], dtype="float64"
        ),
        "test_float32": pd.Series(
            [None, 3.1415926535, 2.71, -1e-37], dtype="float64"
        ),
        "test_numeric": pd.Series([None, 521.34, 0.00, 0.00], dtype="float64"),
        "test_bpchar": pd.Series(["a ", "bb ", "ccc ", None], dtype="object"),
        "test_char": pd.Series(["a", "b", None, "d"], dtype="object"),
        "test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"),
        "test_uuid": pd.Series(uuids, dtype="object"),
        "test_time": pd.Series(times, dtype="object"),
        "test_bytea": pd.Series(byteas, dtype="object"),
        "test_json": pd.Series(jsons, dtype="object"),
        "test_jsonb": pd.Series(jsonbs, dtype="object"),
        "test_ltree": pd.Series(["A.B.C.D", "A.B.E", "A", None], dtype="object"),
        "test_name": pd.Series(
            ["0", "21", "someName", "101203203-1212323-22131235"]
        ),
    }
    expected = pd.DataFrame(index=range(4), data=expected_columns)
    assert_frame_equal(df, expected, check_names=True)
77171
def test_arrow2_type(postgres_url: str) -> None:
78172
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_enum, test_ltree, test_name FROM test_types"
79173
df = read_sql(postgres_url, query, return_type="arrow2")
@@ -92,7 +186,7 @@ def test_arrow2_type(postgres_url: str) -> None:
92186
"2038-01-18 23:59:59",
93187
None,
94188
],
95-
dtype="datetime64[ns]",
189+
dtype="datetime64[us]",
96190
),
97191
"test_timestamptz": pd.Series(
98192
[
@@ -101,7 +195,7 @@ def test_arrow2_type(postgres_url: str) -> None:
101195
"2038-01-18 15:59:59+00:00",
102196
None,
103197
],
104-
dtype="datetime64[ns, UTC]",
198+
dtype="datetime64[us, UTC]",
105199
),
106200
"test_int16": pd.Series([0, 1, 2, 3], dtype="int32"),
107201
"test_int64": pd.Series(
@@ -190,4 +284,4 @@ def test_arrow2_type(postgres_url: str) -> None:
190284

191285
},
192286
)
193-
assert_frame_equal(df, expected, check_names=True)
287+
assert_frame_equal(df, expected, check_names=True)

connectorx/src/destinations/arrow/arrow_assoc.rs

Lines changed: 151 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
use super::errors::{ArrowDestinationError, Result};
1+
use super::{
2+
errors::{ArrowDestinationError, Result},
3+
typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
4+
};
25
use crate::constants::SECONDS_IN_DAY;
36
use arrow::array::{
4-
ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
5-
Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
6-
TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
7+
ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int32Builder,
8+
Int64Builder, LargeBinaryBuilder, StringBuilder, Time64MicrosecondBuilder,
9+
Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampNanosecondBuilder,
10+
UInt32Builder, UInt64Builder,
711
};
812
use arrow::datatypes::Field;
913
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -188,6 +192,48 @@ impl ArrowAssoc for Option<DateTime<Utc>> {
188192
}
189193
}
190194

195+
impl ArrowAssoc for DateTimeWrapperMicro {
196+
type Builder = TimestampMicrosecondBuilder;
197+
198+
fn builder(nrows: usize) -> Self::Builder {
199+
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
200+
}
201+
202+
#[throws(ArrowDestinationError)]
203+
fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
204+
builder.append_value(value.0.timestamp_micros());
205+
}
206+
207+
fn field(header: &str) -> Field {
208+
Field::new(
209+
header,
210+
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
211+
false,
212+
)
213+
}
214+
}
215+
216+
impl ArrowAssoc for Option<DateTimeWrapperMicro> {
217+
type Builder = TimestampMicrosecondBuilder;
218+
219+
fn builder(nrows: usize) -> Self::Builder {
220+
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
221+
}
222+
223+
#[throws(ArrowDestinationError)]
224+
fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
225+
builder.append_option(value.map(|x| x.0.timestamp_micros()));
226+
}
227+
228+
fn field(header: &str) -> Field {
229+
Field::new(
230+
header,
231+
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
232+
true,
233+
)
234+
}
235+
}
236+
191237
fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
192238
match nd.and_hms_opt(0, 0, 0) {
193239
Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
@@ -196,7 +242,9 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
196242
}
197243

198244
fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
199-
nd.and_utc().timestamp_millis()
245+
nd.and_utc()
246+
.timestamp_nanos_opt()
247+
.unwrap_or_else(|| panic!("out of range DateTime"))
200248
}
201249

202250
impl ArrowAssoc for Option<NaiveDate> {
@@ -234,10 +282,10 @@ impl ArrowAssoc for NaiveDate {
234282
}
235283

236284
impl ArrowAssoc for Option<NaiveDateTime> {
237-
type Builder = Date64Builder;
285+
type Builder = TimestampNanosecondBuilder;
238286

239287
fn builder(nrows: usize) -> Self::Builder {
240-
Date64Builder::with_capacity(nrows)
288+
TimestampNanosecondBuilder::with_capacity(nrows)
241289
}
242290

243291
fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
@@ -246,15 +294,19 @@ impl ArrowAssoc for Option<NaiveDateTime> {
246294
}
247295

248296
fn field(header: &str) -> Field {
249-
Field::new(header, ArrowDataType::Date64, true)
297+
Field::new(
298+
header,
299+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
300+
true,
301+
)
250302
}
251303
}
252304

253305
impl ArrowAssoc for NaiveDateTime {
254-
type Builder = Date64Builder;
306+
type Builder = TimestampNanosecondBuilder;
255307

256308
fn builder(nrows: usize) -> Self::Builder {
257-
Date64Builder::with_capacity(nrows)
309+
TimestampNanosecondBuilder::with_capacity(nrows)
258310
}
259311

260312
fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
@@ -263,7 +315,56 @@ impl ArrowAssoc for NaiveDateTime {
263315
}
264316

265317
fn field(header: &str) -> Field {
266-
Field::new(header, ArrowDataType::Date64, false)
318+
Field::new(
319+
header,
320+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
321+
false,
322+
)
323+
}
324+
}
325+
326+
impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
327+
type Builder = TimestampMicrosecondBuilder;
328+
329+
fn builder(nrows: usize) -> Self::Builder {
330+
TimestampMicrosecondBuilder::with_capacity(nrows)
331+
}
332+
333+
fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
334+
builder.append_option(match value {
335+
Some(v) => Some(v.0.and_utc().timestamp_micros()),
336+
None => None,
337+
});
338+
Ok(())
339+
}
340+
341+
fn field(header: &str) -> Field {
342+
Field::new(
343+
header,
344+
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
345+
true,
346+
)
347+
}
348+
}
349+
350+
impl ArrowAssoc for NaiveDateTimeWrapperMicro {
351+
type Builder = TimestampMicrosecondBuilder;
352+
353+
fn builder(nrows: usize) -> Self::Builder {
354+
TimestampMicrosecondBuilder::with_capacity(nrows)
355+
}
356+
357+
fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
358+
builder.append_value(value.0.and_utc().timestamp_micros());
359+
Ok(())
360+
}
361+
362+
fn field(header: &str) -> Field {
363+
Field::new(
364+
header,
365+
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
366+
false,
367+
)
267368
}
268369
}
269370

@@ -307,6 +408,45 @@ impl ArrowAssoc for NaiveTime {
307408
}
308409
}
309410

411+
impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
412+
type Builder = Time64MicrosecondBuilder;
413+
414+
fn builder(nrows: usize) -> Self::Builder {
415+
Time64MicrosecondBuilder::with_capacity(nrows)
416+
}
417+
418+
fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
419+
builder.append_option(value.map(|t| {
420+
t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
421+
}));
422+
Ok(())
423+
}
424+
425+
fn field(header: &str) -> Field {
426+
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
427+
}
428+
}
429+
430+
impl ArrowAssoc for NaiveTimeWrapperMicro {
431+
type Builder = Time64MicrosecondBuilder;
432+
433+
fn builder(nrows: usize) -> Self::Builder {
434+
Time64MicrosecondBuilder::with_capacity(nrows)
435+
}
436+
437+
fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
438+
builder.append_value(
439+
value.0.num_seconds_from_midnight() as i64 * 1_000_000
440+
+ (value.0.nanosecond() as i64) / 1000,
441+
);
442+
Ok(())
443+
}
444+
445+
fn field(header: &str) -> Field {
446+
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
447+
}
448+
}
449+
310450
impl ArrowAssoc for Option<Vec<u8>> {
311451
type Builder = LargeBinaryBuilder;
312452

connectorx/src/destinations/arrow/typesystem.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
use crate::impl_typesystem;
22
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
33

4+
#[derive(Debug, Clone, Copy)]
5+
pub struct DateTimeWrapperMicro(pub DateTime<Utc>);
6+
7+
#[derive(Debug, Clone, Copy)]
8+
pub struct NaiveTimeWrapperMicro(pub NaiveTime);
9+
10+
#[derive(Debug, Clone, Copy)]
11+
pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime);
12+
413
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
514
pub enum ArrowTypeSystem {
615
Int32(bool),
@@ -14,8 +23,11 @@ pub enum ArrowTypeSystem {
1423
LargeBinary(bool),
1524
Date32(bool),
1625
Date64(bool),
26+
Date64Micro(bool),
1727
Time64(bool),
28+
Time64Micro(bool),
1829
DateTimeTz(bool),
30+
DateTimeTzMicro(bool),
1931
}
2032

2133
impl_typesystem! {
@@ -32,7 +44,10 @@ impl_typesystem! {
3244
{ LargeBinary => Vec<u8> }
3345
{ Date32 => NaiveDate }
3446
{ Date64 => NaiveDateTime }
47+
{ Date64Micro => NaiveDateTimeWrapperMicro }
3548
{ Time64 => NaiveTime }
49+
{ Time64Micro => NaiveTimeWrapperMicro }
3650
{ DateTimeTz => DateTime<Utc> }
51+
{ DateTimeTzMicro => DateTimeWrapperMicro }
3752
}
3853
}

0 commit comments

Comments
 (0)