Skip to content

Commit 2a3db1f

Browse files
Add infinite support for date/datetime on binary
1 parent 94aef88 commit 2a3db1f

File tree

2 files changed

+271
-13
lines changed

2 files changed

+271
-13
lines changed

connectorx/src/sources/postgres/mod.rs

Lines changed: 171 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -476,14 +476,110 @@ impl_produce!(
476476
&'r str,
477477
Vec<u8>,
478478
NaiveTime,
479-
NaiveDateTime,
480-
DateTime<Utc>,
481-
NaiveDate,
479+
// NaiveDateTime,
480+
// DateTime<Utc>,
481+
// NaiveDate,
482482
Uuid,
483483
Value,
484484
Vec<String>,
485485
);
486486

487+
impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'a> {
488+
type Error = PostgresSourceError;
489+
490+
#[throws(PostgresSourceError)]
491+
fn produce(&'r mut self) -> NaiveDateTime {
492+
let (ridx, cidx) = self.next_loc()?;
493+
let row = &self.rowbuf[ridx];
494+
let val = row.try_get(cidx)?;
495+
match val {
496+
postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
497+
postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
498+
postgres::types::Timestamp::Value(t) => t,
499+
}
500+
}
501+
}
502+
503+
impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'a> {
504+
type Error = PostgresSourceError;
505+
506+
#[throws(PostgresSourceError)]
507+
fn produce(&'r mut self) -> Option<NaiveDateTime> {
508+
let (ridx, cidx) = self.next_loc()?;
509+
let row = &self.rowbuf[ridx];
510+
let val = row.try_get(cidx)?;
511+
match val {
512+
postgres::types::Timestamp::PosInfinity => Some(NaiveDateTime::MAX),
513+
postgres::types::Timestamp::NegInfinity => Some(NaiveDateTime::MIN),
514+
postgres::types::Timestamp::Value(t) => t,
515+
}
516+
}
517+
}
518+
519+
impl<'r, 'a> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'a> {
520+
type Error = PostgresSourceError;
521+
522+
#[throws(PostgresSourceError)]
523+
fn produce(&'r mut self) -> DateTime<Utc> {
524+
let (ridx, cidx) = self.next_loc()?;
525+
let row = &self.rowbuf[ridx];
526+
let val = row.try_get(cidx)?;
527+
match val {
528+
postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
529+
postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
530+
postgres::types::Timestamp::Value(t) => t,
531+
}
532+
}
533+
}
534+
535+
impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'a> {
536+
type Error = PostgresSourceError;
537+
538+
#[throws(PostgresSourceError)]
539+
fn produce(&'r mut self) -> Option<DateTime<Utc>> {
540+
let (ridx, cidx) = self.next_loc()?;
541+
let row = &self.rowbuf[ridx];
542+
let val = row.try_get(cidx)?;
543+
match val {
544+
postgres::types::Timestamp::PosInfinity => Some(DateTime::<Utc>::MAX_UTC),
545+
postgres::types::Timestamp::NegInfinity => Some(DateTime::<Utc>::MIN_UTC),
546+
postgres::types::Timestamp::Value(t) => t,
547+
}
548+
}
549+
}
550+
551+
impl<'r, 'a> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'a> {
552+
type Error = PostgresSourceError;
553+
554+
#[throws(PostgresSourceError)]
555+
fn produce(&'r mut self) -> NaiveDate {
556+
let (ridx, cidx) = self.next_loc()?;
557+
let row = &self.rowbuf[ridx];
558+
let val = row.try_get(cidx)?;
559+
match val {
560+
postgres::types::Date::PosInfinity => NaiveDate::MAX,
561+
postgres::types::Date::NegInfinity => NaiveDate::MIN,
562+
postgres::types::Date::Value(t) => t,
563+
}
564+
}
565+
}
566+
567+
impl<'r, 'a> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'a> {
568+
type Error = PostgresSourceError;
569+
570+
#[throws(PostgresSourceError)]
571+
fn produce(&'r mut self) -> Option<NaiveDate> {
572+
let (ridx, cidx) = self.next_loc()?;
573+
let row = &self.rowbuf[ridx];
574+
let val = row.try_get(cidx)?;
575+
match val {
576+
postgres::types::Date::PosInfinity => Some(NaiveDate::MAX),
577+
postgres::types::Date::NegInfinity => Some(NaiveDate::MIN),
578+
postgres::types::Date::Value(t) => t,
579+
}
580+
}
581+
}
582+
487583
impl<'r, 'a> Produce<'r, HashMap<String, Option<String>>>
488584
for PostgresBinarySourcePartitionParser<'a>
489585
{
@@ -1112,15 +1208,78 @@ impl_produce!(
11121208
&'r str,
11131209
Vec<u8>,
11141210
NaiveTime,
1115-
NaiveDateTime,
1116-
DateTime<Utc>,
1117-
// NaiveDate,
11181211
Uuid,
11191212
Value,
11201213
HashMap<String, Option<String>>,
11211214
Vec<String>,
11221215
);
11231216

1217+
impl<'r, 'a> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'a> {
1218+
type Error = PostgresSourceError;
1219+
1220+
#[throws(PostgresSourceError)]
1221+
fn produce(&'r mut self) -> DateTime<Utc> {
1222+
let (ridx, cidx) = self.next_loc()?;
1223+
let row = &self.rowbuf[ridx];
1224+
let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1225+
match val {
1226+
postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1227+
postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1228+
postgres::types::Timestamp::Value(t) => t,
1229+
}
1230+
}
1231+
}
1232+
1233+
impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'a> {
1234+
type Error = PostgresSourceError;
1235+
1236+
#[throws(PostgresSourceError)]
1237+
fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1238+
let (ridx, cidx) = self.next_loc()?;
1239+
let row = &self.rowbuf[ridx];
1240+
let val = row.try_get(cidx)?;
1241+
match val {
1242+
postgres::types::Timestamp::PosInfinity => Some(DateTime::<Utc>::MAX_UTC),
1243+
postgres::types::Timestamp::NegInfinity => Some(DateTime::<Utc>::MIN_UTC),
1244+
postgres::types::Timestamp::Value(t) => t,
1245+
}
1246+
1247+
}
1248+
}
1249+
1250+
impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'a> {
1251+
type Error = PostgresSourceError;
1252+
1253+
#[throws(PostgresSourceError)]
1254+
fn produce(&'r mut self) -> NaiveDateTime {
1255+
let (ridx, cidx) = self.next_loc()?;
1256+
let row = &self.rowbuf[ridx];
1257+
let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1258+
match val {
1259+
postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1260+
postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1261+
postgres::types::Timestamp::Value(t) => t,
1262+
}
1263+
}
1264+
}
1265+
1266+
impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'a> {
1267+
type Error = PostgresSourceError;
1268+
1269+
#[throws(PostgresSourceError)]
1270+
fn produce(&'r mut self) -> Option<NaiveDateTime> {
1271+
let (ridx, cidx) = self.next_loc()?;
1272+
let row = &self.rowbuf[ridx];
1273+
let val = row.try_get(cidx)?;
1274+
match val {
1275+
postgres::types::Timestamp::PosInfinity => Some(NaiveDateTime::MAX),
1276+
postgres::types::Timestamp::NegInfinity => Some(NaiveDateTime::MIN),
1277+
postgres::types::Timestamp::Value(t) => t,
1278+
}
1279+
1280+
}
1281+
}
1282+
11241283
impl<'r, 'a> Produce<'r, NaiveDate> for PostgresRawSourceParser<'a> {
11251284
type Error = PostgresSourceError;
11261285

@@ -1145,7 +1304,12 @@ impl<'r, 'a> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'a> {
11451304
let (ridx, cidx) = self.next_loc()?;
11461305
let row = &self.rowbuf[ridx];
11471306
let val = row.try_get(cidx)?;
1148-
val
1307+
match val {
1308+
postgres::types::Date::PosInfinity => Some(NaiveDate::MAX),
1309+
postgres::types::Date::NegInfinity => Some(NaiveDate::MIN),
1310+
postgres::types::Date::Value(t) => t,
1311+
}
1312+
11491313
}
11501314
}
11511315

connectorx/tests/test_postgres.rs

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,18 +152,66 @@ fn test_postgres() {
152152
}
153153

154154
#[test]
155-
fn test_csv_infinite_values_cursor_proto() {
155+
fn test_csv_infinite_values_binary_proto_option() {
156156

157157
let _ = env_logger::builder().is_test(true).try_init();
158158

159159
let dburl = env::var("POSTGRES_URL").unwrap();
160160
#[derive(Debug, PartialEq)]
161-
struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime<Utc>);
161+
struct Row(Option<NaiveDate>, Option<NaiveDateTime>, Option<DateTime<Utc>>);
162+
163+
let url = Url::parse(dburl.as_str()).unwrap();
164+
let (config, _tls) = rewrite_tls_args(&url).unwrap();
165+
let mut source = PostgresSource::<BinaryProtocol, NoTls>::new(config, NoTls, 1).unwrap();
166+
source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]);
167+
source.fetch_metadata().unwrap();
168+
169+
let mut partitions = source.partition().unwrap();
170+
assert!(partitions.len() == 1);
171+
let mut partition = partitions.remove(0);
172+
partition.result_rows().expect("run query");
173+
174+
assert_eq!(2, partition.nrows());
175+
assert_eq!(3, partition.ncols());
176+
177+
let mut parser = partition.parser().unwrap();
178+
179+
let mut rows: Vec<Row> = Vec::new();
180+
loop {
181+
let (n, is_last) = parser.fetch_next().unwrap();
182+
for _i in 0..n {
183+
rows.push(Row(
184+
parser.produce().unwrap(),
185+
parser.produce().unwrap(),
186+
parser.produce().unwrap(),
187+
));
188+
}
189+
if is_last {
190+
break;
191+
}
192+
}
193+
assert_eq!(
194+
vec![
195+
Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::<Utc>::MAX_UTC)),
196+
Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::<Utc>::MIN_UTC)),
197+
],
198+
rows
199+
);
200+
}
201+
202+
#[test]
203+
fn test_csv_infinite_values_cursor_proto_option() {
204+
205+
let _ = env_logger::builder().is_test(true).try_init();
206+
207+
let dburl = env::var("POSTGRES_URL").unwrap();
208+
#[derive(Debug, PartialEq)]
209+
struct Row(Option<NaiveDate>, Option<NaiveDateTime>, Option<DateTime<Utc>>);
162210

163211
let url = Url::parse(dburl.as_str()).unwrap();
164212
let (config, _tls) = rewrite_tls_args(&url).unwrap();
165213
let mut source = PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, 1).unwrap();
166-
source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]);
214+
source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]);
167215
source.fetch_metadata().unwrap();
168216

169217
let mut partitions = source.partition().unwrap();
@@ -172,7 +220,7 @@ fn test_csv_infinite_values_cursor_proto() {
172220
partition.result_rows().expect("run query");
173221

174222
assert_eq!(2, partition.nrows());
175-
assert_eq!(5, partition.ncols());
223+
assert_eq!(3, partition.ncols());
176224

177225
let mut parser = partition.parser().unwrap();
178226

@@ -184,6 +232,52 @@ fn test_csv_infinite_values_cursor_proto() {
184232
parser.produce().unwrap(),
185233
parser.produce().unwrap(),
186234
parser.produce().unwrap(),
235+
));
236+
}
237+
if is_last {
238+
break;
239+
}
240+
}
241+
assert_eq!(
242+
vec![
243+
Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::<Utc>::MAX_UTC)),
244+
Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::<Utc>::MIN_UTC)),
245+
],
246+
rows
247+
);
248+
}
249+
250+
#[test]
251+
fn test_csv_infinite_values_cursor_proto() {
252+
253+
let _ = env_logger::builder().is_test(true).try_init();
254+
255+
let dburl = env::var("POSTGRES_URL").unwrap();
256+
#[derive(Debug, PartialEq)]
257+
struct Row(NaiveDate, NaiveDateTime, DateTime<Utc>);
258+
259+
let url = Url::parse(dburl.as_str()).unwrap();
260+
let (config, _tls) = rewrite_tls_args(&url).unwrap();
261+
let mut source = PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, 1).unwrap();
262+
source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]);
263+
source.fetch_metadata().unwrap();
264+
265+
let mut partitions = source.partition().unwrap();
266+
assert!(partitions.len() == 1);
267+
let mut partition = partitions.remove(0);
268+
partition.result_rows().expect("run query");
269+
270+
assert_eq!(2, partition.nrows());
271+
assert_eq!(3, partition.ncols());
272+
273+
let mut parser = partition.parser().unwrap();
274+
275+
let mut rows: Vec<Row> = Vec::new();
276+
loop {
277+
let (n, is_last) = parser.fetch_next().unwrap();
278+
for _i in 0..n {
279+
rows.push(Row(
280+
parser.produce().unwrap(),
187281
parser.produce().unwrap(),
188282
parser.produce().unwrap(),
189283
));
@@ -194,8 +288,8 @@ fn test_csv_infinite_values_cursor_proto() {
194288
}
195289
assert_eq!(
196290
vec![
197-
Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::<Utc>::MAX_UTC),
198-
Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::<Utc>::MIN_UTC),
291+
Row(NaiveDate::MAX, NaiveDateTime::MAX, DateTime::<Utc>::MAX_UTC),
292+
Row(NaiveDate::MIN, NaiveDateTime::MIN, DateTime::<Utc>::MIN_UTC),
199293
],
200294
rows
201295
);

0 commit comments

Comments
 (0)