Skip to content

Improve performance #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ internal class Segment(
return state == null || state !is State.Eos
}

fun needsSleep(): Boolean {
when(val s = state ?: return false) {
is State.Ok -> return false
is State.Retry -> return false
is State.Wait -> return s.sleep
}
}

fun release() {
pipeline.release()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ internal class AudioEngine(

override fun drain(): State<EncoderData> {
if (chunks.isEmpty()) {
// nothing was enqueued
log.i("drain(): no chunks, waiting...")
return State.Wait
return State.Wait(false)
}
val (outBytes, outId) = next.buffer() ?: return run {
// dequeueInputBuffer failed
log.i("drain(): no next buffer, waiting...")
State.Wait
State.Wait(true)
}
val outBuffer = outBytes.asShortBuffer()
return chunks.drain(
Expand Down Expand Up @@ -115,4 +117,4 @@ internal class AudioEngine(
State.Ok(EncoderData(outBytes, outId, timeUs))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal class Decoder(
}

override fun buffer(): Pair<ByteBuffer, Int>? {
val id = codec.dequeueInputBuffer(0)
val id = codec.dequeueInputBuffer(100)
return if (id >= 0) {
dequeuedInputs++
buffers.getInputBuffer(id) to id
Expand All @@ -88,11 +88,11 @@ internal class Decoder(
}

override fun drain(): State<DecoderData> {
val result = codec.dequeueOutputBuffer(info, 0)
val result = codec.dequeueOutputBuffer(info, 100)
return when (result) {
INFO_TRY_AGAIN_LATER -> {
log.i("drain(): got INFO_TRY_AGAIN_LATER, waiting.")
State.Wait
State.Wait(true)
}
INFO_OUTPUT_FORMAT_CHANGED -> {
log.i("drain(): got INFO_OUTPUT_FORMAT_CHANGED, handling format and retrying. format=${codec.outputFormat}")
Expand All @@ -116,8 +116,9 @@ internal class Decoder(
}
if (isEos) State.Eos(data) else State.Ok(data)
} else {
// frame was dropped, no need to sleep
codec.releaseOutputBuffer(result, false)
State.Wait
State.Wait(false)
}.also {
log.v("drain(): returning $it")
}
Expand All @@ -130,4 +131,4 @@ internal class Decoder(
codec.stop()
codec.release()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal class Encoder(
}

override fun buffer(): Pair<ByteBuffer, Int>? {
val id = codec.dequeueInputBuffer(0)
val id = codec.dequeueInputBuffer(100)
return if (id >= 0) {
dequeuedInputs++
buffers.getInputBuffer(id) to id
Expand Down Expand Up @@ -107,7 +107,7 @@ internal class Encoder(
}

override fun drain(): State<WriterData> {
val timeoutUs = if (eosReceivedButNotEnqueued) 5000L else 0L
val timeoutUs = if (eosReceivedButNotEnqueued) 5000L else 100L
return when (val result = codec.dequeueOutputBuffer(info, timeoutUs)) {
INFO_TRY_AGAIN_LATER -> {
if (eosReceivedButNotEnqueued) {
Expand All @@ -118,7 +118,7 @@ internal class Encoder(
State.Eos(WriterData(buffer, 0L, 0) {})
} else {
log.i("Can't dequeue output buffer: INFO_TRY_AGAIN_LATER")
State.Wait
State.Wait(true)
}
}
INFO_OUTPUT_FORMAT_CHANGED -> {
Expand Down Expand Up @@ -160,4 +160,4 @@ internal class Encoder(
codec.stop()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ internal class Reader(
private inline fun nextBufferOrWait(action: (ByteBuffer, Int) -> State<ReaderData>): State<ReaderData> {
val buffer = next.buffer()
if (buffer == null) {
// dequeueInputBuffer failed
log.v("Returning State.Wait because buffer is null.")
return State.Wait
return State.Wait(true)
} else {
return action(buffer.first, buffer.second)
}
Expand All @@ -46,7 +47,7 @@ internal class Reader(
}
} else if (!source.canReadTrack(track)) {
log.i("Returning State.Wait because source can't read $track right now.")
State.Wait
State.Wait(false)
} else {
nextBufferOrWait { byteBuffer, id ->
chunk.buffer = byteBuffer
Expand All @@ -55,4 +56,4 @@ internal class Reader(
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,27 @@ internal class Pipeline private constructor(name: String, private val chain: Lis
chain.forEachIndexed { index, step ->
if (index < head) return@forEachIndexed
val fresh = head == 0 || index != head
state = executeStep(state, step, fresh) ?: run {
log.v("execute(): step ${step.name} (#$index/${chain.size}) is waiting. headState=$headState headIndex=$headIndex")
return State.Wait
}
// log.v("execute(): executed ${step.name} (#$index/${chain.size}). result=$state")
if (state is State.Eos) {
log.i("execute(): EOS from ${step.name} (#$index/${chain.size}).")
headState = state
headIndex = index + 1

fun executeStep(fresh: Boolean): State.Wait<Any>? {
return when (val newState = step.step(state, fresh)) {
is State.Eos -> {
state = newState
log.i("execute(): EOS from ${step.name} (#$index/${chain.size}).")
headState = newState
headIndex = index + 1
null
}
is State.Ok -> {
state = newState
null
}
is State.Retry -> executeStep(fresh = false)
is State.Wait -> return newState
}
}

val wait = executeStep(fresh)
if (wait != null) return State.Wait(wait.sleep)
}
return when {
chain.isEmpty() -> State.Eos(Unit)
Expand All @@ -47,15 +58,6 @@ internal class Pipeline private constructor(name: String, private val chain: Lis
chain.forEach { it.release() }
}

private fun executeStep(previous: State.Ok<Any>, step: AnyStep, fresh: Boolean): State.Ok<Any>? {
val state = step.step(previous, fresh)
return when (state) {
is State.Ok -> state
is State.Retry -> executeStep(previous, step, fresh = false)
is State.Wait -> null
}
}

companion object {
@Suppress("UNCHECKED_CAST")
internal fun build(name: String, builder: () -> Builder<*, Channel> = { Builder<Unit, Channel>() }): Pipeline {
Expand All @@ -79,4 +81,4 @@ internal operator fun <
other: Step<CurrData, CurrChannel, NewData, NewChannel>
): Pipeline.Builder<NewData, NewChannel> {
return Pipeline.Builder<CurrData, CurrChannel>(listOf(this)) + other
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ internal sealed class State<out T> {
}

// couldn't run, but might in the future
object Wait : State<Nothing>() {
override fun toString() = "State.Wait"
class Wait<T>(val sleep: Boolean) : State<T>() {
override fun toString() = "State.Wait($sleep)"
}

// call again as soon as possible
object Retry : State<Nothing>() {
override fun toString() = "State.Retry"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ internal class DefaultThumbnailsEngine(
}

companion object {
private val WAIT_MS = 10L
private val WAIT_MS = 2L
private val PROGRESS_LOOPS = 10L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,17 @@ internal class DefaultTranscodeEngine(
log.v("transcode(): executed step=$loop advanced=$advanced completed=$completed")
if (Thread.interrupted()) {
throw InterruptedException()
} else if (completed) {
}
if (completed) {
progress(1.0)
break
}

if (!advanced) {
if (!advanced && audio?.needsSleep() != false && video?.needsSleep() != false) {
Thread.sleep(WAIT_MS)
}

if (++loop % PROGRESS_LOOPS == 0L) {
if (advanced && ++loop % PROGRESS_LOOPS == 0L) {
val audioProgress = timer.progress.audio
val videoProgress = timer.progress.video
log.v("transcode(): got progress, video=$videoProgress audio=$audioProgress")
Expand All @@ -145,7 +146,7 @@ internal class DefaultTranscodeEngine(


companion object {
private val WAIT_MS = 10L
private val WAIT_MS = 2L
private val PROGRESS_LOOPS = 10L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ internal class VideoRenderer(
State.Ok(state.value.timeUs)
} else {
state.value.release(false)
State.Wait
State.Wait(false)
}
}
}

override fun release() {
frameDrawer.release()
}
}
}