package com.speechify.client.api.util.collections.flows

import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.util.CallbackNoError
import com.speechify.client.api.util.Destructor
import com.speechify.client.internal.sync.AtomicBool
import com.speechify.client.internal.sync.AtomicRef
import com.speechify.client.internal.sync.set
import com.speechify.client.internal.util.collections.flows.CallbackBasedProducer
import com.speechify.client.internal.util.collections.flows.wrapWithNeverThrowingToProducer
import com.speechify.client.internal.util.extensions.collections.flows.emitToUnlimited
import com.speechify.client.internal.util.extensions.collections.flows.sharedFlow.stateInSameCoroutineUsingFallbackValueWhenReplayEmpty
import com.speechify.client.internal.util.extensions.intentSyntax.ignoreValue
import com.speechify.client.internal.util.extensions.throwable.throwAfterRunAddingToSuppressed
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.js.JsExport

/**
 * Represents a possibly-changing value coming from SDK consumers, and readable by SDK.
 *
 * Similar to [SharedFlowFromCallback], but without a possibility to return the first value asynchronously.
 *
 * Product teams only need to implement [getCurrentValueAndSubscribeToUpdatesAndGetCancel] - see there for details, as
 * it has special requirements.
 *
 * In Kotlin's idioms, this is closest to a [kotlinx.coroutines.flow.StateFlow] but this type is
 * more "a [kotlinx.coroutines.flow.Flow] that is the building-block for constructing a [kotlinx.coroutines.flow.StateFlow]",
 * which should be done using the provided [stateIn].
 *
 * NOTE on usefulness: Another alternative of achieving such a functionality is the pattern of a property with `getter/setter`,
 * backed by an internal [StateFlow], e.g. like in [com.speechify.client.bundlers.listening.ListeningBundlerOptions.utteranceBufferSize],
 * but this one may be more useful when the state already has a source of truth:
 * - This method does not create a second source of truth for SDK consumers
 * - This method helps to avoid memory-leaks, because this way SDK manages its listeners' lifecycle, does not leave
 *   any listeners from scopes that already finishes (so, achieves structured-concurrency).
 */
@JsExport
abstract class StateFlowFromCallback<T> {
    /**
     * SDK developers must use only this function, and **not** the `protected` [getCurrentValueAndSubscribeToUpdatesAndGetCancel],
     * so that the SDK-consumers are correctly notified if their implementation is incorrect.
     */
    @OptIn(ExperimentalContracts::class)
    internal fun getCurrentValueAndSubscribeToUpdatesAndGetCancelSafeForStateFlow(
        /**
         * NOTE: This function will be called immediately once before the function returns, so `val`s can
         * be defined before the call and Kotlin compiler will correctly consider them initialized after the call.
         */
        receiveFirstItem: CallbackNoError<T>,
        receiveNonFirstItem: CallbackNoError<T>,
        /**
         * The [DiagnosticEvent.sourceAreaId] to use for logging, to be able to identify the source of the error.
         */
        sourceAreaId: String,
        /**
         * The [DiagnosticEvent.properties] to add to the error.
         */
        diagnosticProperties: Map<String, Any>?,
    ): Destructor {
        contract {
            callsInPlace(receiveFirstItem, InvocationKind.EXACTLY_ONCE)
        }
        val firstItemReceived = AtomicBool(initialValue = false)
        lateinit var currentReceiveItem: AtomicRef<CallbackNoError<T>>
        currentReceiveItem = AtomicRef(
            initialValue = { item ->
                receiveFirstItem(item)
                firstItemReceived.set(true)
                /* First item received, now we can switch to the normal callback */
                currentReceiveItem.set(
                    newValue = receiveNonFirstItem,
                )
            },
        )

        val destructor = getCurrentValueAndSubscribeToUpdatesAndGetCancelNeverThrowingToProducer(
            receiveItem = {
                currentReceiveItem.value(it)
            },
            sourceAreaId = sourceAreaId,
            diagnosticProperties = diagnosticProperties,
        )
        if (firstItemReceived.get().not()) {
            IllegalArgumentException(
                "The `getCurrentValueAndSubscribeToUpdatesAndGetCancel` did not call the `receiveItem` immediately. " +
                    "This is a bug in the implementation of class ${this::class.simpleName ?: this}.",
            )
                .throwAfterRunAddingToSuppressed(
                    blockToRun = destructor,
                )
        }
        return destructor
    }

    /**
     * A safe version of [getCurrentValueAndSubscribeToUpdatesAndGetCancel] that just adds crash-prevention.
     */
    private fun getCurrentValueAndSubscribeToUpdatesAndGetCancelNeverThrowingToProducer(
        receiveItem: CallbackNoError<T>,
        /**
         * The [DiagnosticEvent.sourceAreaId] to use for logging, to be able to identify the source of the error.
         */
        sourceAreaId: String,
        /**
         * The [DiagnosticEvent.properties] to add to the error.
         */
        diagnosticProperties: Map<String, Any>?,
    ): Destructor =
        CallbackBasedProducer(this@StateFlowFromCallback::getCurrentValueAndSubscribeToUpdatesAndGetCancel)
            .wrapWithNeverThrowingToProducer(
                sourceAreaId = sourceAreaId,
                diagnosticProperties = diagnosticProperties,
            )
            .subscribeAndGetCancel(receiveItem)

    /**
     * This function should notify the [receiveItem] of the current connectivity status.
     * NOTE: It is required that:
     * - [receiveItem] is called immediately with the current state, before the function returns, and then again whenever
     *   the state changes. (Such design is the only thread-safe way not to miss any update).
     *   A critical error will be thrown if the function returns without calling [receiveItem].
     *  - the [receiveItem] calls are never made in parallel (this is the only thread-safe way to be able to ensure that
     *   a later call does not override an earlier one)
     *
     * @return a function that, when called, cancels the subscription - it should make the component stop calling
     * [receiveItem] and lose the reference to that function (to avoid memory leaks).
     */
    protected abstract fun getCurrentValueAndSubscribeToUpdatesAndGetCancel(
        /**
         * For the implementor: This function should be called immediately with the current value, before the function
         * returns, and then again whenever the value changes (if the value never changes, call it only once, and return
         * a no-op function).
         */
        receiveItem: CallbackNoError<T>,
    ): Destructor
}

/**
 * An equivalent of [kotlinx.coroutines.flow.stateIn] for [StateFlowFromCallback].
 */
/* Implemented as an extension function and using `<T : Any>` because the current implementation of this function is
 * only safe for non-nullable types - see below #OnlySafeForNonNullableGeneric */
internal fun <T : Any> StateFlowFromCallback<T>.stateIn(
    /**
     * The [DiagnosticEvent.sourceAreaId] to use for logging, to be able to identify the source of the error.
     */
    sourceAreaId: String,
    diagnosticProperties: Map<String, Any>? = null,
    scope: CoroutineScope,
): StateFlow<T> {
    val latestValueMutableSharedFlow = MutableSharedFlow<T>(
        /**
         * Thanks to this, we behave like a [StateFlow] and the latest value is always replayed.
         */
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    val firstItem: T
    val unsubscribe = getCurrentValueAndSubscribeToUpdatesAndGetCancelSafeForStateFlow(
        receiveFirstItem = {
            firstItem = it
        },
        receiveNonFirstItem = {
            latestValueMutableSharedFlow.emitToUnlimited(it)
        },
        sourceAreaId = sourceAreaId,
        diagnosticProperties =
        mapOf(
            "StateFlowFromCallback.actualType" to (this::class.simpleName ?: this.toString()),
        ) +
            diagnosticProperties.orEmpty(),
    )
    scope.launch {
        try {
            awaitCancellation()
        } finally {
            unsubscribe()
        }
    }
        .ignoreValue()
    return latestValueMutableSharedFlow.stateInSameCoroutineUsingFallbackValueWhenReplayEmpty(
        fallbackInitialValue = firstItem,
    )
}

/**
 * An Implementation of [StateFlowFromCallback] that always returns the same value.
 */
@JsExport
class StateFlowFromCallbackWithStaticValue<T>(
    private val staticValue: T,
) : StateFlowFromCallback<T>() {
    override fun getCurrentValueAndSubscribeToUpdatesAndGetCancel(
        receiveItem: CallbackNoError<T>,
    ): Destructor {
        receiveItem(staticValue)
        return {}
    }
}
