import { HttpParams } from '@angular/common/http';
import { Injectable } from '@angular/core';
import dayjs from 'dayjs';
import hark from 'hark';
import RecordRTC, { StereoAudioRecorder } from 'recordrtc';
import { BehaviorSubject, Observable, ReplaySubject, Subject, of, throwError, timer } from 'rxjs';
import { catchError, concatMap, filter, map, mergeMap, retryWhen, take, takeUntil } from 'rxjs/operators';

import { AuthUsecase, WebSocketSubject } from '@daikin-tic/dxone-com-lib';

import {
  AUDIO_CHANNELS,
  BIT_DEPTH,
  MIMI_ASR_API_URL,
  MIMI_ASR_LANGUAGE,
  MIMI_ASR_MIME_TYPE,
  MIMI_ASR_PROCESS,
  NICT_ASR_FORMAT,
  NICT_ASR_PROGRESSIVE,
  NICT_ASR_TEMPORARY,
  NICT_ASR_TEMPORARY_INTERVAL,
  SAMPLE_RATE,
  Speech,
  SpeechStatus,
} from '../models/transcribe.model';
import { TranscribeUsecase } from '../usecases/transcribe.usecase';

/** How many times a failed transcription is retried before a placeholder (empty-text) result is emitted instead. */
const RETRY_LIMIT = 10;
/** Delay, in milliseconds, before each transcription retry. */
const RETRY_INTERVAL = 100;
/**
 * Number of bytes sliced off the front of every recorded chunk before it is uploaded: 44 bytes is the size of a
 * canonical PCM WAV (RIFF) header. NOTE(review): assumes every `timeSlice` chunk from the recorder starts with
 * such a header — confirm against the RecordRTC version in use.
 */
const WAV_HEADER_SIZE = 44;

/**
 * Parses one text frame received from the ASR WebSocket.
 *
 * @param message JSON text carrying `session_id`, `status` and a `response` array whose first entry's
 *   `result` holds the recognized text.
 * @returns the session id, the status, and the recognized text. `text` is `''` when the frame has no result
 *   (missing/non-array `response`, or an empty one) rather than `undefined` or a thrown TypeError.
 * @throws SyntaxError when `message` is not valid JSON.
 */
const deserialize = (message: string): { sessionId: string; status: SpeechStatus; text: string } => {
  // eslint-disable-next-line @typescript-eslint/naming-convention
  const { session_id, status, response } = JSON.parse(message);
  // Guard the array: frames without a `response` must not throw, since an error here would make the caller
  // reconnect and re-upload the whole utterance.
  const first = Array.isArray(response) ? response[0] : undefined;
  return {
    sessionId: session_id,
    status,
    text: first?.result ?? '',
  };
};

/**
 * The audio chunks of a single utterance.
 *
 * It replays every chunk to each new subscriber, so a retried transcription can re-send the whole utterance.
 * It also carries the language to recognize and the utterance's start time.
 */
class AudioSubject extends ReplaySubject<Blob> {
  /** Input language passed to the ASR endpoint. */
  lang: string;
  /** When the utterance started (the interactor stores `dayjs().unix()`, i.e. seconds). */
  timestamp: number;

  constructor(lang: string, timestamp: number) {
    super();
    this.lang = lang;
    this.timestamp = timestamp;
  }
}

/**
 * Streams microphone audio to the ASR WebSocket endpoint at `MIMI_ASR_API_URL` and exposes the results.
 *
 * hark detects voice activity. Each "speaking" → "stopped_speaking" span becomes one utterance (an
 * `AudioSubject`). Utterances are queued and transcribed strictly one after another, each over its own WebSocket.
 */
@Injectable()
export class TranscribeInteractor extends TranscribeUsecase {
  /** Recognition results for every queued utterance, in utterance order. */
  speech$: Observable<Speech>;

  private _audioTracks?: MediaStreamTrack[];
  private _recorder?: RecordRTC;
  private _harker?: hark.Harker;

  /** Emits when the current utterance ends (speech stopped, or recording stopped). */
  private readonly _pauseSubject = new Subject<void>();
  /** Latest recorder chunk; being a BehaviorSubject, a new utterance also receives the most recent chunk. */
  private readonly _chunkSubject = new BehaviorSubject<Blob>(new Blob());
  /** Utterances waiting to be transcribed. */
  private readonly _queueSubject = new Subject<AudioSubject>();

  /** RecordRTC settings: WAV audio delivered to `_chunkSubject` every `timeSlice` (500) ms. */
  private readonly _options = {
    type: 'audio',
    mimeType: 'audio/wav',
    recorderType: StereoAudioRecorder,
    timeSlice: 500,
    ondataavailable: blob => this._chunkSubject.next(blob), // cspell: disable-line
    sampleRate: SAMPLE_RATE,
    numberOfAudioChannels: AUDIO_CHANNELS,
    disableLogs: true,
  } as RecordRTC.Options;

  constructor(private _authUsecase: AuthUsecase) {
    super();
    this.speech$ = this._queueSubject.pipe(
      // concatMap: one utterance at a time, results in the order the utterances were spoken.
      concatMap(audio =>
        this.transcribe(audio).pipe(
          // Retry up to RETRY_LIMIT times, RETRY_INTERVAL ms apart; each retry re-sends the replayed audio.
          retryWhen(errors => errors.pipe(mergeMap((err, index) => (index < RETRY_LIMIT ? timer(RETRY_INTERVAL) : throwError(err))))),
          // Give up on this utterance with an empty result so the queue keeps moving.
          catchError(() => of({ sessionId: '', text: '', lang: audio.lang, timestamp: audio.timestamp })),
        ),
      ),
    );
  }

  /**
   * Starts recording `stream` and splitting it into utterances.
   *
   * Any session that is still running is stopped first, so calling `start()` twice does not leak tracks,
   * the recorder or the harker.
   *
   * @param stream the media stream to record; its audio tracks are cloned, so the caller keeps ownership of it.
   */
  start(stream: MediaStream): void {
    this.stop();
    // Drop the previous session's last chunk so it is not replayed into the first utterance of this session.
    this._chunkSubject.next(new Blob());

    this._audioTracks = stream.getAudioTracks().map(track => track.clone());
    // Force the clones on, independent of the enabled state of the caller's tracks.
    this._audioTracks.forEach(track => (track.enabled = true));

    const audioStream = new MediaStream(this._audioTracks);
    this._recorder = new RecordRTC(audioStream, this._options);
    this._recorder.startRecording();

    this._harker = hark(stream);
    this._harker.on('speaking', () => this.onSpeaking());
    this._harker.on('stopped_speaking', () => this._pauseSubject.next());
  }

  /**
   * Stops voice detection and recording, and releases the cloned tracks. It is safe to call when not started.
   * Any utterance in progress is ended, so it is still transcribed and later utterances are not blocked.
   */
  stop(): void {
    if (this._harker) {
      this._harker.stop();
      this._harker = undefined;
    }
    // Do not rely on hark emitting 'stopped_speaking' when stopped. Close any in-flight utterance explicitly.
    // Otherwise its AudioSubject never completes, 'recog-break' is never sent, and concatMap stalls forever.
    this._pauseSubject.next();
    if (this._recorder) {
      const recorder = this._recorder;
      this._recorder.stopRecording(() => recorder.destroy());
      this._recorder = undefined;
    }
    if (this._audioTracks) {
      this._audioTracks.forEach(track => track.stop());
      this._audioTracks = undefined;
    }
  }

  /** Opens a new utterance that collects header-stripped chunks until the next pause, then queues it. */
  private onSpeaking(): void {
    const audioSubject = new AudioSubject(MIMI_ASR_LANGUAGE, dayjs().unix());
    this._chunkSubject
      .pipe(
        takeUntil(this._pauseSubject),
        map(blob => (blob ? blob.slice(WAV_HEADER_SIZE) : new Blob())),
        // Skip empty chunks (including the initial / reset empty Blob).
        filter(audio => !!audio.size),
      )
      .subscribe(audioSubject);
    this._queueSubject.next(audioSubject);
  }

  /**
   * Transcribes one utterance.
   *
   * It opens a WebSocket to `MIMI_ASR_API_URL` using the current access token, and uploads every chunk of
   * `audio`. When the utterance ends, it sends the `recog-break` command. Every text frame is emitted as a `Speech`.
   *
   * Only the first auth payload is used (`take(1)`), so one call opens at most one socket. Unsubscribing
   * (e.g. before a retry) tears down both the socket subscription and the audio forwarder.
   * NOTE(review): assumes `WebSocketSubject` closes its socket once its last subscriber unsubscribes.
   *
   * @param audio the utterance to transcribe; it must replay its chunks so that retries re-send everything.
   */
  private transcribe(audio: AudioSubject): Observable<Speech> {
    return this._authUsecase.payload$.pipe(
      take(1),
      concatMap(
        payload =>
          new Observable<Speech>(observer => {
            const { lang, timestamp } = audio;
            const { accessToken, backendId } = payload;
            const params = new HttpParams({
              fromObject: {
                ['access-token']: accessToken,
                ['content-type']: [MIMI_ASR_MIME_TYPE, `bit=${BIT_DEPTH}`, `rate=${SAMPLE_RATE}`, `channels=${AUDIO_CHANNELS}`].join(';'),
                ['process']: backendId ? `${MIMI_ASR_PROCESS}#${backendId}` : MIMI_ASR_PROCESS,
                ['input-language']: lang,
                ['nict-asr-options']: [
                  `response_format=${NICT_ASR_FORMAT}`,
                  `progressive=${NICT_ASR_PROGRESSIVE}`,
                  `temporary=${NICT_ASR_TEMPORARY}`,
                  `temporary_interval=${NICT_ASR_TEMPORARY_INTERVAL}`,
                ].join(';'),
              },
            });
            const url = `${MIMI_ASR_API_URL}/?${params.toString()}`;
            const ws = new WebSocketSubject(url, undefined, 'blob');
            // Only string frames are recognition results.
            const results = ws
              .pipe(
                filter((message): message is string => typeof message === 'string'),
                map(message => ({ ...deserialize(message), lang, timestamp })),
              )
              .subscribe(observer);
            const uploads = audio.subscribe({
              next: blob => ws.next(blob),
              complete: () => ws.next(JSON.stringify({ command: 'recog-break' })),
            });
            return () => {
              // Stop forwarding audio first, so nothing is pushed into a socket that is being released.
              uploads.unsubscribe();
              results.unsubscribe();
            };
          }),
      ),
    );
  }
}
