import { ReplaySubject, Subscription } from 'rxjs';
import { get, isNil, keyBy, set } from 'lodash';
import { SyncEvent, SyncEventStorage, SyncStorage } from './sync.model';
import { SyncService } from './sync.service';
import { mergeCopyArrays } from '@weavix/utils/src/merge-copy-arrays';
import { mongoDbMax } from '@weavix/utils/src/mongodb-max';
import { MetricAggregator } from './metric-aggregator';
import { TelemetryEvent } from '@weavix/models/src/telemetry';

export interface DeduplicationTelemetry {
    topic: string;
    diffMs: number;
    winnerNetwork?: string;
    loserNetwork?: string;
    winnerNetworkType: 'mobile' | 'wifi';
    loserNetworkType: 'mobile' | 'wifi';
}

export class SyncPartition<T extends { id?: string }> {
    readonly loaded$ = new ReplaySubject<boolean>(1);
    readonly reset$ = new ReplaySubject<boolean>(1);

    private subscription?: Subscription;
    private closed = false;
    private updating: Array<SyncEvent<T>> = [];
    private applying = false;
    private simulcastedEventCache: { [key: string]: { date: string, networkId?: number, networkType?: string, id?: string, networkName?: string } } = {};
    private readonly metricAggregator = new MetricAggregator<DeduplicationTelemetry>();
    private readonly missingSyncEventAggregator = new MetricAggregator<{ missingNetwork: 'mobile' | 'wifi' | 'seq' | 'unknown' }>();
    private telemetryInterval?: ReturnType<typeof setInterval>;

    constructor(
        private readonly syncService: SyncService,
        private readonly storage: SyncStorage<T>,
        public readonly topic: string,
        public readonly partition: string,
        private readonly lazy: boolean,
        private readonly simulcast = false,
        private reinitialize = false,
        private readonly eventStorage?: SyncEventStorage<T> | null,
    ) {
        this.subscribeAndGet().catch((e: unknown) => console.error('subscribeAndGet', e));
        if (this.simulcast) {
            this.telemetryInterval = setInterval(() => {
                console.log(`[Sync Partition] checking for deduplication telemetry...`);
                if (this.metricAggregator.dataPointCount > 0) {
                    this.syncService.sendTelemetry(TelemetryEvent.SyncDeduplicate, {
                        date: new Date(),
                        eventCount: this.metricAggregator.dataPointCount,
                        minDiffMs: this.metricAggregator.minOf('diffMs'),
                        maxDiffMs: this.metricAggregator.maxOf('diffMs'),
                        avgDiffMs: this.metricAggregator.avgOf('diffMs'),
                        wifiWins: this.metricAggregator.countOf('winnerNetworkType', 'wifi'),
                        mobileWins: this.metricAggregator.countOf('winnerNetworkType', 'mobile'),
                        winnerNetworks: this.metricAggregator.countOf('winnerNetwork'),
                        loserNetworks: this.metricAggregator.countOf('loserNetwork'),
                        topic: this.topic,
                    });
                    this.metricAggregator.clear();
                }

                if (this.missingSyncEventAggregator.dataPointCount > 0) {
                    this.syncService.sendTelemetry(TelemetryEvent.SyncEventMissing, {
                        date: new Date(),
                        eventCount: this.missingSyncEventAggregator.dataPointCount,
                        topic: this.topic,
                        missingWifi: this.missingSyncEventAggregator.countOf('missingNetwork', 'wifi'),
                        missingMobile: this.missingSyncEventAggregator.countOf('missingNetwork', 'mobile'),
                        missingSeq: this.missingSyncEventAggregator.countOf('missingNetwork', 'seq'),
                    });
                    this.missingSyncEventAggregator.clear();
                }

                const pruned = Object.values(this.simulcastedEventCache).map(x => {
                    if (new Date().getTime() - new Date(x.date).getTime() > 120_000) {
                        this.missingSyncEventAggregator.accumulate({
                            missingNetwork: x.networkType === 'wifi' ? 'mobile' : 'wifi',
                        });
                        return undefined!;
                    }
                    return x;
                }).filter(x => x);

                this.simulcastedEventCache = keyBy(pruned, 'id');
                console.log('[Sync Partition] simulcast event cache pruned.  current size:', pruned.length);
            }, 60_000 * 10);
        }

    }

    async close(reset: boolean) {
        if (this.closed) return;
        this.closed = true;

        console.log(`Clearing sync partition ${this.partition}`);
        this.unsubscribe();
        this.loaded$.next(true);

        if (reset) await this.eventStorage?.close(this.partition);
    }

    async subscribeAndGet() {
        const missing = this.reinitialize || !this.eventStorage ? [] : await this.eventStorage.getMissingSequences(this.partition);
        const { date, seq } = this.reinitialize || typeof missing === 'number' || !this.eventStorage ? { date: null, seq: null } : await this.eventStorage.getDateSequence(this.partition);
        const seqParam = this.reinitialize || typeof missing === 'number' || isNil(seq) ? null : Math.min(seq + 1, ...missing);
        this.reinitialize = false;

        let queue: Array<SyncEvent<T>> | null = [];

        console.log(`Initializing sync partition ${this.topic} from date ${date} and sequence ${seqParam}`);
        await this.storage.dump(this.partition);

        const subject = await this.syncService.subscribe<Array<SyncEvent<T>>>(this.topic, () => this.closed, this.lazy, this.simulcast);
        this.subscription = subject.subscribe({
            // eslint-disable-next-line @typescript-eslint/no-misused-promises
            next: async val => {
                const newPayload = val.payload.map(p => ({ ...p, networkId: val.networkId, networkType: val.networkType }));
                if (queue) queue.push(...newPayload);
                else await this.queueUpdate(newPayload, false);
            },
            error: () => {
                this.subscribeAndGet().catch(e => console.error('subscribeAndGet', e));
            },
        });

        const data = await this.syncService.get<Array<SyncEvent<T>>>(
            `/sync/${this.topic}`,
            date ? { date, seq: seqParam } : {},
            () => this.closed,
            this.lazy,
        );
        this.syncService.reportMetric(data.length, JSON.stringify(data).length);

        await this.queueUpdate(data, true);
        const remaining = queue;
        queue = null;
        await this.queueUpdate(remaining, true);

        this.storage.addPartition(this.partition);
        this.loaded$.next(true);
    }

    async flush() {
        await this.storage.flush();
    }

    private unsubscribe() {
        if (this.subscription) {
            this.subscription.unsubscribe();
            this.subscription = undefined;
        }
        if (this.telemetryInterval) {
            clearInterval(this.telemetryInterval);
            this.telemetryInterval = undefined;
        }
    }

    protected async queueUpdate(publish: Array<SyncEvent<T>>, init: boolean) {
        if (init && publish.length) console.log(`Applying ${publish.length} events to ${this.topic}`);
        this.updating.push(...publish);
        if (this.applying) return;
        this.applying = true;
        const start = new Date().getTime();
        let index = 0;
        while (index < this.updating.length && !this.closed) {
            const event = this.updating[index++];
            const updateWait = this.applyUpdate(event, init);
            // Doing large amounts of promises is slow in react native... has to do with timeouts and stuff
            // So we only await when we need to (most stuff in lazy storage needs loaded once then is fine)
            if ((updateWait as Promise<unknown>)?.then) await updateWait;
            const eventWait = this.eventStorage?.add(this.partition, event.date, event.syncId, event.seq);
            if ((eventWait as Promise<unknown>)?.then) await eventWait;
        }
        this.updating = this.updating.slice(index);
        if (init && publish.length) console.log(`Done applying updates ${this.topic} in ${new Date().getTime() - start} ms sync time ${publish[publish.length - 1].date}`);
        this.applying = false;
        if (publish.length > 0) await this.checkMissingSequences();
    }

    private async checkMissingSequences() {
        if (!this.eventStorage) return;

        const missing = await this.eventStorage.getMissingSequences(this.partition);

        if (typeof missing === 'number') {
            this.resubscribeAndGet(missing);
            return;
        }

        if (!missing?.length) {
            await this.eventStorage.prune(this.partition);
            return;
        }

        const seq = Math.min(...missing);
        const data = await this.syncService.get<Array<SyncEvent<T>>>(
            `/sync/${this.topic}`, { seq }, () => this.closed, this.lazy,
        );
        console.log(`SyncPartition got missing data ${this.topic}: ${data.length}`);
        data.forEach(syncEvent => syncEvent.forceProcess = true);

        for (const _ of data) {
            this.missingSyncEventAggregator.accumulate({ missingNetwork: 'seq' });
        }

        const dataSequences = data.map(x => x.seq);
        const stillMissing = missing.filter(x => !dataSequences.includes(x));

        if (stillMissing.length > 0) {
            console.log(`SyncPartition resyncing because of missing sequences ${this.topic}: ${stillMissing.length}`);
            this.resubscribeAndGet(data.length - missing.length);
            return;
        }
        await this.queueUpdate(data, false);
    }

    private resubscribeAndGet(missing: number) {
        console.log(`SyncPartition resyncing because of missing sequences ${this.topic}: ${missing}`);
        this.subscription?.unsubscribe();
        this.reinitialize = true;
        this.subscribeAndGet().catch(e => console.error('subscribeAndGet', e));
    }

    // eslint-disable-next-line complexity
    protected applyUpdate(publish: SyncEvent<T>, init: boolean) {
        if (publish.syncId && this.eventStorage?.isDuplicate(publish.date, publish.syncId) && !publish.forceProcess) {
            if (!init) {
                console.log(`Sync ${publish.syncId} ignored due to duplication for ${this.topic}`);
                const winner = this.simulcastedEventCache[publish.syncId];
                const network = this.syncService.getCurrentNetworks()[publish.networkId!];
                if (winner) {
                    const winnerDate = new Date(winner.date);
                    const loserDate = new Date();
                    this.metricAggregator.accumulate({
                        topic: this.topic,
                        diffMs: loserDate.getTime() - winnerDate.getTime(),
                        winnerNetwork: winner.networkName,
                        loserNetwork: network?.carrier ?? network?.ssid,
                        winnerNetworkType: winner.networkType as 'mobile' | 'wifi',
                        loserNetworkType: publish.networkType as 'mobile' | 'wifi',
                    });
                    delete this.simulcastedEventCache[publish.syncId];
                }
            }
            return;
        }
        if (this.simulcast && !init && Object.keys(this.syncService.getCurrentNetworks()).length > 1) {
            const network = this.syncService.getCurrentNetworks()[publish.networkId!];
            this.simulcastedEventCache[publish.syncId!] = {
                date: new Date().toISOString(),
                networkId: publish.networkId,
                networkType: publish.networkType,
                id: publish.syncId,
                networkName: network?.carrier ?? network?.ssid,
            };
            console.log(`Sync ${publish.syncId} added to cache for ${this.topic}`);
        }
        switch (publish.type) {
            case 'reset':
                console.log(`Sync reset event for ${this.topic}`);
                return this.reset();
            case 'clear':
                console.log(`Sync clear event for ${this.topic} of size ${publish.payload.length}`);
                return this.eventStorage?.clear(this.partition, publish.payload ?? [], publish.seq ?? 0);
            case 'insert':
                if (!init) console.log(`Sync insert event for ${this.partition} id ${publish.payload.id}`);
                return this.storage.add(publish.payload, this.partition);
            case 'delete':
            case 'update':
            case 'pull':
            case 'push': {
                if (!init) console.log(`Sync ${publish.type} event for ${this.partition} from ${this.topic} id ${publish.id}`);

                const getKeys = () => Object.keys('payload' in publish ? publish.payload ?? {} : {});
                switch (publish.type) {
                    case 'delete':
                        return this.storage.remove(publish.id, this.partition);
                    case 'update':
                        if (publish.inc || publish.set || publish.max) {
                            const incSet = (record: T) => {
                                if (publish.inc) {
                                    for (const [field, incValue] of Object.entries(publish.inc)) {
                                        const current = get(record, field) as number ?? 0;
                                        const newValue = current + incValue as number;
                                        set(record, field, newValue);
                                    }
                                }
                                if (publish.max) {
                                    for (const [field, maxValue] of Object.entries(publish.max)) {
                                        const current = get(record, field) as number ?? 0;
                                        const newValue = mongoDbMax(current, maxValue as number);
                                        set(record, field, newValue);
                                    }
                                }
                                if (publish.set) {
                                    for (const [field, setValue] of Object.entries(publish.set)) {
                                        set(record, field, setValue);
                                    }
                                }
                                return record;
                            };
                            return this.storage.update(publish.id, incSet, getKeys, this.partition);
                        } else {
                            return this.storage.update(publish.id, record => mergeCopyArrays(record, publish.payload), getKeys, this.partition);
                        }
                    case 'pull': {
                        const pull = (record: T) => {
                            Object.keys(publish.payload).forEach(field => {
                                const array = get(record, field) as unknown;
                                if (array && Array.isArray(array)) {
                                    const filter = publish.payload[field as keyof T];
                                    for (let i = 0; i < array.length; i++) {
                                        // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
                                        if (typeof filter === 'object' && Object.keys(filter).every(key => array[i][key] === (filter as any)[key])
                                            || typeof filter !== 'object' && filter === array[i]) {
                                            array.splice(i--, 1);
                                        }
                                    }
                                }
                            });
                            return record;
                        };
                        return this.storage.update(publish.id, pull, getKeys, this.partition);
                    }
                    case 'push': {
                        const push = (record: T) => {
                            Object.keys(publish.payload).forEach(field => {
                                const array = get(record, field) as unknown;
                                if (!array) set(record, field, [publish.payload[field as keyof T]]);
                                else if (Array.isArray(array)) array.push(publish.payload[field as keyof T]);
                            });
                            return record;
                        };
                        return this.storage.update(publish.id, push, getKeys, this.partition);
                    }
                }
                /* FALLTHROUGH */
            }
            default:
                console.warn(`${(publish as SyncEvent<T>).type} sync event not recognized`);
                return;
        }
    }

    private async reset() {
        await this.close(false);
        this.reset$.next(true);
    }
}
