package com.speechify.client.internal.util.extensions.collections.flows

import com.speechify.client.internal.sync.AtomicInt
import com.speechify.client.internal.sync.decrement
import com.speechify.client.internal.sync.increment
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map

/**
 * Buffers this flow via [bufferWithObserver] and wraps every emitted value in a
 * [ValueWithInfoOfCountOfItemsAheadInBuffer], which exposes how many items are currently counted as being
 * ahead in the buffer.
 *
 * The count is kept in a counter created anew for each collection of the returned flow. It is incremented
 * each time the underlying buffer reports an added item (just before the notification is forwarded to
 * [bufferObserver]) and decremented as each item is passed downstream. The value returned by
 * [ValueWithInfoOfCountOfItemsAheadInBuffer.getItemsCountInAheadBuffer] is read from the counter at call time,
 * so it reflects the count at the moment of the query rather than at the moment of emission.
 *
 * @param capacity size of the buffer, passed through to [bufferWithObserver].
 * @param onBufferOverflow overflow strategy, passed through to [bufferWithObserver];
 * defaults to [BufferOverflow.SUSPEND].
 * @param producerExceptionBehavior behavior of the flow should an exception arise,
 * see [ProducerExceptionBehavior] for more information.
 * @param bufferObserver observer that is notified of every item added to the buffer.
 * @return a flow of the buffered values, each paired with access to the count of items ahead in the buffer.
 */
internal fun <T> Flow<T>.bufferWithObserverAndCount(
    capacity: Int,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    producerExceptionBehavior: ProducerExceptionBehavior,
    bufferObserver: BufferObserver<T>,
): Flow<ValueWithInfoOfCountOfItemsAheadInBuffer<T>> = flow {
    val pendingItemsCounter = AtomicInt(0)

    // Wraps the caller's observer so that the counter is bumped before the caller is notified.
    // NOTE(review): items dropped by a non-SUSPEND overflow strategy never reach the `map` below, so the
    // counter may not be decremented for them - confirm against `bufferWithObserver`.
    val countingObserver = object : BufferObserver<T> {
        override suspend fun onItemAddedToBuffer(item: T) {
            pendingItemsCounter.increment()
            bufferObserver.onItemAddedToBuffer(item)
        }
    }

    val bufferedUpstream = this@bufferWithObserverAndCount.bufferWithObserver(
        capacity = capacity,
        onBufferOverflow = onBufferOverflow,
        producerExceptionBehavior = producerExceptionBehavior,
        bufferObserver = countingObserver,
    )

    emitAll(
        bufferedUpstream.map { bufferedValue ->
            // The item is being handed downstream, so it no longer counts as "ahead".
            pendingItemsCounter.decrement()
            object : ValueWithInfoOfCountOfItemsAheadInBuffer<T>(bufferedValue) {
                // Reads the live counter on every call rather than a snapshot taken at emission.
                override fun getItemsCountInAheadBuffer(): Int = pendingItemsCounter.get()
            }
        },
    )
}
