Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,655 changes: 918 additions & 737 deletions Cargo.lock

Large diffs are not rendered by default.

1,418 changes: 837 additions & 581 deletions connectorx-python/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion connectorx-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"

[dependencies]
anyhow = "1"
arrow = "12"
arrow = { version = "22", features = ["ffi"]}
arrow2 = {version = "0.10", default-features = false}
bitfield = "0.13"
bytes = "1"
Expand Down
6 changes: 3 additions & 3 deletions connectorx-python/connectorx/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ def test_arrow2_type(postgres_url: str) -> None:
),
"test_jsonb": pd.Series(
[
'{"qty":6,"product":"Beer"}',
'{"qty":24,"product":"Diaper"}',
'{"qty":1,"product":"Toy Car"}',
'{"product":"Beer","qty":6}',
'{"product":"Diaper","qty":24}',
'{"product":"Toy Car","qty":1}',
None,
],
dtype="object",
Expand Down
2 changes: 1 addition & 1 deletion connectorx-python/connectorx/tests/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_fed_spja(db1_url: str, db2_url: str) -> None:
index=range(3),
data={
"test_bool": pd.Series([True, False, None], dtype="object"),
"AVG_FLOAT": pd.Series([None, 3, 5.45], dtype="float64"),
"AVG_FLOAT": pd.Series([0, 3, 5.45], dtype="float64"),
"SUM_INT": pd.Series([1, 3, 4], dtype="int64"),
},
)
Expand Down
18 changes: 9 additions & 9 deletions connectorx-python/connectorx/tests/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,9 @@ def test_types_binary(postgres_url: str) -> None:
),
"test_jsonb": pd.Series(
[
'{"qty":6,"product":"Beer"}',
'{"qty":24,"product":"Diaper"}',
'{"qty":1,"product":"Toy Car"}',
'{"product":"Beer","qty":6}',
'{"product":"Diaper","qty":24}',
'{"product":"Toy Car","qty":1}',
None,
],
dtype="object",
Expand Down Expand Up @@ -614,9 +614,9 @@ def test_types_csv(postgres_url: str) -> None:
),
"test_jsonb": pd.Series(
[
'{"qty":6,"product":"Beer"}',
'{"qty":24,"product":"Diaper"}',
'{"qty":1,"product":"Toy Car"}',
'{"product":"Beer","qty":6}',
'{"product":"Diaper","qty":24}',
'{"product":"Toy Car","qty":1}',
None,
],
dtype="object",
Expand Down Expand Up @@ -718,9 +718,9 @@ def test_types_cursor(postgres_url: str) -> None:
),
"test_jsonb": pd.Series(
[
'{"qty":6,"product":"Beer"}',
'{"qty":24,"product":"Diaper"}',
'{"qty":1,"product":"Toy Car"}',
'{"product":"Beer","qty":6}',
'{"product":"Diaper","qty":24}',
'{"product":"Toy Car","qty":1}',
None,
],
dtype="object",
Expand Down
5 changes: 4 additions & 1 deletion connectorx-python/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use fehler::throws;
use libc::uintptr_t;
use pyo3::prelude::*;
use pyo3::{PyAny, Python};
use std::convert::TryFrom;

#[throws(ConnectorXPythonError)]
pub fn write_arrow<'a>(
Expand Down Expand Up @@ -38,7 +39,9 @@ pub fn to_ptrs(rbs: Vec<RecordBatch>) -> (Vec<String>, Vec<Vec<(uintptr_t, uintp
let mut cols = vec![];

for array in rb.columns() {
let (array_ptr, schema_ptr) = array.to_raw().expect("c ptr");
let data = array.data().clone();
let array = arrow::ffi::ArrowArray::try_from(data).expect("c ptr");
let (array_ptr, schema_ptr) = arrow::ffi::ArrowArray::into_raw(array);
cols.push((array_ptr as uintptr_t, schema_ptr as uintptr_t));
}

Expand Down
5 changes: 2 additions & 3 deletions connectorx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ sqlparser = "0.11"
thiserror = "1"
url = "2"

arrow = {version = "12", optional = true, features = ["prettyprint"]}
arrow = {version = "22", optional = true, features = ["prettyprint"]}
arrow2 = {version = "0.10", default-features = false, optional = true}
bb8 = {version = "0.7", optional = true}
bb8-tiberius = {version = "0.5", optional = true}
Expand Down Expand Up @@ -53,8 +53,7 @@ tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"], optional =
urlencoding = {version = "2.1", optional = true}
uuid = {version = "0.8", optional = true}
j4rs = {version = "0.13", optional = true}
datafusion = {git = "https://github.com/apache/arrow-datafusion", rev = "93a7054b837cec2418adc427a6505dcea92e6755", optional = true}
# datafusion = {version = "8.0.0", optional = true}
datafusion = {version = "12", optional = true}

[lib]
crate-type = ["cdylib", "rlib"]
Expand Down
2 changes: 1 addition & 1 deletion connectorx/examples/federated_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ fn main() {
let df = rt.block_on(ctx.sql(local_sql.as_str())).unwrap();
rt.block_on(df.explain(false, false).unwrap().show())
.unwrap();
rt.block_on(df.limit(5).unwrap().show()).unwrap();
rt.block_on(df.limit(5, None).unwrap().show()).unwrap();
let num_rows = rt
.block_on(df.collect())
.unwrap()
Expand Down
68 changes: 35 additions & 33 deletions connectorx/src/destinations/arrow/arrow_assoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ macro_rules! impl_arrow_assoc {
type Builder = $B;

fn builder(nrows: usize) -> Self::Builder {
Self::Builder::new(nrows)
Self::Builder::with_capacity(nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Self) {
builder.append_value(value)?;
builder.append_value(value);
}

fn field(header: &str) -> Field {
Expand All @@ -42,12 +42,12 @@ macro_rules! impl_arrow_assoc {
type Builder = $B;

fn builder(nrows: usize) -> Self::Builder {
Self::Builder::new(nrows)
Self::Builder::with_capacity(nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Self) {
builder.append_option(value)?;
builder.append_option(value);
}

fn field(header: &str) -> Field {
Expand All @@ -69,12 +69,12 @@ impl ArrowAssoc for &str {
type Builder = StringBuilder;

fn builder(nrows: usize) -> Self::Builder {
StringBuilder::new(nrows)
StringBuilder::with_capacity(1024, nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Self) {
builder.append_value(value)?;
builder.append_value(value);
}

fn field(header: &str) -> Field {
Expand All @@ -86,14 +86,14 @@ impl ArrowAssoc for Option<&str> {
type Builder = StringBuilder;

fn builder(nrows: usize) -> Self::Builder {
StringBuilder::new(nrows)
StringBuilder::with_capacity(1024, nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Self) {
match value {
Some(s) => builder.append_value(s)?,
None => builder.append_null()?,
Some(s) => builder.append_value(s),
None => builder.append_null(),
}
}

Expand All @@ -106,12 +106,12 @@ impl ArrowAssoc for String {
type Builder = StringBuilder;

fn builder(nrows: usize) -> Self::Builder {
StringBuilder::new(nrows)
StringBuilder::with_capacity(1024, nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: String) {
builder.append_value(value.as_str())?;
builder.append_value(value.as_str());
}

fn field(header: &str) -> Field {
Expand All @@ -123,14 +123,14 @@ impl ArrowAssoc for Option<String> {
type Builder = StringBuilder;

fn builder(nrows: usize) -> Self::Builder {
StringBuilder::new(nrows)
StringBuilder::with_capacity(1024, nrows)
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Self) {
match value {
Some(s) => builder.append_value(s.as_str())?,
None => builder.append_null()?,
Some(s) => builder.append_value(s.as_str()),
None => builder.append_null(),
}
}

Expand Down Expand Up @@ -215,11 +215,11 @@ impl ArrowAssoc for Option<NaiveDate> {
type Builder = Date32Builder;

fn builder(nrows: usize) -> Self::Builder {
Date32Builder::new(nrows)
Date32Builder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveDate>) -> Result<()> {
builder.append_option(value.map(naive_date_to_arrow))?;
builder.append_option(value.map(naive_date_to_arrow));
Ok(())
}

Expand All @@ -232,11 +232,11 @@ impl ArrowAssoc for NaiveDate {
type Builder = Date32Builder;

fn builder(nrows: usize) -> Self::Builder {
Date32Builder::new(nrows)
Date32Builder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveDate) -> Result<()> {
builder.append_value(naive_date_to_arrow(value))?;
builder.append_value(naive_date_to_arrow(value));
Ok(())
}

Expand All @@ -249,11 +249,11 @@ impl ArrowAssoc for Option<NaiveDateTime> {
type Builder = Date64Builder;

fn builder(nrows: usize) -> Self::Builder {
Date64Builder::new(nrows)
Date64Builder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
builder.append_option(value.map(naive_datetime_to_arrow))?;
builder.append_option(value.map(naive_datetime_to_arrow));
Ok(())
}

Expand All @@ -266,11 +266,11 @@ impl ArrowAssoc for NaiveDateTime {
type Builder = Date64Builder;

fn builder(nrows: usize) -> Self::Builder {
Date64Builder::new(nrows)
Date64Builder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
builder.append_value(naive_datetime_to_arrow(value))?;
builder.append_value(naive_datetime_to_arrow(value));
Ok(())
}

Expand All @@ -283,13 +283,15 @@ impl ArrowAssoc for Option<NaiveTime> {
type Builder = Time64NanosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Time64NanosecondBuilder::new(nrows)
Time64NanosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveTime>) -> Result<()> {
builder.append_option(value.map(|t| {
t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
}))?;
builder.append_option(
value.map(|t| {
t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
}),
);
Ok(())
}

Expand All @@ -302,13 +304,13 @@ impl ArrowAssoc for NaiveTime {
type Builder = Time64NanosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Time64NanosecondBuilder::new(nrows)
Time64NanosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveTime) -> Result<()> {
builder.append_value(
value.num_seconds_from_midnight() as i64 * 1_000_000_000 + value.nanosecond() as i64,
)?;
);
Ok(())
}

Expand All @@ -321,13 +323,13 @@ impl ArrowAssoc for Option<Vec<u8>> {
type Builder = LargeBinaryBuilder;

fn builder(nrows: usize) -> Self::Builder {
LargeBinaryBuilder::new(nrows)
LargeBinaryBuilder::with_capacity(1024, nrows)
}

fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
match value {
Some(v) => builder.append_value(v)?,
None => builder.append_null()?,
Some(v) => builder.append_value(v),
None => builder.append_null(),
};
Ok(())
}
Expand All @@ -341,11 +343,11 @@ impl ArrowAssoc for Vec<u8> {
type Builder = LargeBinaryBuilder;

fn builder(nrows: usize) -> Self::Builder {
LargeBinaryBuilder::new(nrows)
LargeBinaryBuilder::with_capacity(1024, nrows)
}

fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
builder.append_value(value)?;
builder.append_value(value);
Ok(())
}

Expand Down