[KYUUBI #6943][1/2] HiveScan support dpp #7436

Open
maomaodev wants to merge 2 commits into apache:master from maomaodev:kyuubi-6943

Conversation

@maomaodev (Contributor) commented May 8, 2026

Why are the changes needed?

Part 1 of 2 to add KSHC support for dynamic partition pruning (DPP). See #6943.

  • Add DPP support in HiveScan for non-Parquet/ORC tables.
  • Add DPP support in ParquetScan / ORCScan for Parquet/ORC tables.

The new KSHC configuration spark.sql.kyuubi.hive.connector.read.runtimeFilter.enabled controls whether partition columns are exposed as runtime-filter attributes, which Spark requires to trigger DPP. It defaults to true; to disable DPP on KSHC tables, set it to false.
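For example, to opt out in a running session (a minimal sketch; the flag is the one introduced by this PR, everything else is standard Spark API):

```scala
// Runtime filtering (and thus DPP) on KSHC tables is on by default;
// flip the KSHC flag to opt out for the current session.
spark.conf.set("spark.sql.kyuubi.hive.connector.read.runtimeFilter.enabled", "false")
```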

How was this patch tested?

  1. Unit tests
  2. Manual test: TPC-DS benchmark (11 GB text dataset).
  • Spark configuration used for the benchmark (Spark 3.5.7, Kyuubi 1.12.0-SNAPSHOT):
spark.driver.cores    1
spark.driver.memory    4g
spark.executor.cores    1
spark.executor.instances    10
spark.executor.memory    4g
spark.master    yarn
spark.shuffle.service.enabled    true
spark.yarn.appMasterEnv.JAVA_HOME    /usr/local/jdk-17
spark.executorEnv.JAVA_HOME    /usr/local/jdk-17
  • Overall performance (sum of all 99 queries)

| Dimension         | Vanilla Spark | KSHC Before | KSHC Now  |
|-------------------|---------------|-------------|-----------|
| Total time        | 5427.47 s     | 2775.26 s   | 2574.12 s |
| vs. Vanilla Spark |               | −48.87%     | −52.57%   |
| vs. KSHC Before   |               |             | −7.25%    |

KSHC Now provides a 7.25% (~200 s) speedup over KSHC Before, with no correctness regression.

  • DPP-hit subset (70 of 99 queries)

DPP triggering was detected by matching the runtime partition filter in the driver logs. The 70 queries:

3,4,5,6,7,8,10,11,12,13,14,15,17,18,19,20,23,25,26,27,29,30,31,32,33,
35,36,38,40,42,45,46,47,48,49,50,51,52,53,54,55,56,57,58,60,63,64,65,
66,67,69,70,71,72,74,75,77,78,79,80,81,83,85,86,87,89,91,92,97,98
| Dimension         | Vanilla Spark | KSHC Before | KSHC Now  |
|-------------------|---------------|-------------|-----------|
| Subset total time | 3061.73 s     | 2131.38 s   | 1914.81 s |
| vs. Vanilla Spark |               | −30.39%     | −37.46%   |
| vs. KSHC Before   |               |             | −10.16%   |

On the DPP-hit subset, KSHC Now provides a 10.2% speedup over KSHC Before, noticeably larger than the overall 7.25%, indicating the performance benefit mainly comes from queries where DPP is triggered.

Was this patch authored or co-authored using generative AI tooling?

Partially assisted by Claude Code (Claude Opus 4.7) for unit tests, code style fixes, and analysis of the TPC-DS benchmark results. The core design and implementation are human-authored.

@pan3793 (Member) commented May 8, 2026

How can KSHC be faster than Vanilla Spark?

@maomaodev (Contributor, Author) commented May 8, 2026

How can KSHC be faster than Vanilla Spark?

Thanks for the review! After investigation, KSHC being faster than vanilla Spark on TEXT-format Hive tables mainly comes down to two factors. They partially overlap, and together they explain essentially all of the gap:
1. Different file-splitting strategies

  • Vanilla Spark reads via HadoopRDD + mapred.FileInputFormat.getSplits(JobConf, numSplits):
splitSize = max(minSize, min(goalSize, blockSize)),  goalSize = totalSize / numSplits

With the defaults (minSize=1B, blockSize=128MB, numSplits=2), splitSize is only ~2MB, so each small file (1–4MB) is split into ~2 tasks, and scheduling overhead dominates (both formulas are worked through in the sketch after this list).

  • KSHC uses DataSource V2 FileScan + FilePartition:
maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))

With the defaults (maxPartitionBytes=128MB, openCostInBytes=4MB), each small file becomes at most one task.

  • Validation (full 99 TPC-DS queries): setting mapreduce.input.fileinputformat.split.minsize=128M on vanilla Spark to align splitting with KSHC drops the total from 5427.47s → 3560.15s (saving ~1867s, about 65% of the 2853s gap).
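To make the two formulas concrete, here is a self-contained sketch that plugs in the default values quoted above; the 4 MB file size and the parallelism of 10 are illustrative, not measured numbers:

```scala
// Worked example of the two split-size formulas above, using the quoted defaults.
object SplitSizeExample extends App {
  val MB = 1024L * 1024

  // Vanilla Spark: mapred.FileInputFormat.getSplits(JobConf, numSplits)
  // splitSize = max(minSize, min(goalSize, blockSize)), goalSize = totalSize / numSplits
  val totalSize = 4 * MB  // one small 4 MB text file (illustrative)
  val numSplits = 2       // Spark's defaultMinPartitions
  val minSize   = 1L      // mapreduce.input.fileinputformat.split.minsize default
  val blockSize = 128 * MB
  val goalSize  = totalSize / numSplits
  val splitSize = math.max(minSize, math.min(goalSize, blockSize))
  println(s"vanilla splitSize = ${splitSize / MB} MB") // 2 MB -> ~2 tasks for a 4 MB file

  // KSHC (DSv2 FileScan + FilePartition):
  // maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))
  val maxPartitionBytes = 128 * MB        // spark.sql.files.maxPartitionBytes default
  val openCostInBytes   = 4 * MB          // spark.sql.files.openCostInBytes default
  val bytesPerCore      = totalSize / 10  // total bytes / parallelism (illustrative)
  val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  println(s"KSHC maxSplitBytes = ${maxSplitBytes / MB} MB") // 4 MB -> one task per small file
}
```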

2. KSHC reuses FileStatus via FileStatusCache

  • Vanilla Spark goes through HiveMetastoreCatalog / HadoopFsRelation on every scan and re-runs listStatus per partition each time. KSHC's HiveCatalogFileIndex reuses FileStatus across scans within a session (see the sketch below).
  • Validation (full 99 TPC-DS queries): replacing KSHC's fileStatusCache with NoopCache increases the total from 2574.12s → 4334.32s (adding ~1760s, about 62% of the 2853s gap).
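For reference, a minimal sketch of the caching pattern using Spark's internal FileStatusCache (org.apache.spark.sql.execution.datasources); this is not KSHC's actual code, CachedPartitionLister and listPartition are hypothetical names, and the real HiveCatalogFileIndex does considerably more:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileStatusCache

// Sketch: reuse FileStatus listings across scans within a session instead of
// re-running listStatus against the filesystem on every query.
class CachedPartitionLister(spark: SparkSession) {
  private val cache = FileStatusCache.getOrCreate(spark)

  def listPartition(partitionPath: Path): Array[FileStatus] =
    cache.getLeafFiles(partitionPath).getOrElse {
      // Cache miss: hit the filesystem once, then remember the result.
      val listed = partitionPath
        .getFileSystem(spark.sessionState.newHadoopConf())
        .listStatus(partitionPath)
      cache.putLeafFiles(partitionPath, listed)
      listed
    }
}
```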

Note on ORC/Parquet
We also tested 10 GB TPC-DS in ORC/Parquet format; there vanilla Spark is faster than KSHC, so this anomaly does not appear.

On the Spark 3.3 CI failure
SupportsRuntimeV2Filtering was introduced in Spark 3.4, so cross-version compilation against 3.3 fails. ORC/Parquet DPP additionally relies on Scan.columnarSupportMode (Spark 3.5+); without it, the DPP benefit is fully cancelled by a pre-DPP full-table listing (see apache/spark#42099). Any suggestions on the preferred direction? Or should we just support Spark 3.5+? (A rough sketch of the 3.4+ API involved is below.)
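For readers unfamiliar with the API, this is roughly the Spark 3.4+ surface involved; ExampleHiveScan and its fields are illustrative names, not KSHC's actual implementation:

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}

// Illustrative only: a scan exposes its partition columns as runtime-filter
// attributes, and Spark pushes the DPP predicates back via filter() at runtime.
abstract class ExampleHiveScan(partitionColumns: Seq[String])
  extends Scan with SupportsRuntimeV2Filtering {

  // Tell Spark which attributes runtime filters (e.g. DPP) may target.
  override def filterAttributes(): Array[NamedReference] =
    partitionColumns.map(c => Expressions.column(c)).toArray

  // Called at runtime with the DPP predicates; prune partitions here
  // before planning input splits.
  override def filter(predicates: Array[Predicate]): Unit = {
    // e.g. re-evaluate the pruned partition list against the predicates
  }
}
```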
