
Commit 512c8e5

Added stageId <--> jobId mapping in DAGScheduler
...and make sure that DAGScheduler data structures are cleaned up on job
completion. Initial effort and discussion at mesos/spark#842.

Conflicts:
	core/src/main/scala/org/apache/spark/MapOutputTracker.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Parent: 37126e8

9 files changed (+286, -88 lines)

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 6 additions & 2 deletions
@@ -247,7 +247,7 @@ private[spark] class MapOutputTracker extends Logging {
         case Some(bytes) =>
           return bytes
         case None =>
-          statuses = mapStatuses(shuffleId)
+          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
           epochGotten = epoch
       }
     }
@@ -261,9 +261,13 @@ private[spark] class MapOutputTracker extends Logging {
         cachedSerializedStatuses(shuffleId) = bytes
       }
     }
-    return bytes
+    bytes
   }
 
+  def has(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
+  }
+
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
   // generally be pretty compressible because many map outputs will be on the same hostname.
