
Conversation

@razajafri (Collaborator) commented Sep 25, 2025

Fixes #13401
Fixes #13487

Description

Problem
Previously, GPU support for reading Delta Lake deletion vectors (DVs) was only available for the PERFILE Parquet reader. As a result, users of the multithreaded reader would experience CPU fallback and degraded performance when reading Delta tables with deletion vectors. This PR closes that gap, improving performance, consistency, and feature parity.

Key changes

  • Implements logic to read and process Delta deletion vectors within the multithreaded Parquet reader path (see the usage sketch after this list).
  • Ensures the results do not depend on the file ordering between Spark and the GPU, for robust integration.
  • When deletion vectors are present, the COALESCING reader falls back to the MULTITHREADED reader regardless of the config, after logging a warning.
  • Updates and renames tests to validate PERFILE, COALESCING, and MULTITHREADED reader support for DVs. The test modified is test_delta_deletion_vector_read.
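
As a usage illustration, here is a minimal sketch (not code from this PR) of the scenario this change accelerates. The table name dv_demo and the row counts are made up; the reader-type config comes from the benchmark setup below, and delta.enableDeletionVectors is Delta Lake's documented table property.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming the RAPIDS plugin and Delta Lake are on the
// classpath and configured as in the SPARK_CONF block later in this
// description. Names like dv_demo are illustrative only.
object DvReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dv-read-example")
      .config("spark.rapids.sql.format.parquet.reader.type", "MULTITHREADED")
      .getOrCreate()

    spark.sql("CREATE TABLE dv_demo (id BIGINT) USING delta " +
      "TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
    spark.sql("INSERT INTO dv_demo SELECT id FROM range(1000000)")
    spark.sql("DELETE FROM dv_demo WHERE id % 10 = 0") // writes deletion vectors

    // Before this PR, this scan fell back to the CPU with the multithreaded
    // reader; with this PR it stays on the GPU.
    spark.sql("SELECT count(*) FROM dv_demo").show()
    spark.stop()
  }
}
```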

Performance

| Percentage Deleted | Baseline (commit 5d88f6a) Average | DV MULTITHREADED Reader Average | Speedup |
| --- | --- | --- | --- |
| 5% | 38504.6 | 29869.6 | 1.29 |
| 10% | 37225.6 | 29824.2 | 1.25 |
| 25% | 37978.4 | 29465.6 | 1.29 |
| 50% | 36255.6 | 28331 | 1.28 |
| 75% | 35527.8 | 28369.8 | 1.25 |
| 50% (500 small files) | 35464 | 28947.4 | 1.23 |

Speedup is the baseline average divided by the DV MULTITHREADED reader average (e.g. 38504.6 / 29869.6 ≈ 1.29).

Baseline: commit id - 5d88f6a
Target: This PR
Dataset: TPC-DS (sf100_parquet)
Environment: Local
Spark Configs:

```bash
export SPARK_CONF=("--master" "local[16]"
                   "--conf" "spark.driver.maxResultSize=2GB"
                   "--conf" "spark.driver.memory=50G"
                   "--conf" "spark.executor.cores=16"
                   "--conf" "spark.executor.instances=1"
                   "--conf" "spark.executor.memory=16G"
                   "--conf" "spark.driver.maxResultSize=4gb"
                   "--conf" "spark.sql.files.maxPartitionBytes=2gb"
                   "--conf" "spark.sql.adaptive.enabled=true"
                   "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin"
                   "--conf" "spark.rapids.memory.host.spillStorageSize=16G"
                   "--conf" "spark.rapids.memory.pinnedPool.size=8g"
                   "--conf" "spark.rapids.sql.concurrentGpuTasks=3"
                   "--conf" "spark.rapids.sql.explain=all"
                   "--conf" "spark.eventLog.enabled=true"
                   "--conf" "spark.eventLog.dir=/tmp/spark-events"
                   "--conf" "spark.sql.warehouse.dir=/home/rjafri/spark-warehouse"
                   "--conf" "spark.sql.legacy.createHiveTableByDefault=false"
                   "--conf" "spark.databricks.delta.deletionVectors.useMetadataRowIndex=false"
                   "--conf" "spark.rapids.sql.format.parquet.reader.type=MULTITHREADED"
                   "--packages" "io.delta:delta-spark_2.12:3.3.0"
                   "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
                   "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
                   "--conf" "spark.driver.extraClassPath=$SPARK_RAPIDS_PLUGIN_JAR:$NDS_LISTENER_JAR"
                   "--conf" "spark.executor.extraClassPath=$SPARK_RAPIDS_PLUGIN_JAR:$NDS_LISTENER_JAR")

Query:

```sql
-- start query 1
select *
from store_sales
ORDER BY ss_ticket_number DESC
LIMIT 1000000;
-- end query 1
```

Checklists

  - [ ] This PR has added documentation for new or modified features or behaviors.
  - [x] This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  - [x] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

@razajafri changed the title Add Deletion Vector Read support to multithreaded Parquet reader → Add Deletion Vector Read Support to Multithreaded Parquet Reader Sep 25, 2025
@razajafri marked this pull request as ready for review September 25, 2025 03:07
```scala
    tablePath: Option[String]
) extends GpuParquetMultiFilePartitionReaderFactory(fileScan.conf, broadcastedConf,
  fileScan.relation.dataSchema, fileScan.requiredSchema, fileScan.readPartitionSchema,
  pushedFilters, fileScan.rapidsConf, fileScan.allMetrics, fileScan.queryUsesInputFile) {
```

We should enforce fileScan.queryUsesInputFile to true to avoid concatenating files.


This warrants a warning log as well. Also, please file an issue to support the concat.

@razajafri

build

@razajafri

All Delta Lake integration tests are passing

===================== 355 passed, 139 skipped, 64 warnings in 578.85s (0:09:38) =======================

@jihoonson (Collaborator) left a comment

Thanks @razajafri, looks good overall. I left some comments. Also, would you elaborate on what 50% (500 small files) means? Were there 500 files left after the delete? Was it only the case of 50% delete? What about other cases?

```scala
      indexVectorTuples += (isRowDeletedColumnOpt.get.index -> isRowDeletedVector)
      replaceVectors(batch, indexVectorTuples.toSeq: _*)
    } catch {
      case e: Exception => indexVectorTuples.foreach(item => item._2.close())
```

close() can throw an exception, and we'd want it to be reported as well.

Suggested change:

```diff
-case e: Exception => indexVectorTuples.foreach(item => item._2.close())
+case e: Exception =>
+  try {
+    indexVectorTuples.foreach(item => item._2.close())
+  } catch {
+    case t: Throwable =>
+      e.addSuppressed(t)
+  }
```

```python
    assert_gpu_and_cpu_are_equal_collect(read_parquet_sql(data_path))

fallback_readers_pre_353=["PERFILE", "MULTITHREADED", "COALESCING"]
fallback_readers_353_plus=["COALESCING"]
```
Why do we fall back for the COALESCING reader here? Based on this change, shouldn't it still run on the GPU, just with the multithreaded reader instead?

@razajafri replied:

Good catch, I should remove that.


According to your previous comment, this test seems to have passed with COALESCING fallback in your previous testing. Why did it pass? Can you make sure that this test passes after your last update?

@razajafri

> Thanks @razajafri, looks good overall. I left some comments. Also, would you elaborate on what 50% (500 small files) means? Were there 500 files left after the delete? Was it only the case of 50% delete? What about other cases?

This case was first created to test the onPrem/COALESCING reader, as I wanted to make sure we were concatenating files before decoding them. This dataset has 500 files (~39M each) and around 30 deletion vector files to handle a 50% deletion of rows. I chose it because it was a good middle ground for a quick test.

But since we aren't supporting coalescing in this PR, it still provides good insight into the performance with small(er) files.

```scala
    fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {

  if (fileScan.rapidsConf.isParquetCoalesceFileReadEnabled) {
    logWarning("Coalescing is not supported when Deletion Vectors are enabled, " +
```
Better to point at the actual table properties, or at something the user can check in the docs.


```scala
  if (fileScan.rapidsConf.isParquetCoalesceFileReadEnabled) {
    logWarning("Coalescing is not supported when Deletion Vectors are enabled, " +
      "using the multi-threaded reader")
```
Tie it to our config and docs so the user can check it.
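
A minimal sketch of what tying the warning to the config could look like, assuming slf4j for logging; the config key string is the one used in this PR's benchmark setup, while the object name, helper, and exact wording are illustrative:

```scala
import org.slf4j.LoggerFactory

object CoalescingFallbackWarning {
  private val log = LoggerFactory.getLogger(getClass)

  // Warn with the concrete config key so the user knows which knob applies.
  def warn(): Unit =
    log.warn("Coalescing is not supported when Deletion Vectors are enabled; " +
      "falling back to the MULTITHREADED reader " +
      "(see spark.rapids.sql.format.parquet.reader.type)")
}
```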

Comment on lines 322 to 325
```scala
if (results.length > 1) {
  throw new IllegalArgumentException(
    s"There are more than one column with name=`$name` requested in the reader output")
}
```
There is syntactic sugar for this:

Suggested change:

```diff
-if (results.length > 1) {
-  throw new IllegalArgumentException(
-    s"There are more than one column with name=`$name` requested in the reader output")
-}
+require(results.length <= 1,
+  s"There are more than one column with name=`$name` requested in the reader output")
```

The user would understand this message better if it were phrased

`N columns found for name ..., expected number of columns 1`

or something like that.

@razajafri replied:

The message is copied from OSS Delta. I think we should keep it the same to be consistent.

Comment on lines 588 to 590
```scala
if (rowIndexColumnOpt.isDefined) {
  indexVectorTuples += (rowIndexColumnOpt.get.index -> rowIndexGpuCol.incRefCount())
}
```

Scala way:

```scala
rowIndexColumnOpt.foreach { rowIndexColumn =>
  indexVectorTuples += (rowIndexColumn.index -> rowIndexGpuCol.incRefCount())
}
```

```scala
      indexVectorTuples += (isRowDeletedColumnOpt.get.index -> isRowDeletedVector)
      replaceVectors(batch, indexVectorTuples.toSeq: _*)
    } catch {
      case e: Exception => indexVectorTuples.foreach(item => item._2.close())
```

If one of the close() calls throws, the rest will leak. We need to use one of the safeClose / closeOnExcept kind of helpers.
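
For context, a minimal sketch of the close-everything-and-keep-the-first-failure pattern being suggested; safeClose and closeOnExcept are the plugin's own helpers, and this stand-in does not reflect their actual API:

```scala
// Close every resource even if some close() calls throw; rethrow the first
// failure with the rest attached as suppressed exceptions.
object SafeCloseSketch {
  def safeCloseAll(closeables: Seq[AutoCloseable]): Unit = {
    var first: Throwable = null
    closeables.foreach { c =>
      try c.close()
      catch {
        case t: Throwable =>
          if (first == null) first = t else first.addSuppressed(t)
      }
    }
    if (first != null) throw first
  }
}
```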


Oh yes, this is a better way. I thought indexVectorTuples was an Option. Since it's a list, Gera's suggestion is better.

@razajafri

Thank you for reviewing @gerashegalov @jihoonson. I have addressed your concerns, PTAL again.

```python
    assert_gpu_and_cpu_are_equal_collect(read_parquet_sql(data_path))

fallback_readers_pre_353=["PERFILE", "MULTITHREADED", "COALESCING"]
fallback_readers_353_plus=["COALESCING"]
```
According to your previous comment, this test seems to have passed with COALESCING fallback in your previous testing. Why did it pass? Can you make sure that this test passes after your last update?

@razajafri

The test was passing earlier because FileSourceScanExec still falls back due to useMetadataRowIndex having a default value of true. Now it will be skipped for Spark 3.5.3+.

@jihoonson (Collaborator) left a comment

LGTM

@razajafri

build

@razajafri merged commit 01a5380 into NVIDIA:branch-25.10 Sep 26, 2025
60 checks passed
@razajafri deleted the SP-13402-multi-dv branch September 26, 2025 15:54
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Sep 30, 2025
…DIA#13491)

@sameerz added the feature request label Sep 30, 2025


Development

Successfully merging this pull request may close these issues:

- Use ColumnVector::sequence method to generate the RowIndices
- [FEA] Add basic Deletion Vector support for Multifile readers in Delta in Scala
