package com.speechify.client.internal.util.collections.flows

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope

/**
 * A drop-in replacement for [kotlinx.coroutines.flow.flow] that just adds the possibility to emit from different
 * coroutines, but prevents the [kotlinx.coroutines.flow.channelFlow]'s footgun of losing items to a full buffer, and
 * does this without losing the flow consumer's ability to communicate its backpressure to the producer, just like
 * [kotlinx.coroutines.flow.flow].
 *
 * NOTE: Sometimes it is not desired, or not possible to block the producer, especially when it produces
 * from non-suspend functions - use [callbackFlowWithCapacityRequired] or [channelFlowWithCapacityRequired] instead in
 * such cases instead.
 */
internal fun <T> channelFlowWithoutItemsLoss(
    block: suspend ProducerScopeWithMandatorySuspend<T>.() -> Unit,
) = channelFlowWithCapacityRequired(
    capacityOrNullIfUnlimited = Channel.RENDEZVOUS,
) {
    this.restrictToMandatorySuspend().block()
}

private fun <T> ProducerScope<T>.restrictToMandatorySuspend(): ProducerScopeWithMandatorySuspend<T> =
    object : ProducerScopeWithMandatorySuspend<T> {
        override suspend fun emit(element: T) =
            this@restrictToMandatorySuspend.send(element)
    }

/**
 * Equivalent to [kotlinx.coroutines.flow.FlowCollector], but without the semantics of [kotlinx.coroutines.flow.FlowCollector.emit]
 * that suggest thread unsafety.
 * Equivalent to [kotlinx.coroutines.channels.ProducerScope], but just its [kotlinx.coroutines.channels.ProducerScope.send]
 * method, and especially  without the [kotlinx.coroutines.channels.ProducerScope.trySend], so no possibility to
 * accidentally lose a value to a full buffer.
 */
internal interface ProducerScopeWithMandatorySuspend<E> {
    /**
     * Collects the value emitted by the upstream.
     * This method is thread-safe and can be invoked concurrently (as opposed to [kotlinx.coroutines.flow.FlowCollector.emit]).
     */
    suspend fun emit(element: E)
}

/**
 * A version of [channelFlowWithoutItemsLoss] that also has the behavior of [flowThatFinishesOnlyThroughCollectionCancel].
 */
internal fun <T> channelFlowWithoutItemsLossThatFinishesOnlyThroughCollectionCancel(
    block: suspend ProducerScopeWithMandatorySuspend<T>.() -> Nothing,
): FlowThatFinishesOnlyThroughCollectionCancel<T> = channelFlowWithCapacityRequired(
    capacityOrNullIfUnlimited = Channel.RENDEZVOUS,
) {
    this.restrictToMandatorySuspend().block()
}
    .flowThatFinishesOnlyThroughCollectionCancelStronglyTyped(
        sourceAreaId = "channelFlowWithoutItemsLossThatFinishesOnlyThroughCollectionCancel",
    )
