package com.speechify.client.internal.util.extensions.collections.flows

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.withIndex
import kotlin.time.Duration

/**
 * Emits a fallback first item after [timeout] if the [this] upstream flow did not emit anything during that time (if it did, it doesn't affect the upstream flow)
 */
internal fun <T> Flow<T>.firstValueFallbackOnTimeout(timeout: Duration, getFallbackItem: () -> T): Flow<T> = flow {
    // Create a fallback flow that emits after a specified timeout
    val fallbackFlow = flow {
        delay(timeout)
        emit(FlowEventForFirstValueFallback.Timeout)
    }

    val originalFlowMappedToValueFromUpstream = this@firstValueFallbackOnTimeout
        .map {
            FlowEventForFirstValueFallback.ValueFromUpstream(it)
        }

    // Merge the original flow with the fallback flow
    emitAll(
        merge(originalFlowMappedToValueFromUpstream, fallbackFlow)
            /**
             * We use [withIndex] with a check if the fallback is really first to rule out any rare original-value-erasure-by-the-fallback due to two threads racing, and the fallback still getting emitted as a second item.
             * The [merge] used above [does run the flows in concurrent coroutines without limit](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html#:~:text=All%20flows%20are%20merged%20concurrently).
             * The index ensures the fallback item is emitted only after the timeout and never fired as a second item, after the original upstream flow managed to produce its first value due to threads racing.
             */
            .withIndex()
            .mapNotNull {
                when (val item = it.value) {
                    is FlowEventForFirstValueFallback.ValueFromUpstream -> item.value
                    is FlowEventForFirstValueFallback.Timeout -> if (it.index == 0) getFallbackItem() else null
                }
            },
    )
}

private sealed class FlowEventForFirstValueFallback<T> {
    class ValueFromUpstream<T>(val value: T) : FlowEventForFirstValueFallback<T>()
    object Timeout : FlowEventForFirstValueFallback<Nothing>()
}
