import { partition } from 'lodash';
import { ReadableIterator } from './readable-iterator';
import { MqttDataPacket, MqttStorage, MqttStorePacket } from './mqtt-storage';

/** Completion callback: called with an error, or with `null` and the packet concerned. */
type PacketCallback = (error?: Error | null, packet?: MqttStorePacket | null) => void;

/** Default lifetime of a stored outgoing packet, in milliseconds (30 minutes). */
const PACKET_EXPIRES_AFTER_MS = 30 * 60 * 1000;

/**
 * Persistent MQTT incoming/outgoing message store.
 * Adapted from ./node_modules/mqtt/lib/store.js, which is an in-memory store only.
 * This requires an MqttStorage instance to be provided, which is a simplified layer
 * that only needs to handle the raw saving/retrieving of data to the native platform.
 *
 * Packets are persisted as JSON strings keyed by their messageId. Packets saved by an
 * 'outgoing' store are stamped with an expiry time, and expired packets are purged
 * whenever createStream() is called.
 *
 * @export
 * @class MqttStore
 * @implements {Store}
 */
export class MqttStore {
    constructor(private type: 'incoming' | 'outgoing', private storage: MqttStorage) {
    }

    /** Removes everything held by the underlying storage. */
    async clear() {
        await this.storage.clear();
    }

    /**
     * Serializes a packet and saves it under its messageId.
     *
     * For an 'outgoing' store the packet is stamped in place with an `expiresAt`
     * ISO timestamp before it is saved.
     *
     * @param packet packet to persist
     * @param expirationMs lifetime in milliseconds (defaults to PACKET_EXPIRES_AFTER_MS);
     *        a function passed here is treated as the callback, i.e. `put(packet, cb)`
     * @param cb invoked without arguments once the packet has been saved
     * @returns this store
     */
    public async put(packet: MqttStorePacket, expirationMs?: any, cb?: () => void) {
        if (typeof expirationMs === 'function') {
            cb = expirationMs;
            expirationMs = undefined;
        }
        if (this.type === 'outgoing') {
            const lifetimeMs = expirationMs ?? PACKET_EXPIRES_AFTER_MS;
            packet.expiresAt = new Date(Date.now() + lifetimeMs).toISOString();
        }
        const data = this.serializePacket(packet);
        // NOTE(review): a storage failure rejects the returned promise and `cb` is never called.
        await this.storage.set(this.getPacketKey(packet), data);
        cb?.();
        return this;
    }

    /**
     * Loads every stored packet, purges the expired ones and returns the rest as a stream.
     *
     * Called at startup only, so this is a good time to purge expired data. Entries that
     * cannot be deserialized are skipped (they are neither returned nor deleted).
     *
     * @returns a ReadableIterator over the packets that have not expired
     */
    public async createStream() {
        const storedEntries = await this.storage.getAll();
        const allData: MqttStorePacket[] = storedEntries
            .map(d => this.deserializePacket(d))
            .filter((p): p is MqttStorePacket => p !== null);
        const [expiredPackets, notExpiredPackets] = partition(allData, packet => this.isExpired(packet));
        // Wait for the purge so it cannot race with later writes, but keep it best-effort:
        // a failed deletion must not prevent the remaining packets from being replayed.
        await Promise.allSettled(expiredPackets.map(packet => this.del(packet)));
        return new ReadableIterator(notExpiredPackets.values());
    }

    /**
     * Deletes the stored packet that has the same messageId as `packet`.
     *
     * @param packet packet identifying the entry to delete
     * @param cb called with `(null, packet)` after the entry has been removed, or with an
     *        Error('missing packet') when no entry exists for that messageId
     * @returns this store
     */
    // cb must be "Function" to match implemented class definition
    // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
    public async del(packet: MqttStorePacket, cb?: PacketCallback | Function) {
        const packetKey = this.getPacketKey(packet);
        const foundPacket = await this.storage.get(packetKey);
        if (foundPacket) {
            // Await the removal so success is only reported once the entry is really gone.
            await this.storage.remove(packetKey);
            cb?.(null, packet);
        } else {
            cb?.(new Error('missing packet'));
        }
        return this;
    }

    /**
     * Looks up the stored packet that has the same messageId as `packet`.
     *
     * @param packet packet identifying the entry to load
     * @param cb called with `(null, storedPacket)` when found, or with an
     *        Error('missing packet') when the entry is absent or cannot be deserialized
     * @returns this store
     */
    // cb must be "Function" to match implemented class definition
    // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
    public async get(packet: MqttStorePacket, cb?: PacketCallback | Function) {
        const data = await this.storage.get(this.getPacketKey(packet));
        const parsed = data ? this.deserializePacket(data) : null;
        if (parsed !== null) {
            cb?.(null, parsed);
        } else {
            cb?.(new Error('missing packet'));
        }
        return this;
    }

    /**
     * Nothing to release here; simply invokes the callback.
     *
     * @param cb invoked without arguments
     */
    // cb must be "Function" to match implemented class definition
    // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
    public close(cb?: Function): void {
        cb?.();
    }

    /** Storage key of a packet: its messageId rendered as a string. */
    private getPacketKey(packet: MqttStorePacket): string {
        return `${packet.messageId}`;
    }

    /**
     * Tells whether a packet's `expiresAt` is now or in the past.
     * Packets without `expiresAt` never expire; an unparseable `expiresAt` also
     * counts as not expired, because comparing NaN is always false.
     */
    private isExpired(packet: MqttStorePacket): boolean {
        if (!packet.expiresAt) return false;
        return new Date(packet.expiresAt).getTime() <= Date.now();
    }

    /**
     * Converts a packet to the JSON string that gets stored.
     *
     * A `payloadType` marker is added so the payload can be restored on load:
     * 'raw' for Buffers (stored base64-encoded), 'object' for any other payload and
     * 'none' when the packet carries no payload.
     */
    private serializePacket(packet: MqttStorePacket): string {
        let payloadType = 'none';
        let payload: unknown = undefined;
        if (this.hasPayload(packet)) {
            if (Buffer.isBuffer(packet.payload)) {
                payloadType = 'raw';
                payload = packet.payload.toString('base64');
            } else {
                payloadType = 'object';
                payload = packet.payload;
            }
        }
        return JSON.stringify({
            ...packet,
            payloadType,
            payload,
        });
    }

    /**
     * Rebuilds a packet from its stored JSON string.
     *
     * @param data string previously produced by serializePacket
     * @returns the packet, with 'raw' payloads decoded back into a Buffer, or `null`
     *          when the data is not valid JSON or does not describe an object
     */
    private deserializePacket(data: string): MqttStorePacket | null {
        try {
            const parsed: unknown = JSON.parse(data);
            // Valid JSON is not necessarily a packet: reject scalars, null and arrays.
            if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
                return null;
            }
            const packet = parsed as MqttStorePacket;
            if (packet.payloadType === 'raw') {
                packet.payload = Buffer.from(packet.payload, 'base64');
            }
            return packet;
        } catch {
            // Corrupt entries are reported as "no packet" rather than failing the caller.
            return null;
        }
    }

    /** True when the packet has a truthy payload; a falsy one (e.g. '') counts as absent. */
    private hasPayload(packet: MqttStorePacket): packet is MqttStorePacket & MqttDataPacket {
        return !!(packet as MqttDataPacket).payload;
    }
}
