import { Injectable, OnDestroy } from '@angular/core';
import { logger } from '@app/core/helpers/logger';
import { environment } from '@env/environment';
import { supportedNotices } from '@models/constants';
import { DestroyableComponent } from '@models/destroyable.component';
import {
  AnyType,
  BillingOperation,
  BillingOperationStatusEnum,
  EntityWatchType,
  GenericNotice,
  IAccountUpdate,
  IBalanceUpdate,
  IPlanUpdate,
  IChatMessageUpdate,
  IChatThreadUpdate,
  IEntityUpdate,
  INoticeEvent,
  INoticeOptions,
  IPagedResults,
  IProfileVerificationNotice,
  IReferenceUpdate,
} from 'lingo2-models';
import {
  BehaviorSubject,
  interval,
  Observable,
  Observer,
  of,
  race,
  Subject,
  Subscription,
  switchMap,
  throwError,
  timer,
} from 'rxjs';
import {
  catchError,
  debounceTime,
  delay,
  distinctUntilChanged,
  filter,
  finalize,
  map,
  share,
  takeUntil,
  takeWhile,
  tap,
} from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { AuthService, ContextService, PlatformService } from '../services';
import { IWebsocketService, IWsMessage, WebSocketConfig } from './websocket.interfaces';

export interface INoticeRead {
  unread_count: number;
}

export interface INoticeOptionsUpdate {
  result: 'done';
}

export interface INoticeEventsPagedResults extends IPagedResults<INoticeEvent[]> {
  unread_count: number;
}

export interface IWsAuthEvent {
  success: boolean;
  user: { id: string };
}

export interface IWsPongEvent {
  user: { id: string };
}

@Injectable({
  providedIn: 'root',
})
export class WebsocketService extends DestroyableComponent implements IWebsocketService, OnDestroy {
  public status$: Observable<boolean>;

  private _authorized = new BehaviorSubject<boolean>(false);
  public authorized$ = this._authorized.asObservable();

  // private static _instance: WebsocketService; // singleton

  private config: WebSocketSubjectConfig<IWsMessage<AnyType>>;

  private connection$: Observer<boolean>;
  private reconnection$: Observable<number>;
  private sendingMessages$: WebSocketSubject<IWsMessage<AnyType>>;
  private receiveMessages$: Subject<IWsMessage<AnyType>> = this.register(new Subject<IWsMessage<AnyType>>());

  private status$$: Subscription;
  private receiveMessages$$: Subscription;

  private reconnectInterval: number;
  private reconnectAttempts: number;
  private isConnected: boolean;

  private _authUserId: string;
  private watches = new Map<string, { entity: EntityWatchType; id: string }>();
  private networkError = false;
  private pingpongSubscription: Subscription;

  public constructor(
    private authService: AuthService,
    private contextService: ContextService,
    protected platform: PlatformService,
  ) {
    super(platform);
    if (!this.isBrowser) {
      // logger.log('WebsocketService init rejected in SSR');
      return;
    }

    this.configure();

    // connection status
    this.status$ = new Observable<boolean>((observer) => (this.connection$ = observer))
      .pipe(takeUntil(this.destroyed$))
      .pipe(share(), distinctUntilChanged());

    contextService.me$
      .pipe(
        filter((user) => (user ? user.id : null) !== this._authUserId),
        debounceTime(1000),
      )
      .pipe(takeUntil(this.destroyed$))
      .subscribe((user) => {
        this._authUserId = user ? user.id : null;
        this.init();
        this.auth();
        this.connect();
      });
  }

  private configure() {
    // logger.debug('WebsocketService:configure');
    const wsConfig: WebSocketConfig = {
      reconnectInterval: 5000, // pause between connections
      reconnectAttempts: 10000, // number of connection attempts
      url: environment.ws_url, // TODO из настроек contextService.ws
    };

    this.reconnectInterval = wsConfig.reconnectInterval;
    this.reconnectAttempts = wsConfig.reconnectAttempts;

    this.config = {
      url: wsConfig.url,
      closeObserver: {
        next: (event: CloseEvent) => {
          logger.debug('WebsocketService disconnected');
          this.sendingMessages$ = null;
          this.connection$.next(false);
        },
      },
      openObserver: {
        next: (event: Event) => {
          logger.debug('WebsocketService connected');
          this.connection$.next(true);
          this.reconnectInterval = wsConfig.reconnectInterval;
          this.reconnectAttempts = wsConfig.reconnectAttempts;
          this.auth();
          this.startPingPong();
        },
      },
    };
  }

  private init() {
    // logger.debug('WebsocketService:init');

    // run reconnect if not connection
    this.status$$?.unsubscribe();
    this.status$$ = this.status$.pipe(takeUntil(this.destroyed$)).subscribe((isConnected) => {
      this.isConnected = isConnected;
    });

    this.receiveMessages$$?.unsubscribe();
    this.receiveMessages$$ = this.receiveMessages$.pipe(takeUntil(this.destroyed$)).subscribe(
      (msg) => {
        if (msg.event !== 'pong') {
          logger.info('WebsocketService:received', msg);
        }
      },
      (error: ErrorEvent) => {
        logger.error('WebsocketService:received:error', error);
      },
    );

    this.onAuthComplete
      .pipe(
        tap((event) => {
          this._authorized.next(event.success);
          if (event.success) {
            this.restartWatchingAll();
          }
        }),
        takeUntil(this.destroyed$),
      )
      .subscribe();
  }

  public ngOnDestroy() {
    this.stopWatchingAll();
    this.disconnect();
    this.stopPingPong();
    super.ngOnDestroy();
  }

  /** Авторизация на сокет-сервере */
  private auth() {
    this.send('auth', {
      token: this.authService.accessToken,
      features: {
        notices: supportedNotices,
      },
    });
    /** @see onAuthComplete */
  }

  /**
   * connect to WebSocked
   */
  private connect(): void {
    if (this.isConnected) {
      return;
    }

    if (this.sendingMessages$) {
      this.sendingMessages$.complete();
    }

    this.sendingMessages$ = new WebSocketSubject(this.config);

    const sendMessages$$ = this.sendingMessages$.pipe(takeUntil(this.destroyed$)).subscribe(
      (message) => this.receiveMessages$.next(message),
      (err: AnyType) => {
        sendMessages$$.unsubscribe();
      },
      () => {
        sendMessages$$.unsubscribe();
      },
    );
  }

  /**
   * disconnect from WebSocket
   */
  private disconnect(): void {
    this.connection$?.complete();
  }

  /**
   * reconnect if not connecting or errors
   */
  private reconnect(): void {
    if (this.reconnection$) {
      return;
    }
    this.reconnection$ = interval(this.reconnectInterval).pipe(
      takeWhile((value) => value < this.reconnectAttempts && !this.isConnected),
    );

    const reconnection$$ = this.reconnection$
      .pipe(
        tap((i) => {
          logger.debug('WebsocketService:reconnect() attempt #' + i.toString());
          this.connect();
        }),
        catchError((err) => {
          logger.error('WebsocketService:reconnect() error', err);
          return throwError(err);
        }),
        finalize(() => {
          // Subject complete if reconnect attemts ending
          reconnection$$.unsubscribe();
          this.reconnection$ = null;

          if (this.isConnected) {
            logger.info('WebsocketService:reconnect() success');
          } else {
            logger.error('WebsocketService:reconnect() failed');
            this.receiveMessages$?.complete();
            this.connection$?.complete();
            this.sendingMessages$?.complete();
            this.sendingMessages$ = null;
          }
        }),
        takeUntil(this.destroyed$),
      )
      .subscribe();
  }

  /**
   * on received message
   * рекомендуется написать более строгий метод с указанием возвращаемого типа данных
   * см. onAuthComplete onNoticeRead и другие
   */
  public on<T>(event: string): Observable<T> {
    if (event) {
      return this.receiveMessages$
        .pipe(
          filter(
            (message: IWsMessage<T>) => typeof message === 'object' && 'event' in message && message.event === event,
          ),
          map((message: IWsMessage<T>) => message.data),
        )
        .pipe(takeUntil(this.destroyed$));
    }
  }

  /** Сообщение об авторизации на сокет-сервере */
  public get onAuthComplete(): Observable<IWsAuthEvent> {
    return this.on<IWsAuthEvent>('auth-complete');
  }

  /** Сообщение о системном событии */
  public get onSystemAction(): Observable<GenericNotice<AnyType>> {
    return this.on('system-action');
  }

  /** Сообщение о количестве непрочитанных уведомлений */
  public get onNoticeRead(): Observable<INoticeRead> {
    return this.on('notice-read');
  }

  /** Новое уведомление */
  public get onNotice(): Observable<INoticeEvent> {
    return this.on('notice');
  }

  /** @Deprecated Заменить в пользу notice */
  public onOperation(
    id: string,
  ): Observable<{ account_id: string; operation_id: string; status: BillingOperationStatusEnum }> {
    return this.on('operation').pipe(
      filter(
        (update: { account_id: string; operation_id: string; status: BillingOperationStatusEnum }) =>
          update.operation_id === id,
      ),
      debounceTime(1000),
    );
  }

  /** Страница уведомлений */
  public get onNotices(): Observable<INoticeEventsPagedResults> {
    return this.on('notices');
  }

  /** Настройки уведомлений */
  public get onNoticeOptions(): Observable<INoticeOptions> {
    return this.on('notice-options');
  }

  /** Подтверждение обновления настроек уведомлений */
  public get onNoticeOptionsUpdate(): Observable<INoticeOptionsUpdate> {
    return this.on('notice-options-update');
  }

  /** Изменение сообщения в нити чата */
  public get onChatMessage(): Observable<IChatMessageUpdate> {
    return this.on('chat-message');
  }

  /** Изменение нити чата */
  public get onChatThread(): Observable<IChatThreadUpdate> {
    return this.on('chat-thread');
  }

  /** Изменение учётной записи */
  public get onAccountUpdate(): Observable<IAccountUpdate> {
    return this.on<IAccountUpdate>('account').pipe(debounceTime(1000));
  }

  /** Изменение финансового профиля (баланс) */
  public get onBalanceUpdate(): Observable<IBalanceUpdate> {
    return this.on<IBalanceUpdate>('balance').pipe(debounceTime(1000));
  }

  /** Изменение финансового профиля (подписки) */
  public get onPlanUpdate(): Observable<IPlanUpdate> {
    return this.on<IPlanUpdate>('plan').pipe(debounceTime(1000));
  }

  /** Изменение операции */
  public onBillingOperation(id: string): Observable<BillingOperation> {
    return of(null);
  }

  /** Изменение справочников */
  public get onReferenceUpdate(): Observable<IReferenceUpdate> {
    return this.on('reference');
  }

  /** Верификация профиля пользователя */
  public get onVerification(): Observable<IProfileVerificationNotice> {
    return this.on('verification');
  }

  /** Изменение сущности */
  public onEntityUpdate(entity: EntityWatchType, id: string): Observable<IEntityUpdate> {
    return this.on<IEntityUpdate>('update').pipe(
      filter((update) => update.entity === entity && update.id === id),
      debounceTime(1000),
    );
  }

  /** Подключить наблюдение за изменениями сущности */
  public startWatching(entity: EntityWatchType, id: string) {
    if (!this.platform.isBrowser || !id) {
      return;
    }
    this.send('watch-start', { entity, id });
    this.watches.set(`${entity}-${id}`, { entity, id });
  }

  /** Отключить наблюдение за изменениями сущности */
  public stopWatching(entity: EntityWatchType, id: string) {
    if (!this.platform.isBrowser || !id) {
      return;
    }
    this.send('watch-stop', { entity, id });
    this.watches.delete(`${entity}-${id}`);
  }

  /** Отключить наблюдение за любыми изменениями сущностей */
  public stopWatchingAll() {
    if (!this.platform.isBrowser) {
      return;
    }
    this.send('watch-stop-all');
    this.watches.clear();
  }

  /** Добавить в наблюдаемые всех ранее наблюдаемых пользователей */
  protected restartWatchingAll() {
    this.watches.forEach((data) => {
      const { entity, id } = data;
      this.startWatching(entity, id);
    });
  }

  /** send message to server */
  public send(event: string, data: AnyType = {}): void {
    if (!this.platform.isBrowser) {
      return;
    }

    if (event) {
      if (this.isConnected && this.sendingMessages$) {
        this.sendingMessages$.next({ event, data });
      }
    }
  }

  protected startPingPong(): void {
    this.stopPingPong();
    this.networkError = false;

    const pingpong$: Observable<string> = timer(1000, 15000).pipe(
      tap(() => this.ping()),
      switchMap(() =>
        race(
          of('timeout').pipe(delay(3000)),
          this.watchPong().pipe(
            map(() => 'pong'),
            catchError(() => of('error')),
          ),
        ),
      ),
      catchError(() => of('!ping')),
    );

    this.pingpongSubscription = pingpong$
      .pipe(
        tap((msg) => {
          if (msg === 'pong') {
            this.networkError = false;
          } else {
            this.networkError = true;
            this.isConnected = false;
            logger.warn('WebsocketService:startPingPong -> heartbeat$=' + msg + ' -> this.reconnect()');
            this.reconnect();
          }
        }),
        takeUntil(this.destroyed$),
      )
      .subscribe();
  }

  protected stopPingPong() {
    if (this.pingpongSubscription) {
      this.pingpongSubscription.unsubscribe();
    }
  }

  private ping() {
    if (this.isConnected && this.sendingMessages$) {
      this.send('ping');
    } else {
      logger.debug('WebsocketService:ping not sent');
    }
  }

  private watchPong() {
    return this.on<IWsPongEvent>('pong');
  }
}
