ConnectorX 'arrow2' ValueError when running SQL database pipeline #2661

@dat-a-man

dlt version

1.11.0

Describe the problem

Running a SQL pipeline that uses dlt's sql_database source fails with the following error, because ConnectorX is called with a return_type it does not support:
ValueError: arrow2

The error is raised inside connectorx.read_sql(...).
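A possible workaround, sketched under assumptions: the traceback shows dlt calling cx.read_sql(conn, query_str, **backend_kwargs), so passing an explicit return_type through backend_kwargs should override whatever default produced "arrow2". The helper connectorx_backend_kwargs and the function build_source are hypothetical names used only for illustration, and whether "arrow" suits every destination has not been verified here.

```python
from typing import Any, Dict


def connectorx_backend_kwargs(**overrides: Any) -> Dict[str, Any]:
    """Build backend_kwargs that avoid the 'arrow2' return type.

    Hypothetical helper, not part of dlt or ConnectorX.
    """
    kwargs: Dict[str, Any] = {"return_type": "arrow"}
    kwargs.update(overrides)
    if kwargs["return_type"] == "arrow2":
        # This is the value that connectorx.read_sql rejected in the report.
        raise ValueError("return_type 'arrow2' is rejected; use 'arrow'")
    return kwargs


def build_source(credentials: str):
    """Create a sql_database source that reads through ConnectorX."""
    # Imported lazily so the helper above works without dlt installed.
    from dlt.sources.sql_database import sql_database

    # Assumption: dlt forwards backend_kwargs to cx.read_sql unchanged.
    return sql_database(
        credentials,
        backend="connectorx",
        backend_kwargs=connectorx_backend_kwargs(),
    )
```

Pinning ConnectorX to a release that still accepts "arrow2" may be an alternative, but which releases do so is not stated in this report.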

Expected behavior

No response

Steps to reproduce

Steps to reproduce the behavior:

  1. Install ConnectorX 0.4.2.
  2. Create a SQL source using dlt's sql_database helpers.
  3. Use a pipeline to run extraction.
  4. Run: python3 sql_database_pipeline.py
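Since the failure depends on which package versions are installed, it may help to confirm them before running the steps above. The following is a minimal sketch using only the standard library; installed_version and environment_report are hypothetical helper names, not part of dlt.

```python
import sys
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def environment_report() -> str:
    """Summarize the interpreter and the two packages involved in this issue."""
    lines = [f"python: {sys.version_info.major}.{sys.version_info.minor}"]
    for package in ("dlt", "connectorx"):
        version = installed_version(package) or "not installed"
        lines.append(f"{package}: {version}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(environment_report())
```

Running this inside the same virtual environment as the pipeline script shows the versions that are actually imported at run time.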

Operating system

Linux

Runtime environment

Local

Python version

3.12 (the paths in the stack trace below show the run using Python 3.11)

dlt data source

sql_database

dlt destination

Google BigQuery

Other deployment details

No response

Additional information

Full Stack Trace

(.venv) amanguptanalytics@sandbox-vm:~/16_test_sling/tests/dlthub$ python3 sql_database_pipeline.py 
psutil dependency is not installed and mem stats will not be available. add psutil to your environment or pass dump_system_stats argument as False to disable warning.
/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py:595: UserWarning: Cannot create BigQuery Storage client, the dependency google-cloud-bigquery-storage is not installed.
  warnings.warn(
----------------------------- Extract sql_database -----------------------------
Resources: 0/6 (0.0%) | Time: 0.00s | Rate: 0.00/s

----------------------------- Extract sql_database -----------------------------
Resources: 0/6 (0.0%) | Time: 0.03s | Rate: 0.00/s

Traceback (most recent call last):
  File "/usr/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/utils.py", line 260, in _parallel_gen
    return next(gen)  # type: ignore[call-overload]
           ^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/sources/sql_database/helpers.py", line 303, in table_rows
    yield from loader.load_rows(backend_kwargs)
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/sources/sql_database/helpers.py", line 176, in load_rows
    yield from self._load_rows_connectorx(query, backend_kwargs)
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/sources/sql_database/helpers.py", line 234, in _load_rows_connectorx
    df = cx.read_sql(conn, query_str, **backend_kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/connectorx/__init__.py", line 426, in read_sql
    raise ValueError(return_type)
ValueError: arrow2

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 468, in extract
    self._extract_source(
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 1254, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/extract.py", line 457, in extract
    self._extract_single_source(
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/extract.py", line 380, in _extract_single_source
    for pipe_item in pipes:
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/pipe_iterator.py", line 167, in __next__
    pipe_item = self._futures_pool.resolve_next_future(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/concurrency.py", line 182, in resolve_next_future
    return self._resolve_future(future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/extract/concurrency.py", line 142, in _resolve_future
    raise ResourceExtractionError(pipe.name, future, str(ex), "future") from ex
dlt.extract.exceptions.ResourceExtractionError: In processing pipe nation: extraction of resource nation in future <Future at 0x7f8311801990 state=finished raised ValueError> caused an exception: arrow2

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/amanguptanalytics/16_test_sling/tests/dlthub/sql_database_pipeline.py", line 39, in <module>
    load_entire_database()
  File "/home/amanguptanalytics/16_test_sling/tests/dlthub/sql_database_pipeline.py", line 27, in load_entire_database
    info = pipeline.run(source, write_disposition="replace")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 222, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 271, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 716, in run
    self.extract(
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 222, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 176, in _wrap
    rv = f(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 162, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 271, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amanguptanalytics/16_test_sling/tests/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 489, in extract
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1747549428.1768405 with exception:

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe nation: extraction of resource nation in future <Future at 0x7f8311801990 state=finished raised ValueError> caused an exception: arrow2
