Conversation

@chitralverma (Contributor) commented Jun 20, 2025

Changes

  • Builds on the existing new_record_batch_iter to expose a pyarrow RecordBatchReader on the Python side
  • Supports fully lazy iteration over the arrow stream destination
  • Added kwargs to read_sql; users can pass record_batch_size to control the number of records in each record batch
  • Fixed a few unwraps that were causing issues
  • Updated the RecordBatchReader trait to require Send, which helps offload the reader to multi-threaded consumers like DuckDB (see the sketch after this list)
  • Left existing implementations as is; ideally those can also rely on the record batch approach
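A minimal sketch of why the Send bound matters, assuming a reader that implements arrow's RecordBatchReader trait (the function below is illustrative, not the connectorx implementation):

```rust
use std::thread;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatchReader;

// Because the reader is `Send`, it can be moved into another thread and
// drained there, e.g. by a multi-threaded consumer such as DuckDB.
fn count_rows_on_worker<R>(reader: R) -> Result<usize, ArrowError>
where
    R: RecordBatchReader + Send + 'static,
{
    thread::spawn(move || {
        let mut rows = 0;
        for batch in reader {
            rows += batch?.num_rows();
        }
        Ok(rows)
    })
    .join()
    .expect("worker thread panicked")
}
```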

Usage / Example

import connectorx as cx

conn = "mysql://username:password@server:port/database/"
query = "SELECT * FROM employees"

rb_iter = cx.read_sql(
    conn,
    query,
    return_type="arrow_record_batches",
    record_batch_size=120333,
)
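For context, a minimal sketch of how the returned object could be consumed, assuming it behaves like a standard pyarrow.RecordBatchReader (variable names taken from the example above):

```python
import pyarrow as pa

# Batches are pulled lazily, one at a time, so memory stays bounded by
# record_batch_size rather than the size of the full result set.
for batch in rb_iter:
    assert isinstance(batch, pa.RecordBatch)
    print(batch.num_rows)

# Alternatively, the remaining stream can be materialized in one go:
# table = rb_iter.read_all()
```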

closes #278

pub fn to_ptrs<'py>(&self, py: Python<'py>) -> Bound<'py, PyAny> {
    let ptrs = py.allow_threads(
        || -> Result<(Vec<String>, Vec<Vec<(uintptr_t, uintptr_t)>>), ConnectorXPythonError> {
            let rbs = vec![self.0.clone()];
@chitralverma (Contributor, Author) commented Jun 20, 2025

Is this okay or do you suggest any workarounds? It doesn't work without `.clone()`; it breaks with the following error:

cannot move out of `self` which is behind a shared reference
move occurs because `self.0` has type `arrow::array::RecordBatch`, which does not implement the `Copy` trait

Reply (Contributor):

I think we can wrap over Option<RecordBatch> instead of RecordBatch along with take to resolve this.

Also, since we are using an iterator to generate a batch at a time, we do not need to wrap over a vector of batches.
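A minimal sketch of the Option-plus-take idea, assuming the wrapper can take `&mut self` (illustrative names, not the actual connectorx code):

```rust
use arrow::record_batch::RecordBatch;

// Wrapping the batch in an Option lets it be moved out exactly once with
// `take()`, leaving `None` behind, so no `.clone()` is required.
pub struct PyRecordBatch(Option<RecordBatch>);

impl PyRecordBatch {
    pub fn new(rb: RecordBatch) -> Self {
        Self(Some(rb))
    }

    // `take()` swaps the inner value for `None` and returns the owned batch.
    pub fn take_batch(&mut self) -> Option<RecordBatch> {
        self.0.take()
    }
}
```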

@chitralverma (Contributor, Author)

@wangxiaoying for your review.
If this seems OK, I'll update the PR with documentation, examples, and such.

@chitralverma changed the title from "Allow record batches output from read_sql" to "feat(arrow): Allow record batches output from read_sql" on Jun 20, 2025
@wangxiaoying (Contributor)

Thanks @chitralverma for the PR! I will take a look at it by the end of this week.

@wangxiaoying self-requested a review on June 28, 2025 at 19:33
*,
return_type: Literal[
-    "pandas", "polars", "arrow", "modin", "dask"
+    "pandas", "polars", "arrow", "modin", "dask", "arrow_record_batches"
Review comment (Contributor):

Maybe use arrow_stream instead of arrow_record_batches for simplicity?

elif return_type in {"arrow", "polars", "arrow_record_batches"}:
    try_import_module("pyarrow")

    record_batch_size = int(kwargs.get("record_batch_size", 10000))
Review comment (Contributor):

Maybe batch_size instead of record_batch_size for simplicity?


@wangxiaoying (Contributor)

Hi @chitralverma, thanks for waiting!

The code looks good in general to me. I have made some changes to the code, including:

  1. Adding unit tests for getting an arrow stream in Python: connectorx-python/connectorx/tests/test_arrow.py
  2. Resolving the CI by keeping the old arrow interface for the arrow and polars destinations.
  3. Avoiding the clone when generating arrow by wrapping the record batch in an Option.

I also left a few comments on the API. Can you take a look at my changes and the comments? If everything looks good, we can update the documentation and have a new release!
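For illustration, such a test might look roughly like the sketch below (hypothetical; the fixture name, table, and assertions are assumptions, not the contents of test_arrow.py):

```python
import connectorx as cx
import pyarrow as pa


def test_arrow_record_batches(postgres_url):  # hypothetical connection-string fixture
    reader = cx.read_sql(
        postgres_url,
        "SELECT * FROM test_table",  # hypothetical table
        return_type="arrow_record_batches",
        record_batch_size=2,
    )
    batches = list(reader)
    assert all(isinstance(b, pa.RecordBatch) for b in batches)
    assert sum(b.num_rows for b in batches) > 0
```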

@kevinbds

@wangxiaoying

Getting this error with the new implementation on a PostgreSQL table with an array<str> column:

called `Result::unwrap()` on an `Err` value: ConnectorX(NoConversionRule("TextArray(true)", "connectorx::destinations::arrowstream::typesystem::ArrowTypeSystem")

The "arrow" implementation works fine with the same table. Seems like a missing conversion rule for PostgreSQL text arrays.

@wangxiaoying (Contributor)

> @wangxiaoying
>
> Getting this error with the new implementation on a PostgreSQL table with an array<str> column:
>
> called `Result::unwrap()` on an `Err` value: ConnectorX(NoConversionRule("TextArray(true)", "connectorx::destinations::arrowstream::typesystem::ArrowTypeSystem")
>
> The "arrow" implementation works fine with the same table. Seems like a missing conversion rule for PostgreSQL text arrays.

You are right @kevinbds. The TextArray conversion should be added to the postgres_arrowstream transport file (similar to the postgres_arrow transport file). Before that, we also need to add the Utf8Array type to arrowstream (similar to the same type in arrow).

I think we can have a separate PR for completing the arrowstream types as well as type conversions.

@wangxiaoying merged commit da319be into sfu-db:main on Jul 12, 2025
2 checks passed
@wangxiaoying (Contributor)

I have merged the PR and released an alpha version, 0.4.4-alpha.2, for this; please feel free to try it out!

@kevinbds

Hi @wangxiaoying,

It seems like version 0.4.4a2 only has the ARM build available, so I can't run tests on this version.

Besides that, this issue #819 (comment) will still happen, right?

@wangxiaoying (Contributor)

> Hi @wangxiaoying,
>
> It seems like version 0.4.4a2 only has the ARM build available, so I can't run tests on this version.

Thanks for the reminder. The upload failed because the space limit was reached. I have deleted some old alpha versions on PyPI and rerun the upload action. All compiled wheel files should be available on PyPI now.

> Besides that, this issue #819 (comment) will still happen, right?

Yes.

@SebZbp

SebZbp commented Jul 16, 2025

Have tested this in the context of dlt extraction pipelines as is, but I am getting this error:
dlt-hub/dlt#2840 (comment)
