package com.speechify.client.internal.util.extensions.collections.flows.sharedFlow

import com.speechify.client.internal.util.collections.flows.channelFlowWithoutItemsLossThatFinishesOnlyThroughCollectionCancel
import com.speechify.client.internal.util.collections.flows.distinctUntilChangedThatFinishesOnlyThroughCollectionCancel
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow

/**
 * Wraps [this] [SharedFlow] (which may not yet have any value ready) in a [StateFlow] view, without taking a
 * [kotlinx.coroutines.CoroutineScope] and without launching a sharing coroutine up front
 * (unlike [kotlinx.coroutines.flow.stateIn]).
 *
 * The returned [StateFlow.value] is the newest entry of the source's [SharedFlow.replayCache] when there is one.
 * When the replay cache is empty, a fallback is used instead: initially [fallbackInitialValue], and afterwards the
 * value most recently observed by any collector of the returned [StateFlow].
 *
 * WARNING: The [this] [SharedFlow] should have a minimum replay cache size of 1, otherwise the flow may miss a value:
 * with an empty replay cache, values emitted by the source while nobody is collecting the returned [StateFlow] are
 * never seen, so the fallback can be stale.
 *
 * @param fallbackInitialValue the value reported (and emitted to new collectors) while the source's replay cache is
 *  empty and no collector of the returned [StateFlow] has observed a value yet.
 * @return a [StateFlow] whose [StateFlow.collect] never completes normally (it returns [Nothing]).
 */
/* Using `<T : Any>` because the current implementation of this function is only safe for non-nullable types
 * (`null` is used internally as the "replay cache is empty" marker). */
internal fun <T : Any> SharedFlow<T>.stateInSameCoroutineUsingFallbackValueWhenReplayEmpty(
    fallbackInitialValue: T,
): StateFlow<T> {
    /** Holds the value to report while the source's replay cache is empty. Updated from [StateFlow.collect] below
     * with every value that passes through, so that it tracks the last value observed by any collector.
     */
    val currentFallbackValue = MutableStateFlow(value = fallbackInitialValue)

    /** [StateFlow] is claimed to be unstable for inheritance, but inheriting is the only way we can implement this
     * mapping without starting a new coroutine.
     * And we're doing it successfully in more places #InheritingStateFlowDespiteBeingUnstable
     */
    return object : StateFlow<T> {
        /** NOTE(review): this is empty while the source's replay cache is empty, even though [value] then returns
         * the fallback - confirm that this asymmetry is intended.
         */
        override val replayCache: List<T> get() =
            listOfNotNull(
                valueFromReplayCacheOrNull,
            )

        override val value: T
            get() = valueFromReplayCacheOrNull ?: currentFallbackValue.value

        /** Newest entry of the source's replay cache, or `null` when the cache is empty. */
        private val valueFromReplayCacheOrNull: T? get() =
            this@stateInSameCoroutineUsingFallbackValueWhenReplayEmpty.replayCache
                .lastOrNull()

        override suspend fun collect(collector: FlowCollector<T>): Nothing =
            /** Need to use `channelFlow` because below we use `collectLatest`, which changes the coroutine scope */
            channelFlowWithoutItemsLossThatFinishesOnlyThroughCollectionCancel {
                /** Prevent double-emit by emitting the fallback value only if the replay cache is empty
                 * (a race condition could still cause a double emit, but this is solved by `distinctUntilChanged`
                 * below).
                 */
                if (valueFromReplayCacheOrNull == null) {
                    emit(currentFallbackValue.value)
                }

                /**
                 * NOTE(review): this spot was previously documented as using [collectLatestFromSharedFlow] "to omit
                 * anything in the [SharedFlow.replayCache]", while the guard above only makes sense if replayed
                 * values are delivered here - confirm against the helper's contract.
                 */
                this@stateInSameCoroutineUsingFallbackValueWhenReplayEmpty
                    .collectLatestFromSharedFlow(
                        action = {
                            /** Remember the value as the fallback *before* forwarding it, so that [value] and new
                             * collectors see the last observed value when the source has no replay cache.
                             */
                            currentFallbackValue.value = it
                            this.emit(it)
                        },
                    )
            }
                /** [SharedFlow] does not filter out duplicates, so we need `distinctUntilChanged` to make it
                 * behave like a [StateFlow].
                 */
                .distinctUntilChangedThatFinishesOnlyThroughCollectionCancel()
                .collect(
                    collector = collector,
                )
    }
}
