package com.speechify.client.internal.util.extensions.collections.flows

import com.speechify.client.internal.util.collections.flows.FlowDestructible
import com.speechify.client.internal.util.collections.flows.MutableSharedFlowThatFinishes
import com.speechify.client.internal.util.collections.flows.SharedFlowThatFinishes
import com.speechify.client.internal.util.collections.flows.SharedFlowThatFinishesDestructible
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch

/**
 * A version of [kotlinx.coroutines.flow.shareIn] with infinite capacity, (so an implementation of
 * [kotlinx.coroutines.flow.cache]), which only fetches one-by-one.
 *
 * In contrast, [kotlinx.coroutines.flow.shareIn] with infinite capacity makes it impossible for downstream to limit the
 * pulling to only needed items because, once started, it will pull from the upstream flow constantly because the buffer
 * capacity for replays will prevent emits from suspending - see #ShareInIsInadequateForPullsAsNeeded.
 * Even with 0-buffer a [kotlinx.coroutines.flow.shareIn] would be inappropriate for multiple collections as per
 * #ShareInIsInadequateForPullsAsNeeded.
 *
 * This extension is 'similar but different' from [producePullingOnlyWhenNeededIn], in that they are both 'pulling only
 * when needed', but this one has unlimited replay, and can be shared by multiple consumers concurrently.
 *
 * Every upstream item is tagged with its position (using [withIndex]) before it is shared. Each collection of the
 * returned flow verifies that the positions it receives are consecutive, starting from 0, and fails with an
 * [IllegalStateException] if they are not. The tag is stripped before an item reaches a collector or `replayCache`.
 *
 * @param scope the scope in which the job that reads the upstream flow is launched.
 * @return a flow that replays all items to each collector, and whose `destroyAndAwaitFinish` stops the reading job.
 */
internal fun <T> Flow<T>.sharePullingOnlyWhenNeededIn(scope: CoroutineScope): SharedFlowThatFinishesDestructible<T> {
    val indexedShared = SharedFlowFromFlowPullingOnlyWhenNeeded(
        upstream = this.withIndex(),
        scope = scope,
    )

    return object :
        SharedFlowThatFinishesDestructible<T>,
        /* The delegate below is the one and only implementation of `collect`. Do not add an explicit `collect`
           override to this object: it would take precedence over the delegate and silently disable the
           consecutive-index check. */
        Flow<T> by (
            flow {
                // Per-collection counter: every collection is replayed from the first item, so it starts at 0.
                var myCount = 0
                indexedShared.collect { item ->
                    val idxExpected = myCount++
                    check(idxExpected == item.index) {
                        // The item's string form is built only here, i.e. only when the check fails.
                        """
                    |Received non consecutive index. This should never happen. myCount=$myCount , index ${item.index},
                    | value=${item.value} expected=$idxExpected
                        """.trimMargin()
                    }
                    emit(item.value)
                }
            }
            ) {
        override suspend fun destroyAndAwaitFinish() {
            indexedShared.destroyAndAwaitFinish()
        }

        override val replayCache: List<T>
            get() = indexedShared.replayCache.map { indexedItem -> indexedItem.value }
    }
}

/**
 * A flow that shares [upstream] between any number of collectors, and reads [upstream] only as far as collectors
 * have asked for.
 *
 * How the parts cooperate:
 *  - [requestedCountState] holds how many items have been requested so far. Collectors bump it in [collectSafely]:
 *    once before they start, and once after each item they have emitted downstream.
 *  - [readingJob] collects [upstream], but waits on [requestedCountState] before the first item and after each
 *    item, so it does not read ahead of the requests.
 *  - [bufferMutable] stores what [readingJob] has read and is what collectors actually collect from. It is created
 *    with `Channel.UNLIMITED` replay, and it is also told when [upstream] completed or failed.
 *
 * @param upstream the flow to read from.
 * @param scope the scope in which [readingJob] is launched.
 */
private class SharedFlowFromFlowPullingOnlyWhenNeeded<T>(
    val upstream: Flow<T>,
    scope: CoroutineScope,
) :
    AbstractFlow<T>(),
    FlowDestructible<T>,
    SharedFlowThatFinishes<T> {

    private val requestedCountState = MutableStateFlow(0)

    /* NOTE: This must stay declared above [readingJob]. That job is started `UNDISPATCHED`, so its body begins
       running while this class is still being constructed, and its failure path uses this buffer. */
    private val bufferMutable = MutableSharedFlowThatFinishes<T>(
        replayWithMinimumOne = Channel.UNLIMITED,
        onBufferOverflow = BufferOverflow.SUSPEND,
        /* SUSPEND is the default, but we specify explicitly, as we want
           to be able to use `emitToUnlimited` to detect if the channel turned out to be limited and drops any value
         */
    )

    private val bufferReader
        get() = bufferMutable as SharedFlowThatFinishes<T>

    private val readingJob = scope.launch(
        context = CoroutineName("SharedFlowPullingWhenNeeded.readingJob"),
        start = CoroutineStart.UNDISPATCHED,
        /* `UNDISPATCHED`, Just like in [sources of
            `sharedIn()`](https://github.com/Kotlin/kotlinx.coroutines/blob/17bc90bb00f17672476e1fb8f0a36b3a026406a9/kotlinx-coroutines-core/common/src/flow/operators/Share.kt#L208)
            and consistent with the intent to start 'as needed' so not starting when there is no subscription yet.*/
    ) {
        var collectedCount = 0

        try {
            /* Wait for the first request before touching `upstream` at all. This wait is inside the `try` so that
               a failure or cancellation while nothing has been requested yet is reported to the buffer exactly
               like one that happens later. Otherwise collectors arriving afterwards would have nothing to end
               their collection. */
            requestedCountState.suspendUntilEquals(++collectedCount)

            upstream.collect { item ->
                bufferMutable.emitToUnlimited(item)

                // Do not let `upstream` produce the next item until it has been requested.
                requestedCountState.suspendUntilEquals(++collectedCount)
            }
        } catch (e: Throwable) {
            /* Deliberately catches everything, so the buffer learns about any way this job can end abnormally
               (presumably including the cancellation triggered by `destroyAndAwaitFinish` - this depends on the
               waits above being cancellable). */
            bufferMutable.fail(e)
            // Not throwing here, as we will do so in #ThrowExceptionsFromUpstreamInConsumers
            return@launch
        }
        // Leaving the `collect` means `upstream` has no more items. Communicate this via an event:
        bufferMutable.finish()
    }

    /**
     * Emits the buffered items to [collector], requesting one more item from [upstream] before starting and after
     * each emitted item.
     */
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        /* #ForDebuggingJobId
        val collectorId = collectorIdCounter.getAndIncrement()
        // */
        var emittedInThisConsumptionCount = 0

        requestedCountState.incrementOrIgnore(existing = emittedInThisConsumptionCount) /* Ignore when not
             incrementing, because it simply means that the same or higher value was already requested */
        bufferReader.collect { newItem ->

            collector.emit(newItem)
            ++emittedInThisConsumptionCount

            requestedCountState.incrementOrIgnore(existing = emittedInThisConsumptionCount) /* Ignore when not
             incrementing, because it simply means that the same or higher value was already requested */
        }
    }

    /* #ForDebuggingJobId :
    private var collectorIdCounter = com.speechify.client.internal.sync.AtomicInt(value = 0)
    // */

    /** Cancels [readingJob] and suspends until that job has completed. */
    override suspend fun destroyAndAwaitFinish() {
        readingJob.cancel(
            CancellationException("${SharedFlowFromFlowPullingOnlyWhenNeeded::class} was explicitly destroyed."),
        )
        readingJob.join()
    }

    /** The items currently held for replay by the underlying buffer. */
    override val replayCache: List<T>
        get() = bufferReader.replayCache
}
