import { Injectable } from '@angular/core';
import dayjs from 'dayjs';
import { AsyncSubject, Observable, concat, partition } from 'rxjs';
import { delayWhen, filter, finalize, ignoreElements, repeatWhen, takeUntil } from 'rxjs/operators';

import {
  AuthUsecase,
  DistinctSubject,
  Period,
  PeriodSubject,
  UserTimezone,
  WebSocketUsecase,
  recursiveQuery,
} from '@daikin-tic/dxone-com-lib';

import { Record, RecordQueryParams, RecordUpdateParams } from '../models/record.model';
import { LocalArchiveUsecase } from '../usecases/local-archive.usecase';
import { RecordGateway } from '../usecases/record.gateway';

/**
 * Interactor behind {@link LocalArchiveUsecase}: keeps an in-memory list of
 * records for the currently selected period and exposes operations that
 * update single records through the {@link RecordGateway}.
 *
 * The record list is (re)loaded every time a valid period is emitted while the
 * web socket is reported open, and is reset to an empty list when the web
 * socket is reported closed.
 */
@Injectable()
export class LocalArchiveInteractor extends LocalArchiveUsecase {
  /** Stream of the records loaded for the current period. */
  get records$(): Observable<Record[]> {
    return this._records;
  }

  /** Stream of the currently selected period. */
  get period$(): Observable<Period> {
    return this._period;
  }

  private readonly _records = new DistinctSubject<Record[]>([]);
  // `from`/`to` of -1 mark "no period selected yet"; such values are filtered
  // out before any query is issued (see the constructor).
  private readonly _period = new PeriodSubject({ from: -1, to: -1 });

  constructor(private _authUsecase: AuthUsecase, private _webSocketUsecase: WebSocketUsecase, private _recordGateway: RecordGateway) {
    super();

    // Whenever the auth state becomes 'signedIn', call `now()` on the period
    // subject. NOTE(review): presumably this resets the period to one ending
    // at the current time — confirm against PeriodSubject.
    this._authUsecase.authState$.pipe(filter(({ status }) => status === 'signedIn')).subscribe(() => {
      this._period.now();
    });

    // Handle of the record query currently running, if any. Only the most
    // recent query may write into `_records`; older ones are cancelled so a
    // slow response can never overwrite newer results or repopulate the list
    // after it has been cleared.
    let inFlightQuery: { unsubscribe(): void } | undefined;
    const cancelInFlightQuery = (): void => {
      if (inFlightQuery) {
        inFlightQuery.unsubscribe();
        inFlightQuery = undefined;
      }
    };

    const [open$, close$] = partition(this._webSocketUsecase.isOpen$, isOpen => isOpen);
    this.period$
      .pipe(
        // Skip the initial / unset period.
        filter(({ from, to }) => from >= 0 && to >= 0),
        // Stop listening to period changes as soon as the socket closes ...
        takeUntil(close$),
        // ... at which point any pending query is dropped and the list is cleared ...
        finalize(() => {
          cancelInFlightQuery();
          this._records.next([]);
        }),
        // ... and start listening again once the socket reports open.
        repeatWhen(notifications => notifications.pipe(delayWhen(() => open$))),
      )
      .subscribe(({ from, to }) => {
        // A new period supersedes whatever query was still running.
        cancelInFlightQuery();
        const queryParams: RecordQueryParams = { createdFrom: from.toString(), createdTo: to.toString() };
        inFlightQuery = recursiveQuery(params => this._recordGateway.listRecords(params), queryParams).subscribe(records =>
          this._records.next(records),
        );
      });
  }

  /**
   * Forwards the given timezone to the period subject.
   *
   * @param timezone timezone to apply to the period
   */
  changeTimezone(timezone: UserTimezone): void {
    this._period.tz(timezone);
  }

  /**
   * Selects a new period.
   *
   * @param period period to emit on {@link period$}
   */
  changePeriod(period: Period): void {
    this._period.next(period);
  }

  /**
   * Pushes a new period that keeps the current `from` and sets `to` to the
   * current unix time, but only when the period subject reports that it
   * includes the start of the current day. Otherwise does nothing.
   *
   * NOTE(review): `dayjs().tz()` relies on the dayjs timezone plugin and its
   * configured default timezone — confirm that the plugin is set up elsewhere.
   */
  reload(): void {
    const now = dayjs().tz();
    if (this._period.includes(now.startOf('day').unix())) {
      this._period.next({ from: this._period.value.from, to: now.unix() });
    }
  }

  /**
   * Updates a record through the gateway and swaps the returned record into
   * the local record list. The request is started immediately, regardless of
   * whether the returned observable is subscribed.
   *
   * @param channelId identifier passed to the gateway's `updateRecord`
   * @param params update payload
   * @returns an observable that emits nothing; it completes when the gateway
   *   call completes and errors when the gateway call errors
   */
  updateRecord(channelId: string, params: RecordUpdateParams): Observable<never> {
    const result = new AsyncSubject<never>();
    this._recordGateway.updateRecord(channelId, params).subscribe({
      next: updatedRecord => this.replaceRecord(updatedRecord),
      error: result.error.bind(result),
      complete: result.complete.bind(result),
    });
    return result.asObservable();
  }

  /**
   * Requests a record video from the gateway. The request is started
   * immediately, regardless of whether the returned observable is subscribed.
   *
   * @param recordId identifier of the record owning the video
   * @param videoId identifier of the video
   * @returns an observable that emits the `url` of the last gateway response
   *   once the gateway call completes, or errors when the gateway call errors
   */
  getRecordVideo(recordId: string, videoId: string): Observable<string> {
    const result = new AsyncSubject<string>();
    this._recordGateway.getRecordVideo(recordId, videoId).subscribe({
      next: ({ url }) => result.next(url),
      error: result.error.bind(result),
      complete: result.complete.bind(result),
    });
    return result.asObservable();
  }

  /**
   * Deletes a record video, then re-fetches the owning record and replaces
   * every entry with the same `recordId` in the local record list. The
   * requests are started immediately, regardless of whether the returned
   * observable is subscribed.
   *
   * @param recordId identifier of the record owning the video
   * @param videoId identifier of the video to delete
   * @returns an observable that emits nothing; it completes after the delete
   *   and the re-fetch have completed and errors when either of them errors
   */
  deleteRecordVideo(recordId: string, videoId: string): Observable<never> {
    const result = new AsyncSubject<never>();
    // Values emitted by the delete call are discarded; only its completion
    // matters, since `concat` waits for it before fetching the record.
    const delete$ = this._recordGateway.deleteRecordVideo(recordId, videoId).pipe(ignoreElements());
    const record$ = this._recordGateway.getRecord(recordId);
    concat(delete$, record$).subscribe({
      next: gotRecord => {
        const records = this._records.value.map(record => (record.recordId === recordId ? gotRecord : record));
        this._records.next(records);
      },
      error: result.error.bind(result),
      complete: result.complete.bind(result),
    });
    return result.asObservable();
  }

  /**
   * Replaces the first record in the local list whose `recordId` matches the
   * given record and emits the updated list. Does nothing when no record
   * matches.
   *
   * @param record replacement record
   */
  private replaceRecord(record: Record): void {
    const current = this._records.value;
    const index = current.findIndex(({ recordId }) => recordId === record.recordId);
    if (index < 0) {
      return;
    }
    // Copy before replacing: the array held by the subject may be referenced
    // by subscribers and must not be mutated in place.
    const updated = [...current];
    updated[index] = record;
    this._records.next(updated);
  }
}
