package com.speechify.client.internal.sync

import com.speechify.client.api.util.AsyncDestructible
import com.speechify.client.internal.util.extensions.collections.flows.mapStateFlow
import com.speechify.client.internal.util.extensions.collections.flows.swap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

/**
 * An abstraction to guarantee that at most one job of some kind is ever running at any time.
 */
internal class SingleJobMutexByCancelling constructor(
    initialJob: Job? = null,
) : AsyncDestructible {
    private val currentJob = MutableStateFlow(initialJob)

    val isCurrentJobRunning: Boolean
        get() = currentJob.value.let {
            it != null && !it.isCancelled && !it.isCompleted
        }

    /**
     * Replace with a new job, this guarantees that the new job is always started only after canceling the previous
     * one succeeds.
     *
     * NOTE: This function returns only after the cancellation of the previous job finishes. The one that does not
     * is [replaceWithNewJobNoWaitForCancelIn]
     *
     * If canceling the current job in this slot fails the new job is never started.
     */
    suspend fun replaceWithNewJobIn(
        /**
         * The [CoroutineScope] in which the [block] will be started (if it is started).
         */
        scope: CoroutineScope,
        /**
         * For cases where the jobs will be cancelled externally or via [cancelCurrentJobAndJoin], can be used to detect
         * cases where this has not happened.
         */
        onOldJobNotCancelled: suspend (oldJobInfo: ReadOnlyJobInfo, newJobInfo: ReadOnlyJobInfo) -> Unit = { _, _ -> },
        block: suspend CoroutineScope.() -> Unit,
    ) {
        val newJob = scope.launch(start = CoroutineStart.LAZY, block = block)
        val oldJob = currentJob.swap(newJob)
        withContext(NonCancellable) {
            if (oldJob != null && !oldJob.isCancelled) {
                onOldJobNotCancelled(ReadOnlyJobInfo(oldJob), ReadOnlyJobInfo(newJob))
            }
            try {
                oldJob?.cancelAndJoin()
            } catch (e: Throwable) {
                newJob.cancelAndJoin()
                throw e
            }
        }
        newJob.start()
    }

    /**
     * A variant of [replaceWithNewJobIn] especially for synchronous blocks of code.
     * This function returns immediately and does not wait for the cancellation of the previous job to finish.
     */
    fun replaceWithNewJobNoWaitForCancelIn(
        /**
         * The [CoroutineScope] in which the [block] will be started (if it is started).
         */
        scope: CoroutineScope,
        /**
         * See documentation on [replaceWithNewJobIn]
         */
        onOldJobNotCancelled: suspend (oldJobInfo: ReadOnlyJobInfo, newJobInfo: ReadOnlyJobInfo) -> Unit = { _, _ -> },
        block: suspend CoroutineScope.() -> Unit,
    ) {
        scope.launch {
            replaceWithNewJobIn(
                scope = scope,
                onOldJobNotCancelled = onOldJobNotCancelled,
                block = block,
            )
        }
    }

    /**
     * For implementing an 'ensure running' and 'ensure stopped' (using [cancelCurrentJob]), especially for
     * implementing this in a non-wasteful way (without starting a new job), and with well documented usage instruction.
     *
     * NOTE: While this function is thread-safe, its callers needs to consider whether to declare thread-safety (e.g. it
     * may be incorrect for side effects in the caller to ever happen in swapped order). Additionally, the
     * [cancelCurrentJob] should never be called in parallel to calling this function, or else it's not determined if
     * after the call there is, or there isn't a running job.
     *
     * @return `true` if a new job was started, `false` if there was already a job running, or it was cancelled
     * before starting.
     */
    fun ensureJobIn(
        /**
         * The [CoroutineScope] in which the [block] will be started (if it is started).
         */
        scope: CoroutineScope,
        /**
         * Note - the block may never execute if there is already a job running.
         */
        block: suspend CoroutineScope.() -> Unit,
    ): Boolean {
        val oldJob = currentJob.value
        if (oldJob != null && !oldJob.isCancelled) {
            return false
        }

        val newJob = scope.launch(start = CoroutineStart.LAZY, block = block)
        if (!currentJob.compareAndSet(
                expect = oldJob,
                update = newJob,
            )
        ) {
            newJob.cancel()
            /** Rare concurrency scenario for which this method exists - some other thread has just started a job,
             so we leave it as is - at least we saved the caller from naively cancelling it, if all this class had was
             [replaceWithNewJobNoWaitForCancelIn]. */
            return false
        }

        return newJob.start()
    }

    /**
     * @return `true` if there was any job.
     */
    suspend fun cancelCurrentJobAndJoin(): Boolean {
        val oldJob = currentJob.swap(null)
        oldJob?.cancelAndJoin()
        return oldJob != null
    }

    /**
     * Synchronous version of [cancelCurrentJobAndJoin] which doesn't wait for the cancellation to finish.
     *
     * NOTE: Calls to this should never be called in parallel to calling [ensureJobIn], or else it's not determined if
     * after the call there is, or there isn't a running job.
     *
     * @return `true` if there was a job, and it was cancelled.
     */
    fun cancelCurrentJob(): Boolean {
        val oldJob = currentJob.value /* Don't swap, so that any new Job waits for the cancellation to finish. */
        if (oldJob != null && !oldJob.isCancelled) {
            oldJob.cancel()
            return true
        } else {
            return false
        }
    }

    val currentJobInfo: StateFlow<ReadOnlyJobInfo?> =
        currentJob.mapStateFlow { it?.let(::ReadOnlyJobInfo) }

    override suspend fun destroyAndAwaitFinish() {
        cancelCurrentJobAndJoin()
    }
}
