Merged
Changes from 5 commits
@@ -23,10 +23,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.delta.{DeltaLog, DeltaParquetFileFormat}
import org.apache.spark.sql.delta.DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME
import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
import org.apache.spark.sql.delta.commands.{DeleteCommand, MergeIntoCommand, OptimizeTableCommand, UpdateCommand}
import org.apache.spark.sql.delta.rapids.DeltaRuntimeShim
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, SaveIntoDataSourceCommand}
@@ -89,6 +90,15 @@ object Delta33xProvider extends DeltaIOProvider {
override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val format = meta.wrapped.relation.fileFormat
if (format.getClass == classOf[DeltaParquetFileFormat]) {
val session = meta.wrapped.session
val useMetadataRowIndex =
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
val requiredSchema = meta.wrapped.requiredSchema
val isRowDeletedCol = requiredSchema.exists(_.name == IS_ROW_DELETED_COLUMN_NAME)
if (useMetadataRowIndex && isRowDeletedCol) {
meta.willNotWorkOnGpu("we don't support generating metadata row index for " +
s"${meta.wrapped.getClass.getSimpleName}")
}
GpuReadParquetFileFormat.tagSupport(meta)
} else {
meta.willNotWorkOnGpu(s"format ${format.getClass} is not supported")
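The fallback added above boils down to a single predicate on the session conf and the scan's required schema. A minimal sketch, reusing only identifiers imported in this file; `needsCpuFallback` is a hypothetical name used for illustration, not code from the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.StructType

// True when Delta derives deletion-vector filtering from the Parquet metadata row
// index and the scan also asks for the internal is-row-deleted column; in that case
// the GPU scan cannot produce the column and the plan is tagged willNotWorkOnGpu.
def needsCpuFallback(session: SparkSession, requiredSchema: StructType): Boolean = {
  val useMetadataRowIndex =
    session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
  val needsIsRowDeleted = requiredSchema.exists(_.name == IS_ROW_DELETED_COLUMN_NAME)
  useMetadataRowIndex && needsIsRowDeleted
}
```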
@@ -20,13 +20,15 @@ import ai.rapids.cudf._
import ai.rapids.cudf.HostColumnVector._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat
import com.nvidia.spark.rapids.parquet._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -52,7 +54,7 @@ case class GpuDelta33xParquetFileFormat(
optimizationsEnabled: Boolean = true,
tablePath: Option[String] = None,
isCDCRead: Boolean = false
) extends GpuDeltaParquetFileFormat {
) extends GpuDeltaParquetFileFormat with Logging {

// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
@@ -216,6 +218,14 @@ case class GpuDelta33xParquetFileFormat(
broadcastedConf: Broadcast[SerializableConfiguration],
pushedFilters: Array[Filter],
fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {

if (fileScan.rapidsConf.isParquetCoalesceFileReadEnabled) {
logWarning("Coalescing is not supported when `delta.enableDeletionVectors=true`, " +
"using the multi-threaded reader. For more details on the Parquet reader types " +
"please look at 'spark.rapids.sql.format.parquet.reader.type' config at " +
"https://nvidia.github.io/spark-rapids/docs/additional-functionality/advanced_configs.html")
}

new DeltaMultiFileReaderFactory(
fileScan.conf,
broadcastedConf,
@@ -312,10 +322,8 @@ class DeltaMultiFileReaderFactory(
private val schemaWithIndices = readDataSchema.fields.zipWithIndex
def findColumn(name: String): Option[ColumnMetadata] = {
val results = schemaWithIndices.filter(_._1.name == name)
if (results.length > 1) {
throw new IllegalArgumentException(
s"There are more than one column with name=`$name` requested in the reader output")
}
require(results.length <= 1,
s"There are more than one column with name=`$name` requested in the reader output")
results.headOption.map(e => ColumnMetadata(e._2, e._1))
}

@@ -578,16 +586,17 @@ object RapidsDeletionVectorUtils {
metrics("rowIndexColumnGenTime") += System.nanoTime() - startTime
val indexVectorTuples = new ArrayBuffer[(Int, org.apache.spark.sql.vectorized.ColumnVector)]
try {
if (rowIndexColumnOpt.isDefined) {
indexVectorTuples += (rowIndexColumnOpt.get.index -> rowIndexGpuCol.incRefCount())
rowIndexColumnOpt.foreach { rowIndexCol =>
indexVectorTuples += (rowIndexCol.index -> rowIndexGpuCol.incRefCount())
}
startTime = System.nanoTime()
val isRowDeletedVector = rowIndexFilterOpt.get.materializeIntoVector(rowIndexGpuCol)
metrics("isRowDeletedColumnGenTime") += System.nanoTime() - startTime
indexVectorTuples += (isRowDeletedColumnOpt.get.index -> isRowDeletedVector)
replaceVectors(batch, indexVectorTuples.toSeq: _*)
} catch {
case e: Exception => indexVectorTuples.foreach(item => item._2.close())
case e: Throwable =>
indexVectorTuples.map(_._2).safeClose(e)
throw e
}
}
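Because the coalescing reader is skipped for deletion-vector tables (the warning above is logged and the multi-threaded reader is used instead), a session that mostly reads DV-enabled Delta tables can pin the reader type up front. Illustrative only; the config key comes from the warning text and MULTITHREADED is one of the reader types exercised by the tests below:

```scala
import org.apache.spark.sql.SparkSession

// Select the multi-threaded RAPIDS Parquet reader explicitly so deletion-vector reads
// do not rely on the silent COALESCING -> multi-threaded downgrade warned about above.
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.rapids.sql.format.parquet.reader.type", "MULTITHREADED")
```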
8 changes: 3 additions & 5 deletions integration_tests/src/main/python/delta_lake_delete_test.py
@@ -138,14 +138,12 @@ def read_parquet_sql(data_path):

assert_gpu_and_cpu_are_equal_collect(read_parquet_sql(data_path))

fallback_readers_pre_353=["PERFILE", "MULTITHREADED", "COALESCING"]
fallback_readers_353_plus=["COALESCING"]
@allow_non_gpu("SortExec, ColumnarToRowExec", *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(not supports_delta_lake_deletion_vectors(),
@pytest.mark.skipif(not supports_delta_lake_deletion_vectors() or is_spark_353_or_later(),
reason="Deletion vectors new in Delta Lake 2.4 / Apache Spark 3.4")
@pytest.mark.parametrize("reader_type", fallback_readers_pre_353 if is_before_spark_353() else fallback_readers_353_plus, ids=idfn)
@pytest.mark.parametrize("reader_type", ["PERFILE", "MULTITHREADED", "COALESCING"], ids=idfn)
def test_delta_deletion_vector_read_fallback(spark_tmp_path, reader_type):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
@@ -175,7 +173,7 @@ def read_parquet_sql(data_path):
@ignore_order
@pytest.mark.skipif(not supports_delta_lake_deletion_vectors() or is_before_spark_353(), \
reason="Deletion vectors new in Delta Lake 2.4 / Apache Spark 3.4")
@pytest.mark.parametrize("reader_type", ["PERFILE", "MULTITHREADED"])
@pytest.mark.parametrize("reader_type", ["PERFILE", "COALESCING", "MULTITHREADED"])
# a='' shouldn't match anything as a is an int
@pytest.mark.parametrize("condition", ["where a = 0", "", "where a = ''"])
def test_delta_deletion_vector_read(spark_tmp_path, reader_type, condition):
@@ -395,8 +395,6 @@ case class GpuFileSourceScanExec(
"numFiles" -> createMetric(ESSENTIAL_LEVEL, "number of files read"),
"metadataTime" -> createTimingMetric(ESSENTIAL_LEVEL, "metadata time"),
"filesSize" -> createSizeMetric(ESSENTIAL_LEVEL, "size of files read"),
"isRowDeletedColumnGenTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time skiprow gen"),
"rowIndexColumnGenTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time row index gen"),
GPU_DECODE_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_DECODE_TIME),
BUFFER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUFFER_TIME),
FILTER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_FILTER_TIME),
@@ -420,6 +418,14 @@ case class GpuFileSourceScanExec(
// Track the actual data size read from the file system, excluding data being pruned
// by meta-level pruning.
bf += "readBufferSize" -> createSizeMetric(DEBUG_LEVEL, "size of read buffer")
// This metric is used to post the time spent in generating the `skip_row` column
// in Delta Lake 3.3.0+
bf += "isRowDeletedColumnGenTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "time skiprow gen")
// This metric is used to post the time spent in generating the `row_index` column
// in Delta Lake 3.3.0+
bf += "rowIndexColumnGenTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "time row index gen")
}
bf.result()
case _ =>
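The two metrics re-registered above are fed on the read path by wrapping each deletion-vector column generation in a `System.nanoTime()` delta (see the `RapidsDeletionVectorUtils` hunk earlier in this diff). A self-contained sketch of that timing pattern; `generateRowIndex` and `materializeIsRowDeleted` are hypothetical stand-ins for the real column generation:

```scala
import scala.collection.mutable

// Accumulators standing in for the GpuMetric entries registered above.
val metrics = mutable.Map(
  "rowIndexColumnGenTime" -> 0L,
  "isRowDeletedColumnGenTime" -> 0L)

// Hypothetical column generators, used only to show the timing pattern.
def generateRowIndex(numRows: Int): Array[Long] = Array.tabulate(numRows)(_.toLong)
def materializeIsRowDeleted(rowIndex: Array[Long]): Array[Byte] = rowIndex.map(_ => 0.toByte)

var startTime = System.nanoTime()
val rowIndex = generateRowIndex(4)
metrics("rowIndexColumnGenTime") += System.nanoTime() - startTime

startTime = System.nanoTime()
val isRowDeleted = materializeIsRowDeleted(rowIndex)
metrics("isRowDeletedColumnGenTime") += System.nanoTime() - startTime
```

In the Spark UI these accumulate under the labels "time row index gen" and "time skiprow gen" on the GPU file scan node.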