package com.speechify.client.api.audio

import com.speechify.client.api.content.ContentCursor
import com.speechify.client.api.content.view.speech.SpeechFlow
import com.speechify.client.api.content.view.speech.SpeechSentence
import com.speechify.client.api.services.library.offline.MaxConcurrencyOnAudioServerOptionReadOnly
import com.speechify.client.api.util.Result
import com.speechify.client.api.util.orThrow
import com.speechify.client.helpers.content.standard.ContentSequenceCharacteristics
import com.speechify.client.helpers.content.standard.ContentSequenceCharacteristicsOfImmutableAlwaysLiveNoUserEffectContent
import com.speechify.client.internal.sync.AtomicRef
import com.speechify.client.internal.util.collections.flows.flowFromAsyncSeed
import com.speechify.client.internal.util.extensions.collections.flows.BufferObserver
import com.speechify.client.internal.util.extensions.collections.flows.BufferedFlowMeasurement
import com.speechify.client.internal.util.extensions.collections.flows.ProducerExceptionBehavior
import com.speechify.client.internal.util.extensions.collections.flows.bufferWithMeasurement
import com.speechify.client.internal.util.extensions.collections.flows.mapFirst
import com.speechify.client.internal.util.intentSyntax.ifNotNull
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.transform

/**
 * Base type for components that turn a [SpeechFlow] into a [Flow] of [Utterance]s.
 *
 * Implementors supply the raw utterances through [getUtterancesFlow]; consumers obtain them through
 * [getUtterancesFlowPlayingExactlyFromCursor], which additionally trims the first utterance so that it begins at
 * the requested cursor.
 */
internal abstract class UtteranceFlowProvider {
    /**
     * Entry point used by the player-stack for playback. It exists separately from [getUtterancesFlow] because the
     * latter is allowed to return a first utterance that begins before the cursor.
     *
     * When [startingCursor] is `null` the flow from [getUtterancesFlow] is returned untouched; otherwise only its
     * first element is replaced by the result of slicing it from [startingCursor].
     *
     * TODO - consider making the contract with [getUtterancesFlow] implementors more explicit so that it's easy to
     *  see where slicing is necessary (e.g. they could return an instance of a `sealed` class where they need to
     *  specify if the first utterance needs slicing or not, so that we don't do it twice where they are already
     *  sliced).
     */
    fun getUtterancesFlowPlayingExactlyFromCursor(
        speechFlow: SpeechFlow,
        voiceOfPreferenceProvider: VoiceOfPreferenceProviderWithCache,
        startingCursor: ContentCursor?,
        preSpeechTransform: PreSpeechTransformOptions,
        shouldOptimizeForQuickStart: Boolean,
        reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
        utteranceBufferSizeOption: UtteranceBufferSizeOption,
        bufferObserver: BufferObserver<Utterance>,
    ): Flow<Utterance> {
        val providedUtterances = getUtterancesFlow(
            speechFlow = speechFlow,
            voiceOfPreferenceProvider = voiceOfPreferenceProvider,
            startingCursor = startingCursor,
            preSpeechTransform = preSpeechTransform,
            shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
            reportBufferMeasurement = reportBufferMeasurement,
            contentSequenceCharacteristics = contentSequenceCharacteristics,
            utteranceBufferSizeOption = utteranceBufferSizeOption,
            bufferObserver = bufferObserver,
        )

        return if (startingCursor != null) {
            // Slicing the first utterance is what makes playback begin at the cursor, as per #SeekingToSlice
            providedUtterances.mapFirst { firstUtterance ->
                firstUtterance.sliceFrom(start = startingCursor)
            }
        } else {
            providedUtterances
        }
    }

    /**
     * Hook through which implementors provide the Utterances.
     *
     * NOTE: The first utterance has to contain the [startingCursor], but it is not required to begin exactly on it:
     * [getUtterancesFlowPlayingExactlyFromCursor] slices the first utterance so that playback starts at
     * [startingCursor].
     */
    protected abstract fun getUtterancesFlow(
        speechFlow: SpeechFlow,
        voiceOfPreferenceProvider: VoiceOfPreferenceProviderWithCache,
        startingCursor: ContentCursor?,
        preSpeechTransform: PreSpeechTransformOptions,
        shouldOptimizeForQuickStart: Boolean,
        reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
        utteranceBufferSizeOption: UtteranceBufferSizeOption,
        bufferObserver: BufferObserver<Utterance>,
    ): Flow<Utterance>
}

/**
 * A narrowed-down [UtteranceFlowProvider] for expressing the requirement of a component that is not preoccupied with
 * seeking and starting from new places, but rather just playing a single continuous sequence of utterances.
 */
internal interface UtteranceFlowProviderWithPredefinedStartingPoint {
    /**
     * Returns the flow of [Utterance]s for the sequence this provider was created for.
     *
     * @param shouldOptimizeForQuickStart forwarded to the underlying provider; its exact effect is decided by the
     *  implementation (NOTE(review): presumably trades throughput for a faster first utterance - confirm).
     * @param reportBufferMeasurement callback through which the implementation may report
     *  [BufferedFlowMeasurement]s of the utterances flow.
     */
    suspend fun getUtterancesFlow(
        shouldOptimizeForQuickStart: Boolean,
        reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
    ): Flow<Utterance>
}

/**
 * Adapts this [UtteranceFlowProvider] to a [UtteranceFlowProviderWithPredefinedStartingPoint] by capturing every
 * argument that describes *what* is played and *from where*, leaving only `shouldOptimizeForQuickStart` and
 * `reportBufferMeasurement` to be supplied on each call.
 *
 * Each call on the returned object delegates to
 * [UtteranceFlowProvider.getUtterancesFlowPlayingExactlyFromCursor] with the captured arguments.
 */
internal fun UtteranceFlowProvider.toUtteranceFlowProviderWithPredefinedStartingPoint(
    speechFlow: Flow<SpeechSentence>,
    startingCursor: ContentCursor?,
    voiceOfPreferenceProvider: VoiceOfPreferenceProviderWithCache,
    preSpeechTransform: PreSpeechTransformOptions,
    contentSequenceCharacteristics: ContentSequenceCharacteristics,
    utteranceBufferSizeOption: UtteranceBufferSizeOption,
    bufferObserver: BufferObserver<Utterance>,
): UtteranceFlowProviderWithPredefinedStartingPoint {
    // Named explicitly so that it is obvious which provider the anonymous object below delegates to.
    val underlyingProvider: UtteranceFlowProvider = this

    return object : UtteranceFlowProviderWithPredefinedStartingPoint {
        override suspend fun getUtterancesFlow(
            shouldOptimizeForQuickStart: Boolean,
            reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
        ): Flow<Utterance> {
            return underlyingProvider.getUtterancesFlowPlayingExactlyFromCursor(
                speechFlow = speechFlow,
                voiceOfPreferenceProvider = voiceOfPreferenceProvider,
                startingCursor = startingCursor,
                preSpeechTransform = preSpeechTransform,
                shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
                reportBufferMeasurement = reportBufferMeasurement,
                contentSequenceCharacteristics = contentSequenceCharacteristics,
                utteranceBufferSizeOption = utteranceBufferSizeOption,
                bufferObserver = bufferObserver,
            )
        }
    }
}

/**
 * [UtteranceFlowProvider] that produces utterances by synthesizing the sentences of a [SpeechFlow] with the voice
 * obtained from the [VoiceOfPreferenceProviderWithCache], and that can also produce a download flow for a given
 * voice via [getDownloadFlow].
 */
internal object UtteranceFlowProviderFromSpeechFlow : UtteranceFlowProvider() {

    /**
     * Builds the utterances flow: the preferred voice is resolved first, the [speechFlow] is run through the
     * pre-speech transforms, synthesized by the voice, and finally buffered with measurement.
     *
     * The buffer is [Channel.RENDEZVOUS] when pulling content causes an effect on the user; otherwise it is the
     * current value of [UtteranceBufferSizeOption.utteranceBufferSizeFlow].
     */
    override fun getUtterancesFlow(
        speechFlow: SpeechFlow,
        voiceOfPreferenceProvider: VoiceOfPreferenceProviderWithCache,
        startingCursor: ContentCursor?,
        preSpeechTransform: PreSpeechTransformOptions,
        shouldOptimizeForQuickStart: Boolean,
        reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
        utteranceBufferSizeOption: UtteranceBufferSizeOption,
        bufferObserver: BufferObserver<Utterance>,
    ): Flow<Utterance> = flowFromAsyncSeed(
        getSeed = { voiceOfPreferenceProvider.getPreferredVoiceWithCache() },
    ) { voice ->
        voice.synthesizingColdFlow(
            inputContentFlow = speechFlow.withPreSpeechTransformsApplied(preSpeechTransform) { transformer, sentence ->
                transformer.invoke(
                    sentence,
                    voice.metadata.languageIdentity,
                )
            },
            shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
            contentSequenceCharacteristics = contentSequenceCharacteristics,
        )
            .bufferWithMeasurement(
                reportMeasurement = reportBufferMeasurement,
                bufferSize =
                /* We choose the RENDEZVOUS (0) buffer size for content requiring user interaction for pulling.
                   In other cases, we go with the specified buffer size.
                 */
                if (contentSequenceCharacteristics.doesPullingContentCauseEffectOnUser) {
                    Channel.RENDEZVOUS
                } else {
                    utteranceBufferSizeOption.utteranceBufferSizeFlow.value
                },
                producerExceptionBehavior = ProducerExceptionBehavior.SuppressAndDeferFailureToConsumer,
                bufferObserver = bufferObserver,
            )
    }

    /**
     * Builds the download flow of the specified [voice] over the [speechFlow], after running the sentences through
     * the same pre-speech transforms as [getUtterancesFlow] does.
     *
     * The content is described to the voice as
     * [ContentSequenceCharacteristicsOfImmutableAlwaysLiveNoUserEffectContent].
     *
     * @return the flow returned by the voice's `downloadingColdFlow`
     *  (NOTE(review): the emitted [ContentCursor]s presumably mark download progress - confirm).
     */
    suspend fun getDownloadFlow(
        speechFlow: SpeechFlow,
        voice: MediaVoiceCachedPersistently,
        preSpeechTransform: PreSpeechTransformOptions,
        maxConcurrencyOnAudioServerOption: MaxConcurrencyOnAudioServerOptionReadOnly,
    ): Flow<ContentCursor> {
        return voice.downloadingColdFlow(
            inputContentFlow = speechFlow.withPreSpeechTransformsApplied(preSpeechTransform) { transformer, sentence ->
                transformer.invoke(
                    sentence,
                    voice.metadata.languageIdentity,
                )
            },
            contentSequenceCharacteristics = object :
                ContentSequenceCharacteristicsOfImmutableAlwaysLiveNoUserEffectContent { },
            maxConcurrencyOnAudioServerOption = maxConcurrencyOnAudioServerOption,
        )
    }

    /**
     * Single place where the pre-speech transforms are applied, shared by [getUtterancesFlow] and [getDownloadFlow]
     * so that playback and download cannot drift apart.
     *
     * For every sentence, the transformers are obtained afresh from the *current* value of
     * [PreSpeechTransformOptions.currentSkippingSettings] and applied in order. As soon as one of them yields `null`
     * the remaining ones are not applied and the sentence is dropped from the resulting flow.
     *
     * @param applyTransformer performs the actual invocation of a transformer on a sentence, which lets each caller
     *  supply the language identity of its own voice.
     */
    private fun SpeechFlow.withPreSpeechTransformsApplied(
        preSpeechTransform: PreSpeechTransformOptions,
        applyTransformer: suspend (transformer: SentenceTransformer, sentence: SpeechSentence) -> SpeechSentence?,
    ): Flow<SpeechSentence> =
        transform { sentence ->
            // We apply all pre-speech transforms in order.
            val sentenceTransformed =
                preSpeechTransform
                    .currentSkippingSettings
                    .value
                    .toSentenceTransformers()
                    .fold<SentenceTransformer, SpeechSentence?>(sentence) { transformableSentence, transformer ->
                        if (transformableSentence == null) {
                            // An earlier transformer has already removed the sentence - nothing left to transform.
                            null
                        } else {
                            applyTransformer(transformer, transformableSentence)
                        }
                    }

            ifNotNull(sentenceTransformed) {
                emit(it)
            }
        }
}

/**
 * Exposes the utterance buffer size as an observable, read-only [StateFlow].
 *
 * Within this file, the current value is read (via `.value`) by [UtteranceFlowProviderFromSpeechFlow] at the time it
 * builds its buffered utterances flow.
 */
internal interface UtteranceBufferSizeOption {
    /** The buffer size (number of utterances) to use when buffering the utterances flow. */
    val utteranceBufferSizeFlow: StateFlow<Int>
}

/**
 * [UtteranceFlowProvider] whose flow always consists of exactly one [Utterance], obtained from [getUtterance] on the
 * first collection and served from an in-memory cache on subsequent ones.
 */
internal class UtteranceFlowProviderFromSingleStaticUtterance(
    val getUtterance: suspend () -> Result<Utterance>,
) : UtteranceFlowProvider() {
    /** Holds the utterance once it has been successfully obtained; `null` until then. */
    private val cachedSingleUtterance = AtomicRef<Utterance?>(null)

    /**
     * Emits the single utterance. Only [getUtterance] and the cache are consulted - all the parameters are ignored.
     *
     * A failure returned by [getUtterance] is thrown from the flow (via `orThrow`) and nothing gets cached.
     */
    override fun getUtterancesFlow(
        speechFlow: SpeechFlow,
        voiceOfPreferenceProvider: VoiceOfPreferenceProviderWithCache,
        startingCursor: ContentCursor?,
        preSpeechTransform: PreSpeechTransformOptions,
        shouldOptimizeForQuickStart: Boolean,
        /**
         * Not used by this implementation: with just one utterance there is, as per [BufferedFlowMeasurement],
         * no measurement to report.
         */
        reportBufferMeasurement: (measurement: BufferedFlowMeasurement<Utterance>) -> Unit,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
        utteranceBufferSizeOption: UtteranceBufferSizeOption,
        bufferObserver: BufferObserver<Utterance>,
    ): Flow<Utterance> =
        flow {
            val alreadyObtained = cachedSingleUtterance.value

            if (alreadyObtained != null) {
                /*
                 HACK(anson): this delay is necessary to prevent a race condition... somewhere. It manifests
                 on Android as "play button becomes unresponsive after audioController.seek", among others.
                 See Slack for full details.
                 https://speechifyworkspace.slack.com/archives/C04NADQUX7W/p1679525135807029?thread_ts=1679522719.618249&cid=C04NADQUX7W
                 */
                delay(200)
                emit(alreadyObtained)
            } else {
                // First collection: obtain the utterance, remember it for later collections, then emit it.
                val freshlyObtained = getUtterance().orThrow()
                cachedSingleUtterance.swap(freshlyObtained)
                emit(freshlyObtained)
            }
        }
}
