Skip to content

Commit 9161abc

Browse files
Implement update command for deltalake 3.3.x (#13014)
Close #12988 . --------- Signed-off-by: liurenjie1024 <[email protected]>
1 parent 4f02a77 commit 9161abc

File tree

4 files changed

+431
-2
lines changed

4 files changed

+431
-2
lines changed

delta-lake/delta-33x/src/main/scala/com/nvidia/spark/rapids/delta/delta33x/Delta33xProvider.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.delta.DeltaIOProvider
2222
import org.apache.spark.sql.delta.DeltaParquetFileFormat
2323
import org.apache.spark.sql.delta.DeltaParquetFileFormat.{IS_ROW_DELETED_COLUMN_NAME, ROW_INDEX_COLUMN_NAME}
2424
import org.apache.spark.sql.delta.catalog.DeltaCatalog
25-
import org.apache.spark.sql.delta.commands.{DeleteCommand, MergeIntoCommand}
25+
import org.apache.spark.sql.delta.commands.{DeleteCommand, MergeIntoCommand, UpdateCommand}
2626
import org.apache.spark.sql.delta.rapids.DeltaRuntimeShim
2727
import org.apache.spark.sql.execution.FileSourceScanExec
2828
import org.apache.spark.sql.execution.command.RunnableCommand
@@ -39,6 +39,10 @@ object Delta33xProvider extends DeltaIOProvider {
3939
"Delete rows from a Delta Lake table",
4040
(a, conf, p, r) => new DeleteCommandMeta(a, conf, p, r))
4141
.disabledByDefault("Delta Lake delete support is experimental"),
42+
GpuOverrides.runnableCmd[UpdateCommand](
43+
"Update rows from a Delta Lake table",
44+
(a, conf, p, r) => new UpdateCommandMeta(a, conf, p, r))
45+
.disabledByDefault("Delta Lake update support is experimental"),
4246
GpuOverrides.runnableCmd[MergeIntoCommand](
4347
"Merge of a source query/table into a Delta Lake table",
4448
(a, conf, p, r) => new MergeIntoCommandMeta(a, conf, p, r))
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.delta.delta33x
18+
19+
import com.nvidia.spark.rapids.{DataFromReplacementRule, RapidsConf, RapidsMeta, RunnableCommandMeta}
20+
import com.nvidia.spark.rapids.delta.RapidsDeltaUtils
21+
22+
import org.apache.spark.sql.delta.commands.{DeletionVectorUtils, UpdateCommand}
23+
import org.apache.spark.sql.delta.rapids.GpuDeltaLog
24+
import org.apache.spark.sql.delta.rapids.delta33x.GpuUpdateCommand
25+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
26+
import org.apache.spark.sql.execution.command.RunnableCommand
27+
28+
class UpdateCommandMeta(
29+
updateCmd: UpdateCommand,
30+
conf: RapidsConf,
31+
parent: Option[RapidsMeta[_, _, _]],
32+
rule: DataFromReplacementRule)
33+
extends RunnableCommandMeta[UpdateCommand](updateCmd, conf, parent, rule) {
34+
35+
override def tagSelfForGpu(): Unit = {
36+
if (!conf.isDeltaWriteEnabled) {
37+
willNotWorkOnGpu("Delta Lake output acceleration has been disabled. To enable set " +
38+
s"${RapidsConf.ENABLE_DELTA_WRITE} to true")
39+
}
40+
41+
val dvFeatureEnabled = DeletionVectorUtils.deletionVectorsWritable(
42+
updateCmd.tahoeFileIndex.deltaLog.unsafeVolatileSnapshot)
43+
44+
if (dvFeatureEnabled && updateCmd.conf.getConf(
45+
DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS)) {
46+
// https://github.com/NVIDIA/spark-rapids/issues/8554
47+
willNotWorkOnGpu("Deletion vectors are not supported on GPU")
48+
}
49+
50+
RapidsDeltaUtils.tagForDeltaWrite(this, updateCmd.target.schema,
51+
Some(updateCmd.tahoeFileIndex.deltaLog),
52+
Map.empty, updateCmd.tahoeFileIndex.spark)
53+
}
54+
55+
override def convertToGpu(): RunnableCommand = {
56+
GpuUpdateCommand(
57+
new GpuDeltaLog(updateCmd.tahoeFileIndex.deltaLog, conf),
58+
updateCmd.tahoeFileIndex,
59+
updateCmd.catalogTable,
60+
updateCmd.target,
61+
updateCmd.updateExpressions,
62+
updateCmd.condition
63+
)
64+
}
65+
}

0 commit comments

Comments
 (0)