diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 0dfb0c07c2..d6d2be7749 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -26,7 +26,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt default: true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00e078d718..562c00f048 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt default: true @@ -194,10 +194,15 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt default: true + - uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '8' + - uses: actions/cache@v2 with: path: | @@ -251,4 +256,6 @@ jobs: SQLITE_URL: "sqlite:///tmp/test.db" MYSQL_URL: "mysql://root:mysql@mysql:3306/mysql" MSSQL_URL: "mssql://sa:mssql!Password@mssql:1433/tempdb" + DB1: "postgresql://postgres:postgres@postgres:5432/postgres" + DB2: "postgresql://postgres:postgres@postgres:5432/postgres" SQLITE3_STATIC: 1 diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index dc8fe0a587..fd2cdaa8a3 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -14,7 +14,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt default: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2e1bfac87c..a4d3364aa7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt target: aarch64-unknown-linux-gnu default: true @@ -88,7 +88,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt default: true @@ -135,7 +135,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-11-18 + toolchain: nightly-2022-04-17 components: rustfmt target: aarch64-apple-darwin default: true diff --git a/.gitignore b/.gitignore index ea8f5bf8a6..609539e151 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ connectorx-python/LICENSE *.db benchmark.json docs/_build +connectorx/examples/test.rs diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..e1a78d96e0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "federated-query"] + path = federated-query + url = git@github.com:sfu-db/federated-query.git diff --git a/Cargo.lock b/Cargo.lock index 06a73a260e..eecce67722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -52,6 +67,12 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + [[package]] name = "arrayvec" version = "0.4.12" @@ -67,14 +88,21 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + [[package]] name = "arrow" -version = "11.0.0" +version = "12.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db922bcac0aa9f3b6e191d73b0b5df925c2c10ab4893f7416359f289e5d23489" +checksum = "e94e2d315bc11f3d43f38344141453282591788381061fabc06c95e37d0dbc7d" dependencies = [ "bitflags", "chrono", + "comfy-table", "csv", "flatbuffers", "half", @@ -332,6 +360,29 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" +dependencies = [ + "digest 0.10.3", +] + +[[package]] +name = "blake3" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +dependencies = [ + "arrayref", + "arrayvec 0.7.2", + "cc", + "cfg-if", + "constant_time_eq", + "digest 0.10.3", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -341,6 +392,36 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324" +dependencies = [ + "generic-array", +] + +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "0.2.17" @@ -417,6 +498,15 @@ name = "cc" version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d26a6ce4b6a484fa3edb70f7efa6fc430fd2b87285fe8b84304fd0936faa0dc0" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" [[package]] name = "cexpr" @@ -453,9 +543,9 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10612c0ec0e0a1ff0e97980647cb058a6e7aedb913d01d009c406b8b7d0b26ee" dependencies = [ - "glob", + "glob 0.3.0", "libc", - "libloading", + "libloading 0.7.1", ] [[package]] @@ -524,6 +614,7 @@ dependencies = [ "chrono", "criterion", "csv", + "datafusion", "env_logger 0.9.0", "fallible-streaming-iterator", "fehler", @@ -532,6 +623,7 @@ dependencies = [ "hex", "iai", "itertools", + "j4rs", "log", "native-tls", "ndarray", @@ -554,13 +646,13 @@ dependencies = [ "rusqlite", "rust_decimal", "serde_json", - "sqlparser", + "sqlparser 0.11.0", "thiserror", "tiberius", "tokio", "url", "urlencoding", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -569,6 +661,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "core-foundation" version = "0.9.1" @@ -717,6 +815,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "crypto-common" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "crypto-mac" version = "0.11.1" @@ -758,13 +866,103 @@ dependencies = [ "sct", ] +[[package]] +name = "datafusion" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-data-access", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "hashbrown 0.12.0", + "lazy_static", + "log", + "num_cpus", + "ordered-float 2.10.0", + "parking_lot 0.12.0", + "parquet", + "paste", + "pin-project-lite", + "rand 0.8.4", + "smallvec", + "sqlparser 0.16.0", + "tempfile", + "tokio", + "tokio-stream", + "uuid 1.0.0", +] + +[[package]] +name = "datafusion-common" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "arrow", + "ordered-float 2.10.0", + "parquet", + "sqlparser 0.16.0", +] + +[[package]] +name = "datafusion-data-access" +version = "1.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "async-trait", + "chrono", + "futures", + "parking_lot 0.12.0", + "tempfile", + "tokio", +] + +[[package]] +name = "datafusion-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "sqlparser 0.16.0", +] + +[[package]] +name = "datafusion-physical-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.12.0", + "lazy_static", + "md-5 0.10.1", + "ordered-float 2.10.0", + "paste", + "rand 0.8.4", + "regex", + "sha2 0.10.2", + "unicode-segmentation", +] + [[package]] name = "debugid" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91cf5a8c2f2097e2a32627123508635d47ce10563d999ec1a95addf08b502ba" dependencies = [ - "uuid", + "uuid 0.8.2", ] [[package]] @@ -787,6 +985,26 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +dependencies = [ + "block-buffer 0.10.2", + "crypto-common", + "subtle", +] + +[[package]] +name = "dirs" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30baa043103c9d0c2a57cf537cc2f35623889dc0d405e6c3cccfadbc81c71309" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs" version = "4.0.0" @@ -1095,6 +1313,12 @@ dependencies = [ "syn", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "funty" version = "1.2.0" @@ -1252,6 +1476,12 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "glob" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" + [[package]] name = "glob" version = "0.3.0" @@ -1348,7 +1578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" dependencies = [ "crypto-mac", - "digest", + "digest 0.9.0", ] [[package]] @@ -1486,6 +1716,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" + [[package]] name = "io-enum" version = "1.0.1" @@ -1519,6 +1755,51 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "j4rs" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de204777df8f1b7ddd414468755d6a7e5919ad62834aca23bf6166ecb557fac9" +dependencies = [ + "cesu8", + "dirs 3.0.2", + "fs_extra", + "java-locator", + "jni-sys", + "lazy_static", + "libc", + "libloading 0.6.7", + "log", + "serde", + "serde_json", + "sha2 0.9.8", +] + +[[package]] +name = "java-locator" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62402993f775e51f6d3bab6d64b38aece6d15cbf37c845bda65005220fa0f9df" +dependencies = [ + "glob 0.2.11", + "lazy_static", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.55" @@ -1642,6 +1923,16 @@ version = "0.2.103" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" +[[package]] +name = "libloading" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "351a32417a12d5f7e82c368a66781e307834dae04c6ce0cd4456d52989229883" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libloading" version = "0.7.1" @@ -1701,6 +1992,26 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "lz4" +version = "1.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4edcb94251b1c375c459e5abe9fb0168c1c826c3370172684844f8f3f8d1a885" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7be8908e2ed6f31c02db8a9fa962f03e36c53fbfde437363eae3306b85d7e17" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matches" version = "0.1.9" @@ -1722,11 +2033,20 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" dependencies = [ - "block-buffer", - "digest", + "block-buffer 0.9.0", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "md-5" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658646b21e0b72f7866c7038ab086d3d5e1cd6271f060fd37defb241949d0582" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "md5" version = "0.6.1" @@ -1886,12 +2206,12 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2", + "sha2 0.9.8", "smallvec", "subprocess", "thiserror", "time 0.2.27", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -2177,6 +2497,24 @@ dependencies = [ "paste", ] +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "owning_ref" version = "0.4.1" @@ -2234,6 +2572,37 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "parquet" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c5df78854c7d56aed3aba69fb1b342dda41c384963f4ddfa962b690572b42a" +dependencies = [ + "arrow", + "base64", + "brotli", + "byteorder", + "chrono", + "flate2", + "lz4", + "num", + "num-bigint 0.4.2", + "parquet-format", + "rand 0.8.4", + "snap", + "thrift", + "zstd", +] + +[[package]] +name = "parquet-format" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f0c06cdcd5460967c485f9c40a821746f5955ad81990533c7fae95dbd9bc0b5" +dependencies = [ + "thrift", +] + [[package]] name = "paste" version = "1.0.5" @@ -2383,7 +2752,7 @@ dependencies = [ "anyhow", "arrow2", "csv-core", - "dirs", + "dirs 4.0.0", "lazy_static", "lexical 6.0.1", "memchr", @@ -2404,7 +2773,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eca1fed3b88ae1bb9b7f1d7b2958f1655d9c1aed33495d6ba30ff84a0c1e9e9" dependencies = [ "ahash", - "glob", + "glob 0.3.0", "parking_lot 0.12.0", "polars-arrow", "polars-core", @@ -2486,10 +2855,10 @@ dependencies = [ "bytes", "fallible-iterator", "hmac", - "md-5", + "md-5 0.9.1", "memchr", "rand 0.8.4", - "sha2", + "sha2 0.9.8", "stringprep", ] @@ -2505,7 +2874,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -3120,13 +3489,24 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "sha2" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.3", +] + [[package]] name = "shlex" version = "1.1.0" @@ -3187,6 +3567,12 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +[[package]] +name = "snap" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + [[package]] name = "socket2" version = "0.4.2" @@ -3212,6 +3598,15 @@ dependencies = [ "log", ] +[[package]] +name = "sqlparser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9a527b68048eb95495a1508f6c8395c8defcff5ecdbe8ad4106d08a2ef2a3c" +dependencies = [ + "log", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3360,7 +3755,7 @@ dependencies = [ "debugid", "memmap", "stable_deref_trait", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -3443,6 +3838,28 @@ dependencies = [ "syn", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 1.1.1", + "threadpool", +] + [[package]] name = "tiberius" version = "0.5.16" @@ -3470,7 +3887,7 @@ dependencies = [ "rust_decimal", "thiserror", "tracing", - "uuid", + "uuid 0.8.2", "winauth", ] @@ -3633,6 +4050,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -3777,6 +4205,15 @@ dependencies = [ "md5 0.7.0", ] +[[package]] +name = "uuid" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cfcd319456c4d6ea10087ed423473267e1a071f3bc0aa89f80d60997843c6f0" +dependencies = [ + "getrandom 0.2.3", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -4061,3 +4498,32 @@ dependencies = [ "tokio", "url", ] + +[[package]] +name = "zstd" +version = "0.11.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/Justfile b/Justfile index bd28a291bc..7c78734b8d 100644 --- a/Justfile +++ b/Justfile @@ -6,7 +6,7 @@ build-release: build-debug: cargo build -test +ARGS="": +test +ARGS="": cargo test --features all {{ARGS}} -- --nocapture test-feature-gate: @@ -25,6 +25,10 @@ bootstrap-python: cp LICENSE connectorx-python/LICENSE cd connectorx-python && poetry install +setup-java: + cd federated-query/rewriter && mvn package -Dmaven.test.skip=true + cp -f ./federated-query/rewriter/target/federated-rewriter-1.0-SNAPSHOT-jar-with-dependencies.jar connectorx-python/connectorx/dependencies/federated-rewriter.jar + setup-python: cd connectorx-python && poetry run maturin develop --release @@ -34,6 +38,12 @@ test-python +opts="": setup-python test-python-s +opts="": cd connectorx-python && poetry run pytest connectorx/tests -v -s {{opts}} +test-fed file="3.sql": + cd connectorx && cargo run --features src_postgres --features src_mysql --features dst_arrow --features federation --example federated_test "../federated-query/test-queries/{{file}}" + +test-datafusion: + cd connectorx && cargo run --features src_postgres --features src_mysql --features dst_arrow --features federation --example test + seed-db: #!/bin/bash psql $POSTGRES_URL -f scripts/postgres.sql @@ -87,7 +97,34 @@ benchmark-report: setup-python poetry run pytest connectorx/tests/benchmarks.py --benchmark-json ../benchmark.json # releases -ci-build-python-wheel: +build-python-wheel: + # need to get the j4rs dependency first + cd connectorx-python && maturin build --release -i python --no-sdist + # copy files + mkdir -p connectorx-python/connectorx/dependencies/deps + cp -rf connectorx-python/target/release/jassets connectorx-python/connectorx/dependencies + cp -f connectorx-python/target/release/deps/libj4rs*.{dylib,so,dll} connectorx-python/connectorx/dependencies/deps 2>/dev/null || : cp README.md connectorx-python/README.md cp LICENSE connectorx-python/LICENSE - cd connectorx-python && maturin build --release \ No newline at end of file + # build final wheel + cd connectorx-python && maturin build --release -i python --no-sdist + +bench-fed path: + just python-tpch fed --file {{path}}/q2.sql + just python-tpch-ext fed --file {{path}}/q3.sql + just python-tpch-ext fed --file {{path}}/q4.sql + just python-tpch-ext fed --file {{path}}/q5.sql + just python-tpch-ext fed --file {{path}}/q7.sql + just python-tpch-ext fed --file {{path}}/q8.sql + just python-tpch-ext fed --file {{path}}/q9.sql + just python-tpch-ext fed --file {{path}}/q10.sql + just python-tpch-ext fed --file {{path}}/q11.sql + just python-tpch-ext fed --file {{path}}/q12.sql + just python-tpch-ext fed --file {{path}}/q13.sql + just python-tpch-ext fed --file {{path}}/q14.sql + just python-tpch-ext fed --file {{path}}/q16.sql + just python-tpch-ext fed --file {{path}}/q17.sql + just python-tpch-ext fed --file {{path}}/q18.sql + just python-tpch-ext fed --file {{path}}/q19.sql + just python-tpch-ext fed --file {{path}}/q20.sql + just python-tpch-ext fed --file {{path}}/q22.sql diff --git a/benchmarks/.ipynb_checkpoints/tpch-cx-checkpoint.py b/benchmarks/.ipynb_checkpoints/tpch-cx-checkpoint.py deleted file mode 100644 index d9f3b8bc6a..0000000000 --- a/benchmarks/.ipynb_checkpoints/tpch-cx-checkpoint.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -Usage: - tpch-cx.py [--protocol=] [--conn=] [--ret=] - -Options: - --protocol= The protocol to use [default: binary]. - --conn= The connection url to use [default: POSTGRES_URL]. - --ret= The return type [default: pandas]. - -h --help Show this screen. - --version Show version. -""" -import os - -import connectorx as cx -from contexttimer import Timer -from docopt import docopt -import pandas as pd -import modin.pandas as mpd -import dask.dataframe as dd -import polars as pl -import pyarrow as pa - - -def describe(df): - if isinstance(df, pd.DataFrame): - print(df.head()) - elif isinstance(df, mpd.DataFrame): - print(df.head()) - elif isinstance(df, pl.DataFrame): - print(df.head()) - elif isinstance(df, dd.DataFrame): - print(df.head()) - elif isinstance(df, pa.Table): - print(df.slice(0, 10).to_pandas()) - else: - raise ValueError("unknown type") - - -if __name__ == "__main__": - args = docopt(__doc__, version="Naval Fate 2.0") - conn = os.environ[args["--conn"]] - table = os.environ["TPCH_TABLE"] - part_num = int(args[""]) - - with Timer() as timer: - if part_num > 1: - df = cx.read_sql( - conn, - f"""SELECT * FROM {table}""", - partition_on="L_ORDERKEY", - partition_num=int(args[""]), - protocol=args["--protocol"], - return_type=args["--ret"], - ) - else: - df = cx.read_sql( - conn, - f"""SELECT * FROM {table}""", - protocol=args["--protocol"], - return_type=args["--ret"], - ) - print("time in total:", timer.elapsed) - - print(type(df), len(df)) - describe(df) diff --git a/benchmarks/tpch-fed.py b/benchmarks/tpch-fed.py new file mode 100644 index 0000000000..f79a83370f --- /dev/null +++ b/benchmarks/tpch-fed.py @@ -0,0 +1,36 @@ +""" +Usage: + tpch-fed.py --file= + +Options: + --file= Query file. + -h --help Show this screen. + --version Show version. +""" +import os + +import connectorx as cx +from contexttimer import Timer +from docopt import docopt +import pandas as pd + + +if __name__ == "__main__": + args = docopt(__doc__, version="Naval Fate 2.0") + query_file = args["--file"] + + db_map = { + "db1": os.environ["DB1"], + "db2": os.environ["DB2"], + } + print(f"dbs: {db_map}") + + with open(query_file, "r") as f: + sql = f.read() + print(f"file: {query_file}") + + with Timer() as timer: + df = cx.read_sql(db_map, sql, return_type="pandas") + print("time in total:", timer.elapsed) + + print(df) diff --git a/connectorx-python/Cargo.lock b/connectorx-python/Cargo.lock index 32878077fd..b41b6f8feb 100644 --- a/connectorx-python/Cargo.lock +++ b/connectorx-python/Cargo.lock @@ -37,6 +37,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -52,6 +67,12 @@ version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + [[package]] name = "arrayvec" version = "0.4.12" @@ -75,12 +96,13 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "11.0.0" +version = "12.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db922bcac0aa9f3b6e191d73b0b5df925c2c10ab4893f7416359f289e5d23489" +checksum = "e94e2d315bc11f3d43f38344141453282591788381061fabc06c95e37d0dbc7d" dependencies = [ "bitflags", "chrono", + "comfy-table", "csv", "flatbuffers", "half", @@ -344,6 +366,29 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" +dependencies = [ + "digest 0.10.3", +] + +[[package]] +name = "blake3" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +dependencies = [ + "arrayref", + "arrayvec 0.7.2", + "cc", + "cfg-if 1.0.0", + "constant_time_eq", + "digest 0.10.3", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -362,6 +407,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "0.2.17" @@ -460,6 +526,15 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" [[package]] name = "cexpr" @@ -502,9 +577,9 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cc00842eed744b858222c4c9faf7243aafc6d33f92f96935263ef4d8a41ce21" dependencies = [ - "glob", + "glob 0.3.0", "libc", - "libloading", + "libloading 0.7.3", ] [[package]] @@ -573,12 +648,14 @@ dependencies = [ "bb8-tiberius", "chrono", "csv", + "datafusion", "fallible-streaming-iterator", "fehler", "futures", "gcp-bigquery-client", "hex", "itertools", + "j4rs", "log", "native-tls", "num-traits", @@ -598,13 +675,13 @@ dependencies = [ "rusqlite", "rust_decimal", "serde_json", - "sqlparser", + "sqlparser 0.11.0", "thiserror", "tiberius", "tokio", "url", "urlencoding", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -646,14 +723,14 @@ dependencies = [ "rust_decimal", "rust_decimal_macros", "serde_json", - "sqlparser", + "sqlparser 0.11.0", "thiserror", "tiberius", "tokio", "tokio-util", "url", "urlencoding", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -662,6 +739,12 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "core-foundation" version = "0.9.3" @@ -861,13 +944,103 @@ dependencies = [ "sct 0.6.1", ] +[[package]] +name = "datafusion" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-data-access", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "hashbrown 0.12.0", + "lazy_static", + "log", + "num_cpus", + "ordered-float 2.10.0", + "parking_lot 0.12.0", + "parquet", + "paste 1.0.6", + "pin-project-lite", + "rand 0.8.5", + "smallvec", + "sqlparser 0.16.0", + "tempfile", + "tokio", + "tokio-stream", + "uuid 1.0.0", +] + +[[package]] +name = "datafusion-common" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "arrow", + "ordered-float 2.10.0", + "parquet", + "sqlparser 0.16.0", +] + +[[package]] +name = "datafusion-data-access" +version = "1.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "async-trait", + "chrono", + "futures", + "parking_lot 0.12.0", + "tempfile", + "tokio", +] + +[[package]] +name = "datafusion-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "sqlparser 0.16.0", +] + +[[package]] +name = "datafusion-physical-expr" +version = "7.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=93a7054b837cec2418adc427a6505dcea92e6755#93a7054b837cec2418adc427a6505dcea92e6755" +dependencies = [ + "ahash", + "arrow", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.12.0", + "lazy_static", + "md-5", + "ordered-float 2.10.0", + "paste 1.0.6", + "rand 0.8.5", + "regex", + "sha2 0.10.2", + "unicode-segmentation", +] + [[package]] name = "debugid" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6ee87af31d84ef885378aebca32be3d682b0e0dc119d5b4860a2c5bb5046730" dependencies = [ - "uuid", + "uuid 0.8.2", ] [[package]] @@ -912,6 +1085,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30baa043103c9d0c2a57cf537cc2f35623889dc0d405e6c3cccfadbc81c71309" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs" version = "4.0.0" @@ -1235,6 +1417,12 @@ dependencies = [ "syn", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "funty" version = "1.2.0" @@ -1387,6 +1575,12 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +[[package]] +name = "glob" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" + [[package]] name = "glob" version = "0.3.0" @@ -1656,6 +1850,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "integer-encoding" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" + [[package]] name = "io-enum" version = "1.0.1" @@ -1695,6 +1895,51 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "j4rs" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de204777df8f1b7ddd414468755d6a7e5919ad62834aca23bf6166ecb557fac9" +dependencies = [ + "cesu8", + "dirs 3.0.2", + "fs_extra", + "java-locator", + "jni-sys", + "lazy_static", + "libc", + "libloading 0.6.7", + "log", + "serde", + "serde_json", + "sha2 0.9.9", +] + +[[package]] +name = "java-locator" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62402993f775e51f6d3bab6d64b38aece6d15cbf37c845bda65005220fa0f9df" +dependencies = [ + "glob 0.2.11", + "lazy_static", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.56" @@ -1818,6 +2063,16 @@ version = "0.2.119" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" +[[package]] +name = "libloading" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "351a32417a12d5f7e82c368a66781e307834dae04c6ce0cd4456d52989229883" +dependencies = [ + "cfg-if 1.0.0", + "winapi", +] + [[package]] name = "libloading" version = "0.7.3" @@ -1877,6 +2132,26 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "lz4" +version = "1.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4edcb94251b1c375c459e5abe9fb0168c1c826c3370172684844f8f3f8d1a885" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7be8908e2ed6f31c02db8a9fa962f03e36c53fbfde437363eae3306b85d7e17" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matches" version = "0.1.9" @@ -2074,7 +2349,7 @@ dependencies = [ "subprocess", "thiserror", "time 0.2.27", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -2384,6 +2659,24 @@ dependencies = [ "paste 1.0.6", ] +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "owning_ref" version = "0.4.1" @@ -2441,6 +2734,37 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "parquet" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c5df78854c7d56aed3aba69fb1b342dda41c384963f4ddfa962b690572b42a" +dependencies = [ + "arrow", + "base64", + "brotli", + "byteorder", + "chrono", + "flate2", + "lz4", + "num", + "num-bigint 0.4.3", + "parquet-format", + "rand 0.8.5", + "snap", + "thrift", + "zstd", +] + +[[package]] +name = "parquet-format" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f0c06cdcd5460967c485f9c40a821746f5955ad81990533c7fae95dbd9bc0b5" +dependencies = [ + "thrift", +] + [[package]] name = "paste" version = "0.1.18" @@ -2619,7 +2943,7 @@ dependencies = [ "anyhow", "arrow2", "csv-core", - "dirs", + "dirs 4.0.0", "lazy_static", "lexical 6.0.1", "memchr", @@ -2640,7 +2964,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eca1fed3b88ae1bb9b7f1d7b2958f1655d9c1aed33495d6ba30ff84a0c1e9e9" dependencies = [ "ahash", - "glob", + "glob 0.3.0", "parking_lot 0.12.0", "polars-arrow", "polars-core", @@ -2741,7 +3065,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -3582,6 +3906,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snap" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + [[package]] name = "socket2" version = "0.4.4" @@ -3607,6 +3937,15 @@ dependencies = [ "log", ] +[[package]] +name = "sqlparser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9a527b68048eb95495a1508f6c8395c8defcff5ecdbe8ad4106d08a2ef2a3c" +dependencies = [ + "log", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3755,7 +4094,7 @@ dependencies = [ "debugid", "memmap2", "stable_deref_trait", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -3838,6 +4177,28 @@ dependencies = [ "syn", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 1.1.1", + "threadpool", +] + [[package]] name = "tiberius" version = "0.5.16" @@ -3865,7 +4226,7 @@ dependencies = [ "rust_decimal", "thiserror", "tracing", - "uuid", + "uuid 0.8.2", "winauth", ] @@ -4039,6 +4400,17 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -4198,6 +4570,15 @@ dependencies = [ "md5 0.7.0", ] +[[package]] +name = "uuid" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cfcd319456c4d6ea10087ed423473267e1a071f3bc0aa89f80d60997843c6f0" +dependencies = [ + "getrandom 0.2.5", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -4503,3 +4884,32 @@ dependencies = [ "tokio", "url", ] + +[[package]] +name = "zstd" +version = "0.11.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/connectorx-python/Cargo.toml b/connectorx-python/Cargo.toml index 56f9c31193..184884bf78 100644 --- a/connectorx-python/Cargo.toml +++ b/connectorx-python/Cargo.toml @@ -11,7 +11,7 @@ version = "0.2.6-alpha.5" [dependencies] anyhow = "1" -arrow = "11.0.0" +arrow = "12" arrow2 = {version = "0.10", default-features = false} bitfield = "0.13" bytes = "1" @@ -64,11 +64,12 @@ name = "connectorx" [features] branch = ["connectorx/branch"] -default = ["extension", "fptr", "nbstr", "dsts", "srcs"] +default = ["extension", "fptr", "nbstr", "dsts", "srcs", "federation"] dsts = ["connectorx/dst_arrow", "connectorx/dst_arrow2"] executable = ["pyo3/auto-initialize"] extension = ["pyo3/extension-module"] fptr = ["connectorx/fptr"] +federation = ["connectorx/federation"] nbstr = [] srcs = [ "connectorx/src_postgres", diff --git a/connectorx-python/connectorx/__init__.py b/connectorx-python/connectorx/__init__.py index fd46a1ec58..15fdb2f7cc 100644 --- a/connectorx-python/connectorx/__init__.py +++ b/connectorx-python/connectorx/__init__.py @@ -3,6 +3,7 @@ from .connectorx import ( read_sql as _read_sql, partition_sql as _partition_sql, + read_sql2 as _read_sql2, get_meta as _get_meta, ) @@ -18,6 +19,22 @@ except: pass +import os + +dir_path = os.path.dirname(os.path.realpath(__file__)) +# check whether it is in development env or installed +if ( + not os.path.basename(os.path.abspath(os.path.join(dir_path, ".."))) + == "connectorx-python" +): + print("haha", dir_path, "lala", os.path.basename(os.path.join(dir_path, ".."))) + if "J4RS_BASE_PATH" not in os.environ: + os.environ["J4RS_BASE_PATH"] = os.path.join(dir_path, "dependencies") +if "CX_REWRITER_PATH" not in os.environ: + os.environ["CX_REWRITER_PATH"] = os.path.join( + dir_path, "dependencies/federated-rewriter.jar" + ) + def rewrite_conn(conn: str, protocol: Optional[str] = None): if not protocol: @@ -94,7 +111,7 @@ def partition_sql( def read_sql( - conn: str, + conn: Union[str, Dict[str, str]], query: Union[List[str], str], *, return_type: str = "pandas", @@ -110,7 +127,7 @@ def read_sql( Parameters ========== conn - the connection string. + the connection string, or dict of connection string mapping for federated query. query a SQL query or a list of SQL queries. return_type @@ -148,10 +165,33 @@ def read_sql( >>> read_sql(postgres_url, queries) """ - if isinstance(query, list) and len(query) == 1: query = query[0] + if isinstance(conn, dict): + assert partition_on is None and isinstance( + query, str + ), "Federated query does not support query partitioning for now" + assert ( + protocol is None + ), "Federated query does not support specifying protocol for now" + result = _read_sql2(query, conn) + df = reconstruct_arrow(result) + if return_type == "pandas": + df = df.to_pandas(date_as_object=False, split_blocks=False) + if return_type == "polars": + try: + import polars as pl + except ModuleNotFoundError: + raise ValueError("You need to install polars first") + + try: + df = pl.DataFrame.from_arrow(df) + except AttributeError: + # api change for polars >= 0.8.* + df = pl.from_arrow(df) + return df + if isinstance(query, str): if partition_on is None: queries = [query] diff --git a/connectorx-python/connectorx/dependencies/federated-rewriter.jar b/connectorx-python/connectorx/dependencies/federated-rewriter.jar new file mode 100644 index 0000000000..821cbf45bf Binary files /dev/null and b/connectorx-python/connectorx/dependencies/federated-rewriter.jar differ diff --git a/connectorx-python/connectorx/tests/test_arrow.py b/connectorx-python/connectorx/tests/test_arrow.py index 043c595682..0c4435fb6d 100644 --- a/connectorx-python/connectorx/tests/test_arrow.py +++ b/connectorx-python/connectorx/tests/test_arrow.py @@ -27,7 +27,7 @@ def test_arrow(postgres_url: str) -> None: expected = pd.DataFrame( index=range(6), data={ - "test_int": pd.Series([0, 1, 2, 3, 4, 1314], dtype="int32"), + "test_int": pd.Series([0, 1, 2, 3, 4, 1314], dtype="int64"), "test_nullint": pd.Series([5, 3, None, 7, 9, 2], dtype="float64"), "test_str": pd.Series( ["a", "str1", "str2", "b", "c", None], dtype="object" diff --git a/connectorx-python/connectorx/tests/test_federation.py b/connectorx-python/connectorx/tests/test_federation.py new file mode 100644 index 0000000000..36203d78cb --- /dev/null +++ b/connectorx-python/connectorx/tests/test_federation.py @@ -0,0 +1,59 @@ +import os + +import pandas as pd +import pytest +from pandas.testing import assert_frame_equal + +from .. import read_sql + + +@pytest.fixture(scope="module") # type: ignore +def db1_url() -> str: + conn = os.environ["DB1"] + return conn + + +@pytest.fixture(scope="module") # type: ignore +def db2_url() -> str: + conn = os.environ["DB2"] + return conn + + +@pytest.mark.skipif( + not (os.environ.get("DB1") and os.environ.get("DB2")), + reason="Do not test federated queries is set unless both `DB1` and `DB2` are set", +) +def test_fed_spj(db1_url: str, db2_url: str) -> None: + query = "SELECT T.test_int, T.test_bool, S.test_language FROM db1.test_table T INNER JOIN db2.test_str S ON T.test_int = S.id" + df = read_sql({"db1": db1_url, "db2": db2_url}, query) + expected = pd.DataFrame( + index=range(5), + data={ + "TEST_INT": pd.Series([0, 1, 2, 3, 4], dtype="int64"), + "TEST_BOOL": pd.Series([None, True, False, False, None], dtype="object"), + "TEST_LANGUAGE": pd.Series( + ["English", "中文", "日本語", "русский", "Emoji"], dtype="object" + ), + }, + ) + df.sort_values(by="TEST_INT", inplace=True, ignore_index=True) + assert_frame_equal(df, expected, check_names=True) + + +@pytest.mark.skipif( + not (os.environ.get("DB1") and os.environ.get("DB2")), + reason="Do not test federated queries is set unless both `DB1` and `DB2` are set", +) +def test_fed_spja(db1_url: str, db2_url: str) -> None: + query = "select test_bool, AVG(test_float) as avg_float, SUM(test_int) as sum_int from db1.test_table as a, db2.test_str as b where a.test_int = b.id AND test_nullint is not NULL GROUP BY test_bool ORDER BY sum_int" + df = read_sql({"db1": db1_url, "db2": db2_url}, query) + expected = pd.DataFrame( + index=range(3), + data={ + "test_bool": pd.Series([True, False, None], dtype="object"), + "AVG_FLOAT": pd.Series([None, 3, 5.45], dtype="float64"), + "SUM_INT": pd.Series([1, 3, 4], dtype="int64"), + }, + ) + df.sort_values(by="SUM_INT", inplace=True, ignore_index=True) + assert_frame_equal(df, expected, check_names=True) diff --git a/connectorx-python/pyproject.toml b/connectorx-python/pyproject.toml index 17a11516b6..05c4b98a65 100644 --- a/connectorx-python/pyproject.toml +++ b/connectorx-python/pyproject.toml @@ -13,7 +13,6 @@ classifiers = [ "Framework :: IPython", ] description = "Load data from databases to dataframes, the fastest way." -include = ["connectorx/*.so", "connectorx/*.pyd", "LICENSE"] keywords = ["read_sql"] license = "MIT" maintainers = ["Weiyuan Wu "] @@ -69,3 +68,6 @@ python_functions = "test_* bench_*" [build-system] build-backend = "maturin" requires = ["maturin>=0.12,<0.13"] + +[tool.maturin] +sdist-include = ["connectorx/*.so", "connectorx/*.pyd", "connectorx/dependencies/", "LICENSE"] \ No newline at end of file diff --git a/connectorx-python/src/arrow.rs b/connectorx-python/src/arrow.rs index 8403278c67..ba0cf82215 100644 --- a/connectorx-python/src/arrow.rs +++ b/connectorx-python/src/arrow.rs @@ -216,7 +216,7 @@ pub fn write_arrow<'a>( obj.into_ref(py) } -fn to_ptrs(rbs: Vec) -> (Vec, Vec>) { +pub fn to_ptrs(rbs: Vec) -> (Vec, Vec>) { if rbs.is_empty() { return (vec![], vec![]); } diff --git a/connectorx-python/src/constants.rs b/connectorx-python/src/constants.rs index 4043afe054..4ca08ecc14 100644 --- a/connectorx-python/src/constants.rs +++ b/connectorx-python/src/constants.rs @@ -1,2 +1,7 @@ // PyString buffer size in MB pub const PYSTRING_BUFFER_SIZE: usize = 4; + +#[cfg(not(debug_assertions))] +pub const J4RS_BASE_PATH: &str = "./target/release"; +#[cfg(debug_assertions)] +pub const J4RS_BASE_PATH: &str = "./target/debug"; \ No newline at end of file diff --git a/connectorx-python/src/lib.rs b/connectorx-python/src/lib.rs index 791275af9d..9e9dc52ff3 100644 --- a/connectorx-python/src/lib.rs +++ b/connectorx-python/src/lib.rs @@ -1,4 +1,5 @@ #![feature(generic_associated_types)] +#![feature(fmt_internals)] #![allow(incomplete_features)] pub mod arrow; @@ -9,9 +10,13 @@ pub mod pandas; pub mod read_sql; mod source_router; +use crate::constants::J4RS_BASE_PATH; +use connectorx::fed_dispatcher::run; use pyo3::prelude::*; use pyo3::{wrap_pyfunction, PyResult}; +use std::collections::HashMap; use std::convert::TryFrom; +use std::env; use std::sync::Once; #[macro_use] @@ -32,6 +37,7 @@ fn connectorx(_: Python, m: &PyModule) -> PyResult<()> { }); m.add_wrapped(wrap_pyfunction!(read_sql))?; + m.add_wrapped(wrap_pyfunction!(read_sql2))?; m.add_wrapped(wrap_pyfunction!(partition_sql))?; m.add_wrapped(wrap_pyfunction!(get_meta))?; m.add_class::()?; @@ -60,6 +66,27 @@ pub fn partition_sql( Ok(queries.into_iter().map(|q| q.to_string()).collect()) } +#[pyfunction] +pub fn read_sql2<'a>( + py: Python<'a>, + sql: &str, + db_map: HashMap, +) -> PyResult<&'a PyAny> { + let rbs = run( + sql.to_string(), + db_map, + Some( + env::var("J4RS_BASE_PATH") + .unwrap_or(J4RS_BASE_PATH.to_string()) + .as_str(), + ), + ) + .unwrap(); + let ptrs = arrow::to_ptrs(rbs); + let obj: PyObject = ptrs.into_py(py); + Ok(obj.into_ref(py)) +} + #[pyfunction] pub fn get_meta<'a>( py: Python<'a>, diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs index fb78f713a0..957c9fe3ea 100644 --- a/connectorx-python/src/pandas/destination.rs +++ b/connectorx-python/src/pandas/destination.rs @@ -123,10 +123,7 @@ impl<'a> PandasDestination<'a> { impl<'a> Destination for PandasDestination<'a> { const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor]; type TypeSystem = PandasTypeSystem; - type Partition<'b> - where - 'a: 'b, - = PandasPartitionDestination<'b>; + type Partition<'b> = PandasPartitionDestination<'b> where 'a: 'b; type Error = ConnectorXPythonError; fn needs_count(&self) -> bool { diff --git a/connectorx/Cargo.toml b/connectorx/Cargo.toml index ea5474546f..9a3a1a0a16 100644 --- a/connectorx/Cargo.toml +++ b/connectorx/Cargo.toml @@ -18,7 +18,7 @@ rayon = "1" sqlparser = "0.11" thiserror = "1" -arrow = {version = "11.0.0", optional = true} +arrow = {version = "12", optional = true, features = ["prettyprint"]} arrow2 = {version = "0.10", default-features = false, optional = true} bb8 = {version = "0.7", optional = true} bb8-tiberius = {version = "0.5", optional = true} @@ -52,6 +52,8 @@ tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"], optional = url = {version = "2", optional = true} urlencoding = {version = "2.1", optional = true} uuid = {version = "0.8", optional = true} +j4rs = {version = "0.13", optional = true} +datafusion = {git = "https://github.com/apache/arrow-datafusion", rev = "93a7054b837cec2418adc427a6505dcea92e6755", optional = true} [lib] crate-type = ["cdylib", "rlib"] @@ -64,7 +66,7 @@ iai = "0.1" pprof = {version = "0.5", features = ["flamegraph"]} [features] -all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "dst_arrow", "dst_arrow2"] +all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "dst_arrow", "dst_arrow2", "federation"] branch = [] default = ["fptr"] dst_arrow = ["arrow", "chrono"] @@ -94,5 +96,6 @@ src_postgres = [ "postgres-openssl", ] src_sqlite = ["rusqlite", "r2d2_sqlite", "fallible-streaming-iterator", "owning_ref", "chrono", "r2d2", "urlencoding"] +federation = ["datafusion", "j4rs", "tokio"] [package.metadata.docs.rs] features = ["all"] diff --git a/connectorx/examples/federated_test.rs b/connectorx/examples/federated_test.rs new file mode 100644 index 0000000000..eadcacd2e1 --- /dev/null +++ b/connectorx/examples/federated_test.rs @@ -0,0 +1,214 @@ +use connectorx::{ + prelude::*, + sources::{ + mysql::BinaryProtocol as MYSQLBinaryProtocol, + postgres::{rewrite_tls_args, BinaryProtocol, PostgresSource}, + }, + sql::CXQuery, + transports::PostgresArrowTransport, +}; +use datafusion::datasource::MemTable; +use datafusion::prelude::*; +use j4rs::{ClasspathEntry, InvocationArg, Jvm, JvmBuilder}; +use postgres::NoTls; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::env; +use std::fs; +use std::iter::Iterator; +use std::sync::Arc; +use url::Url; + +fn main() { + let db_map = HashMap::from([("db1", "POSTGRES"), ("db2", "POSTGRES"), ("LOCAL", "LOCAL")]); + + let path = fs::canonicalize("../federated-rewriter.jar").unwrap(); + let entry = ClasspathEntry::new(path.to_str().unwrap()); + let jvm: Jvm = JvmBuilder::new().classpath_entry(entry).build().unwrap(); + + let args: Vec = env::args().collect(); + let file = &args[1]; + let sql = fs::read_to_string(file).unwrap(); + println!("input sql: {}", sql); + let sql = InvocationArg::try_from(sql).unwrap(); + + let ds1 = jvm + .invoke_static( + "org.apache.calcite.adapter.jdbc.JdbcSchema", + "dataSource", + &[ + InvocationArg::try_from(env::var("DB1_JDBC_URL").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB1_JDBC_DRIVER").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB1_USER").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB1_PASSWORD").unwrap()).unwrap(), + ], + ) + .unwrap(); + + let ds2 = jvm + .invoke_static( + "org.apache.calcite.adapter.jdbc.JdbcSchema", + "dataSource", + &[ + InvocationArg::try_from(env::var("DB2_JDBC_URL").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB2_JDBC_DRIVER").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB2_USER").unwrap()).unwrap(), + InvocationArg::try_from(env::var("DB2_PASSWORD").unwrap()).unwrap(), + ], + ) + .unwrap(); + + let db_conns = jvm.create_instance("java.util.HashMap", &[]).unwrap(); + jvm.invoke( + &db_conns, + "put", + &[ + InvocationArg::try_from("db1").unwrap(), + InvocationArg::try_from(ds1).unwrap(), + ], + ) + .unwrap(); + jvm.invoke( + &db_conns, + "put", + &[ + InvocationArg::try_from("db2").unwrap(), + InvocationArg::try_from(ds2).unwrap(), + ], + ) + .unwrap(); + + let db_conns = InvocationArg::try_from(db_conns).unwrap(); + + let rewriter = jvm + .create_instance("ai.dataprep.federated.FederatedQueryRewriter", &[]) + .unwrap(); + + let plan = jvm.invoke(&rewriter, "rewrite", &[db_conns, sql]).unwrap(); + + let count = jvm.invoke(&plan, "getCount", &[]).unwrap(); + let count: i32 = jvm.to_rust(count).unwrap(); + + let ctx = SessionContext::new(); + let mut local_sql = String::new(); + let mut alias_names = vec![]; + for i in 0..count { + println!("\nquery {i}:"); + + let db = jvm + .invoke( + &plan, + "getDBName", + &[InvocationArg::try_from(i) + .unwrap() + .into_primitive() + .unwrap()], + ) + .unwrap(); + let db: String = jvm.to_rust(db).unwrap(); + + let alias_db = jvm + .invoke( + &plan, + "getAliasDBName", + &[InvocationArg::try_from(i) + .unwrap() + .into_primitive() + .unwrap()], + ) + .unwrap(); + let alias_db: String = jvm.to_rust(alias_db).unwrap(); + + let rewrite_sql = jvm + .invoke( + &plan, + "getSql", + &[InvocationArg::try_from(i) + .unwrap() + .into_primitive() + .unwrap()], + ) + .unwrap(); + let rewrite_sql: String = jvm.to_rust(rewrite_sql).unwrap(); + println!("db: {}, rewrite sql: {}", db, rewrite_sql); + + if db == "LOCAL" { + local_sql = rewrite_sql; + } else { + let mut destination = ArrowDestination::new(); + let queries = [CXQuery::naked(rewrite_sql)]; + + let conn = match db.as_str() { + "db1" => env::var("DB1").unwrap(), + "db2" => env::var("DB2").unwrap(), + _ => unimplemented!(), + }; + + match db_map[db.as_str()] { + "POSTGRES" => { + let url = Url::parse(&conn).unwrap(); + let (config, _) = rewrite_tls_args(&url).unwrap(); + + let sb = + PostgresSource::::new(config, NoTls, 1).unwrap(); + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, &queries, None + ); + // println!("run dispatcher"); + dispatcher.run().unwrap(); + } + "MYSQL" => { + let source = MySQLSource::::new(conn.as_str(), 1).unwrap(); + let dispatcher = + Dispatcher::<_, _, MySQLArrowTransport>::new( + source, + &mut destination, + &queries, + None, + ); + dispatcher.run().unwrap(); + } + _ => {} + }; + let rbs = destination.arrow().unwrap(); + // println!("schema: {}", rbs[0].schema()); + // arrow::util::pretty::print_batches(&rbs).unwrap(); + let provider = MemTable::try_new(rbs[0].schema(), vec![rbs]).unwrap(); + ctx.register_table(alias_db.as_str(), Arc::new(provider)) + .unwrap(); + alias_names.push(alias_db); + } + } + + println!("\nquery final:"); + let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime")); + // until datafusion fix the bug + for alias in alias_names { + local_sql = local_sql.replace(format!("\"{}\"", alias).as_str(), alias.as_str()); + } + println!("{}", local_sql); + + // let sql1 = "SELECT * FROM db2 INNER JOIN db1 ON db2.\"p_partkey\" = db1.\"l_partkey\" AND db2.\"EXPR$0\" AND (db2.\"EXPR$1\" AND db1.\"EXPR$1\") AND (db1.\"EXPR$2\" AND db2.\"EXPR$2\" AND (db1.\"EXPR$3\" AND db1.\"EXPR$4\"))"; + // println!("==== run sql 1 ===="); + // let t = rt.block_on(ctx.sql(sql1)).unwrap(); + // // rt.block_on(t.limit(5).unwrap().show()).unwrap(); + // let rbs1 = rt.block_on(t.collect()).unwrap(); + // arrow::util::pretty::print_batches(&rbs1).unwrap(); + // println!("==== run sql 2 ===="); + + let df = rt.block_on(ctx.sql(local_sql.as_str())).unwrap(); + rt.block_on(df.explain(false, false).unwrap().show()) + .unwrap(); + rt.block_on(df.limit(5).unwrap().show()).unwrap(); + let num_rows = rt + .block_on(df.collect()) + .unwrap() + .into_iter() + .map(|rb| rb.num_rows()) + .sum::(); + println!("Final # rows: {}", num_rows); +} diff --git a/connectorx/examples/jvm_test.rs b/connectorx/examples/jvm_test.rs new file mode 100644 index 0000000000..7e294ab7e5 --- /dev/null +++ b/connectorx/examples/jvm_test.rs @@ -0,0 +1,57 @@ +use connectorx::{ + prelude::*, + sources::postgres::{rewrite_tls_args, BinaryProtocol, PostgresSource}, + sql::CXQuery, + transports::PostgresArrowTransport, +}; +use j4rs::{ClasspathEntry, InvocationArg, Jvm, JvmBuilder}; +use postgres::NoTls; +use std::convert::TryFrom; +use std::env; +use std::fs; +use std::iter::Iterator; +use url::Url; + +fn main() { + let path = fs::canonicalize("./federated-rewriter.jar").unwrap(); + println!("path: {:?}", path); + let entry = ClasspathEntry::new(path.to_str().unwrap()); + let jvm: Jvm = JvmBuilder::new().classpath_entry(entry).build().unwrap(); + + let args: Vec = env::args().collect(); + let file = &args[1]; + let sql = fs::read_to_string(file).unwrap(); + println!("input sql: {}", sql); + let sql = InvocationArg::try_from(sql).unwrap(); + let rewrite_sql = jvm + .invoke_static("ai.dataprep.federated.QueryRewriter", "rewrite", &[sql]) + .unwrap(); + + let rewrite_sql: String = jvm.to_rust(rewrite_sql).unwrap(); + + println!("rewrite sql: {}", rewrite_sql); + + let conn = env::var("POSTGRES_URL").unwrap(); + let url = Url::parse(&conn).unwrap(); + let (config, _) = rewrite_tls_args(&url).unwrap(); + + let sb = PostgresSource::::new(config, NoTls, 1).unwrap(); + let mut destination = ArrowDestination::new(); + let queries = [CXQuery::naked(rewrite_sql)]; + let dispatcher = Dispatcher::<_, _, PostgresArrowTransport>::new( + sb, + &mut destination, + &queries, + None, + ); + println!("run dispatcher"); + dispatcher.run().unwrap(); + let result = destination.arrow().unwrap(); + let counts = result + .iter() + .map(|rb| rb.num_rows()) + .collect::>(); + + println!("result rows: {}", counts.iter().sum::()); + println!("result columns: {}", result[0].schema()) +} diff --git a/connectorx/src/constants.rs b/connectorx/src/constants.rs index 56c27c4e37..566bacd584 100644 --- a/connectorx/src/constants.rs +++ b/connectorx/src/constants.rs @@ -17,3 +17,16 @@ pub const DB_BUFFER_SIZE: usize = 32; #[cfg(any(feature = "src_oracle"))] pub const ORACLE_ARRAY_SIZE: u32 = (1 * KILO) as u32; + +#[cfg(all(not(debug_assertions), feature = "federation"))] +pub const J4RS_BASE_PATH: &str = "../target/release"; + +#[cfg(all(debug_assertions, feature = "federation"))] +pub const J4RS_BASE_PATH: &str = "../target/debug"; + +#[cfg(feature = "federation")] +pub const CX_REWRITER_PATH: &str = + "../connectorx-python/connectorx/dependencies/federated-rewriter.jar"; + +#[cfg(any(feature = "federation", feature = "src_postgres"))] +pub const POSTGRES_JDBC_DRIVER: &str = "org.postgresql.Driver"; diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs index 936f31eec9..5ac8d3522b 100644 --- a/connectorx/src/destinations/arrow/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow/arrow_assoc.rs @@ -2,7 +2,7 @@ use super::errors::{ArrowDestinationError, Result}; use crate::constants::SECONDS_IN_DAY; use arrow::array::{ ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder, - Int32Builder, Int64Builder, LargeBinaryBuilder, LargeStringBuilder, Time64NanosecondBuilder, + Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder, UInt32Builder, UInt64Builder, }; use arrow::datatypes::Field; @@ -66,10 +66,10 @@ impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder); impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder); impl ArrowAssoc for &str { - type Builder = LargeStringBuilder; + type Builder = StringBuilder; fn builder(nrows: usize) -> Self::Builder { - LargeStringBuilder::new(nrows) + StringBuilder::new(nrows) } #[throws(ArrowDestinationError)] @@ -78,15 +78,15 @@ impl ArrowAssoc for &str { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::LargeUtf8, false) + Field::new(header, ArrowDataType::Utf8, false) } } impl ArrowAssoc for Option<&str> { - type Builder = LargeStringBuilder; + type Builder = StringBuilder; fn builder(nrows: usize) -> Self::Builder { - LargeStringBuilder::new(nrows) + StringBuilder::new(nrows) } #[throws(ArrowDestinationError)] @@ -98,15 +98,15 @@ impl ArrowAssoc for Option<&str> { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::LargeUtf8, true) + Field::new(header, ArrowDataType::Utf8, true) } } impl ArrowAssoc for String { - type Builder = LargeStringBuilder; + type Builder = StringBuilder; fn builder(nrows: usize) -> Self::Builder { - LargeStringBuilder::new(nrows) + StringBuilder::new(nrows) } #[throws(ArrowDestinationError)] @@ -115,15 +115,15 @@ impl ArrowAssoc for String { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::LargeUtf8, false) + Field::new(header, ArrowDataType::Utf8, false) } } impl ArrowAssoc for Option { - type Builder = LargeStringBuilder; + type Builder = StringBuilder; fn builder(nrows: usize) -> Self::Builder { - LargeStringBuilder::new(nrows) + StringBuilder::new(nrows) } #[throws(ArrowDestinationError)] @@ -135,7 +135,7 @@ impl ArrowAssoc for Option { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::LargeUtf8, true) + Field::new(header, ArrowDataType::Utf8, true) } } diff --git a/connectorx/src/errors.rs b/connectorx/src/errors.rs index a7f0a8da13..de1c8026dd 100644 --- a/connectorx/src/errors.rs +++ b/connectorx/src/errors.rs @@ -30,9 +30,38 @@ pub enum ConnectorXError { #[error("Cannot get total number of rows in advance.")] CountError(), + #[error("File {0} not found.")] + FileNotFoundError(String), + #[error(transparent)] SQLParserError(#[from] sqlparser::parser::ParserError), + #[error(transparent)] + StdIOError(#[from] std::io::Error), + + #[cfg(feature = "federation")] + #[error(transparent)] + J4RSError(#[from] j4rs::errors::J4RsError), + + #[error(transparent)] + StdVarError(#[from] std::env::VarError), + + #[cfg(feature = "federation")] + #[error(transparent)] + DataFusionError(#[from] datafusion::error::DataFusionError), + + #[cfg(feature = "federation")] + #[error(transparent)] + UrlParseError(#[from] url::ParseError), + + // #[error(transparent)] + // PostgresError(#[from] crate::source::postgres::Error), + + // #[error(transparent)] + // ArrowError(#[from] crate::destination::arrow::ArrowDestinationError), + + // #[error(transparent)] + // PostgresArrowTransportError(#[from] crate::transports::PostgresArrowTransportError), /// Any other errors that are too trivial to be put here explicitly. #[error(transparent)] Other(#[from] anyhow::Error), diff --git a/connectorx/src/fed_dispatcher.rs b/connectorx/src/fed_dispatcher.rs new file mode 100644 index 0000000000..d55a892747 --- /dev/null +++ b/connectorx/src/fed_dispatcher.rs @@ -0,0 +1,212 @@ +use crate::{ + constants::{CX_REWRITER_PATH, J4RS_BASE_PATH, POSTGRES_JDBC_DRIVER}, + prelude::*, + sources::postgres::{rewrite_tls_args, BinaryProtocol, PostgresSource}, + sql::CXQuery, + transports::PostgresArrowTransport, +}; +use arrow::record_batch::RecordBatch; +use datafusion::datasource::MemTable; +use datafusion::prelude::*; +use fehler::throws; +use j4rs::{ClasspathEntry, InvocationArg, Jvm, JvmBuilder}; +use log::debug; +use postgres::NoTls; +use rayon::prelude::*; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::sync::{mpsc::channel, Arc}; +use std::{env, fs}; +use url::Url; + +struct Plan { + db_name: String, + db_alias: String, + sql: String, +} + +#[throws(ConnectorXError)] +fn init_jvm(j4rs_base: Option<&str>) -> Jvm { + let base = match j4rs_base { + Some(path) => fs::canonicalize(path) + .map_err(|_| ConnectorXError::FileNotFoundError(path.to_string()))?, + None => fs::canonicalize(J4RS_BASE_PATH) + .map_err(|_| ConnectorXError::FileNotFoundError(J4RS_BASE_PATH.to_string()))?, + }; + debug!("j4rs base path: {:?}", base); + + let rewriter_path = env::var("CX_REWRITER_PATH").unwrap_or(CX_REWRITER_PATH.to_string()); + let path = fs::canonicalize(rewriter_path.as_str()) + .map_err(|_| ConnectorXError::FileNotFoundError(rewriter_path))?; + + debug!("rewriter path: {:?}", path); + + let entry = ClasspathEntry::new(path.to_str().unwrap()); + JvmBuilder::new() + .classpath_entry(entry) + .with_base_path(base.to_str().unwrap()) + .build()? +} + +#[throws(ConnectorXError)] +fn rewrite_sql(jvm: &Jvm, sql: &str, db_map: &HashMap) -> Vec { + let sql = InvocationArg::try_from(sql).unwrap(); + let db_conns = jvm.create_instance("java.util.HashMap", &[])?; + for (db_name, url) in db_map.iter() { + debug!("url: {:?}", url); + let ds = jvm.invoke_static( + "org.apache.calcite.adapter.jdbc.JdbcSchema", + "dataSource", + &[ + InvocationArg::try_from(format!( + "jdbc:postgresql://{}:{}{}", + url.host_str().unwrap_or("localhost"), + url.port().unwrap_or(5432), + url.path() + )) + .unwrap(), + InvocationArg::try_from(POSTGRES_JDBC_DRIVER).unwrap(), + InvocationArg::try_from(url.username()).unwrap(), + InvocationArg::try_from(url.password().unwrap_or("")).unwrap(), + ], + )?; + + jvm.invoke( + &db_conns, + "put", + &[ + InvocationArg::try_from(db_name).unwrap(), + InvocationArg::try_from(ds).unwrap(), + ], + )?; + } + + let rewriter = jvm.create_instance("ai.dataprep.federated.FederatedQueryRewriter", &[])?; + let db_conns = InvocationArg::try_from(db_conns).unwrap(); + let plan = jvm.invoke(&rewriter, "rewrite", &[db_conns, sql])?; + + let count = jvm.invoke(&plan, "getCount", &[])?; + let count: i32 = jvm.to_rust(count)?; + debug!("rewrite finished, got {} queries", count); + + let mut fed_plan = vec![]; + for i in 0..count { + let db = jvm.invoke( + &plan, + "getDBName", + &[InvocationArg::try_from(i).unwrap().into_primitive()?], + )?; + let db: String = jvm.to_rust(db)?; + + let alias_db = jvm.invoke( + &plan, + "getAliasDBName", + &[InvocationArg::try_from(i).unwrap().into_primitive()?], + )?; + let alias_db: String = jvm.to_rust(alias_db)?; + + let rewrite_sql = jvm.invoke( + &plan, + "getSql", + &[InvocationArg::try_from(i).unwrap().into_primitive()?], + )?; + let rewrite_sql: String = jvm.to_rust(rewrite_sql)?; + debug!( + "{} - db: {}, alias: {} rewrite sql: {}", + i, db, alias_db, rewrite_sql + ); + fed_plan.push(Plan { + db_name: db, + db_alias: alias_db, + sql: rewrite_sql, + }); + } + fed_plan +} + +#[throws(ConnectorXError)] +pub fn run( + sql: String, + db_map: HashMap, + j4rs_base: Option<&str>, +) -> Vec { + debug!("federated input sql: {}", sql); + + let jvm = init_jvm(j4rs_base)?; + debug!("init jvm successfully!"); + + let mut db_url_map: HashMap = HashMap::new(); + for (k, v) in db_map.into_iter() { + db_url_map.insert(k, Url::parse(v.as_str())?); + } + + let fed_plan = rewrite_sql(&jvm, sql.as_str(), &db_url_map)?; + + debug!("fetch queries from remote"); + let (sender, receiver) = channel(); + fed_plan.into_par_iter().enumerate().try_for_each_with( + sender, + |s, (i, p)| -> Result<(), ConnectorXError> { + match p.db_name.as_str() { + "LOCAL" => { + s.send((p.sql, None)).expect("send error local"); + } + _ => { + debug!("start query {}: {}", i, p.sql); + let mut destination = ArrowDestination::new(); + let queries = [CXQuery::naked(p.sql)]; + let url = &db_url_map[p.db_name.as_str()]; + let (config, _) = + rewrite_tls_args(&url).expect(&format!("{} postgres config error", i)); + let sb = PostgresSource::::new(config, NoTls, 1) + .expect(&format!("{} postgres init error", i)); + let dispatcher = Dispatcher::< + _, + _, + PostgresArrowTransport, + >::new( + sb, &mut destination, &queries, None + ); + dispatcher + .run() + .expect(&format!("run dispatcher fails {}", i)); + let rbs = destination + .arrow() + .expect(&format!("get arrow fails {}", i)); + let provider = MemTable::try_new(rbs[0].schema(), vec![rbs])?; + s.send((p.db_alias, Some(Arc::new(provider)))) + .expect(&format!("send error {}", i)); + debug!("query {} finished", i); + } + } + Ok(()) + }, + )?; + + let ctx = SessionContext::new(); + let mut alias_names: Vec = vec![]; + let mut local_sql = String::new(); + receiver + .iter() + .try_for_each(|(alias, provider)| -> Result<(), ConnectorXError> { + match provider { + Some(p) => { + ctx.register_table(alias.as_str(), p)?; + alias_names.push(alias); + } + None => local_sql = alias, + } + + Ok(()) + })?; + + debug!("\nexecute query final:"); + let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime")); + // until datafusion fix the bug: https://github.com/apache/arrow-datafusion/issues/2147 + for alias in alias_names { + local_sql = local_sql.replace(format!("\"{}\"", alias).as_str(), alias.as_str()); + } + + let df = rt.block_on(ctx.sql(local_sql.as_str()))?; + rt.block_on(df.collect())? +} diff --git a/connectorx/src/lib.rs b/connectorx/src/lib.rs index 27fe79f41e..93edc982e5 100644 --- a/connectorx/src/lib.rs +++ b/connectorx/src/lib.rs @@ -138,6 +138,8 @@ pub mod data_order; pub mod destinations; mod dispatcher; pub mod errors; +#[cfg(feature = "federation")] +pub mod fed_dispatcher; pub mod sources; #[doc(hidden)] pub mod sql; diff --git a/connectorx/src/macros.rs b/connectorx/src/macros.rs index 059428a01e..3d80d67f6a 100644 --- a/connectorx/src/macros.rs +++ b/connectorx/src/macros.rs @@ -201,7 +201,7 @@ macro_rules! impl_transport { ts2: Self::TSD, src: &'r mut <::Partition as $crate::sources::SourcePartition>::Parser<'s>, dst: &'r mut ::Partition<'d>, - ) -> Result<(), Self::Error> { + ) -> Result<(), Self::Error> where Self: 'd { match (ts1, ts2) { $( ($TSS::$V1(true), $TSD::$V2(true)) => { @@ -236,7 +236,7 @@ macro_rules! impl_transport { src: &mut <::Partition as $crate::sources::SourcePartition>::Parser<'s>, dst: &mut ::Partition<'d>, ) -> Result<(), Self::Error> - > { + > where Self: 'd { match (ts1, ts2) { $( ($TSS::$V1(true), $TSD::$V2(true)) => { diff --git a/connectorx/src/transports/mssql_arrow.rs b/connectorx/src/transports/mssql_arrow.rs index f12749b8ef..9e3aa8e143 100644 --- a/connectorx/src/transports/mssql_arrow.rs +++ b/connectorx/src/transports/mssql_arrow.rs @@ -30,9 +30,9 @@ impl_transport!( systems = MsSQLTypeSystem => ArrowTypeSystem, route = MsSQLSource => ArrowDestination, mappings = { - { Tinyint[u8] => Int32[i32] | conversion auto } - { Smallint[i16] => Int32[i32] | conversion auto } - { Int[i32] => Int32[i32] | conversion auto } + { Tinyint[u8] => Int64[i64] | conversion auto } + { Smallint[i16] => Int64[i64] | conversion auto } + { Int[i32] => Int64[i64] | conversion auto } { Bigint[i64] => Int64[i64] | conversion auto } { Intn[IntN] => Int64[i64] | conversion option } { Float24[f32] => Float32[f32] | conversion auto } diff --git a/connectorx/src/transports/postgres_arrow.rs b/connectorx/src/transports/postgres_arrow.rs index 5a4d15c6e1..c8e9b4677a 100644 --- a/connectorx/src/transports/postgres_arrow.rs +++ b/connectorx/src/transports/postgres_arrow.rs @@ -40,11 +40,11 @@ macro_rules! impl_postgres_transport { systems = PostgresTypeSystem => ArrowTypeSystem, route = PostgresSource<$proto, $tls> => ArrowDestination, mappings = { - { Float4[f32] => Float32[f32] | conversion auto } + { Float4[f32] => Float64[f64] | conversion auto } { Float8[f64] => Float64[f64] | conversion auto } { Numeric[Decimal] => Float64[f64] | conversion option } - { Int2[i16] => Int32[i32] | conversion auto } - { Int4[i32] => Int32[i32] | conversion auto } + { Int2[i16] => Int64[i64] | conversion auto } + { Int4[i32] => Int64[i64] | conversion auto } { Int8[i64] => Int64[i64] | conversion auto } { Bool[bool] => Boolean[bool] | conversion auto } { Text[&'r str] => LargeUtf8[String] | conversion owned } diff --git a/connectorx/src/typesystem.rs b/connectorx/src/typesystem.rs index d11844da7f..093f7bd63a 100644 --- a/connectorx/src/typesystem.rs +++ b/connectorx/src/typesystem.rs @@ -93,7 +93,9 @@ pub trait Transport { ts2: Self::TSD, src: &'r mut <::Partition as SourcePartition>::Parser<'s>, dst: &'r mut ::Partition<'d>, - ) -> Result<(), Self::Error>; + ) -> Result<(), Self::Error> + where + Self: 'd; #[allow(clippy::type_complexity)] fn processor<'s, 'd>( @@ -104,7 +106,9 @@ pub trait Transport { src: &mut <::Partition as SourcePartition>::Parser<'s>, dst: &mut ::Partition<'d>, ) -> Result<(), Self::Error>, - >; + > + where + Self: 'd; } #[doc(hidden)] diff --git a/connectorx/tests/test_arrow.rs b/connectorx/tests/test_arrow.rs index 7f897acffd..31c93fafaa 100644 --- a/connectorx/tests/test_arrow.rs +++ b/connectorx/tests/test_arrow.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{BooleanArray, Float64Array, Int32Array, Int64Array, LargeStringArray}, + array::{BooleanArray, Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, }; use connectorx::{ @@ -88,9 +88,9 @@ fn test_arrow() { assert!(r .column(3) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec!["0", "1", "2", "3"]))); + .eq(&StringArray::from(vec!["0", "1", "2", "3"]))); assert!(r .column(4) .as_any() @@ -123,11 +123,9 @@ fn test_arrow() { assert!(r .column(3) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec![ - "0", "1", "2", "3", "4", "5", "6" - ]))); + .eq(&StringArray::from(vec!["0", "1", "2", "3", "4", "5", "6"]))); assert!(r .column(4) .as_any() @@ -219,21 +217,21 @@ fn test_postgres_arrow() { assert!(r .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![1, 0]))); + .eq(&Int64Array::from(vec![1, 0]))); assert!(r .column(1) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![3, 5]))); + .eq(&Int64Array::from(vec![3, 5]))); assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec!["str1", "a"]))); + .eq(&StringArray::from(vec!["str1", "a"]))); assert!(r .column(3) .as_any() @@ -251,21 +249,21 @@ fn test_postgres_arrow() { assert!(r .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![2, 3, 4, 1314]))); + .eq(&Int64Array::from(vec![2, 3, 4, 1314]))); assert!(r .column(1) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![None, Some(7), Some(9), Some(2)]))); + .eq(&Int64Array::from(vec![None, Some(7), Some(9), Some(2)]))); assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec![ + .eq(&StringArray::from(vec![ Some("str2"), Some("b"), Some("c"), diff --git a/connectorx/tests/test_bigquery.rs b/connectorx/tests/test_bigquery.rs index 6958810703..a8eb04e7b5 100644 --- a/connectorx/tests/test_bigquery.rs +++ b/connectorx/tests/test_bigquery.rs @@ -1,7 +1,3 @@ -use arrow::{ - array::{BooleanArray, Float64Array, Int32Array, LargeStringArray}, - record_batch::RecordBatch, -}; use connectorx::{ destinations::arrow::ArrowDestination, prelude::*, sources::bigquery::BigQuerySource, sql::CXQuery, transports::BigQueryArrowTransport, @@ -28,7 +24,7 @@ fn test_source() { fn test_bigquery_partition() { let dburl = env::var("BIGQUERY_URL").unwrap(); let rt = Arc::new(Runtime::new().unwrap()); - let mut source = BigQuerySource::new(rt, &dburl).unwrap(); + let source = BigQuerySource::new(rt, &dburl).unwrap(); let queries = [ CXQuery::naked("SELECT * FROM (SELECT * FROM `dataprep-bigquery.dataprep.lineitem` LIMIT 1000) AS CXTMPTAB_PART WHERE 1281 <= CXTMPTAB_PART.L_ORDERKEY AND CXTMPTAB_PART.L_ORDERKEY < 29128610"), CXQuery::naked("SELECT * FROM (SELECT * FROM `dataprep-bigquery.dataprep.lineitem` LIMIT 1000) AS CXTMPTAB_PART WHERE 29128610 <= CXTMPTAB_PART.L_ORDERKEY AND CXTMPTAB_PART.L_ORDERKEY < 58255940"), @@ -37,5 +33,5 @@ fn test_bigquery_partition() { let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new(source, &mut destination, &queries, None); dispatcher.run().unwrap(); - let result = destination.arrow().unwrap(); + let _result = destination.arrow().unwrap(); } diff --git a/connectorx/tests/test_fed.rs b/connectorx/tests/test_fed.rs new file mode 100644 index 0000000000..cfa200a3a2 --- /dev/null +++ b/connectorx/tests/test_fed.rs @@ -0,0 +1,21 @@ +use connectorx::fed_dispatcher::run; +use std::collections::HashMap; +use std::env; + +#[test] +#[ignore] +fn test_fed() { + let _ = env_logger::builder().is_test(true).try_init(); + + let sql = "select test_bool, AVG(test_float) as avg_float, SUM(test_int) as sum_int from db1.test_table as a, db2.test_str as b where a.test_int = b.id AND test_nullint is not NULL GROUP BY test_bool ORDER BY sum_int"; + let db_map = HashMap::from([ + (String::from("db1"), env::var("DB1").unwrap()), + (String::from("db2"), env::var("DB2").unwrap()), + ]); + + println!("db_map: {:?}", db_map); + + // make sure no error here + let rbs = run(sql.to_string(), db_map, None).unwrap(); + arrow::util::pretty::print_batches(&rbs).unwrap(); +} diff --git a/connectorx/tests/test_mssql.rs b/connectorx/tests/test_mssql.rs index aaee1945be..724682a68d 100644 --- a/connectorx/tests/test_mssql.rs +++ b/connectorx/tests/test_mssql.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{BooleanArray, Float64Array, Int32Array, Int64Array, LargeStringArray}, + array::{BooleanArray, Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, }; use connectorx::{ @@ -89,9 +89,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(rb .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![1, 0]))); + .eq(&Int64Array::from(vec![1, 0]))); assert!(rb .column(1) @@ -103,9 +103,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(rb .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec![Some("str1"), Some("a"),]))); + .eq(&StringArray::from(vec![Some("str1"), Some("a"),]))); assert!(rb .column(3) @@ -125,9 +125,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(rb .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![2, 3, 4, 1314]))); + .eq(&Int64Array::from(vec![2, 3, 4, 1314]))); assert!(rb .column(1) @@ -139,9 +139,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(rb .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec![ + .eq(&StringArray::from(vec![ Some("str2"), Some("b"), Some("c"), diff --git a/connectorx/tests/test_mysql.rs b/connectorx/tests/test_mysql.rs index 4550ae8632..4bc21bc284 100644 --- a/connectorx/tests/test_mysql.rs +++ b/connectorx/tests/test_mysql.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{Float64Array, Int64Array, LargeStringArray}, + array::{Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, }; use connectorx::{ @@ -82,9 +82,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec!["odd", "even"]))); + .eq(&StringArray::from(vec!["odd", "even"]))); assert!(r .column(3) .as_any() @@ -108,9 +108,9 @@ pub fn verify_arrow_results(result: Vec) { assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec!["odd", "even", "odd", "even"]))); + .eq(&StringArray::from(vec!["odd", "even", "odd", "even"]))); assert!(r .column(3) .as_any() diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs index 9899154e70..df4850be02 100644 --- a/connectorx/tests/test_postgres.rs +++ b/connectorx/tests/test_postgres.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{BooleanArray, Float64Array, Int32Array, LargeStringArray}, + array::{BooleanArray, Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, }; use connectorx::{ @@ -230,21 +230,21 @@ pub fn verify_arrow_results(result: Vec) { assert!(r .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![1, 0]))); + .eq(&Int64Array::from(vec![1, 0]))); assert!(r .column(1) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![3, 5]))); + .eq(&Int64Array::from(vec![3, 5]))); assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec!["str1", "a"]))); + .eq(&StringArray::from(vec!["str1", "a"]))); assert!(r .column(3) .as_any() @@ -262,21 +262,21 @@ pub fn verify_arrow_results(result: Vec) { assert!(r .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![2, 3, 4, 1314]))); + .eq(&Int64Array::from(vec![2, 3, 4, 1314]))); assert!(r .column(1) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&Int32Array::from(vec![None, Some(7), Some(9), Some(2)]))); + .eq(&Int64Array::from(vec![None, Some(7), Some(9), Some(2)]))); assert!(r .column(2) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() - .eq(&LargeStringArray::from(vec![ + .eq(&StringArray::from(vec![ Some("str2"), Some("b"), Some("c"), diff --git a/docs/install.md b/docs/install.md index db34aac04f..f87aafdf2f 100644 --- a/docs/install.md +++ b/docs/install.md @@ -31,7 +31,7 @@ rustup override set nightly-{version} * Step 4: Build ```bash just bootstrap-python -just ci-build-python-wheel +just build-python-wheel ``` diff --git a/federated-query b/federated-query new file mode 160000 index 0000000000..e2d842d868 --- /dev/null +++ b/federated-query @@ -0,0 +1 @@ +Subproject commit e2d842d868742a616de6b6e94ea7dc8d3f7b1f75