package com.speechify.client.bundlers.reading.importing

import com.speechify.client.api.SpeechifyURI
import com.speechify.client.api.adapters.firebase.DataSource
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.services.importing.ImportProgress
import com.speechify.client.api.services.importing.SpeechifyUriWithInitializer
import com.speechify.client.api.services.importing.models.ImportOptions
import com.speechify.client.api.services.library.LibraryServiceDelegate
import com.speechify.client.api.services.library.getContentLibraryItemFromFirestore
import com.speechify.client.api.services.library.models.LibraryItem
import com.speechify.client.api.telemetry.withTelemetry
import com.speechify.client.api.util.Result
import com.speechify.client.api.util.SDKError
import com.speechify.client.api.util.successfully
import com.speechify.client.internal.createTopLevelCoroutineScope
import com.speechify.client.internal.launchTask
import com.speechify.client.internal.sync.AtomicRef
import com.speechify.client.internal.sync.SingleJobMutexByCancelling
import com.speechify.client.internal.sync.set
import com.speechify.client.internal.util.extensions.collections.flows.SharedFlowWithOwningJob
import com.speechify.client.internal.util.extensions.collections.sendToUnlimited
import com.speechify.client.internal.util.extensions.coroutines.createChildSupervisorScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.withContext

internal class ContentImporterImpl(
    /**
     * The `placeholderItemURI` serves its purpose in updating the library's placeholder item once the upload or import
     * process has been completed.
     */
    private val placeholderItemURI: SpeechifyUriWithInitializer?,
    private val existingLocalItem: LibraryItem.DeviceLocalContent?,
    private val startImportAction: StartImportAction,
    private val libraryServiceDelegate: LibraryServiceDelegate,
) : ContentImporter() {
    override val stateFlow = MutableStateFlow<ContentImporterState?>(
        if (existingLocalItem == null) {
            ContentImporterState.NotImported(null)
        } else {
            ContentImporterState.NotImported(existingLocalItem)
        },
    )
    private val afterImportTasks: Channel<suspend (uri: SpeechifyURI) -> Unit> = Channel(capacity = Channel.UNLIMITED)

    /**
     * Starts a non-blocking import of the [file] behind the bundle.
     *
     * Recommended the caller register a listener using [addStateChangeListener] to receive updates on progress state
     * and/or check [state]
     */
    override suspend fun startImport(
        options: ImportOptions,
    ): Result<SpeechifyURI> = withTelemetry(
        telemetryEventName = "ContentImporterImpl.startImport",
    ) {
        val capturedState = stateFlow.value
        if ((capturedState != null && capturedState !is ContentImporterState.NotImported) ||
            !stateFlow.compareAndSet(capturedState, ContentImporterState.Starting(existingLocalItem))
        ) {
            // client has most likely re-tried import, and it is already in-progress or succeeded state
            return@withTelemetry Result.Failure(
                SDKError.OtherMessage("Attempted to import while already $capturedState"),
            ).also {
                Log.e(failure = it, sourceAreaId = "ContentImporterImpl.startImport")
            }
        }

        val uploadJob = startImportAction.invoke(options, placeholderItemURI)
        try {
            val importFlow = uploadJob.sharedFlowThatFinishes
            return@withTelemetry importFlow
                .distinctUntilChanged()
                .onEach { progress ->
                    when (progress) {
                        is ImportProgress.Finished -> {
                            val url = progress.uri
                            editsSaveAction.value?.invoke(url)
                            while (true) {
                                val pendingTask = afterImportTasks.tryReceive().getOrNull() ?: break
                                pendingTask(url)
                            }

                            // Fetch the now existent library item so clients have access to it.
                            val libraryItem = libraryServiceDelegate.getContentLibraryItemFromFirestore(
                                id = url.id,
                                dataSource =
                                /**
                                 * Explicitly setting [DataSource.DEFAULT] because it has been encountered that the
                                 * cache got cleared during the import process (happened [here in WEB-2539](https://linear.app/speechify-inc/issue/WEB-2539/use-resourcenotfound-sdk-error-when-file-in-unavailable-in-cache#comment-f0f1466f)
                                 * - apparently, in Web App, cache clearing occurs, e.g. when user navigates in the library view)
                                 */
                                DataSource.DEFAULT,
                            )
                            stateFlow.emit(ContentImporterState.ImportedToLibrary(url, libraryItem))
                        }
                        is ImportProgress.Canceled -> {
                            stateFlow.emit(ContentImporterState.NotImported(null))
                        }
                        is ImportProgress.Started -> {
                            stateFlow.emit(
                                ContentImporterState.Importing(
                                    options,
                                    LibraryItem.DeviceLocalContent(progress.itemRequiringImport),
                                ),
                            )
                        }
                    }
                }
                .toList().last().uri.successfully()
        } catch (e: Throwable) {
            val libraryItem = when (val value = stateFlow.value) {
                is ContentImporterState.Importing -> {
                    value.libraryItem
                }
                else -> null
            }
            uploadJob.owningJob.cancel()
            if (libraryItem != null) {
                // Since the original StandaloneCoroutine was cancelled, deletion needs to be launched in a new scope
                launchTask {
                    libraryServiceDelegate.deleteItem(libraryItem.uri.id)
                }
            }
            stateFlow.emit(ContentImporterState.NotImported(libraryItem))
            throw e
        }
    }

    private val scope = createTopLevelCoroutineScope()

    private val editsSaveAction: AtomicRef<(suspend (uri: SpeechifyURI) -> Unit)?> = AtomicRef(null)

    private val mutex = SingleJobMutexByCancelling()
    override fun setEditsSaveAction(saveEditsAction: suspend (uri: SpeechifyURI) -> Unit): Deferred<SpeechifyURI> {
        var actioned = false
        editsSaveAction.set(
            newValue = { url ->
                try {
                    saveEditsAction(url)
                } finally {
                    actioned = true
                }
            },
        )

        val editsSavedDeferred = CompletableDeferred<SpeechifyURI>()

        // Support cases where the import has already started.
        mutex.replaceWithNewJobNoWaitForCancelIn(
            scope,
            block = {
                try {
                    val uri = suspendUntilImportedGetUri()
                    if (!actioned) {
                        saveEditsAction(uri)
                    }
                    editsSavedDeferred.complete(uri)
                } catch (e: Throwable) {
                    when (e) {
                        is CancellationException -> {
                            editsSavedDeferred.cancel(e)
                        }
                        else -> {
                            editsSavedDeferred.completeExceptionally(e)
                        }
                    }
                    throw e
                }
            },
        )

        return editsSavedDeferred
    }

    private val afterImportTaskMutex = SingleJobMutexByCancelling()
    override fun queueTaskAfterImport(task: suspend (uri: SpeechifyURI) -> Unit) {
        afterImportTasks.sendToUnlimited(task)

        // Support cases where the import has already started.
        afterImportTaskMutex.replaceWithNewJobNoWaitForCancelIn(
            scope.createChildSupervisorScope(), /* `SupervisorScope` so an exception from single task doesn't cancel the
             entire scope and prevents the other tasks. */
            block = {
                // Since this will only return once all queued tasks are processed in `coStartImport` there is
                // no risk of running anything in the afterImportTasks channel in parallel.
                val uri = suspendUntilImportedGetUri()
                withContext(
                    NonCancellable, /* `NonCancellable` prevents losing any `afterImportTasks` if another call to
                     this method occurs (`replaceWithNewJobNoWaitForCancelIn` will trigger cancel). */
                ) {
                    while (true) {
                        val pendingTask = afterImportTasks.tryReceive().getOrNull() ?: break
                        pendingTask(uri)
                    }
                }
            },
        )
    }
}

internal typealias StartImportAction = suspend (
    options: ImportOptions,
    placeholderItemURI: SpeechifyUriWithInitializer?,
) -> SharedFlowWithOwningJob<ImportProgress>
