@file:Suppress("OPT_IN_USAGE")

package net.sergeych.parsec3

import io.ktor.utils.io.errors.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.Json
import net.sergeych.boss_serialization.BossDecoder
import net.sergeych.boss_serialization_mp.BossEncoder
import net.sergeych.boss_serialization_mp.decodeBoss
import net.sergeych.mp_logger.LogTag
import net.sergeych.mp_logger.exception
import net.sergeych.mp_logger.warning
import net.sergeych.mptools.toDump

/**
 * Create adapter, an interface to provide local API commands and invoke remote API commands
 * asynchronously and concurrently. To implement adapter over some protocol you need:
 *
 * - create some class T that will hold the "state" of the API, e.g. session. Use `Unit` for stateless
 * - create a CommandHost<T> class and fill it with commands to be executed by a local party (could be empty).
 * - provide a method that will transmit binary frames to a remote
 * - for any incoming binary frame call [Adapter.receiveFrame].
 *
 * Here is the sample of short-circuit pair of adapters:
 *
 * ```
 *   val ch12 = Channel<ByteArray>()
 *   val ch21 = Channel<ByteArray>()
 *
 *   val api1 = Api1()
 *   val api2 = Api2()
 *
 *   // this interface is provided by api1 locally
 *   api1.on(api1.foo) {
 *       it + "foo"
 *   }
 *   // and that by api2 locally
 *   api2.on(api2.bar) {
 *       it + "bar"
 *   }
 *
 *   // respective adapters that send to a channel:
 *   val a1 = Adapter(Unit,api1) { ch12.send(it) }
 *   val a2 = Adapter(Unit,api2) { ch21.send(it) }
 *
 *   // pumps to load frames from the respective channel and pass them to the adapter:
 *   launch { for( b in ch12) a2.receiveFrame(b) }
 *   launch { for( b in ch21) a1.receiveFrame(b) }
 *
 *   // note that adapter `a1` is expected to provide `Api2` and vice versa:
 *   assertEquals("123bar", a1.invokeCommand(api2.bar, "123"))
 *   assertEquals("321foo", a2.invokeCommand(api1.foo, "321"))
 *
 *   ch12.cancel() // to close channel pump
 *   ch21.cancel()
 * ```
 *
 *  See [CommandHost] class documentation to learn how to declare API interfaces in a compile time type safe
 *  manner.
 *
 * NOTE(review): the type parameter is bounded by [WithAdapter], so the `Unit` instance used in the text and
 * in the sample above may be outdated - confirm against [WithAdapter].
 *
 * @param instance any instance that represent the state of the interface; its type must satisfy [WithAdapter].
 * @param commandHost the Api __this adapter provides to a remote__. It differs from the interface expected on the
 *                    remote side.
 * @param exceptionRegistry allows to transform serialized parsec exception (see [ParsecException]) to
 *                          application-specific exception classes. Default implementation only decodes
 *                          parsec3 built-in exceptions.
 * @param sendEncoded a method that performs actual sending of the packed binary frame to the remote side
 */
open class Adapter<T: WithAdapter>(
    private val instance: T,
    private val commandHost: CommandHost<T>,
    private val exceptionRegistry: ExceptionsRegistry = ExceptionsRegistry(),
    private val sendEncoded: suspend (data: ByteArray) -> Unit,
) : LogTag("ADPTR") {

    /**
     * Scope in which handlers of incoming commands are launched. It is backed by a [SupervisorJob] so that
     * a failure of one handler coroutine (e.g. the transport throwing while the response is being sent)
     * does not cancel the scope and thereby silently disable processing of all further incoming commands.
     */
    val scope = CoroutineScope(SupervisorJob())

    /**
     * If you plan to cancel the adapter from outside its context (e.g. from API command or like)
     * provide specific close code that frees resource for this adapter (like closing websocket connection).
     *
     * Typically this method should be set by the transport when connection is established. It is, still, an option,
     * as not any adapter/transport combination requires it. Default websock transport implements it
     * out of the box.
     *
     * The default implementation throws [NotImplementedError].
     */
    var onCancel: suspend ()->Unit = { throw NotImplementedError("this adapted has no onCancel implementation, provide it")}

    // Pending remote calls by command id; accessed under [access] (see [close] for the only exception).
    private val completions = mutableMapOf<Int, CompletableDeferred<ByteArray>>()

    // Id to be assigned to the next outgoing command, guarded by [access].
    private var lastId = 1

    // Protects [completions] and [lastId].
    private val access = Mutex()

    /**
     * Call the remote party for a type command. See [CommandHost] on how to declare and implement
     * such commands in parsec3. Suspends until receiving answer from a remote party.
     *
     * If sending fails, the remote party reports an error, the adapter is closed or the caller is cancelled,
     * the pending call registration is removed before the exception is rethrown, so failed calls
     * do not accumulate in the adapter.
     *
     * @param ca command descriptor, provided by [CommandHost.command], usually, it should be a val in the
     *            [CommandHost] or derived instance.
     * @param args command specific args of any serializable type
     * @return value from remote party, any serializable type.
     */
    @Suppress("UNCHECKED_CAST")
    suspend fun <A, R> invokeCommand(ca: CommandDescriptor<A, R>, args: A = Unit as A): R {
        // Encode before registering anything: an encoding failure must not leave an orphan entry,
        // and the critical section stays as short as possible.
        val packedArgs = BossEncoder.encode(ca.ass, args)
        val reply = CompletableDeferred<ByteArray>()
        val (id, request) = access.withLock {
            val id = lastId++
            completions[id] = reply
            id to Package.Command(id, ca.name, packedArgs)
        }
        val encodedResult = try {
            sendPackage(request)
            reply.await()
        } catch (t: Throwable) {
            // On the success path the response handler has already removed the entry. Here the call
            // has failed or was cancelled, so drop it ourselves. NonCancellable lets the cleanup
            // take the lock even when this coroutine is already cancelled.
            withContext(NonCancellable) {
                access.withLock { completions.remove(id) }
            }
            throw t
        }
        return BossDecoder.decodeFrom(ca.rss, encodedResult)
    }

    /**
     * Cancels the scope that is used to call incoming commands. Cancelling the scope effectively cancels any
     * unfinished commands. It _will not wait for its completion_. After that [onCancel] is invoked; note
     * that its default implementation throws [NotImplementedError].
     *
     * Not calling it might cause unknown number of pending command processing coroutines to remain active.
     */
    suspend fun cancel() {
        scope.cancel()
        onCancel.invoke()
    }

    /**
     * merge exceptions registry with current (existing entries will be overwritten)
     */
    @Suppress("unused")
    fun registerErrors(otherRegistry: ExceptionsRegistry) {
        exceptionRegistry.putAll(otherRegistry)
    }

    /**
     * Dispatch a decoded package: an incoming command is executed in a new coroutine of [scope] and
     * answered with a response package, an incoming response completes the matching pending call.
     */
    private suspend fun processIncomingPackage(pe: Package) {
        when (pe) {
            is Package.Command -> {
                // Handlers run concurrently so a slow command does not block processing of next frames.
                scope.launch {
                    try {
                        val handler = commandHost.handler(pe.name)
                        val result = handler.invoke(instance, pe.args)
                        sendPackage(Package.Response(pe.id, result))
                    } catch (_: CancellationException) {
                        // the handler coroutine was cancelled: no response is sent
                    } catch (ae: ParsecException) {
                        sendPackage(Package.Response(pe.id, null, ae.code, ae.text))
                    } catch (ex: Throwable) {
                        val code = exceptionRegistry.classCodes[ex::class]
                        if (code != null) {
                            sendPackage(Package.Response(pe.id, null, code, ex.toString()))
                        } else {
                            // the class is not registered: report it locally and send a generic error code
                            ex.printStackTrace()
                            sendPackage(Package.Response(pe.id, null, "UNKNOWN_ERROR", ex.toString()))
                        }
                    }
                }
            }

            is Package.Response -> {
                val pending = access.withLock { completions.remove(pe.toId) }
                if (pending == null)
                    warning { "response to unregistered toId: ${pe.toId}, ignoring" }
                else {
                    val result = pe.result
                    if (result != null)
                        pending.complete(result)
                    else
                        pending.completeExceptionally(
                            pe.errorCode?.let { exceptionRegistry.getException(it, pe.errorText) }
                                ?: InvalidFrameException("invalid package: no result, no error code")
                        )
                }
            }
        }
    }

    /**
     * Encode the package and pass the binary frame to the transport via [sendEncoded].
     */
    private suspend fun sendPackage(pe: Package) {
        sendEncoded(BossEncoder.encode(pe))
    }

    /**
     * Provide an incoming frame to the adapter. Consumer software receives binary blocks from whatever
     * protocol it uses (say, UDP) and feed them to this method.
     *
     * Errors of decoding and processing the frame are logged and not rethrown. Cancellation of the calling
     * coroutine is propagated, so a cancelled frame pump stops instead of logging a bogus error.
     */
    suspend fun receiveFrame(data: ByteArray) {
        try {
            processIncomingPackage(data.decodeBoss())
        } catch (ce: CancellationException) {
            throw ce
        } catch (x: Exception) {
            exception { "unexpected error processing frame: \n${data.toDump()}" to x }
        }
    }

    /**
     * Exception that completes pending commands when the adapter is closed, see [close].
     */
    class CloseError : IOException("adapter is closed")

    /**
     * Frees any allocated resources, for example, pending commands.
     * Any protocol implementation MUST call it when connection is closed.
     * Note that pending commands are completed exceptionally with [CloseError].
     *
     * Normally, end user does not call it. Instead, it should call [cancel] if adapter needs
     * to be explicitly cancelled. Instead, [Parsec3Transport] implementations _must_ call `close()`
     * on the adapter instance it create when the connection is closed.
     */
    fun close() {
        val error = CloseError()
        // close() is not a suspend function, so it can not wait for the mutex. Take it when it is
        // free; otherwise fall back to best-effort unsynchronized access.
        val locked = access.tryLock()
        val pending = try {
            // Work on a snapshot and forget the entries: completing a deferred resumes its waiter,
            // which may touch the map while we are still going through the pending calls.
            completions.values.toList().also { completions.clear() }
        } finally {
            if (locked) access.unlock()
        }
        pending.forEach { it.completeExceptionally(error) }
    }

    companion object {
        /** Pretty-printing [Json] format instance; it is not used by the adapter code itself. */
        val format = Json { prettyPrint = true }
    }
}

