// ktlint-disable filename
package com.speechify.client.internal.util.extensions.collections.flows

import com.speechify.client.api.util.Destructible
import com.speechify.client.api.util.Result
import com.speechify.client.api.util.successfully
import com.speechify.client.internal.sync.AtomicBool
import com.speechify.client.internal.time.nowInMillisecondsFromEpoch
import com.speechify.client.internal.util.collections.flows.MutableHotFlowThatFinishes
import com.speechify.client.internal.util.collections.flows.MutableSharedFlowThatFinishes
import com.speechify.client.internal.util.extensions.collections.firstInstanceOf
import com.speechify.client.internal.util.extensions.collections.firstNotNull
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelIterator
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration

internal inline fun <E> Flow<E>.onFirst(crossinline action: suspend (E) -> Unit): Flow<E> =
    onIndex(index = 0, action)

/**
 * Useful for ad-hoc testing exceptions, etc.
 */
internal inline fun <E> Flow<E>.onNth(n: Int, crossinline action: suspend (E) -> Unit): Flow<E> =
    onIndex(index = n - 1, action)

internal inline fun <E> Flow<E>.onIndex(index: Int, crossinline action: suspend (E) -> Unit): Flow<E> {
    require(index >= 0) { "`index` cannot be negative" }

    return flow {
        var currentIdx = 0
        var isYetToFire = true
        emitAll(
            this@onIndex
                .onEach {
                    if (isYetToFire) {
                        if (currentIdx == index) {
                            isYetToFire = false
                            action(it)
                        } else {
                            ++currentIdx
                        }
                    }
                },
        )
    }
}

/**
 * Maps (using [transform]) just the first element observed in the flow, and leaves others intact.
 */
internal inline fun <T> Flow<T>.mapFirst(crossinline transform: suspend (value: T) -> T) =
    this.mapItemAtIndex(index = 0, transform)

/**
 * Useful as a building block for convenience overloads and for ad-hoc testing.
 */
internal inline fun <T> Flow<T>.mapItemAtIndex(index: Int, crossinline transform: suspend (value: T) -> T) =
    this.withIndex()
        .map { if (it.index == index) transform(it.value) else it.value }

/**
 * Runs [action] when it is starting to take more than [duration] for the upstream [this] to produce the next flow item.
 *
 * _(It does nothing if consuming the item downstream took long.)_
 *
 * @param duration - The allowed delay for items to arrive from upstream ([action] executes only when it is exceeded).
 * @param action - If the function passed here suspends, it will be cancelled when the value finally arrives, which
 * means that this cancellation can be used when deferring the side effect using a cooperative suspend function, or
 * a [kotlinx.coroutines.suspendCancellableCoroutine] can be used to undo the side effect.
 */
internal fun <E> Flow<E>.onEachDelayUpstreamOver(
    duration: Duration,
    action: suspend (duration: Duration) -> Unit,
): Flow<E> {
    return flow {
        coroutineScope {
            fun startDelayedActionChildJob() = launch {
                delay(duration)
                action(duration)
            }

            var currentDelayedActionChildJob: Job? = startDelayedActionChildJob()
            try {
                this@onEachDelayUpstreamOver.collect {
                    currentDelayedActionChildJob!!
                        .cancelAndJoin() /* Use `AndJoin`, so that the side effects of `collect` don't race with the
                         side effects of the `emit` (the Jobs can execute in different threads, so otherwise their end
                         result could be in arbitrary order) */
                        .also {
                            currentDelayedActionChildJob = null
                        }
                    emit(it)

                    currentDelayedActionChildJob = startDelayedActionChildJob()
                }
            } finally {
                currentDelayedActionChildJob?.cancelAndJoin()
            }
        }
    }
}

/**
 * Allows to use [this] in a `for` loop (easier step-through debugging, `continue`, etc).
 *
 * NOTE: Must call [Destructible.destroy] if finishing the loop prematurely using `return` or `break` (best to wrap the
 * `for` loop with `use` in such a case).
 */
@OptIn(FlowPreview::class)
internal suspend fun <Item> Flow<Item>.asIterableInCurrentScope(): DestructibleChannelIterator<Item> {
    val iterationScope =
        /* TODO - explain why we are doing `coroutineContext` despite "Do not do this!" ([1](https://stackoverflow.com/questions/67749075/create-a-child-coroutine-scope-in-kotlin), [2](https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055)).
                    It's mostly maybe because we aren't "If you need to launch a coroutine that keeps running after your function returns".
                   Need a parameter because there seems to be no way to get the current scope, as per
                   [this StackOverflow](https://stackoverflow.com/questions/56659045/kotlin-coroutines-how-to-get-coroutinescope-for-the-current-thread).
                   BTW, if [the proposal for disposable iterators](https://github.com/LouisCAD/kotlin-coroutines/blob/master/disposable-iterators.md)
                   was in place we could create our own [CoroutineScope], even without any parent relation and dispose it in the iterator, but so far iterators aren't
                   disposable in Kotlin. */

        CoroutineScope(coroutineContext) /* Don't create another job as `produceIn` creates its own (child of the
        Job currently in `coroutineContext`), and nicely completes it when finished. We would need to complete ours
        manually - all for no reason as `produceIn` readily also covers cancellation */
    return this@asIterableInCurrentScope
        .buffer(
            capacity = Channel.RENDEZVOUS,
            /* `RENDEZVOUS` so that the `produceIn` doesn't pull anything
                                                   more than we read */
        )
        .produceIn(iterationScope)
        .let {
            object : DestructibleChannelIterator<Item> {
                override fun iterator() = it.iterator()

                override fun destroy() = iterationScope.cancel()
            }
        }
}

internal interface DestructibleChannelIterator<E> : Destructible {
    operator fun iterator(): ChannelIterator<E>
}

/**
 * This is especially useful for [Channel.RENDEZVOUS] flows and similar schemes (for example
 * [kotlinx.coroutines.flow.consumeAsFlow] does such a guard) to makes sure that it is converted to a shared flow,
 * with the correct strategy, if multiple consumptions are needed.
 */
internal fun <E> Flow<E>.guardOneConsumption(reason: String): Flow<E> {
    val hasConsumed = AtomicBool(initialValue = false)

    return flow {
        if (!hasConsumed.compareAndSet(expect = false, set = true)) {
            throw MultipleConsumptionsAttemptedError(
                "The flow was restricted to a single execution. Reason: $reason",
            )
        }
        this@guardOneConsumption.collect(this)
    }
}

internal class MultipleConsumptionsAttemptedError(message: String, cause: Throwable? = null) : Error(message, cause)

/**
 * Returns the first [Result.Failure] found or, if no failures found, the result of [collector].
 */
internal suspend fun <T, Ret> Flow<Result<T>>.collectIfAllSuccessful(
    /**
     * The result to return if no failures are encountered (if a failure is encountered, the function will return it
     * instead of calling this function).
     */
    getSuccessResult: () -> Ret,
    /**
     * Same semantics as [Flow.collect], but any accumulation should be done on the same object that [getSuccessResult]
     * will return.
     */
    collector: FlowCollector<T>,
):
    Result<Ret> {
    var firstFailure: Result.Failure? = null
    this.transformWhile<Result<T>, Unit> {
        when (it) {
            is Result.Failure -> {
                firstFailure = it
                false
            }

            is Result.Success -> {
                collector.emit(it.value)
                true
            }
        }
    }
        .collect()

    return firstFailure ?: getSuccessResult().successfully()
}

internal suspend fun <T> Flow<Flow<T>>.flatCollect(action: suspend (item: T) -> Unit) =
    collect { outerFlow ->
        outerFlow.collect {
            action(it)
        }
    }

/**
 * Useful for state Flows to await a condition (especially for synchronization).
 *
 * NOTE: The main reason this is an extension function on [Flow] and not on [kotlinx.coroutines.flow.StateFlow], despite being useful mostly for
 * 'state flows', is because [kotlinx.coroutines.flow.StateFlow]s can be [combine]d, and the result will be a normal [Flow].
 */
internal suspend fun <T> Flow<T>.suspendUntil(predicate: (item: T) -> Boolean): T =
    this.first { predicate(it) }

/**
 * A shortcut for [suspendUntil].
 */
internal suspend fun <T> Flow<T>.suspendUntilEquals(expected: T) =
    this.suspendUntil { it == expected }

internal suspend inline fun <reified R : Any> Flow<*>.suspendUntilIsInstanceOf() {
    firstInstanceOf<R>()
}

/**
 * Intentful [firstNotNull] for flows, which just waits, ignoring the value.
 */
internal suspend fun <T : Any> Flow<T?>.suspendUntilNotNull() {
    firstNotNull()
}

/**
 * Intentful `first { it == null }` for flows, which just waits, ignoring the value.
 */
internal suspend fun <T : Any> Flow<T?>.suspendUntilNull() {
    first { it == null }
}

/**
 * Intentful [first] for flows, which just waits, ignoring the value.
 */
internal suspend fun <T : Any> Flow<T?>.suspendUntilFirst() {
    first()
}

/**
 * A non-suspending version of [MutableSharedFlow.emit] for unlimited capacity [MutableSharedFlow]s which will never
 * return false.
 * Use it when your [MutableSharedFlow] has `replay` set to [Int.MAX_VALUE] (equivalent to `Channel.UNLIMITED`
 * capacity), to indicate that the flow buffer being unlimited is the reason why you don't expect suspend, and get a
 * correct error when the capacity was changed to be limited.
 * NOTE: Don't use it when [MutableSharedFlow] doesn't have `onBufferOverflow = BufferOverflow.SUSPEND` or the call
 * is not going to detect a problem that the capacity is not unlimited.
 */
internal fun <E> MutableSharedFlow<E>.emitToUnlimited(item: E) {
    this.tryEmit(item)
        .also { check(it) { "This should never happen. Are you sure the `replay` of the flow is `Int.MAX_VALUE`?" } }
}

/**
 * Useful for other reasons that a `send` must succeed, making sure this reason is reported.
 */
internal fun <E> MutableSharedFlow<E>.emitEnsuringReceivedOrBuffered(item: E) {
    this.tryEmit(item)
        .also { succeeded ->
            check(succeeded) {
                "Item was neither received nor buffered. Make sure there is a consumer or sufficient buffer."
            }
        }
}

/**
 * A version of [emitToUnlimited] for our [MutableSharedFlowThatFinishes]
 */
internal fun <E> MutableHotFlowThatFinishes<E>.emitToUnlimited(item: E) {
    this.tryEmit(item)
        .also {
            check(it) {
                "This should never happen. Are you sure the capacity of the flow" +
                    " is `Int.MAX_VALUE`/`Channel.UNLIMITED`?"
            }
        }
}

internal fun MutableStateFlow<Boolean>.setEnsuringWasOpposite(
    newValue: Boolean,
    errorMessage: () -> Any,
) =
    compareAndSet(
        expect = !newValue,
        update = false,
    )
        .also { wasOpposite ->
            check(wasOpposite, lazyMessage = errorMessage)
        }

/**
 * Use to mark that ignoring unsuccessful set is intended.
 */
internal fun MutableStateFlow<Int>.incrementOrIgnore(existing: Int) =
    this.compareAndSet(existing, update = existing + 1)

internal inline fun <reified R> Flow<*>.onEachInstance(crossinline block: suspend (R) -> Unit): Flow<*> {
    return this.onEach { if (it is R) block(it) }
}

internal inline fun <reified A, reified B> Flow<Pair<*, *>>.onEachPairInstance(
    crossinline block: suspend (A, B) -> Unit,
): Flow<Pair<*, *>> {
    return this.onEach { (first, second) ->
        if (first is A && second is B) block(first, second)
    }
}

internal inline fun <reified A, reified B, reified C> Flow<Triple<*, *, *>>.onEachTripleInstance(
    crossinline block: suspend (A, B, C) -> Unit,
): Flow<Triple<*, *, *>> {
    return this.onEach { (first, second, third) ->
        if (first is A && second is B && third is C) block(first, second, third)
    }
}

@OptIn(FlowPreview::class)
internal inline fun <reified R> Flow<*>.debounceInstances(periodMillis: Long): Flow<*> {
    val match = this.filterIsInstance<R>()
    val notmatch = this.filterNot { it is R }
    return merge(match.debounce(periodMillis), notmatch)
}

@OptIn(FlowPreview::class)
internal inline fun <reified R> Flow<*>.sampleInstances(periodMillis: Long): Flow<*> {
    val match = this.filterIsInstance<R>()
    val notmatch = this.filterNot { it is R }
    return merge(match.sample(periodMillis), notmatch)
}

/**
 * Performs similar throttleFirst operation to RxJava operator but for flow.
 * Inspired from here https://github.com/Kotlin/kotlinx.coroutines/issues/1446#issuecomment-525565729
 */
internal fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
    var lastEmissionTime = 0L
    collect { upstream ->
        val currentTime = nowInMillisecondsFromEpoch()
        val mayEmit = currentTime - lastEmissionTime > windowDuration
        if (mayEmit) {
            lastEmissionTime = currentTime
            emit(upstream)
        }
    }
}

/**
 * Emits a pair consisting of the first value of the flow and the current value
 */
internal fun <T> Flow<T>.emitFirstValueAsPair(): Flow<Pair<T, T>> {
    return flow {
        var firstValue: T? = null

        // Collect the flow to get the first emitted value
        this@emitFirstValueAsPair.collect { newValue ->
            if (firstValue == null) {
                firstValue = newValue // Capture the first emitted value
            }
            firstValue?.let {
                emit(Pair(it, newValue)) // Emit the pair with first and new value
            }
        }
    }
}

/**
 * Emits a pair consisting of the first value where the predicate is true of the flow and the current value
 */
internal fun <T> Flow<T>.emitFirstValueAsPairWhere(predicate: suspend (T) -> Boolean): Flow<Pair<T?, T>> {
    return flow {
        var firstValue: T? = null

        // Collect the flow to get the first emitted value
        this@emitFirstValueAsPairWhere.collect { newValue ->
            if (firstValue == null && predicate(newValue)) {
                firstValue = newValue // Capture the first emitted value
            }
            emit(Pair(firstValue, newValue)) // Emit the pair with first and new value
        }
    }
}

@Suppress("UNCHECKED_CAST")
internal fun <T> Flow<T>.currentAndPreviousValues(): Flow<PreviousAndCurrentValues<T, T>> {
    return this.scan(PreviousAndCurrentValues<T?, T?>(null, null)) { acc, value ->
        PreviousAndCurrentValues(acc.current, value)
    }.filter { it.previous != null && it.current != null } as Flow<PreviousAndCurrentValues<T, T>>
}

internal class PreviousAndCurrentValues<out A, out B>(
    val previous: A,
    val current: B,
) {
    operator fun component2(): B {
        return current
    }

    operator fun component1(): A {
        return previous
    }

    fun toPair() = Pair(previous, current)
}

@Suppress("UNCHECKED_CAST")
internal inline fun <reified A, reified B> Flow<Pair<*, *>>.filterIsPairInstance(): Flow<Pair<A, B>> =
    filter { (first, second) ->
        first is A && second is B
    } as Flow<Pair<A, B>>

/**
 * Only the latest operation for a given emission is executed,
 * and any previously running operation is canceled when a new emission is received.
 *
 * This extension is useful for handling tasks where only the latest instance of type [R] should be processed,
 * ensuring that outdated operations are canceled to optimize resource usage.
 *
 * **Note:** By design, this uses [flatMapLatest], which cancels the previous operation when a new emission occurs.
 *
 * WARNING: this will return an empty flow if type of data in flow is not [R], so, be careful when using in any chain of flows. it should
 * be at the end of chain as in case of non [R] elements subsequent operators in chain after [flatMapLatestForEachInstance] will not work.
 */
@OptIn(ExperimentalCoroutinesApi::class)
internal inline fun <reified R> Flow<*>.flatMapLatestForEachInstance(crossinline block: suspend (R) -> Unit): Flow<*> {
    return this.filterIsInstance<R>().flatMapLatest {
        flow {
            block(it)
            emit(Unit)
        }
    }
}

internal inline fun <reified R> Flow<*>.delayEachInstance(
    timeMillis: Long,
    crossinline block: suspend (R) -> Unit,
): Flow<*> {
    return this.onEach {
        if (it is R) {
            delay(timeMillis)
            block(it)
        }
    }
}

@Suppress("UNCHECKED_CAST")
internal fun <T1, T2, T3, T4, T5, T6, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    transform: suspend (T1, T2, T3, T4, T5, T6) -> R,
): Flow<R> = combine(flow, flow2, flow3, flow4, flow5, flow6) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3,
        args[3] as T4,
        args[4] as T5,
        args[5] as T6,
    )
}
