@file:OptIn(InternalSerializationApi::class, ExperimentalSerializationApi::class)

package com.speechify.client.api.services.library.offline

import app.cash.sqldelight.async.coroutines.awaitAsList
import com.speechify.client.api.ClientConfig
import com.speechify.client.api.SpeechifyClient
import com.speechify.client.api.SpeechifyURI
import com.speechify.client.api.SpeechifyVersions
import com.speechify.client.api.adapters.firebase.DataSource
import com.speechify.client.api.adapters.firebase.FirebaseFirestoreDocumentSnapshot
import com.speechify.client.api.adapters.firebase.FirebaseFirestoreService
import com.speechify.client.api.adapters.firebase.GoogleCloudStorageUriFileId
import com.speechify.client.api.adapters.firebase.coGetDocument
import com.speechify.client.api.adapters.offlineMode.OfflineModeStatusProvider
import com.speechify.client.api.audio.AudioConfig
import com.speechify.client.api.audio.AudioMediaFormat
import com.speechify.client.api.audio.MediaVoice
import com.speechify.client.api.audio.PreSpeechTransformOptions
import com.speechify.client.api.audio.SpeechSynthesisConfig
import com.speechify.client.api.audio.UtteranceFlowProviderFromSpeechFlow
import com.speechify.client.api.audio.Voice
import com.speechify.client.api.audio.VoiceFactory
import com.speechify.client.api.audio.VoiceOfPreferenceForOfflineProvider
import com.speechify.client.api.audio.VoiceSpec
import com.speechify.client.api.audio.caching.VoiceIdForDb
import com.speechify.client.api.audio.caching.getIdForDb
import com.speechify.client.api.audio.createVoice
import com.speechify.client.api.content.ContentCursor
import com.speechify.client.api.content.coGetProgressFromCursor
import com.speechify.client.api.content.coGetStats
import com.speechify.client.api.content.ml.MLParsingMode
import com.speechify.client.api.content.ocr.OcrFallbackStrategy
import com.speechify.client.api.content.pdf.ContentSortingStrategy
import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.googleCloudStorageUriFileIdFromUrl
import com.speechify.client.api.services.ebook.EncryptedDownloadService
import com.speechify.client.api.services.library.LibraryServiceDelegate
import com.speechify.client.api.services.library.getContentLibraryItemFromFirestore
import com.speechify.client.api.services.library.models.ContentType
import com.speechify.client.api.services.library.models.LibraryItem
import com.speechify.client.api.services.library.offline.ClientScopeAudioDownloadOptions.Companion.DEFAULT_MAX_CONCURRENCY_ON_AUDIO_SERVER
import com.speechify.client.api.services.library.offline.ClientScopeAudioDownloadOptions.Companion.DEFAULT_MAX_DOCUMENT_WORD_COUNT
import com.speechify.client.api.telemetry.LibraryItemContentTypeTelemetryProp
import com.speechify.client.api.telemetry.addMeasurement
import com.speechify.client.api.telemetry.addProperties
import com.speechify.client.api.telemetry.addPropertiesNonNull
import com.speechify.client.api.telemetry.currentTelemetryEvent
import com.speechify.client.api.telemetry.withTelemetryOfFlowCollectingAllItems
import com.speechify.client.api.util.Callback
import com.speechify.client.api.util.boundary.CancellableJobWithIntermediateResults
import com.speechify.client.api.util.boundary.toCancellableJobWithIntermediateResultsIn
import com.speechify.client.api.util.collections.flows.CallbackFlowSourceFromCollectWithResult
import com.speechify.client.api.util.collections.flows.NeverEndingCallbackStateFlowOfNonNulls
import com.speechify.client.api.util.collections.flows.toCallbackFlowSourceFromCollectWithResult
import com.speechify.client.api.util.collections.flows.toNeverEndingCallbackStateFlowOfNonNulls
import com.speechify.client.api.util.fromCoWithErrorLogging
import com.speechify.client.api.util.isCausedByConnectionError
import com.speechify.client.api.util.orThrow
import com.speechify.client.api.util.successfully
import com.speechify.client.api.util.toNullSuccessIfResourceNotFound
import com.speechify.client.bundlers.BundlerFactoryConfig
import com.speechify.client.bundlers.content.ContentBundle
import com.speechify.client.bundlers.content.ContentBundlerConfig
import com.speechify.client.bundlers.content.ContentBundlerOptions
import com.speechify.client.bundlers.content.SpeechifyContentBundler
import com.speechify.client.bundlers.listening.ListeningBundlerConfig
import com.speechify.client.bundlers.listening.ListeningBundlerOptions
import com.speechify.client.bundlers.listening.VoicePreferences
import com.speechify.client.bundlers.reading.book.BookReadingBundlerConfig
import com.speechify.client.bundlers.reading.book.BookReadingBundlerOptions
import com.speechify.client.bundlers.reading.classic.ClassicReadingBundlerConfig
import com.speechify.client.bundlers.reading.classic.ClassicReadingBundlerOptions
import com.speechify.client.bundlers.reading.embedded.EmbeddedReadingBundlerConfig
import com.speechify.client.bundlers.reading.embedded.EmbeddedReadingBundlerOptions
import com.speechify.client.helpers.constants.SpeechifyVoiceSpecifications
import com.speechify.client.helpers.features.ProgressFraction
import com.speechify.client.internal.WithScope
import com.speechify.client.internal.caching.ReadWriteThroughCachedFirebaseStorage
import com.speechify.client.internal.services.db.DbBoolean
import com.speechify.client.internal.services.db.DbService
import com.speechify.client.internal.services.db.asFlowOfListWithCurrentAndChanges
import com.speechify.client.internal.services.db.awaitAsFirstOrNull
import com.speechify.client.internal.services.editing.getEditsPath
import com.speechify.client.internal.services.library.getLegacyPagesCollectionRef
import com.speechify.client.internal.services.library.getLibraryItemPath
import com.speechify.client.internal.services.library.getLocationsCollectionRef
import com.speechify.client.internal.services.library.models.FirebaseLibraryItem
import com.speechify.client.internal.services.scannedbook.PlatformScannedBookService
import com.speechify.client.internal.services.scannedbook.PlatformScannedBookService.Companion.PAGE_OF_ORIGINAL_DOC__ID_PLACEHOLDER
import com.speechify.client.internal.services.scannedbook.getScannedPagePath
import com.speechify.client.internal.services.scannedbook.getScannedPagesCollectionRef
import com.speechify.client.internal.sqldelight.DownloadedAudioForItem
import com.speechify.client.internal.sqldelight.VoiceCacheQueries
import com.speechify.client.internal.sync.AtomicInt
import com.speechify.client.internal.sync.increment
import com.speechify.client.internal.util.collections.flows.channelFlowWithCapacityRequired
import com.speechify.client.internal.util.collections.flows.flowFromAsyncGetFlow
import com.speechify.client.internal.util.collections.flows.flowFromAsyncSeed
import com.speechify.client.internal.util.collections.flows.onCompletionExceptional
import com.speechify.client.internal.util.collections.flows.onCompletionSuccessfully
import com.speechify.client.internal.util.collections.maps.BlockingThreadsafeMap
import com.speechify.client.internal.util.collections.maps.LockingMapWithSubscribableValues
import com.speechify.client.internal.util.collections.maps.MapWithSubscribableValues
import com.speechify.client.internal.util.collections.maps.getCurrentOrNull
import com.speechify.client.internal.util.diagnostics.enriching.addTagProperties
import com.speechify.client.internal.util.extensions.collections.flows.all
import com.speechify.client.internal.util.extensions.throwable.addCustomProperty
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.InternalSerializationApi
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.serializer
import kotlin.js.JsExport

/**
 * Fraction in range <0.0, 1.0> (inclusive), with 0 representing the beginning and 1 the end.
 */
typealias ProgressFraction = Double

/**
 * The OfflineAvailabilityManager exposes functionality related to:
 * - Checking if library items are available offline
 * - Downloading library items for offline use
 * - Deleting library items that are no longer needed
 */
@JsExport
class OfflineAvailabilityManager internal constructor(
    private val clientConfig: ClientConfig,
    private val firebaseStorageCache: ReadWriteThroughCachedFirebaseStorage,
    private val firestoreService: FirebaseFirestoreService,
    private val dbService: DbService,
    private val ebookService: EncryptedDownloadService,
) : WithScope() {

    companion object {
        const val MAXIMUM_PARALLEL_RESOURCE_DOWNLOADS = 8
    }

    private val runningDownloads =
        BlockingThreadsafeMap<SpeechifyURI, CancellableJobWithIntermediateResults<ProgressFraction, Unit>>()

    private val availabilityStatusCache: MapWithSubscribableValues<OfflineAvailabilityStatus> =
        LockingMapWithSubscribableValues(
            this.scope,
        )

    private lateinit var speechifyClient: SpeechifyClient
    private lateinit var createBundlerDependencies: (
        bundlerFactoryConfig: BundlerFactoryConfig,
    ) -> BundlerDependencies
    private lateinit var libraryServiceDelegate: LibraryServiceDelegate

    /**
     * There is a circular dependency between [LibraryServiceDelegate] and [OfflineAvailabilityManager].
     * The [SpeechifyContentBundler] is also only available after all other services are created so can only be
     * late injected.
     */
    internal fun lateInject(
        speechifyClient: SpeechifyClient,
        // Extracted as a functional parameter to allow easier mocking in tests.
        createBundlerDependencies: (
            bundlerFactoryConfig: BundlerFactoryConfig,
        ) -> BundlerDependencies = { bundlerFactoryConfig: BundlerFactoryConfig ->
            speechifyClient.createBundlerFactory(
                bundlerFactoryConfig,
            )
                .let {
                    BundlerDependencies(
                        speechifyContentBundler = it.speechifyContentBundler,
                        voiceFactory = it.listeningBundler.bundlerPlugins.voiceFactory,
                    )
                }
        },
        libraryServiceDelegate: LibraryServiceDelegate,
    ) {
        this.speechifyClient = speechifyClient
        this.createBundlerDependencies = createBundlerDependencies
        this.libraryServiceDelegate = libraryServiceDelegate
    }

    @JsExport.Ignore
    internal class BundlerDependencies(
        val speechifyContentBundler: SpeechifyContentBundler,
        val voiceFactory: VoiceFactory,
    )

    /**
     * Starts listening to the offline availability status of the given item. Will be called with all status updates
     * until the observation is stopped.
     */
    fun getOfflineAvailabilityStatusFlow(
        uri: SpeechifyURI,
        callback: Callback<NeverEndingCallbackStateFlowOfNonNulls<OfflineAvailabilityStatus>>,
    ) =
        callback.fromCoWithErrorLogging(
            sourceAreaId = "OfflineAvailabilityManager.getOfflineAvailabilityStatusFlow",
        ) {
            return@fromCoWithErrorLogging getOfflineAvailabilityStatusFlow(uri)
                .toNeverEndingCallbackStateFlowOfNonNulls()
                .successfully()
        }

    /**
     * Gets the offline availability for a library item.
     */
    internal fun getOfflineAvailabilityStatusFlow(
        uri: SpeechifyURI,
    ): StateFlow<OfflineAvailabilityStatus?> = flow {
        availabilityStatusCache.getSubscribableValueOrPut(uri.toString()) {
            // We trigger this update in the background after the flow was already returned to the caller.
            // This is also why the returned flow is nullable, to account for the time until this is done.
            getOfflineAvailabilityStatus(uri)
        }.collect {
            emit(it)
        }
    }.stateIn(scope, SharingStarted.Eagerly, null)

    /**
     * If the status for the given URI is in the cache, this will re-trigger the calculation of the status.
     * If the status is not in the cache, this will do nothing.
     */
    internal suspend fun refreshOfflineAvailabilityStatusIfCached(
        uri: SpeechifyURI,
    ) {
        availabilityStatusCache.getSubscribableValue(uri.toString())?.let {
            availabilityStatusCache.put(uri.toString(), getOfflineAvailabilityStatus(uri))
        }
    }

    private suspend fun getOfflineAvailabilityStatus(uri: SpeechifyURI): OfflineAvailabilityStatus {
        try {
            // If it's not being downloaded or cached check the actual status.
            val allItemsAvailable = requiredItemsFlow(
                uri,
                shouldQueryCacheOnly = true,
            ).all(maxConcurrency = MAXIMUM_PARALLEL_RESOURCE_DOWNLOADS) {
                supervisorScope {
                    it.isAvailableOffline()
                }
            }

            return if (allItemsAvailable) {
                OfflineAvailabilityStatus.AVAILABLE
            } else {
                OfflineAvailabilityStatus.NOT_AVAILABLE
            }
        } catch (e: UnsupportedContentException) {
            return OfflineAvailabilityStatus.NOT_AVAILABLE
        } catch (e: Throwable) {
            if (e.isCausedByConnectionError()) {
                return OfflineAvailabilityStatus.NOT_AVAILABLE
            } else {
                throw e
            }
        }
    }

    internal fun makeLibraryItemAvailableOfflineFlow(
        uri: SpeechifyURI,
        audioDownloadOptions: AudioDownloadOptions?,
    ): Flow<ProgressFraction> = withTelemetryOfFlowCollectingAllItems(
        telemetryEventName = "OfflineAvailabilityManager.makeLibraryItemAvailableOffline",
        properties = listOf(
            "uri" to uri.toString(),
        ),
    ) { telemetry ->
        flowFromAsyncSeed(
            getSeed = {
                (
                    availabilityStatusCache.getCurrentOrNull(uri.toString())
                        ?: OfflineAvailabilityStatus.NOT_AVAILABLE
                    )
                    .let { currentStatus ->
                        currentStatus as? OfflineAvailabilityStatusAtRest
                            ?: throw IllegalStateException(
                                "Report to SDK maintainers - should not allow start download for item " +
                                    "that is currently $currentStatus.",
                            )
                    }
                    .also {
                        telemetry.addProperties(
                            "offlineAvailabilityStatusAtStart" to it,
                        )
                    }
            },
            getFlow = { offlineAvailabilityStatusAtStart ->
                downloadItemsFlow(
                    uriToDownload = uri,
                    audioDownloadOptions = audioDownloadOptions,
                    audioDownloadRequestedStartCursor = null,
                    offlineAvailabilityStatusAtStart = offlineAvailabilityStatusAtStart,
                )
                    .onStart {
                        availabilityStatusCache.put(
                            uri.toString(),
                            OfflineAvailabilityStatus.DOWNLOADING,
                        )
                    }
                    .onCompletionExceptional(
                        onNonCancellationException = {
                            withContext(NonCancellable) {
                                availabilityStatusCache.put(
                                    key = uri.toString(),
                                    /*
                                     * While we don't have any error state for offline availability, we need to
                                     * stop the state from being a 'pending' state one:
                                     *
                                     * We are being careful here to not corrupt the original status, for example if a download
                                     * for an already cached item is triggered while offline, on exception it should go back to
                                     * available.
                                     */
                                    value = offlineAvailabilityStatusAtStart as
                                        OfflineAvailabilityStatus,
                                )
                            }
                            /**
                             * NOTE: In case of a non-cancellation exception, we don't remove any download
                             * progress that managed to succeed.
                             * Non-cancellations are interruptions against user's intent (includes connection
                             * errors). As such, we can assume it's intended to be retried and eventually
                             * still fulfill the intent, and not removing any download progress that
                             * managed to succeed can speed up fulfilling of the retry.
                             */
                        },
                        onCancellationException = {
                            /**
                             * User has cancelled, meaning they don't want what we started to download. Now need to
                             * establish what it was, and remove appropriately.
                             */
                            withContext(NonCancellable) {
                                if (audioDownloadOptions == null) {
                                    /**
                                     * No audio was being downloaded, so user wanted to download the document.
                                     * We can remove it by reusing the 'entire item removal' logic of  `removeLibraryItemFromOffline`.
                                     */
                                    /**
                                     * We don't worry about `*UnsafeIfConcurrent` because we are already running exclusively thanks to `runningDownloads.getOrPut` being thread-safe and exclusive.
                                     */
                                    removeLibraryItemFromOfflineUnsafeIfConcurrent(
                                        uri = uri,
                                    )
                                    availabilityStatusCache.put(
                                        uri.toString(),
                                        OfflineAvailabilityStatus.NOT_AVAILABLE,
                                    )
                                } else {
                                    /**
                                     * Audio was being downloaded ...
                                     */
                                    when (offlineAvailabilityStatusAtStart) {
                                        OfflineAvailabilityStatus.AVAILABLE -> {
                                            /**
                                             * ... in this case, it was an already-available item, so we just remove the audio.
                                             */
                                            /**
                                             * We don't worry about `*UnsafeIfConcurrent` because we are already running exclusively thanks to `runningDownloads.getOrPut` being thread-safe and exclusive.
                                             */
                                            dbService.getVoiceCacheQueries()
                                                .removeDownloadedAudioForVoiceUnsafeIfConcurrent(
                                                    uri = uri,
                                                    voiceId = audioDownloadOptions.voice.getIdForDb(),
                                                    reclaimSpace = true,
                                                )
                                            availabilityStatusCache.put(
                                                uri.toString(),
                                                OfflineAvailabilityStatus.AVAILABLE,
                                            )
                                        }

                                        OfflineAvailabilityStatus.NOT_AVAILABLE -> {
                                            /**
                                             * ... in this case, it was an initially-unavailable item, so we remove everything.
                                             */
                                            /**
                                             * We don't worry about `*UnsafeIfConcurrent` because we are already running exclusively thanks to `runningDownloads.getOrPut` being thread-safe and exclusive.
                                             */
                                            removeLibraryItemFromOfflineUnsafeIfConcurrent(
                                                uri = uri,
                                            )
                                            availabilityStatusCache.put(
                                                uri.toString(),
                                                OfflineAvailabilityStatus.NOT_AVAILABLE,
                                            )
                                        }
                                    }
                                }
                            }
                        },
                    )
                    .onCompletionSuccessfully {
                        // If the download succeeded, we update the cached status.
                        withContext(NonCancellable) {
                            availabilityStatusCache.put(
                                uri.toString(),
                                OfflineAvailabilityStatus.AVAILABLE,
                            )
                        }
                    }
                    .onCompletion {
                        /* This runs on any completion (successfully or with an exception) */
                        /* We must signal it as no longer running */
                        runningDownloads.remove(uri)
                    }
            },
        )
    }

    /**
     * This will start the process to make the item available offline.
     * The returned [CancellableJobWithIntermediateResults] will report progress updates, and a final result.
     * In addition, calling [CancellableJobWithIntermediateResults.cancel] will cancel the download, and remove
     * any files that were already downloaded.
     *
     * Calling this multiple times for the same item will return the same [CancellableJobWithIntermediateResults].
     * You can attach as many progress listeners as you want.
     *
     * Calling this for an item where the download was already finished, will trigger the download process again.
     *
     * Returns a [CancellableJobWithIntermediateResults] that will report progress updates, and a final result.
     *
     * To download audio, specify a non-null [audioDownloadOptions]. If this is the case then:
     * - The audio-download request fulfillment can be tracked by this function's progress, but also in,
     *   [audioDownloadsForAllItems], [com.speechify.client.api.services.library.models.LibraryItem.Content.audioDownloads]
     *   and [com.speechify.client.bundlers.listening.ListeningBundle.voicesAudioDownloads].

     * - [ErrorOfExceedingAudioDownloadMaxDocumentWordCount] will be thrown if the document exceeds the maximum word count.
     *   To test if it was the cause of the error, use [com.speechify.client.api.util.SDKError.getCauseThatMatchesOrNull]
     *   with a predicate that matches instances of [ErrorOfExceedingAudioDownloadMaxDocumentWordCount]].
     */
    @Suppress("NON_EXPORTABLE_TYPE")
    fun makeLibraryItemAvailableOffline(
        uri: SpeechifyURI,
        audioDownloadOptions: AudioDownloadOptions?,
    ): CancellableJobWithIntermediateResults<ProgressFraction, Unit> =
        runningDownloads.getOrPut(
            key = uri,
            defaultValue = {
                makeLibraryItemAvailableOfflineFlow(
                    uri,
                    audioDownloadOptions,
                ).toCancellableJobWithIntermediateResultsIn(
                    sourceAreaId = "OfflineAvailabilityManager.makeLibraryItemAvailableOffline",
                    scope = scope,
                )
            },
        )

    /**
     * This will remove the item from the offline cache.
     * Returns true if the item was removed, false if it was not found in the offline cache.
     * This will return an error if the item is currently being downloaded.
     *
     * See also [removeLibraryItemDownloadedAudioForVoice] for removing only the audio for a specific voice.
     */
    fun removeLibraryItemFromOffline(
        uri: SpeechifyURI,
        callback: Callback<Boolean>,
    ) = callback.fromCoWithErrorLogging(
        sourceAreaId = "OfflineAvailabilityManager.removeLibraryItemFromOffline",
    ) {
        removeResourceDownloadedForOfflineSafely(uri) {
            removeLibraryItemFromOfflineUnsafeIfConcurrent(uri)
        }
            .successfully()
    }

    private suspend fun removeLibraryItemFromOfflineUnsafeIfConcurrent(
        uri: SpeechifyURI,
    ): Boolean {
        val maxConcurrencyGuardingSemaphore = Semaphore(MAXIMUM_PARALLEL_RESOURCE_DOWNLOADS)
        val wasAnythingFromItemRemovedFromCache: Boolean = try {
            requiredItemsFlow(uri, shouldQueryCacheOnly = true)
                .map {
                    this@OfflineAvailabilityManager.scope.async {
                        supervisorScope {
                            maxConcurrencyGuardingSemaphore.withPermit {
                                it.removeFromCache()
                            }
                        }
                    }
                }.toList()
                .awaitAll()
                .any { wasRemoved -> wasRemoved }
                .also {
                    availabilityStatusCache.put(
                        uri.toString(),
                        OfflineAvailabilityStatus.NOT_AVAILABLE,
                    )
                }
        } catch (e: Throwable) {
            if (e.isCausedByConnectionError()) {
                false
            } else {
                throw e
            }
        }

        val wasAudioRemoved = removeDownloadedAudioForAllVoicesUnsafeIfConcurrent(uri)

        // We check if any of the items were actually deleted and use that as the return value.
        return wasAnythingFromItemRemovedFromCache || wasAudioRemoved
    }

    /**
     * For the document identified by [documentUri], removes downloaded audio for [voice].
     * Returns true if there was any audio.
     * This will throw an error if the item is currently being downloaded.
     *
     * See also [removeLibraryItemFromOffline] for removing the entire item from the offline cache.
     */
    fun removeLibraryItemDownloadedAudioForVoice(
        documentUri: SpeechifyURI,
        voice: VoiceSpec.VoiceSpecForMediaVoiceFromAudioServer,
        callback: Callback<Boolean>,
    ) = callback.fromCoWithErrorLogging(
        sourceAreaId = "OfflineAvailabilityManager.removeLibraryItemDownloadedAudioForVoice",
    ) {
        removeResourceDownloadedForOfflineSafely(
            uri = documentUri,
            action = {
                dbService.getVoiceCacheQueries().removeDownloadedAudioForVoiceUnsafeIfConcurrent(
                    uri = documentUri,
                    voiceId = voice.getIdForDb(),
                    reclaimSpace = true,
                )
                    .successfully()
            },
        )
    }

    /**
     * This will throw an error if the item is currently being downloaded.
     */
    private suspend fun <Result> removeResourceDownloadedForOfflineSafely(
        uri: SpeechifyURI,
        action: suspend () -> Result,
    ): Result {
        val downloader = runningDownloads[uri]
        if (downloader != null) {
            throw IllegalStateException("Cannot remove item from offline while it is being downloaded.")
        }
        return action()
    }

    private suspend fun removeDownloadedAudioForAllVoicesUnsafeIfConcurrent(
        uri: SpeechifyURI,
    ): Boolean {
        val voiceCacheQueries = dbService.getVoiceCacheQueries()

        return voiceCacheQueries.transactionWithResult {
            var hasRemovedAny = false
            for (voiceDownload in voiceCacheQueries.getDownloadedAudioForItem(uri).awaitAsList()) {
                voiceCacheQueries.removeDownloadedAudioForVoiceUnsafeIfConcurrent(
                    uri = uri,
                    voiceId = voiceDownload.voiceId,
                    /**
                     * Cannot reclaim space in a transaction, we'll do it below near #ReclaimAfterTransaction
                     */
                    reclaimSpace = false,
                )
                    .also { hasRemoved ->
                        if (!hasRemovedAny) {
                            hasRemovedAny = hasRemoved
                        }
                    }
            }

            return@transactionWithResult hasRemovedAny
        }
            .also {
                /**
                 * #ReclaimAfterTransaction
                 */
                voiceCacheQueries.reclaimSpace()
            }
    }

    private suspend fun VoiceCacheQueries.removeDownloadedAudioForVoiceUnsafeIfConcurrent(
        uri: SpeechifyURI,
        voiceId: VoiceIdForDb,
        /**
         * Since we remove large amounts of binary data a vacuum is needed to reclaim the space.
         * Otherwise, the DB file will keep it's previous size.
         *
         * NOTE: This cannot be done inside a transaction, so if there is an outer transaction, it must be
         * done by the caller by calling [VoiceCacheQueries.reclaimSpace].
         */
        reclaimSpace: Boolean,
    ): Boolean {
        val voiceCacheQueries = this

        return voiceCacheQueries.transactionWithResult(
            noEnclosing = false,
        ) {
            val hasItems = voiceCacheQueries.getDownloadedAudioForItem(documentUri = uri).awaitAsFirstOrNull() != null
            voiceCacheQueries.removeSynthesisResultsForDocumentVoice(
                documentUri = uri,
                voiceId = voiceId,
            )
            voiceCacheQueries.removeSentenceIndexForDocumentVoice(
                documentUri = uri,
                voiceId = voiceId,
            )
            voiceCacheQueries.removeDownloadedAudioForDocumentVoice(
                documentUri = uri,
                voiceId = voiceId,
            )
            hasItems
        }.also {
            if (reclaimSpace) {
                voiceCacheQueries.reclaimSpace()
            }
        }
    }

    /**
     * Returns a list of [GoogleCloudStorageUriFileId] pointing at all files that need to be made available
     * for the supplied library item to be listenable to.
     */
    private suspend fun getRequiredFiles(
        libraryItem: LibraryItem.Content,
        /**
         * For docs on result behavior depending on this value, see [DataSource.CACHE].
         */
        shouldQueryCacheOnly: Boolean,
    ): List<GoogleCloudStorageUriFileId> {
        val dataSource = if (shouldQueryCacheOnly) DataSource.CACHE else DataSource.SERVER
        return when (libraryItem.contentType) {
            ContentType.PDF, ContentType.HTML, ContentType.TXT, ContentType.EPUB -> {
                if (libraryItem.sourceStoredUrl != null) {
                    // For most content we only need to fetch the principal resource.
                    listOf(clientConfig.googleCloudStorageUriFileIdFromUrl(libraryItem.sourceStoredUrl))
                } else {
                    // TODO: https://linear.app/speechify-inc/issue/PLT-2735/support-legacy-library-items
                    throw UnsupportedContentException("Legacy content is not supported yet.")
                }
            }

            ContentType.SPEECHIFY_BOOK -> {
                emptyList()
            }

            ContentType.SCAN -> {
                // We check if the document has scanned pages.
                val rawLibraryItem = (
                    firestoreService.coGetDocument(
                        path = getLibraryItemPath(libraryItem.uri.id),
                        dataSource = dataSource,
                    )
                        .orThrow() as FirebaseFirestoreDocumentSnapshot.Exists
                    ).value<FirebaseLibraryItem>().orThrow()

                // We do a try-catch here in case we're checking against the cache which can end up throwing a
                // ResourceNotFound error. In which case we return null here. If other types of error, we log the error
                // and return an emptyList.
                try {
                    if (rawLibraryItem.scannedBookFields != null) {
                        rawLibraryItem.scannedBookFields.pageOrdering.mapNotNull { orderingEntry ->
                            val scannedPageSnapshot = firestoreService.coGetDocument(
                                path = getScannedPagePath(libraryItem.uri.id, orderingEntry.pageId),
                                dataSource = dataSource,
                            ).toNullSuccessIfResourceNotFound().orThrow()

                            when (scannedPageSnapshot) {
                                null -> null
                                else -> {
                                    val scannedPage = (scannedPageSnapshot as FirebaseFirestoreDocumentSnapshot.Exists)
                                        .value<PlatformScannedBookService.FirestoreScannedBookPageModel>()
                                        .orThrow()
                                    /** TODO according to [com.speechify.client.internal.services.scannedbook.PlatformScannedBookService.FirestoreScannedBookPageModel.imageUrlOrNull]
                                     *   `null` here only happens for PDF fallbacks. TODO Make the code reflect that (raise an error, or at least log one?) */
                                    if (scannedPage.imageUrlOrNull == null) {
                                        return@mapNotNull null
                                    }
                                    clientConfig.googleCloudStorageUriFileIdFromUrl(scannedPage.imageUrlOrNull)
                                }
                            }
                        }
                    } else {
                        emptyList()
                    }
                } catch (e: Throwable) {
                    Log.e(
                        DiagnosticEvent(
                            message = e.message,
                            nativeError = e,
                            sourceAreaId = "OfflineAvailabilityManager.getRequiredFiles",
                        ),
                    )
                    emptyList()
                }
            }

            ContentType.DOCX -> {
                throw UnsupportedContentException("DOCX content is not supported yet.")
            }
        }
    }

    private suspend fun getAllRequiredChildrenRecursively(
        requiredItemResource: LibraryItemResource,
        /**
         * For docs on result behavior depending on this value, see [DataSource.CACHE].
         */

        shouldQueryCacheOnly: Boolean,
    ):
        Flow<LibraryItemResource> {
        val subItems = requiredItemResource.getSubItems(shouldQueryCacheOnly)
        return if (subItems.isEmpty()) {
            flowOf(requiredItemResource)
        } else {
            merge(
                flowOf(requiredItemResource),
                subItems.asFlow().flatMapMerge {
                    getAllRequiredChildrenRecursively(it, shouldQueryCacheOnly)
                },
            )
        }
    }

    private fun requiredItemsFlow(
        uriToDownload: SpeechifyURI,
        /**
         * For docs on result behavior depending on this value, see [DataSource.CACHE].
         */
        shouldQueryCacheOnly: Boolean,
    ) = flow {
        val dataSource = if (shouldQueryCacheOnly) DataSource.CACHE else DataSource.SERVER
        val libraryItem = libraryServiceDelegate.getContentLibraryItemFromFirestore(
            uriToDownload.id,
            dataSource,
        )

        when (libraryItem.contentType) {
            ContentType.SPEECHIFY_BOOK -> {
                emit(LibraryItemResource.EncryptedLibraryItemResource(libraryItem.uri.id, ebookService))
            }
            else -> {
                val firebaseStorageItems = getRequiredFiles(libraryItem, shouldQueryCacheOnly)
                firebaseStorageItems.forEach {
                    emit(
                        LibraryItemResource.FirebaseStorageLibraryItemResource(
                            it,
                            this@OfflineAvailabilityManager.scope,
                            firebaseStorageCache,
                        ),
                    )
                }
                val subCollectionsRequiringSubItems = arrayOf(
                    getLegacyPagesCollectionRef(uriToDownload.id),
                    getLocationsCollectionRef(uriToDownload.id),
                    getScannedPagesCollectionRef(uriToDownload.id),
                )

                subCollectionsRequiringSubItems.forEach {
                    emit(LibraryItemResource.FirebaseCollectionLibraryItemResource(it, firestoreService))
                }

                // We check if the document has scanned pages.
                val rawLibraryItem = (
                    firestoreService.coGetDocument(
                        path = getLibraryItemPath(uriToDownload.id),
                        dataSource = dataSource,
                    )
                        .orThrow() as FirebaseFirestoreDocumentSnapshot.Exists
                    ).value<FirebaseLibraryItem>().orThrow()
                if (rawLibraryItem.scannedBookFields != null) {
                    // In which case we explicitly require those pages be cached.
                    rawLibraryItem.scannedBookFields.pageOrdering.forEach { orderingEntry ->
                        if (orderingEntry.pageId == PAGE_OF_ORIGINAL_DOC__ID_PLACEHOLDER) {
                            return@forEach
                        }

                        emit(
                            LibraryItemResource.FirebaseObjectLibraryItemResource(
                                getScannedPagePath(uriToDownload.id, orderingEntry.pageId),
                                required = true,
                                firestoreService,
                            ),
                        )
                    }
                }

                if (rawLibraryItem.hasPageEdits) {
                    // We try to fetch the edits unless they are explicitly marked as non-existent.
                    emit(
                        LibraryItemResource.FirebaseObjectLibraryItemResource(
                            getEditsPath(libraryItem.uri.id),
                            required = true,
                            firestoreService,
                        ),
                    )
                }
            }
        }
    }.flatMapMerge {
        // This makes sure that all sub items are also part of the flow.
        getAllRequiredChildrenRecursively(it, shouldQueryCacheOnly)
    }

    /** This flow encapsulates the download of all the items' components required for offline availability, along
     * with the requested audio (`null` [audioDownloadOptions] signifies request for no audio). */
    internal fun downloadItemsFlow(
        uriToDownload: SpeechifyURI,
        audioDownloadOptions: AudioDownloadOptions?,
        audioDownloadRequestedStartCursor: ContentCursor?,
        offlineAvailabilityStatusAtStart: OfflineAvailabilityStatusAtRest,
    ): Flow<ProgressFraction> = channelFlowWithCapacityRequired(
        /**
         * `CONFLATED` capacity so as not to restrict the download, and to save on memory (we just need the last
         * value of progress).
         */
        capacityOrNullIfUnlimited = Channel.CONFLATED,
    ) {
        val telemetry = currentTelemetryEvent()

        var voiceDownloadProgressFraction = 0.0
        var resourceProgressFraction = 0.0

        // If there is a voice download we count the main items from 0-1 and the voice download from 0-1.
        // To have the progress bar go from 0-1 we need to divide by 2.

        val offlineAvailabilityWeight = if (offlineAvailabilityStatusAtStart is OfflineAvailabilityStatus.AVAILABLE) {
            /**
             * If the item is already available for download, we don't want the progress bar to jump half-way
             * immediately so reduce it to ~5% of bar (it's not quite this many percent, as the weights don't add up to more than 1.0).
             * This is also #PreventZeroDivision below.
             * TODO make the progress based on time total and remaining (even if an estimate) in each part of this
             *  operation, and then these weights will not be needed
             */
            0.05
        } else {
            1.0
        }

        val audioDownloadWeight = if (audioDownloadOptions == null) {
            0.0
        } else {
            1.0
        }

        val weightsSum = offlineAvailabilityWeight + audioDownloadWeight

        // TODO_NICETOHAVE: Clean up when we have richer progress reports, this works as in it goes from 0-1 but
        // for product teams, especially if they want a seperate screen to show the voice download having only the voice
        // progress reported would be useful. Not urgent since changing the reported values here is easy.
        suspend fun sendProgress() {
            if (weightsSum == 0.0) {
                /** We prevent this from happening above at #PreventZeroDivision, but let's cover it in case of
                 * human error while changing code.
                 * Pick 0.0, as this may be called a number of times, so we don't report 100%.
                 */
                Log.e(
                    DiagnosticEvent(
                        message = "'weightsSum' was 0. Report to SDK maintainers.",
                        sourceAreaId = "OfflineAvailabilityManager.downloadItemsFlow.sendProgress",
                    ),
                )
                return send(0.0)
            }
            val totalProgress = (
                (resourceProgressFraction * offlineAvailabilityWeight) +
                    (voiceDownloadProgressFraction * audioDownloadWeight)
                ) /
                weightsSum

            send(totalProgress)
        }

        val bundlerDependencies: BundlerDependencies
        val bundle: ContentBundle
        telemetry.addMeasurement(
            measurementName = "makeLibraryItemDocumentAvailableOffline",
        ) {
            makeLibraryItemDocumentAvailableOfflineColdFlow(
                uriToDownload = uriToDownload,
            )
                .collect { currentResourceProgressFraction ->
                    resourceProgressFraction = currentResourceProgressFraction
                    sendProgress()
                }

            // We also do a bundling operation to doubly ensure all required files are available.

            bundlerDependencies = createBundlerDependencies.invoke(
                /* bundlerFactoryConfig = */
                getBundlerFactoryConfigForAudioDownload(
                    audioDownloadOptions,
                    voicePreferences = VoicePreferences(
                        staticDefaultFreeOfflineVoice = audioDownloadOptions?.voice
                            ?: SpeechifyVoiceSpecifications.ALEX,
                    ),
                ),
            )
            bundle = bundlerDependencies.speechifyContentBundler
                .coCreateBundleForResource(
                    uri = uriToDownload,
                    receiveLibraryItem = { libraryItem ->
                        telemetry?.addPropertiesNonNull(
                            LibraryItemContentTypeTelemetryProp.toPairWithValOrNull(libraryItem.contentType),
                        )
                    },
                    bundleMetadata = null,
                )
                .orThrow()
        }

        // If requested now we can start downloading the HD voice for this file with the already bundled document.
        if (audioDownloadOptions != null) {
            telemetry.addMeasurement(
                measurementName = "makeLibraryItemAudioAvailableOffline",
            ) {
                telemetry?.addProperties(
                    "voice" to audioDownloadOptions.voice.idQualified,
                    "audioMediaFormat" to audioDownloadOptions.audioMediaFormat,
                )
                makeLibraryItemAudioAvailableOfflineColdFlow(
                    uriToDownload = uriToDownload,
                    audioDownloadOptions = audioDownloadOptions,
                    audioDownloadRequestedStartCursor = audioDownloadRequestedStartCursor,
                    contentBundle = bundle,
                    voiceFactory = bundlerDependencies.voiceFactory,
                )
                    .collect {
                        voiceDownloadProgressFraction = it
                        sendProgress()
                    }
            }
        }

        bundle.destroy()
    }.distinctUntilChanged()

    /**
     * Logic for downloading the 'document' part of the offline availability (as opposed to e.g. audio, which is
     * encapsulated in [makeLibraryItemAudioAvailableOfflineColdFlow]).
     */
    private fun makeLibraryItemDocumentAvailableOfflineColdFlow(
        uriToDownload: SpeechifyURI,
    ): Flow<ProgressFraction> =
        channelFlowWithCapacityRequired(
            /**
             * `CONFLATED` capacity so as not to restrict the download, and to save on memory (we just need the last
             * value of progress).
             */
            capacityOrNullIfUnlimited = Channel.CONFLATED,
        ) {
            val requiredItems = AtomicInt(0)
            val downloadedItems = AtomicInt(0)
            val processedItems = mutableListOf<LibraryItemResource>()

            suspend fun sendProgress() {
                val resourceProgressFraction = (downloadedItems.get().toDouble() / requiredItems.get().toDouble())

                send(resourceProgressFraction)
            }

            suspend fun trackItemBeingDownloaded(item: LibraryItemResource) {
                requiredItems.increment()
                processedItems.add(item)
                sendProgress()
            }

            suspend fun markItemDownloaded() {
                downloadedItems.increment()
                sendProgress()
            }

            try {
                val maxConcurrencyGuardingSemaphore = Semaphore(MAXIMUM_PARALLEL_RESOURCE_DOWNLOADS)
                requiredItemsFlow(uriToDownload, shouldQueryCacheOnly = false)
                    .onEach {
                        trackItemBeingDownloaded(it)
                    }
                    // We want to make sure we collected all required items to prevent weird bouncing of the progress.
                    .toList()
                    .asFlow()
                    .map {
                        this@OfflineAvailabilityManager.scope.async {
                            supervisorScope {
                                maxConcurrencyGuardingSemaphore.withPermit {
                                    it.download()
                                    markItemDownloaded()
                                    it
                                }
                            }
                        }
                    }.toList()
                    .awaitAll()
            } catch (e: CancellationException) {
                // We only run clean up if the user explicitly cancels the download.
                // Otherwise, we keep leftover files around so that we can resume the download.
                processedItems.forEach {
                    it.abortOrUndo()
                }

                throw e
            }
        }

    /**
     * Logic for downloading the 'audio' part of the offline availability (as opposed to e.g. audio, which is
     * encapsulated in [makeLibraryItemDocumentAvailableOfflineColdFlow]).
     */
    private suspend fun makeLibraryItemAudioAvailableOfflineColdFlow(
        uriToDownload: SpeechifyURI,
        audioDownloadOptions: AudioDownloadOptions,
        audioDownloadRequestedStartCursor: ContentCursor?,
        contentBundle: ContentBundle,
        voiceFactory: VoiceFactory,
    ): Flow<ProgressFraction> = flow {
        // If requested now we can start downloading the HD voice for this file with the already bundled document.
        val stats = contentBundle.contentIndex.coGetStats().orThrow()

        currentTelemetryEvent()?.addProperties(
            "estimatedWordCount" to stats.estimatedWordCount.count,
        )

        if (stats.estimatedWordCount.count > clientConfig.options.audioDownloadOptions.maxDocumentWordCount) {
            throw ErrorOfExceedingAudioDownloadMaxDocumentWordCount(
                maxDocumentWordCount = clientConfig.options.audioDownloadOptions.maxDocumentWordCount,
                actualDocumentWordCount = stats.estimatedWordCount.count,
            )
                .apply {
                    addTagProperties(
                        /* Ads this property to make sure the event can be found in logs, even if the exception gets
                         * wrapped by another exception.
                         * To see the errors, [use filter of `"is.ErrorOfExceedingAudioDownloadMaxDocumentWordCount"`](https://cloudlogging.app.goo.gl/wSYXPiAh9kTZ7aeE8)
                         *
                         * Implemented as per [this requirement](https://speechifyworkspace.slack.com/archives/C03JLSQMBEJ/p1697213971457979?thread_ts=1697117922.609459&cid=C03JLSQMBEJ)
                         */
                        "ErrorOfExceedingAudioDownloadMaxDocumentWordCount",
                    )
                }
        }

        // We don't especially need to handle resuming a download since the Utterance flow will naturally be served
        // from the persistent cache skipping over already downloaded content quickly.
        val startCursor = audioDownloadRequestedStartCursor ?: contentBundle.speechView.start
        val endCursor = contentBundle.speechView.end

        val startProgressFraction = contentBundle.contentIndex.coGetProgressFromCursor(startCursor).orThrow()
        val endProgressFraction = contentBundle.contentIndex.coGetProgressFromCursor(endCursor).orThrow()

        val voiceSpec = audioDownloadOptions.voice
        val voice = (
            voiceFactory.createVoice(
                spec = voiceSpec,
                speechSynthesisConfig = object : SpeechSynthesisConfig {
                    override val audioConfig: AudioConfig =
                        AudioConfig(audioDownloadOptions.audioMediaFormat)

                    @JsExport.Ignore
                    override val textToSpeechAudioCacheInMemoryCapacityInCharsOfTextFlow: StateFlow<Int>
                        get() =
                            MutableStateFlow(
                                /* `0` because there's no point in caching during download - there will be only a single pass through the content. */
                                value = 0,
                            )

                    @JsExport.Ignore
                    override val textToSpeechIncludePrecedingContextForAudioSynthesis: StateFlow<Boolean>
                        get() = clientConfig.options.defaultBundlerFactoryConfig
                            .listeningBundlerConfig
                            .textToSpeechIncludePrecedingContextForAudioSynthesis
                },
            ).orThrow() as? MediaVoice
            )?.withPersistentCachingEnabled(
            downloadedAudioForItem = DownloadedAudioForItem(
                documentUri = uriToDownload,
                voiceId = voiceSpec.getIdForDb(),
                downloadOptions = audioDownloadOptions,
                sdkVersionAtCreation = SpeechifyVersions.SDK_VERSION,
                downloadProgress = 0.0,
                startCursor = startCursor,
                startProgressFraction = startProgressFraction,
                endCursor = startCursor,
                endProgressFraction = startProgressFraction,
                hasGapsInAudio = DbBoolean(false),
            ),
            onGapInPersistentAudioDetected = {
                throw IllegalStateException("A gap should not be able to be detected when downloading audio.")
            },
            voiceOfPreferenceForOfflineProvider = object : VoiceOfPreferenceForOfflineProvider {
                @JsExport.Ignore
                override suspend fun getPreferredOfflineVoice(): Voice =
                    throw IllegalStateException("Downloading should never fall back to local voice.")
            },
            offlineModeStatusFlowProvider =
            OfflineModeStatusProvider.FlowProviderWithStaticValue(
                // For downloading the local voice fallback is not desired so we're always online.
                OfflineModeStatusProvider.OfflineModeStatus.ONLINE,
            ),
            dbService = dbService,
        )
            ?: throw NullPointerException("Couldn't obtain voice for ${audioDownloadOptions.voice}")

        // In case the download was previously started and is resumed here progress restarts from 0 but will quickly
        // return to the previous value since it's just scanning through the persistent cache which is quick.
        dbService.getVoiceCacheQueries().insertDownloadedAudioForItem(
            documentUri = uriToDownload,
            voiceId = voiceSpec.getIdForDb(),
            downloadOptions = audioDownloadOptions,
            sdkVersionAtCreation = SpeechifyVersions.SDK_VERSION,
            downloadProgress = 0.0,
            startCursor = startCursor,
            startProgressFraction = startProgressFraction,
            endCursor = startCursor,
            endProgressFraction = endProgressFraction,
            hasGapsInAudio = DbBoolean(false),
        )

        var currentProgressFraction = startProgressFraction

        currentTelemetryEvent()?.addProperties(
            "maxConcurrencyOnAudioServer" to clientConfig.options.audioDownloadOptions.maxConcurrencyOnAudioServer,
        )
        UtteranceFlowProviderFromSpeechFlow
            .getDownloadFlow(
                speechFlow = contentBundle.speechView.getFullSentencesFlowFromSentenceContaining(startCursor),
                voice = voice,
                preSpeechTransform = audioDownloadOptions
                    .contentTransformOptions
                    .toContentBundlerOptions()
                    .preSpeechTransformOptions,
                maxConcurrencyOnAudioServerOption = clientConfig.options.audioDownloadOptions,
            )
            .onEach { currentProgressCursor ->
                // The download flow guarantees that cursors never go backwards,
                // so getting the download progress is easy.
                currentProgressFraction = contentBundle.contentIndex.coGetProgressFromCursor(currentProgressCursor)
                    .orThrow()
                // We want the progress to be 0-1 for the area that is being downloaded.
                val voiceDownloadProgressFraction = (currentProgressFraction - startProgressFraction) /
                    (endProgressFraction - startProgressFraction)
                dbService.getVoiceCacheQueries()
                    .updateDownloadedAudioForItemDownloadProgress(
                        downloadProgress = voiceDownloadProgressFraction,
                        // The end progress is the global fraction through the whole document.
                        endProgressFraction = currentProgressFraction,
                        endCursor = currentProgressCursor,
                        documentUri = uriToDownload,
                        voiceId = voiceSpec.getIdForDb(),
                    )

                emit(voiceDownloadProgressFraction)
            }
            .takeWhile { currentProgressCursor ->
                currentProgressCursor.isBeforeOrAt(endCursor)
            }
            .onCompletionSuccessfully {
                /** The [DownloadedAudioForItem.downloadProgress] isn't always `1.0` from the math above, so need
                 * to make it so at the end.
                 */
                dbService.getVoiceCacheQueries()
                    .updateDownloadedAudioForItemDownloadProgress(
                        downloadProgress = 1.0,
                        // The end progress is the global fraction through the whole document.
                        endProgressFraction = currentProgressFraction,
                        endCursor = endCursor,
                        documentUri = uriToDownload,
                        voiceId = voiceSpec.getIdForDb(),
                    )
            }
            .collect()
    }

    private fun getBundlerFactoryConfigForAudioDownload(
        audioDownloadOptions: AudioDownloadOptions?,
        voicePreferences: VoicePreferences,
    ): BundlerFactoryConfig =
        BundlerFactoryConfig(
            ClassicReadingBundlerConfig(ClassicReadingBundlerOptions()),
            BookReadingBundlerConfig(
                BookReadingBundlerOptions(
                    contentSortingStrategy = audioDownloadOptions?.contentSortingStrategy
                        ?: ContentSortingStrategy.None,
                ),
            ),
            EmbeddedReadingBundlerConfig(EmbeddedReadingBundlerOptions()),
            ListeningBundlerConfig(
                audioConfig = AudioConfig(audioDownloadOptions?.audioMediaFormat ?: AudioMediaFormat.MP3),
                defaultSpeedWPM = 220,
                allVoices = emptyArray(),
                voicePreferences = voicePreferences,
                options = ListeningBundlerOptions(),
            ),
            ContentBundlerConfig(
                audioDownloadOptions?.contentTransformOptions?.toContentBundlerOptions()
                    ?: ContentBundlerOptions(),
            ),
        )

    /**
     * Allows to get a list of all items in the library with downloaded audio (immediately queries for the lists and
     * returns as the first call to [CallbackFlowSourceFromCollectWithResult.collect]'s `collectOne` callback as soon as
     * the query returns), and observe changes to that list (in the following calls to the callback).
     */
    val audioDownloadsForAllItems:
        CallbackFlowSourceFromCollectWithResult<Array<LibraryItemWithAudioDownload>> by lazy {
            flowFromAsyncGetFlow(
                getFlow = {
                    dbService.getVoiceCacheQueries()
                        .getDownloadedAudioForItems()
                        .asFlowOfListWithCurrentAndChanges()
                },
            )
                .map {
                    it.map { item ->
                        LibraryItemWithAudioDownload(
                            resourceUri = item.documentUri,
                            audioDownload = item.toVoiceAudioDownloadInfo(),
                        )
                    }
                }
                .map { it.toTypedArray() }
                .toCallbackFlowSourceFromCollectWithResult()
        }

    internal suspend fun observeDownloadedAudioForUri(uri: SpeechifyURI): Flow<List<DownloadedAudioForItem>> =
        dbService.getVoiceCacheQueries()
            .getDownloadedAudioForItem(documentUri = uri)
            .asFlowOfListWithCurrentAndChanges()

    internal suspend fun observeItemHavingGapsInAudio(uri: SpeechifyURI): Flow<Boolean> =
        dbService.getVoiceCacheQueries()
            .getDownloadedAudioForItem(documentUri = uri)
            .asFlowOfListWithCurrentAndChanges()
            .map {
                it.any { downloadedAudioForItem -> downloadedAudioForItem.hasGapsInAudio.value }
            }
}

/**
 * This exception indicates that the given library item is not yet supported for offline downloading.
 */
internal class UnsupportedContentException(message: String?) : IllegalArgumentException(message)

@JsExport
@Serializable
sealed class OfflineAvailabilityStatus {

    object AVAILABLE : OfflineAvailabilityStatus(), OfflineAvailabilityStatusAtRest {
        override fun toString(): String {
            return "AVAILABLE"
        }
    }

    @Suppress("ClassName")
    object NOT_AVAILABLE : OfflineAvailabilityStatus(), OfflineAvailabilityStatusAtRest {
        override fun toString(): String {
            return "NOT_AVAILABLE"
        }
    }

    object DOWNLOADING : OfflineAvailabilityStatus() {
        override fun toString(): String {
            return "DOWNLOADING"
        }
    }
}

@JsExport
internal sealed interface OfflineAvailabilityStatusAtRest

@JsExport
class AudioDownloadOptions(
    /** The voice used for the download.
     *
     * - For specifying for download: Note that a number of classes implement this type, including [VoiceSpec.CVLVoiceSpec].
     * - For consuming to play-back using this voice: Note that this will not reinstate the original type specified.
     *   If it's required to establish an instance of the original class (e.g. to have the latest values), it can be
     *   achieved by finding using [com.speechify.client.api.audio.VoiceSpecOfAvailableVoice.idQualified] among the
     *   SDK-consumer's list of available voices.
     */
    val voice: VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted,
    val contentTransformOptions: StaticContentTransformOptions,
    val ocrFallbackStrategy: OcrFallbackStrategy,
    val contentSortingStrategy: ContentSortingStrategy,
    val audioMediaFormat: AudioMediaFormat,
    /**
     * Set a default value to `false` to fix a regression on audio download feature caused by ml-solution PR.
     * Reported here [https://speechifyworkspace.slack.com/archives/C03LS9C1SUV/p1702308062693989].
     */
    val mlParsingMode: MLParsingMode = MLParsingMode.ForceDisable,
)

@JsExport
@Serializable(with = StaticContentTransformOptionsSerializer::class)
class StaticContentTransformOptions(
    val shouldSkipHeaders: Boolean,
    val shouldSkipFooters: Boolean,
    val shouldSkipFootnotes: Boolean,
    val shouldSkipCaptions: Boolean,
    val preSpeechTransformOptions: PreSpeechTransformOptions,
) {
    internal fun toContentBundlerOptions(): ContentBundlerOptions {
        val contentBundlerOptions = ContentBundlerOptions()
        contentBundlerOptions.shouldSkipHeaders = shouldSkipHeaders
        contentBundlerOptions.shouldSkipFooters = shouldSkipFooters
        contentBundlerOptions.shouldSkipFootnotes = shouldSkipFootnotes
        contentBundlerOptions.preSpeechTransformOptions = preSpeechTransformOptions
        contentBundlerOptions.shouldSkipCaptions = shouldSkipCaptions

        return contentBundlerOptions
    }
}

internal object StaticContentTransformOptionsSerializer : KSerializer<StaticContentTransformOptions> {
    @OptIn(InternalSerializationApi::class)
    private val serializer = SerializableStaticContentTransformOptionsSerializer::class.serializer()
    override val descriptor: SerialDescriptor =
        SerialDescriptor("StaticContentTransformOptionsSerializer", serializer.descriptor)

    override fun deserialize(decoder: Decoder): StaticContentTransformOptions {
        val serializableStaticContentTransformOptionsSerializer = decoder.decodeSerializableValue(serializer)
        val preSpeechTransformOptions = PreSpeechTransformOptions(
            serializableStaticContentTransformOptionsSerializer.contentSkippingSettings.shouldSkipBraces,
            serializableStaticContentTransformOptionsSerializer.contentSkippingSettings.shouldSkipCitations,
            serializableStaticContentTransformOptionsSerializer.contentSkippingSettings.shouldSkipParentheses,
            serializableStaticContentTransformOptionsSerializer.contentSkippingSettings.shouldSkipBrackets,
            serializableStaticContentTransformOptionsSerializer.contentSkippingSettings.shouldSkipUrls,
            null,
        )
        return StaticContentTransformOptions(
            serializableStaticContentTransformOptionsSerializer.shouldSkipHeaders,
            serializableStaticContentTransformOptionsSerializer.shouldSkipFooters,
            serializableStaticContentTransformOptionsSerializer.shouldSkipFootnotes,
            serializableStaticContentTransformOptionsSerializer.shouldSkipCaptions ?: false,
            preSpeechTransformOptions,
        )
    }

    override fun serialize(encoder: Encoder, value: StaticContentTransformOptions) {
        if (value.preSpeechTransformOptions.currentSkippingSettings.value.customSentenceTransformer != null) {
            throw IllegalArgumentException(
                "Cannot serialize StaticContentTransformOptions with custom PreSpeechTransformOptions.",
            )
        }
        serializer.serialize(
            encoder,
            SerializableStaticContentTransformOptionsSerializer(
                value.shouldSkipHeaders,
                value.shouldSkipFooters,
                value.shouldSkipFootnotes,
                value.shouldSkipCaptions,
                value.preSpeechTransformOptions.currentSkippingSettings.value,
            ),
        )
    }

    @Serializable
    private class SerializableStaticContentTransformOptionsSerializer(
        val shouldSkipHeaders: Boolean,
        val shouldSkipFooters: Boolean,
        val shouldSkipFootnotes: Boolean,
        val shouldSkipCaptions: Boolean = false,
        val contentSkippingSettings: PreSpeechTransformOptions.ContentSkippingSettings,
    )
}

/**
 * Internal type for read-only access to the option by the SDK. Look for usages to see what in SDK needs this value.
 */
internal interface MaxDocumentWordCountOptionReadOnly {
    /**
     * The size of a document (inclusive, in number-of-words) that the client will only allow downloading.
     * If not changed and not specified in [ClientScopeAudioDownloadOptions]'s constructor, it will be [ClientScopeAudioDownloadOptions.DEFAULT_MAX_DOCUMENT_WORD_COUNT].
     */
    val maxDocumentWordCount: Int
}

/**
 * Internal type for read-only access to the option by the SDK. Look for usages to see what in SDK needs this value.
 */
internal interface MaxConcurrencyOnAudioServerOptionReadOnly {
    /**
     * Sets maximum concurrent requests to AudioServer
     * If not changed and not specified in [ClientScopeAudioDownloadOptions]'s constructor, it will be [ClientScopeAudioDownloadOptions.DEFAULT_MAX_CONCURRENCY_ON_AUDIO_SERVER].
     */
    val maxConcurrencyOnAudioServer: Int
}

/**
 * Those audio-download options that apply to all downloads performed by a [com.speechify.client.api.SpeechifyClient],
 * (in contrast to the more narrowed scoped ones, e.g. the per-download [AudioDownloadOptions]).
 */
@JsExport
class ClientScopeAudioDownloadOptions(
    /**
     * When `null`, it will be [DEFAULT_MAX_DOCUMENT_WORD_COUNT].
     * See [MaxDocumentWordCountOptionReadOnly.maxDocumentWordCount] for semantics.
     */
    maxDocumentWordCountDefaultOverride: Int? = null,
    /**
     * When `null`, it will be [DEFAULT_MAX_CONCURRENCY_ON_AUDIO_SERVER].
     * See [MaxConcurrencyOnAudioServerOptionReadOnly.maxConcurrencyOnAudioServer] for semantics.
     */
    maxConcurrencyOnAudioServerDefaultOverride: Int? = null,
) : MaxDocumentWordCountOptionReadOnly,
    MaxConcurrencyOnAudioServerOptionReadOnly {

    override var maxDocumentWordCount: Int =
        maxDocumentWordCountDefaultOverride ?: DEFAULT_MAX_DOCUMENT_WORD_COUNT

    override var maxConcurrencyOnAudioServer: Int =
        maxConcurrencyOnAudioServerDefaultOverride ?: DEFAULT_MAX_CONCURRENCY_ON_AUDIO_SERVER

    companion object {
        /**
         * The default value for [maxDocumentWordCount].
         */
        const val DEFAULT_MAX_DOCUMENT_WORD_COUNT: Int =
            /** 100K was decided [here](https://speechifyworkspace.slack.com/archives/C03JLSQMBEJ/p1697127943680389?thread_ts=1697117922.609459&cid=C03JLSQMBEJ) */
            100_000

        /**
         * The default value for [maxConcurrencyOnAudioServer].
         */
        const val DEFAULT_MAX_CONCURRENCY_ON_AUDIO_SERVER: Int =
            /** `1` (no concurrency) is the initial safe value, subject to explorations in QA and Production via remote-config (as per [here](https://speechifyworkspace.slack.com/archives/C05H20U16SX/p1697641903011829)) */
            1
    }
}

/**
 * Signifies the situation where the document word count submitted to [OfflineAvailabilityManager.makeLibraryItemAvailableOffline]
 * exceeded the [ClientScopeAudioDownloadOptions.maxDocumentWordCount].
 */
@JsExport
class ErrorOfExceedingAudioDownloadMaxDocumentWordCount internal constructor(
    val maxDocumentWordCount: Int,
    val actualDocumentWordCount: Int,
) : IllegalArgumentException(
    "The document {actualDocumentWordCount} exceeds the maximum of $maxDocumentWordCount.",
) {
    init {
        this.addCustomProperty(
            key = "actualDocumentWordCount",
            value = actualDocumentWordCount,
        )
    }
}

@JsExport
class LibraryItemWithAudioDownload internal constructor(
    val resourceUri: SpeechifyURI,
    val audioDownload: VoiceAudioDownloadInfo,
)

@JsExport
class VoiceAudioDownloadInfo internal constructor(
    val downloadProgress: ProgressFraction,
    val lastDownloadOptions: AudioDownloadOptions,
) {
    /**
     * A shorthand for `downloadProgress == 1.0`.
     */
    val isDownloadComplete: Boolean get() = downloadProgress == 1.0

    val voice: VoiceSpec.VoiceSpecForMediaVoiceFromAudioServerPersisted
        get() =
            lastDownloadOptions.voice
}

internal fun DownloadedAudioForItem.toVoiceAudioDownloadInfo(): VoiceAudioDownloadInfo =
    VoiceAudioDownloadInfo(
        downloadProgress = downloadProgress,
        lastDownloadOptions = downloadOptions,
    )
