From 96f45136e979836a3e45470b658c37a92f5b8525 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Mon, 31 Oct 2022 03:33:30 -0700 Subject: [PATCH 1/5] added simple query implementation with tests --- .../connectorx/tests/test_postgres.py | 104 +++ .../src/pandas/transports/postgres.rs | 5 +- connectorx/src/get_arrow.rs | 30 + connectorx/src/get_arrow2.rs | 31 + connectorx/src/sources/postgres/mod.rs | 747 +++++++++++++++++- connectorx/src/transports/postgres_arrow.rs | 4 +- connectorx/src/transports/postgres_arrow2.rs | 4 +- 7 files changed, 921 insertions(+), 4 deletions(-) diff --git a/connectorx-python/connectorx/tests/test_postgres.py b/connectorx-python/connectorx/tests/test_postgres.py index eb424a91ce..83140c5bf6 100644 --- a/connectorx-python/connectorx/tests/test_postgres.py +++ b/connectorx-python/connectorx/tests/test_postgres.py @@ -767,6 +767,110 @@ def test_postgres_types_cursor(postgres_url: str) -> None: assert_frame_equal(df, expected, check_names=True) +def test_types_simple(postgres_url: str) -> None: + query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types" + df = read_sql(postgres_url, query, protocol="simple") + expected = pd.DataFrame( + index=range(4), + data={ + "test_date": pd.Series( + ["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ns]" + ), + "test_timestamp": pd.Series( + [ + "1970-01-01 00:00:01", + "2000-02-28 12:00:10", + "2038-01-18 23:59:59", + None, + ], + dtype="datetime64[ns]", + ), + "test_timestamptz": pd.Series( + [ + "1970-01-01 00:00:01", + "2000-02-28 16:00:10", + "2038-01-18 15:59:59", + None, + ], + dtype="datetime64[ns]", + ), + "test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"), + "test_int64": pd.Series( + [-9223372036854775808, 0, 9223372036854775807, None], dtype="Int64" + ), + "test_float32": pd.Series( + [None, 3.1415926535, 2.71, -1e-37], dtype="float64" + ), + "test_numeric": pd.Series([None, 521.34, 999.99, 0.00], dtype="float64"), + "test_bpchar": pd.Series(["a ", "bb ", "ccc ", None], dtype="object"), + "test_char": pd.Series(["a", "b", None, "d"], dtype="object"), + "test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"), + "test_uuid": pd.Series( + [ + "86b494cc-96b2-11eb-9298-3e22fbb9fe9d", + "86b49b84-96b2-11eb-9298-3e22fbb9fe9d", + "86b49c42-96b2-11eb-9298-3e22fbb9fe9d", + None, + ], + dtype="object", + ), + "test_time": pd.Series( + ["08:12:40", None, "23:00:10", "18:30:00"], dtype="object" + ), + "test_json": pd.Series( + [ + '{"customer":"John Doe","items":{"product":"Beer","qty":6}}', + '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}', + '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}', + None, + ], + dtype="object", + ), + "test_jsonb": pd.Series( + [ + '{"qty":6,"product":"Beer"}', + '{"qty":24,"product":"Diaper"}', + '{"qty":1,"product":"Toy Car"}', + None, + ], + dtype="object", + ), + "test_bytea": pd.Series( + [ + None, + b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5", + b"", + b"\xf0\x9f\x98\x9c", + ], + dtype="object", + ), + "test_enum": pd.Series( + ["happy", "very happy", "ecstatic", None], dtype="object" + ), + "test_f4array": pd.Series( + [[], None, [123.123], [-1e-37, 1e37]], dtype="object" + ), + "test_f8array": pd.Series( + [[], None, [1e-307, 1e308], [0.000234, -12.987654321]], dtype="object" + ), + "test_narray": pd.Series( + [[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object" + ), + "test_i2array": pd.Series( + [[-1, 0, 1], [], [-32768, 32767], None], dtype="object" + ), + "test_i4array": pd.Series( + [[-1, 0, 1123], [], [-2147483648, 2147483647], None], dtype="object" + ), + "test_i8array": pd.Series( + [[-9223372036854775808, 9223372036854775807], [], [0], None], + dtype="object", + ), + }, + ) + assert_frame_equal(df, expected, check_names=True) + + def test_empty_result(postgres_url: str) -> None: query = "SELECT * FROM test_table where test_int < -100" df = read_sql(postgres_url, query) diff --git a/connectorx-python/src/pandas/transports/postgres.rs b/connectorx-python/src/pandas/transports/postgres.rs index d156afeb87..f5dcd2285e 100644 --- a/connectorx-python/src/pandas/transports/postgres.rs +++ b/connectorx-python/src/pandas/transports/postgres.rs @@ -4,7 +4,8 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use connectorx::{ impl_transport, sources::postgres::{ - BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresTypeSystem, + BinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol, + PostgresSource, PostgresTypeSystem, }, typesystem::TypeConversion, }; @@ -64,6 +65,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl<'py, P, C> TypeConversion>, String> for PostgresPandasTransport<'py, P, C> diff --git a/connectorx/src/get_arrow.rs b/connectorx/src/get_arrow.rs index ba4a61d190..0356818150 100644 --- a/connectorx/src/get_arrow.rs +++ b/connectorx/src/get_arrow.rs @@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol} #[cfg(feature = "src_postgres")] use crate::sources::postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, }; use crate::{prelude::*, sql::CXQuery}; use fehler::{throw, throws}; @@ -114,6 +115,35 @@ pub fn get_arrow( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx/src/get_arrow2.rs b/connectorx/src/get_arrow2.rs index dc91611bf9..cecf6ea77d 100644 --- a/connectorx/src/get_arrow2.rs +++ b/connectorx/src/get_arrow2.rs @@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol} #[cfg(feature = "src_postgres")] use crate::sources::postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, }; use crate::{prelude::*, sql::CXQuery}; use fehler::{throw, throws}; @@ -114,6 +115,36 @@ pub fn get_arrow2( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + debug!("Running dispatcher"); + dispatcher.run()?; + } + _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 5071769905..cc68556623 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -24,7 +24,7 @@ use postgres::{ binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow}, fallible_iterator::FallibleIterator, tls::{MakeTlsConnect, TlsConnect}, - Config, CopyOutReader, Row, RowIter, Socket, + Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket, }; use r2d2::{Pool, PooledConnection}; use r2d2_postgres::PostgresConnectionManager; @@ -45,6 +45,9 @@ pub enum CSVProtocol {} /// Protocol - use Cursor pub enum CursorProtocol {} +/// Protocol - use Simple Query +pub enum SimpleProtocol {} + type PgManager = PostgresConnectionManager; type PgConn = PooledConnection>; @@ -974,3 +977,745 @@ impl_produce!( Value, HashMap>, ); + +impl SourcePartition for PostgresSourcePartition +where + C: MakeTlsConnect + Clone + 'static + Sync + Send, + C::TlsConnect: Send, + C::Stream: Send, + >::Future: Send, +{ + type TypeSystem = PostgresTypeSystem; + type Parser<'a> = PostgresSimpleSourceParser; + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn result_rows(&mut self) { + self.nrows = get_total_rows(&mut self.conn, &self.query)?; + } + + #[throws(PostgresSourceError)] + fn parser(&mut self) -> Self::Parser<'_> { + let rows = self.conn.simple_query(self.query.as_str())?; // unless reading the data, it seems like issue the query is fast + PostgresSimpleSourceParser::new(rows, &self.schema) + } + + fn nrows(&self) -> usize { + self.nrows + } + + fn ncols(&self) -> usize { + self.ncols + } +} + +pub struct PostgresSimpleSourceParser { + rows: Vec, + ncols: usize, + current_col: usize, + current_row: usize, +} +impl<'a> PostgresSimpleSourceParser { + pub fn new(rows: Vec, schema: &[PostgresTypeSystem]) -> Self { + Self { + rows, + ncols: schema.len(), + current_row: 0, + current_col: 0, + } + } + + #[throws(PostgresSourceError)] + fn next_loc(&mut self) -> (usize, usize) { + let ret = (self.current_row, self.current_col); + self.current_row += (self.current_col + 1) / self.ncols; + self.current_col = (self.current_col + 1) % self.ncols; + ret + } +} + +impl<'a> PartitionParser<'a> for PostgresSimpleSourceParser { + type TypeSystem = PostgresTypeSystem; + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn fetch_next(&mut self) -> (usize, bool) { + self.current_row = 0; + self.current_col = 0; + (self.rows.len() - 1, true) // last message is command complete + } +} + +macro_rules! impl_simple_produce_unimplemented { + ($($t: ty,)+) => { + $( + impl<'r, 'a> Produce<'r, $t> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> $t { + unimplemented!("not implemented!"); + } + } + + impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<$t> { + unimplemented!("not implemented!"); + } + } + )+ + }; +} + +macro_rules! impl_simple_produce { + ($($t: ty,)+) => { + $( + impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> $t { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => s + .parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?, + None => throw!(anyhow!( + "Cannot parse NULL in NOT NULL column." + )), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + + impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<$t> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some( + s.parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?, + ), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } + } + )+ + }; +} + +impl_simple_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid, bool,); +impl_simple_produce_unimplemented!( + Value, + HashMap>,); + +impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> &'r str { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => s, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option<&'r str> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => row.try_get(cidx)?, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let res: Vec = s.chars().map(|c| c as u8).collect::>(); + res + } + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let res: Vec = s.chars().map(|c| c as u8).collect::>(); + Option::Some(res) + } + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn rem_first_and_last(value: &str) -> &str { + let mut chars = value.chars(); + chars.next(); + chars.next_back(); + chars.as_str() +} + +fn parse_input_i16(input: &str) -> Result, std::num::ParseIntError> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_i16(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_i16(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn parse_input_i32(input: &str) -> Result, std::num::ParseIntError> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_i32(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_i32(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn parse_input_i64(input: &str) -> Result, std::num::ParseIntError> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_i64(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_i64(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn parse_input_f32(input: &str) -> Result, std::num::ParseFloatError> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_f32(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_f32(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn parse_input_f64(input: &str) -> Result, std::num::ParseFloatError> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_f64(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_f64(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +fn parse_input_decimal(input: &str) -> Result, rust_decimal::Error> { + input.split(",").map(|token| token.parse()).collect() +} + +impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => parse_input_decimal(rem_first_and_last(s)).unwrap(), + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Option::Some(parse_input_decimal(rem_first_and_last(s)).unwrap()), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDate { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S") + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDateTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::(Some(s.into())) + })?, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => Some( + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?, + ), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, DateTime> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> DateTime { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let time_string = format!("{}:00", s).to_owned(); + let slice: &str = &time_string[..]; + let time: DateTime = + DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%:z").unwrap(); + + time.with_timezone(&Utc) + } + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => { + let time_string = format!("{}:00", s).to_owned(); + let slice: &str = &time_string[..]; + let time: DateTime = + DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%:z").unwrap(); + + Some(time.with_timezone(&Utc)) + } + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} diff --git a/connectorx/src/transports/postgres_arrow.rs b/connectorx/src/transports/postgres_arrow.rs index c8e9b4677a..fabee32d4f 100644 --- a/connectorx/src/transports/postgres_arrow.rs +++ b/connectorx/src/transports/postgres_arrow.rs @@ -5,7 +5,7 @@ use crate::destinations::arrow::{ }; use crate::sources::postgres::{ BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError, - PostgresTypeSystem, + PostgresTypeSystem, SimpleProtocol, }; use crate::typesystem::TypeConversion; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; @@ -68,6 +68,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl TypeConversion for PostgresArrowTransport { fn convert(val: Uuid) -> String { diff --git a/connectorx/src/transports/postgres_arrow2.rs b/connectorx/src/transports/postgres_arrow2.rs index d27a633403..a46b6fd28f 100644 --- a/connectorx/src/transports/postgres_arrow2.rs +++ b/connectorx/src/transports/postgres_arrow2.rs @@ -5,7 +5,7 @@ use crate::destinations::arrow2::{ }; use crate::sources::postgres::{ BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError, - PostgresTypeSystem, + PostgresTypeSystem, SimpleProtocol, }; use crate::typesystem::TypeConversion; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; @@ -77,6 +77,8 @@ impl_postgres_transport!(CSVProtocol, NoTls); impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl_postgres_transport!(SimpleProtocol, NoTls); +impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); impl TypeConversion for PostgresArrow2Transport { fn convert(val: Uuid) -> String { From 2ff1254290bc4bac6c3acecf8cab03b99b620ae3 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Thu, 3 Nov 2022 23:10:27 -0700 Subject: [PATCH 2/5] made minor import changes --- connectorx/src/get_arrow2.rs | 4 ++-- connectorx/src/sources/postgres/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connectorx/src/get_arrow2.rs b/connectorx/src/get_arrow2.rs index cecf6ea77d..70a8bdc372 100644 --- a/connectorx/src/get_arrow2.rs +++ b/connectorx/src/get_arrow2.rs @@ -124,7 +124,7 @@ pub fn get_arrow2( let dispatcher = Dispatcher::< _, _, - PostgresArrowTransport, + PostgresArrow2Transport, >::new( sb, &mut destination, queries, origin_query ); @@ -137,7 +137,7 @@ pub fn get_arrow2( let dispatcher = Dispatcher::< _, _, - PostgresArrowTransport, + PostgresArrow2Transport, >::new( sb, &mut destination, queries, origin_query ); diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index cc68556623..8fcc699387 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -16,7 +16,7 @@ use crate::{ sql::{count_query, CXQuery}, }; use anyhow::anyhow; -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, FixedOffset, Utc}; use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter}; use fehler::{throw, throws}; use hex::decode; From 640ddac0575f35f82cc0ae0f24c26085b0464266 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Fri, 4 Nov 2022 13:42:37 -0700 Subject: [PATCH 3/5] added pandas case statements for simple protocol and ran cargo fmt --- connectorx-python/src/pandas/get_meta.rs | 23 +++++++++++++++++++ connectorx-python/src/pandas/mod.rs | 29 +++++++++++++++++++++++- connectorx/src/sources/postgres/mod.rs | 2 +- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/connectorx-python/src/pandas/get_meta.rs b/connectorx-python/src/pandas/get_meta.rs index 6397e4b515..eec2ea58db 100644 --- a/connectorx-python/src/pandas/get_meta.rs +++ b/connectorx-python/src/pandas/get_meta.rs @@ -15,6 +15,7 @@ use connectorx::{ mysql::{BinaryProtocol as MySQLBinaryProtocol, MySQLSource, TextProtocol}, postgres::{ rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + SimpleProtocol, PostgresSource, }, sqlite::SQLiteSource, @@ -107,6 +108,28 @@ pub fn get_meta<'a>(py: Python<'a>, conn: &str, protocol: &str, query: String) - debug!("Running dispatcher"); dispatcher.get_meta()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, tls_conn, 1, + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new(sb, &mut destination, queries, None); + debug!("Running dispatcher"); + dispatcher.get_meta()?; + } + ("simple", None) => { + let sb = PostgresSource::::new(config, NoTls, 1)?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new(sb, &mut destination, queries, None); + debug!("Running dispatcher"); + dispatcher.get_meta()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx-python/src/pandas/mod.rs b/connectorx-python/src/pandas/mod.rs index 9a2755a304..c2cadc8c9c 100644 --- a/connectorx-python/src/pandas/mod.rs +++ b/connectorx-python/src/pandas/mod.rs @@ -19,7 +19,7 @@ use connectorx::{ sources::{ mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol}, postgres::{ - rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, + rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol }, }, sql::CXQuery, @@ -130,6 +130,33 @@ pub fn write_pandas<'a>( ); dispatcher.run()?; } + ("simple", Some(tls_conn)) => { + let sb = PostgresSource::::new( + config, + tls_conn, + queries.len(), + )?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + dispatcher.run()?; + } + ("simple", None) => { + let sb = + PostgresSource::::new(config, NoTls, queries.len())?; + let dispatcher = Dispatcher::< + _, + _, + PostgresPandasTransport, + >::new( + sb, &mut destination, queries, origin_query + ); + dispatcher.run()?; + } _ => unimplemented!("{} protocol not supported", protocol), } } diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 8fcc699387..2083bdca1e 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -16,7 +16,7 @@ use crate::{ sql::{count_query, CXQuery}, }; use anyhow::anyhow; -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, FixedOffset, Utc}; +use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter}; use fehler::{throw, throws}; use hex::decode; From b7a7ae969ce5d590a57486dbb5fc2b1e623414c0 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Mon, 7 Nov 2022 06:41:37 -0800 Subject: [PATCH 4/5] all tests passing and implemented macro for vectors --- .gitignore | 1 + .../connectorx/tests/test_postgres.py | 20 +- connectorx/src/sources/postgres/mod.rs | 355 ++++-------------- scripts/Dockerfile | 4 + 4 files changed, 78 insertions(+), 302 deletions(-) create mode 100644 scripts/Dockerfile diff --git a/.gitignore b/.gitignore index 81e8702ef0..4a18dd576d 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ dist benchmark.json docs/_build connectorx/examples/test.rs +docker-compose.yml diff --git a/connectorx-python/connectorx/tests/test_postgres.py b/connectorx-python/connectorx/tests/test_postgres.py index 83140c5bf6..2836f23014 100644 --- a/connectorx-python/connectorx/tests/test_postgres.py +++ b/connectorx-python/connectorx/tests/test_postgres.py @@ -768,7 +768,7 @@ def test_postgres_types_cursor(postgres_url: str) -> None: def test_types_simple(postgres_url: str) -> None: - query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types" + query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types" df = read_sql(postgres_url, query, protocol="simple") expected = pd.DataFrame( index=range(4), @@ -817,24 +817,6 @@ def test_types_simple(postgres_url: str) -> None: "test_time": pd.Series( ["08:12:40", None, "23:00:10", "18:30:00"], dtype="object" ), - "test_json": pd.Series( - [ - '{"customer":"John Doe","items":{"product":"Beer","qty":6}}', - '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}', - '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}', - None, - ], - dtype="object", - ), - "test_jsonb": pd.Series( - [ - '{"qty":6,"product":"Beer"}', - '{"qty":24,"product":"Diaper"}', - '{"qty":1,"product":"Toy Car"}', - None, - ], - dtype="object", - ), "test_bytea": pd.Series( [ None, diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 2083bdca1e..dcff1208e7 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -1182,8 +1182,16 @@ impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { Some(s) => { - let res: Vec = s.chars().map(|c| c as u8).collect::>(); - res + let mut res = s.chars(); + res.next(); + res.next(); + decode( + res.enumerate() + .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c)) + .chars() + .map(|c| c as u8) + .collect::>(), + )? } None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), }, @@ -1207,8 +1215,16 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { Some(s) => { - let res: Vec = s.chars().map(|c| c as u8).collect::>(); - Option::Some(res) + let mut res = s.chars(); + res.next(); + res.next(); + Some(decode( + res.enumerate() + .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c)) + .chars() + .map(|c| c as u8) + .collect::>(), + )?) } None => None, }, @@ -1230,293 +1246,66 @@ fn rem_first_and_last(value: &str) -> &str { chars.as_str() } -fn parse_input_i16(input: &str) -> Result, std::num::ParseIntError> { - input.split(",").map(|token| token.parse()).collect() -} - -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_i16(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_i16(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -fn parse_input_i32(input: &str) -> Result, std::num::ParseIntError> { - input.split(",").map(|token| token.parse()).collect() -} - -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_i32(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_i32(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -fn parse_input_i64(input: &str) -> Result, std::num::ParseIntError> { - input.split(",").map(|token| token.parse()).collect() -} - -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_i64(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_i64(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -fn parse_input_f32(input: &str) -> Result, std::num::ParseFloatError> { - input.split(",").map(|token| token.parse()).collect() -} - -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_f32(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_f32(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -fn parse_input_f64(input: &str) -> Result, std::num::ParseFloatError> { - input.split(",").map(|token| token.parse()).collect() -} - -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_f64(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} - -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; +macro_rules! impl_simple_vec_produce { + ($($t: ty,)+) => { + $( + impl<'r> Produce<'r, Vec<$t>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_f64(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec<$t> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => match s{ + "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + "{}" => vec![], + _ => rem_first_and_last(s).split(",").map(|token| token.parse().map_err(|_| ConnectorXError::cannot_produce::>(Some(s.into())))).collect::, ConnectorXError>>()? + }, + None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } } - }; - val - } -} - -fn parse_input_decimal(input: &str) -> Result, rust_decimal::Error> { - input.split(",").map(|token| token.parse()).collect() -} -impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; + impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Vec { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => parse_input_decimal(rem_first_and_last(s)).unwrap(), - None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); - } - }; - val - } -} + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { -impl<'r, 'a> Produce<'r, Option>> for PostgresSimpleSourceParser { - type Error = PostgresSourceError; + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some(s) => match s{ + "" => None, + "{}" => Some(vec![]), + _ => Some(rem_first_and_last(s).split(",").map(|token| token.parse().map_err(|_| ConnectorXError::cannot_produce::>(Some(s.into())))).collect::, ConnectorXError>>()?) + }, + None => None, + }, - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option> { - let (ridx, cidx) = self.next_loc()?; - let val = match &self.rows[ridx] { - SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Option::Some(parse_input_decimal(rem_first_and_last(s)).unwrap()), - None => None, - }, - SimpleQueryMessage::CommandComplete(c) => { - panic!("get command: {}", c); - } - _ => { - panic!("what?"); + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } } - }; - val - } + )+ + }; } +impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal,); impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser { type Error = PostgresSourceError; diff --git a/scripts/Dockerfile b/scripts/Dockerfile new file mode 100644 index 0000000000..f5e1d1a04a --- /dev/null +++ b/scripts/Dockerfile @@ -0,0 +1,4 @@ +FROM postgres:14.1-alpine +COPY ./postgres.sql . +ENV POSTGRES_PASSWORD=postgres +ENV POSTGRES_USER=postgres \ No newline at end of file From 8c18ed0a0918b2938e958c63b7b5099d5b192a76 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Mon, 7 Nov 2022 06:44:54 -0800 Subject: [PATCH 5/5] removed testing files --- .gitignore | 1 - scripts/Dockerfile | 4 ---- 2 files changed, 5 deletions(-) delete mode 100644 scripts/Dockerfile diff --git a/.gitignore b/.gitignore index 4a18dd576d..81e8702ef0 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,3 @@ dist benchmark.json docs/_build connectorx/examples/test.rs -docker-compose.yml diff --git a/scripts/Dockerfile b/scripts/Dockerfile deleted file mode 100644 index f5e1d1a04a..0000000000 --- a/scripts/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM postgres:14.1-alpine -COPY ./postgres.sql . -ENV POSTGRES_PASSWORD=postgres -ENV POSTGRES_USER=postgres \ No newline at end of file