@file:OptIn(ExperimentalTime::class)

package com.speechify.client.internal.util.extensions.collections.flows

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.TimedValue

internal class BufferedFlowMeasurement<T>(
    val currentItemConsumption: TimedValue<T>,
    val nextItemProduction: TimedValue<T>,
    /**
     * If positive: the consumer wait time caused by a lag in the production
     * If negative: the amount of time the production still had left to fulfill the request without introducing a lag
     * on consumption (ignoring the negative sign, obviously).
     *
     * Negative is the expected value here and its measuring of it is desired - it allows to see how much safety
     * buffer there still is and facilitate early warnings that there is not much left.
     *
     * NOTE: Making lag to be the positive is a design choice to naturally translate into latency, once it gets
     * positive.
     */
    val hangBetweenConsumptionsIfPositiveDuration: Duration,
    val bufferItemsAfterCurrentCount: Int,
)

/**
 * This extension function enhances a Flow by adding monitoring capabilities for tracking
 * the time lag between the consumption of current items and the production of next items,
 * along with the number of items remaining in the buffer at the time of each consumption.
 * See [BufferedFlowMeasurement] for the exact data that is measured.
 */
@OptIn(ExperimentalTime::class)
internal fun <T> Flow<T>.bufferWithMeasurement(
    reportMeasurement: (measurement: BufferedFlowMeasurement<T>) -> Unit,
    bufferSize: Int,
    producerExceptionBehavior: ProducerExceptionBehavior,
    bufferObserver: BufferObserver<T>,
): Flow<T> =
    this
        .withProductionTimeMeasurementWithStartAndEnd()
        .bufferWithObserverAndCount(
            capacity = bufferSize,
            producerExceptionBehavior = producerExceptionBehavior,
            /**
             an "adapter" of sorts, that translates the value in the "wrapped"
             [ValueWithProductionTimeMeasurement<T>] downstream from [withProductionTimeMeasurementWithStartAndEnd]
             into a T, while reporting it to the original [bufferObserver] which is a
             [BufferObserver<T>] that is passed in here.
             */
            bufferObserver = object : BufferObserver<ValueWithProductionTimeMeasurement<T>> {
                override suspend fun onItemAddedToBuffer(item: ValueWithProductionTimeMeasurement<T>) {
                    bufferObserver.onItemAddedToBuffer(item.value)
                }
            },
        )
        .withConsumptionMeasurementWithStartAndEnd()
        .map {
            if (it.previousItemWithConsumptionMeasurement != null) {
                reportMeasurement(
                    BufferedFlowMeasurement(
                        currentItemConsumption = it.previousItemWithConsumptionMeasurement.let { previousConsumption ->
                            TimedValue(
                                value = previousConsumption.value.value.value,
                                duration = previousConsumption.durationWithStartAndEndTime.duration,
                            )
                        },
                        nextItemProduction = it.currentItem.value.let { currentItem ->
                            TimedValue(
                                value = currentItem.value,
                                duration = currentItem.productionTimeMeasurement.duration,
                            )
                        },
                        hangBetweenConsumptionsIfPositiveDuration =
                        /* If the current consumption finished before the next item was produced, this is a hang,
                               and we need to make this value positive
                             */
                        it.currentItem.value.productionTimeMeasurement.endTimeMark -
                            it.previousItemWithConsumptionMeasurement.durationWithStartAndEndTime.endTimeMark,
                        bufferItemsAfterCurrentCount = it.currentItem.getItemsCountInAheadBuffer(),
                    ),
                )
            }
            it.currentItem.value.value
        }

internal abstract class ValueWithInfoOfCountOfItemsAheadInBuffer<T>(
    val value: T,
) {
    /**
     * The number of items ahead in the buffer, excluding the current item.
     */
    abstract fun getItemsCountInAheadBuffer(): Int
}
