diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a91fbd06f..5280310b8 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -101,6 +101,7 @@ jobs: - name: Run tests run: | + git submodule update --init source venv/bin/activate maturin develop --locked RUST_BACKTRACE=1 pytest -v . diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..a3b1b5157 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "testing"] + path = testing + url = https://github.com/apache/arrow-testing.git +[submodule "parquet"] + path = parquet + url = https://github.com/apache/parquet-testing.git diff --git a/Cargo.lock b/Cargo.lock index 41cdd3a2f..7c4558634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "ahash" version = "0.7.6" @@ -56,6 +62,33 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "apache-avro" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf4144857f9e4d7dd6cc4ba4c78efd2a46bad682b029bd0d91e76a021af1b2a" +dependencies = [ + "byteorder", + "crc32fast", + "digest", + "lazy_static", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand 0.8.5", + "regex", + "serde", + "serde_json", + "snap", + "strum", + "strum_macros", + "thiserror", + "typed-builder", + "uuid 1.1.2", + "zerocopy", +] + [[package]] name = "arrayref" version = "0.3.6" @@ -329,6 +362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2aca80caa2b0f7fdf267799b8895ac8b6341ea879db6b1e2d361ec49b47bc676" dependencies = [ "ahash 0.8.0", + "apache-avro", "arrow", "async-trait", "bytes", @@ -345,6 +379,7 @@ dependencies = [ "itertools", "lazy_static", "log", + "num-traits", "num_cpus", "object_store", "ordered-float 3.0.0", @@ -369,6 +404,7 @@ version = "12.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7721fd550f6a28ad7235b62462aa51e9a43b08f8346d5cbe4d61f1e83f5df511" dependencies = [ + "apache-avro", "arrow", "object_store", "ordered-float 3.0.0", @@ -842,6 +878,26 @@ version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +[[package]] +name = "libflate" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093" +dependencies = [ + "adler32", + "crc32fast", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a" +dependencies = [ + "rle-decode-fast", +] + [[package]] name = "libmimalloc-sys" version = "0.1.25" @@ -1252,6 +1308,12 @@ dependencies = [ "syn", ] +[[package]] +name = "quad-rand" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" + [[package]] name = "quote" version = "1.0.18" @@ -1343,9 +1405,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.6" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" dependencies = [ "aho-corasick", "memchr", @@ -1373,6 +1435,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rustversion" version = "1.0.6" @@ -1408,18 +1476,18 @@ checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898" [[package]] name = "serde" -version = "1.0.137" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.137" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", @@ -1428,9 +1496,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.81" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" dependencies = [ "itoa 1.0.2", "ryu", @@ -1694,6 +1762,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "typed-builder" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "typenum" version = "1.15.0" @@ -1767,6 +1846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" dependencies = [ "getrandom 0.2.7", + "serde", ] [[package]] @@ -1872,6 +1952,27 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "zerocopy" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index 217ac1c7b..a1b8bf40a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ default = ["mimalloc"] tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.7" pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] } -datafusion = { version = "^12.0.0", features = ["pyarrow"] } +datafusion = { version = "^12.0.0", features = ["pyarrow", "avro"] } datafusion-expr = { version = "^12.0.0" } datafusion-common = { version = "^12.0.0", features = ["pyarrow"] } uuid = { version = "0.8", features = ["v4"] } diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 324bbec6f..19b9d0e31 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -179,3 +179,18 @@ def test_table_exist(ctx): ctx.register_dataset("t", dataset) assert ctx.table_exist("t") is True + + +def test_read_csv(ctx): + csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv") + csv_df.select(column("c1")).show() + + +def test_read_parquet(ctx): + csv_df = ctx.read_parquet(path="parquet/data/alltypes_plain.parquet") + csv_df.show() + + +def test_read_avro(ctx): + csv_df = ctx.read_avro(path="testing/data/avro/alltypes_plain.avro") + csv_df.show() diff --git a/parquet b/parquet new file mode 160000 index 000000000..e13af117d --- /dev/null +++ b/parquet @@ -0,0 +1 @@ +Subproject commit e13af117de7c4f0a4d9908ae3827b3ab119868f3 diff --git a/src/context.rs b/src/context.rs index 25d08ef8e..4b8f93029 100644 --- a/src/context.rs +++ b/src/context.rs @@ -28,7 +28,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; +use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions}; use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; @@ -264,4 +264,99 @@ impl PySessionContext { fn session_id(&self) -> PyResult { Ok(self.ctx.session_id()) } + + #[allow(clippy::too_many_arguments)] + #[args( + schema = "None", + has_header = "true", + delimiter = "\",\"", + schema_infer_max_records = "1000", + file_extension = "\".csv\"", + table_partition_cols = "vec![]" + )] + fn read_csv( + &self, + path: PathBuf, + schema: Option, + has_header: bool, + delimiter: &str, + schema_infer_max_records: usize, + file_extension: &str, + table_partition_cols: Vec, + py: Python, + ) -> PyResult { + let path = path + .to_str() + .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; + + let delimiter = delimiter.as_bytes(); + if delimiter.len() != 1 { + return Err(PyValueError::new_err( + "Delimiter must be a single character", + )); + }; + + let mut options = CsvReadOptions::new() + .has_header(has_header) + .delimiter(delimiter[0]) + .schema_infer_max_records(schema_infer_max_records) + .file_extension(file_extension) + .table_partition_cols(table_partition_cols); + options.schema = schema.as_ref(); + + let result = self.ctx.read_csv(path, options); + let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); + + Ok(df) + } + + #[allow(clippy::too_many_arguments)] + #[args( + parquet_pruning = "true", + file_extension = "\".parquet\"", + table_partition_cols = "vec![]", + skip_metadata = "true" + )] + fn read_parquet( + &self, + path: &str, + table_partition_cols: Vec, + parquet_pruning: bool, + file_extension: &str, + skip_metadata: bool, + py: Python, + ) -> PyResult { + let mut options = ParquetReadOptions::default() + .table_partition_cols(table_partition_cols) + .parquet_pruning(parquet_pruning) + .skip_metadata(skip_metadata); + options.file_extension = file_extension; + + let result = self.ctx.read_parquet(path, options); + let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); + Ok(df) + } + + #[allow(clippy::too_many_arguments)] + #[args( + schema = "None", + file_extension = "\".avro\"", + table_partition_cols = "vec![]" + )] + fn read_avro( + &self, + path: &str, + schema: Option, + table_partition_cols: Vec, + file_extension: &str, + py: Python, + ) -> PyResult { + let mut options = AvroReadOptions::default().table_partition_cols(table_partition_cols); + options.file_extension = file_extension; + options.schema = schema.map(Arc::new); + + let result = self.ctx.read_avro(path, options); + let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); + Ok(df) + } } diff --git a/testing b/testing new file mode 160000 index 000000000..5bab2f264 --- /dev/null +++ b/testing @@ -0,0 +1 @@ +Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88