import { connect as connectMqtt, Client, QoS } from 'mqtt';
import { BehaviorSubject, Observable, Subject, Subscription, defer, filter, finalize, map, shareReplay } from 'rxjs';
import { MqttQos } from './mqtt-qos';
import { MqttStorage } from './persistence/mqtt-storage';
import { sleep } from '@weavix/utils/src/sleep';
import { MqttTransformationType } from './mqtt-transformation-type';
import { MqttStore } from './persistence/mqtt-store';

/**
 * A single message delivered to a topic subscriber, together with metadata
 * about the topic it arrived on and the connection that received it.
 */
export interface TopicPayload<T> {
    /** Full topic string the message was published on. */
    topic: string;
    /**
     * Regex group matches for the subscribed topic pattern:
     * replacements[0] is the full topic string, replacements[1] is the first
     * wildcard, replacements[2] is the second wildcard, etc.
     */
    replacements: string[];
    /** Message body: the parsed JSON value or the raw buffer, depending on the subscription's transformation. */
    payload: T;
    /** The `dup` flag copied from the received MQTT packet. */
    dup: boolean;
    /**
     * Copied from the service's `networkId` option.
     * NOTE(review): that option is optional, so this can be undefined at runtime despite the type.
     */
    networkId: number;
    /**
     * Copied from the service's `network.type` option.
     * NOTE(review): that option is optional, so this can be undefined at runtime despite the type.
     */
    networkType: 'wifi' | 'mobile';
}

/**
 * Bookkeeping entry the service keeps for each topic that currently has an
 * observable registered.
 */
export interface TopicSubscription<T> {
    /** The shared observable handed out to every caller subscribing to this topic. */
    observable: ObservableSubscribe<TopicPayload<T>>;
    /** QoS used when the topic is (re)subscribed on the client. */
    qos: MqttQos;
    /** Settles `observable.subscribed` successfully; invoked from the client's subscribe callback. */
    resolve?: Function;
    /** Rejects `observable.subscribed`; invoked when the service disconnects. */
    reject?: Function;
}

/**
 * An observable of topic messages that additionally exposes `subscribed`, a
 * promise that settles once the client's subscribe callback has reported
 * success for the topic, or rejects if the service disconnects first.
 */
export type ObservableSubscribe<T> = Observable<T> & { subscribed: Promise<boolean> };

// Subscriptions error out and the client starts a fresh connection after this long (30 minutes, in ms)
const REFRESH_TIME = 1_800_000;

// How long `timeout` waits before rejecting, in ms
const TIMEOUT_DELAY_MS = 10_000;

/**
 * Returns a promise that never resolves and rejects after a fixed delay.
 * Intended to be raced against the operation being guarded.
 *
 * The Error is constructed before sleeping so that its stack trace points at
 * the caller rather than at the timer callback.
 *
 * @param errorMessage Text appended to the `mqtt-service: ` prefix of the error
 */
const timeout = async (errorMessage = 'timeout'): Promise<never> => {
    const failure = new Error('mqtt-service: ' + errorMessage);
    await sleep(TIMEOUT_DELAY_MS);
    throw failure;
};

/**
 * Description of a wifi network the client connects over. Every populated
 * field is forwarded to the server as a query string parameter of the same name.
 */
interface MqttWifiNetwork {
    // Discriminator for the MqttNetwork union
    type: 'wifi';
    ssid?: string;
    mac?: string;
    ip?: string;
}

/**
 * Description of a mobile network the client connects over. Every populated
 * field is forwarded to the server as a query string parameter of the same name.
 */
interface MqttMobileNetwork {
    // Discriminator for the MqttNetwork union
    type: 'mobile';
    carrier?: string;
    imei?: string;
    iccid?: string;
    /** One-indexed. */
    slot?: number;
}

/** Network the connection is made over; discriminated by `type` ('wifi' or 'mobile'). */
export type MqttNetwork = MqttWifiNetwork | MqttMobileNetwork;

/** Configuration accepted by the MqttService constructor. */
export interface MqttServiceOptions {
    /** Factory used to create the underlying client; the `connect` function from `mqtt` is used when omitted. */
    connect?: (options: any) => Client;
    /** Host name of the MQTT server. */
    url: string;
    /** Forces the `wss` protocol; `wss` is also used whenever the port is 443. */
    ssl?: boolean;
    /** Passed to the client as the MQTT `keepalive` option (default 240). */
    timeout?: number;
    /** Server port (default 443). */
    port?: number;
    /** Tells what kind of mqtt client is connecting, for communication reports. */
    source: string;
    /** Sent as the connection password. */
    token?: string;
    /** Used as the prefix of generated client ids; set to 'unknown' when omitted. */
    platform?: string;
    /** Backing storage for outgoing messages that have not been confirmed yet. */
    storage: MqttStorage;
    /** Forwarded untouched to the client as its `messageIdProvider` option. */
    messageIdProvider?: any;
    /** Sent as the `networkId` query parameter and included in log prefixes. */
    networkId?: number;
    /** Network details appended to the connection query string. */
    network?: MqttNetwork;
    /** Fixed client id; a generated `<platform>_<random>_<counter>` id is used when omitted. */
    clientId?: string;
    /** Sent as the `deviceId` query parameter. */
    deviceId?: string;
    /** Exponentially back off reconnecting up to this maximum delay (default 1000). */
    maxReconnectTimeout?: number;
}

/** A pending subscribe or unsubscribe request waiting to be sent to the broker. */
interface Packet {
    // Topic (possibly containing wildcards) the request applies to
    topic: string;
    // QoS to subscribe with; -1 marks the packet as an unsubscribe request
    qos: QoS | -1;
    // Creation time, in epoch milliseconds
    date: number;
}

/**
 * Wrapper around an `mqtt` client that adds:
 *  - automatic reconnection with a capped exponential backoff,
 *  - reference-counted, batched topic subscriptions exposed as observables,
 *  - persistence and replay of unconfirmed QoS > 0 publishes,
 *  - a periodic subscription refresh and a forced fresh connection when the
 *    connection looks stale.
 */
export class MqttService {
    /** Replay-enabled observables created by `subscribeWithReplay`, keyed by topic. */
    private static replayableSubscriptions: { [topic: string]: ObservableSubscribe<TopicPayload<any>> } = {};

    /**
     * Returns a shared observable for `topic` that replays up to `bufferSize`
     * previously received messages to late subscribers.
     *
     * On the first call for a topic, `getSubscription` is invoked, its result
     * is wrapped in `shareReplay`, and a placeholder subscriber is attached
     * immediately so that buffering starts before any real subscriber arrives.
     * The placeholder is released when the first real subscriber shows up. The
     * cache entry is dropped once the last real subscriber unsubscribes, and
     * re-registered if the same observable is subscribed to again afterwards.
     *
     * NOTE(review): `shareReplay` is configured with `refCount: false`; per the
     * RxJS documentation this keeps the source subscription open after the
     * subscriber count drops to zero — confirm that is intended here.
     *
     * @param topic Cache key; normally the MQTT topic being subscribed to
     * @param bufferSize Number of messages to replay; `undefined` uses the `shareReplay` default
     * @param getSubscription Factory for the underlying subscription; only called when nothing is cached for `topic`
     * @returns The cached (or newly created) replaying observable
     */
    static subscribeWithReplay<T>(topic: string, bufferSize: number | undefined, getSubscription: () => ObservableSubscribe<TopicPayload<T>>) {
        if (!this.replayableSubscriptions[topic]) {
            const subscription = getSubscription();
            const replayedSubscription = subscription.pipe(shareReplay({ bufferSize, refCount: false }));
            // Placeholder subscriber: starts the source right away so messages are buffered
            let sub: Subscription | undefined = replayedSubscription.subscribe(() => { });
            let count = 0;
            const observedSubscription = defer(() => {
                if (count === 0 && this.replayableSubscriptions[topic] !== assignedSubscription) {
                    // Re-subscribed after the cache entry was removed; register it again
                    this.replayableSubscriptions[topic] = assignedSubscription;
                }
                count++;

                sub?.unsubscribe();
                sub = undefined;
                return replayedSubscription.pipe(finalize(() => {
                    count--;
                    if (count <= 0 && this.replayableSubscriptions[topic] === assignedSubscription) {
                        delete this.replayableSubscriptions[topic];
                    }
                }));
            });
            const assignedSubscription = Object.assign(observedSubscription, { subscribed: subscription.subscribed });
            this.replayableSubscriptions[topic] = assignedSubscription;
        }
        return this.replayableSubscriptions[topic];
    }

    /**
     * Builds the anchored regular expression used to match incoming topics
     * against a subscription pattern.
     *
     * `#` becomes a capture group matching anything (including `/`); `+` and
     * `^` become capture groups matching a single topic level. Every other
     * regex metacharacter is escaped so it is matched literally.
     */
    private static topicToRegex(topic: string): RegExp {
        const pattern = topic
            // Escape everything that is special in a regex, except the wildcard characters handled below
            .replace(/[.*?$(){}|[\]\\]/g, '\\$&')
            .replace(/#/g, '(.*)')
            .replace(/[\^+]/g, '([^/]+)');
        return new RegExp(`^${pattern}$`);
    }

    // Subscribe/unsubscribe requests that have not been handed to the client yet
    private queued: Array<Packet> = [];
    // Requests handed to the client whose callback has not confirmed them yet
    private transit: Array<Packet> = [];

    // True while `work` is running, so that only one drain happens at a time
    private working = false;

    // currently connected client
    private client?: Client | null;
    // pending client in process of connecting, either due to disconnection or connection change
    private pendingClient?: Client | null;
    // promise for resolving when a connected client is available
    private clientResolve?: (client: Client) => void;
    private clientPromise = new Promise<Client>(resolve => this.clientResolve = resolve);

    // Active topic registrations, keyed by the topic pattern as passed by the caller
    private subscriptions: { [topic: string]: TopicSubscription<any> } = {};
    /** Every message received from the broker. Replaced by a new subject (and the old one errored) on disconnect. */
    public message$: Subject<{ topic: string; payload: Buffer; dup: boolean; networkId?: number, networkType?: 'wifi' | 'mobile' }> = new Subject();

    /** Whether a client is currently connected. */
    public connected$ = new BehaviorSubject<boolean>(false);
    /** The socket of the connected client's stream, or null/undefined while disconnected. */
    public socket$ = new BehaviorSubject<any>(null);
    // Store of outgoing QoS > 0 messages that have not been confirmed yet
    private storage: MqttStore;
    // Epoch ms of the last packet received on the current client
    private lastConnected?: number | null;
    // Epoch ms of the last confirmed subscription refresh
    private lastSubscriptionRefresh?: number | null;
    // True between a refresh being queued and its subscribe callback succeeding
    private refreshingSubscriptions = false;
    /** Client id used for the current connection; assigned by `connect`. */
    public clientId?: string;
    private clientPrefix: string;
    private connectCount = 0;
    // Delay before the next reconnect attempt, in ms
    private reconnectInterval = 1000;

    private refreshTimeout?: ReturnType<typeof setTimeout>;
    private reconnectTimeout?: ReturnType<typeof setTimeout>;

    /**
     * @param options Connection settings. `platform` is set to 'unknown' on
     *     the passed object when it is missing.
     */
    public constructor(private options: MqttServiceOptions) {
        this.storage = new MqttStore('outgoing', options.storage);
        if (!options.platform) options.platform = 'unknown';
        this.clientPrefix = `${this.options.platform}_${Math.random()}`;
    }

    /** Prefix shared by all log lines: network id plus the current socket id, when available. */
    private logPrefix(): string {
        return `MQTT ${this.options?.networkId ?? 'default'} ${this.socket$.value?._socketId ?? ''}:`;
    }

    private log(...args: unknown[]) {
        console.log(this.logPrefix(), ...args);
    }

    private error(...args: unknown[]) {
        console.error(this.logPrefix(), ...args);
    }

    private warn(...args: unknown[]) {
        console.warn(this.logPrefix(), ...args);
    }

    /**
     * Connects/reconnects to the mqtt server.
     * This should be called whenever a user logs in.
     *
     * Any existing connection is shut down first via `disconnect`, which also
     * drops all current subscriptions and errors `message$`. The credentials
     * are taken from the options passed to the constructor. Unless a fixed
     * `clientId` option is set, a new client id is generated for every call.
     */
    connect() {
        this.clientId = this.options.clientId ?? `${this.clientPrefix}_${this.connectCount++}`;
        this.disconnect();
        this.newConnection();
    }

    /**
     * Opens a new connection with the current client id without tearing down
     * the existing one first; the old client is ended once the new one connects.
     */
    reconnect() {
        this.log('force reconnect');
        this.newConnection();
    }

    /**
     * Creates a new client, wires up its event handlers and registers it as
     * the pending client. Does nothing beyond triggering a full `connect`
     * when the connection is detected as stale.
     */
    private newConnection() {
        this.log(`options: ${this.options.url} ${this.options.port} ${this.clientId} ${this.options.networkId}`);
        if (this.checkForStaleConnection()) {
            return;
        }

        const port = Number(this.options.port ?? '443');
        const client = (this.options.connect ?? connectMqtt)({
            hostname: this.options.url,
            port,
            path: this.buildPath(),
            protocol: (port === 443 || this.options.ssl) ? 'wss' : 'ws',
            clean: false,
            keepalive: this.options.timeout ?? 240,
            clientId: this.clientId,
            username: '',
            password: this.options.token ?? '',
            reschedulePings: false,
            // Reconnection is handled by this class (see the 'close' handler), not by the client
            reconnectPeriod: 0,
            messageIdProvider: this.options.messageIdProvider
        });

        ['connect', 'disconnect', 'error', 'close', 'end', 'offline', 'outgoingEmpty'].forEach(evt => {
            client.on(evt, (...args: unknown[]) => this.log(evt, ...args));
        });
        client.on('reconnect', () => this.log(`reconnect`, client.options));
        client.on('packetreceive', () => {
            if (this.client !== client) return;

            // might receive erroneous packets from sleeping / waking computer, so catch that here
            if (this.checkForStaleConnection()) {
                return;
            }

            // Using packet receive since if there are no publish/subscribe messages, this gets ping acks
            this.lastConnected = new Date().getTime();
        });

        client.on('message', (topic: string, payload: Buffer, packet) => {
            this.log(`Received message ${topic} ${payload.length}`);
            this.message$.next({ topic, payload, dup: packet.dup, networkId: this.options.networkId, networkType: this.options.network?.type });
        });

        // Re-queues every known topic so it is subscribed again, then re-arms itself
        const refreshSubscriptions = () => {
            if (client !== this.client) return;

            Object.keys(this.subscriptions).forEach(topic => {
                this.queued.push({ topic, qos: this.subscriptions[topic].qos, date: new Date().getTime() });
            });
            this.refreshingSubscriptions = true;
            this.work();
            this.refreshTimeout = setTimeout(refreshSubscriptions, REFRESH_TIME);
        };

        clearTimeout(this.reconnectTimeout);
        client.on('close', () => {
            // Ignore clients that have already been replaced
            if (client !== this.client && client !== this.pendingClient) return;
            if (client === this.client) {
                this.client = null;
                this.connected$.next(false);
                this.socket$.next(undefined);
                clearTimeout(this.refreshTimeout);
                // A replacement is already connecting; no need to schedule another attempt
                if (this.pendingClient) return;
            } else if (client === this.pendingClient) {
                this.pendingClient = null;
            }
            this.reconnectTimeout = setTimeout(() => {
                this.reconnectInterval = Math.min(this.options.maxReconnectTimeout ?? 1000, this.reconnectInterval * 2);
                this.newConnection();
            }, this.reconnectInterval);
        });
        client.on('connect', () => {
            if (client !== this.pendingClient) return;

            const previousClient = this.client;
            this.pendingClient = null;
            this.client = client;
            if (this.clientResolve) this.clientResolve(client);
            this.clientPromise = new Promise(resolve => this.clientResolve = resolve);
            // make sure only one connected client (note the backend also closes the old connection due to matching clientId)
            previousClient?.end(true);

            this.reconnectInterval = 1000;
            this.log(`connected`);
            this.socket$.next((client as any).stream.socket);
            this.connected$.next(true);
            // Not awaited on purpose; `reset` handles and logs its own failures
            this.reset();
            if (!this.lastSubscriptionRefresh) this.lastSubscriptionRefresh = new Date().getTime();
            const timeSinceRefresh = new Date().getTime() - this.lastSubscriptionRefresh;
            clearTimeout(this.refreshTimeout);
            this.refreshTimeout = setTimeout(refreshSubscriptions, Math.max(1, REFRESH_TIME - timeSinceRefresh));
        });

        const previousPending = this.pendingClient;
        this.pendingClient = client;
        // make sure only one pending client
        previousPending?.end(true);

        this.log('done initializing');
    }

    /** Builds the websocket path, including the query string describing this client. */
    private buildPath(): string {
        const params = new URLSearchParams({
            source: this.options.source,
        });

        // We patched react-native to pull the networkId parameter and make the ws
        // connection over that network.
        if (this.options.networkId) params.append('networkId', this.options.networkId.toString());

        if (this.options.deviceId) params.append('deviceId', this.options.deviceId);

        this.addNetworkParams(params);

        return '/mqtt?' + params.toString();
    }

    /**
     * Appends the populated fields of `options.network` to `params`.
     * Best effort: failures are logged and never prevent the connection.
     */
    private addNetworkParams(params: URLSearchParams) {
        try {
            if (!this.options.network?.type) return;

            const networkType = this.options.network.type;
            params.append('networkType', networkType);

            if (networkType === 'mobile') {
                if (this.options.network.carrier) params.append('carrier', this.options.network.carrier);
                if (this.options.network.imei) params.append('imei', this.options.network.imei);
                if (this.options.network.iccid) params.append('iccid', this.options.network.iccid);
                if (this.options.network.slot) params.append('slot', this.options.network.slot.toString());
            } else if (networkType === 'wifi') {
                if (this.options.network.ssid) params.append('ssid', this.options.network.ssid);
                if (this.options.network.mac) params.append('mac', this.options.network.mac);
                if (this.options.network.ip) params.append('ip', this.options.network.ip);
            }
        } catch (e) {
            this.error('Failed to add network info to query string parameters.', e);
        }
    }

    /**
     * Starts a completely fresh connection (via `connect`) when no packet has
     * been received for longer than REFRESH_TIME, or when the subscriptions
     * have not been refreshed for almost two refresh periods.
     *
     * @returns true when a fresh connection was started and the caller should stop what it was doing
     */
    private checkForStaleConnection() {
        if (this.lastConnected && new Date().getTime() - this.lastConnected > REFRESH_TIME
            || this.lastSubscriptionRefresh && new Date().getTime() - this.lastSubscriptionRefresh > 2 * REFRESH_TIME - 5 * 60_000) {
            this.connect();
            return true;
        }
        return false;
    }

    /**
     * Publishes data to the given topic.
     *
     * Strings are sent as-is; anything else is JSON-serialized.
     * NOTE(review): falsy non-string values (`null`, `undefined`, `0`, `false`)
     * are currently serialized as `{}` — confirm that is intended for `0`/`false`.
     *
     * @param topic The resolved topic (mqtt wildcard characters should not be present)
     * @param data The payload
     * @param qos The delivery guarantee (default 1)
     * @param expirationMs Passed to the outgoing store together with the message when qos > 0
     */
    publish(topic: string, data: unknown, qos: MqttQos = 1, expirationMs?: number) {
        const stringified = typeof data === 'string' ? data : JSON.stringify(data || {});
        return this.publishRaw(topic, stringified, qos, expirationMs);
    }

    /**
     * Publishes an already-serialized payload to the given topic.
     *
     * For qos > 0 the message is first written to the outgoing store, so it
     * can be re-sent after the next successful connection if it was not
     * confirmed. When no client is connected the call returns without sending.
     *
     * @param topic The resolved topic (mqtt wildcard characters should not be present)
     * @param data The serialized payload
     * @param qos The delivery guarantee (default 1)
     * @param expirationMs Passed to the outgoing store together with the message when qos > 0
     * @returns A promise that rejects when the client reports a publish error or the publish times out
     */
    async publishRaw(topic: string, data: string | Buffer, qos: MqttQos = 1, expirationMs?: number) {
        this.log(`Publishing message ${topic}`, data.length);
        const messageId = `${Math.random()}`.substring(2);
        const packet = { messageId, topic, qos, payload: data };
        if (qos > 0) this.storage.put(packet as any, expirationMs);
        return this.publishImpl(messageId, topic, data, qos);
    }

    /**
     * Sends one message through the connected client and removes it from the
     * outgoing store once the client confirms it. Returns immediately when
     * not connected. Rejects on a publish error or when the timeout wins.
     */
    private async publishImpl(messageId: string, topic: string, data: string | Buffer, qos: MqttQos = 1, dup = false) {
        if (!this.connected$.value) return;

        const client = await this.waitForClient();
        const promise = new Promise((resolve, reject) => {
            client.publish(topic, data, { qos: Math.abs(qos) as any, dup }, (err) => {
                if (err) {
                    reject(err);
                    this.error(err);
                    return;
                }
                if (qos > 0) this.storage.del({ messageId } as any);
                resolve(null);
            });
        });
        const timeoutMessage = `Publish timeout ${messageId} ${topic} ${data}`;
        await Promise.race([timeout(timeoutMessage), promise]);
    }

    /**
     * Subscribes to the given topic.
     *
     * @template T The type of payload that will be delivered on this topic
     * @param topic The topic to subscribe to; may contain the wildcards `+` (or `^`) for one level and `#` for the remainder
     * @param qos The receipt guarantee (default 1)
     * @param transformation How payloads are decoded; JSON by default, otherwise the raw buffer is delivered
     * @returns The shared observable for the topic
     */
    subscribe<T>(topic: string, qos: QoS = 1, transformation: MqttTransformationType = MqttTransformationType.json): ObservableSubscribe<TopicPayload<T>> {
        return this.subscribeWithTopic<T>(topic, qos, transformation);
    }

    /**
     * Subscribes to the given topic.
     * Use this endpoint when you need metadata about the topic returned in
     * addition to the payload data.
     *
     * One observable is shared per topic string: when the topic is already
     * registered, the existing observable is returned and the `qos` and
     * `transformation` arguments of this call are not applied. The broker
     * subscription is dropped when the last subscriber unsubscribes.
     *
     * Messages whose payload cannot be decoded are logged and skipped; they do
     * not terminate the observable.
     *
     * @template T The type of payload that will be delivered on this topic
     * @param topic The topic to subscribe to; may contain the wildcards `+` (or `^`) for one level and `#` for the remainder
     * @param qos The receipt guarantee (default 1)
     * @param transformation How payloads are decoded; JSON by default, otherwise the raw buffer is delivered
     * @returns The shared observable for the topic
     */
    subscribeWithTopic<T>(topic: string, qos: QoS = 1, transformation: MqttTransformationType = MqttTransformationType.json): ObservableSubscribe<TopicPayload<T>> {
        let observable: ObservableSubscribe<TopicPayload<T>> = this.subscriptions[topic]?.observable;
        if (!observable) {
            const regex = MqttService.topicToRegex(topic);
            let count = 0;
            observable = defer(() => {
                if (count === 0 && this.subscriptions[topic]?.observable !== observable) {
                    // if we unsubscribe and resubscribe right away, need to re-initialize the observable
                    init();
                }
                count++;
                return this.message$.pipe(map(v => {
                    const replacements = v.topic.match(regex);
                    if (!replacements) return;
                    try {
                        // Decoding happens inside the try so a malformed payload is skipped instead of erroring the stream
                        const payload = transformation === MqttTransformationType.json ? JSON.parse(v.payload.toString('utf8')) : v.payload;
                        return { replacements, topic: v.topic, payload, dup: v.dup, networkId: v.networkId, networkType: v.networkType };
                    } catch (e) {
                        this.error(`Failed to decode payload on ${v.topic}`, e);
                    }
                }), filter(v => !!v), finalize(() => {
                    count--;
                    if (count <= 0 && this.subscriptions[topic]?.observable === observable) {
                        // Last subscriber left: forget the topic and queue an unsubscribe
                        delete this.subscriptions[topic];
                        this.queued.push({ topic, qos: -1, date: new Date().getTime() });
                        this.work();
                    }
                }));
            }) as unknown as ObservableSubscribe<TopicPayload<T>>;
            let resolve: (value: boolean) => void;
            let reject: (reason?: unknown) => void;
            observable.subscribed = new Promise((res, rej) => {
                resolve = res;
                reject = rej;
            });
            // `disconnect` rejects pending `subscribed` promises. Callers are not required to
            // await it, so mark the rejection as handled here; callers that do await it still
            // observe the rejection.
            observable.subscribed.catch(() => { });
            const init = () => {
                this.subscriptions[topic] = {
                    observable,
                    qos,
                    resolve,
                    reject
                };
                this.queued.push({ topic, qos, date: new Date().getTime() });
                this.work();
            };
            init();
        }
        return observable;
    }

    /**
     * Shuts down the current and pending clients and resets all connection
     * state. Every registered subscription is dropped: pending `subscribed`
     * promises are rejected and the previous `message$` subject is errored,
     * which terminates the observables handed out earlier.
     */
    disconnect() {
        this.log(`shutting down client`);
        const client = this.client;
        const pendingClient = this.pendingClient;
        const subscriptions = this.subscriptions;
        const message$ = this.message$;
        this.client = null;
        this.pendingClient = null;
        this.lastConnected = null;
        this.lastSubscriptionRefresh = null;
        this.refreshingSubscriptions = false;
        this.message$ = new Subject();
        this.subscriptions = {};
        this.queued = [];
        this.transit = [];
        client?.end(true);
        pendingClient?.end(true);
        this.connected$.next(false);
        this.socket$.next(undefined);
        clearTimeout(this.refreshTimeout);
        clearTimeout(this.reconnectTimeout);
        Object.values(subscriptions).forEach(x => {
            x.reject?.('mqtt client closed');
        });
        message$.error('disconnected'); // Run last so old client stuff is not picked up
    }

    /** Clears the store of unconfirmed outgoing messages. */
    clearStorage() {
        this.storage.clear();
    }

    /**
     * Runs after a client connects: re-queues unconfirmed subscription
     * requests, re-sends stored unconfirmed publishes (flagged as duplicates)
     * and flushes the queue.
     *
     * Never rejects: it is called without being awaited, so failures while
     * replaying stored messages are logged here and do not prevent the
     * subscription queue from being flushed.
     */
    private async reset() {
        // Anything in transit goes back to the front of the queue, other queued stuff still remains
        this.queued = [...this.transit, ...this.queued];
        this.transit = [];

        // Send any unconfirmed publishes
        try {
            const stream = await this.storage.createStream();
            while (true) {
                const packet = stream.read(1);
                if (!packet) break;
                // Fire and forget, but never leave the rejection (publish error or timeout) unhandled
                this.publishImpl(packet.messageId, packet.topic, packet.payload, packet.qos, true)
                    .catch(e => this.error(`Failed to re-send stored message ${packet.messageId}`, e));
            }
        } catch (e) {
            this.error('Failed to replay stored messages', e);
        }
        this.work(true);
    }

    /**
     * Drains the queue of subscribe/unsubscribe requests into at most one
     * `subscribe` and one `unsubscribe` call on the client. For each topic
     * only the most recent request in the batch is sent. Requests stay in
     * `transit` until the client's callback confirms them, so they can be
     * re-queued by `reset` if the connection is replaced in the meantime.
     *
     * Does nothing while another drain is running or while disconnected;
     * queued requests are then picked up by the running drain or by `reset`.
     *
     * @param immediate Skip the short delay that is otherwise used to batch requests
     */
    private async work(immediate = false) {
        if (this.working) return;
        this.working = true;

        try {
            if (!this.connected$.value) return; // Don't send anything til connected
            if (!immediate) await sleep(50); // Give some time to batch subscription requests

            const client = await this.waitForClient();
            const subs: { [topic: string]: { qos: QoS } } = {};
            const unsubs: { [topic: string]: -1 } = {};
            const transit = this.queued;
            this.queued = [];
            transit.forEach(x => {
                if (x.qos === -1) {
                    delete subs[x.topic];
                    unsubs[x.topic] = x.qos;
                } else {
                    delete unsubs[x.topic];
                    subs[x.topic] = { qos: x.qos };
                }
            });
            this.transit.push(...transit);
            // Identity lookup for the packets that belong to this batch
            const batch = new Set(transit);
            if (Object.keys(subs).length) {
                client.subscribe(subs, (err, granted) => {
                    if (err) {
                        return this.error(err);
                    }

                    if (client !== this.client) return;

                    if (this.checkForStaleConnection()) {
                        return;
                    }

                    if (this.refreshingSubscriptions) {
                        this.refreshingSubscriptions = false;
                        this.lastSubscriptionRefresh = new Date().getTime();
                    }

                    // Only remove transit added from here (future transit could have unsubbed the same topic)
                    this.transit = this.transit.filter(x => !batch.has(x) || !subs[x.topic]);

                    this.log(`Subscribed to ${Object.keys(subs).join(', ')}`);
                    granted?.forEach(x => {
                        if (x.qos === 128) this.warn(`Subscribe not authorized to ${x.topic}`);
                    });

                    Object.keys(subs).forEach(topic => {
                        this.subscriptions[topic]?.resolve?.();
                    });
                });
            }
            if (Object.keys(unsubs).length) {
                client.unsubscribe(Object.keys(unsubs), (err?: Error) => {
                    if (err) {
                        return this.error(err);
                    }

                    if (client !== this.client) return;

                    // Only remove transit added from here (future transit could have subbed the same topic);
                    this.transit = this.transit.filter(x => !batch.has(x) || !unsubs[x.topic]);

                    this.log(`Unsubscribed from ${Object.keys(unsubs).join(', ')}`);
                });
            }
        } catch (e) {
            this.error(e);
        } finally {
            this.working = false;
        }
    }

    /**
     * Returns the connected client, waiting for the next successful
     * connection when there is none. Rejects if that takes longer than the
     * shared timeout.
     */
    private async waitForClient(): Promise<Client> {
        /*
            wait a few seconds to cover out-of-order things during startup.
            but if it takes too long, maybe we've logged out or something
        */
        if (this.client) return this.client;
        return Promise.race([
            timeout('client startup timeout'),
            this.clientPromise
        ]);
    }
}
