Skip to content

Commit 2a71405

Browse files
authored
Merge pull request #387 from alswang18/add_simple_query_protocol
added simple query implementation
2 parents 6160152 + 8c18ed0 commit 2a71405

File tree

9 files changed

+744
-6
lines changed

9 files changed

+744
-6
lines changed

connectorx-python/connectorx/tests/test_postgres.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,92 @@ def test_postgres_types_cursor(postgres_url: str) -> None:
769769
assert_frame_equal(df, expected, check_names=True)
770770

771771

772+
def test_types_simple(postgres_url: str) -> None:
773+
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types"
774+
df = read_sql(postgres_url, query, protocol="simple")
775+
expected = pd.DataFrame(
776+
index=range(4),
777+
data={
778+
"test_date": pd.Series(
779+
["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ns]"
780+
),
781+
"test_timestamp": pd.Series(
782+
[
783+
"1970-01-01 00:00:01",
784+
"2000-02-28 12:00:10",
785+
"2038-01-18 23:59:59",
786+
None,
787+
],
788+
dtype="datetime64[ns]",
789+
),
790+
"test_timestamptz": pd.Series(
791+
[
792+
"1970-01-01 00:00:01",
793+
"2000-02-28 16:00:10",
794+
"2038-01-18 15:59:59",
795+
None,
796+
],
797+
dtype="datetime64[ns]",
798+
),
799+
"test_int16": pd.Series([0, 1, 2, 3], dtype="Int64"),
800+
"test_int64": pd.Series(
801+
[-9223372036854775808, 0, 9223372036854775807, None], dtype="Int64"
802+
),
803+
"test_float32": pd.Series(
804+
[None, 3.1415926535, 2.71, -1e-37], dtype="float64"
805+
),
806+
"test_numeric": pd.Series([None, 521.34, 999.99, 0.00], dtype="float64"),
807+
"test_bpchar": pd.Series(["a ", "bb ", "ccc ", None], dtype="object"),
808+
"test_char": pd.Series(["a", "b", None, "d"], dtype="object"),
809+
"test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"),
810+
"test_uuid": pd.Series(
811+
[
812+
"86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
813+
"86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
814+
"86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
815+
None,
816+
],
817+
dtype="object",
818+
),
819+
"test_time": pd.Series(
820+
["08:12:40", None, "23:00:10", "18:30:00"], dtype="object"
821+
),
822+
"test_bytea": pd.Series(
823+
[
824+
None,
825+
b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
826+
b"",
827+
b"\xf0\x9f\x98\x9c",
828+
],
829+
dtype="object",
830+
),
831+
"test_enum": pd.Series(
832+
["happy", "very happy", "ecstatic", None], dtype="object"
833+
),
834+
"test_f4array": pd.Series(
835+
[[], None, [123.123], [-1e-37, 1e37]], dtype="object"
836+
),
837+
"test_f8array": pd.Series(
838+
[[], None, [1e-307, 1e308], [0.000234, -12.987654321]], dtype="object"
839+
),
840+
"test_narray": pd.Series(
841+
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
842+
),
843+
"test_i2array": pd.Series(
844+
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
845+
),
846+
"test_i4array": pd.Series(
847+
[[-1, 0, 1123], [], [-2147483648, 2147483647], None], dtype="object"
848+
),
849+
"test_i8array": pd.Series(
850+
[[-9223372036854775808, 9223372036854775807], [], [0], None],
851+
dtype="object",
852+
),
853+
},
854+
)
855+
assert_frame_equal(df, expected, check_names=True)
856+
857+
772858
def test_empty_result(postgres_url: str) -> None:
773859
query = "SELECT * FROM test_table where test_int < -100"
774860
df = read_sql(postgres_url, query)

connectorx-python/src/pandas/get_meta.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use connectorx::{
1515
mysql::{BinaryProtocol as MySQLBinaryProtocol, MySQLSource, TextProtocol},
1616
postgres::{
1717
rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
18+
SimpleProtocol,
1819
PostgresSource,
1920
},
2021
sqlite::SQLiteSource,
@@ -107,6 +108,28 @@ pub fn get_meta<'a>(py: Python<'a>, conn: &str, protocol: &str, query: String) -
107108
debug!("Running dispatcher");
108109
dispatcher.get_meta()?;
109110
}
111+
("simple", Some(tls_conn)) => {
112+
let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
113+
config, tls_conn, 1,
114+
)?;
115+
let dispatcher = Dispatcher::<
116+
_,
117+
_,
118+
PostgresPandasTransport<SimpleProtocol, MakeTlsConnector>,
119+
>::new(sb, &mut destination, queries, None);
120+
debug!("Running dispatcher");
121+
dispatcher.get_meta()?;
122+
}
123+
("simple", None) => {
124+
let sb = PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, 1)?;
125+
let dispatcher = Dispatcher::<
126+
_,
127+
_,
128+
PostgresPandasTransport<SimpleProtocol, NoTls>,
129+
>::new(sb, &mut destination, queries, None);
130+
debug!("Running dispatcher");
131+
dispatcher.get_meta()?;
132+
}
110133
_ => unimplemented!("{} protocol not supported", protocol),
111134
}
112135
}

connectorx-python/src/pandas/mod.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use connectorx::{
1919
sources::{
2020
mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol},
2121
postgres::{
22-
rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
22+
rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol
2323
},
2424
},
2525
sql::CXQuery,
@@ -130,6 +130,33 @@ pub fn write_pandas<'a>(
130130
);
131131
dispatcher.run()?;
132132
}
133+
("simple", Some(tls_conn)) => {
134+
let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
135+
config,
136+
tls_conn,
137+
queries.len(),
138+
)?;
139+
let dispatcher = Dispatcher::<
140+
_,
141+
_,
142+
PostgresPandasTransport<SimpleProtocol, MakeTlsConnector>,
143+
>::new(
144+
sb, &mut destination, queries, origin_query
145+
);
146+
dispatcher.run()?;
147+
}
148+
("simple", None) => {
149+
let sb =
150+
PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
151+
let dispatcher = Dispatcher::<
152+
_,
153+
_,
154+
PostgresPandasTransport<SimpleProtocol, NoTls>,
155+
>::new(
156+
sb, &mut destination, queries, origin_query
157+
);
158+
dispatcher.run()?;
159+
}
133160
_ => unimplemented!("{} protocol not supported", protocol),
134161
}
135162
}

connectorx-python/src/pandas/transports/postgres.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
44
use connectorx::{
55
impl_transport,
66
sources::postgres::{
7-
BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresTypeSystem,
7+
BinaryProtocol, CSVProtocol, CursorProtocol, SimpleProtocol,
8+
PostgresSource, PostgresTypeSystem,
89
},
910
typesystem::TypeConversion,
1011
};
@@ -64,6 +65,8 @@ impl_postgres_transport!(CSVProtocol, NoTls);
6465
impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
6566
impl_postgres_transport!(CursorProtocol, NoTls);
6667
impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
68+
impl_postgres_transport!(SimpleProtocol, NoTls);
69+
impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
6770

6871
impl<'py, P, C> TypeConversion<HashMap<String, Option<String>>, String>
6972
for PostgresPandasTransport<'py, P, C>

connectorx/src/get_arrow.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol}
33
#[cfg(feature = "src_postgres")]
44
use crate::sources::postgres::{
55
rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6+
SimpleProtocol,
67
};
78
use crate::{prelude::*, sql::CXQuery};
89
use fehler::{throw, throws};
@@ -114,6 +115,35 @@ pub fn get_arrow(
114115
);
115116
dispatcher.run()?;
116117
}
118+
("simple", Some(tls_conn)) => {
119+
let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
120+
config,
121+
tls_conn,
122+
queries.len(),
123+
)?;
124+
let dispatcher = Dispatcher::<
125+
_,
126+
_,
127+
PostgresArrowTransport<SimpleProtocol, MakeTlsConnector>,
128+
>::new(
129+
sb, &mut destination, queries, origin_query
130+
);
131+
debug!("Running dispatcher");
132+
dispatcher.run()?;
133+
}
134+
("simple", None) => {
135+
let sb =
136+
PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
137+
let dispatcher = Dispatcher::<
138+
_,
139+
_,
140+
PostgresArrowTransport<SimpleProtocol, NoTls>,
141+
>::new(
142+
sb, &mut destination, queries, origin_query
143+
);
144+
debug!("Running dispatcher");
145+
dispatcher.run()?;
146+
}
117147
_ => unimplemented!("{} protocol not supported", protocol),
118148
}
119149
}

connectorx/src/get_arrow2.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol}
33
#[cfg(feature = "src_postgres")]
44
use crate::sources::postgres::{
55
rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6+
SimpleProtocol,
67
};
78
use crate::{prelude::*, sql::CXQuery};
89
use fehler::{throw, throws};
@@ -114,6 +115,36 @@ pub fn get_arrow2(
114115
);
115116
dispatcher.run()?;
116117
}
118+
("simple", Some(tls_conn)) => {
119+
let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
120+
config,
121+
tls_conn,
122+
queries.len(),
123+
)?;
124+
let dispatcher = Dispatcher::<
125+
_,
126+
_,
127+
PostgresArrow2Transport<SimpleProtocol, MakeTlsConnector>,
128+
>::new(
129+
sb, &mut destination, queries, origin_query
130+
);
131+
debug!("Running dispatcher");
132+
dispatcher.run()?;
133+
}
134+
("simple", None) => {
135+
let sb =
136+
PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
137+
let dispatcher = Dispatcher::<
138+
_,
139+
_,
140+
PostgresArrow2Transport<SimpleProtocol, NoTls>,
141+
>::new(
142+
sb, &mut destination, queries, origin_query
143+
);
144+
debug!("Running dispatcher");
145+
dispatcher.run()?;
146+
}
147+
117148
_ => unimplemented!("{} protocol not supported", protocol),
118149
}
119150
}

0 commit comments

Comments
 (0)