package com.speechify.client.internal.util.extensions.collections.flows

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.transform

/**
 * Almost like [kotlinx.coroutines.flow.produceIn] but with no buffer at all (not even that of [kotlinx.coroutines.channels.Channel.RENDEZVOUS],
 * which, despite having '0'-capacity, actually causes the producer to produce one item and then await a consumer).
 * Here, the producer does not start to produce the next item until a collector definitely requires it, that is,
 * the producer flow's [kotlinx.coroutines.flow.FlowCollector.emit] call does not return until:
 * - the consumer's [kotlinx.coroutines.flow.Flow.collect]'s `collector` returns
 * - and the consumer did not terminate the consumption. E.g. in case a collector terminates using
 *   [kotlinx.coroutines.flow.first], then the producer's [kotlinx.coroutines.flow.FlowCollector.emit] will return
 *   only when a new collection starts.
 *
 * This extension is 'similar but different' from [sharePullingOnlyWhenNeededIn], in that they are both 'pulling only
 * when needed', but this one has 0-replay, so it's not really intended for 'sharing' (and is not a replacement for
 * broadcast-channel/cache).
 *
 * A related Kotlin issue regarding 'pulling only when needed' is [here](https://github.com/Kotlin/kotlinx.coroutines/issues/2603),
 * but that one pertains to use cases of a proper 'shared flow', where the items would be broadcast
 * to different consumers. So, despite [elizarov's suggestion that this falls outside of Flows](https://github.com/Kotlin/kotlinx.coroutines/issues/2603#issuecomment-1403258168),
 * what this extension does doesn't seem wrong, because it's just what is already offered by [kotlinx.coroutines.flow.produceIn],
 * with a very minor modification. It seems useful when it's desired that a flow can be collected initially
 * by one collector, and then by another, where some collections may even apply conditionally (e.g. an exception
 * preventing further collections), and it's necessary that the producer does not produce unconsumed items.
 */
internal fun <T> Flow<T>.producePullingOnlyWhenNeededIn(
    scope: CoroutineScope,
): UnicastChannelCollectibleOnlyAsFlow<T> {
    /** #PullingOnlyWhenNeededUsingWrapper
     * Each upstream item is wrapped in [ProducerEvent.NextItem] and immediately followed by a dummy
     * [ProducerEvent.WaitingForConsumer] event. The dummy event is what keeps [kotlinx.coroutines.flow.produceIn]
     * from pulling the upstream flow for the next item: without it, the pull would happen as soon as the consumer
     * collected the current item (these are the semantics of [kotlinx.coroutines.channels.Channel.RENDEZVOUS]).
     * This is essentially a variant of [this suggestion](https://github.com/Kotlin/kotlinx.coroutines/issues/2603#issuecomment-808859170)
     * but using a bespoke class [ProducerEvent].
     * ([This other suggestion](https://github.com/Kotlin/kotlinx.coroutines/issues/2603#issuecomment-1480598729)
     * was not chosen, because using [kotlinx.coroutines.newSingleThreadContext] seems restricting and of much bigger
     * consequences - we don't want to alter the threading strategy.)
     */
    val itemsEachFollowedByMarker: Flow<ProducerEvent<T>> =
        transform { upstreamItem ->
            emit(ProducerEvent.NextItem(upstreamItem))
            emit(ProducerEvent.WaitingForConsumer)
        }

    /**
     * [Channel.RENDEZVOUS] capacity is used so that nothing is buffered or stored for replay, and [produceIn] is
     * used to obtain a [kotlinx.coroutines.channels.ReceiveChannel].
     *
     * Notably, [kotlinx.coroutines.flow.shareIn] is not appropriate here, because none of the
     * [kotlinx.coroutines.flow.SharingStarted] strategies is suitable (#ShareInIsInadequateForPullsAsNeeded):
     * - [kotlinx.coroutines.flow.SharingStarted.WhileSubscribed] causes the 'first-item-reading' collection to finish
     *   the upstream flow, so each reading of the resulting flows would be another, separate collection of upstream.
     * - [kotlinx.coroutines.flow.SharingStarted.Eagerly] and [kotlinx.coroutines.flow.SharingStarted.Lazily]
     *   actually continue to pull the upstream flow after the first collection, without any limits, even when there's
     *   no-one listening, even with `buffer=0` and `replay=0`, as per
     *   [_" Note that in this case all values emitted by the upstream beyond the most recent values as specified by replay parameter will be immediately discarded."_](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/share-in.html#:~:text=parameter%20will%20be-,immediately,-discarded.)
     */
    val rendezvousChannel =
        itemsEachFollowedByMarker
            .buffer(capacity = Channel.RENDEZVOUS)
            .produceIn(scope = scope)

    /**
     * Maps the channel's events back to plain values: only [ProducerEvent.NextItem] events are let through
     * (so the [ProducerEvent.WaitingForConsumer] markers never reach the consumer), and each is unwrapped to its item.
     */
    fun unwrapItems(events: Flow<ProducerEvent<T>>): Flow<T> =
        events
            .filterIsInstance<ProducerEvent.NextItem<T>>()
            .map { event -> event.item }

    /** Theoretically, a bespoke [kotlinx.coroutines.channels.ReceiveChannel] could be implemented and returned here
     * (like what [kotlinx.coroutines.flow.produceIn] returns), but that's pretty complex, and for now only the
     * flow-based accessors are needed.
     */
    return object : UnicastChannelCollectibleOnlyAsFlow<T> {
        override fun consumeAsFlow(): Flow<T> {
            return unwrapItems(rendezvousChannel.consumeAsFlow())
        }

        override fun receiveAsFlow(): Flow<T> {
            return unwrapItems(rendezvousChannel.receiveAsFlow())
        }

        override fun cancel(cause: CancellationException) {
            rendezvousChannel.cancel(cause)
        }
    }
}

/**
 * NOTE: In flow terms, [kotlinx.coroutines.channels.ReceiveChannel] is a **hot flow** which doesn't have a replay.
 *
 * Consequently, regarding use of 'multiple collectors concurrently':
 * - Firstly, 'concurrently' is not necessarily the only use case for components with this interface. Rather, it's
 *   perfectly fine and common to have multiple collectors read from it 'in sequence', with the object being passed
 *   on to the next consumer by the previous consumer when that one finishes.
 * - But when starting 'multiple collectors concurrently', note that [kotlinx.coroutines.channels.Channel]s are only
 *   unicast channels ([kotlinx.coroutines.channels.BroadcastChannel] was deprecated), which means that in case of
 *   'multiple collectors at a time', an element will be received by only one of them (in a ['fair fashion'](https://kotlinlang.org/docs/channels.html#channels-are-fair)).
 */
internal interface UnicastChannelCollectibleOnlyAsFlow<T> {
    /**
     * Exposes the channel as a [Flow] that 'consumes' it: if the collector fails, the channel gets closed.
     *
     * NOTE: To support these failure semantics, the returned flow can be collected only once.
     *
     * See [kotlinx.coroutines.flow.consumeAsFlow].
     */
    fun consumeAsFlow(): Flow<T>

    /**
     * Exposes the channel as a [Flow] that merely 'receives' from it.
     *
     * NOTE: When consuming the returned [Flow], remember to [cancel] the channel on error.
     *
     * See [kotlinx.coroutines.flow.receiveAsFlow].
     */
    fun receiveAsFlow(): Flow<T>

    /**
     * Cancels the underlying channel with the given [cause].
     *
     * See [kotlinx.coroutines.channels.ReceiveChannel.cancel].
     */
    fun cancel(cause: CancellationException)
}

/**
 * A wrapper needed to implement 'pulling only when needed' - see #PullingOnlyWhenNeededUsingWrapper.
 */
private sealed class ProducerEvent<out T> {
    /** Wraps a single [item] that is to be delivered to the consumer. */
    class NextItem<T>(
        val item: T,
    ) : ProducerEvent<T>()

    /**
     * Dummy, payload-free event emitted after each [NextItem]. It is never delivered to the consumer as a value -
     * see #PullingOnlyWhenNeededUsingWrapper for why it is needed.
     */
    object WaitingForConsumer : ProducerEvent<Nothing>()
}
