package com.speechify.client.api.util.collections.flows

import com.speechify.client.api.util.Callback
import com.speechify.client.api.util.CallbackNoError
import com.speechify.client.api.util.Destructible
import com.speechify.client.api.util.fromCoGetDestructible
import com.speechify.client.api.util.successfully
import com.speechify.client.internal.coroutines.fromNonCancellableAPIs.suspendCancellableCoroutineWithSDKResultThrowing
import com.speechify.client.internal.util.collections.flows.callbackFlowNeverThrowingToProducer
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlin.js.JsExport

/**
 * This is a boundary equivalent of a Kotlin Flow for callback-based API using the SDK Result.
 * NOTE: Being a plain sync 'callback flow' (and especially the callback not being async) it has a caveat that the
 * producer cannot make the production suspend until consumption finished (to allow that, the `emit` would have to be
 * an async function)
 */
@JsExport
interface CallbackFlowSourceFromCollectWithResult<T> {
/** TODO Consider consolidating this class with [com.speechify.client.api.util.boundary.CancellableJobWithIntermediateResults] */
    /**
     * The function that iterates over the items, calling the [collectOne] callback for each of them,
     * and [complete] when no more items will come (this may be due to a failure, in which case it will receive
     * a [Result.Failure] as the argument).
     *
     * Returns a [Destructible] whose [Destructible.destroy] unsubscribes the [collectOne] callback.
     * Once this is done successfully, a cancellation [Result.Failure] is returned in [complete] (detectable using
     * [com.speechify.client.api.util.boundary.deferred.CancellationUtils.isCancellationResult]). See documentation of
     * [complete] for more details of this aspect.
     */
    fun collect(
        /**
         * For the caller:
         * - this will be called for each item. NOTE: when the function returns, the implementor will resume producing
         *   the next item, so the caller should not return if this would cause a race condition (e.g. by only using
         *   blocking code).
         *
         * For the implementor:
         * - this should be called for every item produced, and no two calls should ever be running in parallel.
         */
        collectOne: CallbackNoError<T>,
        /**
         * For the caller: this will be called when no more items will come. This may be due to a failure or
         * cancellation, in which case it will receive a [Result.Failure] as the argument.
         * NOTE on **cancellations**: the caller should check if the reason was a cancellation using [com.speechify.client.api.util.boundary.deferred.CancellationUtils.isCancellationResult]
         * (or if translating to an idiomatic exception using [com.speechify.client.api.util.boundary.deferred.CancellationUtils])
         * and should not consider it an error and, in particular, **should not notify developers about it** (e.g. produce no logs or console output).
         *
         * For implementor: This callback should be used by the implementor, just like any other callback-based
         * function, so should always be called (ideally, the body of the functions should be just an async block):
         * - when successful, it should be called with `Unit`
         * - when reading failed, it should be called with the `Failure`
         * NOTE on **cancellations**:
         * - When [Destructible.destroy] is called, the implementor should cancel any pending reading, and call [complete] with
         *   a result created using [com.speechify.client.api.util.boundary.deferred.CancellationUtils.createCancellationCauseResult].
         * - [Destructible.destroy] should be safe to be called from any thread, and after it returns, the implementor
         *   should not call [collectOne] anymore.
         */
        @Suppress("NON_EXPORTABLE_TYPE")
        complete: Callback<Unit>,
    ): Destructible
}

internal suspend fun <T> CallbackFlowSourceFromCollectWithResult<T>.collect(
    emitChunk: CallbackNoError<T>,
): Unit = suspendCancellableCoroutineWithSDKResultThrowing { completion ->
    val collectingJob = collect(
        collectOne = emitChunk,
        complete = completion::resume,
    )
    completion.invokeOnCancellation {
        collectingJob.destroy()
    }
}

/**
 * See [CallbackFlowSourceFromCollectWithResult] for limitations of this flow.
 */
abstract class FlowFromCallbackFlowSource<T> :
    Flow<T>,
    CallbackFlowSourceFromCollectWithResult<T> {
    abstract override fun collect(collectOne: CallbackNoError<T>, complete: Callback<Unit>): Destructible

    override suspend fun collect(collector: FlowCollector<T>) =
        callbackFlow.collect(collector)

    private val callbackFlow: Flow<T> = callbackFlowNeverThrowingToProducer(
        /** shouldLogErrorIfCancellationPreventedDelivery=`false`, because the adapters are implemented without
         *  requirement to control concurrency (they could do that by not returning from the unsubscribe function
         *  until no more items coming is ensured)
         */
        shouldLogErrorIfCancellationPreventedDelivery = false,
        sourceAreaId = "KotlinFlowFromCallbackFlow.callbackFlow",
        block = {
            collect(
                emitChunk = this::send,
            )
            closeNormallySignifyingEndOfItems()
        },
        bufferCapacity = Channel.UNLIMITED, // Unlimited to prevent errors when producer is faster than consumer.
    )
}

/**
 * Turns a [Flow] into [CallbackFlowSourceFromCollectWithResult].
 * A reverse of [FlowFromCallbackFlowSource].
 */
internal fun <T> Flow<T>.toCallbackFlowSourceFromCollectWithResult(): CallbackFlowSourceFromCollectWithResult<T> =
    object : CallbackFlowSourceFromCollectWithResult<T> {
        override fun collect(
            collectOne: CallbackNoError<T>,
            complete: Callback<Unit>,
        ): Destructible = complete.fromCoGetDestructible {
            this@toCallbackFlowSourceFromCollectWithResult.collect {
                collectOne(it)
            }

            Unit.successfully()
        }
    }
