Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions connectorx-python/connectorx/tests/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def test_postgres_with_index_col(postgres_url: str) -> None:


def test_postgres_types_binary(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree, test_lquery, test_ltxtquery FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree, test_lquery, test_ltxtquery FROM test_types"
df = read_sql(postgres_url, query)
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -538,6 +538,9 @@ def test_postgres_types_binary(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -560,7 +563,7 @@ def test_postgres_types_binary(postgres_url: str) -> None:


def test_postgres_types_csv(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
df = read_sql(postgres_url, query, protocol="csv")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -648,6 +651,9 @@ def test_postgres_types_csv(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -666,7 +672,7 @@ def test_postgres_types_csv(postgres_url: str) -> None:


def test_postgres_types_cursor(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
df = read_sql(postgres_url, query, protocol="cursor")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -754,6 +760,9 @@ def test_postgres_types_cursor(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand All @@ -772,7 +781,7 @@ def test_postgres_types_cursor(postgres_url: str) -> None:


def test_postgres_types_simple(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array FROM test_types"
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_boolarray, test_i2array, test_i4array, test_i8array FROM test_types"
df = read_sql(postgres_url, query, protocol="simple")
expected = pd.DataFrame(
index=range(4),
Expand Down Expand Up @@ -842,6 +851,9 @@ def test_postgres_types_simple(postgres_url: str) -> None:
"test_narray": pd.Series(
[[], None, [521.34], [0.12, 333.33, 22.22]], dtype="object"
),
"test_boolarray": pd.Series(
[[True, False], [], [True], None], dtype="object"
),
"test_i2array": pd.Series(
[[-1, 0, 1], [], [-32768, 32767], None], dtype="object"
),
Expand Down
14 changes: 14 additions & 0 deletions connectorx-python/src/pandas/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ impl<'a> Destination for PandasDestination<'a> {
PandasBlockType::Float64 => {
self.allocate_array::<f64>(dt, placement)?;
}
PandasBlockType::BooleanArray => {
self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
}
PandasBlockType::Float64Array => {
self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
}
Expand Down Expand Up @@ -219,6 +222,17 @@ impl<'a> Destination for PandasDestination<'a> {
.collect()
}
}
PandasBlockType::BooleanArray => {
let bblock = ArrayBlock::<bool>::extract(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
.partition(counts)
.into_iter()
.map(|c| Box::new(c) as _)
.collect()
}
}
PandasBlockType::Float64Array => {
let fblock = ArrayBlock::<f64>::extract(buf)?;
let fcols = fblock.split()?;
Expand Down
35 changes: 35 additions & 0 deletions connectorx-python/src/pandas/pandas_columns/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ where
}
}

impl PandasColumn<Vec<bool>> for ArrayColumn<bool> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Vec<bool>, row: usize) {
self.lengths.push(val.len());
self.buffer.extend_from_slice(&val[..]);
self.row_idx.push(row);
self.try_flush()?;
}
}

impl PandasColumn<Option<Vec<bool>>> for ArrayColumn<bool> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Option<Vec<bool>>, row: usize) {
match val {
Some(v) => {
self.lengths.push(v.len());
self.buffer.extend_from_slice(&v[..]);
self.row_idx.push(row);
self.try_flush()?;
}
None => {
self.lengths.push(usize::MAX);
self.row_idx.push(row);
}
}
}
}
impl PandasColumn<Vec<f64>> for ArrayColumn<f64> {
#[throws(ConnectorXPythonError)]
fn write(&mut self, val: Vec<f64>, row: usize) {
Expand Down Expand Up @@ -150,6 +177,14 @@ impl PandasColumn<Option<Vec<i64>>> for ArrayColumn<i64> {
}
}

impl HasPandasColumn for Vec<bool> {
type PandasColumn<'a> = ArrayColumn<bool>;
}

impl HasPandasColumn for Option<Vec<bool>> {
type PandasColumn<'a> = ArrayColumn<bool>;
}

impl HasPandasColumn for Vec<f64> {
type PandasColumn<'a> = ArrayColumn<f64>;
}
Expand Down
1 change: 1 addition & 0 deletions connectorx-python/src/pandas/transports/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ macro_rules! impl_postgres_transport {
{ Int2[i16] => I64[i64] | conversion auto }
{ Int4[i32] => I64[i64] | conversion auto }
{ Int8[i64] => I64[i64] | conversion auto }
{ BoolArray[Vec<bool>] => BoolArray[Vec<bool>] | conversion auto_vec }
{ Int2Array[Vec<i16>] => I64Array[Vec<i64>] | conversion auto_vec }
{ Int4Array[Vec<i32>] => I64Array[Vec<i64>] | conversion auto_vec }
{ Int8Array[Vec<i64>] => I64Array[Vec<i64>] | conversion auto }
Expand Down
4 changes: 4 additions & 0 deletions connectorx-python/src/pandas/typesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum PandasTypeSystem {
F64Array(bool),
I64Array(bool),
Bool(bool),
BoolArray(bool),
Char(bool),
Str(bool),
BoxStr(bool),
Expand All @@ -23,6 +24,7 @@ pub enum PandasBlockType {
Boolean(bool), // bool indicates nullablity
Int64(bool),
Float64,
BooleanArray,
Int64Array,
Float64Array,
String,
Expand Down Expand Up @@ -54,6 +56,7 @@ impl From<PandasTypeSystem> for PandasBlockType {
PandasTypeSystem::Bool(nullable) => PandasBlockType::Boolean(nullable),
PandasTypeSystem::I64(nullable) => PandasBlockType::Int64(nullable),
PandasTypeSystem::F64(_) => PandasBlockType::Float64,
PandasTypeSystem::BoolArray(_) => PandasBlockType::BooleanArray,
PandasTypeSystem::F64Array(_) => PandasBlockType::Float64Array,
PandasTypeSystem::I64Array(_) => PandasBlockType::Int64Array,
PandasTypeSystem::String(_)
Expand All @@ -74,6 +77,7 @@ impl_typesystem! {
{ F64Array => Vec<f64> }
{ I64Array => Vec<i64> }
{ Bool => bool }
{ BoolArray => Vec<bool> }
{ Char => char }
{ Str => &'r str }
{ BoxStr => Box<str> }
Expand Down
29 changes: 18 additions & 11 deletions connectorx/src/destinations/arrow2/arrow_assoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl_arrow_assoc!(f64, ArrowDataType::Float64, MutablePrimitiveArray<f64>);
impl_arrow_assoc!(bool, ArrowDataType::Boolean, MutableBooleanArray);

macro_rules! impl_arrow_assoc_vec {
($T:ty, $AT:expr) => {
($T:ty, $PT:ty, $AT:expr) => {
impl ArrowAssoc for Vec<$T> {
type Builder = MutableListArray<i64, MutablePrimitiveArray<$T>>;
type Builder = MutableListArray<i64, $PT>;

fn builder(nrows: usize) -> Self::Builder {
MutableListArray::<i64, MutablePrimitiveArray<$T>>::with_capacity(nrows)
MutableListArray::<i64, $PT>::with_capacity(nrows)
}

#[inline]
Expand All @@ -86,10 +86,10 @@ macro_rules! impl_arrow_assoc_vec {
}

impl ArrowAssoc for Option<Vec<$T>> {
type Builder = MutableListArray<i64, MutablePrimitiveArray<$T>>;
type Builder = MutableListArray<i64, $PT>;

fn builder(nrows: usize) -> Self::Builder {
MutableListArray::<i64, MutablePrimitiveArray<$T>>::with_capacity(nrows)
MutableListArray::<i64, $PT>::with_capacity(nrows)
}

#[inline]
Expand All @@ -114,12 +114,19 @@ macro_rules! impl_arrow_assoc_vec {
};
}

impl_arrow_assoc_vec!(i32, ArrowDataType::Int32);
impl_arrow_assoc_vec!(i64, ArrowDataType::Int64);
impl_arrow_assoc_vec!(u32, ArrowDataType::UInt32);
impl_arrow_assoc_vec!(u64, ArrowDataType::UInt64);
impl_arrow_assoc_vec!(f32, ArrowDataType::Float32);
impl_arrow_assoc_vec!(f64, ArrowDataType::Float64);
macro_rules! impl_arrow_assoc_primitive_vec {
($T:ty, $AT:expr) => {
impl_arrow_assoc_vec!($T, MutablePrimitiveArray<$T>, $AT);
};
}

impl_arrow_assoc_vec!(bool, MutableBooleanArray, ArrowDataType::Boolean);
impl_arrow_assoc_primitive_vec!(i32, ArrowDataType::Int32);
impl_arrow_assoc_primitive_vec!(i64, ArrowDataType::Int64);
impl_arrow_assoc_primitive_vec!(u32, ArrowDataType::UInt32);
impl_arrow_assoc_primitive_vec!(u64, ArrowDataType::UInt64);
impl_arrow_assoc_primitive_vec!(f32, ArrowDataType::Float32);
impl_arrow_assoc_primitive_vec!(f64, ArrowDataType::Float64);

impl ArrowAssoc for &str {
type Builder = MutableUtf8Array<i64>;
Expand Down
2 changes: 2 additions & 0 deletions connectorx/src/destinations/arrow2/typesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum Arrow2TypeSystem {
Date64(bool),
Time64(bool),
DateTimeTz(bool),
BoolArray(bool),
Int32Array(bool),
Int64Array(bool),
UInt32Array(bool),
Expand All @@ -41,6 +42,7 @@ impl_typesystem! {
{ Date64 => NaiveDateTime }
{ Time64 => NaiveTime }
{ DateTimeTz => DateTime<Utc> }
{ BoolArray => Vec<bool> }
{ Int32Array => Vec<i32> }
{ Int64Array => Vec<i64> }
{ UInt32Array => Vec<u32> }
Expand Down
Loading