package com.speechify.client.api.audio

import com.speechify.client.api.adapters.mediaplayer.LocalMediaPlayerAdapter
import com.speechify.client.api.adapters.offlineMode.OfflineModeStatusProvider
import com.speechify.client.api.content.ContentCursor
import com.speechify.client.api.content.ContentText
import com.speechify.client.api.content.ContentTextUtils
import com.speechify.client.api.content.view.speech.Speech
import com.speechify.client.api.content.view.speech.SpeechSentence
import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.services.library.offline.MaxConcurrencyOnAudioServerOptionReadOnly
import com.speechify.client.api.util.Callback
import com.speechify.client.api.util.fromCo
import com.speechify.client.api.util.successfully
import com.speechify.client.helpers.content.standard.ContentSequenceCharacteristics
import com.speechify.client.helpers.content.standard.ContentSequenceCharacteristicsOfImmutableAlwaysLiveNoUserEffectContent
import com.speechify.client.helpers.content.standard.getAimedCharsCountInUtteranceOverrideOrNullSafely
import com.speechify.client.internal.services.db.DbService
import com.speechify.client.internal.sqldelight.DownloadedAudioForItem
import com.speechify.client.internal.sync.AtomicInt
import com.speechify.client.internal.util.collections.flows.flowFromAsyncSeedDeferred
import com.speechify.client.internal.util.diagnostics.enriching.createTagProperty
import com.speechify.client.internal.util.diagnostics.enriching.toTextKeyProperty
import com.speechify.client.internal.util.extensions.collections.TransformedBatchResult
import com.speechify.client.internal.util.extensions.collections.flows.mapConcurrentlyLosingOrder
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.flow.toList
import kotlin.js.JsExport

/**
 * A [Voice] that has [Speech] synthesized through a [MediaSynthesisService] and hands the outcome back as
 * media-backed [Utterance]s ([MediaUtterance]).
 *
 * Synthesis results of this class are cached in memory; [withPersistentCachingEnabled] creates a variant that
 * uses a persistent cache instead.
 */
internal open class MediaVoice(
    private val mediaPlayerFactory: LocalMediaPlayerAdapter,
    private val speechSynthesisConfig: SpeechSynthesisConfig,
    private val multiSpecMediaSynthesisService: MediaSynthesisService,
    override val voiceSpec: VoiceSpec.VoiceSpecForMediaVoiceFromAudioServer,
    // Overridable only for the sake of tests:
    private val defaultAimedCharsCountInBatch: Int = com.speechify.client.api.audio.defaultAimedCharsCountInBatch,
) : Voice() {

    /**
     * Creates a [MediaVoiceCachedPersistently] that shares this voice's player factory, synthesis config,
     * synthesis service, voice spec and aimed batch size, and additionally receives the arguments given here.
     */
    internal fun withPersistentCachingEnabled(
        downloadedAudioForItem: DownloadedAudioForItem,
        voiceOfPreferenceForOfflineProvider: VoiceOfPreferenceForOfflineProvider,
        offlineModeStatusFlowProvider: OfflineModeStatusProvider.FlowProvider,
        dbService: DbService,
        onGapInPersistentAudioDetected: suspend (ContentCursor) -> Unit,
    ): MediaVoiceCachedPersistently =
        MediaVoiceCachedPersistently(
            // Shared with this voice:
            mediaPlayerFactory = mediaPlayerFactory,
            speechSynthesisConfig = speechSynthesisConfig,
            multiSpecMediaSynthesisService = multiSpecMediaSynthesisService,
            voiceSpec = voiceSpec,
            aimedCharsCountInBatch = defaultAimedCharsCountInBatch,
            // Specific to the persistently cached variant:
            dbService = dbService,
            downloadedAudioForItem = downloadedAudioForItem,
            voiceOfPreferenceForOfflineProvider = voiceOfPreferenceForOfflineProvider,
            offlineModeStatusFlowProvider = offlineModeStatusFlowProvider,
            onGapInPersistentAudioDetected = onGapInPersistentAudioDetected,
        )

    /**
     * The synthesis service bound to this voice's [voiceSpec] and audio format. Created on first access; the
     * in-memory cache capacity is read from the config flow once, at that moment.
     */
    protected open val mediaSynthesisService by lazy {
        // To disable the cache, swap the call below for `.asSingleSpecsServiceUncached(`.
        multiSpecMediaSynthesisService.asSingleSpecsServiceCachedInMemory(
            cacheCapacityInCharsOfText =
            speechSynthesisConfig.textToSpeechAudioCacheInMemoryCapacityInCharsOfTextFlow.value,
            audioMediaFormat = speechSynthesisConfig.audioConfig.audioMediaFormat,
            voiceSpec = voiceSpec,
            getFullTextOfSentences = this::getFullTextOfSentences,
            splitSentenceByMaxChars = { sentenceToSplit, charsLimit ->
                sentenceToSplit.splitOnMaxCharsCount(charsLimit)
            },
            textToSpeechAudioContextInclusionOption = speechSynthesisConfig,
        )
    }

    /**
     * Joins the texts of [sentences] into one text, putting a single space between consecutive sentences.
     *
     * The real separators (e.g. newlines) are not passed on, as synthesis is not known to make use of them -
     * although that could conceivably become useful (poems and songs often use newlines as punctuation,
     * resulting in an audible pause).
     */
    protected fun getFullTextOfSentences(sentences: List<SpeechSentence>) =
        ContentTextUtils.Format.joinWithFillerSeparatorWithMapping(
            constituentParts = sentences,
            getContentTextFromConstituentPart = { sentence -> sentence.text },
            /* See #SeparatorsChangingReferenceTextLength.
               NOTE(review): the comment that used to be here said "join by empty string, because that's how
               speechmarks are when we use paragraphChunks API", while the value passed is a single space -
               confirm which of the two is intended.
             */
            separator = " ",
        )

    /**
     * Builds the cold flow that synthesizes [inputContentFlow] in batches, with each result carrying the
     * information whether it came from the cache.
     *
     * Sentences for which [isSynthesizableSpeechMarkup] is `false` are dropped before synthesis. The aimed batch
     * size is taken from [contentSequenceCharacteristics] when it provides an override, and from
     * [defaultAimedCharsCountInBatch] otherwise.
     */
    protected fun synthesizingColdFlowWithCachingInformation(
        inputContentFlow: Flow<SpeechSentence>,
        shouldOptimizeForQuickStart: Boolean,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
    ): Flow<TransformedBatchResult<SpeechSentence, SynthesisResponseWithCachingInformation>> {
        val synthesisService = mediaSynthesisService

        val synthesizableSentences = inputContentFlow.filter { sentence ->
            /*
               TODO: See if this is still needed now that we have flows, and there needs to be no continuity in
               `start` and `end` cursors.

               Text without any alphanumeric character is never sent to synthesis.
               NOTE(anson): this invariant should probably be protected by Speech/SpeechSentence, but atm several
               components depend on their existing behavior, and it is safer to put this logic here.
               Specifically, we are solving for https://linear.app/speechify-inc/issue/PLT-2194/stuck-infinite-loading-when-resume-playing-on-finished-document-with
             */
            isSynthesizableSpeechMarkup(sentence.textRepresentation)
        }

        val aimedCharsCount =
            contentSequenceCharacteristics.getAimedCharsCountInUtteranceOverrideOrNullSafely()
                ?: defaultAimedCharsCountInBatch

        return synthesisService.synthesizingColdFlow(
            inputContentFlow = synthesizableSentences,
            shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
            aimedCharsCountInBatch = aimedCharsCount,
        )
    }

    /**
     * Synthesizes [inputContentFlow] and wraps every synthesized batch into one [MediaUtterance] marked with
     * [SynthesisLocation.REMOTE].
     */
    override fun synthesizingColdFlow(
        inputContentFlow: Flow<SpeechSentence>,
        shouldOptimizeForQuickStart: Boolean,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
    ): Flow<Utterance> =
        synthesizingColdFlowWithCachingInformation(
            inputContentFlow = inputContentFlow,
            shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
            contentSequenceCharacteristics = contentSequenceCharacteristics,
        )
            .map { batch ->
                val response = batch.resultOfTransform.getSynthesizeResponse()
                val sentencesOfBatch = batch.inputItems
                MediaUtterance(
                    response.mediaUrl,
                    response.speechMarks,
                    speech = Speech(
                        sentencesWithAtLeastOne = sentencesOfBatch,
                    ),
                    text = getFullTextOfSentences(sentencesOfBatch).joinedText,
                    SynthesisLocation.REMOTE,
                    mediaPlayerFactory,
                    voiceMetadata = metadata,
                )
            }

    /**
     * Synthesizes all sentences of [speech] and reports the resulting utterances to [callback] as one array.
     */
    override fun synthesize(speech: Speech, callback: Callback<Array<Utterance>>) = callback.fromCo {
        /* Everything is collected into a single list below, so what matters is the whole call finishing as
         * quickly as possible. Hence synthesis should take maximum-size chunks: no optimizing for a quick
         * start, and content characteristics of immutable, no-user-effect content.
         */
        val allUtterances = synthesizingColdFlow(
            inputContentFlow = speech.sentences.asFlow(),
            shouldOptimizeForQuickStart = false,
            contentSequenceCharacteristics = object :
                ContentSequenceCharacteristicsOfImmutableAlwaysLiveNoUserEffectContent {},
        ).toList()

        allUtterances.toTypedArray().successfully()
    }
}

/**
 * A [MediaVoice] whose synthesis service is backed by a persistent cache, which allows for offline playback.
 *
 * While synthesizing, it also looks at the current [OfflineModeStatusProvider.OfflineModeStatus]: a batch that
 * cannot be served from the persistent cache while the status is not `ONLINE` is handed over to the voice
 * obtained from the [VoiceOfPreferenceForOfflineProvider] instead.
 */
internal class MediaVoiceCachedPersistently(
    private val mediaPlayerFactory: LocalMediaPlayerAdapter,
    private val dbService: DbService,
    private val speechSynthesisConfig: SpeechSynthesisConfig,
    multiSpecMediaSynthesisService: MediaSynthesisService,
    override val voiceSpec: VoiceSpec.VoiceSpecForMediaVoiceFromAudioServer,
    private val downloadedAudioForItem: DownloadedAudioForItem,
    private val voiceOfPreferenceForOfflineProvider: VoiceOfPreferenceForOfflineProvider,
    private val offlineModeStatusFlowProvider: OfflineModeStatusProvider.FlowProvider,
    /**
     * Invoked by [synthesizingColdFlow] every time it detects a gap in the persistent audio, with the start
     * cursor of the first sentence affected. Can be used to tell users that they need to redownload their audio
     * after an SDK update, or to adjust their content filtering settings to the ones that were in effect when
     * the audio was originally downloaded.
     */
    private val onGapInPersistentAudioDetected: suspend (ContentCursor) -> Unit,
    // Overridable only for the sake of tests:
    aimedCharsCountInBatch: Int = defaultAimedCharsCountInBatch,
) : MediaVoice(
    mediaPlayerFactory = mediaPlayerFactory,
    speechSynthesisConfig = speechSynthesisConfig,
    multiSpecMediaSynthesisService = multiSpecMediaSynthesisService,
    voiceSpec = voiceSpec,
    defaultAimedCharsCountInBatch = aimedCharsCountInBatch,
) {
    /**
     * Replaces the in-memory cached service of [MediaVoice] with one cached persistently for the document of
     * [downloadedAudioForItem]. Unlike the base class' property, this one is created eagerly.
     */
    override val mediaSynthesisService = multiSpecMediaSynthesisService.asSingleSpecsServiceCachedPersistently(
        speechifyURI = downloadedAudioForItem.documentUri,
        audioMediaFormat = speechSynthesisConfig.audioConfig.audioMediaFormat,
        voiceSpec = voiceSpec,
        getFullTextOfSentences = this::getFullTextOfSentences,
        splitSentenceByMaxChars = { sentenceToSplit, charsLimit ->
            sentenceToSplit.splitOnMaxCharsCount(charsLimit)
        },
        dbService = dbService,
        textToSpeechAudioContextInclusionOption = speechSynthesisConfig,
    )

    /**
     * Synthesizes [inputContentFlow] batch by batch. A batch becomes a [MediaUtterance] when it was served from
     * the cache (and no gap is being simulated for it), or when the offline mode status is `ONLINE`. Any other
     * batch is reported through [onGapInPersistentAudioDetected] and then synthesized by the preferred offline
     * voice.
     */
    @OptIn(ExperimentalCoroutinesApi::class)
    override fun synthesizingColdFlow(
        inputContentFlow: Flow<SpeechSentence>,
        shouldOptimizeForQuickStart: Boolean,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
    ): Flow<Utterance> = flowFromAsyncSeedDeferred(
        getSeed = { voiceOfPreferenceForOfflineProvider.getPreferredOfflineVoice() },
    ) { deferredOfflineVoice ->
        synthesizingColdFlowWithCachingInformation(
            inputContentFlow = inputContentFlow,
            shouldOptimizeForQuickStart = shouldOptimizeForQuickStart,
            contentSequenceCharacteristics = contentSequenceCharacteristics,
        )
            .map { batch ->
                val sentencesOfBatch = batch.inputItems

                // TODO_NICETOHAVE: Right now the cache will not do a look ahead if only skipping a single
                //  sentence would be enough to get back to cached content and will use the local fallback for
                //  potentially more content than needed. Fixing this would require the PersistentCache to be
                //  able to report just a gap and not a complete miss.
                // The gap simulation is only consulted for cached batches (`&&` short-circuits).
                val isUsableCacheHit =
                    batch.resultOfTransform.isCached &&
                        SpeechifySDKTestingAudioDownload.shouldProduceGap(
                            getFullTextOfSentences(sentencesOfBatch).joinedText,
                        ).not()

                val utterancesOfBatch: Flow<Utterance> =
                    if (
                        isUsableCacheHit ||
                        offlineModeStatusFlowProvider.offlineModeStatusFlow.value ==
                        OfflineModeStatusProvider.OfflineModeStatus.ONLINE
                    ) {
                        val response = batch.resultOfTransform.getSynthesizeResponse()
                        flowOf(
                            MediaUtterance(
                                response.mediaUrl,
                                response.speechMarks,
                                speech = Speech(
                                    sentencesWithAtLeastOne = sentencesOfBatch,
                                ),
                                text = getFullTextOfSentences(sentencesOfBatch).joinedText,
                                SynthesisLocation.REMOTE,
                                mediaPlayerFactory,
                                voiceMetadata = metadata,
                            ),
                        )
                    } else {
                        // TODO_NICETOHAVE: There is some bad interaction with the buffering here, we pull content
                        //  multiple pages ahead causing this to be reported prematurely. Not an urgent issue
                        //  since in the happy path users have all pages downloaded, and this would only happen
                        //  on wrong content skipping settings allowing users to readjust early.
                        onGapInPersistentAudioDetected(sentencesOfBatch.first().start)

                        val fallbackEvent = DiagnosticEvent(
                            message = "Falling back to local synthesis due to being offline",
                            sourceAreaId = "Playback.VoiceFromAudioServer",
                            properties = mapOf(
                                createTagProperty(
                                    // The KDoc of `createTagProperty` explains how to query these.
                                    "Playback.VoiceFromAudioServer.fallbackToLocalSynthesis.fallbackOnOffline",
                                )
                                    .toTextKeyProperty(),
                            ),
                        )
                        Log.d(fallbackEvent)

                        deferredOfflineVoice.await()
                            .synthesizingColdFlow(
                                inputContentFlow = sentencesOfBatch.asFlow(),
                                shouldOptimizeForQuickStart = false,
                                contentSequenceCharacteristics = contentSequenceCharacteristics,
                            )
                    }

                utterancesOfBatch
            }
            .flatMapConcat { utterancesOfBatch -> utterancesOfBatch }
    }

    /**
     * A flow over the content optimized for filling the cache: up to
     * [MaxConcurrencyOnAudioServerOptionReadOnly.maxConcurrencyOnAudioServer] batches are pulled from the
     * AudioServer at once.
     *
     * Emits, after each finished batch, the furthest end cursor reached so far.
     */
    suspend fun downloadingColdFlow(
        inputContentFlow: Flow<SpeechSentence>,
        contentSequenceCharacteristics: ContentSequenceCharacteristics,
        maxConcurrencyOnAudioServerOption: MaxConcurrencyOnAudioServerOptionReadOnly,
    ): Flow<ContentCursor> {
        val batches = synthesizingColdFlowWithCachingInformation(
            inputContentFlow = inputContentFlow,
            shouldOptimizeForQuickStart = false,
            contentSequenceCharacteristics = contentSequenceCharacteristics,
        )

        val endCursorsInCompletionOrder =
            batches.mapConcurrentlyLosingOrder(maxConcurrencyOnAudioServerOption.maxConcurrencyOnAudioServer) { batch ->
                // The response itself is not needed here - it is only requested so that the batch gets processed.
                batch.resultOfTransform.getSynthesizeResponse()
                getFullTextOfSentences(batch.inputItems).joinedText.end
            }

        // The concurrent step above loses the order, so report the maximum of all cursors seen so far, for the
        // progress to make sense.
        return endCursorsInCompletionOrder.runningReduce { furthestSoFar, candidate ->
            if (candidate.isAfterOrAt(furthestSoFar)) candidate else furthestSoFar
        }
    }
}

/**
 * Option carrying the capacity of the in-memory text-to-speech audio cache, measured in characters of text.
 */
interface TextToSpeechAudioCacheCapacityOption {
    /**
     * The flow version of [com.speechify.client.bundlers.listening.ListeningBundlerOptions.textToSpeechAudioCacheInMemoryCapacityInCharsOfTextOverride].
     *
     * Being a [StateFlow], it always holds a current capacity, readable through its `value`.
     */
    val textToSpeechAudioCacheInMemoryCapacityInCharsOfTextFlow: StateFlow<Int>
}

/**
 * Option carrying the `textToSpeechIncludePrecedingContextForAudioSynthesis` setting.
 *
 * NOTE(review): going by the name, the setting decides whether content preceding the synthesized text is
 * included as context for audio synthesis - confirm against the code that consumes this option.
 */
interface TextToSpeechAudioContextInclusionOption {
    /**
     * The flow version of [com.speechify.client.bundlers.listening.ListeningBundlerOptions.textToSpeechIncludePrecedingContextForAudioSynthesisOverride].
     *
     * Being a [StateFlow], it always holds a current setting, readable through its `value`.
     */
    val textToSpeechIncludePrecedingContextForAudioSynthesis: StateFlow<Boolean>
}

/**
 * Creates the [VoiceMetadataOfMediaVoice] describing this voice spec.
 *
 * For every kind of spec the `engine` comes from [getMetadataEngineName] and the `id` from [getMetadataId].
 * Only [VoiceSpec.CVLVoiceSpec] provides the labels, localized display names and preview audio; for
 * [VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted] these are explicitly `null`.
 */
internal fun VoiceSpec.VoiceSpecForMediaVoice.toVoiceMetadataAsMedia() =
    // Each branch spells the constructor call out, because the properties are read from the smart-cast `this`.
    when (this) {
        is VoiceSpec.Speechify -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.ResembleIO -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.AmazonPolly -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.GoogleWavenet -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.Azure -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.Static -> VoiceMetadataOfMediaVoice(
            spec = this,
            /** NOTE: We don't use the [VoiceSpec.Static.id] here - #QuirkVoiceSpecStaticIdUnused
             *   TODO: consider #TODOConsiderRemovingIdFromStaticVoiceSpec and remove this comment if we end up removing
             *   [VoiceSpec.Static.id].
             */
            id = (this as VoiceSpec.VoiceSpecForMediaVoice).getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
        )

        is VoiceSpec.CVLVoiceSpec -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
            // Available on this kind of spec only:
            labels = labels,
            localizedDisplayNames = localizedDisplayNames,
            previewAudioSentence = previewAudioSentence,
            previewAudioUrl = previewAudioUrl,
        )

        is VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted -> VoiceMetadataOfMediaVoice(
            spec = this,
            id = getMetadataId(),
            engine = getMetadataEngineName(this),
            displayName = displayName,
            languageCode = languageCode,
            gender = gender,
            isPremium = isPremium,
            avatarUrl = avatarUrl,
            // Explicitly absent for this kind of spec:
            labels = null,
            localizedDisplayNames = null,
            previewAudioSentence = null,
            previewAudioUrl = null,
        )
    }

/**
 * Tells whether [text] should be sent to synthesis, which is the case only when it contains at least one
 * letter or digit. The empty string, as well as text made solely of whitespace, punctuation or other symbols,
 * is not synthesizable.
 */
internal fun isSynthesizableSpeechMarkup(text: String): Boolean =
    text.any(Char::isLetterOrDigit)

/**
 * Gives the engine name to put into the voice metadata for [spec].
 *
 * This is a deliberate copy of the logic from the [VoiceSpec.VoiceSpecForMediaVoiceFromAudioServer] context:
 * depending on the use-case of this field, the semantics here may evolve differently than the ones there.
 */
internal fun getMetadataEngineName(spec: VoiceSpec): String = when (spec) {
    // Amazon Polly is the only provider whose name depends on the engine variant.
    is VoiceSpec.AmazonPolly -> when (spec.pollyEngine) {
        VoiceSpec.AmazonPolly.PollyEngine.STANDARD -> "standard"
        VoiceSpec.AmazonPolly.PollyEngine.NEURAL -> "neural"
        VoiceSpec.AmazonPolly.PollyEngine.NEURAL_LFR -> "neural-lfr"
    }

    is VoiceSpec.GoogleWavenet -> "google"
    is VoiceSpec.LocalSynthesisBackedVoice -> "local"
    is VoiceSpec.ResembleIO -> "resemble"
    is VoiceSpec.Speechify -> "speechify"
    is VoiceSpec.Azure -> "azure"
    is VoiceSpec.Static -> "static"

    // A persisted spec carries its engine name with it.
    is VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted -> spec.engine
}

/**
 * Builds the metadata id of this voice spec: the engine name (see [getMetadataEngineName]), the display name
 * and the language code, each lowercased, joined with `-`.
 *
 * Every kind of spec uses the same format. The `when` is there because the display name and the language code
 * are read from the smart-cast `this`.
 */
internal fun VoiceSpec.VoiceSpecForMediaVoice.getMetadataId(): String {
    // The one place that knows the `<engine>-<display name>-<language code>` format.
    fun composeId(engineName: String, voiceDisplayName: String, voiceLanguageCode: String): String =
        listOf(engineName, voiceDisplayName, voiceLanguageCode)
            .joinToString(separator = "-") { part -> part.lowercase() }

    return when (this) {
        // All Polly engines share the format; they differ only by the engine name.
        is VoiceSpec.AmazonPolly -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.GoogleWavenet -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.ResembleIO -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.Speechify -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.Azure -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.Static -> composeId(getMetadataEngineName(this), displayName, languageCode)
        is VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted ->
            composeId(getMetadataEngineName(this), displayName, languageCode)
    }
}

/**
 * QA feature: allows testing how the fallback offline voice gets applied when gaps in the downloaded audio are
 * detected (e.g. due to content extraction improvements) while the user is offline.
 *
 * It is exposed because this case doesn't happen in normal usage, which makes it hard to test for regressions.
 */
@JsExport
object SpeechifySDKTestingAudioDownload {
    /**
     * Turns the simulation of gaps on or off. Can be switched at any moment, and affects the current speech.
     */
    var shouldSimulateGaps: Boolean = false

    // Counts the calls made while the simulation is on - its parity selects the calls that produce a gap.
    private val counter = AtomicInt(1)

    /**
     * Decides whether a gap should be simulated for [joinedText].
     *
     * Always `false` while [shouldSimulateGaps] is off (such calls are not counted). While it is on, the
     * answer is `true` whenever the counter value obtained for the call is even, and the decision is logged.
     */
    internal fun shouldProduceGap(joinedText: ContentText): Boolean {
        if (!shouldSimulateGaps) return false

        val isGapTurn = counter.getAndIncrement() % 2 == 0

        Log.dEvent {
            val decisionMessage =
                if (isGapTurn) {
                    "Will produce gap on `${joinedText.text}`"
                } else {
                    "Will not produce gap on `${joinedText.text}`"
                }
            DiagnosticEvent(
                message = decisionMessage,
                properties = mapOf(),
                sourceAreaId = "SpeechifySDKTestingAudioDownload.shouldProduceGap",
            )
        }

        return isGapTurn
    }
}
