import { WsEventTypes } from "../common/constants/wsEvents";
import { WsMessage } from "./socketProvider";
import * as _ from "lodash";

type WsListener = (events: WsMessage<any>[]) => void;

interface Subscriber {
	types: WsEventTypes[];
	listener: WsListener;
}

class WSPublisher {
	private release_interval_ms = 1000;

	private timeout: any = null;

	private subscribers: Subscriber[] = [];

	private accumulator: WsMessage<any>[] = [];

	constructor() {
		this.release = this.release.bind(this);
		this.schedule();
	}

	subscribe(types: WsEventTypes[], listener: WsListener) {
		this.subscribers.push({ types, listener });
		return () => this.unsubscribe(listener);
	}

	unsubscribe(subscriber: WsListener) {
		this.subscribers = this.subscribers.filter(
			({ listener }) => listener !== subscriber
		);
	}

	publish(payload: WsMessage<any>) {
		this.accumulator.push(payload);
	}

	private schedule() {
		this.timeout = setTimeout(this.release, this.release_interval_ms);
	}

	private release() {
		clearTimeout(this.timeout);
		const totalAccumulatedMessages = this.accumulator.length;
		if (totalAccumulatedMessages > 0) {
			console.log(
				"releasing websocket messages, count:",
				_.countBy(this.accumulator, "type"),
				"total",
				totalAccumulatedMessages
			);
			console.log({ messages: this.accumulator });
			this.subscribers.forEach(({ types, listener }) => {
				const messages = this.accumulator.filter(({ type }) =>
					types.includes(type)
				);
				if (messages.length) {
					listener(messages);
				}
			});
			this.accumulator = [];
		}
		// aggressively increase receiving window under load to keep things fast when no load
		// but ensure batching at least some messages when they burst
		this.release_interval_ms = Math.max(
			200,
			Math.min(
				3500,
				totalAccumulatedMessages * totalAccumulatedMessages * 1000
			)
		);
		this.schedule();
	}
}

const wsPublisher$ = new WSPublisher();

export default wsPublisher$;
