package com.speechify.client.api.services.importing

import app.cash.sqldelight.async.coroutines.awaitAsList
import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull
import app.cash.sqldelight.coroutines.asFlow
import com.speechify.client.api.ClientConfig
import com.speechify.client.api.SpeechifyContentId
import com.speechify.client.api.SpeechifyEntityType
import com.speechify.client.api.SpeechifyURI
import com.speechify.client.api.adapters.blobstorage.BlobStorageAdapter
import com.speechify.client.api.adapters.blobstorage.BlobStorageKey
import com.speechify.client.api.adapters.events.EventsTrackerAdapter
import com.speechify.client.api.adapters.events.ImportedItemDataForReporting
import com.speechify.client.api.adapters.events.reportEventsFromImportServiceFlows
import com.speechify.client.api.adapters.firebase.UserId
import com.speechify.client.api.adapters.html.DOMElement
import com.speechify.client.api.adapters.html.serializeHtmlToFile
import com.speechify.client.api.adapters.ocr.OCRAdapter
import com.speechify.client.api.adapters.ocr.OCRResult
import com.speechify.client.api.adapters.ocr.toOcrResult
import com.speechify.client.api.adapters.pdf.PDFAdapterFactory
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.services.file.models.SpeechifyFile
import com.speechify.client.api.services.importing.models.ImportOptions
import com.speechify.client.api.services.importing.models.ImportStartChoice
import com.speechify.client.api.services.importing.models.ImportableContentMetadata
import com.speechify.client.api.services.importing.models.addImportableContentMetadataProperties
import com.speechify.client.api.services.library.LibraryServiceDelegate
import com.speechify.client.api.services.library.models.ContentType
import com.speechify.client.api.services.library.models.FolderQuery
import com.speechify.client.api.services.library.models.LibraryItem
import com.speechify.client.api.services.library.models.RecordType
import com.speechify.client.api.services.library.models.UpdateLibraryItemParams
import com.speechify.client.api.services.scannedbook.models.LazyOCRFiles
import com.speechify.client.api.services.scannedbook.models.LazyOCRFilesFromList
import com.speechify.client.api.services.scannedbook.models.OCRFile
import com.speechify.client.api.services.scannedbook.models.allFilesAsFlow
import com.speechify.client.api.services.scannedbook.models.flowOfOcrableResultWhenAvailable
import com.speechify.client.api.services.scannedbook.models.getAllOCRFilesAsFlow
import com.speechify.client.api.services.scannedbook.models.numberOfFiles
import com.speechify.client.api.telemetry.IsItemImportForDiagnosticsTelemetryProp
import com.speechify.client.api.telemetry.TelemetryContentType
import com.speechify.client.api.telemetry.TelemetryEventBuilder
import com.speechify.client.api.telemetry.addMeasurement
import com.speechify.client.api.telemetry.addProperties
import com.speechify.client.api.telemetry.currentTelemetryEvent
import com.speechify.client.api.telemetry.flowWithTelemetryOfCollectingAllItems
import com.speechify.client.api.telemetry.withTelemetry
import com.speechify.client.api.util.Callback
import com.speechify.client.api.util.MimeType
import com.speechify.client.api.util.Result
import com.speechify.client.api.util.SDKError
import com.speechify.client.api.util.SDKError.ImportProcessingError
import com.speechify.client.api.util.SDKErrorException
import com.speechify.client.api.util.ensureListenableMimeType
import com.speechify.client.api.util.fromCoWithTelemetryLoggingErrors
import com.speechify.client.api.util.io.BinaryContentReadableRandomly
import com.speechify.client.api.util.io.File
import com.speechify.client.api.util.io.toBinaryContentWithMimeTypeReadableSequentially
import com.speechify.client.api.util.io.toFailureIfNoContentType
import com.speechify.client.api.util.io.toFile
import com.speechify.client.api.util.isCausedByConnectionError
import com.speechify.client.api.util.multiShotFromFlowIn
import com.speechify.client.api.util.orThrow
import com.speechify.client.api.util.successfully
import com.speechify.client.api.util.toSDKError
import com.speechify.client.api.util.use
import com.speechify.client.bundlers.content.ContentBundle
import com.speechify.client.bundlers.content.ContentBundler
import com.speechify.client.bundlers.content.ListenableBinaryContentPayload
import com.speechify.client.bundlers.content.createBundleForParsedHtml
import com.speechify.client.bundlers.reading.BundleMetadata
import com.speechify.client.bundlers.reading.importing.BinaryContentImporterFactory
import com.speechify.client.bundlers.reading.importing.ContentImporter
import com.speechify.client.bundlers.reading.importing.ContentImporterImpl
import com.speechify.client.helpers.content.standard.html.HtmlContentLoadOptions
import com.speechify.client.helpers.features.ListeningProgress
import com.speechify.client.internal.WithScope
import com.speechify.client.internal.getGlobalScopeWithContext
import com.speechify.client.internal.http.HttpClient
import com.speechify.client.internal.launchAsync
import com.speechify.client.internal.launchTopLevel
import com.speechify.client.internal.services.auth.AuthService
import com.speechify.client.internal.services.db.DBPendingImport
import com.speechify.client.internal.services.db.DbBoolean
import com.speechify.client.internal.services.db.DbOcrFile
import com.speechify.client.internal.services.db.DbScannedPageFile
import com.speechify.client.internal.services.db.DbService
import com.speechify.client.internal.services.db.awaitAsFirstOrNull
import com.speechify.client.internal.services.file.models.SpeechifyFileImpl
import com.speechify.client.internal.services.importing.ImportableContentPayload
import com.speechify.client.internal.services.importing.PlatformImportService
import com.speechify.client.internal.services.importing.models.ImportType
import com.speechify.client.internal.services.importing.models.ItemRequiringImport
import com.speechify.client.internal.services.importing.models.LazyOCRFilesFromDbOcrFile
import com.speechify.client.internal.sqldelight.LocalListeningProgress
import com.speechify.client.internal.sqldelight.PendingImport
import com.speechify.client.internal.sqldelight.ScannedPage
import com.speechify.client.internal.time.DateTime
import com.speechify.client.internal.util.collections.maps.BlockingThreadsafeMap
import com.speechify.client.internal.util.coroutines.ChildCoroutineErrorBehavior
import com.speechify.client.internal.util.extensions.collections.flows.SharedFlowWithOwningJob
import com.speechify.client.internal.util.extensions.collections.flows.shareFinishingAlwaysReturningLastItemWithJobIn
import com.speechify.client.internal.util.extensions.coroutines.job.cancelAndJoin
import com.speechify.client.internal.util.extensions.intentSyntax.ignoreValue
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlin.js.JsExport

/**
 * The Import Service orchestrates the entire process of bringing content into your Speechify Library.
 *
 * When you import content, it will return a representation of that content as soon as possible, *even if the process
 * of uploading the content to the Speechify Platform has not completed*. This helps you reduce Time To Listen by
 * immediately proceeding to the Listening Experience, working with in-memory or locally-persisted data even as the
 * content is uploaded in the background.
 */
@JsExport
class ImportService internal constructor(
    private val platformImportService: PlatformImportService,
    private val httpClient: HttpClient,
    private val pdfAdapterFactory: PDFAdapterFactory,
    private val dbService: DbService,
    private val blobStorageAdapter: BlobStorageAdapter,
    private val ocrAdapter: OCRAdapter,
    private val authService: AuthService,
    clientConfig: ClientConfig,
    eventsTrackerAdapter: EventsTrackerAdapter,
) : WithScope() {

    private val importedItemDataForReportingFlow = MutableSharedFlow<ImportedItemDataForReporting>()
    private val importableContentMetadataFlow = MutableStateFlow<ImportableContentMetadata?>(null)

    private lateinit var contentBundler: ContentBundler
    private lateinit var libraryServiceDelegate: LibraryServiceDelegate

    internal val importScope = scope

    internal fun lateInject(contentBundler: ContentBundler, libraryServiceDelegate: LibraryServiceDelegate) {
        this.contentBundler = contentBundler
        this.libraryServiceDelegate = libraryServiceDelegate
        pendingImportsJob.start()
    }

    private val currentlyRunningImports =
        BlockingThreadsafeMap<SpeechifyURI, SharedFlowWithOwningJob<ImportProgress>>()

    private val flowOfAllImportEvents =
        MutableSharedFlow<ImportProgress>()
    internal val flowOfFinishedImportEvents = flowOfAllImportEvents.filterIsInstance(ImportProgress.Finished::class)

    /**
     * Internal for use from tests only.
     */
    internal val pendingImportsJob = scope.launch(
        start = CoroutineStart.LAZY,
    ) {
        triggerPendingImports()
    }

    init {
        reportEventsFromImportServiceFlows(
            scope = scope,
            clientConfig = clientConfig,
            importedItemDataForReportingFlow = importedItemDataForReportingFlow,
            eventsTrackerAdapter = eventsTrackerAdapter,
        )
    }

    /**
     * Import the content as this URL as a File into your Library root folder.
     * In case of Failure, returns an [ImportProcessingError] which contains the ID of the item
     * (itemId), and the error message (sdkError)
     * NOTE: any changes here should also be propagated to [importFileFromURLNoLocalDb].
     * @param url the URL of the website to be imported
     * @param options (optional) An [ImportOptions], only the parentFolderId can be designated for URL imports.
     * Title will not be set since the cloud function will overwrite it
     * @param importableContentMetadata (optional) holds import metadata, See [ImportableContentMetadata].
     * @return a File providing access to this content without waiting for full upload.
     */
    fun importFileFromURL(
        url: String,
        options: ImportOptions? = null,
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyFile>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importFileFromURL") {
        importableContentMetadataFlow.emit(importableContentMetadata)
        it.addProperty("url", url)
        it.addImportableContentMetadataProperties(importableContentMetadata)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        val updatedOptions = if (options == null) {
            ImportOptions()
        } else {
            options
        }
        updatedOptions.analyticsProperties.set(
            "contentSubType",
            importableContentMetadata.contentSubType,
        )
        importFileFromURL(null, url, updatedOptions, it)
    }

    internal suspend fun importFileFromURL(
        existingSpeechifyUri: SpeechifyURI? = null,
        url: String,
        options: ImportOptions?,
        telemetryEventBuilder: TelemetryEventBuilder? = null,
    ): Result<SpeechifyFile> {
        val speechifyUri = existingSpeechifyUri ?: SpeechifyURI.generate(
            SpeechifyEntityType.LIBRARY_ITEM,
        )

        val itemRequiringImport = getOrCreateItemRequiringImport<ItemRequiringImport.UrlImport>(
            speechifyUri = speechifyUri,
            options = options,
        ) {
            it.copy(sourceURL = url)
        }

        platformImportService.createInitializingItem(
            speechifyUri,
            options,
            RecordType.WEB,
            mergeIfAlreadyExists = false,
        )
        try {
            val file = if (itemRequiringImport.primaryFileBlobStorageKey != null) {
                blobStorageAdapter.coGetBlob(itemRequiringImport.primaryFileBlobStorageKey).orThrow()
            } else {
                null
            } ?: run {
                val file = httpClient.getBinaryContentReadableRandomly(
                    url = url,
                )
                    /* TODO investigate if `toFailureIfNoContentType` is still needed.
                     *  It is there just to keep historical behavior.
                     */
                    .toFailureIfNoContentType(url)
                    .orThrow()

                // Also store a copy to make resuming the import easier.
                val blobStorageKey = BlobStorageKey("importer-${speechifyUri.id}")
                blobStorageAdapter.coPutBlob(blobStorageKey, file).orThrow()
                dbService.getPendingImportQueries().updatePrimaryFileBlobStorageKey(blobStorageKey, speechifyUri)

                file
            }
            telemetryEventBuilder?.addProperty("contentType", file.mimeType!!)

            // Update mime type of the local import item
            dbService.getPendingImportQueries().updateMimeType(file.mimeType, speechifyUri)

            // Start import, continuing after handoff to backend - doesn't wait for it to *complete*.
            platformImportService.createFileFromWebLink(
                sourceURL = url,
                /* TODO: Consider uploading to bucket and `publicBucketFileLocation`, so that backend doesn't hit the URL
                        again (possibly with a different result). Maybe by just consolidating with how `PlatformImportService`
                        uses the `createFileFromWebLink`
                    */
                publicBucketFileLocation = null,
                RecordType.fromMimeType(file.mimeType!!),
                speechifyUri,
                options,
            ).orThrow()

            applyChangesMadeDuringTheImport(speechifyUri)
            removePendingImport(itemRequiringImport)
            importedItemDataForReportingFlow.emit(
                ImportedItemDataForReporting(
                    title = options?.title ?: "unknown",
                    sourceStoredUrl = speechifyUri.toString(),
                    contentType = ContentType.fromMimeType(file.mimeType!!),
                    sourceUrl = url,
                    metadata = importableContentMetadataFlow.value,
                ),
            )
            return SpeechifyFileImpl(file.toFile(), speechifyUri).successfully()
        } catch (e: Throwable) {
            try {
                platformImportService.setItemStatusAsError(speechifyUri).orThrow()
            } catch (exceptionWhileMarkingItemAsError: Throwable) {
                e.addSuppressed(exceptionWhileMarkingItemAsError)
            }

            updateAttemptsCountAndErrorInPendingImport(itemRequiringImport, e)

            throw SDKErrorException(ImportProcessingError(speechifyUri.id, e.toSDKError()), cause = e)
        }
    }

    /**
     * NOTE: This is a workaround for [PLT-2879]:
     * https://linear.app/speechify-inc/issue/PLT-2879/importserviceimport-entry-points-not-working-since-9410-blocking
     * RECOMMENDED FOR USE ONLY BY CHROME EXTENSION —
     * This enables the client to import without having SQLDelight implemented.
     * This is based off of [importFileFromURL] — any changes there should also be propagated here.
     * TODO: research the possibility of making LocalDB service _actually_ optional, instead of having to rely on
     * workarounds such as this, in order to have something cleaner and more seamless.
     */
    fun importFileFromURLNoLocalDb(
        url: String,
        options: ImportOptions? = null,
        /**
         * [importableContentMetadata] (optional) holds import metadata, See [ImportableContentMetadata].
         */
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyFile>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importFileFromURL") {
        importableContentMetadataFlow.emit(importableContentMetadata)
        it.addProperty("url", url)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addImportableContentMetadataProperties(importableContentMetadata)
        val speechifyUri = SpeechifyURI.generate(
            SpeechifyEntityType.LIBRARY_ITEM,
        )

        platformImportService.createInitializingItem(
            speechifyUri,
            options,
            RecordType.WEB,
            mergeIfAlreadyExists = false,
        )
        try {
            val file = httpClient.getBinaryContentReadableRandomly(
                url = url,
            )
                /* TODO investigate if `toFailureIfNoContentType` is still needed.
                 *  It is there just to keep historical behavior.
                 */
                .toFailureIfNoContentType(url)
                .orThrow()

            // Also store a copy to make resuming the import easier.
            val blobStorageKey = BlobStorageKey("importer-${speechifyUri.id}")
            blobStorageAdapter.coPutBlob(blobStorageKey, file).orThrow()

            it.addProperty("contentType", file.mimeType!!)

            // Start import, continuing after handoff to backend - doesn't wait for it to *complete*.
            platformImportService.createFileFromWebLink(
                sourceURL = url,
                /* TODO: Consider uploading to bucket and `publicBucketFileLocation`, so that backend doesn't hit the URL
                        again (possibly with a different result). Maybe by just consolidating with how `PlatformImportService`
                        uses the `createFileFromWebLink`
                    */
                publicBucketFileLocation = null,
                RecordType.fromMimeType(file.mimeType!!),
                speechifyUri,
                options,
            ).orThrow()

            importedItemDataForReportingFlow.emit(
                ImportedItemDataForReporting(
                    title = options?.title ?: "unknown",
                    sourceStoredUrl = speechifyUri.toString(),
                    contentType = ContentType.fromMimeType(file.mimeType!!),
                    sourceUrl = url,
                    metadata = importableContentMetadataFlow.value,
                ),
            )
            SpeechifyFileImpl(file.toFile(), speechifyUri).successfully()
        } catch (e: Throwable) {
            try {
                platformImportService.setItemStatusAsError(speechifyUri).orThrow()
            } catch (exceptionWhileMarkingItemAsError: Throwable) {
                e.addSuppressed(exceptionWhileMarkingItemAsError)
            }

            throw SDKErrorException(ImportProcessingError(speechifyUri.id, e.toSDKError()), cause = e)
        }
    }

    /**
     * Import the content of this data as a File into your Library root folder.
     * @return [SpeechifyURI] of the import within Speechify Library.
     *
     * In case of Failure, returns an [ImportProcessingError] which contains the ID of the item
     * (itemId), and the error message (sdkError)
     *
     * * For PDF files - use MIME type `application/pdf`
     * * For TXT files - use MIME type `text/plain`
     * * For HTML files - use MIME type `text/html`
     *
     * For JavaScript, there is a convenience overload of `importBlobByUpload`.
     *
     * @param importableContentMetadata (optional) holds import metadata, See [ImportableContentMetadata].
     */
    fun importFileByUpload(
        /**
         *  The file that you want to upload into your Library root folder.
         */
        @Suppress(
            /* `NON_EXPORTABLE_TYPE` is unnecessary because the `actual` type is exported. */
            "NON_EXPORTABLE_TYPE",
        )
        file: BinaryContentReadableRandomly,
        mimeType: MimeType,
        options: ImportOptions,
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyURI>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importFileByUpload") {
        importableContentMetadataFlow.emit(importableContentMetadata)
        it.addProperty("mimeType", mimeType)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addImportableContentMetadataProperties(importableContentMetadata)
        options.analyticsProperties.set(
            "contentSubType",
            importableContentMetadata.contentSubType,
        )
        importFileByUpload(
            createSpeechifyUriWithInitializingLibraryItem(
                RecordType.fromMimeType(mimeType),
                SpeechifyEntityType.LIBRARY_ITEM,
                options,
            ),
            file,
            mimeType,
            options,
        )
    }

    /**
     * Imports a batch of files to your library.
     */
    fun importFilesByBatchUpload(
        @Suppress("NON_EXPORTABLE_TYPE")
        importFilesBatchOperationInputs: Array<ImportFilesBatchOperationInput<BinaryContentReadableRandomly>>,
        progressCallback: Callback<BatchOperationProgressResult<SpeechifyURI>>,
        completionCallback: Callback<Array<Result<SpeechifyURI>>>,
    ) {
        val importedItemsResultList = mutableListOf<Result<SpeechifyURI>>()
        progressCallback.multiShotFromFlowIn(
            flow = importFilesBatchOperationInputs.asFlow().map { batchOperationInput ->
                withTelemetry("ImportService.importFileByUpload") {
                    it.addProperty("mimeType", batchOperationInput.mimeType)
                    it.addProperty("fromBatchFileImport", true)
                    it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
                    it.addImportableContentMetadataProperties(batchOperationInput.importableContentMetadata)
                    try {
                        importFileByUpload(
                            createSpeechifyUriWithInitializingLibraryItem(
                                RecordType.fromMimeType(batchOperationInput.mimeType),
                                SpeechifyEntityType.LIBRARY_ITEM,
                                batchOperationInput.options,
                            ),
                            batchOperationInput.file,
                            batchOperationInput.mimeType,
                            batchOperationInput.options,
                        )
                    } catch (throwable: Throwable) {
                        Result.Failure(SDKError.OtherException(throwable))
                    }
                }
            }.onEach {
                importedItemsResultList.add(it)
            }.withIndex().map { indexedValue ->
                BatchOperationProgressResult(
                    batchInputIndex = indexedValue.index,
                    result = indexedValue.value,
                ).successfully()
            }.onCompletion {
                completionCallback.invoke(
                    importedItemsResultList.toTypedArray().successfully(),
                )
                // Destroy listener when last file is imported.
                importedItemsResultList.clear()
                destroy()
            },
            scope = getGlobalScopeWithContext(),
        ).ignoreValue()
    }

    private suspend fun importFileByUpload(
        existingSpeechifyUri: SpeechifyUriWithInitializer,
        file: BinaryContentReadableRandomly,
        mimeType: MimeType,
        /**
         * (optional) An [ImportOptions] which contains title and parentFolderId
         */
        options: ImportOptions? = null,
    ): Result<SpeechifyURI> {
        val listenableBinaryContentPayload = ListenableBinaryContentPayload.createForBinaryContentWithNativeApi(
            content = file,
            mimeType = mimeType.ensureListenableMimeType(contentTypeForFallback = null),
            sourceUrl = null,
        ).orReturn { return it }

        return contentBundler.coCreateBundleForBinaryContent(
            payload = listenableBinaryContentPayload,
            deviceLocalContent = null,
            importerFactory =
            @JsExport.Ignore
            object : BinaryContentImporterFactory {
                override fun createContentImporter(
                    payload: ImportableContentPayload.ImportableContentPayloadOfSingleBlob,
                    deviceLocalContent: LibraryItem.DeviceLocalContent?,
                    customProperties: Sequence<Pair<String, Any>>,
                    importStartChoice: ImportStartChoice,
                    bundleMetadata: BundleMetadata?,
                ): ContentImporter {
                    return ContentImporterImpl(
                        placeholderItemURI = existingSpeechifyUri,
                        existingLocalItem = deviceLocalContent,
                        startImportAction = { options: ImportOptions, speechifyURI: SpeechifyUriWithInitializer? ->
                            if (importStartChoice !is ImportStartChoice.DoNotStart) {
                                throw IllegalStateException(
                                    /** This should never be non-DoNotStart because we start manually
                                     * below - see #StartManually. */
                                    "importStartChoice changed to $importStartChoice, despite input was DoNotStart",
                                )
                            }
                            this@ImportService.getOrCreateSharedFlowImportingBinaryContentByUpload(
                                payload = payload,
                                options = options,
                                existingSpeechifyUri = speechifyURI ?: createSpeechifyUriWithInitializingLibraryItem(
                                    recordType = payload.recordType,
                                    speechifyEntityType = SpeechifyEntityType.LIBRARY_ITEM,
                                    options,
                                ),
                                customProperties = customProperties,
                            )
                        },
                        libraryServiceDelegate = libraryServiceDelegate,
                    )
                }
            },
            importStartChoice =
            /** `DoNotStart` because we #StartManually just below */
            ImportStartChoice.DoNotStart,
            bundleMetadata = null,
        ).orThrow().use { contentBundle ->
            importedItemDataForReportingFlow.emit(
                ImportedItemDataForReporting(
                    title = options?.title ?: "unknown",
                    sourceStoredUrl = existingSpeechifyUri.finalizedSpeechifyUri.await().toString(),
                    contentType = listenableBinaryContentPayload.libraryItemContentType,
                    sourceUrl = null,
                    metadata = importableContentMetadataFlow.value,
                ),
            )
            contentBundle.importer.startImport(options ?: ImportOptions())
        }
    }

    /**
     * If the import for the given URI is already running returns the existing flow, otherwise creates a new flow
     * to import the given item.
     */
    internal suspend fun getOrCreateSharedFlowImportingBinaryContentByUpload(
        existingSpeechifyUri: SpeechifyUriWithInitializer,
        payload: ImportableContentPayload.ImportableContentPayloadOfSingleBlob,
        options: ImportOptions,
        customProperties: Sequence<Pair<String, Any>> = emptySequence(),
    ): SharedFlowWithOwningJob<ImportProgress> =
        currentlyRunningImports.getOrPut(existingSpeechifyUri.unconfirmedSpeechifyUri) {
            val speechifyUri = existingSpeechifyUri.unconfirmedSpeechifyUri
            val itemRequiringImportDeferred = launchAsync {
                getOrCreateItemRequiringImport<ItemRequiringImport.FileImport>(
                    speechifyUri,
                    options,
                ) { uninitializedPendingImport ->
                    // For binary content we need to make sure that the file is copied to the blob storage, so we can access it
                    // when the app restarts.
                    val blobStorageKey = BlobStorageKey("importer-${speechifyUri.id}")
                    when (val binaryContentPayload = payload.listenableBinaryContentPayload) {
                        is ListenableBinaryContentPayload.Html -> {
                            blobStorageAdapter.coPutBytes(
                                blobStorageKey,
                                binaryContentPayload
                                    .contentWithMimeType
                                    .toBinaryContentWithMimeTypeReadableSequentially(),
                            ).orThrow()
                        }

                        is ListenableBinaryContentPayload.Pdf -> {
                            blobStorageAdapter.coPutBlob(
                                blobStorageKey,
                                binaryContentPayload.contentWithMimeType,
                            ).orThrow()
                        }

                        is ListenableBinaryContentPayload.PlainTextOrMarkdown -> {
                            blobStorageAdapter.coPutBytes(
                                blobStorageKey,
                                binaryContentPayload
                                    .contentWithMimeType
                                    .toBinaryContentWithMimeTypeReadableSequentially(),
                            ).orThrow()
                        }

                        is ListenableBinaryContentPayload.Epub -> {
                            blobStorageAdapter.coPutBlob(
                                blobStorageKey,
                                binaryContentPayload.contentWithMimeType,
                            ).orThrow()
                        }

                        is ListenableBinaryContentPayload.SpeechifyBook ->
                            throw UnsupportedOperationException("Speechify book not supported for import")
                    }

                    uninitializedPendingImport.copy(
                        primaryFileBlobStorageKey = blobStorageKey,
                        sourceURL = payload.listenableBinaryContentPayload.sourceUrl,
                        mimeType = payload.listenableBinaryContentPayload.contentWithMimeType.mimeType,
                    )
                }
            }

            coImportBinaryContentByUpload(
                finalizedSpeechifyUri = existingSpeechifyUri.finalizedSpeechifyUri,
                itemRequiringImportDeferred = itemRequiringImportDeferred,
                payload = payload,
                options = options,
                customProperties = customProperties,
            ).withRemovalFromRunningImportsOnCompletion(existingSpeechifyUri.unconfirmedSpeechifyUri)
                .emittingToFlowOfAllImportEvents()
                .shareFinishingAlwaysReturningLastItemWithJobIn(
                    /* `ChildCoroutineErrorBehavior.DONT_PROPAGATE_TO_PARENT` so we don't fail this entire service on a
                     *  single import-flow failure. */
                    onError = ChildCoroutineErrorBehavior.DONT_PROPAGATE_TO_PARENT,
                    scope = this@ImportService.scope,
                )
        }

    private fun coImportBinaryContentByUpload(
        finalizedSpeechifyUri: Deferred<SpeechifyURI>,
        itemRequiringImportDeferred: Deferred<ItemRequiringImport>,
        payload: ImportableContentPayload.ImportableContentPayloadOfSingleBlob,
        options: ImportOptions? = null,
        customProperties: Sequence<Pair<String, Any>> = emptySequence(),
    ): Flow<ImportProgress> = flowWithTelemetryOfCollectingAllItems(
        telemetryEventName = "ImportService.importFileByUpload",
    ) {
        it.addProperty(
            "contentType",
            payload.listenableBinaryContentPayload.contentWithMimeType.mimeType?.fullString,
        )
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addProperties(*customProperties.toList().toTypedArray())
        // Await the initializing item and the db item being created.
        val pendingImport = it.addMeasurement("storeImportInDb") {
            itemRequiringImportDeferred.await()
        }
        emit(ImportProgress.Started(pendingImport))
        val speechifyUri = finalizedSpeechifyUri.await()
        it.addProperty("id", speechifyUri.id)

        try {
            when (payload) {
                is ImportableContentPayload.Pdf -> {
                    platformImportService.processPdf(
                        pdfContentPayload = payload,
                        speechifyUri,
                        options,
                        customProperties,
                    )
                }

                is ImportableContentPayload.PlainTextOrMarkdown -> {
                    platformImportService.processTxtFile(
                        txtContentPayload = payload,
                        speechifyUri,
                        options,
                        customProperties,
                    )
                }

                is ImportableContentPayload.Html -> {
                    platformImportService.processHtml(
                        htmlContentPayload = payload,
                        speechifyUri,
                        options,
                        customProperties,
                    )
                }

                is ImportableContentPayload.Epub -> {
                    platformImportService.processEpub(
                        epubContentPayload = payload,
                        options = options,
                        speechifyUri = speechifyUri,
                    )
                }

                is ImportableContentPayload.SpeechifyBook ->
                    throw UnsupportedOperationException("Speechify book not supported for upload")
            }.orThrow()
        } catch (e: CancellationException) {
            removePendingImport(pendingImport)
            emit(ImportProgress.Canceled(speechifyUri))
            return@flowWithTelemetryOfCollectingAllItems
        } catch (e: Throwable) {
            updateAttemptsCountAndErrorInPendingImport(pendingImport, e)
            throw SDKErrorException(ImportProcessingError(speechifyUri.id, e.toSDKError()), cause = e)
        }

        applyChangesMadeDuringTheImport(speechifyUri)
        removePendingImport(pendingImport)

        emit(ImportProgress.Finished(speechifyUri))
    }

    /**
     * Import shared content, given the shared item id (supports v1/v2), into your Library root folder
     * @return a URI referencing the content that is now available through your Library.
     * This might be a copy of the original.
     */
    fun importSharedContent(sharedItemId: String, callback: Callback<SpeechifyURI>) =
        callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importSharedContent") {
            it.addProperty("sharedItemId", sharedItemId)
            it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
            val speechifyURI = importSharedContent(sharedItemId).orReturn { return@fromCoWithTelemetryLoggingErrors it }
            it.addProperty("id", speechifyURI.id)
            it.addProperty("contentType", TelemetryContentType.SharedContent.contentType)
            speechifyURI.successfully()
        }

    internal suspend fun importSharedContent(sharedItemId: String): Result<SpeechifyURI> {
        return platformImportService.copySharedItemToUsersLibrary(sharedItemId)
    }

    /**
     * Import these files with their respective [OCRResult]s as a new scanned book into the user's library.
     * @param importableContentMetadata (optional) holds import metadata, See [ImportableContentMetadata].
     * @return a [SpeechifyURI] of the new library item.
     */
    fun importScannedBook(
        files: Array<OCRFile>,
        options: ImportOptions,
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyURI>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importScannedBook") {
        importableContentMetadataFlow.emit(importableContentMetadata)
        options.analyticsProperties.set(
            "contentSubType",
            importableContentMetadata.contentSubType,
        )
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addImportableContentMetadataProperties(importableContentMetadata)
        getOrCreateSharedFlowImportingScannedBook(
            existingSpeechifyUri = createSpeechifyUriWithInitializingLibraryItem(
                RecordType.SCAN,
                SpeechifyEntityType.SCANNED_BOOK,
                options,
            ),
            LazyOCRFilesFromList(files.toList()),
            options,
        ).sharedFlowThatFinishes.waitUntilFinished().uri.successfully()
    }

    /**
     * Import these files with their respective [OCRResult]s as a new scanned book into the user's library.
     * @param importableContentMetadata (optional) holds import metadata, See [ImportableContentMetadata].
     * @return a [SpeechifyURI] of the new library item.
     */
    fun importScannedBookFromLazyOcrFiles(
        files: LazyOCRFiles,
        options: ImportOptions?,
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyURI>,
    ) = callback.fromCoWithTelemetryLoggingErrors(
        telemetryEventName = "ImportService.importScannedBookFromLazyOcrFiles",
    ) {
        importableContentMetadataFlow.emit(importableContentMetadata)
        val updatedOptions = if (options == null) {
            ImportOptions()
        } else {
            options
        }
        updatedOptions.analyticsProperties.set(
            "contentSubType",
            importableContentMetadata.contentSubType,
        )
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addImportableContentMetadataProperties(importableContentMetadata)
        getOrCreateSharedFlowImportingScannedBook(
            existingSpeechifyUri = createSpeechifyUriWithInitializingLibraryItem(
                RecordType.SCAN,
                SpeechifyEntityType.SCANNED_BOOK,
                options,
            ),
            files,
            updatedOptions,
        ).sharedFlowThatFinishes.waitUntilFinished().uri.successfully()
    }

    /**
     * If the import for the given URI is already running returns the existing flow, otherwise creates a new flow
     * to import the given item.
     */
    internal suspend fun getOrCreateSharedFlowImportingScannedBook(
        existingSpeechifyUri: SpeechifyUriWithInitializer,
        files: LazyOCRFiles,
        options: ImportOptions?,
        shouldIgnoreLoggingEvent: Boolean = false,
    ): SharedFlowWithOwningJob<ImportProgress> =
        currentlyRunningImports.getOrPut(existingSpeechifyUri.unconfirmedSpeechifyUri) {
            val uri = existingSpeechifyUri.unconfirmedSpeechifyUri
            val itemRequiringImportDeferred = launchAsync {
                getOrCreateItemRequiringImport<ItemRequiringImport.ScannedPagesImport>(
                    speechifyUri = uri,
                    options = options,
                ) { uninitializedPendingImport ->
                    val dbScannedPages = files.allFilesAsFlow.withIndex().map { (index, file) ->
                        // We copy all images to the blob storage, so we can access them when the app restarts.
                        val blobStorageKey = BlobStorageKey("importer-${uri.id}-page-$index")
                        blobStorageAdapter.coPutBlob(blobStorageKey, file).orThrow()

                        val ocrResultDeferred = files.getCachedOcrableImageOrNullAsync(index)
                        val ocrResult = ocrResultDeferred
                            ?.takeIf { it.isCompleted }
                            ?.await()
                            ?.let { ocRableImage ->
                                /**
                                 * If the OCR result is ready, we can convert it to the [OCRResult] immediately.
                                 * We want to avoid blocking here because we want to save the import as soon as possible.
                                 */
                                if (ocRableImage.isOcrTextResultReady) {
                                    ocRableImage.toOcrResult(ocrAdapter.ocrMaxConcurrencyGuard).orThrow()
                                } else {
                                    null
                                }
                            }
                        ScannedPage(
                            speechifyUri = uri,
                            pageIndex = index,
                            blobStorageKey = blobStorageKey,
                            ocrResult = ocrResult,
                        )
                    }.toList()

                    saveOcrResultIntoDbWhenAvailable(files, uri)

                    dbScannedPages.forEach { scanPage ->
                        dbService.getScannedPageQueries()
                            .addScannedPage(scanPage)
                    }
                    uninitializedPendingImport
                }
            }

            coImportScannedBook(
                existingSpeechifyUri.finalizedSpeechifyUri,
                itemRequiringImportDeferred,
                files,
                options,
                shouldIgnoreLoggingEvent,
            ).withRemovalFromRunningImportsOnCompletion(existingSpeechifyUri.unconfirmedSpeechifyUri)
                .emittingToFlowOfAllImportEvents()
                .shareFinishingAlwaysReturningLastItemWithJobIn(
                    /* `ChildCoroutineErrorBehavior.DONT_PROPAGATE_TO_PARENT` so we don't fail this entire service on a
                     *  single import-flow failure. */
                    onError = ChildCoroutineErrorBehavior.DONT_PROPAGATE_TO_PARENT,
                    scope = this@ImportService.scope,
                )
        }

    /**
     * Launches a separate coroutine to save the OCR results as soon as they are ready.
     */
    internal fun saveOcrResultIntoDbWhenAvailable(files: LazyOCRFiles, uri: SpeechifyURI) =
        launchTopLevel {
            try {
                files.flowOfOcrableResultWhenAvailable
                    .collect { (index, ocrResult) ->
                        val scannedPage = dbService.getScannedPageQueries().getScannedPageBySpeechifyUriAndPageIndex(
                            uri,
                            index,
                        ).awaitAsFirstOrNull() ?: return@collect
                        if (scannedPage.ocrResult != null) {
                            return@collect
                        }

                        dbService.getScannedPageQueries()
                            .updateOcrResult(
                                ocrResult,
                                uri,
                                index,
                            )
                    }
            } catch (ex: CancellationException) {
                /**
                 * This is expected when the import is completed or all the ocr results are saved.
                 * Do nothing.
                 */
            }
        }.ignoreValue()

    private fun coImportScannedBook(
        finalizedSpeechifyUri: Deferred<SpeechifyURI>,
        itemRequiringImportDeferred: Deferred<ItemRequiringImport>,
        files: LazyOCRFiles,
        options: ImportOptions?,
        shouldIgnoreLoggingEvent: Boolean,
    ): Flow<ImportProgress> = flowWithTelemetryOfCollectingAllItems(
        telemetryEventName = "ImportService.importScannedBook",
    ) {
        it.addProperty("numberOfPages", files.numberOfFiles)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        val pendingImport = it.addMeasurement("storeImportInDb") {
            itemRequiringImportDeferred.await()
        }
        emit(ImportProgress.Started(pendingImport))
        val speechifyUri = finalizedSpeechifyUri.await()
        it.addProperty("id", speechifyUri.id)
        it.addProperty("contentType", TelemetryContentType.ScannedBook.contentType)
        try {
            platformImportService
                .importScannedBook(
                    files.getAllOCRFilesAsFlow(ocrAdapter.ocrMaxConcurrencyGuard),
                    speechifyUri,
                    options,
                    currentTelemetryEvent(),
                ).orThrow()
        } catch (e: Throwable) {
            updateAttemptsCountAndErrorInPendingImport(pendingImport, e)
            throw SDKErrorException(ImportProcessingError(speechifyUri.id, e.toSDKError()), cause = e)
        }

        applyChangesMadeDuringTheImport(speechifyUri)
        removePendingImport(pendingImport)

        if (!shouldIgnoreLoggingEvent) {
            importedItemDataForReportingFlow.emit(
                ImportedItemDataForReporting(
                    title = options?.title ?: "unknown",
                    sourceStoredUrl = speechifyUri.toString(),
                    contentType = ContentType.SCAN,
                    sourceUrl = null,
                    metadata = importableContentMetadataFlow.value,
                ),
            )
        }
        emit(ImportProgress.Finished(speechifyUri))
    }

    private fun Flow<ImportProgress>.withRemovalFromRunningImportsOnCompletion(
        uri: SpeechifyURI,
    ): Flow<ImportProgress> =
        onCompletion {
            currentlyRunningImports.remove(uri)
        }

    private fun Flow<ImportProgress>.emittingToFlowOfAllImportEvents(): Flow<ImportProgress> =
        onEach {
            flowOfAllImportEvents.emit(it)
        }

    private suspend fun Flow<ImportProgress>.waitUntilFinished() =
        this.filterIsInstance(ImportProgress.Finished::class).first()

    /**
     * Import the content of a parsed, clean HTML file to the Library
     * In case of Failure, returns an [ImportProcessingError] which contains the ID of the item
     * (itemId), and the error message (sdkError)
     * @param htmlFile The parsed html file that you want to upload into your Library root folder.
     * @param coverImageFile (optional) The desired cover image associated with this item in the Library.
     * @param sourceUrl the original source URL of the item to be imported
     * @param options (optional) An [ImportOptions] which contains title and parentFolderId
     * @param importableContentMetadata (optional) holds import metadata, See [ImportableContentMetadata].
     * @param callback Notifies the app the success/failure of the import within Speechify Library.
     */
    @Deprecated(
        message = "This is a temporary workaround that'll be removed once main import APIs get better parsing support.",
    )
    fun importFileByParsedHtml(
        htmlFile: File,
        coverImageFile: File? = null,
        sourceUrl: String,
        options: ImportOptions? = null,
        importableContentMetadata: ImportableContentMetadata,
        callback: Callback<SpeechifyFile>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importFileByParsedHtml") {
        importableContentMetadataFlow.emit(importableContentMetadata)
        it.addProperty("contentType", htmlFile.contentType)
        it.addProperty("sourceUrl", sourceUrl)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))
        it.addImportableContentMetadataProperties(importableContentMetadata = importableContentMetadata)

        val speechifyUri =
            contentBundler.coCreateBundleForUnimportedBinaryContent(
                payload = ListenableBinaryContentPayload.Html(
                    contentWithMimeType = htmlFile,
                    sourceUrl = sourceUrl,
                ),
                deviceLocalContent = null,
                importStartChoice = ImportStartChoice.DoNotStart,
                bundleMetadata = null,
            ).orThrow()
                .use { contentBundle ->
                    importFileByParsedHtml(
                        contentBundle = contentBundle,
                        htmlFile = htmlFile,
                        htmlOptions = HtmlContentLoadOptions(
                            sourceUrl = sourceUrl,
                            isEntireDocument = null,
                            isPostJavaScriptExecution = null,
                            isPostContentExtraction = null,
                        ),
                        coverImageFile = coverImageFile,
                        importOptions = options,
                    )
                }
                .orReturn {
                    return@fromCoWithTelemetryLoggingErrors it
                }

        importedItemDataForReportingFlow.emit(
            ImportedItemDataForReporting(
                title = options?.title ?: "unknown",
                sourceStoredUrl = speechifyUri.toString(),
                contentType = ContentType.HTML,
                sourceUrl = null,
                metadata = importableContentMetadataFlow.value,
            ),
        )
        SpeechifyFileImpl(htmlFile, speechifyUri).successfully()
    }

    /**
     * Imports as a user's library-item the HTML-tree represented by [htmlElement].
     */
    fun importHtmlFromSdkElement(
        htmlElement: DOMElement,
        htmlOptions: HtmlContentLoadOptions,
        coverImageFile: File? = null,
        importOptions: ImportOptions? = null,
        bundleMetadata: BundleMetadata? = null,
        callback: Callback<SpeechifyURI>,
    ) = callback.fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importFileByParsedHtml") {
        importableContentMetadataFlow.emit(bundleMetadata)
        it.addProperty("sourceUrl", htmlOptions.sourceUrl)
        it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))

        val htmlFile = htmlElement.serializeHtmlToFile()
        it.addProperty("contentType", htmlFile.contentType)

        return@fromCoWithTelemetryLoggingErrors contentBundler
            .createBundleForParsedHtml(
                htmlSerialized = htmlElement.serializeHtmlToFile(),
                htmlElement = htmlElement,
                options = htmlOptions,
                importStartChoice = ImportStartChoice.DoNotStart,
                bundleMetadata = bundleMetadata,
            )
            .use { contentBundle ->
                importFileByParsedHtml(
                    contentBundle = contentBundle,
                    htmlFile = htmlFile,
                    htmlOptions = htmlOptions,
                    coverImageFile = coverImageFile,
                    importOptions = importOptions,
                )
            }
    }

    /**
     * This will immediately attempt importing the given [LibraryItem.DeviceLocalContent] into the user's library.
     * If an import is already running, for example the one automatically started on app start, or one triggered from
     * the bundles content importer, this will return the result of that import, otherwise a new import process is
     * started.
     *
     * @param libraryItem A [LibraryItem.DeviceLocalContent] that should be imported.
     * @param callback Notifies the app the success/failure of the import within Speechify Library.
     */
    fun importDeviceLocalContentToLibrary(
        libraryItem: LibraryItem.DeviceLocalContent,
        callback: Callback<SpeechifyURI>,
    ) = callback
        .fromCoWithTelemetryLoggingErrors(telemetryEventName = "ImportService.importDeviceLocalContentToLibrary") {
            val itemRequiringImport = libraryItem.underlyingItemRequiringImport
            try {
                it.addProperty("id", itemRequiringImport.speechifyUri.id)
                it.addProperty("contentType", itemRequiringImport.telemetryContentType)
                it.addProperty(IsItemImportForDiagnosticsTelemetryProp.toPairWithVal(true))

                return@fromCoWithTelemetryLoggingErrors performImportForItemRequiringImport(itemRequiringImport)
                    .successfully()
            } catch (e: Exception) {
                updateAttemptsCountAndErrorInPendingImport(itemRequiringImport, e)
                throw e
            }
        }

    @Deprecated(
        message = "This is a temporary workaround that'll be removed once main import APIs get better parsing support.",
    )
    internal suspend fun coImportFileByParsedHtml(
        htmlFile: File,
        coverImageFile: File? = null,
        sourceUrl: String,
        options: ImportOptions? = null,
        importableContentMetadata: ImportableContentMetadata,
    ) = suspendCoroutine {
        importFileByParsedHtml(
            htmlFile = htmlFile,
            coverImageFile = coverImageFile,
            sourceUrl = sourceUrl,
            options = options,
            importableContentMetadata = importableContentMetadata,
            callback = it::resume,
        )
    }

    internal suspend fun createInitializingItem(
        speechifyUri: SpeechifyURI,
        options: ImportOptions?,
        recordType: RecordType,
        mergeIfAlreadyExists: Boolean,
        customProperties: Sequence<Pair<String, Any>> = emptySequence(),
    ) = platformImportService.createInitializingItem(
        speechifyUri,
        options,
        recordType,
        mergeIfAlreadyExists,
        customProperties,
    )

    private suspend fun importFileByParsedHtml(
        contentBundle: ContentBundle,
        htmlFile: File,
        htmlOptions: HtmlContentLoadOptions?,
        coverImageFile: File?,
        importOptions: ImportOptions?,
    ): Result<SpeechifyURI> {
        val speechifyUri = SpeechifyURI.generate(
            SpeechifyEntityType.LIBRARY_ITEM,
        )

        platformImportService.processParsedHtmlFile(
            htmlOptions = htmlOptions,
            htmlFile = htmlFile,
            coverImgFile = coverImageFile,
            speechifyUri = speechifyUri,
            options = importOptions,
            payload = ImportableContentPayload.Html(
                listenableBinaryContentPayload = ListenableBinaryContentPayload.Html(
                    contentWithMimeType = htmlFile,
                    sourceUrl = htmlOptions?.sourceUrl,
                ),
                parsedContentsForImport = ImportableContentPayload.ImportableContentPayloadOfSingleBlob
                    .ParsedContentsForImportOfSingleBlob(
                        contentBundle = CompletableDeferred(
                            value = contentBundle,
                        ),
                    ),
            ),
        )
            .mapFailure { error ->
                ImportProcessingError(
                    itemId = speechifyUri.id,
                    sdkError = error,
                )
            }
            .orReturn { return it }

        importedItemDataForReportingFlow.emit(
            ImportedItemDataForReporting(
                title = importOptions?.title ?: "unknown",
                sourceStoredUrl = speechifyUri.toString(),
                contentType = ContentType.HTML,
                sourceUrl = htmlOptions?.sourceUrl,
                metadata = importableContentMetadataFlow.value,
            ),
        )

        return speechifyUri
            .successfully()
    }

    /**
     * Called on app start to trigger retrying any imports that werent previously finished.
     */
    private suspend fun triggerPendingImports() = withTelemetry("ImportService.triggerPendingImports") {
        // This already only returns imports that have pending retries.
        // We need to wait for a user to be available because some platform can only provide user after initialising SDK as mentioned https://speechifyworkspace.slack.com/archives/C02LEG7AEGM/p1696863123522359?thread_ts=1696606948.902989&cid=C02LEG7AEGM
        val currentUserId = authService.awaitNonNullUser().uid
        val pendingImportsDb = dbService.getPendingImportQueriesForOptionalAction()
            /** `return` if SDK-consumer doesn't support database, because there couldn't have been any pending
             * imports saved.
             */
            ?: return@withTelemetry

        val pendingImports = pendingImportsDb
            .getResumableImportsWithLocalListeningProgress(
                attemptsPerformedCount = MAXIMUM_ATTEMPTS,
                owner = currentUserId,
                mapper = this::mapPendingImportWithListeningProgress,
            )
            .awaitAsList()
        it.addProperty("numberOfPendingImports", pendingImports.size)
        pendingImports.map { pendingImport ->
            ItemRequiringImport.fromDb(
                pendingImport = pendingImport,
                isImportCurrentlyRunning = false,
                obtainScannedPageFiles = getScannedPageFilesDeferred(pendingImport.speechifyUri)::await,
            )
        }.forEach { itemRequiringImport ->
            try {
                performImportForItemRequiringImport(itemRequiringImport)
            } catch (e: Exception) {
                Log.e(
                    "Failed to process pending import.",
                    e,
                    sourceAreaId = "ImportService.triggerPendingImports",
                )
                updateAttemptsCountAndErrorInPendingImport(itemRequiringImport, e)
            }
        }
    }

    private suspend fun performImportForItemRequiringImport(itemRequiringImport: ItemRequiringImport): SpeechifyURI {
        return when (itemRequiringImport) {
            is ItemRequiringImport.FileImport -> {
                val binaryContent =
                    blobStorageAdapter.coGetBlob(itemRequiringImport.primaryFileBlobStorageKey).orThrow()
                        ?: throw NullPointerException("Binary content is null, cannot resume import.")

                importFileByUpload(
                    SpeechifyUriWithInitializer(
                        itemRequiringImport.speechifyUri,
                        launchAsync { itemRequiringImport.speechifyUri },
                    ),
                    binaryContent.binaryContent,
                    binaryContent.mimeType
                        ?: throw NullPointerException("Binary content mime type is null cannot resume import."),
                    itemRequiringImport.importOptions,
                ).orThrow()
            }

            is ItemRequiringImport.ScannedPagesImport -> {
                val ocrFiles = LazyOCRFilesFromDbOcrFile(
                    initialScannedPages = itemRequiringImport.scannedPages,
                    blobStorageAdapter = blobStorageAdapter,
                    ocrAdapter = ocrAdapter,
                    obtainScannedPageForImportingItem = { index ->
                        getScannedPagesFor(itemRequiringImport.speechifyUri, index)
                    },
                )

                saveOcrResultIntoDbWhenAvailable(ocrFiles, itemRequiringImport.speechifyUri)

                getOrCreateSharedFlowImportingScannedBook(
                    existingSpeechifyUri = SpeechifyUriWithInitializer(
                        itemRequiringImport.speechifyUri,
                        launchAsync { itemRequiringImport.speechifyUri },
                    ),
                    ocrFiles,
                    itemRequiringImport.importOptions,
                ).sharedFlowThatFinishes.waitUntilFinished().uri
            }

            is ItemRequiringImport.UrlImport -> {
                importFileFromURL(
                    itemRequiringImport.speechifyUri,
                    itemRequiringImport.sourceURL,
                    itemRequiringImport.importOptions,
                ).orThrow().uri
            }
        }
    }

    private suspend inline fun <reified T : ItemRequiringImport> getOrCreateItemRequiringImport(
        speechifyUri: SpeechifyURI,
        options: ImportOptions?,
        /**
         * Note this block is only called if there is no existing import for the given [speechifyUri] in the DB yet.
         * This block will be called with a partially setup [PendingImports] object.
         * Perform any actions that need to happen only once before attempting to import the file.
         * You can return an updated object with any fields that need to be set initially.
         */
        initializerAction: (importToBeCreated: DBPendingImport) -> DBPendingImport,
    ): T {
        val currentUserId = authService.getCurrentUser().orThrow().uid
        val importQueries = dbService.getPendingImportQueries()

        val existingImport = importQueries.getImportWithLocalListeningProgress(
            speechifyUri = speechifyUri,
            mapper = this::mapPendingImportWithListeningProgress,
        ).awaitAsOneOrNull()
        if (existingImport != null) {
            // No need to run the initializer action if the import already exists.
            return ItemRequiringImport.fromDb(
                pendingImport = existingImport,
                isImportCurrentlyRunning = true,
                obtainScannedPageFiles = getScannedPageFilesDeferred(speechifyUri)::await,
            ) as T
        }

        val importType = when (T::class) {
            ItemRequiringImport.FileImport::class -> ImportType.FILE
            ItemRequiringImport.UrlImport::class -> ImportType.URL
            ItemRequiringImport.ScannedPagesImport::class -> ImportType.SCANNED_PAGES
            else -> throw IllegalArgumentException("Unknown pending import type: ${T::class}")
        }

        val uninitializedImport = DBPendingImport(
            speechifyUri = speechifyUri,
            primaryFileBlobStorageKey = null,
            scannedPages = null,
            sourceURL = null,
            importOptions = options,
            htmlContentLoadOptions = null,
            lastErrorStackTrace = null,
            attemptsPerformedCount = 0,
            importType = importType,
            wasLastErrorConnectionError = null,
            owner = currentUserId,
            lastUpdatedAt = DateTime.now(),
            listeningProgress = null,
            mimeType = null,
        )

        val initializedImport = initializerAction(uninitializedImport)

        importQueries.trackImport(
            initializedImport,
        )

        return ItemRequiringImport.fromDb(
            pendingImport = initializedImport,
            isImportCurrentlyRunning = true,
            obtainScannedPageFiles = getScannedPageFilesDeferred(speechifyUri)::await,
        ) as T
    }

    /**
     * This function will be called after the import is done to apply any values that might have changed during the
     * import process to the existing firestore document.
     */
    private suspend fun applyChangesMadeDuringTheImport(uri: SpeechifyURI) {
        // We refetch the pending import from the DB to apply any changes a user made during the import process.
        // This includes the title, or listening progress.
        val pendingImport = dbService.getPendingImportQueries().getImportWithLocalListeningProgress(
            speechifyUri = uri,
            mapper = this::mapPendingImportWithListeningProgress,
        ).awaitAsOneOrNull()

        if (pendingImport?.listeningProgress != null) {
            platformImportService.setItemListeningProgress(uri, pendingImport.listeningProgress)
        }

        if (pendingImport?.importOptions != null) {
            platformImportService.applyImportOptions(uri, pendingImport.importOptions)
        }
    }

    internal suspend fun stopImportingAndRemoveFromLibrary(itemRequiringImport: ItemRequiringImport) {
        // Make sure if the import is currently running we cancel it.
        val runningImport = currentlyRunningImports[itemRequiringImport.speechifyUri]
        platformImportService.cancelImport(itemRequiringImport.speechifyUri)
        runningImport?.owningJob?.cancelAndJoin(
            exceptionMessage = "User requested import to be stopped.",
        )
        currentlyRunningImports.remove(itemRequiringImport.speechifyUri)

        // And remove the pending import from the DB.
        removePendingImport(itemRequiringImport)
        dbService.getLocalListeningProgressQueries().removeLocalListeningProgress(itemRequiringImport.speechifyUri)
    }

    private suspend fun removePendingImport(itemRequiringImport: ItemRequiringImport) {
        // Finally clear the pending imports table.
        // Remove from the DB and remove from the file system.
        dbService.getPendingImportQueries().removeImport(itemRequiringImport.speechifyUri)
        when (itemRequiringImport) {
            is ItemRequiringImport.FileImport -> {
                blobStorageAdapter.coDeleteBlob(itemRequiringImport.primaryFileBlobStorageKey).orThrow()
            }

            is ItemRequiringImport.ScannedPagesImport -> {
                itemRequiringImport.scannedPages.forEach { dbOcrFile ->
                    blobStorageAdapter.coDeleteBlob(dbOcrFile.localStorageKey).orThrow()
                }
                dbService.getScannedPageQueries()
                    .removeScannedPageBySpeechifyUri(itemRequiringImport.speechifyUri)
            }

            is ItemRequiringImport.UrlImport -> {
                if (itemRequiringImport.primaryFileBlobStorageKey != null) {
                    blobStorageAdapter.coDeleteBlob(itemRequiringImport.primaryFileBlobStorageKey).orThrow()
                }
            }
        }
    }

    /**
     * Asynchronously retrieves all scanned page files associated with a given Speechify URI from the database.
     * Returns a Deferred that will resolve to a list of scanned pages when awaited.
     * This is useful for lazy loading of scanned pages data without blocking the calling thread.
     * This does not retrieve the OCR results for the scanned pages.
     * @see [ImportService.getScannedPagesFor] for retrieving OCR results.
     */
    private fun getScannedPageFilesDeferred(speechifyUri: SpeechifyURI): Deferred<List<DbScannedPageFile>> =
        launchAsync {
            dbService.getScannedPageQueries()
                .getScannedPagesBySpeechifyUri(speechifyUri, this@ImportService::mapScannedPageToDbScannedPageFile)
                .awaitAsList()
        }

    private suspend fun updateAttemptsCountAndErrorInPendingImport(
        itemRequiringImport: ItemRequiringImport,
        error: Throwable,
    ) {
        val wasConnectionError = error.isCausedByConnectionError()
        val attemptsPerformedCount = if (wasConnectionError) {
            // For connection errors we don't increment the attempt count since we want to retry indefinitely.
            itemRequiringImport.attemptsPerformedCount
        } else {
            itemRequiringImport.attemptsPerformedCount + 1
        }

        dbService.getPendingImportQueries().updateAttemptsPerformedCount(
            attemptsPerformedCount = attemptsPerformedCount,
            lastErrorStackTrace = error.stackTraceToString(),
            speechifyUri = itemRequiringImport.speechifyUri,
            wasLastErrorConnectionError = DbBoolean(wasConnectionError),
            lastUpdatedAt = DateTime.now(),
        )
    }

    /**
     * Creates a new Speechify URI and starts the creation of the initializing library item.
     * Only call if the user intends to make an import otherwise we will pollute the library with
     * needless items.
     */
    internal fun createSpeechifyUriWithInitializingLibraryItem(
        recordType: RecordType,
        speechifyEntityType: SpeechifyEntityType,
        importOptions: ImportOptions?,
    ): SpeechifyUriWithInitializer {
        val speechifyUri = SpeechifyURI.generate(
            speechifyEntityType,
        )

        /**
         * We initiate the creation of the 'placeholder library item' in the background to prevent
         * any delay with other processing.
         * This typically results in a delay of half a second for the listening bundle, even with
         * a strong internet connection.
         *
         * When offline this blocks forever so should only be awaited on once anything that can happen offline is done.
         */
        val finalizedSpeechifyUri = launchAsync {
            createInitializingItem(
                speechifyUri,
                importOptions,
                recordType = recordType,
                mergeIfAlreadyExists = false,
            )

            speechifyUri
        }

        return SpeechifyUriWithInitializer(speechifyUri, finalizedSpeechifyUri)
    }

    /**
     * Returns a flow that is updated whenever a pending import is added or removed.
     */
    internal suspend fun observeListenableImports(
        userId: UserId,
        folderQuery: FolderQuery,
    ): Flow<List<ItemRequiringImport>> {
        val pendingImportQueriesNoNull = dbService.getPendingImportQueriesForOptionalAction()
            /**
             * Return empty flow if SDK-consumer doesn't support database, as there were no pending imports saved.
             */
            ?: return emptyFlow()
        return pendingImportQueriesNoNull.getListenableImportsWithLocalListeningProgress(
            owner = userId,
            mapper = this::mapPendingImportWithListeningProgress,
        )
            .asFlow()
            .map {
                // We make sure that we only return imports that match the folder query.
                it.awaitAsList().filter { import ->
                    when (folderQuery) {
                        FolderQuery.All -> true
                        is FolderQuery.Only -> folderQuery.ref.id == import.importOptions?.parentFolder?.id
                    }
                }.map { pendingImport ->
                    getItemRequiringImportFromPendingImport(pendingImport)
                }
            }
            .shareIn(this.scope, SharingStarted.Eagerly, replay = 1)
    }

    internal suspend fun observePendingLocalItem(libraryItemId: String, userId: UserId): Flow<ItemRequiringImport?> {
        val pendingImportQueriesNoNull = dbService.getPendingImportQueriesForOptionalAction() ?: return emptyFlow()

        return pendingImportQueriesNoNull.getListenableImportsWithLocalListeningProgress(
            owner = userId,
            mapper = this::mapPendingImportWithListeningProgress,
        )
            .asFlow()
            .map {
                it.awaitAsList().filter { it.speechifyUri.id == libraryItemId }
                    .map { pendingImport ->
                        getItemRequiringImportFromPendingImport(pendingImport)
                    }.firstOrNull()
            }
            .shareIn(this.scope, SharingStarted.Eagerly, replay = 1)
    }

    /**
     * Gets the currently pending import with the given URI or null if no such import exists.
     * With a null value the item may still exist in the users remote Speechify library.
     */
    internal suspend fun getPendingImportFromUri(speechifyUri: SpeechifyURI): ItemRequiringImport? {
        return dbService.getPendingImportQueries().getImportWithLocalListeningProgress(
            speechifyUri = speechifyUri,
            mapper = this::mapPendingImportWithListeningProgress,
        ).awaitAsOneOrNull()
            ?.let { getItemRequiringImportFromPendingImport(it) }
    }

    /**
     * Gets a specific scanned page for a given Speechify URI and page index from the database.
     * Returns null if no such page exists in the database.
     */
    internal suspend fun getScannedPagesFor(speechifyUri: SpeechifyURI, index: Int): DbOcrFile? {
        return dbService.getScannedPageQueries().getScannedPageBySpeechifyUriAndPageIndex(
            speechifyUri = speechifyUri,
            pageIndex = index,
            mapper = this::mapScannedPageToDbOcrFile,
        ).executeAsOneOrNull()
    }

    /**
     * Utilized as a mapping mechanism to transform database scanned page fields into a [DbOcrFile] due to
     * SqlDelight's requirement for explicit mapping of query results to model objects.
     */
    private fun mapScannedPageToDbOcrFile(
        speechifyUri: SpeechifyURI,
        primaryFileBlobStorageKey: BlobStorageKey,
        ocrResult: OCRResult?,
        index: Int,
    ) = DbOcrFile(
        localStorageKey = primaryFileBlobStorageKey,
        ocrResult = ocrResult,
    )

    /**
     * Utilized as a mapping mechanism to transform database scanned page fields into a [DbScannedPageFile]
     */
    private fun mapScannedPageToDbScannedPageFile(
        speechifyUri: SpeechifyURI,
        primaryFileBlobStorageKey: BlobStorageKey,
        index: Int,
    ) = DbScannedPageFile(
        localStorageKey = primaryFileBlobStorageKey,
        pageIndex = index,
        speechifyURI = speechifyUri,
    )

    /**
     * Utilized as a mapping mechanism to associate the [LocalListeningProgress] with the [DBPendingImport] due
     * to the absence of model relationship support in SqlDelight.
     */
    private fun mapPendingImportWithListeningProgress(
        speechifyUri: SpeechifyURI,
        primaryFileBlobStorageKey: BlobStorageKey?,
        scannedPages: List<DbOcrFile>?,
        sourceURL: String?,
        importOptions: ImportOptions?,
        htmlContentLoadOptions: HtmlContentLoadOptions?,
        lastErrorStackTrace: String?,
        wasLastErrorConnectionError: DbBoolean?,
        importType: ImportType,
        attemptsPerformedCount: Int,
        owner: String,
        lastUpdatedAt: DateTime,
        listeningProgress: ListeningProgress?,
        mimeType: MimeType?,
        localListeningProgress: ListeningProgress?,
    ): PendingImport = PendingImport(
        speechifyUri = speechifyUri,
        primaryFileBlobStorageKey = primaryFileBlobStorageKey,
        scannedPages = scannedPages,
        sourceURL = sourceURL,
        importOptions = importOptions,
        htmlContentLoadOptions = htmlContentLoadOptions,
        lastErrorStackTrace = lastErrorStackTrace,
        wasLastErrorConnectionError = wasLastErrorConnectionError,
        importType = importType,
        attemptsPerformedCount = attemptsPerformedCount,
        owner = owner,
        lastUpdatedAt = lastUpdatedAt,
        listeningProgress = localListeningProgress ?: listeningProgress,
        mimeType = mimeType,
    )

    /**
     * Gets the currently pending import with the given ID or null if no such import exists.
     * Prefer using [getPendingImportFromUri] if possible, since this implementation is less efficient due to the fuzzy
     * string matching happening in the DB, whereas [getPendingImportFromUri] uses the primary key for a direct lookup.
     */
    internal suspend fun getPendingImportFromId(id: SpeechifyContentId): ItemRequiringImport? {
        return dbService.getPendingImportQueries().getImportEndingWithIdWithLocalListeningProgress(
            id = id,
            mapper = this::mapPendingImportWithListeningProgress,
        ).awaitAsOneOrNull()?.let { getItemRequiringImportFromPendingImport(it) }
    }

    internal suspend fun insertOrUpdateLocalListeningProgress(
        uri: SpeechifyURI,
        listeningProgress: ListeningProgress?,
    ): Result<Unit> {
        return dbService.getLocalListeningProgressQueries().insertOrReplaceListeningProgress(
            LocalListeningProgress(
                speechifyUri = uri,
                listeningProgress = listeningProgress,
                lastUpdatedTime = DateTime.now().asSeconds().toLong(),
            ),
        ).successfully()
    }

    internal suspend fun getLocalListeningProgress(speechifyUri: SpeechifyURI): LocalListeningProgress? {
        return dbService.getLocalListeningProgressQueries().getLocalListeningProgress(speechifyUri).awaitAsOneOrNull()
    }

    private suspend fun getItemRequiringImportFromPendingImport(pendingImport: PendingImport): ItemRequiringImport {
        return ItemRequiringImport.fromDb(
            pendingImport = pendingImport,
            isImportCurrentlyRunning = isImportRunningForUri(pendingImport.speechifyUri),
            obtainScannedPageFiles = getScannedPageFilesDeferred(pendingImport.speechifyUri)::await,
        )
    }

    private suspend fun isImportRunningForUri(uri: SpeechifyURI): Boolean {
        val runningImport = currentlyRunningImports[uri] ?: return false
        val currentStatus = runningImport.sharedFlowThatFinishes.firstOrNull() ?: return false
        return when (currentStatus) {
            is ImportProgress.Finished -> false
            is ImportProgress.Canceled -> false
            is ImportProgress.Started -> true
        }
    }

    internal suspend fun updatePendingImport(
        itemRequiringImport: ItemRequiringImport,
        patch: UpdateLibraryItemParams,
    ) {
        // Simply updating the DB entry here is safe because we have a final step after the firestore records are
        // created where we re-read the pending import from the DB and apply any changes the user made.
        if (patch.title != null) {
            val newOptions = (itemRequiringImport.importOptions ?: ImportOptions()).copy(title = patch.title)
            dbService.getPendingImportQueries().updateImportOptions(
                newOptions,
                itemRequiringImport.speechifyUri,
            )
        }

        if (patch.listeningProgress != null) {
            insertOrUpdateLocalListeningProgress(itemRequiringImport.speechifyUri, patch.listeningProgress)
        }
    }

    companion object {
        const val MAXIMUM_ATTEMPTS = 5

        internal fun willAutomaticallyRetryImport(itemRequiringImport: ItemRequiringImport) =
            itemRequiringImport.attemptsPerformedCount < MAXIMUM_ATTEMPTS
    }
}

internal data class SpeechifyUriWithInitializer(
    val unconfirmedSpeechifyUri: SpeechifyURI,
    /**
     * The initializer needs to be awaited on before any other updates to Firestore are made.
     */
    val finalizedSpeechifyUri: Deferred<SpeechifyURI>,
)

internal sealed class ImportProgress {
    abstract val uri: SpeechifyURI

    /**
     * Emitted when the import is started with the [ItemRequiringImport] that is being imported.
     */
    data class Started(val itemRequiringImport: ItemRequiringImport) : ImportProgress() {
        override val uri: SpeechifyURI = itemRequiringImport.speechifyUri
    }

    /**
     * Emitted when the import is complete with the final URI in the users library.
     */
    data class Finished(override val uri: SpeechifyURI) : ImportProgress()

    /**
     * Emitted when the import is canceled.
     */
    data class Canceled(override val uri: SpeechifyURI) : ImportProgress()
}
