Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 106 additions & 100 deletions connectorx-python/connectorx/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from pandas.testing import assert_frame_equal
import datetime
from decimal import localcontext, Decimal

from .. import read_sql

Expand Down Expand Up @@ -43,110 +44,115 @@ def test_arrow(postgres_url: str) -> None:
df.sort_values(by="test_int", inplace=True, ignore_index=True)
assert_frame_equal(df, expected, check_names=True)

def decimal_s10(val):
return Decimal(val).quantize(Decimal("0.0000000001"))

def test_arrow_type(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int2, test_int4, test_int8, test_float4, test_float8, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_ltree, test_name FROM test_types"
df = read_sql(postgres_url, query, return_type="arrow")
df = df.to_pandas(date_as_object=False)
df.sort_values(by="test_int2", inplace=True, ignore_index=True)
expected = pd.DataFrame(
index=range(5),
data={
"test_date": pd.Series(
["1970-01-01", "2000-02-28", "2038-01-18", "1901-12-14", None], dtype="datetime64[ms]"
),
"test_timestamp": pd.Series(
[
"1970-01-01 00:00:01",
"2000-02-28 12:00:10",
"2038-01-18 23:59:59",
"1901-12-14 00:00:00.062547",
None,
],
dtype="datetime64[us]",
),
"test_timestamptz": pd.Series(
[
"1970-01-01 00:00:01+00:00",
"2000-02-28 12:00:10-04:00",
"2038-01-18 23:59:59+08:00",
"1901-12-14 00:00:00.062547-12:00",
None,
],
dtype="datetime64[us, UTC]",
),
"test_int2": pd.Series([-32768, 0, 1, 32767], dtype="int16"),
"test_int4": pd.Series([0, 1, -2147483648, 2147483647], dtype="int32"),
"test_int8": pd.Series(
[-9223372036854775808, 0, 9223372036854775807, 1], dtype="float64"
),
"test_float4": pd.Series(
[-1.1, 0.00, 2.123456, -12345.1, None], dtype="float32"
),
"test_float8": pd.Series(
[-1.1, 0.00, 2.12345678901, -12345678901.1, None], dtype="float64"
),
"test_numeric": pd.Series([0.01, 521.34, 0, -1.123e2, None], dtype="float64"),
"test_bpchar": pd.Series(["👨‍🍳 ", "bb ", " ", "ddddd", None], dtype="object"),
"test_char": pd.Series(["a", "ಠ", "😃", "@", None], dtype="object"),
"test_varchar": pd.Series(["abcdefghij", "", "👨‍🍳👨‍🍳👨‍🍳👨", "@", None], dtype="object"),
"test_uuid": pd.Series(
[
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
None,
],
dtype="object",
),
"test_time": pd.Series(
[
datetime.time(8, 12, 40),
datetime.time(18, 30),
datetime.time(23, 0, 10),
datetime.time(0, 0, 59, 62547),
None,
],
dtype="object",
),
"test_bytea": pd.Series(
[
b'\x08',
b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
b"",
b"\xf0\x9f\x98\x9c",
None
],
dtype="object",
),
"test_json": pd.Series(
[
'{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
'{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
'{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
'{}',
None,
],
dtype="object",
),
"test_jsonb": pd.Series(
[
'{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
'{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
'{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
'{}',
None,
],
dtype="object",
),
"test_ltree": pd.Series(
["A.B.C.D", "A.B.E", "A", "", None], dtype="object"
),
"test_name": pd.Series(
["0", "21", "someName", "101203203-1212323-22131235", None]
)

},
)
with localcontext() as ctx:
ctx.prec = 38
expected = pd.DataFrame(
index=range(5),
data={
"test_date": pd.Series(
["1970-01-01", "2000-02-28", "2038-01-18", "1901-12-14", None], dtype="datetime64[ms]"
),
"test_timestamp": pd.Series(
[
"1970-01-01 00:00:01",
"2000-02-28 12:00:10",
"2038-01-18 23:59:59",
"1901-12-14 00:00:00.062547",
None,
],
dtype="datetime64[us]",
),
"test_timestamptz": pd.Series(
[
"1970-01-01 00:00:01+00:00",
"2000-02-28 12:00:10-04:00",
"2038-01-18 23:59:59+08:00",
"1901-12-14 00:00:00.062547-12:00",
None,
],
dtype="datetime64[us, UTC]",
),
"test_int2": pd.Series([-32768, 0, 1, 32767], dtype="int16"),
"test_int4": pd.Series([0, 1, -2147483648, 2147483647], dtype="int32"),
"test_int8": pd.Series(
[-9223372036854775808, 0, 9223372036854775807, 1], dtype="float64"
),
"test_float4": pd.Series(
[-1.1, 0.00, 2.123456, -12345.1, None], dtype="float32"
),
"test_float8": pd.Series(
[-1.1, 0.00, 2.12345678901, -12345678901.1, None], dtype="float64"
),
"test_numeric": pd.Series([decimal_s10(0.01), decimal_s10(521.34), decimal_s10(0), decimal_s10(-1.123e2), None], dtype="object"),
"test_bpchar": pd.Series(["👨‍🍳 ", "bb ", " ", "ddddd", None], dtype="object"),
"test_char": pd.Series(["a", "ಠ", "😃", "@", None], dtype="object"),
"test_varchar": pd.Series(["abcdefghij", "", "👨‍🍳👨‍🍳👨‍🍳👨", "@", None], dtype="object"),
"test_uuid": pd.Series(
[
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
None,
],
dtype="object",
),
"test_time": pd.Series(
[
datetime.time(8, 12, 40),
datetime.time(18, 30),
datetime.time(23, 0, 10),
datetime.time(0, 0, 59, 62547),
None,
],
dtype="object",
),
"test_bytea": pd.Series(
[
b'\x08',
b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
b"",
b"\xf0\x9f\x98\x9c",
None
],
dtype="object",
),
"test_json": pd.Series(
[
'{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
'{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
'{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
'{}',
None,
],
dtype="object",
),
"test_jsonb": pd.Series(
[
'{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
'{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
'{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
'{}',
None,
],
dtype="object",
),
"test_ltree": pd.Series(
["A.B.C.D", "A.B.E", "A", "", None], dtype="object"
),
"test_name": pd.Series(
["0", "21", "someName", "101203203-1212323-22131235", None]
)

},
)

assert_frame_equal(df, expected, check_names=True)
139 changes: 136 additions & 3 deletions connectorx/src/destinations/arrow/arrow_assoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use super::{
errors::{ArrowDestinationError, Result},
typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
};
use crate::constants::SECONDS_IN_DAY;
use crate::{constants::SECONDS_IN_DAY, utils::decimal_to_i128};
use arrow::array::{
ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int16Builder,
Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder,
ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder,
Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder,
Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampNanosecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow::datatypes::Field;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
use fehler::throws;
use rust_decimal::Decimal;

/// Associate arrow builder with native type
pub trait ArrowAssoc {
Expand Down Expand Up @@ -71,6 +72,51 @@ impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder);
impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder);

const DEFAULT_ARROW_DECIMAL_PRECISION: u8 = 38;
const DEFAULT_ARROW_DECIMAL_SCALE: i8 = 10;
const DEFAULT_ARROW_DECIMAL: ArrowDataType =
ArrowDataType::Decimal128(DEFAULT_ARROW_DECIMAL_PRECISION, DEFAULT_ARROW_DECIMAL_SCALE);

impl ArrowAssoc for Decimal {
type Builder = Decimal128Builder;

fn builder(nrows: usize) -> Self::Builder {
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL)
}

fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
builder.append_value(decimal_to_i128(value, DEFAULT_ARROW_DECIMAL_SCALE as u32)?);
Ok(())
}

fn field(header: &str) -> Field {
Field::new(header, DEFAULT_ARROW_DECIMAL, false)
}
}

impl ArrowAssoc for Option<Decimal> {
type Builder = Decimal128Builder;

fn builder(nrows: usize) -> Self::Builder {
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL)
}

fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
match value {
Some(v) => builder.append_option(Some(decimal_to_i128(
v,
DEFAULT_ARROW_DECIMAL_SCALE as u32,
)?)),
None => builder.append_null(),
}
Ok(())
}

fn field(header: &str) -> Field {
Field::new(header, DEFAULT_ARROW_DECIMAL, true)
}
}

impl ArrowAssoc for &str {
type Builder = StringBuilder;

Expand Down Expand Up @@ -486,6 +532,93 @@ impl ArrowAssoc for Vec<u8> {
}
}

impl ArrowAssoc for Option<Vec<Option<Decimal>>> {
type Builder = LargeListBuilder<Decimal128Builder>;

fn builder(nrows: usize) -> Self::Builder {
LargeListBuilder::with_capacity(
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL),
nrows,
)
}

fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
match value {
Some(vals) => {
let mut list = vec![];

for val in vals {
match val {
Some(v) => {
list.push(Some(decimal_to_i128(
v,
DEFAULT_ARROW_DECIMAL_SCALE as u32,
)?));
}
None => list.push(None),
}
}

builder.append_value(list);
}
None => builder.append_null(),
};
Ok(())
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field(
DEFAULT_ARROW_DECIMAL,
true,
))),
true,
)
}
}

impl ArrowAssoc for Vec<Option<Decimal>> {
type Builder = LargeListBuilder<Decimal128Builder>;

fn builder(nrows: usize) -> Self::Builder {
LargeListBuilder::with_capacity(
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL),
nrows,
)
}

fn append(builder: &mut Self::Builder, vals: Self) -> Result<()> {
let mut list = vec![];

for val in vals {
match val {
Some(v) => {
list.push(Some(decimal_to_i128(
v,
DEFAULT_ARROW_DECIMAL_SCALE as u32,
)?));
}
None => list.push(None),
}
}

builder.append_value(list);
Ok(())
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field(
DEFAULT_ARROW_DECIMAL,
false,
))),
false,
)
}
}

macro_rules! impl_arrow_array_assoc {
($T:ty, $AT:expr, $B:ident) => {
impl ArrowAssoc for $T {
Expand Down
Loading
Loading