package com.speechify.client.helpers.content.standard.dynamic

import com.speechify.client.api.content.ContentCursor
import com.speechify.client.api.content.ContentElementReference
import com.speechify.client.api.content.StandardViewWithIndex
import com.speechify.client.api.content.isBeginningOfElementOrEquivalent
import com.speechify.client.api.content.view.standard.StandardBlock
import com.speechify.client.api.content.view.standard.StandardBlocks
import com.speechify.client.api.content.view.standard.coGetBlocksAroundCursor
import com.speechify.client.api.content.view.standard.coGetBlocksBetweenCursors
import com.speechify.client.api.content.view.standard.getContentTexts
import com.speechify.client.api.util.AbortableStateAsync
import com.speechify.client.api.util.AsyncDestructible
import com.speechify.client.api.util.Callback
import com.speechify.client.api.util.Destructible
import com.speechify.client.api.util.Result
import com.speechify.client.api.util.fromCo
import com.speechify.client.api.util.successfully
import com.speechify.client.helpers.content.standard.ContentMutationsInfo
import com.speechify.client.helpers.content.standard.HasInfoOfContentLimitedLife
import com.speechify.client.helpers.content.standard.HasInfoOfEffectOfPullingContentOnUser
import com.speechify.client.helpers.content.standard.StandardBlocksFlowProvider
import com.speechify.client.helpers.content.standard.createAsAllChildren
import com.speechify.client.helpers.content.standard.createChildrenBlocksFactory
import com.speechify.client.helpers.content.standard.dropUntilBeforeCursorUnlessNoContentBefore
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.ContentRequestInfo
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.ContentResponseProducer
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.DynamicContentProvider
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.DynamicContentProviderForImmutableAlwaysLiveContent
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.DynamicContentProviderForImmutableLimitedLifeContent
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.DynamicContentProviderForMutableLimitedLifeContent
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.MidContentStartInfo
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.PointerToChunkFraction
import com.speechify.client.helpers.content.standard.dynamic.contentProviders.TryGetAliveContentResponseProducer
import com.speechify.client.helpers.content.standard.streamable.StreamableContentChunk
import com.speechify.client.helpers.content.standard.streamable.StreamableContentChunksBuilder
import com.speechify.client.helpers.content.standard.streamable.items.topLevelItems.TopLevelItem
import com.speechify.client.internal.createTopLevelCoroutineScope
import com.speechify.client.internal.sync.AtomicBool
import com.speechify.client.internal.sync.AtomicRef
import com.speechify.client.internal.sync.AtomicRefOfNullablePrimitive
import com.speechify.client.internal.sync.CoLateInit
import com.speechify.client.internal.sync.compareAndSetOrThrow
import com.speechify.client.internal.sync.swap
import com.speechify.client.internal.sync.valueInitedOrNull
import com.speechify.client.internal.util.collections.arrayOfNotNull
import com.speechify.client.internal.util.collections.flows.ExternalStateChangesFlow
import com.speechify.client.internal.util.collections.flows.GenerateFlowResult
import com.speechify.client.internal.util.collections.flows.SharedFlowThatFinishesDestructible
import com.speechify.client.internal.util.collections.flows.emitAllGeneratedByEachInScope
import com.speechify.client.internal.util.collections.flows.lazyFlowStartingAfterCancellingPrevious
import com.speechify.client.internal.util.extensions.collections.flows.sharePullingOnlyWhenNeededIn
import com.speechify.client.internal.util.extensions.collections.groupConsecutiveBy
import com.speechify.client.internal.util.extensions.coroutines.createChildSupervisorJob
import com.speechify.client.internal.util.extensions.coroutines.launchJobControlledFromScopeOnly
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.job
import kotlinx.coroutines.plus
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.js.JsExport
import kotlin.js.JsName

/**
 * A standard view for dynamic content.
 * It consumes content provided in _chunks_, a concept which can, for example, correspond to pages of a medium like a
 * PDF, or a paged online article, or they can be entirely arbitrary, including a case each line could be
 * made a chunk, which would result in no chunking. Chunking, where possible, should be desired however, because it
 * provides the additional benefit of making the total-reading-time and the scrubber representative of the entire
 * content.
 */
@JsExport
class DynamicStandardView private constructor(
    /**
     * The component encapsulating the content-source-specific logic for obtaining readable content.
     */
    private val dynamicContentProvider: DynamicContentProvider,
    /**
     * The index of the first chunk that will be requested from [dynamicContentProvider].
     * A by-the-way extra feature is that you can provide a `null` to `startingIndex` here to implement 'play from
     * what's currently in scroll' (e.g. user scrolls somewhere, and presses 'play' on the floating player, rather than
     * using a paragraph-player).
     * The result will be that your `DynamicContentProvider.getContent` will be given `null` `coordinates` respectively,
     * and it must return the actual index in `estimatedChunksCountBefore` argument to `MatchingChunkWithEstimate(:, ...)`)
     */
    private val startingChunkIndex: Int?,
    override val contentMutationsInfo: ContentMutationsInfo,
) :
    DynamicContentIndexBase(),
    HasInfoOfEffectOfPullingContentOnUser by dynamicContentProvider,
    HasInfoOfContentLimitedLife by dynamicContentProvider,
    StandardBlocksFlowProvider,
    StandardViewWithIndex,
    Destructible {
    /**
     * See [DynamicContentProviderForMutableLimitedLifeContent] for documentation of the meaning of this type
     * of dynamic content.
     */
    @JsName("forMutableLimitedLifeContent")
    constructor(
        dynamicMutableContentProvider: DynamicContentProviderForMutableLimitedLifeContent,
        startingChunkIndex: Int?,
    ) : this(
        dynamicContentProvider = DynamicContentProvider(dynamicMutableContentProvider),
        startingChunkIndex = startingChunkIndex,
        contentMutationsInfo = ContentMutationsInfo.Mutable(
            mutationsFlow = dynamicMutableContentProvider.mutationsFlow,
        ),
    )

    /**
     * See [DynamicContentProviderForImmutableLimitedLifeContent] for documentation of the meaning of this type
     * of dynamic content.
     */
    @JsName("forImmutableLimitedLifeContent")
    constructor(
        dynamicContentProviderForImmutableAlwaysLiveContent: DynamicContentProviderForImmutableLimitedLifeContent,
        startingChunkIndex: Int?,
    ) : this(
        dynamicContentProvider = DynamicContentProvider(dynamicContentProviderForImmutableAlwaysLiveContent),
        startingChunkIndex = startingChunkIndex,
        contentMutationsInfo = ContentMutationsInfo.Immutable,
    )

    /**
     * See [DynamicContentProviderForImmutableAlwaysLiveContent] for documentation of the meaning of this type
     * of dynamic content.
     */
    @JsName("forImmutableAlwaysLiveContent")
    constructor(
        dynamicContentProviderForImmutableAlwaysLiveContent: DynamicContentProviderForImmutableAlwaysLiveContent,
        startingChunkIndex: Int?,
    ) : this(
        dynamicContentProvider = DynamicContentProvider(dynamicContentProviderForImmutableAlwaysLiveContent),
        startingChunkIndex = startingChunkIndex,
        contentMutationsInfo = ContentMutationsInfo.Immutable,
    )

    /**
     * Use this to implement 'play from here'. The returned cursor should then be used in
     * `bundle.listeningBundle.audioController.play(CursorQueryBuilder.fromCursor(cursorToStartFrom))`
     *
     * DEPRECATED: Use: [com.speechify.client.bundlers.reading.dynamic.DynamicStandardViewReadingBundle.playFromContent]
     * (see the documentation there for documentation of the parameters)
     */
    @Deprecated("Use `DynamicStandardViewReadingBundle.playFromContent`")
    fun addContentAndGetCursorToStartFromIt(
        getChunkPartsForStart: (builder: StreamableContentChunksBuilder) -> MidContentStartInfo,
        chunkIndex: Int,
        estimatedChunksBefore: Int?,
        estimatedChunksAfter: Int?,
        resetIndexToOnlyThisInfo: Boolean = false,
    ): ContentCursor {
        val contentStartInfo = getChunkPartsForStart(streamableContentChunksBuilder)

        val blocks = createBlocksFromAllChunks(
            startingChunkIdx = chunkIndex,
            chunks = arrayOf(
                contentStartInfo.unusedItemsBeforeStartInThisChunk +
                    contentStartInfo.startItemAndFollowingItems,
            ),
        ).single().toList()

        super.putChunks(
            chunks = listOf(blocks),
            startingChunkIdx = chunkIndex,
            estimatedTotalChunksBefore = estimatedChunksBefore,
            estimatedTotalChunksAfter = estimatedChunksAfter,
            resetIndexToOnlyThisInfo = resetIndexToOnlyThisInfo,
        )

        val fromStartBlocks = blocks.subList(
            fromIndex = contentStartInfo.unusedItemsBeforeStartInThisChunk.size,
            toIndex = blocks.size,
        )

        val nextCoordsChunkIndex: Int
        val initialBlocksForFlow: Iterable<StandardBlock>
        if (contentMutationsInfo.isMutable.not()) {
            nextCoordsChunkIndex =
                /* `+ 1` because we are already filling the equal one, so next we will need the following one */
                chunkIndex + 1
            initialBlocksForFlow = blocks
        } else {
            /* This is one place where we achieve #MutableDynamicContentByHavingNoMemory - not having any memory of
             * actual content, thus letting the playback query for it only when about to play.
             */
            nextCoordsChunkIndex =
              /* Flow from exactly the same chunk, because mutable content should be always queried by playback
                 * and there should be no memory in the flow. */
                chunkIndex
            initialBlocksForFlow = listOf()
        }
        startNewContiguousFlow(
            startingChunkIdx = chunkIndex,
            initialBlocks = initialBlocksForFlow,
            nextCoords = PointerToChunkFraction(
                chunkIndex = nextCoordsChunkIndex,
                fractionIntoChunk = 0.0,
            ),
            shouldMarkFirstRequestAsUserSeeking = false,
            /* The first request won't be the user seeking, as the real
             first seek was the user pointing, and making it loaded into `initialBlocks`.
            */
        )

        return fromStartBlocks.first().start
    }

    private val scope =
        createTopLevelCoroutineScope().let {
            it + it.coroutineContext.job.createChildSupervisorJob() /* Changing into a [SupervisorJob], so that this
         view doesn't fail, when any of the children flows fail (this component has the capability to recover from this,
         by just starting a new child again).
         Notably, we don't hold this `CompletableJob` in any variable, because we currently don't have a need to
         complete it normally (`destroy` completes it by cancelling the parent scope, which happens without a need to
         access ).
         */
        }

    /* Hold on to the scope. Create it as early as possible, as it's
   needed for initialization of other fields, while Kotlin doesn't raise compile-time errors if it's used before the
   initialization.*/

    private val thisViewElementReference = ContentElementReference.forRoot()

    private val currentContiguousFlow = AtomicRef(
        BlocksFlowInitialParams(
            parentScope = scope,
            startingChunkIdx = startingChunkIndex,
            nextCoords = startingChunkIndex?.let {
                PointerToChunkFraction(
                    chunkIndex = startingChunkIndex,
                    fractionIntoChunk = 0.0,
                )
            },
            shouldMarkFirstRequestAsUserSeeking = false, /* `false` because the initial chunk is requested from
                 autoplay or the user pressing play, which may have been preceded by the user scrolling away from that
                 first chunk, so let's not astonish them with auto-scrolling. */
        ).let {
            CurrentContiguousFlowInfo(
                startingChunkIdx = it.startingChunkIdx,
                flow = createNewContiguousFlow(
                    initialParams = it,
                ),
            )
        },
    )

    private fun createNewContiguousFlow(
        initialParams: BlocksFlowInitialParams,
    ):
        SharedFlowThatFinishesDestructible<StandardBlock> = flow {
        val startingChunkIdxLateInit = initialParams.startingChunkIdx
        val initialBlocks = initialParams.initialBlocks
        val nextCoords = initialParams.nextCoords
        val shouldMarkFirstRequestAsUserSeeking = initialParams.shouldMarkFirstRequestAsUserSeeking

        emitAll(initialBlocks.asFlow())

        val direction: Direction = forward

        val startingIndexVar = AtomicRefOfNullablePrimitive(nextCoords?.chunkIndex) /* Use `AtomicRef*` and not
         `AtomicInt` because `null` value represents uninitialized (and we don't want to make code hard to understand
         with a magic number symbolizing the uninitialized state)
        */

        /**
         * To make developer aware if anything happens out of order.
         */
        fun startingIndexVarSafeCompareAndSet(expected: Int?, set: Int?) {
            startingIndexVar.compareAndSetOrThrow(expected, with = set) {
                """The chunk index in `startingIndexVar` has changed since (expected $expected but now finding
                    | ${startingIndexVar.value}) - could be a bug (concurrency where it shouldn't happen - check if
                    | there is concurrency protection everywhere or if there was an earlier exception), but if this is
                    | correct then one of the requests should have been cancelled. This should never happen. Check
                    | concurrency protection or earlier exceptions."""
                    .trimMargin()
            }
        }

        val hasFirstChunkBeenRequested = AtomicBool(false)
        val shouldSetStartingChunkIndex = AtomicBool(!startingChunkIdxLateInit.hasValue)

        val abortableState = AbortableStateAsync<Throwable?>()
        emitAllGeneratedByEachInScope { /* The `EachInScope` is especially needed to connect with the
            response-handling code, which gets triggered from outside Kotlin, to send it the cancellation, and the
            cooperative `cancelAndJoin()` to the parent (to never run two concurrent jobs here).
         */
            val startingIndexForRequest = startingIndexVar.value
            val isFirstChunkRequest = !hasFirstChunkBeenRequested.swap(newValue = true)
            val coords = if (isFirstChunkRequest) {
                nextCoords
            } else {
                startingIndexForRequest?.let {
                    PointerToChunkFraction(
                        chunkIndex = it,
                        fractionIntoChunk = 0.0,
                    )
                }
            }

            if (coords != null &&
                (
                    coords.chunkIndex != this@DynamicStandardView.startingChunkIndex &&
                        !isChunkIdxPossible(coords.chunkIndex)
                    )
            ) {
                return@emitAllGeneratedByEachInScope GenerateFlowResult.NoMoreItems.asBaseClass()
            }

            return@emitAllGeneratedByEachInScope suspendCancellableCoroutine {
                it.invokeOnCancellation { cause ->
                    abortableState.producer.abort(cause)
                }

                dynamicContentProvider.getContent(
                    requestInfo = ContentRequestInfo(
                        coordinates = coords,
                        isUserSeeking = isFirstChunkRequest && shouldMarkFirstRequestAsUserSeeking,
                    ),
                    responseProducer = object : ContentResponseProducer {
                        /* TEMPORARY PRIVATIZATION UNTIL - #TODOSupportMoreThanOneChunk
                        override
                        */
                        private fun MatchingChunks(
                            chunks: Array<StreamableContentChunk>,
                            /**
                             * See description of the same parameter in [MatchingChunk].
                             */
                            isLastChunks: Boolean?,
                        ) = MatchingChunksWithEstimate(
                            estimatedChunksCountBefore = null,
                            estimatedChunksCountAfter = isLastChunks?.let { if (it) 0 else 1 },
                            chunks = chunks,
                        )

                        /* TEMPORARY PRIVATIZATION UNTIL - #TODOSupportMoreThanOneChunk
                        override
                        */
                        private fun MatchingChunksWithEstimate(
                            estimatedChunksCountBefore: Int?,
                            estimatedChunksCountAfter: Int?,
                            chunks: Array<StreamableContentChunk>,
                        ) = launchJobControlledFromScopeOnly {
                            val startingChunkIdx =
                                startingIndexForRequest
                                    ?: estimatedChunksCountBefore.also {
                                        startingIndexVarSafeCompareAndSet(
                                            expected = null,
                                            set = estimatedChunksCountBefore,
                                        )
                                    }
                                    ?: throw Error(
                                        """When `getContent` is called with `startIndex==null`, it means it should
                                        | provide the index in `estimatedChunksCountBefore`
                                        """.trimMargin(),
                                    )

                            putChunksWithBestKnowledge(
                                startingChunkIdx = startingChunkIdx,
                                chunks = chunks.map { it.items }.toTypedArray(),
                                estimatedChunksCountBefore = estimatedChunksCountBefore,
                                estimatedChunksCountAfter = estimatedChunksCountAfter,
                            )
                        }

                        /* TEMPORARY PRIVATIZATION UNTIL - #TODOSupportMoreThanOneChunk
                        override
                        */
                        private fun EarlyLastChunks(
                            actualStartIndex: Int,
                            chunks: Array<StreamableContentChunk>,
                        ) = launchJobControlledFromScopeOnly {
                            val chunksAsItemsArrays = chunks.map { it.items }.toTypedArray()
                            if (chunksAsItemsArrays.isEmpty()) {
                                TODO(
                                    "The `EarlyLastChunks` need to provide at least the last chunk," +
                                        "because just modifying the last one in SDK is not implemented.",
                                    /* Here we also ensure that chunk response always has content, which is relied
                                       on in shrinking estimates - see #ChunksResponseAlwaysHasContent.
                                    */
                                )
                            }

                            startingIndexVarSafeCompareAndSet(
                                expected = startingIndexForRequest,
                                set = actualStartIndex,
                            )

                            putChunksWithBestKnowledge(
                                startingChunkIdx = actualStartIndex,
                                chunks = chunksAsItemsArrays,
                                estimatedChunksCountBefore = if (direction === forward) null else 0,
                                estimatedChunksCountAfter = if (direction === forward) 0 else null,
                            )
                        }

                        override fun MatchingChunk(
                            chunk: StreamableContentChunk,
                            isLastChunk: Boolean?,
                        ) =
                            MatchingChunks(
                                chunks = arrayOf(chunk),
                                isLastChunks = isLastChunk,
                            )

                        override fun MatchingChunkWithEstimate(
                            estimatedChunksCountBefore: Int?,
                            estimatedChunksCountAfter: Int?,
                            chunk: StreamableContentChunk,
                        ) =
                            MatchingChunksWithEstimate(
                                estimatedChunksCountBefore = estimatedChunksCountBefore,
                                estimatedChunksCountAfter = estimatedChunksCountAfter,
                                chunks = arrayOf(chunk),
                            )

                        override fun EarlyLastChunk(
                            actualLastChunkIdx: Int,
                            chunk: StreamableContentChunk,
                        ) =
                            EarlyLastChunks(
                                actualStartIndex = actualLastChunkIdx,
                                chunks = arrayOfNotNull(chunk),
                            )

                        private fun putChunksWithBestKnowledge(
                            startingChunkIdx: Int,
                            chunks: Array<Array<TopLevelItem>>,
                            estimatedChunksCountBefore: Int?,
                            estimatedChunksCountAfter: Int?,
                        ) {
                            val chunksWithBlocks = createBlocksFromAllChunks(
                                startingChunkIdx = startingChunkIdx,
                                chunks = chunks,
                            )

                            super@DynamicStandardView.putChunks(
                                chunks = chunksWithBlocks,
                                startingChunkIdx = startingChunkIdx,
                                estimatedTotalChunksBefore = estimatedChunksCountBefore,
                                estimatedTotalChunksAfter = estimatedChunksCountAfter,
                                resetIndexToOnlyThisInfo = false,
                            )

                            if (shouldSetStartingChunkIndex.swap(newValue = false)) {
                                startingChunkIdxLateInit.value = startingChunkIdx
                            }

                            it.resume(GenerateFlowResult.MoreItems(chunksWithBlocks.flatten().iterator()))

                            startingIndexVarSafeCompareAndSet(
                                expected = startingChunkIdx,
                                set = direction(
                                    /* current = */ startingChunkIdx,
                                    /*offset = */ chunksWithBlocks.size,
                                ),
                            )
                        }
                    },
                    builder = streamableContentChunksBuilder,
                    abortThisRequestReceiver = abortableState.receiver,
                )
            }
        }
    }
        .let(
            block = { flow ->
                if (contentMutationsInfo.isMutable.not()) {
                    flow
                        .sharePullingOnlyWhenNeededIn(scope)
                } else {
                    /* This is one place where we achieve #MutableDynamicContentByHavingNoMemory - not having any memory of
                     * actual content, thus letting the playback query for it only when about to play.
                     * (this is #MutableDynamicContentByHavingNoMemory_ByNotUsingSharedFlow
                     */
                    @JsExport.Ignore
                    object : SharedFlowThatFinishesDestructible<StandardBlock>, Flow<StandardBlock> by flow {
                        override suspend fun destroyAndAwaitFinish() {
                        }

                        override val replayCache: List<StandardBlock>
                            get() = listOf()
                    }
                }
            },
        )

    private fun startNewContiguousFlow(
        startingChunkIdx: Int?,
        initialBlocks: Iterable<StandardBlock> = listOf(),
        nextCoords: PointerToChunkFraction?,
        shouldMarkFirstRequestAsUserSeeking: Boolean,
    ): Flow<StandardBlock> {
        val previousFlow = CoLateInit<SharedFlowThatFinishesDestructible<StandardBlock>>(parentScope = scope)

        val newFlowInfo = BlocksFlowInitialParams(
            parentScope = scope,
            startingChunkIdx = startingChunkIdx,
            initialBlocks = initialBlocks,
            nextCoords = nextCoords,
            shouldMarkFirstRequestAsUserSeeking = shouldMarkFirstRequestAsUserSeeking,
        ).let {
            CurrentContiguousFlowInfo(
                startingChunkIdx = it.startingChunkIdx,
                flow = lazyFlowStartingAfterCancellingPrevious(
                    previousFlowSlot = previousFlow,
                    getFlow = {
                        createNewContiguousFlow(
                            initialParams = it,
                        )
                    },
                ),
            )
        }

        previousFlow.value = currentContiguousFlow.swap(
            with = newFlowInfo,
        ).flow

        return newFlowInfo.flow
    }

    private fun createBlocksFromAllChunks(startingChunkIdx: Int, chunks: Array<Array<TopLevelItem>>):
        List<Iterable<StandardBlock>> =
        sequence {
            var currentChunkIdx = startingChunkIdx
            for (chunk in chunks) {
                val currChunkIdx = currentChunkIdx++
                val chunkAsList = chunk.toList()
                yield(
                    if (chunkAsList.isEmpty()) {
                        /* Return some dummy content if the chunk turns out empty to implement a #SolvingRequirementOfNonEmptyChunks
                         * The situation does happen, e.g. a Google Docs document can have a blank page, and it won't
                         * return any content on it. The infrastructure down the line does require some cursors (notably
                         * near #SolvingRequirementOfNonEmptyChunks), so we create some, not to leave a footgun to
                         * SDK-consumers that waits for that empty content.
                         */
                        listOf(
                            thisViewElementReference.getChild(currChunkIdx)
                                .createChildrenBlocksFactory()
                                .createNextChildBlock(
                                    build = {
                                        this.Paragraph(
                                            text =
                                            /** Using the "\n" because, interestingly, a `""` doesn't work, and causes
                                             * a premature end-of-document (perhaps due to the cursor-based
                                             * logic in [com.speechify.client.api.content.view.standard.getAllBlocksAsFlowOfChunks]),
                                             * maybe leading to returning the same chunk in subsequent calls.
                                             */
                                            "\n",
                                        )
                                    },
                                ),
                        )
                    } else {
                        createBlocksFromChunkIdx(currChunkIdx, chunkAsList)
                    },
                )
            }
        }
            .toList()

    private fun createBlocksFromChunkIdx(chunkIdx: Int, chunk: Iterable<TopLevelItem>):
        Iterable<StandardBlock> =
        thisViewElementReference.getChild(chunkIdx).createAsAllChildren(chunk)

    /* Safe to `Ignore` - the member is actually on `internal` interface, so just for SDK's internal use */
    @JsExport.Ignore
    override suspend fun getFlowStartingBeforeCursorUnlessNoContentBefore(
        cursor: ContentCursor,
    ): Flow<StandardBlock> {
        val requestedChunkIdxOrNullIfStart = cursor.getChunkIdxOrNull()

        /* This is one of the places where we achieve #MutableDynamicContentByHavingNoMemory - not having any memory of
         * the flow from previous call to `getFlowStartingBeforeCursorUnlessNoContentBefore`.
         * Remembering the flow's `startingChunkIdx` is useful when the flow has memory (because taking any first chunks
         * already fetched will not request them from SDK-consumers), but since it doesn't have any memory as per
         * #MutableDynamicContentByHavingNoMemory_ByNotUsingSharedFlow, then every time the chunk is different from the
         * current, we need to bring it to be current, so as not to cause a request to SDK-consumers for the `startingChunkIdx` chunks,
         * which may even be out-of-view already.
         */
        if (
            contentMutationsInfo.isMutable &&
            requestedChunkIdxOrNullIfStart != currentContiguousFlow.value.startingChunkIdx.value
        ) {
            val chunkToRequest = requestedChunkIdxOrNullIfStart ?: startingChunkIndex
            startNewContiguousFlow(
                startingChunkIdx = chunkToRequest,
                nextCoords = chunkToRequest?.let {
                    PointerToChunkFraction(
                        chunkIndex = it,
                        fractionIntoChunk = 0.0,
                    )
                },
                shouldMarkFirstRequestAsUserSeeking = false,
            )
        }

        val flowWithEnsuredNavigabilityBackwards = run {
            val currentContiguousFlow = currentContiguousFlow.value
            val currentFlowStartingChunkIdx = currentContiguousFlow.startingChunkIdx.valueInitedOrNull()
            if (requestedChunkIdxOrNullIfStart != null && currentFlowStartingChunkIdx != null) {
                val requestedChunkIdx = requestedChunkIdxOrNullIfStart
                /* If this is request for the very first position in the page that the current flow started from, it
                   will never contain any previous content, preventing being able to scan to previous sentence (e.g.
                   using the 'prev' button), so we need to now request the previous chunk (scanning backwards relies on
                   `getBlocksAroundCursor` returning at least some content *before* the cursor).
                   (
                 */

                val previousChunkIdx = currentFlowStartingChunkIdx - 1
                if (
                    currentFlowStartingChunkIdx == requestedChunkIdx &&
                    cursor.isBeginningOfElementOrEquivalent(start.getParentElement().getChild(requestedChunkIdx)) &&
                    isChunkIdxPossible(previousChunkIdx)
                ) {
                    return@run startNewContiguousFlow(
                        startingChunkIdx = previousChunkIdx,
                        nextCoords = PointerToChunkFraction(
                            chunkIndex = previousChunkIdx,
                            fractionIntoChunk = 0.9,
                        ),
                        shouldMarkFirstRequestAsUserSeeking = true, /* This is a case of user seeking using 'rewind'
                         - they should not be astonished if we help them to scroll the chunk into view. */
                    )
                }
            }
            return@run currentContiguousFlow.flow
        }

        return flowWithEnsuredNavigabilityBackwards.dropUntilBeforeCursorUnlessNoContentBefore(cursor)
            .map { originalBlock ->
                val referenceObject = originalBlock.getContentTexts().map {
                    it.start.getParentElement().ref
                }.firstOrNull { it.value !== null }
                /** See if the `referenceObject` is dead and should be refreshed: */
                if (
                    /* No reference object (headless mode or no highlighting?), so nothing to refresh: */
                    referenceObject == null ||
                    /** No refresh if content is mutable. As per #MutableContentResurrectsViaGetContent, if content is
                     *  mutable, it gets content re-requested all the time through the [DynamicContentProviderForMutableLimitedLifeContent.getContent], so it
                     * has no need for refreshes via [com.speechify.client.helpers.content.standard.dynamic.contentProviders.DynamicContentProviderFullInterface].
                     * #DontCallTryGetResurrectedContentWhenMutable
                     */
                    contentMutationsInfo.isMutable ||
                    /**
                     * Then, if the reference object is alive, no need to refresh, obviously:
                     */
                    dynamicContentProvider.isAlive(referenceObject)
                ) {
                    originalBlock
                } else {
                    /* TODO - This implementation is chatty and CPU intense, as `tryGetResurrectedContent` happens on
                           every block, and the whole page is re-resurrected multiple times.
                           Consider:
                         * persistently saving the new content from `tryGetResurrectedContent`, so move this earlier in
                           the flow.
                         * when the big refactoring of switching play to a single-flow consumption is done, the
                           interaction with `DynamicContentProvider` could also be refactored to take advantage of the
                           knowledge which text is currently being played, so that there is no storage of blocks, and
                           the resurrection happens almost instantaneously (on a change of highlighted word, rather than
                           the entire block)
                     */
                    val chunkIdx = originalBlock.start.getChunkIdxOrNull()!!
                    val resurrectedBlockOrNull = suspendCancellableCoroutine { continuation ->
                        val abortableState = AbortableStateAsync<Throwable?>()
                        continuation.invokeOnCancellation { abortableState.producer.abort(it) }
                        dynamicContentProvider.tryGetResurrectedContent(
                            chunkIndex = chunkIdx,
                            responseProducer = object : TryGetAliveContentResponseProducer {
                                override fun FoundAliveContent(
                                    chunk: StreamableContentChunk,
                                ) {
                                    continuation.resume(chunk)
                                }

                                override fun FoundNoAliveContent() {
                                    continuation.resume(null)
                                }
                            },
                            builder = streamableContentChunksBuilder,
                            abortThisRequestReceiver = abortableState.receiver,
                        )
                    }?.let {
                        createBlocksFromChunkIdx(
                            chunkIdx,
                            it.items.asIterable(),
                        ).firstOrNull { resurrectedBlock -> resurrectedBlock.start.isEqual(originalBlock.start) }
                    }

                    resurrectedBlockOrNull ?: originalBlock
                }
            }
    }

    override suspend fun getCursorForEstimatedChunk(pointerToEstimatedChunk: PointerToChunkFraction):
        Result<ContentCursor> {
        /* TODO: as per #TODODoubleRequestForContentOngetCursorFromProgress, this is going to be disposed after exiting
            from `getCursorFromProgress`, which is also somewhat wasteful. Consider optimizing.
                    */

        val newFlow = startNewContiguousFlow(
            startingChunkIdx = null, // We don't know the actual chunks' index, as the estimate may be an overshot.
            nextCoords = PointerToChunkFraction(
                chunkIndex = pointerToEstimatedChunk.chunkIndex,
                fractionIntoChunk = pointerToEstimatedChunk.fractionIntoChunk,
            ),
            shouldMarkFirstRequestAsUserSeeking = true, /* we are in a function that indeed gets called as a result of
             user seeking. */
        )

        val (_, chunkBlocks) = newFlow
            .groupConsecutiveBy { it.start.getChunkIdxOrNull()!! }.firstOrNull {
                it.second.isNotEmpty()
            } /* Because the actual index returned can be different from `pointerToEstimatedChunk.chunkIdx`, we discover
             actual index from the actual chunks returned. */
            ?: TODO(
                "Found no content in any of the chunks returned. If the last chunk has no content, " +
                    "then the last one that does should be reported. And `EarlyLastChunks` needs to provide" +
                    " at least the last chunk, because searching for the last one in SDK is not implemented.",
                /* need to go 'end of the previous chunk'. BTW, shrinking the estimate from the beginning is
                 already supported because we seek the fist one in the newFlow) */
            ) // This shouldn't throw here, but rather near the first #ChunksResponseAlwaysHasContent

        return (
            chunkBlocks.allContentTexts.getContentTextContainingWordNumber(
                (pointerToEstimatedChunk.fractionIntoChunk * chunkBlocks.allContentTexts.wordCount)
                    .toInt(),
            ) as ResultOfGetContentTextContainingWordNumber.FoundContentText
            ).contentText.start.successfully()
    }

    override val contentAmountStateFlow: ExternalStateChangesFlow
        get() = super.contentAmountStateFlow

    override suspend fun coGetCursorFromProgress(progress: Double): Result<ContentCursor> {
        val cursorResult = super.getCursorWithCoordsFromProgress(progress).orReturn { return it }
        val chunkIdx = cursorResult.contentCursor.getChunkIdxOrNull()!!

        /* TODO - #TODODoubleRequestForContentOngetCursorFromProgress . If the chunk has already been loaded before,
             then the above `getCursorWithCoordsFromProgress` will return without starting a new flow, but if this is
             the first time for this chunk to be requested, then the above statement will run
             `getCursorForEstimatedChunk`, which will already create a new flow, making the below redundant.
              TODO: consider doing something to prevent the  double-request. Consider remembering the starting
                position of the flow (on the other hand, the SDK consumers may be getting new requests all the time
                if we fix highlighting by always fetching from them)
             */
        startNewContiguousFlow(
            startingChunkIdx = chunkIdx,
            nextCoords = PointerToChunkFraction(
                chunkIndex = chunkIdx,
                fractionIntoChunk = cursorResult.coords.fractionIntoChunk,
            ),
            shouldMarkFirstRequestAsUserSeeking = true, /* we are in a function that indeed gets called as a result of
             user seeking. */
        )

        return cursorResult.contentCursor.successfully()
    }

    override fun destroy() =
        scope.cancel()

    /* We're delegating StandardView manually, because it's not easy to do `by standardViewDelegate` when parts of our
        implementation require `this`. Also, we would have to use the main constructor, making it private, which would
         result in the `new` syntax being unavailable in JavaScript.
       */
    private val standardViewFromThisFlowDelegate = DynamicStandardViewFromFlowOfOnDemandContent(
        referenceOfParentBlock = thisViewElementReference,
        standardBlocksFlowProvider = this,
    )
    override fun getBlocksAroundCursor(cursor: ContentCursor, callback: Callback<StandardBlocks>) = callback.fromCo {
        standardViewFromThisFlowDelegate.coGetBlocksAroundCursor(cursor)
    }

    override fun getBlocksBetweenCursors(
        start: ContentCursor,
        end: ContentCursor,
        callback: Callback<StandardBlocks>,
    ) = callback.fromCo {
        standardViewFromThisFlowDelegate.coGetBlocksBetweenCursors(start, end)
    }

    override val start: ContentCursor =
        standardViewFromThisFlowDelegate.start
    override val end: ContentCursor =
        standardViewFromThisFlowDelegate.end
}

typealias Direction = (current: Int, offset: Int) -> Int

val forward: Direction = { current: Int, offset: Int -> current + offset }

val backward: Direction = { current: Int, offset: Int -> current - offset }

private class CurrentContiguousFlowInfo(
    val startingChunkIdx: CoLateInit<Int>,
    val flow: SharedFlowThatFinishesDestructible<StandardBlock>,
) : AsyncDestructible {
    override suspend fun destroyAndAwaitFinish() {
        startingChunkIdx.cancel()
        flow.destroyAndAwaitFinish()
    }
}

private fun ContentCursor.getChunkIdxOrNull() = getParentElement().path.firstOrNull()

private val streamableContentChunksBuilder = StreamableContentChunksBuilder()

private class BlocksFlowInitialParams(
    parentScope: CoroutineScope,
    startingChunkIdx: Int?,
    val initialBlocks: Iterable<StandardBlock> = listOf(),
    val nextCoords: PointerToChunkFraction?,
    val shouldMarkFirstRequestAsUserSeeking: Boolean,
) {
    val startingChunkIdx: CoLateInit<Int> = if (startingChunkIdx == null) {
        CoLateInit(parentScope = parentScope)
    } else {
        CoLateInit(initializedValue = startingChunkIdx)
    }
}
