import invariant from "tiny-invariant";
import { notifyError } from "../modules/error/errorReporting.tsx";

/**
 * A stream of messages for a single listener.
 *
 * Messages published while no listener is subscribed are buffered and handed to
 * the next listener that subscribes. Messages are always delivered in the order
 * they were published, and always from a microtask (never synchronously from
 * `publish` or `subscribe`).
 */
export class MessageStream<T> {
    /** Messages that have been published but not yet delivered, oldest first. */
    private readonly buffer: T[] = [];
    private listener: ((message: T) => void) | undefined = undefined;

    /** Schedules delivery of the buffered messages on the microtask queue. */
    private publishBuffer(): void {
        queueMicrotask(() => {
            this.flushBuffer();
        });
    }

    /**
     * Synchronously delivers buffered messages, oldest first, until the buffer is
     * empty or there is no listener (it may unsubscribe while handling a message).
     * Delivered messages are removed; the rest stay buffered for the next listener.
     */
    private flushBuffer(): void {
        const { buffer } = this;
        let delivered = 0;
        // The buffer is only appended to from microtasks, so it cannot change
        // underneath this synchronous loop.
        for (const message of buffer) {
            if (!this.callListener(message)) {
                break;
            }
            delivered++;
        }
        buffer.splice(0, delivered);
    }

    /** @returns if the message was sent to the listener */
    private callListener(message: T): boolean {
        try {
            // Might have been unsubscribed while handling a message
            if (!this.listener) {
                return false;
            }
            this.listener(message);
            return true;
        } catch (e) {
            console.error("Error in message listener", e);
            notifyError(e, { tags: ["callListener"] });
            // Errors count as handled messages
            return true;
        }
    }

    /**
     * Registers the single listener of this stream. Any buffered messages are
     * delivered to it in a microtask, followed by all later messages.
     *
     * @param listener called once per message, in publish order
     * @returns a function that unsubscribes `listener`; it does nothing if
     *          `listener` is no longer the current listener
     * @throws if a listener is already subscribed
     */
    subscribe(listener: (message: T) => void): () => void {
        invariant(!this.listener, "Listener already subscribed");
        this.listener = listener;
        this.publishBuffer();
        return () => {
            // Only remove our own subscription: a stale unsubscribe function must not
            // detach a listener that subscribed afterwards.
            if (this.listener === listener) {
                this.listener = undefined;
            }
        };
    }

    /**
     * Publishes a message to the listener, or buffers it if there is none.
     *
     * @param message the message to deliver
     */
    publish(message: T): void {
        // Deliver from a microtask, i.e. after the currently running synchronous code has finished,
        // to prevent React state updates while eg. rendering (since messages can be logged from render methods)
        queueMicrotask(() => {
            // Always go through the buffer so that this message can never overtake older
            // messages that are still waiting for a flush scheduled by subscribe().
            this.buffer.push(message);
            this.flushBuffer();
        });
    }
}
