package com.speechify.client.internal.util.collections.flows

import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.internal.sync.AtomicBool
import com.speechify.client.internal.sync.AtomicRef
import com.speechify.client.internal.util.extensions.collections.sendToUnlimited
import com.speechify.client.internal.util.extensions.collections.takeWhileInstanceOf
import com.speechify.client.internal.util.intentSyntax.ifNotNull
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow

/**
 * The Kotlin's idiomatic [SharedFlow] never finishes (which is indicated by [Nothing] being the return type of
 * [SharedFlow.collect]).
 * Use this if your flow has some semantics of a shared flow (e.g. it is Hot, and it has [replayCache])
 * but can actually finish (so implements [Flow.collect] with [Unit] result).
 *
 *
 * ## Rationale of having this interface
 *
 * 'Finishing Shared Flows' are perfectly sensible, for example, there's no reason why
 * `generateSequence({ getSomeExpensiveResourceOrNullForFinish() }).asFlow().shareIn(someScope)`, which nicely
 * saves from multiple expensive operations and even provides the `replayCache`, should hang after the
 * last item. It seems to be purely Kotlin's decision to conflate being 'shared' with 'state flows' (which indeed don't
 * finish as they always await the next state change), and consequently to declare that [_"SharedFlow cannot be closed
 * like BroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly
 * materialized if needed"_](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/)
 *
 * The hereby types also make the code intent clear where a construct like this is needed: making sure every consumption has a
 * terminating condition is error-prone, while using `null` events has cryptic intent and does not allow passing nulls).
 */
internal interface SharedFlowThatFinishes<E> : Flow<E> {
    /**
     * A snapshot of the replay cache.
     */
    val replayCache: List<E>
}

internal interface SharedFlowThatFinishesDestructible<E> : FlowDestructible<E>, SharedFlowThatFinishes<E>

/**
 * An interface that groups methods for emitting and finishing a mutable flow, so, e.g. in this interface it is not
 * determined whether the flow has any replay cache (like e.g. [MutableHotFlowThatFinishes]) or doesn't
 * (like e.g. [MutableHotQueueFlow]).
 *
 * The flow also doesn't necessarily need to be a _shared_ one - a single consumer is possible too (even
 * guarded by the implementation). The only important thing is that it is _hot_, so a collection of its items are
 * already be taking up space in memory (even if empty), and the moment of consumption may matter, so it is not
 * guaranteed that all items that its source ever produced will be returned (yet again, some implementations may give
 * this guarantee).
 */
internal interface MutableHotFlowThatFinishes<E> : Flow<E> {

    /**
     * @throws TriedEmittingItemToCollectionInFinishedStateError if the flow has [finish]ed correctly already.
     * @throws Throwable the flow has [fail]ed already, then it will be the error that was passed to [fail].
     */
    suspend fun emit(value: E)

    /**
     * @throws TriedEmittingItemToCollectionInFinishedStateError if the flow has [finish]ed correctly already.
     * @throws Throwable the flow has [fail]ed already, then it will be the error that was passed to [fail].
     */
    fun tryEmit(value: E): Boolean

    suspend fun finish()

    suspend fun fail(reason: Throwable)
}

internal class TriedEmittingItemToCollectionInFinishedStateError(
    message: String,
    cause: Throwable? = null,
) : Error(message, cause)

/**
 * A shared-flow that has characteristics like [MutableSharedFlow], but which materializes finishes/errors of the flow
 * (see [SharedFlowThatFinishes] for more rationale).
 *
 * @param replayWithMinimumOne Needs to be at least 1 in this implementation (so the parameter is also devoid of
 * default, to make the caller aware). Use []
 * This seemed like the simplest way to implement thread-safety (by always putting the terminating item onto the base
 * flow, no matter any racing reader, every collect will eventually get the item and finish).
 */
@OptIn(ExperimentalCoroutinesApi::class)
internal class MutableSharedFlowThatFinishes<E>(
    // Parameters taken from `MutableSharedFlow`
    replayWithMinimumOne: Int, /* Except this one is slightly modified, as per KDoc */
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
) : AbstractFlow<E>(), SharedFlowThatFinishes<E>, MutableHotFlowThatFinishes<E> {
    init {
        require(replayWithMinimumOne > 0) {
            "`replay` needs to be at least one for `MutableSharedFlowThatFinishes`" +
                " (to store the finish event)"
        }
    }

    private val isFinished = AtomicBool(false)

    private val exceptionForFinish = AtomicRef<Throwable?>(null)

    private val baseFlow = MutableSharedFlow<FiniteCollectionEventForSharedFlow<E>>(
        replay = replayWithMinimumOne,
        extraBufferCapacity = extraBufferCapacity,
        onBufferOverflow = onBufferOverflow,
    )

    private val itemsFlow = baseFlow
        .asSharedFlow()
        .toNormalFlowThatFinises()

    override suspend fun fail(reason: Throwable) {
        isFinished.set(true)
        exceptionForFinish.compareAndSet(expected = null, with = reason)
            .also {
                if (!it) {
                    Log.e(
                        DiagnosticEvent(
                            message = "Tried to fail `MutableSharedFlowThatFinishes` twice. This should not happen to" +
                                " not lose existing error information. See the event's error for the second reason.",
                            nativeError = reason,
                            sourceAreaId = "SharedFlowThatFinishes.fail",
                        ),
                    )
                }
            }
        baseFlow.emit(FiniteCollectionEventForSharedFlow.Exception(reason))
    }

    override suspend fun finish() {
        isFinished.set(true)
        baseFlow.emit(FiniteCollectionEventForSharedFlow.FinishedItems)
    }

    override suspend fun collectSafely(collector: FlowCollector<E>) =
        itemsFlow.collect(collector)

    override val replayCache: List<E>
        get() = baseFlow.replayCache.filterIsInstance<FiniteCollectionEventForSharedFlow.ItemAvailable<E>>()
            .map { it.item }

    val subscriptionCount: StateFlow<Int>
        get() = baseFlow.subscriptionCount

    override fun tryEmit(value: E): Boolean {
        if (isFinished.get()) {
            ifNotNull(exceptionForFinish.value) {
                throw it
            }

            throw TriedEmittingItemToCollectionInFinishedStateError(
                "Attempt to `tryEmit` an item on a flow that is already finished",
            )
        }

        return baseFlow.tryEmit(FiniteCollectionEventForSharedFlow.ItemAvailable(value))
    }

    override suspend fun emit(value: E) {
        if (isFinished.get()) {
            ifNotNull(exceptionForFinish.value) {
                throw it
            }

            throw TriedEmittingItemToCollectionInFinishedStateError(
                "Attempt to `emit` an item on a flow that is already finished",
            )
        }

        baseFlow.emit(FiniteCollectionEventForSharedFlow.ItemAvailable(value))
    }
}

/**
 * Required for converting a Shared flow (which [_"never completes. ... cannot be closed like BroadcastChannel"_](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/))
 * into a finite collection flow (as per [_"completion signals should be explicitly materialized if needed."_](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/))
 */
internal sealed class FiniteCollectionEventForSharedFlow<out Item> {

    /**
     * Signals that there is no more items.
     */
    object FinishedItems : FiniteCollectionEventForSharedFlow<Nothing>()

    /**
     * The events that need some action. Basically, not-the-[FinishedItems].
     */
    sealed class ActionableEvent<out Item> : FiniteCollectionEventForSharedFlow<Item>()

    /**
     * Signals a new item produced.
     */
    class ItemAvailable<out Item>(val item: Item) : ActionableEvent<Item>()

    /**
     * Signals an exception when trying to get the next available item.
     */
    class Exception(val exception: Throwable) : ActionableEvent<Nothing>()
}

internal fun <T> Flow<FiniteCollectionEventForSharedFlow<T>>.toNormalFlowThatFinises(): Flow<T> =
    takeWhileInstanceOf<FiniteCollectionEventForSharedFlow.ActionableEvent<T>>()
        .map {
            when (it) {
                is FiniteCollectionEventForSharedFlow.ItemAvailable<T> ->
                    it.item
                is FiniteCollectionEventForSharedFlow.Exception -> throw it.exception /*
                            This is where #ThrowExceptionsFromUpstreamInConsumers is happening
                         */
            }
        }

/**
 * A hot flow for a single consumption that is mutable (shares characterics of a [Channel], i.e. has no replay, so
 * uses minimum memory - once a consumer collects an item, it gets released from memory).
 */
@OptIn(FlowPreview::class)
internal class MutableHotQueueFlow<E>(
    /**
     * If one of [Channel].SOMECAPACITY satisfies your use case (e.g. you don't need an exact number here), then
     * use that one for best semantics.
     */
    channelCapacity: Int,
) : AbstractFlow<E>(), MutableHotFlowThatFinishes<E> {
    private val channel = Channel<E>(
        capacity = channelCapacity,
    )

    private val flow = channel.receiveAsFlow()

    override suspend fun collectSafely(collector: FlowCollector<E>) =
        flow.collect(collector)

    override suspend fun emit(value: E) {
        try {
            channel.sendToUnlimited(value)
        } catch (e: ClosedSendChannelException) {
            throw TriedEmittingItemToCollectionInFinishedStateError(
                message = "Attempt to `emit` an item on a flow that is already finished",
                cause = e,
            )
        }
    }

    override fun tryEmit(value: E): Boolean {
        try {
            channel.sendToUnlimited(value)
        } catch (e: ClosedSendChannelException) {
            throw TriedEmittingItemToCollectionInFinishedStateError(
                message = "Attempt to `tryEmit` an item on a flow that is already finished",
                cause = e,
            )
        }
        return true
    }

    override suspend fun finish() {
        channel.close()
    }

    override suspend fun fail(reason: Throwable) {
        channel.close(reason)
    }
}
