Commit e43edde

cloud-fan authored and PavithraRamachandran committed
[SPARK-28878][SQL][FOLLOWUP] Remove extra project for DSv2 streaming scan
### What changes were proposed in this pull request?

Remove the project node if the streaming scan is columnar.

### Why are the changes needed?

This is a followup of apache#25586. Batch and streaming share the same DS v2 read API so both can support columnar reads. We should apply apache#25586 to streaming scan as well.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes apache#25727 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

1 parent 7c281b5 commit e43edde

1 file changed

Lines changed: 21 additions & 8 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

@@ -155,17 +155,30 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
 
     case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
       val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
-      // ensure there is a projection, which will produce unsafe rows required by some operators
-      ProjectExec(r.output,
-        MicroBatchScanExec(
-          r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
+      val scanExec = MicroBatchScanExec(
+        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
+
+      val withProjection = if (scanExec.supportsColumnar) {
+        scanExec
+      } else {
+        // Add a Project here to make sure we produce unsafe rows.
+        ProjectExec(r.output, scanExec)
+      }
+
+      withProjection :: Nil
 
     case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
       val continuousStream = r.stream.asInstanceOf[ContinuousStream]
-      // ensure there is a projection, which will produce unsafe rows required by some operators
-      ProjectExec(r.output,
-        ContinuousScanExec(
-          r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
+      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
+
+      val withProjection = if (scanExec.supportsColumnar) {
+        scanExec
+      } else {
+        // Add a Project here to make sure we produce unsafe rows.
+        ProjectExec(r.output, scanExec)
+      }
+
+      withProjection :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
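
Both streaming branches in the diff above follow the same pattern: build the scan node first, then wrap it in a projection only when it is not columnar. The following is a minimal, self-contained sketch of that pattern and is not Spark code. `Plan`, `ScanNode`, `ProjectNode`, and `ConditionalProjection` are hypothetical stand-ins for `SparkPlan`, the scan execs, and `ProjectExec`, and the `columnar` flag is an assumption standing in for however the real scan computes `supportsColumnar`, which this diff does not show.

```scala
// Simplified stand-ins for physical plan nodes; these are NOT Spark's classes.
sealed trait Plan {
  def supportsColumnar: Boolean
}

// Hypothetical scan node. The `columnar` flag is an assumption that replaces
// whatever logic the real scan uses to report columnar support.
final case class ScanNode(output: Seq[String], columnar: Boolean) extends Plan {
  override def supportsColumnar: Boolean = columnar
}

// Hypothetical row-based projection node; assumed never to be columnar here.
final case class ProjectNode(projectList: Seq[String], child: Plan) extends Plan {
  override def supportsColumnar: Boolean = false
}

object ConditionalProjection {
  // Same shape as the new branches in the diff: a columnar scan is returned
  // unchanged, a row-based scan is wrapped in a projection.
  def plan(scan: ScanNode): List[Plan] = {
    val withProjection: Plan =
      if (scan.supportsColumnar) scan
      else ProjectNode(scan.output, scan)
    withProjection :: Nil
  }
}
```

In this sketch, `ConditionalProjection.plan(ScanNode(Seq("a"), columnar = true))` yields the scan by itself, while passing `columnar = false` yields a `ProjectNode` whose child is the scan.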
