package com.speechify.client.internal.util.collections.flows

import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.internal.runTask
import com.speechify.client.internal.util.extensions.collections.sendEnsuringReceivedOrBuffered
import com.speechify.client.internal.util.extensions.intentSyntax.ignoreValue
import com.speechify.client.internal.util.runCatchingSilencingErrorsLoggingThem
import com.speechify.client.internal.util.timeout.awaitWithTimeoutThrowingNonCancellationException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.job
import kotlin.coroutines.CoroutineContext
import kotlin.experimental.ExperimentalTypeInference
import kotlin.time.ComparableTimeMark
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource

/**
 * Builds a callback-driven [Flow] for producers (the callers of the callback) that must never receive an
 * exception from the callback through which they report items for the flow.
 *
 * Failures to deliver an item are logged rather than thrown back at the producer.
 *
 * Note: Some tests for this function are ignored due to their flaky nature.
 * For more details, see [CallbackFlowNeverThrowingToProducerTest].
 */
@OptIn(ExperimentalTypeInference::class, ExperimentalTime::class)
internal fun <T> callbackFlowNeverThrowingToProducer(
    /**
     * Pass `true` when the producer and the consumers are linked in a way where no item may be lost. An item
     * that could not be delivered because of cancellation/closing is then always logged as an error.
     *
     * Such an error means that the producer failed to control concurrency. It can be fixed by not returning
     * from the cancellation API (called in [kotlinx.coroutines.channels.awaitClose]) until no more events are
     * being sent (e.g. by a mutex lock or single thread confinement).
     *
     * NOTE: With `false`, undelivered items are still logged when they arrive a long time after closing
     * (long enough to not be considered a race). The flag exists only to cover APIs that are known to not
     * consider concurrency, while the consumer has no need to ensure that no items are lost.
     */
    shouldLogErrorIfCancellationPreventedDelivery: Boolean,
    /**
     * The [DiagnosticEvent.sourceAreaId] used for logging, so that the source of an error can be identified.
     */
    sourceAreaId: String,
    diagnosticProperties: Map<String, Any>? = null,
    /**
     * The buffer size of the underlying channel. Specifying [Channel.UNLIMITED] is valid in cases where the
     * collector might be slower than the producer.
     */
    bufferCapacity: BufferCapacity,
    @BuilderInference block: suspend ProducerScopeForNeverThrowingCallbackFlow<T>.() -> Unit,
): Flow<T> =
    callbackFlowWithCapacityRequired(capacityOrNullIfUnlimited = bufferCapacity) {
        val underlyingScope = this

        /* Completed with the moment at which the producer coroutine's job completed. */
        val completionMark = CompletableDeferred<ComparableTimeMark>()

        /*
         * The completion time is recorded in this one common place because, even though
         * `kotlinx.coroutines.channels.ProducerScope.awaitClose` is encouraged, the caller can also rely on the
         * block's own suspension and call `close` or `cancel` at the end, thus avoiding any complaint from
         * `kotlinx.coroutines.flow.callbackFlow`.
         *
         * `underlyingScope.invokeOnClose { ... }` is not used for this, because it only allows one listener.
         * Using it would require exposing our own listeners-list on the
         * `ProducerScopeForNeverThrowingCallbackFlow` handed to the caller.
         *
         * `CompletableDeferred.complete` is used because it already is idempotent, as per its KDoc.
         */
        underlyingScope.coroutineContext.job.invokeOnCompletion {
            completionMark.complete(TimeSource.Monotonic.markNow())
        }

        val neverThrowingScope = object :
            ProducerScopeForNeverThrowingCallbackFlow<T>,
            CoroutineContext by underlyingScope.coroutineContext {

            override fun send(item: T) {
                runCatchingSilencingErrorsLoggingThem(
                    sourceAreaId = sourceAreaId,
                    properties = diagnosticProperties,
                    /* `false`, so that `CancellationException`s are not ignored. #LoggingCancellations
                     * See #CancellationSpecialMeaningInCallbackFlow.
                     */
                    shouldIgnoreCancellationExceptions = false,
                    overrideLogging = { failure, loggedSourceAreaId, loggedProperties ->
                        when (failure) {
                            /* `ClosedSendChannelException` occurs when the `block` calls just
                             * `kotlinx.coroutines.channels.ProducerScope.close`. Although it is not strictly a
                             * `CancellationException`, it is a valid API use (it signals to the consumer that
                             * all items have arrived and there will be no more - see also
                             * `com.speechify.client.internal.util.extensions.collections.channels.closeNormallySignifyingEndOfItems`),
                             * so it is treated like a `CancellationException` and respects
                             * `shouldLogErrorIfCancellationPreventedDelivery`: it is not always logged, only
                             * when the implementer does not respect an 'unsubscribe'.
                             */
                            is CancellationException, is ClosedSendChannelException -> {
                                /* Captured before the logging task starts, so it reflects the failed send. */
                                val failedSendMark = TimeSource.Monotonic.markNow()

                                /* NOTE: This is the unusual case where a cancellation exception is handled and
                                 * logged. This is desired - these are thrown when the producer emitted an
                                 * event that the consumer will not receive.
                                 * This can be:
                                 * - an undesired situation of an event getting missed, where it's expected
                                 * that no event is missed (`shouldLogErrorIfCancellationPreventedDelivery`=true)
                                 * - a memory-leak bug in the implementation, where it ignored the destructor
                                 * call and continues to hold onto the callback and call it with new values.
                                 * #CancellationSpecialMeaningInCallbackFlow
                                 */
                                runTask(
                                    contextToMerge = CoroutineName("callbackFlowNeverThrowingToProducer.logging"),
                                ) {
                                    /* 5 seconds without the completion mark arriving seems like a clear problem
                                     * in the implementation of `callbackFlowNeverThrowingToProducer`, which
                                     * would cause a memory leak if the wait was not aborted here.
                                     */
                                    val closedAt = completionMark.awaitWithTimeoutThrowingNonCancellationException(
                                        timeout = 5.seconds,
                                        actionName = "callbackFlowNeverThrowingToProducer.logging.closedTime.await",
                                        consequencesMessage = "This indicates a likely memory leak - report to " +
                                            "SDK maintainers.",
                                    )

                                    val elapsedSinceClosed: Duration = failedSendMark - closedAt

                                    /* A `false` `shouldLogErrorIfCancellationPreventedDelivery` means that the
                                     * user of this function cannot ensure there is no race with cancellation.
                                     * A modest time is allowed for the racing without logging, but when the
                                     * time is really far, developers are notified of a suspected
                                     * race-condition.
                                     */
                                    val isWorthReporting =
                                        shouldLogErrorIfCancellationPreventedDelivery ||
                                            elapsedSinceClosed > 500.milliseconds

                                    if (isWorthReporting) {
                                        Log.e(
                                            DiagnosticEvent(
                                                sourceAreaId = loggedSourceAreaId,
                                                /* NOTE: This is one unusual case where a `CancellationException`
                                                 * is logged, but this is desired as per
                                                 * #CancellationSpecialMeaningInCallbackFlow (wrapping it with a
                                                 * non-cancellation one could be considered, so that in the logs
                                                 * we don't immediately think it should be ignored, but
                                                 * hopefully the message explains why it's not desired).
                                                 */
                                                message = "The channel for the flow has closed, but the producer is" +
                                                    " still sending events to it even after {timeSinceClosedMs}. This" +
                                                    " is a memory leak - the producer should unregister the callback." +
                                                    " The `CancellationException` logged with this event is the " +
                                                    "cancellation exception that may carry information about the" +
                                                    " cause of the consumer's cancellation.",
                                                nativeError = failure,
                                                properties = mapOf(
                                                    "timeSinceClosedMs" to
                                                        elapsedSinceClosed.inWholeMilliseconds,
                                                ) + (loggedProperties ?: emptyMap()),
                                            ),
                                        )
                                    }
                                }
                                    .ignoreValue()
                            }

                            else -> {
                                /* The communication was broken in some way other than a graceful cancel or
                                 * `close`, so the exception is logged immediately - the consumer-code will not
                                 * get it, so this is the last chance to notify developers.
                                 */
                                Log.e(
                                    DiagnosticEvent(
                                        sourceAreaId = loggedSourceAreaId,
                                        message = "An abnormal failure of the channel for the flow prevented the item" +
                                            " from being delivered to the flow.",
                                        nativeError = failure,
                                        properties = loggedProperties,
                                    ),
                                )
                            }
                        }
                    },
                ) {
                    underlyingScope.sendEnsuringReceivedOrBuffered(item)
                }
            }

            /* `OVERRIDE_DEPRECATION` is suppressed because this is an anonymous class. */
            @Suppress("OVERRIDE_DEPRECATION")
            override fun close(causeIfFailureOrNullIfNormalClose: Throwable?): Boolean =
                underlyingScope.close(causeIfFailureOrNullIfNormalClose)

            override suspend fun awaitClose(block: () -> Unit) {
                underlyingScope.awaitClose(block)
            }
        }

        block(neverThrowingScope)
    }

/**
 * Reduced version of [kotlinx.coroutines.channels.ProducerScope], exposing only the methods to use in
 * [callbackFlowNeverThrowingToProducer].
 *
 * It is also a [CoroutineContext], which is what [closeByCancellingProducerAndConsumerImmediately] cancels.
 */
internal interface ProducerScopeForNeverThrowingCallbackFlow<T> : CoroutineContext {
    /**
     * Reports [item] to the flow. This is not a suspending function, so it can be called from a plain callback.
     *
     * [callbackFlowNeverThrowingToProducer] is documented as being for producers that should never get an
     * exception from this callback.
     */
    fun send(item: T)

    /**
     * See [kotlinx.coroutines.channels.ProducerScope.close].
     *
     * Deprecated in favour of the intent-revealing variants declared on this interface:
     * [closeNormallySignifyingEndOfItems], [closeAbnormallyWithFailure] and
     * [closeByCancellingProducerAndConsumerImmediately].
     *
     * @param causeIfFailureOrNullIfNormalClose the failure to close with, or `null` for a normal close.
     */
    @Deprecated(
        message = "Use 'closeNormallySignifyingEndOfItems', 'closeAbnormallyWithFailure' or " +
            "'closeByCancellingProducerAndConsumerImmediately' instead.",
    )
    fun close(
        causeIfFailureOrNullIfNormalClose: Throwable? = null,
    ): Boolean

    /**
     * Closes without a cause, by delegating to [close] with `null`.
     *
     * See [com.speechify.client.internal.util.extensions.collections.channels.closeNormallySignifyingEndOfItems]
     */
    fun closeNormallySignifyingEndOfItems(): Boolean =
        @Suppress("DEPRECATION")
        close(
            causeIfFailureOrNullIfNormalClose = null,
        )

    /**
     * Closes with [cause] as the failure, by delegating to [close].
     *
     * See [com.speechify.client.internal.util.extensions.collections.channels.closeAbnormallyWithFailure]
     */
    fun closeAbnormallyWithFailure(cause: Throwable): Boolean =
        @Suppress("DEPRECATION")
        close(
            causeIfFailureOrNullIfNormalClose = cause,
        )

    /**
     * Cancels this [CoroutineContext] with the optional [cause].
     *
     * See [com.speechify.client.internal.util.extensions.collections.channels.closeImmediatelyWithCancellation]
     */
    fun closeByCancellingProducerAndConsumerImmediately(
        cause: CancellationException? = null,
    ) {
        cancel(
            cause = cause,
        )
    }

    /**
     * See [kotlinx.coroutines.channels.awaitClose].
     *
     * @param block the action passed on to `awaitClose`; defaults to doing nothing.
     */
    suspend fun awaitClose(block: () -> Unit = {})
}
