Commit a16a2fd

[Spark] Add dataframe reader options to unblock non-additive schema changes (#4126)
## Description

Non-additive schema changes - DROP/RENAME and, since https://github.com/databricks-eng/runtime/pull/124363, type changes - in streaming block the stream until the user sets a SQL conf to unblock them:

```
spark.databricks.delta.streaming.allowSourceColumnRename
spark.databricks.delta.streaming.allowSourceColumnDrop
spark.databricks.delta.streaming.allowSourceColumnTypeChange
```

This change adds dataframe reader options as an alternative to SQL confs to unblock non-additive schema changes:

```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```

## How was this patch tested?

Extended existing tests in `DeltaSourceMetadataEvolutionSupportSuite` to also cover dataframe reader options.

## This PR introduces the following *user-facing* changes

The error thrown on non-additive schema changes during streaming is updated to suggest dataframe reader options in addition to SQL confs to unblock the stream:

```
[DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION] We've detected one or more non-additive schema change(s) (DROP) between Delta version 1 and 2 in the Delta streaming source.
Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version 2.
Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.

<NEW> Using dataframe reader option(s):
  .option("allowSourceColumnDrop", "true")

<NEW> Using SQL configuration(s):
To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = 2;
To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = "always";
To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop = "always";
```

The user can use the available reader options to unblock a given type of non-additive schema change:

```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```
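
For illustration, a minimal end-to-end sketch of applying one of these options (not from the PR; table names, checkpoint path, and the version value are placeholders; per the docs in this change, the option value may be a specific source table version or `"always"`):

```scala
// Hypothetical example: the source table dropped a column at Delta version 2
// and the sink has already been fixed up. Names and paths are placeholders.
val query = spark.readStream
  .option("allowSourceColumnDrop", "2") // or "always" to accept all future drops
  .table("source_table")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/_ckpt/sink_table")
  .toTable("sink_table")
```
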
1 parent 50a99fd commit a16a2fd

File tree

10 files changed: +225 -32 lines changed

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 14 additions & 0 deletions
```diff
@@ -2319,6 +2319,13 @@
       "Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
       "Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.",
       "",
+      "Using dataframe reader option(s):",
+      "To unblock for this particular stream just for this series of schema change(s):",
+      "<unblockChangeOptions>",
+      "To unblock for this particular stream:",
+      "<unblockStreamOptions>",
+      "",
+      "Using SQL configuration(s):",
       "To unblock for this particular stream just for this series of schema change(s):",
       "<unblockChangeConfs>",
       "To unblock for this particular stream:",
@@ -2338,6 +2345,13 @@
       "Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
       "Once you have updated your streaming query or have decided there is no need to update it, you can set the following configuration to unblock the type change(s) and continue stream processing.",
       "",
+      "Using dataframe reader option:",
+      "To unblock for this particular stream just for this series of schema change(s):",
+      "<unblockChangeOptions>",
+      "To unblock for this particular stream:",
+      "<unblockStreamOptions>",
+      "",
+      "Using SQL configuration:",
       "To unblock for this particular stream just for this series of schema change(s):",
       "<unblockChangeConfs>",
       "To unblock for this particular stream:",
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -3136,7 +3136,14 @@ trait DeltaErrorsBase
       previousSchemaChangeVersion: Long,
       currentSchemaChangeVersion: Long,
       checkpointHash: Int,
+      readerOptionsUnblock: Seq[String],
       sqlConfsUnblock: Seq[String]): Throwable = {
+    val unblockChangeOptions = readerOptionsUnblock.map { option =>
+      s"""  .option("$option", "$currentSchemaChangeVersion")"""
+    }.mkString("\n")
+    val unblockStreamOptions = readerOptionsUnblock.map { option =>
+      s"""  .option("$option", "always")"""
+    }.mkString("\n")
     val unblockChangeConfs = sqlConfsUnblock.map { conf =>
       s"""  SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
     }.mkString("\n")
@@ -3154,6 +3161,8 @@
       previousSchemaChangeVersion.toString,
       currentSchemaChangeVersion.toString,
       currentSchemaChangeVersion.toString,
+      unblockChangeOptions,
+      unblockStreamOptions,
       unblockChangeConfs,
       unblockStreamConfs,
       unblockAllConfs
@@ -3165,6 +3174,7 @@
       previousSchemaChangeVersion: Long,
       currentSchemaChangeVersion: Long,
       checkpointHash: Int,
+      readerOptionsUnblock: Seq[String],
       sqlConfsUnblock: Seq[String],
       wideningTypeChanges: Seq[TypeChange]): Throwable = {
 
@@ -3173,6 +3183,12 @@
       s"${change.toType.sql}"
     }.mkString("\n")
 
+    val unblockChangeOptions = readerOptionsUnblock.map { option =>
+      s"""  .option("$option", "$currentSchemaChangeVersion")"""
+    }.mkString("\n")
+    val unblockStreamOptions = readerOptionsUnblock.map { option =>
+      s"""  .option("$option", "always")"""
+    }.mkString("\n")
     val unblockChangeConfs = sqlConfsUnblock.map { conf =>
       s"""  SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
     }.mkString("\n")
@@ -3190,6 +3206,8 @@
       currentSchemaChangeVersion.toString,
       wideningTypeChangesStr,
       currentSchemaChangeVersion.toString,
+      unblockChangeOptions,
+      unblockStreamOptions,
       unblockChangeConfs,
       unblockStreamConfs,
       unblockAllConfs
```
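
The option hints are plain string building; for illustration, a standalone sketch of the same `mkString` construction with hypothetical inputs:

```scala
// Standalone sketch of how the reader-option hints above are rendered into the
// error message. The option names and version are hypothetical inputs.
val readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange")
val currentSchemaChangeVersion = 2L

val unblockChangeOptions = readerOptionsUnblock.map { option =>
  s"""  .option("$option", "$currentSchemaChangeVersion")"""
}.mkString("\n")

println(unblockChangeOptions)
// Prints:
//   .option("allowSourceColumnRename", "2")
//   .option("allowSourceColumnTypeChange", "2")
```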

spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -212,6 +212,12 @@ trait DeltaReadOptions extends DeltaOptionParser {
   val schemaTrackingLocation = options.get(SCHEMA_TRACKING_LOCATION)
 
   val sourceTrackingId = options.get(STREAMING_SOURCE_TRACKING_ID)
+
+  val allowSourceColumnRename = options.get(ALLOW_SOURCE_COLUMN_RENAME)
+
+  val allowSourceColumnDrop = options.get(ALLOW_SOURCE_COLUMN_DROP)
+
+  val allowSourceColumnTypeChange = options.get(ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
 }
 
 
@@ -289,6 +295,10 @@ object DeltaOptions extends DeltaLogging {
    */
   val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId"
 
+  val ALLOW_SOURCE_COLUMN_DROP = "allowSourceColumnDrop"
+  val ALLOW_SOURCE_COLUMN_RENAME = "allowSourceColumnRename"
+  val ALLOW_SOURCE_COLUMN_TYPE_CHANGE = "allowSourceColumnTypeChange"
+
   /**
    * An option to control if delta will write partition columns to data files
    */
```
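
For illustration (not part of the commit), how the new keys surface at runtime, mirroring the `new DeltaOptions(parameters, spark.sessionState.conf)` call introduced in `DeltaSourceMetadataEvolutionSupport` below; assumes an active `SparkSession` named `spark`:

```scala
// Sketch: reader options arrive as a plain string map and surface as Option[String].
import org.apache.spark.sql.delta.DeltaOptions

val parameters = Map("allowSourceColumnDrop" -> "always")
val options = new DeltaOptions(parameters, spark.sessionState.conf)

options.allowSourceColumnDrop   // Some("always")
options.allowSourceColumnRename // None: the option was not set
```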

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -439,7 +439,6 @@ object DeltaDataSource extends DatabricksLogging {
       parameters: Map[String, String],
       sourceMetadataPathOpt: Option[String] = None,
       mergeConsecutiveSchemaChanges: Boolean = false): Option[DeltaSourceMetadataTrackingLog] = {
-    val options = new CaseInsensitiveStringMap(parameters.asJava)
 
     DeltaDataSource.extractSchemaTrackingLocationConfig(spark, parameters)
       .map { schemaTrackingLocation =>
@@ -451,7 +450,7 @@
 
       DeltaSourceMetadataTrackingLog.create(
         spark, schemaTrackingLocation, sourceSnapshot,
-        Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)),
+        parameters,
         sourceMetadataPathOpt,
         mergeConsecutiveSchemaChanges
       )
```

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala

Lines changed: 49 additions & 13 deletions
```diff
@@ -449,7 +449,7 @@ object DeltaSourceMetadataEvolutionSupport {
 
   /**
    * Defining the different combinations of non-additive schema changes to detect them and allow
-   * users to vet and unblock them using a corresponding SQL conf:
+   * users to vet and unblock them using a corresponding SQL conf or reader option:
    * - dropping columns
    * - renaming columns
    * - widening data types
@@ -460,48 +460,61 @@
     val isDrop: Boolean
     val isTypeWidening: Boolean
     val sqlConfsUnblock: Seq[String]
+    val readerOptionsUnblock: Seq[String]
   }
 
   // Single types of schema change, typically caused by a single ALTER TABLE operation.
   private case object SchemaChangeRename extends SchemaChangeType {
     override val name = "RENAME COLUMN"
-    override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
     override val (isRename, isDrop, isTypeWidening) = (true, false, false)
+    override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
+    override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME)
   }
   private case object SchemaChangeDrop extends SchemaChangeType {
     override val name = "DROP COLUMN"
     override val (isRename, isDrop, isTypeWidening) = (false, true, false)
     override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP)
+    override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
   }
   private case object SchemaChangeTypeWidening extends SchemaChangeType {
     override val name = "TYPE WIDENING"
     override val (isRename, isDrop, isTypeWidening) = (false, false, true)
     override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_TYPE_CHANGE)
+    override val readerOptionsUnblock: Seq[String] =
+      Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
   }
 
   // Combinations of rename, drop and type change -> can be caused by a complete overwrite.
   private case object SchemaChangeRenameAndDrop extends SchemaChangeType {
     override val name = "RENAME AND DROP COLUMN"
     override val (isRename, isDrop, isTypeWidening) = (true, true, false)
     override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP)
+    override val readerOptionsUnblock: Seq[String] =
+      Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
   }
   private case object SchemaChangeRenameAndTypeWidening extends SchemaChangeType {
     override val name = "RENAME AND TYPE WIDENING"
     override val (isRename, isDrop, isTypeWidening) = (true, false, true)
     override val sqlConfsUnblock: Seq[String] =
       Seq(SQL_CONF_UNBLOCK_RENAME, SQL_CONF_UNBLOCK_TYPE_CHANGE)
+    override val readerOptionsUnblock: Seq[String] =
+      Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
   }
   private case object SchemaChangeDropAndTypeWidening extends SchemaChangeType {
     override val name = "DROP AND TYPE WIDENING"
     override val (isRename, isDrop, isTypeWidening) = (false, true, true)
     override val sqlConfsUnblock: Seq[String] =
       Seq(SQL_CONF_UNBLOCK_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
+    override val readerOptionsUnblock: Seq[String] =
+      Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
   }
   private case object SchemaChangeRenameAndDropAndTypeWidening extends SchemaChangeType {
     override val name = "RENAME, DROP AND TYPE WIDENING"
     override val (isRename, isDrop, isTypeWidening) = (true, true, true)
     override val sqlConfsUnblock: Seq[String] =
       Seq(SQL_CONF_UNBLOCK_RENAME_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
+    override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME,
+      DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
   }
 
   private final val allSchemaChangeTypes = Seq(
@@ -541,11 +554,12 @@
 
   /**
    * Returns whether the given type of non-additive schema change was unblocked by setting one of
-   * the corresponding SQL confs.
+   * the corresponding SQL confs or reader options.
    */
   private def isChangeUnblocked(
       spark: SparkSession,
       change: SchemaChangeType,
+      options: DeltaOptions,
       checkpointHash: Int,
       schemaChangeVersion: Long): Boolean = {
 
@@ -561,11 +575,20 @@
       validConfKeysValuePair.exists(p => getConf(p._1).contains(p._2))
     }
 
-    val isBlockedRename = change.isRename && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
+    def isUnblockedByReaderOption(readerOption: Option[String]): Boolean = {
+      readerOption.contains("always") || readerOption.contains(schemaChangeVersion.toString)
+    }
+
+    val isBlockedRename = change.isRename &&
+      !isUnblockedByReaderOption(options.allowSourceColumnRename) &&
+      !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
       !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
-    val isBlockedDrop = change.isDrop && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
+    val isBlockedDrop = change.isDrop &&
+      !isUnblockedByReaderOption(options.allowSourceColumnDrop) &&
+      !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
       !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
     val isBlockedTypeChange = change.isTypeWidening &&
+      !isUnblockedByReaderOption(options.allowSourceColumnTypeChange) &&
       !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_TYPE_CHANGE)
 
     !isBlockedRename && !isBlockedDrop && !isBlockedTypeChange
@@ -576,7 +599,7 @@
   /**
    * Whether to accept widening type changes:
    * - when true, widening type changes cause the stream to fail, requesting user to review and
-   *   unblock them via a SQL conf.
+   *   unblock them via a SQL conf or reader option.
    * - when false, widening type changes are rejected without possibility to unblock, similar to
    *   any other arbitrary type change.
    */
@@ -595,34 +618,45 @@
   // scalastyle:off
   /**
    * Given a non-additive operation type from a previous schema evolution, check we can process
-   * using the new schema given any SQL conf users have explicitly set to unblock.
+   * using the new schema given any SQL conf or dataframe reader option users have explicitly set to
+   * unblock.
    * The SQL conf can take one of following formats:
    * 1. spark.databricks.delta.streaming.allowSourceColumn$action = "always"
    *    -> allows non-additive schema change to propagate for all streams.
    * 2. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = "always"
    *    -> allows non-additive schema change to propagate for this particular stream.
    * 3. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = $deltaVersion
-   *    -> allow non-additive schema change to propagate only for this particular stream source
+   *    -> allow non-additive schema change to propagate only for this particular stream source
+   *       table version.
+   * The reader options can take one of the following format:
+   * 1. .option("allowSourceColumn$action", "always")
+   *    -> allows non-additive schema change to propagate for this particular stream.
+   * 2. .option("allowSourceColumn$action", "$deltaVersion")
+   *    -> allow non-additive schema change to propagate only for this particular stream source
    *       table version.
    * where `allowSourceColumn$action` is one of:
    * 1. `allowSourceColumnRename` to allow column renames.
    * 2. `allowSourceColumnDrop` to allow column drops.
-   * 3. `allowSourceColumnRenameAndDrop` to allow both column drops and renames.
-   * 4. `allowSourceColumnTypeChange` to allow widening type changes.
+   * 3. `allowSourceColumnTypeChange` to allow widening type changes.
+   * For SQL confs only, action can also be `allowSourceColumnRenameAndDrop` to allow both column
+   * drops and renames.
    *
    * We will check for any of these configs given the non-additive operation, and throw a proper
-   * error message to instruct the user to set the SQL conf if they would like to unblock.
+   * error message to instruct the user to set the SQL conf / reader options if they would like to
+   * unblock.
    *
    * @param metadataPath The path to the source-unique metadata location under checkpoint
    * @param currentSchema The current persisted schema
   * @param previousSchema The previous persisted schema
   */
   // scalastyle:on
-  protected[sources] def validateIfSchemaChangeCanBeUnblockedWithSQLConf(
+  protected[sources] def validateIfSchemaChangeCanBeUnblocked(
       spark: SparkSession,
+      parameters: Map[String, String],
       metadataPath: String,
       currentSchema: PersistedMetadata,
       previousSchema: PersistedMetadata): Unit = {
+    val options = new DeltaOptions(parameters, spark.sessionState.conf)
     val checkpointHash = getCheckpointHash(metadataPath)
 
     // The start version of a possible series of consecutive schema changes.
@@ -644,7 +678,7 @@
     determineNonAdditiveSchemaChangeType(
       spark, currentSchema.dataSchema, previousSchema.dataSchema).foreach { change =>
       if (!isChangeUnblocked(
-          spark, change, checkpointHash, currentSchemaChangeVersion)) {
+          spark, change, options, checkpointHash, currentSchemaChangeVersion)) {
         // Throw error to prompt user to set the correct confs
         change match {
           case SchemaChangeTypeWidening =>
@@ -656,6 +690,7 @@
               previousSchemaChangeVersion,
               currentSchemaChangeVersion,
               checkpointHash,
+              change.readerOptionsUnblock,
               change.sqlConfsUnblock,
               wideningTypeChanges)
 
@@ -665,6 +700,7 @@
               previousSchemaChangeVersion,
               currentSchemaChangeVersion,
               checkpointHash,
+              change.readerOptionsUnblock,
               change.sqlConfsUnblock)
           }
         }
```
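
The heart of the new check is `isUnblockedByReaderOption`: a reader option unblocks a change only when its value is the literal `"always"` or the exact version the schema change happened at. A standalone sketch of that predicate (not from the PR, mirroring the diff above):

```scala
// Standalone sketch of the reader-option predicate used in isChangeUnblocked.
def isUnblockedByReaderOption(
    readerOption: Option[String],
    schemaChangeVersion: Long): Boolean =
  readerOption.contains("always") || readerOption.contains(schemaChangeVersion.toString)

isUnblockedByReaderOption(Some("always"), 2L) // true: unblocks any version
isUnblockedByReaderOption(Some("2"), 2L)      // true: unblocks exactly version 2
isUnblockedByReaderOption(Some("3"), 2L)      // false: stream stays blocked
isUnblockedByReaderOption(None, 2L)           // false: option not set
```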

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources
 // scalastyle:off import.ordering.noEmptyLine
 import java.io.InputStream
 
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.delta.streaming.{JsonSchemaSerializer, PartitionAndDataSchema, SchemaTrackingLog}
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 // scalastyle:on import.ordering.noEmptyLine
 
 /**
@@ -240,10 +242,12 @@
       sparkSession: SparkSession,
       rootMetadataLocation: String,
       sourceSnapshot: SnapshotDescriptor,
-      sourceTrackingId: Option[String] = None,
+      parameters: Map[String, String],
       sourceMetadataPathOpt: Option[String] = None,
       mergeConsecutiveSchemaChanges: Boolean = false,
       initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = {
+    val options = new CaseInsensitiveStringMap(parameters.asJava)
+    val sourceTrackingId = Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID))
     val metadataTrackingLocation = fullMetadataTrackingLocation(
       rootMetadataLocation, sourceSnapshot.deltaLog.tableId, sourceTrackingId)
     val log = new DeltaSourceMetadataTrackingLog(
@@ -296,7 +300,8 @@
     (log.getPreviousTrackedMetadata, log.getCurrentTrackedMetadata, sourceMetadataPathOpt) match {
       case (Some(prev), Some(curr), Some(metadataPath)) =>
         DeltaSourceMetadataEvolutionSupport
-          .validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev)
+          .validateIfSchemaChangeCanBeUnblocked(
+            sparkSession, parameters, metadataPath, curr, prev)
       case _ =>
     }
```
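
`create` now takes the raw option map and extracts the tracking id itself; `CaseInsensitiveStringMap` makes the lookup case-insensitive. A small sketch (the mixed key casing is deliberate and hypothetical):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch: option keys are matched case-insensitively, so the casing a user
// passes to the dataframe reader does not matter.
val parameters = Map("STREAMINGsourceTRACKINGid" -> "sourceA")
val options = new CaseInsensitiveStringMap(parameters.asJava)
val sourceTrackingId = Option(options.get("streamingSourceTrackingId"))
// sourceTrackingId == Some("sourceA")
```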

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -2732,6 +2732,7 @@ trait DeltaErrorsSuiteBase
         nonAdditiveSchemaChangeOpType = "RENAME AND TYPE WIDENING",
         previousSchemaChangeVersion = 0,
         currentSchemaChangeVersion = 1,
+        readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange"),
         sqlConfsUnblock = Seq(
           "spark.databricks.delta.streaming.allowSourceColumnRename",
           "spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
@@ -2743,6 +2744,12 @@
         "opType" -> "RENAME AND TYPE WIDENING",
         "previousSchemaChangeVersion" -> "0",
         "currentSchemaChangeVersion" -> "1",
+        "unblockChangeOptions" ->
+          s"""  .option("allowSourceColumnRename", "1")
+             |  .option("allowSourceColumnTypeChange", "1")""".stripMargin,
+        "unblockStreamOptions" ->
+          s"""  .option("allowSourceColumnRename", "always")
+             |  .option("allowSourceColumnTypeChange", "always")""".stripMargin,
         "unblockChangeConfs" ->
           s"""  SET spark.databricks.delta.streaming.allowSourceColumnRename.ckpt_15 = 1;
              |  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;""".stripMargin,
@@ -2760,6 +2767,7 @@
       throw DeltaErrors.cannotContinueStreamingTypeWidening(
         previousSchemaChangeVersion = 0,
         currentSchemaChangeVersion = 1,
+        readerOptionsUnblock = Seq("allowSourceColumnTypeChange"),
         sqlConfsUnblock = Seq("spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
         checkpointHash = 15,
         wideningTypeChanges = Seq(TypeChange(None, IntegerType, LongType, Seq("a"))))
@@ -2770,6 +2778,8 @@
         "previousSchemaChangeVersion" -> "0",
         "currentSchemaChangeVersion" -> "1",
         "wideningTypeChanges" -> "  a: INT -> BIGINT",
+        "unblockChangeOptions" -> "  .option(\"allowSourceColumnTypeChange\", \"1\")",
+        "unblockStreamOptions" -> "  .option(\"allowSourceColumnTypeChange\", \"always\")",
         "unblockChangeConfs" ->
           "  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;",
         "unblockStreamConfs" ->
```
