package com.speechify.client.internal.util.collections.flows

import com.speechify.client.internal.util.extensions.intentSyntax.isNotNullAnd
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion

/**
 * A shorthand over [onCompletion], which also avoids ambiguity around what 'completion' means (see [kotlinx.coroutines.Job.isCompleted])
 */
internal fun <T> Flow<T>.onCompletionSuccessfully(
    action: suspend FlowCollector<T>.() -> Unit,
) = onCompletion { cause ->
    if (cause == null) {
        action()
    }
}

/**
 * Helps to avoid adding behavior to the wrong exception type (e.g. forgetting to consider cancellation exceptions),
 * as well as makes it easy to safely and intentionally cover behavior for each types of an exceptional case.
 */
internal fun <T> Flow<T>.onCompletionExceptional(
    /**
     * The action to perform on any exception, be it a cancellation (representing a user's intent to abandon the
     *  operation) or a non-cancellation (representing an unexpected error).
     *
     * NOTE: as opposed to [onCompletionExceptionalFromDownstream], [onCompletionExceptionalFromUpstream], this function
     * will be called both for downstream and upstream exceptions (see [onCompletion]).
     */
    onAnyException: (suspend (cause: Throwable) -> Unit)? = null,
    /**
     * The action to perform on any exception, only for cancellation - exceptions that should not be treated as errors
     * (not logged to notify developers), representing user's intent to abandon the operation and to stop wasting
     * resources for it.
     *
     * Typical use for this parameter is cleaning up any state that was created for the operation.
     */
    onCancellationException: (suspend (cancellationException: CancellationException) -> Unit)? = null,
    /**
     * The action to perform on any exception, only for non-cancellation - exceptions representing interruptions against
     * user's intent (including transient problems like connection errors).
     *
     * Typical use for this parameter is clearing pending state but marking it as failed (to notify user and make it
     * available for retry or cleanup).
     *
     * NOTE: as opposed to [onCompletionExceptionalFromDownstreamNonCancellation],
     * [onCompletionExceptionalFromUpstreamNonCancellation], this function will be called both for downstream and
     * upstream exceptions (see [onCompletion]).
     */
    onNonCancellationException: (suspend (nonCancellationException: Throwable) -> Unit)? = null,
) =
    this
        .let { flow ->
            if (onAnyException != null) {
                flow.onCompletion { cause ->
                    if (cause != null) {
                        onAnyException(cause)
                    }
                }
            } else {
                flow
            }
        }
        .let {
            if (onCancellationException != null) {
                it.onCompletionExceptionalCancellation(onCancellationException)
            } else {
                it
            }
        }
        .let {
            if (onNonCancellationException != null) {
                it.onCompletionExceptionalNonCancellation(onNonCancellationException)
            } else {
                it
            }
        }

/**
 * Note that this function will be called both for downstream and upstream cancellations (see [onCompletion]).
 *
 * For non-cancellations see [onCompletionExceptionalNonCancellation], [onCompletionExceptionalFromUpstreamNonCancellation]
 * or [onCompletionExceptionalFromDownstreamNonCancellation].
 * NOTE: If both producer and consumer are cancelled through a common ancestor [kotlinx.coroutines.Job], this will be called only once.
 * (see unit test `onCompletionExceptionalCancellation - does run action once and only once - on cancellation from ancestor Job common to producer and consumer`).
 */
internal fun <T> Flow<T>.onCompletionExceptionalCancellation(
    action: suspend (cancellationException: CancellationException) -> Unit,
) =
    onCompletion { cause ->
        if (cause is CancellationException) {
            action(cause)
        }
    }

/**
 * Note that, as opposed to [onCompletionExceptionalFromDownstreamNonCancellation] ond [onCompletionExceptionalFromUpstreamNonCancellation],
 * this function will be called both for downstream and upstream cancellations (see [onCompletion]).
 *
 * For cancellations see [onCompletionExceptionalCancellation].
 */
internal fun <T> Flow<T>.onCompletionExceptionalNonCancellation(
    action: suspend (cancellationException: Throwable) -> Unit,
) =
    onCompletion { cause ->
        if (cause.isNotNullAnd { this !is CancellationException }) {
            action(cause)
        }
    }

/**
 * As opposed to [onCompletionExceptionalNonCancellation] this function will be called only for downstream exceptions
 * ([CancellationException] as well as any exception while trying to handle collect the item).
 *
 * For upstream version see [onCompletionExceptionalFromUpstream].
 */
internal fun <T> Flow<T>.onCompletionExceptionalFromDownstream(
    action: suspend (exceptionFromDownstream: Throwable) -> Unit,
) =
    flow {
        collect {
            try {
                emit(it)
            } catch (e: Throwable) {
                action(e)
                throw e
            }
        }
    }

/**
 * As opposed to [onCompletionExceptionalNonCancellation] this function will be called only for downstream exceptions.
 *
 * For upstream version see [onCompletionExceptionalFromUpstreamNonCancellation].
 * For both upstream and downstream version see [onCompletionExceptionalNonCancellation].
 *
 * Note that this function only acts on **non-cancellations** and [onCompletionExceptionalCancellation] should be used
 * for **cancellations**: just like explained in [onCompletionExceptionalFromUpstreamNonCancellation], it most likely
 * doesn't make sense to create a version of this function for "**cancellations** only from downstream" because any
 * exception upstream will cause downstream to stop consuming the flow with the same cancellation exception, so not only
 * it doesn't seem useful to separate one from the other, but it also poses a footgun (because separation from
 * downstream isn't total) and even an implementation problem, which can be seen in `onCompletionExceptionalFromDownstreamCancellation - makes no sense - onCompletionExceptionalFromDownstream does not run action - on cancellation from ancestor Job`
 */
internal fun <T> Flow<T>.onCompletionExceptionalFromDownstreamNonCancellation(
    action: suspend (cause: Throwable) -> Unit,
) =
    onCompletionExceptionalFromDownstream { exceptionFromDownstream ->
        if (exceptionFromDownstream !is CancellationException) {
            action(exceptionFromDownstream)
        }
    }

/**
 * As opposed to [onCompletionExceptionalNonCancellation] this function will be called only for non-cancellation
 * upstream exceptions.
 * This function is just like [catch], but this function ensures the exception is rethrown, and not suppressed.
 *
 * For downstream version see [onCompletionExceptionalFromDownstreamNonCancellation].
 * For both upstream and downstream version see [onCompletionExceptionalNonCancellation].
 *
 * Note that this function only acts on **non-cancellations** and [onCompletionExceptionalCancellation] should be used
 * for **cancellations**: just like explained in [onCompletionExceptionalFromDownstreamNonCancellation], it most likely
 * doesn't make sense to create a version of this function for "**cancellations** only from upstream" because
 * cancellations to upstream are also triggered by any event that causes downstream to stop consuming the flow, be it a
 * cancellation, a successful application of a terminating operator like [kotlinx.coroutines.flow.first], or a failure
 * in [Flow.collect] when handling the item (although note here: [onCompletionExceptionalCancellation] will not fire.
 * It will only fire for [onCompletionExceptional] or [onCompletionExceptionalFromDownstreamNonCancellation].). So, not
 * only it doesn't seem useful to separate one from the other, but it also poses a footgun (because separation from
 * downstream isn't total) and an implementation problem, which can be seen in `onCompletionExceptionalFromUpstreamCancellation - makes no sense - because 'onCompletionExceptionalFromUpstream' also runs on cancellation from ancestor Job common to producer and consumer`
 */
internal fun <T> Flow<T>.onCompletionExceptionalFromUpstreamNonCancellation(
    action: suspend (cause: Throwable) -> Unit,
) =
    /** In contrast to [onCompletionExceptionalFromUpstream], this one can be implemented by [catch], because it's
     * already working on "upstream" (as per [catch] KDoc), and it matches precisely our non-cancellation requirement.
     */
    /** No need to filter [CancellationException]s, because [catch] KDoc already says: _"does not catch exceptions that are thrown to cancel the flow"_ */
    catch { cause ->
        action(cause)
        /** As per [catch]'s docs, rethrowing from here will propagate the exception (make this operator
         * transparent to exception again) */
        throw cause
    }

/**
 * As opposed to [onCompletionExceptional]'s `onAnyException`, or [onCompletion], this function will be called only for
 * upstream exceptions ([CancellationException] as well, both from upstream and from cancelling common ancestor, but not
 * on cancellation from consumer using successful terminating operator - see test `onCompletionExceptionalFromUpstream - does not run action - on cancellation from consumer using successful terminating operator`).
 *
 * For downstream version see [onCompletionExceptionalFromDownstream].
 */
internal fun <T> Flow<T>.onCompletionExceptionalFromUpstream(
    action: suspend (cause: Throwable) -> Unit,
) =
    /** Implementation roughly follows [catch], which already works on "upstream" (as per [catch] KDoc), and which
     *  suggests that, "Conceptually, the action of `catch` operator is similar to wrapping the code of upstream flows
     *  with `try { ... } catch (e: Throwable) { action(e) }`.", in that it's doing just that (though [catch]'s
     *  `catchImpl` seems much more complex, so it may be missing some  corner cases), but [catch] filters cancellation
     *  exceptions, so we need a re-implementation.
     */
    flow {
        var hadDownstreamException = false
        try {
            this@onCompletionExceptionalFromUpstream.collect {
                try {
                    this@flow.emit(it)
                } catch (e: Throwable) {
                    hadDownstreamException = true
                    throw e
                }
            }
        } catch (e: Throwable) {
            if (!hadDownstreamException) {
                action(e)
            }
            throw e
        }
    }
