diff --git a/connectorx-python/connectorx/tests/test_postgres.py b/connectorx-python/connectorx/tests/test_postgres.py index eb424a91ce..2836f23014 100644 --- a/connectorx-python/connectorx/tests/test_postgres.py +++ b/connectorx-python/connectorx/tests/test_postgres.py @@ -767,6 +767,92 @@ def test_postgres_types_cursor(postgres_url: str) -> None: assert_frame_equal(df, expected, check_names=True) +def test_types_simple(postgres_url: str) -> None: + query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types" + df = read_sql(postgres_url, query, protocol="simple") + expected = pd.DataFrame( + index=range(4), + data={ + "test_date": pd.Series( + ["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ns]" + ), + "test_timestamp": pd.Series( + [ + "1970-01-01 00:00:01", + "2000-02-28 12:00:10", + "2038-01-18 23:59:59", + None, + ], + dtype="datetime64[ns]", + ), + "test_timestamptz": pd.Series( + [ + "1970-01-01 00:00:01", + "2000-02-28 16:00:10", + "2038-01-18 15:59:59", + None, + ], + dtype="datetime64[ns]", + ), + "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"), + "test_int64": pd.Series( + [-9223372036854775808, 0, 9223372036854775807, None], dtype="Int64" + ), + "test_float32": pd.Series( + [None, 3.1415926535, 2.71, -1e-37], dtype="float64" + ), + "test_numeric": pd.Series([None, 521.34, 999.99, 0.00], dtype="float64"), + "test_bpchar": pd.Series(["a ", "bb ", "ccc ", None], dtype="object"), + "test_char": pd.Series(["a", "b", None, "d"], dtype="object"), + "test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"), + "test_uuid": pd.Series( + [ + "86b494cc-96b2-11eb-9298-3e22fbb9fe9d", + "86b49b84-96b2-11eb-9298-3e22fbb9fe9d", + "86b49c42-96b2-11eb-9298-3e22fbb9fe9d", + None, + ], + dtype="object", + ), + "test_time": pd.Series( + ["08:12:40", None, "23:00:10", "18:30:00"], dtype="object" + ), + "test_bytea": pd.Series( + [ + None, + b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5", + b"", + b"\xf0\x9f\x98\x9c", + ], + dtype="object", + ), + "test_enum": pd.Series( + ["happy", "very happy", "ecstatic", None], dtype="object" + ), + "test_f4array": pd.Series( + [[], None, [123.123], [-1e-37, 1e37]], dtype="object" + ), + "test_f8array": pd.Series( + [[], None, [1e-307, 1e308], [0.000234, -12.987654321]], dtype="object" + ), + "test_narray": pd.Series( + [[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object" + ), + "test_i2array": pd.Series( + [[-1, 0, 1], [], [-32768, 32767], None], dtype="object" + ), + "test_i4array": pd.Series( + [[-1, 0, 1123], [], [-2147483648, 2147483647], None], dtype="object" + ), + "test_i8array": pd.Series( + [[-9223372036854775808, 9223372036854775807], [], [0], None], + dtype="object", + ), + }, + ) + assert_frame_equal(df, expected, check_names=True) + + def test_empty_result(postgres_url: str) -> None: query = "SELECT * FROM test_table where test_int < -100" df = read_sql(postgres_url, query) diff --git a/connectorx-python/src/pandas/get_meta.rs b/connectorx-python/src/pandas/get_meta.rs index 6397e4b515..eec2ea58db 100644 --- a/connectorx-python/src/pandas/get_meta.rs +++ b/connectorx-python/src/pandas/get_meta.rs @@ -15,6 +15,7 @@ use connectorx::{ mysql::{BinaryProtocol as MySQLBinaryProtocol, MySQLSource, TextProtocol}, postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, PostgresSource, }, sqlite::SQLiteSource, @@ -107,6 +108,28 @@ pub fn get_meta<'a>(py: Python<'a>, conn: &str, protocol: &str, query: String) - debug!("Running dispatcher"); dispatcher.get_meta()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, tls_conn, 1, + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new(sb, &mut destination, queries, None); + debug!("Running dispatcher"); + dispatcher.get_meta()?; + } + ("simple", None) => { + let sb = PostgresSource::::new(config, NoTls, 1)?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new(sb, &mut destination, queries, None); + debug!("Running dispatcher"); + dispatcher.get_meta()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx-python/src/pandas/mod.rs b/connectorx-python/src/pandas/mod.rs index 9a2755a304..c2cadc8c9c 100644 --- a/connectorx-python/src/pandas/mod.rs +++ b/connectorx-python/src/pandas/mod.rs @@ -19,7 +19,7 @@ use connectorx::{ sources::{ mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol}, postgres::{ - rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol }, }, sql::CXQuery, @@ -130,6 +130,33 @@ pub fn write_pandas<'a>( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + dispatcher.run()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx-python/src/pandas/transports/postgres.rs b/connectorx-python/src/pandas/transports/postgres.rs index d156afeb87..f5dcd2285e 100644 --- a/connectorx-python/src/pandas/transports/postgres.rs +++ b/connectorx-python/src/pandas/transports/postgres.rs @@ -4,7 +4,8 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use connectorx::{ impl_transport, sources::postgres::{ - BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresTypeSystem, + BinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol, + PostgresSource, PostgresTypeSystem, }, typesystem::TypeConversion, }; @@ -64,6 +65,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl<'py, P, C> TypeConversion>, String> for PostgresPandasTransport<'py, P, C> diff --git a/connectorx/src/get_arrow.rs b/connectorx/src/get_arrow.rs index ba4a61d190..0356818150 100644 --- a/connectorx/src/get_arrow.rs +++ b/connectorx/src/get_arrow.rs @@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol} #[cfg(feature = "src_postgres")] use crate::sources::postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, }; use crate::{prelude::*, sql::CXQuery}; use fehler::{throw, throws}; @@ -114,6 +115,35 @@ pub fn get_arrow( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx/src/get_arrow2.rs b/connectorx/src/get_arrow2.rs index dc91611bf9..70a8bdc372 100644 --- a/connectorx/src/get_arrow2.rs +++ b/connectorx/src/get_arrow2.rs @@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol} #[cfg(feature = "src_postgres")] use crate::sources::postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, }; use crate::{prelude::*, sql::CXQuery}; use fehler::{throw, throws}; @@ -114,6 +115,36 @@ pub fn get_arrow2( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrow2Transport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrow2Transport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 5071769905..dcff1208e7 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -16,7 +16,7 @@ use crate::{ sql::{count_query, CXQuery}, }; use anyhow::anyhow; -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter}; use fehler::{throw, throws}; use hex::decode; @@ -24,7 +24,7 @@ use postgres::{ binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow}, fallible_iterator::FallibleIterator, tls::{MakeTlsConnect, TlsConnect}, - Config, CopyOutReader, Row, RowIter, Socket, + Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket, }; use r2d2::{Pool, PooledConnection}; use r2d2_postgres::PostgresConnectionManager; @@ -45,6 +45,9 @@ pub enum CSVProtocol {} /// Protocol - use Cursor pub enum CursorProtocol {} +/// Protocol - use Simple Query +pub enum SimpleProtocol {} + type PgManager = PostgresConnectionManager; type PgConn = PooledConnection>; @@ -974,3 +977,534 @@ impl_produce!( Value, HashMap>, ); + +impl SourcePartition for PostgresSourcePartition +where + C: MakeTlsConnect + Clone + 'static + Sync + Send, + C::TlsConnect: Send, + C::Stream: Send, + >::Future: Send, +{ + type TypeSystem = PostgresTypeSystem; + type Parser<'a> = PostgresSimpleSourceParser; + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn result_rows(&mut self) { + self.nrows = get_total_rows(&mut self.conn, &self.query)?; + } + + #[throws(PostgresSourceError)] + fn parser(&mut self) -> Self::Parser<'_> { + let rows = self.conn.simple_query(self.query.as_str())?; // unless reading the data, it seems like issue the query is fast + PostgresSimpleSourceParser::new(rows, &self.schema) + } + + fn nrows(&self) -> usize { + self.nrows + } + + fn ncols(&self) -> usize { + self.ncols + } +} + +pub struct PostgresSimpleSourceParser { + rows: Vec, + ncols: usize, + current_col: usize, + current_row: usize, +} +impl<'a> PostgresSimpleSourceParser { + pub fn new(rows: Vec, schema: &[PostgresTypeSystem]) -> Self { + Self { + rows, + ncols: schema.len(), + current_row: 0, + current_col: 0, + } + } + + #[throws(PostgresSourceError)] + fn next_loc(&mut self) -> (usize, usize) { + let ret = (self.current_row, self.current_col); + self.current_row += (self.current_col + 1) / self.ncols; + self.current_col = (self.current_col + 1) % self.ncols; + ret + } +} + +impl<'a> PartitionParser<'a> for PostgresSimpleSourceParser { + type TypeSystem = PostgresTypeSystem; + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn fetch_next(&mut self) -> (usize, bool) { + self.current_row = 0; + self.current_col = 0; + (self.rows.len() - 1, true) // last message is command complete + } +} + +macro_rules! impl_simple_produce_unimplemented { + ($($t: ty,)+) => { + $( + impl<'r, 'a> Produce<'r, $t> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> $t { + unimplemented!("not implemented!"); + } + } + + impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<$t> { + unimplemented!("not implemented!"); + } + } + )+ + }; +} + +macro_rules! impl_simple_produce { + ($($t: ty,)+) => { + $( + impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> $t { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => s + .parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?, + None => throw!(anyhow!( + "Cannot parse NULL in NOT NULL column." + )), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + + impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<$t> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some( + s.parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?, + ), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + )+ + }; +} + +impl_simple_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid, bool,); +impl_simple_produce_unimplemented!( + Value, + HashMap>,); + +impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> &'r str { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => s, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<&'r str> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => row.try_get(cidx)?, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let mut res = s.chars(); + res.next(); + res.next(); + decode( + res.enumerate() + .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c)) + .chars() + .map(|c| c as u8) + .collect::>(), + )? + } + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let mut res = s.chars(); + res.next(); + res.next(); + Some(decode( + res.enumerate() + .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c)) + .chars() + .map(|c| c as u8) + .collect::>(), + )?) + } + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn rem_first_and_last(value: &str) -> &str { + let mut chars = value.chars(); + chars.next(); + chars.next_back(); + chars.as_str() +} + +macro_rules! impl_simple_vec_produce { + ($($t: ty,)+) => { + $( + impl<'r> Produce<'r, Vec<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec<$t> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => match s{ + "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + "{}" => vec![], + _ => rem_first_and_last(s).split(",").map(|token| token.parse().map_err(|_| ConnectorXError::cannot_produce::>(Some(s.into())))).collect::, ConnectorXError>>()? + }, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + + impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => match s{ + "" => None, + "{}" => Some(vec![]), + _ => Some(rem_first_and_last(s).split(",").map(|token| token.parse().map_err(|_| ConnectorXError::cannot_produce::>(Some(s.into())))).collect::, ConnectorXError>>()?) + }, + None => None, + }, + + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + )+ + }; +} +impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal,); + +impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDate { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S") + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDateTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::(Some(s.into())) + })?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some( + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?, + ), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, DateTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> DateTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let time_string = format!("{}:00", s).to_owned(); + let slice: &str = &time_string[..]; + let time: DateTime = + DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%:z").unwrap(); + + time.with_timezone(&Utc) + } + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let time_string = format!("{}:00", s).to_owned(); + let slice: &str = &time_string[..]; + let time: DateTime = + DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%:z").unwrap(); + + Some(time.with_timezone(&Utc)) + } + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} diff --git a/connectorx/src/transports/postgres_arrow.rs b/connectorx/src/transports/postgres_arrow.rs index c8e9b4677a..fabee32d4f 100644 --- a/connectorx/src/transports/postgres_arrow.rs +++ b/connectorx/src/transports/postgres_arrow.rs @@ -5,7 +5,7 @@ use crate::destinations::arrow::{ }; use crate::sources::postgres::{ BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError, - PostgresTypeSystem, + PostgresTypeSystem, SimpleProtocol, }; use crate::typesystem::TypeConversion; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; @@ -68,6 +68,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl TypeConversion for PostgresArrowTransport { fn convert(val: Uuid) -> String { diff --git a/connectorx/src/transports/postgres_arrow2.rs b/connectorx/src/transports/postgres_arrow2.rs index d27a633403..a46b6fd28f 100644 --- a/connectorx/src/transports/postgres_arrow2.rs +++ b/connectorx/src/transports/postgres_arrow2.rs @@ -5,7 +5,7 @@ use crate::destinations::arrow2::{ }; use crate::sources::postgres::{ BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError, - PostgresTypeSystem, + PostgresTypeSystem, SimpleProtocol, }; use crate::typesystem::TypeConversion; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; @@ -77,6 +77,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl TypeConversion for PostgresArrow2Transport { fn convert(val: Uuid) -> String {