Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion connectorx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ native-tls = {version = "0.2", optional = true}
num-traits = {version = "0.2", optional = true}
openssl = {version = "0.10", optional = true, features = ["vendored"]}
oracle = {version = "0.6", optional = true}
postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-1", "with-serde_json-1"], optional = true}
postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-1", "with-serde_json-1","with-cidr-0_2"], optional = true}
postgres-native-tls = {version = "0.5", optional = true}
postgres-openssl = {version = "0.5", optional = true}
mysql_common = {version = "0.32", features = ["chrono"], optional = true}
Expand All @@ -59,6 +59,7 @@ j4rs = {version = "0.22", optional = true}
datafusion = {version = "46", optional = true}
prusto = {version = "0.5", optional = true}
serde = {version = "1", optional = true}
cidr-02 = { version = "0.2", package = "cidr", optional = true }

[lib]
crate-type = ["cdylib", "rlib"]
Expand Down Expand Up @@ -97,6 +98,7 @@ src_postgres = [
"native-tls",
"openssl",
"postgres-openssl",
"cidr-02",
]
src_sqlite = ["rusqlite", "r2d2_sqlite", "fallible-streaming-iterator", "r2d2", "urlencoding"]
src_trino = ["prusto", "uuid", "urlencoding", "rust_decimal", "tokio", "num-traits", "serde"]
Expand Down
7 changes: 5 additions & 2 deletions connectorx/src/sources/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod errors;
mod typesystem;

pub use self::errors::PostgresSourceError;
use cidr_02::IpInet;
pub use connection::rewrite_tls_args;
pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};

Expand Down Expand Up @@ -480,6 +481,7 @@ impl_produce!(
NaiveTime,
Uuid,
Value,
IpInet,
Vec<Option<bool>>,
Vec<Option<i16>>,
Vec<Option<i32>>,
Expand Down Expand Up @@ -704,7 +706,7 @@ macro_rules! impl_csv_produce {
};
}

impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid,);
impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);

macro_rules! impl_csv_vec_produce {
($($t: ty,)+) => {
Expand Down Expand Up @@ -1216,6 +1218,7 @@ impl_produce!(
NaiveTime,
Uuid,
Value,
IpInet,
HashMap<String, Option<String>>,
Vec<Option<bool>>,
Vec<Option<String>>,
Expand Down Expand Up @@ -1481,7 +1484,7 @@ macro_rules! impl_simple_produce {
};
}

impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid,);
impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);

impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
type Error = PostgresSourceError;
Expand Down
4 changes: 4 additions & 0 deletions connectorx/src/sources/postgres/typesystem.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use cidr_02::IpInet;
use postgres::types::Type;
use rust_decimal::Decimal;
use serde_json::Value;
Expand Down Expand Up @@ -38,6 +39,7 @@ pub enum PostgresTypeSystem {
Enum(bool),
HSTORE(bool),
Name(bool),
Inet(bool),
}

impl_typesystem! {
Expand Down Expand Up @@ -68,6 +70,7 @@ impl_typesystem! {
{ UUID => Uuid }
{ JSON | JSONB => Value }
{ HSTORE => HashMap<String, Option<String>> }
{ Inet => IpInet }
}
}

Expand Down Expand Up @@ -104,6 +107,7 @@ impl<'a> From<&'a Type> for PostgresTypeSystem {
"json" => JSON(true),
"jsonb" => JSONB(true),
"hstore" => HSTORE(true),
"inet" => Inet(true),
_ => match ty.kind() {
postgres::types::Kind::Enum(_) => Enum(true),
_ => unimplemented!("{}", ty.name()),
Expand Down
14 changes: 14 additions & 0 deletions connectorx/src/transports/postgres_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::sources::postgres::{
};
use crate::typesystem::TypeConversion;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use cidr_02::IpInet;
use num_traits::ToPrimitive;
use postgres::NoTls;
use postgres_openssl::MakeTlsConnector;
Expand Down Expand Up @@ -65,6 +66,7 @@ macro_rules! impl_postgres_transport {
{ ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
{ JSON[Value] => LargeUtf8[String] | conversion option }
{ JSONB[Value] => LargeUtf8[String] | conversion none }
{ Inet[IpInet] => LargeUtf8[String] | conversion none }
{ BoolArray[Vec<Option<bool>>] => BoolArray[Vec<Option<bool>>] | conversion auto }
{ VarcharArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion auto }
{ TextArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion none }
Expand All @@ -88,6 +90,18 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
impl_postgres_transport!(SimpleProtocol, NoTls);
impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);

impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
fn convert(val: IpInet) -> String {
val.to_string()
}
}

impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
fn convert(val: Option<IpInet>) -> Option<String> {
val.map(|val| val.to_string())
}
}

impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
NaiveTimeWrapperMicro(val)
Expand Down
19 changes: 19 additions & 0 deletions connectorx/tests/test_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ fn test_types_binary_postgres() {
"test_varchararray",
"test_textarray",
"test_name",
"test_inet",
]
.join(",");

Expand Down Expand Up @@ -465,6 +466,7 @@ fn test_types_csv_postgres() {
"test_varchararray",
"test_textarray",
"test_name",
"test_inet",
]
.join(",");

Expand Down Expand Up @@ -525,6 +527,7 @@ fn test_types_cursor_postgres() {
"test_varchararray",
"test_textarray",
"test_name",
"test_inet",
]
.join(",");

Expand Down Expand Up @@ -583,6 +586,7 @@ fn test_types_simple_postgres() {
"test_varchararray",
"test_textarray",
"test_name",
"test_inet",
]
.join(",");

Expand Down Expand Up @@ -1220,6 +1224,21 @@ pub fn verify_arrow_type_results(result: Vec<RecordBatch>, protocol: &str) {
Some("101203203-1212323-22131235"),
None,
])));

// test_inet
col += 1;
assert!(result[0]
.column(col)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.eq(&StringArray::from(vec![
Some("192.168.1.1"),
Some("10.0.0.0/24"),
Some("2001:db8::1"),
Some("2001:db8::/32"),
None,
])));
}

#[test]
Expand Down
15 changes: 8 additions & 7 deletions scripts/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ CREATE TABLE IF NOT EXISTS test_types(
test_ltxtquery ltxtquery,
test_varchararray VARCHAR[],
test_textarray TEXT[],
test_name NAME
test_name NAME,
test_inet INET
);

/* test_bool test_date test_timestamp test_timestamptz test_int2 test_int4 test_int8 test_float4 test_float8 test_numeric test_bpchar test_char test_varchar test_uuid test_time test_interval test_json test_jsonb test_bytea test_enum test_f4array test_f8array test_narray test_boolarray test_i2array test_i4array test_i8array test_citext test_ltree test_lquery test_ltxtquery test_varchararray test_textarray test_name */
INSERT INTO test_types VALUES ( TRUE, '1970-01-01', '1970-01-01 00:00:01', '1970-01-01 00:00:01-00', -32768, 0, -9223372036854775808, -1.1, -1.1, .01, '👨‍🍳', 'a', 'abcdefghij', 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer", "qty": 6}}', '{"customer": "John Doe", "items": {"product": "Beer", "qty": 6}}', '\010', 'happy', '{-1.1, 0.00}', '{-1.1, 0.00}', '{0.01, 521.23}', '{true, false}', '{12}', '{-1}', '{-9223372036854775808, 9223372036854775807}', 'str_citext', 'A.B.C.D', '*.B.*', 'A & B*', ARRAY['str1','str2'], ARRAY['str1','str2'], '0' );
INSERT INTO test_types VALUES ( true, '2000-02-28', '2000-02-28 12:00:10', '2000-02-28 12:00:10-04', 0, 1, 0, 0.00, 0.0000, 521.34, 'bb', 'ಠ', '', 'a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a11', '18:30:00', '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper", "qty": 24}}', '{"customer": "Lily Bush", "items": {"product": "Diaper", "qty": 24}}', 'Здра́вствуйте', 'very happy', '{}', '{}', '{0.12, 333.33, 22.22}', '{}', '{}', '{}', '{}', '', 'A.B.E', 'A.*', 'A | B', '{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}', '{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}', '21' );
INSERT INTO test_types VALUES ( false, '2038-01-18', '2038-01-18 23:59:59', '2038-01-18 23:59:59+08', 1, -2147483648, 9223372036854775807, 2.123456, 2.12345678901, '1e-5', '', '😃', '👨‍🍳👨‍🍳👨‍🍳👨', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car", "qty": 1}}', '{"customer": "Josh William", "items": {"product": "Toy Car", "qty": 1}}', '', 'ecstatic', '{1, -2, -12345.1}', '{2.12345678901, -12345678901.1}', '{}', '{true}', '{-32768, 32767}', '{-2147483648, 2147483647}', '{0}', 'abcdef', 'A', '*', 'A@', ARRAY['',' '], ARRAY['',' '], 'someName' );
INSERT INTO test_types VALUES ( False, '1901-12-14', '1901-12-14 00:00:00.062547', '1901-12-14 00:00:00.062547-12', 32767, 2147483647, 1, -12345.1, -12345678901.1, -1.123e2, 'ddddd', '@', '@', '{a0eebc999c0b4ef8bb6d6bb9bd380a11}', '00:00:59.062547', '1 year 2 months 3 days', '{}', '{}', '😜', 'ecstatic', '{2.123456, NULL, 123.123}', '{2.123456, NULL, 123.123}', '{0.0, NULL, -112.1}', '{true, false, NULL}', '{-1, 0, 1, NULL}', '{-1, 0, 1123, NULL}', '{-1, 0, 1, NULL}', '1234', '', '*.A', 'A & B*', ARRAY['👨‍🍳👨‍🍳👨‍🍳👨','', NULL], ARRAY['👨‍🍳👨‍🍳👨‍🍳👨','', NULL], '101203203-1212323-22131235');
INSERT INTO test_types VALUES ( NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL );
/* test_bool test_date test_timestamp test_timestamptz test_int2 test_int4 test_int8 test_float4 test_float8 test_numeric test_bpchar test_char test_varchar test_uuid test_time test_interval test_json test_jsonb test_bytea test_enum test_f4array test_f8array test_narray test_boolarray test_i2array test_i4array test_i8array test_citext test_ltree test_lquery test_ltxtquery test_varchararray test_textarray test_name test_inet */
INSERT INTO test_types VALUES ( TRUE, '1970-01-01', '1970-01-01 00:00:01', '1970-01-01 00:00:01-00', -32768, 0, -9223372036854775808, -1.1, -1.1, .01, '👨‍🍳', 'a', 'abcdefghij', 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer", "qty": 6}}', '{"customer": "John Doe", "items": {"product": "Beer", "qty": 6}}', '\010', 'happy', '{-1.1, 0.00}', '{-1.1, 0.00}', '{0.01, 521.23}', '{true, false}', '{12}', '{-1}', '{-9223372036854775808, 9223372036854775807}', 'str_citext', 'A.B.C.D', '*.B.*', 'A & B*', ARRAY['str1','str2'], ARRAY['str1','str2'], '0', '192.168.1.1' );
INSERT INTO test_types VALUES ( true, '2000-02-28', '2000-02-28 12:00:10', '2000-02-28 12:00:10-04', 0, 1, 0, 0.00, 0.0000, 521.34, 'bb', 'ಠ', '', 'a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a11', '18:30:00', '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper", "qty": 24}}', '{"customer": "Lily Bush", "items": {"product": "Diaper", "qty": 24}}', 'Здра́вствуйте', 'very happy', '{}', '{}', '{0.12, 333.33, 22.22}', '{}', '{}', '{}', '{}', '', 'A.B.E', 'A.*', 'A | B', '{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}', '{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}', '21', '10.0.0.0/24' );
INSERT INTO test_types VALUES ( false, '2038-01-18', '2038-01-18 23:59:59', '2038-01-18 23:59:59+08', 1, -2147483648, 9223372036854775807, 2.123456, 2.12345678901, '1e-5', '', '😃', '👨‍🍳👨‍🍳👨‍🍳👨', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car", "qty": 1}}', '{"customer": "Josh William", "items": {"product": "Toy Car", "qty": 1}}', '', 'ecstatic', '{1, -2, -12345.1}', '{2.12345678901, -12345678901.1}', '{}', '{true}', '{-32768, 32767}', '{-2147483648, 2147483647}', '{0}', 'abcdef', 'A', '*', 'A@', ARRAY['',' '], ARRAY['',' '], 'someName', '2001:db8::1' );
INSERT INTO test_types VALUES ( False, '1901-12-14', '1901-12-14 00:00:00.062547', '1901-12-14 00:00:00.062547-12', 32767, 2147483647, 1, -12345.1, -12345678901.1, -1.123e2, 'ddddd', '@', '@', '{a0eebc999c0b4ef8bb6d6bb9bd380a11}', '00:00:59.062547', '1 year 2 months 3 days', '{}', '{}', '😜', 'ecstatic', '{2.123456, NULL, 123.123}', '{2.123456, NULL, 123.123}', '{0.0, NULL, -112.1}', '{true, false, NULL}', '{-1, 0, 1, NULL}', '{-1, 0, 1123, NULL}', '{-1, 0, 1, NULL}', '1234', '', '*.A', 'A & B*', ARRAY['👨‍🍳👨‍🍳👨‍🍳👨','', NULL], ARRAY['👨‍🍳👨‍🍳👨‍🍳👨','', NULL], '101203203-1212323-22131235', '2001:db8::/32' );
INSERT INTO test_types VALUES ( NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL );


CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
Expand Down
Loading