import { ActivationState, Client, } from '@stomp/stompjs';
import { hODeepObjectMerge } from 'bc$/utils';
import { debounce } from 'lodash';
export class PiStomp {
    topicConfig;
    getConnectHeaders;
    /**
     * 构造函数
     * @param url ws地址
     * @param headers websocket连接使用的headers
     * @param topicConfig 主题配置
     */
    constructor(url, topicConfig, getConnectHeaders) {
        this.topicConfig = topicConfig;
        this.getConnectHeaders = getConnectHeaders;
        this.init(url);
    }
    /**
     * 二级订阅id累加到的最大值
     */
    static maxSubId = 0;
    /**
     * 是否已经断开过
     */
    isDisconnect = false;
    /**
     * 连接客户端
     */
    client = new Client();
    /**
     * 连接成功后的回调数组
     */
    cbConnects = [];
    /**
     * 断开连接超时判断
     */
    tmo4Deactivate = null;
    /**
     * key为topic name
     * value定义为元组
     *   一级订阅成功后返回的id
     *   用户订阅的实体类
     */
    dicSubscribe = new Map();
    /**
     * 发送订阅参数函数map
     */
    mapSubscribeParams = new Map();
    /**
     * 初始化
     * @param url websocket服务端地址
     */
    init(url) {
        const { client } = this;
        client.brokerURL = url;
        //TODO:调试使用
        // client.debug = (msg) => console.warn(msg);
        client.beforeConnect = () => {
            //websocket连接使用的headers
            const headers = this.getConnectHeaders?.();
            headers && (client.connectHeaders = headers);
        };
        client.onWebSocketError = this.onWebSocketError.bind(this);
        client.onStompError = this.onStompError.bind(this);
        client.onConnect = this.onConnect.bind(this);
        client.onDisconnect = this.onDisconnect.bind(this);
        client.onWebSocketClose = this.onWebSocketClose.bind(this);
    }
    /**
     * 执行连接
     * @param times 循环执行次数
     */
    activate(times = 0) {
        if (times >= 10) {
            console.error('执行连接10次,而连接仍然在断开中！');
            return;
        }
        const { client } = this;
        //执行连接
        switch (client.state) {
            case ActivationState.INACTIVE:
                return client.activate();
            case ActivationState.ACTIVE:
                return;
            case ActivationState.DEACTIVATING:
                setTimeout(() => {
                    this.activate(times + 1);
                }, 100);
                return;
        }
    }
    /**
     * 断开连接
     */
    deactivate() {
        return this.client.deactivate();
    }
    onWebSocketError(evt) {
        // Message.error('连接websocket服务器失败！');
        console.warn('连接websocket服务器失败！', evt);
    }
    onStompError(receipt) {
        // 如果在Broker遇到错误，将被调用
        // 错误的登录名/密码通常会导致错误
        // brokers会在消息头设置简短的消息
        // console.log('Broker reported error: ' + frame.headers['message']);
        // console.log('Additional details: ' + frame.body);
        // Message.error('Stomp发生未知异常！');
        console.warn('Stomp发生未知异常！', receipt);
    }
    /**
     * 断开连接
     */
    onDisconnect() {
        this.isDisconnect = true;
    }
    /**
     * socket断开连接
     */
    onWebSocketClose() {
        this.isDisconnect = true;
    }
    /**
     * 连接成功后的回调
     */
    onConnect() {
        //执行待执行的回调
        const { cbConnects } = this;
        if (cbConnects.length) {
            let cb;
            while ((cb = cbConnects.shift())) {
                cb();
            }
        }
        if (!this.isDisconnect) {
            return;
        }
        this.isDisconnect = false;
        //搞定重新订阅,修改一级订阅ID并重新发起修改参数
        const { client, dicSubscribe } = this;
        const entries = dicSubscribe.entries();
        for (const [topic, subscribeInfo] of entries) {
            //获取请求头参数
            let [, headers] = subscribeInfo;
            if (typeof headers === 'function') {
                headers = headers();
            }
            //向服务端发起订阅
            const stompSub = client.subscribe(topic, this.receive.bind(this), headers);
            //一级订阅id
            subscribeInfo[0] = stompSub.id;
            // 修改订阅参数
            this.sendSubscribeParams(topic)();
        }
    }
    /**
     * 执行订阅
     * @param subOptions 订阅参数对象
     */
    subscribe(subOptions) {
        const { client } = this;
        return new Promise((resolve, reject) => {
            //未连接则执行连接后重新订阅
            if (!client.connected) {
                this.cbConnects.push(() => {
                    this.subscribe(subOptions)
                        .then((result) => {
                        resolve(result);
                    })
                        .catch((err) => {
                        console.warn('订阅消息失败！', err);
                        reject();
                    });
                });
                this.activate();
                return;
            }
            const { topic, callback, headers, params, customId } = subOptions;
            const { dicSubscribe } = this;
            let subscribeInfo = dicSubscribe.get(topic);
            const subId = PiStomp.maxSubId++;
            const options = {
                customId,
                subId,
                params,
                callback: callback.bind(this),
            };
            if (subscribeInfo) {
                //修改订阅请求头
                subscribeInfo[1] = headers;
                //记录全部订阅参数
                subscribeInfo[2].push(options);
            }
            else {
                //执行订阅
                const stompSub = client.subscribe(topic, this.receive.bind(this), headers);
                //写入本地记录
                subscribeInfo = [stompSub.id, headers, [options]];
                dicSubscribe.set(topic, subscribeInfo);
            }
            // 修改订阅参数
            this.sendSubscribeParams(topic)();
            // 订阅返回的数据
            const result = {
                id: subscribeInfo[0],
                subId,
                unsubscribe: () => {
                    this.unsubscribe({ subId });
                },
            };
            resolve(result);
        });
    }
    /**
     * 后端websocket推送过来的消息
     * @param message 消息内容
     */
    receive(message) {
        const { topic, data } = JSON.parse(message.body || '{}');
        if (!topic) {
            return;
        }
        const config = this.topicConfig[topic];
        config?.cbReceive?.(data, this.dicSubscribe.get(topic)[2]);
    }
    /**
     * 合并参数并执行发送到后台
     * @param topic 订阅主题
     */
    sendSubscribeParams(topic) {
        //字典
        const { mapSubscribeParams } = this;
        //防抖函数
        let fn = mapSubscribeParams.get(topic)?.fn;
        if (!fn) {
            fn = debounce(() => {
                //取消之前的请求
                let subscribeParams = mapSubscribeParams.get(topic);
                subscribeParams.controller?.abort();
                // 获取待发送的参数
                let subscribeInfo = this.dicSubscribe.get(topic);
                if (!subscribeInfo) {
                    return;
                }
                //用于取消fetch请求
                const controller = new AbortController();
                const { signal } = controller;
                subscribeParams.controller = controller;
                //优先取最后一次订阅传递的headers
                let headers = subscribeInfo[1];
                // 调用接口发送参数
                const { topicConfig } = this;
                headers = headers ?? topicConfig[topic].headers;
                if (typeof headers === 'function') {
                    headers = headers();
                }
                const headersDefault = {
                    'content-type': 'application/json',
                };
                headers = hODeepObjectMerge({}, headersDefault, headers);
                const ptPaths = subscribeInfo[2].map((p) => p.params);
                // TODO:此处后台设计如此,未来考虑改为websocket.send
                //发起请求
                fetch(topicConfig[topic].paramsChangeUrl, {
                    signal,
                    method: 'POST',
                    headers,
                    body: JSON.stringify({
                        topic,
                        ptPaths,
                    }),
                })
                    .finally(() => {
                    //清除controller
                    delete subscribeParams.controller;
                })
                    .then((res) => res.json())
                    .catch((err) => {
                    // Message.error('修改订阅参数失败！');
                    console.warn('修改订阅参数失败！', topic, ptPaths, err);
                });
            }, 100);
            //保存到map字典
            mapSubscribeParams.set(topic, { fn });
        }
        return fn;
    }
    /**
     * 取消订阅
     * @param param0 可以包含主题和一级/二级订阅id
     */
    unsubscribe({ topic, id, subId, subIds }) {
        const { client, dicSubscribe } = this;
        //通过主题取消全部订阅
        if (topic) {
            let subscribeInfo = dicSubscribe.get(topic);
            if (!subscribeInfo) {
                return;
            }
            client.unsubscribe(subscribeInfo[0]);
            dicSubscribe.delete(topic);
        }
        //通过一级订阅id取消全部订阅
        else if (typeof id === 'number') {
            const entries = dicSubscribe.entries();
            for (const kv of entries) {
                const [key, value] = kv;
                if (value?.[0] === id) {
                    client.unsubscribe(id);
                    dicSubscribe.delete(key);
                    break;
                }
            }
        }
        //根据二级订阅id取消
        else if (typeof subId === 'number') {
            let subscribeInfo = undefined;
            const entries = dicSubscribe.entries();
            for (const kv of entries) {
                const [key, value] = kv;
                const index = value[2].findIndex((p) => p.subId === subId);
                if (~index) {
                    subscribeInfo = dicSubscribe.get(key);
                    subscribeInfo[2].splice(index, 1);
                    if (subscribeInfo[2].length <= 0) {
                        client.unsubscribe(subscribeInfo[0]);
                        dicSubscribe.delete(key);
                    }
                    else {
                        // 修改订阅参数
                        this.sendSubscribeParams(key)();
                    }
                    break;
                }
            }
        }
        //根据多个二级订阅id取消
        else if (Array.isArray(subIds) && subIds.length) {
            const entries = dicSubscribe.entries();
            //key为主题名称
            //value[0]为一级订阅id
            //value[1]为订阅headers
            //value[2]为二级订阅相关信息,包括二级订阅id/订阅参数/订阅回调等
            for (const [key, [, , subOptions]] of entries) {
                const { length } = subOptions;
                for (let index = length - 1; index >= 0; index--) {
                    const { subId } = subOptions[index];
                    //有该二级订阅id则取消
                    if (subIds.some((p) => p === subId)) {
                        subOptions.splice(index, 1);
                    }
                }
                if (subOptions.length <= 0) {
                    client.unsubscribe(key);
                    dicSubscribe.delete(key);
                }
                else {
                    // 修改订阅参数
                    this.sendSubscribeParams(key)();
                }
            }
        }
        //设置超时以节省资源,避免频繁连接
        clearTimeout(this.tmo4Deactivate);
        this.tmo4Deactivate = setTimeout(() => {
            //没有订阅任何内容则断开连接以节省资源
            if (!dicSubscribe.size) {
                this.deactivate().catch((err) => console.error(err));
            }
        }, 1e4);
    }
    /**
     * 销毁实例
     */
    destroy() {
        const { client, unsubscribe } = this;
        const keys = this.dicSubscribe.keys();
        for (const key of keys) {
            unsubscribe.call(this, { topic: key });
        }
        if (!client.connected) {
            this.deactivate().catch((err) => console.error(err));
        }
    }
}
