Skip to content

Commit 782f65a

Browse files
committed
feat: parallel cache check
1 parent 41b0bd9 commit 782f65a

3 files changed

Lines changed: 163 additions & 119 deletions

File tree

composeApp/src/commonMain/kotlin/com/nuvio/app/features/player/PlayerStreamsRepository.kt

Lines changed: 80 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ object PlayerStreamsRepository {
162162
val installedAddonNames = installedAddons.map { it.displayTitle }.toSet()
163163
PlayerSettingsRepository.ensureLoaded()
164164
val playerSettings = PlayerSettingsRepository.uiState.value
165+
val debridSettings = DebridSettingsRepository.snapshot()
165166
val pluginScrapers = if (AppFeaturePolicy.pluginsEnabled) {
166167
PluginRepository.initialize()
167168
PluginRepository.getEnabledScrapersForType(type)
@@ -234,6 +235,61 @@ object PlayerStreamsRepository {
234235
val job = scope.launch {
235236
val pendingStreamAddons = streamAddons.filterNot { it.addonId in warmedAddonIds }
236237
val installedAddonIds = streamAddons.map { it.addonId }.toSet()
238+
val debridAvailabilityJobs = mutableListOf<Job>()
239+
fun emptyStateReason(groups: List<AddonStreamGroup>, anyLoading: Boolean) =
240+
if (!anyLoading && groups.all { it.streams.isEmpty() }) {
241+
if (groups.all { !it.error.isNullOrBlank() }) {
242+
com.nuvio.app.features.streams.StreamsEmptyStateReason.StreamFetchFailed
243+
} else {
244+
com.nuvio.app.features.streams.StreamsEmptyStateReason.NoStreamsFound
245+
}
246+
} else {
247+
null
248+
}
249+
250+
fun presentDebridGroup(group: AddonStreamGroup): AddonStreamGroup =
251+
DebridStreamPresentation.apply(
252+
groups = listOf(group),
253+
settings = debridSettings,
254+
).firstOrNull() ?: group
255+
256+
fun publishStreamGroup(group: AddonStreamGroup) {
257+
stateFlow.update { current ->
258+
val updated = StreamAutoPlaySelector.orderAddonStreams(
259+
groups = current.groups.map { currentGroup ->
260+
if (currentGroup.addonId == group.addonId) group else currentGroup
261+
},
262+
installedOrder = installedAddonOrder,
263+
)
264+
val anyLoading = updated.any { it.isLoading }
265+
current.copy(
266+
groups = updated,
267+
isAnyLoading = anyLoading,
268+
emptyStateReason = emptyStateReason(updated, anyLoading),
269+
)
270+
}
271+
}
272+
273+
fun launchDebridAvailability(group: AddonStreamGroup) {
274+
if (group.addonId !in installedAddonIds || group.streams.isEmpty()) return
275+
276+
val eligibleGroupIds = setOf(group.addonId)
277+
val checkingGroup = TorboxAvailabilityService.markChecking(
278+
groups = listOf(group),
279+
eligibleGroupIds = eligibleGroupIds,
280+
).firstOrNull() ?: group
281+
publishStreamGroup(checkingGroup)
282+
283+
val availabilityJob = launch {
284+
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
285+
groups = listOf(checkingGroup),
286+
eligibleGroupIds = eligibleGroupIds,
287+
).firstOrNull() ?: checkingGroup
288+
publishStreamGroup(presentDebridGroup(availabilityGroup))
289+
}
290+
debridAvailabilityJobs += availabilityJob
291+
}
292+
237293
val addonJobs = pendingStreamAddons.map { addon ->
238294
async {
239295
val url = buildAddonResourceUrl(
@@ -301,64 +357,33 @@ object PlayerStreamsRepository {
301357
completions.send(deferred.await())
302358
}
303359
}
304-
var debridPreparationLaunched = false
305360
repeat(jobs.size) {
306361
val result = completions.receive()
307-
stateFlow.update { current ->
308-
val updated = StreamAutoPlaySelector.orderAddonStreams(
309-
groups = current.groups.map { g -> if (g.addonId == result.addonId) result else g },
310-
installedOrder = installedAddonOrder,
311-
)
312-
val anyLoading = updated.any { it.isLoading }
313-
current.copy(
314-
groups = updated,
315-
isAnyLoading = anyLoading,
316-
emptyStateReason = if (!anyLoading && updated.all { it.streams.isEmpty() }) {
317-
if (updated.all { !it.error.isNullOrBlank() }) {
318-
com.nuvio.app.features.streams.StreamsEmptyStateReason.StreamFetchFailed
319-
} else {
320-
com.nuvio.app.features.streams.StreamsEmptyStateReason.NoStreamsFound
321-
}
322-
} else null,
323-
)
324-
}
362+
publishStreamGroup(result)
363+
launchDebridAvailability(result)
325364
}
326-
if (!debridPreparationLaunched) {
327-
debridPreparationLaunched = true
328-
val checkingGroups = TorboxAvailabilityService.markChecking(
329-
groups = stateFlow.value.groups,
330-
eligibleGroupIds = installedAddonIds,
331-
)
332-
stateFlow.update { current -> current.copy(groups = checkingGroups) }
333-
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
334-
groups = stateFlow.value.groups,
335-
eligibleGroupIds = installedAddonIds,
336-
)
337-
val presentedGroups = DebridStreamPresentation.apply(
338-
groups = availabilityGroups,
339-
settings = DebridSettingsRepository.snapshot(),
340-
)
341-
stateFlow.update { current -> current.copy(groups = presentedGroups) }
342-
launch {
343-
DirectDebridStreamPreparer.prepare(
344-
streams = stateFlow.value.groups
345-
.filter { it.addonId in installedAddonIds }
346-
.flatMap { it.streams },
347-
season = season,
348-
episode = episode,
349-
playerSettings = playerSettings,
350-
installedAddonNames = installedAddonNames,
351-
) { original, prepared ->
352-
stateFlow.update { current ->
353-
current.copy(
354-
groups = DirectDebridStreamPreparer.replacePreparedStream(
355-
groups = current.groups,
356-
original = original,
357-
prepared = prepared,
358-
eligibleGroupIds = installedAddonIds,
359-
),
360-
)
361-
}
365+
for (availabilityJob in debridAvailabilityJobs) {
366+
availabilityJob.join()
367+
}
368+
launch {
369+
DirectDebridStreamPreparer.prepare(
370+
streams = stateFlow.value.groups
371+
.filter { it.addonId in installedAddonIds }
372+
.flatMap { it.streams },
373+
season = season,
374+
episode = episode,
375+
playerSettings = playerSettings,
376+
installedAddonNames = installedAddonNames,
377+
) { original, prepared ->
378+
stateFlow.update { current ->
379+
current.copy(
380+
groups = DirectDebridStreamPreparer.replacePreparedStream(
381+
groups = current.groups,
382+
original = original,
383+
prepared = prepared,
384+
eligibleGroupIds = installedAddonIds,
385+
),
386+
)
362387
}
363388
}
364389
}

composeApp/src/commonMain/kotlin/com/nuvio/app/features/streams/AddonStreamWarmupRepository.kt

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,28 @@ object AddonStreamWarmupRepository {
110110
val targets = key.addonTargets
111111
if (targets.isEmpty()) return emptyList()
112112

113+
val addonIds = targets.map { it.addonId }.toSet()
113114
val orderedGroups = coroutineScope {
114115
targets.map { target ->
115116
async {
116-
fetchAddonStreams(
117+
val group = fetchAddonStreams(
117118
target = target,
118119
type = key.type,
119120
videoId = key.videoId,
120121
)
122+
val eligibleGroupIds = setOf(group.addonId)
123+
val checkingGroup = TorboxAvailabilityService.markChecking(
124+
groups = listOf(group),
125+
eligibleGroupIds = eligibleGroupIds,
126+
).firstOrNull() ?: group
127+
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
128+
groups = listOf(checkingGroup),
129+
eligibleGroupIds = eligibleGroupIds,
130+
).firstOrNull() ?: checkingGroup
131+
DebridStreamPresentation.apply(
132+
groups = listOf(availabilityGroup),
133+
settings = key.settings,
134+
).firstOrNull() ?: availabilityGroup
121135
}
122136
}.awaitAll()
123137
}.let { groups ->
@@ -127,18 +141,7 @@ object AddonStreamWarmupRepository {
127141
)
128142
}
129143

130-
val addonIds = targets.map { it.addonId }.toSet()
131-
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
132-
groups = TorboxAvailabilityService.markChecking(
133-
groups = orderedGroups,
134-
eligibleGroupIds = addonIds,
135-
),
136-
eligibleGroupIds = addonIds,
137-
)
138-
var preparedGroups = DebridStreamPresentation.apply(
139-
groups = availabilityGroups,
140-
settings = key.settings,
141-
)
144+
var preparedGroups = orderedGroups
142145

143146
PlayerSettingsRepository.ensureLoaded()
144147
DirectDebridStreamPreparer.prepare(

composeApp/src/commonMain/kotlin/com/nuvio/app/features/streams/StreamsRepository.kt

Lines changed: 67 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,56 @@ object StreamsRepository {
230230

231231
val installedAddonNames = installedAddonOrder.toSet()
232232
val installedAddonIds = streamAddons.map { it.addonId }.toSet()
233+
val debridAvailabilityJobs = mutableListOf<Job>()
233234
var autoSelectTriggered = false
234235
var timeoutElapsed = false
235-
var debridPreparationLaunched = false
236236
fun publishCompletion(completion: StreamLoadCompletion) {
237237
if (completions.trySend(completion).isFailure) {
238238
log.d { "Ignoring late stream load completion after channel close" }
239239
}
240240
}
241+
fun presentDebridGroup(group: AddonStreamGroup): AddonStreamGroup =
242+
DebridStreamPresentation.apply(
243+
groups = listOf(group),
244+
settings = debridSettings,
245+
).firstOrNull() ?: group
246+
247+
fun publishAddonGroup(group: AddonStreamGroup) {
248+
_uiState.update { current ->
249+
val updated = StreamAutoPlaySelector.orderAddonStreams(
250+
groups = current.groups.map { currentGroup ->
251+
if (currentGroup.addonId == group.addonId) group else currentGroup
252+
},
253+
installedOrder = installedAddonOrder,
254+
)
255+
val anyLoading = updated.any { it.isLoading }
256+
current.copy(
257+
groups = updated,
258+
isAnyLoading = anyLoading,
259+
emptyStateReason = updated.toEmptyStateReason(anyLoading),
260+
)
261+
}
262+
}
263+
264+
fun launchDebridAvailability(group: AddonStreamGroup) {
265+
if (group.addonId !in installedAddonIds || group.streams.isEmpty()) return
266+
267+
val eligibleGroupIds = setOf(group.addonId)
268+
val checkingGroup = TorboxAvailabilityService.markChecking(
269+
groups = listOf(group),
270+
eligibleGroupIds = eligibleGroupIds,
271+
).firstOrNull() ?: group
272+
publishAddonGroup(checkingGroup)
273+
274+
val availabilityJob = launch {
275+
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
276+
groups = listOf(checkingGroup),
277+
eligibleGroupIds = eligibleGroupIds,
278+
).firstOrNull() ?: checkingGroup
279+
publishAddonGroup(presentDebridGroup(availabilityGroup))
280+
}
281+
debridAvailabilityJobs += availabilityJob
282+
}
241283

242284
val timeoutJob = if (isAutoPlayEnabled) {
243285
val timeoutMs = playerSettings.streamAutoPlayTimeoutSeconds * 1_000L
@@ -370,20 +412,8 @@ object StreamsRepository {
370412
when (val completion = completions.receive()) {
371413
is StreamLoadCompletion.Addon -> {
372414
val result = completion.group
373-
_uiState.update { current ->
374-
val updated = StreamAutoPlaySelector.orderAddonStreams(
375-
groups = current.groups.map { group ->
376-
if (group.addonId == result.addonId) result else group
377-
},
378-
installedOrder = installedAddonOrder,
379-
)
380-
val anyLoading = updated.any { it.isLoading }
381-
current.copy(
382-
groups = updated,
383-
isAnyLoading = anyLoading,
384-
emptyStateReason = updated.toEmptyStateReason(anyLoading),
385-
)
386-
}
415+
publishAddonGroup(result)
416+
launchDebridAvailability(result)
387417
}
388418

389419
is StreamLoadCompletion.PluginScraper -> {
@@ -431,42 +461,28 @@ object StreamsRepository {
431461
}
432462
}
433463

434-
if (!debridPreparationLaunched) {
435-
debridPreparationLaunched = true
436-
val checkingGroups = TorboxAvailabilityService.markChecking(
437-
groups = _uiState.value.groups,
438-
eligibleGroupIds = installedAddonIds,
439-
)
440-
_uiState.update { current -> current.copy(groups = checkingGroups) }
441-
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
442-
groups = _uiState.value.groups,
443-
eligibleGroupIds = installedAddonIds,
444-
)
445-
val presentedGroups = DebridStreamPresentation.apply(
446-
groups = availabilityGroups,
447-
settings = debridSettings,
448-
)
449-
_uiState.update { current -> current.copy(groups = presentedGroups) }
450-
launch {
451-
DirectDebridStreamPreparer.prepare(
452-
streams = _uiState.value.groups
453-
.filter { it.addonId in installedAddonIds }
454-
.flatMap { it.streams },
455-
season = season,
456-
episode = episode,
457-
playerSettings = playerSettings,
458-
installedAddonNames = installedAddonNames,
459-
) { original, prepared ->
460-
_uiState.update { current ->
461-
current.copy(
462-
groups = DirectDebridStreamPreparer.replacePreparedStream(
463-
groups = current.groups,
464-
original = original,
465-
prepared = prepared,
466-
eligibleGroupIds = installedAddonIds,
467-
),
468-
)
469-
}
464+
for (availabilityJob in debridAvailabilityJobs) {
465+
availabilityJob.join()
466+
}
467+
launch {
468+
DirectDebridStreamPreparer.prepare(
469+
streams = _uiState.value.groups
470+
.filter { it.addonId in installedAddonIds }
471+
.flatMap { it.streams },
472+
season = season,
473+
episode = episode,
474+
playerSettings = playerSettings,
475+
installedAddonNames = installedAddonNames,
476+
) { original, prepared ->
477+
_uiState.update { current ->
478+
current.copy(
479+
groups = DirectDebridStreamPreparer.replacePreparedStream(
480+
groups = current.groups,
481+
original = original,
482+
prepared = prepared,
483+
eligibleGroupIds = installedAddonIds,
484+
),
485+
)
470486
}
471487
}
472488
}

0 commit comments

Comments
 (0)