import { catching } from "@warrenio/utils/catching";
import { mapGetOrCreate } from "@warrenio/utils/collections/maps";
import { notNull } from "@warrenio/utils/notNull";
import { atom } from "jotai/vanilla";
import { last } from "remeda";
import invariant from "tiny-invariant";
import { stable } from "../../../utils/stable.ts";
import { showWarn } from "../../error/errorStream.ts";
import type { LocationSlug } from "../../location/query.ts";
import { cpuSet, type HostId, type MetricId, type MetricSet, otherSet } from "./metricIds.ts";
import type { MetricsEvent } from "./metrics.schema.ts";
import { type MetricsSocket, type SocketFactory, socketFactoryAtom } from "./MetricsSocket.ts";

//#region Constants / configuration

/** Default cap on the number of entries a single `History` retains */
const MAX_HISTORY_EVENTS = 100;
/** An event less than this many milliseconds away from the latest history entry is merged into that entry */
const MERGE_MAX_DELTA_MS = 1000;

/** Delay (in milliseconds) between a `SocketManager` tick being queued and it running */
const SOCKETS_CHANGE_CHECK_INTERVAL = 1000;

/** Default for `SocketManager.hostLimit`: the maximum number of hosts passed to the socket factory */
const DEFAULT_HOST_LIMIT = 10;
/** Default for `SocketManager.idleLimit`: the maximum number of idle hosts added on top of the pending ones */
const DEFAULT_IDLE_HOST_LIMIT = 4;

//#endregion

/**
 * Registry of metric listeners, keyed first by host and then by metric.
 *
 * Removing a listener never removes the host or metric entry itself, so a host can remain
 * registered with no listeners; `copyTo` reports such hosts as idle.
 */
class Listeners extends Map<HostId, Map<MetricId, Set<MetricListener>>> {
    /** @returns `true` when no host is registered (a host with only empty listener sets still counts) */
    isEmpty() {
        return this.size === 0;
    }

    /** @returns The listener set for the host/metric pair, or `undefined` if none was ever created */
    lookup(host: HostId, metric: MetricId) {
        const perMetric = this.get(host);
        return perMetric?.get(metric);
    }

    /** Try to add a listener to an active host.
     * @returns `true` if the listener was added, `false` if the host is not active
     */
    addLive(host: HostId, metric: MetricId, listener: MetricListener): boolean {
        const perMetric = this.get(host);
        if (perMetric === undefined) {
            return false;
        }

        // NB: Assumes the metric is always active for this socket (ie. the listener is only added after filtering by the metric set)
        mapGetOrCreate(perMetric, metric, () => new Set<MetricListener>()).add(listener);
        return true;
    }

    /** Get the listener set for the host/metric pair, creating the host and metric entries as needed */
    private createListenerSet(host: HostId, metric: MetricId) {
        const perMetric = mapGetOrCreate(this, host, () => new Map());
        return mapGetOrCreate(perMetric, metric, () => new Set<MetricListener>());
    }

    /** Register a listener, creating the host and metric entries if they do not exist yet */
    add(host: HostId, metric: MetricId, listener: MetricListener) {
        const bucket = this.createListenerSet(host, metric);
        bucket.add(listener);
    }

    /** Unregister a listener; the (possibly now empty) host and metric entries are kept */
    remove(host: HostId, metric: MetricId, listener: MetricListener) {
        const bucket = this.lookup(host, metric);
        bucket?.delete(listener);
    }

    /** @returns `true` if the listener is registered for exactly this host/metric pair */
    hasListener(host: HostId, metric: MetricId, listener: MetricListener) {
        const bucket = this.lookup(host, metric);
        return bucket !== undefined && bucket.has(listener);
    }

    /** Iterate over every registered listener, across all hosts and metrics */
    *allListeners() {
        for (const [, perMetric] of this) {
            for (const [, bucket] of perMetric) {
                yield* bucket;
            }
        }
    }

    /**
     * Add every listener registered here to `other`, under the same host/metric pair.
     *
     * @returns Hosts that are idle (ie. have no listeners); nothing is created in `other` for them
     */
    copyTo(other: Listeners) {
        const idleHosts: HostId[] = [];
        for (const [host, perMetric] of this) {
            let sawListener = false;
            for (const [metric, bucket] of perMetric) {
                // NB: Does not create the other set if there are no listeners
                if (bucket.size === 0) {
                    continue;
                }
                sawListener = true;
                const target = other.createListenerSet(host, metric);
                for (const listener of bucket) {
                    target.add(listener);
                }
            }
            if (!sawListener) {
                idleHosts.push(host);
            }
        }
        return idleHosts;
    }
}

/**
 * Connection state reported to a listener through `onState`:
 * - `"connecting"`: the listener's host was included in a socket that is about to be created
 * - `"open"`: the socket opened (`SocketManager` may also report this straight away to a listener
 *   added for a host the current socket already covers)
 * - `"closed"`: the socket serving the listener closed
 * - `"suspended"`: a connection was started but the listener's host was left out of it
 */
export type State = "connecting" | "open" | "closed" | "suspended";

/** Callbacks through which a subscriber receives connection state changes and metric events */
export interface MetricListener {
    /** Called with the new connection state (see `State`) */
    onState: (state: State) => void;
    /**
     * Called for an event on the listener's host and metric that changed the stored history.
     * `history` is the `History` the event was recorded into.
     */
    onMessage: (event: MetricsEvent, history: History) => void;
}

/** The part of a metrics event that is kept in a `History`: its timestamp and numeric value */
export interface EventSummary {
    /** The event's `time`, converted to a `Date` */
    readonly time: Date;
    /** The event's `metric` value */
    readonly value: number;
}

/**
 * Reduce a raw metrics event to its timestamp and value.
 *
 * @returns The summary, or `undefined` when the event carries no `metric` value. A warning is
 * shown for such events unless their `state` is `"expired"`.
 */
export function summaryFromEvent(event: MetricsEvent): EventSummary | undefined {
    if (event.metric !== undefined) {
        const time = new Date(event.time);
        return { time, value: event.metric };
    }

    // Expired events are expected to lack a value; anything else is an unrecognized payload
    const isExpired = event.state === "expired";
    if (!isExpired) {
        showWarn("Unknown metric event schema", event);
    }
    return undefined;
}

/** Bounded, append-only list of event summaries for one host/metric pair, oldest first */
export class History {
    public readonly events: EventSummary[] = [];

    /** @param maxEvents Maximum number of entries kept; the oldest entry is dropped when exceeded */
    constructor(public readonly maxEvents: number = MAX_HISTORY_EVENTS) {}

    /** @returns The most recent entry, or `undefined` if the history is empty */
    last() {
        return last(this.events);
    }

    /**
     * Record an event.
     *
     * An event whose time is within `MERGE_MAX_DELTA_MS` of the latest entry does not create a new
     * entry; it only replaces the latest entry's value (the entry keeps its original time).
     *
     * @returns `true` if events were changed or added
     */
    add(event: MetricsEvent) {
        const incoming = summaryFromEvent(event);
        if (incoming === undefined) {
            return false;
        }

        const latest = this.last();
        if (latest !== undefined) {
            const deltaMs = Math.abs(latest.time.getTime() - incoming.time.getTime());
            if (deltaMs < MERGE_MAX_DELTA_MS) {
                // Close enough in time to count as the same sample: update in place, if it differs
                if (latest.value === incoming.value) {
                    return false;
                }
                const merged = { ...latest, value: incoming.value };
                this.events[this.events.length - 1] = merged;
                return true;
            }
        }

        this.events.push(incoming);
        // Only one entry is added per call, so dropping a single entry restores the limit
        if (this.events.length > this.maxEvents) {
            this.events.shift();
        }
        return true;
    }
}

/** Collection of `History` instances, one per host/metric pair, created on first use */
export class Histories {
    // Does not store location, since it assumes `HostId`s are unique across locations
    private readonly histories = new Map<HostId, Map<MetricId, History>>();

    /** @returns The history for the host/metric pair, or `undefined` if no event was recorded for it */
    get(host: HostId, metric: MetricId) {
        const perMetric = this.histories.get(host);
        return perMetric?.get(metric);
    }

    /**
     * Record an event into the history for its `host` and `service`, creating that history if needed.
     *
     * @returns A `[history, wasChanged]` pair, where `wasChanged` is the result of `History.add`
     */
    add(event: MetricsEvent) {
        const perMetric = mapGetOrCreate(this.histories, event.host, () => new Map());
        const history = mapGetOrCreate(perMetric, event.service, () => new History());
        return [history, history.add(event)] as const;
    }
}

/**
 * Manages a single metrics socket for a specific location and metric set.
 *
 * Dynamically reconnects with an appropriate set of hosts based on the listeners.
 *
 * A socket is created for a fixed list of hosts. Listeners for hosts outside that list are parked
 * in `pending`; a delayed tick then closes the socket, and once it has closed a new one is created
 * that includes the pending hosts (up to `hostLimit`).
 */
class SocketManager {
    private socket: MetricsSocket | null = null;
    /** `true` from the current socket's `onopen` until its `onclose` */
    private socketOpen = false;

    /** Currently active hosts */
    private readonly listeners = new Listeners();
    /** Hosts that have pending listeners but are not connected yet */
    private readonly pending = new Listeners();
    /** Hosts that currently have no listeners (but have had in the past) */
    private idleHosts: HostId[] = [];

    private tickTimer: ReturnType<typeof setTimeout> | undefined;

    private connectionTimer: ReturnType<typeof setTimeout> | undefined;
    /** Number of unclean closes since the last successful open; drives the reconnect backoff */
    private retryCount = 0;

    private disposed = false;

    /**
     * @param histories Shared store that every received event is recorded into
     * @param factory Creates the socket for a location, metric set and host list
     * @param location Location this manager connects to
     * @param metricSet Metric set this manager serves; listeners must use one of its metrics
     * @param hostLimit Maximum number of hosts passed to the factory
     * @param idleLimit Maximum number of idle hosts added on top of the pending ones
     */
    constructor(
        public readonly histories: Histories,
        public readonly factory: SocketFactory,

        public readonly location: string,
        public readonly metricSet: MetricSet,

        public readonly hostLimit = DEFAULT_HOST_LIMIT,
        public readonly idleLimit = DEFAULT_IDLE_HOST_LIMIT,
    ) {}

    /** Permanently shut down: cancels scheduled work and closes the socket. No reconnect follows. */
    dispose() {
        this.log("Disposing");
        this.disposed = true;
        this.cancelTimers();
        this.close();
    }

    /** Cancel the scheduled tick and the scheduled connection attempt, if any */
    private cancelTimers() {
        if (this.tickTimer !== undefined) {
            clearTimeout(this.tickTimer);
            this.tickTimer = undefined;
        }
        if (this.connectionTimer !== undefined) {
            clearTimeout(this.connectionTimer);
            this.connectionTimer = undefined;
        }
    }

    private log(msg: string, ...args: unknown[]) {
        console.debug(`[Metrics/${this.metricSet.name}@${this.location}] ${msg}`, ...args);
    }

    /** Schedule a `tick()` after `SOCKETS_CHANGE_CHECK_INTERVAL`, unless one is already scheduled */
    private queueTick() {
        // The close triggered by `dispose()` ends up here as well; do not arm a timer that can only no-op
        if (this.disposed || this.tickTimer) {
            return;
        }
        this.tickTimer = setTimeout(() => {
            this.tickTimer = undefined;
            this.tick();
        }, SOCKETS_CHANGE_CHECK_INTERVAL);
    }

    /**
     * Reconcile the socket with the listeners: without a socket, schedule a connection attempt;
     * with a socket and pending listeners, close it so that it gets re-created.
     */
    private tick() {
        if (this.disposed) {
            return;
        }

        if (!this.socket) {
            if (!this.connectionTimer) {
                // Exponential backoff
                const retryDelaySec = Math.min(this.retryCount === 0 ? 0 : 2 ** this.retryCount, 60);
                this.log("No socket, connecting in %d seconds", retryDelaySec);
                this.connectionTimer = setTimeout(() => {
                    this.connectionTimer = undefined;
                    this.connectPending();
                }, retryDelaySec * 1000);
            }
        } else if (!this.pending.isEmpty()) {
            // XXX: This will re-create the socket even if pending listeners have been removed in the meantime
            this.log("Pending listeners, re-creating socket");
            this.close();
        }
    }

    /**
     * Create a socket for the pending hosts (topped up with idle hosts) and make their listeners live.
     *
     * Listeners whose hosts did not fit within `hostLimit` stay pending and are told `"suspended"`.
     */
    private connectPending() {
        if (this.disposed) {
            return;
        }

        invariant(!this.socket, "Socket should be closed when connecting pending listeners");

        /** Hosts to connect to */
        const hosts = [...this.pending.keys()].slice(0, this.hostLimit);

        // Fill up to the host limit with idle hosts
        let additionalHosts: HostId[] | undefined;
        if (hosts.length < this.hostLimit && this.idleHosts.length > 0) {
            const remainingLimit = Math.min(this.hostLimit - hosts.length, this.idleLimit);
            // A host recorded as idle may have gained a pending listener since the socket closed.
            // It is already in `hosts` then; listing it twice would make the transfer loop below
            // replace its listeners with an empty map.
            additionalHosts = this.idleHosts.filter((host) => !this.pending.has(host)).slice(0, remainingLimit);
            hosts.push(...additionalHosts);
        }

        if (hosts.length === 0) {
            this.log("No pending listeners to connect, going idle");
            return;
        }

        // Transfer listeners from pending to live
        invariant(this.listeners.isEmpty(), "Listeners should be empty when connecting pending listeners");
        for (const host of hosts) {
            const hostListeners = this.pending.get(host);
            this.pending.delete(host);
            // Idle hosts have no pending entry; register them with no listeners so `addLive` accepts them
            this.listeners.set(host, hostListeners ?? new Map<MetricId, Set<MetricListener>>());
        }
        for (const listener of this.listeners.allListeners()) {
            catching(() => listener.onState("connecting"));
        }

        // Notify remaining pending listeners as suspended
        for (const listener of this.pending.allListeners()) {
            catching(() => listener.onState("suspended"));
        }

        // Connect to the socket
        this.log(
            "Connecting to socket with hosts: [%s], extra: [%s]",
            hosts.join(", "),
            additionalHosts?.join(", ") ?? "",
        );
        this.socket = this.factory(this.location, this.metricSet, hosts);
        this.socket.onopen = () => {
            this.log("Socket opened");
            this.socketOpen = true;
            this.retryCount = 0;
            for (const listener of this.listeners.allListeners()) {
                catching(() => listener.onState("open"));
            }
        };
        this.socket.onmessage = (event) => {
            this.handleEvent(event);
        };
        this.socket.onclose = (e) => {
            this.onClose(e);
        };
        // TODO: Connection error handling?
    }

    /** Ask the socket (if any) to close; `this.socket` is only reset once `onClose` runs */
    private close() {
        this.socket?.close();
    }

    /** Move all live listeners back to `pending` and record which live hosts had none */
    private transferListenersToPending() {
        // "Appending" the existing listeners into the pending set will prioritize newly added listeners over existing
        // ones (since existing ones will then be iterated through last in `connectPending()`)
        this.idleHosts = this.listeners.copyTo(this.pending);
        this.listeners.clear();
    }

    /**
     * Socket close handler: notifies live listeners, parks them as pending and queues a tick so
     * that a new connection is attempted.
     *
     * @param wasClean `false` counts as a failed attempt and increases the reconnect delay
     */
    private onClose(wasClean: boolean) {
        this.log("Socket closed %s", wasClean ? "" : "(unclean)");
        this.socketOpen = false;

        for (const listener of this.listeners.allListeners()) {
            catching(() => listener.onState("closed"));
        }

        this.transferListenersToPending();

        if (!wasClean) {
            this.retryCount++;
        }
        this.socket = null;

        this.queueTick();
    }

    /** Record an incoming event and, if it changed the history, forward it to the matching listeners */
    private handleEvent(event: MetricsEvent) {
        const [history, wasAdded] = this.histories.add(event);
        // Filter out duplicate events
        if (!wasAdded) {
            return;
        }

        // TODO: Have sort of rate-limiting / batching for events?
        const listeners = this.listeners.lookup(event.host, event.service as MetricId);
        if (listeners) {
            for (const listener of listeners) {
                catching(() => listener.onMessage(event, history));
            }
        }
    }

    /**
     * Subscribe a listener to a metric of a host.
     *
     * If the current socket already covers the host, the listener becomes live immediately and is
     * told the socket's current state. Otherwise it is parked as pending and a tick is queued.
     *
     * @returns A function that unsubscribes the listener
     * @throws If `metric` is not part of this manager's metric set, or the manager is disposed
     */
    addListener(host: HostId, metric: MetricId, listener: MetricListener) {
        invariant(this.metricSet.metrics.includes(metric), "Metric not in this manager");
        invariant(!this.disposed, "Cannot add listeners to a disposed manager");

        if (this.listeners.addLive(host, metric, listener)) {
            // The host is live as soon as the socket is created, which is before it has opened.
            // Until then report "connecting"; `onopen` will deliver "open" to this listener too.
            const state: State = this.socketOpen ? "open" : "connecting";
            // XXX: This should queue a microtask but that requires also checking for interfering removals
            catching(() => listener.onState(state));
        } else {
            this.pending.add(host, metric, listener);
            this.queueTick();
        }

        return () => {
            // It is in only one of these sets, but call `remove` on both for simplicity
            this.listeners.remove(host, metric, listener);
            this.pending.remove(host, metric, listener);
            // TODO: Garbage collect when no listeners
        };
    }
}

/**
 * Entry point for subscribing to metrics.
 *
 * Keeps one `SocketManager` per metric set and location (created on first use) and one
 * `Histories` store shared by all of them.
 */
export class MetricsManager {
    private readonly sockets = new Map<MetricSet, Map<LocationSlug, SocketManager>>();
    public readonly histories = new Histories();
    private readonly sets = [cpuSet, otherSet];

    private disposed = false;

    constructor(private readonly socketFactory: SocketFactory) {}

    /** Find the metric set that contains `metric`; `notNull` is applied to the result with "No set for metric" */
    private getMetricSet(metric: MetricId): MetricSet {
        const owner = this.sets.find(({ metrics }) => metrics.includes(metric));
        return notNull(owner, "No set for metric");
    }

    /** Get the socket manager for the metric set and location, creating it on first use */
    private getManager(location: LocationSlug, metricSet: MetricSet) {
        invariant(!this.disposed, "Cannot access a disposed manager");
        const byLocation = mapGetOrCreate(this.sockets, metricSet, () => new Map());
        return mapGetOrCreate(
            byLocation,
            location,
            () => new SocketManager(this.histories, this.socketFactory, location, metricSet),
        );
    }

    /**
     * Subscribe a listener to a metric of a host at a location.
     *
     * @returns The unsubscribe function returned by `SocketManager.addListener`
     */
    addListener(location: LocationSlug, host: HostId, metric: MetricId, listener: MetricListener) {
        const metricSet = this.getMetricSet(metric);
        const manager = this.getManager(location, metricSet);
        return manager.addListener(host, metric, listener);
    }

    /** Dispose every socket manager created so far; `getManager` rejects any use afterwards */
    dispose() {
        this.disposed = true;
        for (const byLocation of this.sockets.values()) {
            byLocation.forEach((manager) => manager.dispose());
        }
    }
}

// NB: Only allow one manager at a time to avoid leaking resources
const [managerRef] = stable<{ current?: MetricsManager }>("metrics/managerRef", () => ({ current: undefined }));

/** Atom holding the current `MetricsManager`; any manager previously stored in `managerRef` is disposed first */
export const metricsAtom = atom((get) => {
    // Create a new manager on every atom init (which only happens once or on hot reload)
    const previous = managerRef.current;
    if (previous) {
        previous.dispose();
    }

    const socketFactory = get(socketFactoryAtom);
    const manager = new MetricsManager(socketFactory);
    managerRef.current = manager;
    return manager;
});
