package com.speechify.client.internal.util.extensions.collections.flows

import com.speechify.client.api.util.Destructible
import com.speechify.client.internal.sync.AtomicRef
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow

/**
 * A version of [kotlinx.coroutines.flow.firstOrNull] which can be used where it's necessary to get the first item of
 * the flow in some place _A_, before some other place _B_ starts consuming it, but all this should happen in a single
 * [Flow.collect]ion, i.e. so that the flow is not started twice (if [kotlinx.coroutines.flow.firstOrNull] was used,
 * this would cause a separate flow [Flow.collect]).
 *
 * WARNING: if the returned [FlowWithFirstElementRetrieved.flow] is never consumed,
 * [FlowWithFirstElementRetrieved.destroy] must be called to prevent leaving the parent incomplete (consequently, its
 * [kotlinx.coroutines.Job.invokeOnCompletion] will never run, while [kotlinx.coroutines.coroutineScope],
 * [kotlinx.coroutines.withContext] with it inside will never return).
 *
 * The function is useful especially where the flow is expensive to start twice, or even impossible due to its ephemeral
 * nature e.g. it can be extracting from a concurrently-editable source of items. The function makes processing
 * editable sequences possible and safe, because the producer gets a single [Flow.collect] through which it can isolate
 * a single snapshot of a window of content. The returned flow does not have any buffer or replay (except
 * replaying the first item on the first consumption), thanks to which any new [Flow.collect]s can reflect new snapshots
 * of the content. Lack of buffering also makes applying this function on the flow not cause significant memory
 * consumption by itself, leaving it entirely to downstream collection to apply any buffer only to its required size.
 *
 * NOTE: The first [Flow.collect] will emit the first item immediately (it's going to be the element observed in
 * [FlowWithFirstElementRetrieved.firstElement]), while the following items will be pulled by continuing the
 * first collection of [this], so the timing will be entirely dictated by that collection.
 *
 * NOTE: Ideally the flow should be collected only once (place _B_ should be the only consumer), which is also sensible
 * for the [expensive/ephemeral] nature of the flow.
 * If this is not the case then, after the first [Flow.collect]ion, any subsequent [Flow.collect]ions of the resulting
 * [Flow] are going to be redirected to the original flow, so:
 * - the subsequent collections will not be as quick to start as the first one.
 * - in any subsequent collection, the element observed in [FlowWithFirstElementRetrieved.firstElement] will not be
 *   emitted at the start, so if the original flow is indeed ephemeral and subsequent collections have different first
 *   element, then [FlowWithFirstElementRetrieved.firstElement] will not reflect the first element of those collections
 *   (#FirstElementOfFirstConsumptionCanBeDifferentInSubsequentCollections).
 *
 * NOTE: Like [kotlinx.coroutines.flow.firstOrNull], this function cannot distinguish an empty flow from a flow of a
 * nullable [T] whose first element is `null` - `null` is returned in both cases. Because the caller then has no
 * [FlowWithFirstElementRetrieved] to call [FlowWithFirstElementRetrieved.destroy] on, the single collection is
 * cancelled here before returning `null`.
 *
 * @param scope passed on to `producePullingOnlyWhenNeededIn`, which provides the channel backing the single collection
 *  of [this].
 * @return the first element together with a flow that continues the same collection, or `null` when
 *  [kotlinx.coroutines.flow.firstOrNull] yielded `null`.
 * @throws Throwable whatever retrieving the first element threw (rethrown after cancelling the channel).
 */
@OptIn(
    ExperimentalCoroutinesApi::class,
)
internal suspend fun <T> Flow<T>.firstOrNullRetainingFlowWhilePreventingTwoConsumptionsIn(
    scope: CoroutineScope,
): FlowWithFirstElementRetrieved<T>? {
    val channelForFirstConsumption =
        producePullingOnlyWhenNeededIn(scope = scope)

    val firstElementOfFirstConsumption = try {
        channelForFirstConsumption
            .receiveAsFlow()
            .firstOrNull()
    } catch (e: Throwable) {
        channelForFirstConsumption.cancel(CancellationException("channelForFirstConsumption failed", e))
        throw e
    }
        ?: run {
            /* `null` means either an empty flow or a `null` first element of a nullable `T`. In the latter case the
             * channel has not ended yet, and nobody will be able to `destroy` it once `null` is returned, so it needs
             * to be cancelled here (this function already cancels an ended channel in the failure path above).
             */
            channelForFirstConsumption.cancel(
                CancellationException("no first element retrieved, so the flow will not be consumed"),
            )
            return null
        }

    val flowForFirstConsumption = flow {
        try {
            emit(firstElementOfFirstConsumption)
        } catch (e: Throwable) {
            /* The collector failed (or was cancelled) before the rest of the channel was reached, so the channel
             * needs to be cancelled explicitly here.
             */
            channelForFirstConsumption.cancel(CancellationException("consumption of first element failed", e))
            throw e
        }
        emitAll(
            channelForFirstConsumption
                /** use [kotlinx.coroutines.flow.consumeAsFlow] especially so that errors from collector cause an
                 *  immediate closing of `channelForFirstConsumption`.
                 */
                .consumeAsFlow(),
        )
    }

    return FlowWithFirstElementRetrieved(
        firstElement = firstElementOfFirstConsumption,
        flow = object : AbstractFlow<T>(), Flow<T> {
            /** Use an [AtomicRef] of the flow, so that all the state for first collection is entirely released from
             * memory after first collection (thanks to the returned flow not referencing it).
             */
            private val flowForConsumption: AtomicRef<Flow<T>> = AtomicRef(
                flowForFirstConsumption,
            )

            override suspend fun collectSafely(collector: FlowCollector<T>) {
                /* The first collection gets `flowForFirstConsumption`, and at the same time the reference is replaced
                 * with the original flow, which is what any subsequent collection gets.
                 */
                flowForConsumption.swap(with = this@firstOrNullRetainingFlowWhilePreventingTwoConsumptionsIn)
                    .collect(collector)
            }
        },
        destroyFn = {
            channelForFirstConsumption.cancel(
                CancellationException("firstOrNullRetainingFlow explicitly destroyed"),
            )
        },
    )
}

/**
 * Pairs the already-retrieved [firstElement] with the [flow] it was taken from, as returned by
 * [firstOrNullRetainingFlowWhilePreventingTwoConsumptionsIn].
 *
 * [destroy] runs the callback supplied at construction.
 */
internal class FlowWithFirstElementRetrieved<T>(
    /**
     * The element observed at the start of the first [Flow.collect]ion of [flow].
     *
     * NOTE: After the first [Flow.collect]ion, any subsequent [Flow.collect]ions of this flow may start from a
     * different element. See docs of [firstOrNullRetainingFlowWhilePreventingTwoConsumptionsIn] near #FirstElementOfFirstConsumptionCanBeDifferentInSubsequentCollections.
     */
    val firstElement: T,
    /**
     * The flow to consume; see [firstOrNullRetainingFlowWhilePreventingTwoConsumptionsIn] for its collection
     * semantics.
     */
    val flow: Flow<T>,
    /** Invoked, with no arguments, on every call to [destroy]. */
    private val destroyFn: () -> Unit,
) : Destructible {
    override fun destroy() {
        destroyFn.invoke()
    }
}
