package com.speechify.client.internal.actor

import com.speechify.client.api.diagnostics.DiagnosticEvent
import com.speechify.client.api.diagnostics.Log
import com.speechify.client.api.util.Destructor
import com.speechify.client.internal.launchTask
import com.speechify.client.internal.time.Duration
import com.speechify.client.internal.util.extensions.collections.ChannelSendInterface
import com.speechify.client.internal.util.extensions.collections.asChannelWithSends
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.launch

/**
 * Shape of an actor's processing routine: a suspending function with a [CoroutineScope] receiver that is
 * handed the channel to receive commands from and the flow to emit events into.
 *
 * Used as the `loop` parameter of `Actor.functionActor`.
 */
typealias LoopFn<Command, Event> = suspend CoroutineScope.(
    commands: ReceiveChannel<Command>,
    events: MutableSharedFlow<Event>,
) -> Unit

/**
 * A command-processing unit: commands are sent to it through the [ChannelSendInterface] it implements, its
 * [loop] consumes them, and the events the loop produces are fanned out to subscribers
 * (see [subscribe], [coSubscribe] and [eventsHotFlow]).
 *
 * Event plumbing:
 *  - the loop emits into [eventsSink];
 *  - while the loop is running, a forwarding coroutine copies every event from [eventsSink] into
 *    `eventsWithNullAsTerminator`;
 *  - subscribers read from `eventsWithNullAsTerminator`, where a `null` item marks the end of the stream
 *    (it is emitted by [stop]).
 *
 * Subclasses must call [start] themselves to launch the loop.
 *
 * See here examples of using an actor https://www.notion.so/Actor-usage-examples-3d5989a0637e43c6938e206509d5491a
 */
internal abstract class Actor<Command, Event> private constructor(
    private val commands: Channel<Command>,
    private val eventsWithNullAsTerminator: MutableSharedFlow<Event?>,
    internal val scope: CoroutineScope,
) : ChannelSendInterface<Command> by commands.asChannelWithSends() {
    /** The job launched by [start]; stays uninitialized until [start] has been called. */
    private lateinit var job: Job

    /** The flow the [loop] emits its events into (it is the `events` argument passed to [loop]). */
    protected val eventsSink = MutableSharedFlow<Event>()

    /**
     * Read-only view of [eventsSink].
     *
     * This is the raw sink: it carries neither replayed events nor the termination marker emitted by [stop].
     * Subscribers should use [eventsHotFlow], [subscribe] or [coSubscribe] instead.
     */
    val events = eventsSink.asSharedFlow()

    private constructor(
        channels: Pair<Channel<Command>, MutableSharedFlow<Event?>>,
        coroutineScope: CoroutineScope,
    ) : this(
        commands = channels.first,
        eventsWithNullAsTerminator = channels.second,
        scope = coroutineScope,
    )

    /**
     * @param bufferAndReplaySize Passed to `makeChannels` when creating the command channel and the subscriber
     * flow (presumably it sizes the channel buffer and the flow replay - confirm in `makeChannels`).
     * @param coroutineScope The scope in which [start] launches the actor's job.
     */
    constructor(
        bufferAndReplaySize: Int = DEFAULT_BUFFER_AND_REPLAY_SIZE,
        coroutineScope: CoroutineScope,
    ) : this(
        channels = makeChannels(
            bufferAndReplaySize = bufferAndReplaySize,
        ),
        coroutineScope = coroutineScope,
    )

    companion object {
        internal const val DEFAULT_BUFFER_AND_REPLAY_SIZE: Int = 100 // chose this arbitrarily

        /**
         * Start an actor composed of only a function
         * @param bufferSize The buffer size of the channels (default 100)
         * @param coroutineScope The scope in which the actor's job is launched
         * @param loop The loop routine to process commands and emit events
         * @return the actor, already started
         */
        internal fun <Command, Event> functionActor(
            bufferSize: Int = DEFAULT_BUFFER_AND_REPLAY_SIZE,
            coroutineScope: CoroutineScope,
            loop: LoopFn<Command, Event>,
        ): Actor<Command, Event> {
            val actor = object : Actor<Command, Event>(bufferSize, coroutineScope) {
                override suspend fun CoroutineScope.loop(
                    commands: ReceiveChannel<Command>,
                    events: MutableSharedFlow<Event>,
                ) =
                    loop(this@loop, commands, events)
            }
            return actor.apply { start() }
        }
    }

    /**
     * Gracefully stop the whole actor, by closing the channels and waiting for the loop function to terminate normally.
     *
     * This only works if the loop function is well-behaved, i.e. if it returns once the commands channel closes.
     *
     * Once the actor's job has ended (by itself, or by being cancelled after the timeout) - or straight away
     * if the actor was never started - the termination marker is emitted so that all subscriptions complete.
     *
     * Once this function is called it can be discarded. The behaviour of calling this function
     * multiple times is left unspecified and should be avoided
     *
     * @param waitTimeForSelfFinish Optional timeout after which the actor will be forcefully stopped.
     */
    open suspend fun stop(waitTimeForSelfFinish: Duration = Duration.seconds(10)) {
        commands.close()
        // `job` only exists once `start()` has run; there is nothing to wait for otherwise.
        if (this::job.isInitialized) {
            val finishedByItself = job.timeoutJoinOrCancel(
                waitTimeForSelfFinish,
                createException = {
                    CancellationException(
                        "Cancelling after waiting ${waitTimeForSelfFinish.inWholeMilliseconds}ms for the actor's" +
                            " job to finish by itself.",
                    )
                },
            )
            if (!finishedByItself) {
                Log.d(
                    DiagnosticEvent(
                        message = "Actor.stop() was waiting for the actor to finish by" +
                            " itself but it didn't, so proceeded to cancel it.",
                        properties = mapOf("waitTime" to "${waitTimeForSelfFinish.inWholeMilliseconds}ms"),
                        sourceAreaId = "Actor.stop()",
                    ),
                )
            }
        }
        // Always terminate the subscribers, no matter how (or whether) the job ended - otherwise subscriptions
        // of an actor that stopped gracefully would never complete.
        eventsWithNullAsTerminator.emit(null)
    }

    /**
     * Spawn a background task that will receive events produced by the actor and call the provided callback with it.
     *
     * @param callback Callback to listen to events
     * @return a function that when called unsubscribes and cancels the spawned task
     *
     * NOTE: If you unsubscribe and subscribe again, the new subscription will get duplicate events
     * (according to the replay specified in constructor's `bufferAndReplaySize`). #ActorsSubscribeGetReplays
     */
    fun subscribe(callback: (event: Event) -> Unit): Destructor = coSubscribe(callback)

    /**
     * Note that the flow is _hot_, so it is not guaranteed to contain events which occurred before the subscription.
     * To get all events from a specific action, subscribe before taking the action.
     *
     * The flow completes when the termination marker (a `null` item, emitted by [stop]) is reached.
     *
     * This member can be used instead of [subscribe] in environment supporting coroutines to use structured concurrency
     * for controlling the subscription lifetime (unsubscribing will happen automatically with the cancellation of the
     * coroutine scope's Job).
     */
    internal val eventsHotFlow: Flow<Event>
        // Must read from the flow that `stop()` emits the terminator into; the raw `eventsSink` never carries it.
        get() = eventsWithNullAsTerminator.transformWhile { event ->
            if (event == null) {
                false
            } else {
                emit(event)
                true
            }
        }

    /**
     * Spawn a background task that will receive events produced by the actor and call the provided callback with it.
     *
     * @param callback Callback to listen to events
     * @return a function that when called unsubscribes and cancels the spawned task
     */
    internal fun coSubscribe(callback: suspend (event: Event) -> Unit): Destructor {
        val subscriberJob = launchTask {
            eventsHotFlow
                .onEach(callback)
                .launchIn(this)
        }
        return {
            subscriberJob.cancel(CancellationException("Subscription was cancelled using a destructor"))
        }
    }

    /**
     * Launch the actor's job in [scope]: a forwarding coroutine that copies events from [eventsSink] to the
     * flow read by subscribers, plus the [loop] itself.
     *
     * The job finishes once the loop (including any coroutines it launched in its scope) has finished.
     * Events emitted to [eventsSink] after that point are no longer forwarded to subscribers.
     */
    protected fun start() {
        job = scope.launch(CoroutineName("Actor.start")) {
            coroutineScope {
                val eventsForwarder = launch(CoroutineName("Actor.start.events.emit")) {
                    eventsSink.onEach { eventsWithNullAsTerminator.emit(it) }
                        .collect()
                }
                launch(CoroutineName("Actor.start.loop")) {
                    loop(commands, eventsSink)
                }.join()
                // Collecting a shared flow never completes on its own, so the forwarder has to be cancelled
                // once the loop is done - otherwise this job could never finish by itself and `stop()` would
                // always have to wait for its timeout and cancel.
                eventsForwarder.cancel()
            }
        }
    }

    // TODO(mendess) somehow make the shared flow that is passed here not allow for null values

    /**
     * The actor's processing routine: receive from [commands] and emit to [events]. It is expected to return
     * once [commands] is closed (see [stop]).
     *
     * Note that failing inside this loop will also fail the [scope], correctly communicating to any parent
     * contexts (through the structured-concurrency) that this actor is no longer working, so that
     * the parents have a chance to handle this (e.g. restart some bigger part of the application).
     */
    protected abstract suspend fun CoroutineScope.loop(
        commands: ReceiveChannel<Command>,
        events: MutableSharedFlow<Event>,
    )
}
