package com.speechify.client.api.util

import com.speechify.client.api.adapters.firebase.DocumentChange
import com.speechify.client.api.adapters.firebase.DocumentChangeType
import com.speechify.client.api.adapters.firebase.DocumentQueryBuilder
import com.speechify.client.api.adapters.firebase.DocumentQueryBuilder.BoundType
import com.speechify.client.api.adapters.firebase.FirebaseFirestoreDocumentSnapshot
import com.speechify.client.api.adapters.firebase.FirebaseFirestoreQuerySnapshot
import com.speechify.client.api.adapters.firebase.HasSnapshotRef
import com.speechify.client.api.adapters.firebase.HasUri
import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.util.boundary.BoundaryPair
import com.speechify.client.internal.WithScope
import com.speechify.client.internal.launchTask
import com.speechify.client.internal.sync.WrappingMutex
import com.speechify.client.internal.toDestructor
import com.speechify.client.internal.util.extensions.collections.replaceWith
import com.speechify.client.internal.util.extensions.collections.sendToUnlimited
import com.speechify.client.internal.util.extensions.collections.toTypedArraySeedingTypeFrom
import com.speechify.client.internal.util.extensions.throwable.addCustomProperty
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.js.JsExport
import kotlin.math.max

/**
 * One page of a [LiveQueryView]: the items currently held for the page, together with the
 * destructor of the query observation that keeps them up to date.
 */
private data class Page<T : HasSnapshotRef>(
    // Mutable on purpose: the view edits this list in place as change sets arrive.
    val items: MutableList<T>,
    // Invoked to drop the observation backing this page (on page replacement and on view destruction).
    val dtor: Destructor,
)

/** A suspending predicate over values of type [T]. */
internal typealias PredicateAsync<T> = suspend (T) -> Boolean

/**
 * A view over a list of items that can be read, extended on demand, and observed for changes.
 */
@JsExport
interface ILiveQueryView<T> : Destructible where T : HasSnapshotRef, T : HasUri {
    /**
     * Collect the current items from this view.
     *
     * @param callback receives an array with the items currently in the view.
     */
    fun getCurrentItems(callback: (Array<T>) -> Unit)

    /**
     * Extend the view in place.
     *
     * @param callback called when the extension finishes running, providing a reference to this
     * same view, for convenience. The reference can be null, indicating that no new items were
     * found.
     */
    fun loadMoreItems(callback: Callback<ILiveQueryView<T>?>)

    /**
     * Register a listener to be called when the view changes.
     *
     * ### Note:
     * The listener is not called when an extension requested through [loadMoreItems] finishes.
     *
     * @param callback invoked with this view each time a change is reported.
     * @return a callback used to unregister the listener
     */
    fun addChangeListener(callback: Callback<ILiveQueryView<T>>): Destructor
}

/**
 * A self updating view of records in firebase.
 */
@JsExport
class LiveQueryView<T> private constructor(
    // you need a starter array to produce the public array copy. JVM things
    private val baseArray: Array<T>, // always empty
    private val itemsPerPage: Int,
    private val query: DocumentQueryBuilder,
    private val transformer: suspend (FirebaseFirestoreDocumentSnapshot.Exists) -> Result<T>,
    private val outputItemFilter: PredicateAsync<T>?,
    /**
     * Additional items that come from a local source and not firestore.
     * They will be merged with the firestore items and sorted by the local comparator.
     * If multiple items have the same URI the local items will take precedence.
     */
    private val localItemsFlow: Flow<List<T>>?,
    /**
     * If you include local items you need to provide a local comparator to sort them since we can't rely on Firestore
     * to sort them for us.
     * This will be used to sort the final output array.
     */
    private val localComparator: Comparator<T>?,
) : WithScope(), ILiveQueryView<T> where T : HasSnapshotRef, T : HasUri {
    // There are a couple of invariants that must always be upheld for this to work:
    //  1. For every page except the last:
    //      1. No pages are ever empty. This means that `items.size > 0`. If the last item of a page
    //          is removed, the page is replaced with a `null` reference in the page list.
    //      2. When an item is added or removed from a page, its firebase connection range is
    //          updated to reflect the new page size.
    //  2. For the last page:
    //      1. It can't be replaced with `null` when `items.size == 0`.
    //      2. When it shrinks below `lastPageSize` its connection is not updated.
    //      3. When it grows past `lastPageSize`, its connection and `lastPageSize` are updated.
    //      4. When a new page is added to the back, `lastPageSize` is reset to `itemsPerPage`.
    //      5. When the last item of the previous page changes, the limit queried stays
    //         at `lastPageSize`.
    //  3. In general:
    //      1. To iterate over pages the functions `nextPage` and `previousPage` must always be used
    //          (this is to skip over the null pages)
    //      2. For as long as the last page is not full, no new pages will ever be created.
    //
    // This special treatment of the last page is an optimization, whereas every page is
    // updated (connection dropped and a new one created) every time an item is added or removed
    // from it. The last page does not need to do this, it only needs to update when it grows past
    // its max size, which is tracked in the `lastPageSize` variable.
    private val pages: WrappingMutex<MutableList<Page<T>?>> = WrappingMutex.of(mutableListOf())
    private val updateFlow: MutableSharedFlow<LiveQueryView<T>> = MutableSharedFlow()
    private var lastPageSize: Int = itemsPerPage
    private val changeChannel: Channel<Pair<Int, Array<DocumentChange>>> = Channel(
        capacity = Channel.UNLIMITED,
    )

    init {
        // Constructor-argument validation; a violated check throws IllegalArgumentException.
        require(itemsPerPage > 0) { "itemsPerPage must be greater than 0 but was $itemsPerPage" }

        // Local items are not ordered by Firestore, so a comparator has to accompany them.
        require(localItemsFlow == null || localComparator != null) {
            "You must provide a localComparator when using localItemsFlow."
        }
    }

    companion object {
        /**
         * Create a new empty live view, this live view can be populated using loadMoreItems
         *
         * All arguments are forwarded unchanged to the private constructor; the only value supplied
         * here is the empty seed array, which needs the reified [T].
         *
         * @param itemsPerPage page size; the constructor rejects values that are not greater than 0.
         * @param query the query whose results back the view.
         * @param transformer maps an existing document snapshot to an item, or to a failure.
         * @param outputItemFilter optional predicate applied to the items returned by `getCurrentItems`.
         * @param localItemsFlow optional source of local (non-Firestore) items; requires [localComparator].
         * @param localComparator ordering applied to the final output array when provided.
         */
        internal inline fun <reified T> empty(
            itemsPerPage: Int,
            query: DocumentQueryBuilder,
            noinline transformer: suspend (FirebaseFirestoreDocumentSnapshot.Exists) -> Result<T>,
            noinline outputItemFilter: PredicateAsync<T>? = null,
            localItemsFlow: Flow<List<T>>? = null,
            localComparator: Comparator<T>? = null,
        ) where T : HasSnapshotRef, T : HasUri = LiveQueryView(
            baseArray = emptyArray(),
            itemsPerPage = itemsPerPage,
            query = query,
            transformer = transformer,
            outputItemFilter = outputItemFilter,
            localItemsFlow = localItemsFlow,
            localComparator = localComparator,
        )

        // Number of permits of the semaphore that guards `transformer` calls while the first
        // snapshot of a page is being converted into items.
        private const val MAXIMUM_PARALLEL_TRANSFORMS = 50
    }

    /**
     * Collect the current items from this view.
     *
     * The output is built from the first value of [localItemsFlow] (or nothing when there is no
     * local source) followed by the items of every connected page, skipping page items whose
     * `uri.id` equals the `uri.id` of a local item. When [outputItemFilter] is set, only items it
     * accepts are kept; when [localComparator] is set, the final array is sorted with it.
     *
     * @param callback receives a newly created array with the resulting items.
     */
    override fun getCurrentItems(callback: (Array<T>) -> Unit) = callback.fromCoNoError(scope = scope) {
        val localItems = localItemsFlow?.firstOrNull() ?: emptyList()
        // Local items take precedence: collect their ids once so each remote item needs a single
        // set lookup instead of a scan over all local items.
        val localIds = localItems.map { it.uri.id }.toSet()
        pages.locked { pages ->
            val remoteMinusLocalItems = pages
                .asSequence()
                .mapNotNull { it?.items }
                .flatMap { it.asSequence() }
                .filterNot { it.uri.id in localIds }

            // `List + Sequence` builds a new list right away, so the pages are fully read here.
            val allItems = localItems + remoteMinusLocalItems

            val outputItemFilter = outputItemFilter
            val outputItems = if (outputItemFilter != null) {
                allItems
                    // Start every predicate first, then await them in order to keep item order.
                    .map { it to async { outputItemFilter(it) } }
                    .filter { (_, accepted) -> accepted.await() }
                    .map { (item, _) -> item }
            } else {
                allItems
            }

            val result = outputItems.toTypedArraySeedingTypeFrom(baseArray)
            // Normally Firestore already sorts the items for us, but since we are adding local items
            // we need to sort them ourselves. Applied to the filtered and the unfiltered output alike.
            if (localComparator != null) {
                // Performs an in-place sort of the array.
                result.sortWith(localComparator)
            }
            result
        }
    }

    /**
     * Extend the view in place by observing one more page after the current last one.
     *
     * @param callback called when the extension finishes running, providing a reference to this
     * same view, for convenience. The reference is null when the current last page still has room
     * (fewer items than `lastPageSize`), in which case no new page is requested.
     */
    override fun loadMoreItems(callback: Callback<ILiveQueryView<T>?>) {
        scope.launch {
            pages.locked { pageList ->
                val tailPage = pageList.lastOrNull()
                // The last connection has yet to be filled up, so there is nothing more to load. (3.2)
                val tailHasRoom = tailPage != null && tailPage.items.size < lastPageSize
                if (tailHasRoom) {
                    callback(Result.Success(null))
                    return@locked
                }

                val baseDto = query.queryDto()
                // Start right after the last known item, or from the very beginning for the first page.
                val cursor = tailPage?.items?.last()
                val observeParams = if (cursor == null) {
                    baseDto.copy(snapshotBounds = emptyArray())
                } else {
                    baseDto.copy(
                        snapshotBounds = arrayOf(BoundaryPair.from(cursor.snapshotRef to BoundType.StartAfter)),
                    )
                }

                val newPageItems = ArrayList<T>(itemsPerPage)
                val newPageIndex = pageList.size
                val snapshotHandler = createCallbackForHandlingSnapshots(
                    pageIndex = newPageIndex,
                    listForFirstState = newPageItems,
                    extraCallbackForFirstState = callback,
                ).catchingErrors(
                    sourceAreaId = "LiveQueryView.loadMoreItems",
                    /* `false` for now to preserve existing behavior. If this ever logs a
                     * `kotlinx.coroutines.CancellationException`, try to understand if this is a consistency
                     * corruption of user-perceived state. If not, change it to `true`.
                     */
                    shouldIgnoreCancellationExceptions = false,
                    properties = mapOf("pageIndex" to newPageIndex.toString()),
                )
                val newPageDtor = query.observeImpl(observeParams, snapshotHandler)

                lastPageSize = itemsPerPage // (2.4)
                pageList.add(Page(newPageItems, newPageDtor))
            }
        }
    }

    /**
     * Register a listener to be called when the view changes.
     *
     * ### Note:
     * The listener is not called when an extension requested through [loadMoreItems] finishes.
     *
     * @param callback invoked with this view, wrapped as a success, for every notification.
     * @return a callback used to unregister the listener
     */
    override fun addChangeListener(callback: Callback<ILiveQueryView<T>>): Destructor {
        val remoteUpdates = updateFlow.asSharedFlow()
        val notifications = if (localItemsFlow == null) {
            remoteUpdates
        } else {
            // Only the fact that the local items changed matters here; their content is merged with
            // the remote items in `getCurrentItems`.
            // NOTE(review): `combine` emits only after both flows have produced a value, so local
            // changes arriving before the first remote update do not notify - confirm this is intended.
            remoteUpdates.combine(localItemsFlow) { view, _ -> view }
        }

        val listenerJob = notifications
            .onEach { view -> callback(view.successfully()) }
            .launchIn(scope)
        return listenerJob.toDestructor()
    }

    /**
     * Destroy all firebase connections and cancel all listeners.
     *
     * After this the view will be empty.
     */
    override fun destroy() {
        // Disposes of the `scope`, to gracefully cancel both producers and listeners.
        super.destroy()

        // Use `cancel()`, not `close()`, so that producers also get a
        // `kotlinx.coroutines.CancellationException` that gets ignored in reporting to devs, and not a
        // `kotlinx.coroutines.channels.ClosedSendChannelException`.
        changeChannel.cancel()

        // Need to use a new scope, as the `scope` has just been cancelled. Likely this can be refactored
        // out so it is not needed - destructors should be linked to `scope`, ideally by observations
        // happening via `Flows` (`callbackFlow`s over the Adapters).
        launchTask {
            pages.locked { pageList ->
                for (page in pageList) {
                    page?.dtor?.invoke()
                }
                pageList.clear()
            }
        }
    }

    init {
        // Single consumer of `changeChannel`: change sets are applied one at a time, in the order
        // in which they were sent.
        scope.launch {
            changeChannel.consumeEach { (pageIndex, changes) ->
                // Copy of the page content before processing, kept only for telemetry. `getOrNull`
                // is used so an out-of-range index does not throw here, outside the `try`; it then
                // surfaces from `handleChanges` below and is reported with the custom properties.
                val pageBeforeProcessing = pages.locked { pages ->
                    pages.getOrNull(pageIndex)
                }?.items?.toList()
                try {
                    handleChanges(
                        page = pageIndex,
                        changes = changes,
                    )
                } catch (e: Throwable) {
                    // Cancellation (e.g. the scope being cancelled by `destroy`) is not a processing
                    // failure: propagate it untouched instead of reporting it as an error.
                    if (e is kotlinx.coroutines.CancellationException) throw e

                    e.addCustomProperty("changeSet_processed", changes.map { change -> change.stringForTelemetry() })
                    e.addCustomProperty("pageContent", pageBeforeProcessing ?: "no content")
                    e.addCustomProperty("pageIndex_processed", pageIndex)
                    Log.e(
                        DiagnosticEvent(
                            message = "Failed to handle live query view change.",
                            sourceAreaId = "LiveQueryView.changeChannel",
                            nativeError = e,
                        ),
                    )
                    // Rethrown after logging, which ends this consumer coroutine.
                    throw e
                }
            }
        }
    }

    /**
     * Apply one change set to the page at index [page], re-establish the page invariants listed on
     * the `pages` property, and emit this view on `updateFlow`.
     *
     * Does nothing (and emits nothing) when the page slot is `null`.
     *
     * @param page index into the page list of the page the changes belong to.
     * @param changes the document changes to apply, processed in array order.
     */
    private suspend fun handleChanges(
        page: Int,
        changes: Array<DocumentChange>,
    ) {
        // this is a very long critical region, on top of that it calls suspending functions (transformer),
        // which don't have predictable performance characteristics. On the other hand this has worked for
        // a few months now, and currently I don't have capacity to change this to make it shorter but still
        // correct. So I'm leaving it like this for now.
        pages.locked { pages ->
            val conn = pages[page] ?: return@locked
            // Remember which item sat at each `oldIndex` before anything is mutated: the loop below
            // shifts positions, so `oldIndex` values cannot be used directly as list indexes later.
            val itemsAtOldIndexes = changes.mapNotNull { c ->
                conn.items.getOrNull(c.oldIndex)?.let { it to c.oldIndex }
            }
            // Resolves a change's `oldIndex` to the item's current position, by reference identity.
            // Yields -1 when no item was recorded for that index or the item is no longer in the list.
            val translateOldIndex = { change: DocumentChange ->
                val item = itemsAtOldIndexes
                    .find { it.second == change.oldIndex }
                    ?.first
                conn.items.indexOfFirst { it === item }
            }
            // Captured to detect, after the loop, whether the size or the last item changed.
            val originalLastItem = conn.items.lastOrNull()
            val originalLength = conn.items.size
            for (change in changes) {
                when (change.type) {
                    DocumentChangeType.Added -> {
                        // A document that fails to transform is logged and skipped.
                        val newItem = when (val i = transformer(change.doc)) {
                            is Result.Success -> i.value
                            is Result.Failure -> {
                                Log.e(
                                    error = i.error,
                                    sourceAreaId = "LiveQueryView.handleChanges(DocumentChangeType.Added)",
                                )
                                continue
                            }
                        }
                        conn.items.add(
                            // the size of `conn.items` might have changed in this loop, so we should
                            // be careful to avoid out of bounds insertion
                            change.newIndex.coerceAtMost(conn.items.size),
                            newItem,
                        )
                    }
                    DocumentChangeType.Removed -> {
                        val index = translateOldIndex(change)
                        // Item not found in the page: nothing to remove.
                        if (index == -1) continue
                        conn.items.removeAt(index)
                    }
                    DocumentChangeType.Modified -> {
                        val index = translateOldIndex(change)
                        // Item not found in the page: the modification is ignored.
                        if (index == -1) continue
                        // On a transform failure the previous version of the item stays in place.
                        val newItem = when (val i = transformer(change.doc)) {
                            is Result.Success -> i.value
                            is Result.Failure -> {
                                Log.e(
                                    error = i.error,
                                    sourceAreaId = "LiveQueryView.handleChanges(DocumentChangeType.Modified)",
                                )
                                continue
                            }
                        }
                        if (change.oldIndex == change.newIndex) {
                            // Same position: replace in place.
                            conn.items[index] = newItem
                        } else {
                            // Position changed: take the old version out and insert the new one.
                            conn.items.removeAt(index)
                            conn.items.add(
                                // see similar comment above
                                change.newIndex.coerceAtMost(conn.items.size),
                                newItem,
                            )
                        }
                    }
                }
            }

            // uphold invariant 1.1. and 2.1.
            if (conn.items.isEmpty() && page != pages.lastIndex) {
                // empty pages are disconnected
                conn.dtor()
                pages[page] = null
            } else if (
                originalLength != conn.items.size && // length changed (1.2)

                conn.items.isNotEmpty() && // and it didn't go to 0 (1.1)

                // and this is either:
                (
                    page != pages.lastIndex || // not the last page or (2.2)
                        conn.items.size > lastPageSize
                    ) // it is last page and has grown past its current max items (2.3)
            ) {
                pages.updatePage(page)
            }

            // The next non-null page is re-observed whenever this page's last item is a different
            // object than before (including when the page became empty).
            if (originalLastItem !== conn.items.lastOrNull()) {
                val nextPageIdx = pages.nextPage(page) // (3.1)
                if (nextPageIdx != null) {
                    pages.updatePage(nextPageIdx)
                }
            }
            // Emitted while the lock on `pages` is still held.
            updateFlow.emit(this@LiveQueryView)
        }
    }

    /**
     * Replace the observation backing the page at index [page] with a fresh one whose limit matches
     * the page's current size (or `lastPageSize` for the last page) and which starts after the last
     * item of the closest preceding non-null page.
     *
     * The page keeps its item list; only its destructor is swapped.
     *
     * @throws NullPointerException when the slot at [page] is `null`.
     * @throws IllegalStateException when the computed limit is not greater than 0.
     */
    private fun MutableList<Page<T>?>.updatePage(page: Int) {
        val pageList = this
        val current = pageList[page]
        if (current == null) {
            val failure = NullPointerException("Page $page was null.")
            failure.addCustomProperty(
                "pageState",
                pageList.withIndex().joinToString { (i, p) ->
                    if (p != null) "Page $i" else "null $i"
                },
            )
            throw failure
        }

        val isLastPage = page == pageList.lastIndex
        if (isLastPage) {
            // The last page only ever grows its limit. (2.3)
            lastPageSize = max(lastPageSize, current.items.size)
        }
        // (2.5) for the last page, otherwise the limit follows the page's item count.
        val newSize = if (isLastPage) lastPageSize else current.items.size

        if (newSize <= 0) {
            val failure = IllegalStateException("newSize should be greater than 0 but was $newSize.")
            failure.addCustomProperty("pageIndex", page)
            failure.addCustomProperty(
                "pages",
                pageList.map { if (it === null) "nullPage" else "Page(${it.items.size})" },
            )
            failure.addCustomProperty("lastPageSize", lastPageSize)
            failure.addCustomProperty("oldPage", current.items.size)
            failure.addCustomProperty("itemsPerPage", itemsPerPage)
            throw failure
        }

        // Drop the old observation before creating its replacement.
        current.dtor()

        val baseDto = query.queryDto()
        val precedingPage = pageList.previousPage(page)?.let { pageList[it] } // (3.1)
        val observeParams = if (precedingPage == null) {
            baseDto.copy(snapshotBounds = emptyArray(), limit = newSize)
        } else {
            baseDto.copy(
                snapshotBounds = arrayOf(
                    BoundaryPair.from(precedingPage.items.last().snapshotRef to BoundType.StartAfter),
                ),
                limit = newSize,
            )
        }

        val snapshotHandler = createCallbackForHandlingSnapshots(
            pageIndex = page,
            listForFirstState = current.items,
        ).catchingErrors(
            sourceAreaId = "LiveQueryView.updatePage",
            /* `false` for now to preserve existing behavior. If this ever logs a
             * `kotlinx.coroutines.CancellationException`, try to understand if this is a consistency
             * corruption of user-perceived state. If not, change it to `true`.
             */
            shouldIgnoreCancellationExceptions = false,
            properties = mapOf("page" to page),
        )
        val replacementDtor = query.observeImpl(observeParams, snapshotHandler)
        pageList[page] = Page(current.items, replacementDtor)
    }

    /**
     * Index of the closest non-null page before [from], or `null` when every earlier slot is `null`
     * (or there is no earlier slot).
     */
    private fun List<Page<T>?>.previousPage(from: Int): Int? =
        ((from - 1) downTo 0).firstOrNull { index -> this[index] != null }

    /**
     * Index of the closest non-null page after [from], or `null` when every later slot is `null`
     * (or there is no later slot).
     */
    private fun List<Page<T>?>.nextPage(from: Int): Int? =
        ((from + 1) until this.size).firstOrNull { index -> this[index] != null }

    /**
     * Build the snapshot callback for the observation of one page.
     *
     * The first snapshot delivered to the returned callback is treated as the page's full state:
     * every document change in it is transformed (at most [MAXIMUM_PARALLEL_TRANSFORMS] at a time),
     * items that fail to transform are logged and skipped, and the outcome replaces the content of
     * [listForFirstState]. Every later successful snapshot is queued on `changeChannel` as a change
     * set for [pageIndex].
     *
     * @param pageIndex index of the page the observation belongs to.
     * @param listForFirstState list whose content is replaced with the items of the first snapshot.
     * @param extraCallbackForFirstState optional callback invoked once for the first snapshot: with
     * this view after the list was filled, or with the failure when the first snapshot is a failure.
     */
    private fun createCallbackForHandlingSnapshots(
        pageIndex: Int,
        listForFirstState: MutableList<T>,
        extraCallbackForFirstState: Callback<LiveQueryView<T>>? = null,
    ): Callback<FirebaseFirestoreQuerySnapshot> {
        var awaitingFirstSnapshot = true
        return { snapshotResult ->
            if (awaitingFirstSnapshot) {
                awaitingFirstSnapshot = false
                when (snapshotResult) {
                    is Result.Failure -> extraCallbackForFirstState?.invoke(snapshotResult)
                    // Use the root `this@LiveQueryView.scope` - linking it with the caller's context leads
                    // to freezing - user library items never loading on refresh, also possibly same freeze
                    // is in `LiveQueryViewTest`, though the tests start their own `coroutineScope`s, which
                    // may be a separate cause. Probably needs a bigger refactor to make it clear why this
                    // root scope is needed.
                    is Result.Success -> this@LiveQueryView.scope.launch {
                        val transformPermits = Semaphore(MAXIMUM_PARALLEL_TRANSFORMS)
                        // Start one transform per document change; the semaphore caps how many run at once.
                        val pendingItems = snapshotResult.value
                            .docChanges(includeMetadataChanges = false)
                            .map { change ->
                                async {
                                    transformPermits.withPermit {
                                        transformer(change.doc)
                                            .toNullable(
                                                log = { e ->
                                                    Log.e(
                                                        DiagnosticEvent(
                                                            sourceAreaId = "LiveQueryView.setItems",
                                                            message = "Error mapping an item-change-snapshot" +
                                                                " to an item." +
                                                                " The item will be skipped.",
                                                            nativeError = e.toException(),
                                                            properties = mapOf(
                                                                "doc.key" to change.doc.key,
                                                                "doc.type" to change.type.name,
                                                            ),
                                                        ),
                                                    )
                                                },
                                            )
                                    }
                                }
                            }
                        // Failed transforms come back as null and are left out of the page.
                        val firstStateItems = pendingItems.awaitAll().filterNotNull()
                        listForFirstState.replaceWith(firstStateItems)
                        extraCallbackForFirstState?.invoke(this@LiveQueryView.successfully())
                    }
                }
            } else if (snapshotResult is Result.Success) {
                // NOTE(review): failures delivered after the first snapshot are dropped here without
                // any handling - confirm that this is intended.
                try {
                    changeChannel.sendToUnlimited(
                        pageIndex to snapshotResult.value.docChanges(includeMetadataChanges = false),
                    )
                } catch (e: Throwable) {
                    // We are observing crashes in this call, but we don't know what is causing them.
                    e.addCustomProperty(
                        "changeSet_sent",
                        snapshotResult.value.docChanges(includeMetadataChanges = false)
                            .map { it.stringForTelemetry() },
                    )
                    throw e
                }
            }
        }
    }
}
