import { Inject, Injectable } from '@angular/core';

import { MeetingService, MeetingState } from 'meeting/meeting-room/meeting.service';
import {
  Deferred,
  EventEmitter,
  array,
  assertOrThrow,
  bind,
  deferred,
  errors,
  logger,
} from 'utils/util';
import { IncomingEvent, SessionBackendService } from './session-backend.service';


/** An outbound broadcast message while it sits in the send queue of `BroadcastService`. */
type OutgoingMessage = {
  /** Resolved with the send response or rejected with the send error; backs `send()`'s promise. */
  deferred: Deferred<unknown>;
  /** Channel the message is sent on; also decides which transport is used for sending. */
  channel: string;
  /** First element of the socket payload (exact semantics are defined by the server). */
  hostMessage: boolean;
  /** Second element of the socket payload (presumably the intended recipients; confirm). */
  receivers: unknown[];
  /** Third element of the socket payload: the rest arguments passed to `send()`. */
  message: unknown[];
};


/**
 * Meeting states in which broadcast messages are handled: `BroadcastService` (re)fetches the
 * message backlog whenever the meeting service emits one of these states, and incoming socket
 * messages are only processed right away while the meeting is in one of them (otherwise they are
 * held back).
 */
const LEGAL_MEETING_STATES: ReadonlySet<MeetingState> = new Set([
  MeetingState.KNOCKING,
  MeetingState.JOINED,
]);

/** Number of received events at which the `_hardLimitNumEvents` meta message is emitted. */
const HARD_LIMIT_NUM_EVENTS = 100_000;
/** Number of received events (90% of the hard limit) at which `_softLimitNumEvents` is emitted. */
const SOFT_LIMIT_NUM_EVENTS = 0.9 * HARD_LIMIT_NUM_EVENTS;

/** Channels whose queued outgoing messages do not count towards `BroadcastService.isSaving`. */
const SAVING_EXCLUDED_EVENTS = ['content-shared-mouse'];

/**
 * Channels that are sent through `SessionBackendService.sendCachePersistentEvent()` instead of the
 * reliable meeting socket.
 */
const CACHE_PERSISTENT_CHANNELS = [
  'smart-summary-recording-started',
  'smart-summary-recording-stopped',
  'smart-summary-transcript-generated',
  'smart-summary-transcription-failed',
  'smart-summary-summary-failed',
];

/**
 * Relays broadcast messages between the reliable meeting socket and local listeners.
 *
 * Inbound: messages from the socket, and the backlog fetched from the session backend whenever the
 * meeting enters a legal state, are queued, de-duplicated by id and re-emitted on an internal
 * event emitter. Outbound: messages are queued and sent strictly one at a time; while a send is in
 * flight no inbound message is processed.
 */
@Injectable()
export class BroadcastService {
  private eventEmitter: EventEmitter = new EventEmitter([], true);

  // Receiving initial messages
  private _initializing = true;
  private afterInitializationDeferred: Deferred<undefined> = deferred();
  /** Meeting state seen at construction or by the latest `updateMeetingState()` call. */
  private prevLegalMeetingState: MeetingState;
  /** True from the start of a backlog fetch until that backlog has been completely processed. */
  private gettingInitialMessages = false;
  /**
   * True only while the backlog request started by `getInitialMessages()` is still in flight,
   * i.e. while `receiveQueue` does not contain the backlog yet.
   */
  private fetchingInitialMessages = false;
  /** Meeting state of a fetch that was requested while another fetch was still in progress. */
  private shouldGetInitialMessagesForMeetingState: MeetingState | null = null;
  /** Whether that pending fetch has to reset `maxReceivedId` before fetching. */
  private shouldResetMaxReceivedId = false;

  // Receiving messages
  private registeredChannels: Set<string> = new Set();
  /** Ids of all events processed so far; used to drop duplicates. */
  private receivedIds: Set<string> = new Set();
  /** Id of the most recently processed event; passed to the backend when fetching the backlog. */
  private maxReceivedId: string | null = null;
  private numReceived = 0;
  private receiveQueue: IncomingEvent[] = [];
  /** Socket messages that could not be processed on arrival; released after the backlog. */
  private heldMessages: IncomingEvent[] = [];

  // Sending messages
  private sending = false;
  /**
   * Outbound messages are queued and sent one by one, each time waiting for acknowledgement from
   * the server for the previous message. To avoid conflicts: no inbound messages are accepted as
   * long as the send queue is not empty
   */
  private sendQueue: OutgoingMessage[] = [];

  constructor(
    @Inject('meetingReliableSocketService')
    private meetingReliableSocketService,
    private meetingService: MeetingService,
    private sessionBackendService: SessionBackendService,
    @Inject('userService') private userService,
  ) {
    bind(this);
    this.prevLegalMeetingState = this.meetingService.state;
    this.registerChannel('session-join');
    // NOTE(review): the subscription is deferred to a later task; the reason is not visible in
    // this file (presumably to let other services finish constructing first).
    setTimeout(() => {
      for (const state of LEGAL_MEETING_STATES) {
        this.meetingService.eventEmitter.on(state, this.updateMeetingState);
      }
    });
  }

  /***********
   * General *
   ***********/

  /**
   * During initialization the service replays the backlog of events of the meeting room. This
   * becomes false once the local user's own `session-join` message has been processed.
   */
  public get initializing(): boolean {
    return this._initializing;
  }

  /**
   * Handler for the legal meeting state events: (re)fetches the backlog for the current state.
   * The fetch restarts without a last-received id when the state differs from the previous one.
   */
  private updateMeetingState(): void {
    const meetingState = this.meetingService.state;
    const resetMaxReceivedId = meetingState !== this.prevLegalMeetingState;
    this.prevLegalMeetingState = meetingState;
    this.getInitialMessages(resetMaxReceivedId, meetingState);
  }

  /**
   * @returns A promise that resolves once initialization has finished (see `initializing`).
   */
  public afterInitialization(): Promise<undefined> {
    return this.afterInitializationDeferred.promise;
  }

  /** Marks initialization as finished; only the first call has an effect. */
  private finishAfterInitialization(): void {
    if (this._initializing) {
      this._initializing = false;
      this.afterInitializationDeferred.resolve(undefined);
    }
  }

  /**
   * Builds the name used on the internal event emitter: the channel name and the "own message"
   * flag joined by a colon, which is why channel names themselves may not contain a colon.
   */
  private getEmitterChannel(channel: string, ownMessages: boolean): string {
    return `${channel}:${ownMessages}`;
  }


  /********************
   * Receive messages *
   ********************/

  /** Subscribes to `channel` on the reliable socket, at most once per channel. */
  private registerChannel(channel: string): void {
    if (!this.registeredChannels.has(channel)) {
      this.registeredChannels.add(channel);
      this.meetingReliableSocketService.on(channel, this.onMessage);
    }
  }

  /**
   * Registers a listener for messages on `channel`.
   *
   * For regular messages the callback receives `(channel, sender, datetime, ...message)`; for meta
   * messages (channel names starting with `_`) it receives the raw data of the message.
   *
   * @param channel Channel to listen on. May not contain a colon.
   * @param callback Listener to invoke for each message.
   * @param receiveOwnMessages When true, messages sent by the local session are delivered as well
   *   as remote ones; when false only remote messages are delivered.
   * @throws errors.InvalidArgumentError when `channel` contains a colon.
   */
  public on<T extends unknown[]>(
    channel: string,
    callback: (...args: T) => unknown,
    receiveOwnMessages: boolean,
  ): void {
    if (channel.includes(':')) {
      throw new errors.InvalidArgumentError(
        `Argument "channel" cannot contain colons, got "${channel}"`,
      );
    }

    this.registerChannel(channel);

    // If `receiveOwnMessages == true`, both own and remote messages are received
    this.eventEmitter.on(this.getEmitterChannel(channel, false), callback);
    if (receiveOwnMessages) {
      this.eventEmitter.on(this.getEmitterChannel(channel, true), callback);
    }
  }

  /**
   * Fetches the backlog of events newer than `maxReceivedId` and replaces the receive queue with
   * it. When joined, the cached meeting state is fetched too and inserted into the backlog as a
   * pseudo event at the position matching its timestamp.
   *
   * If a fetch is already in progress, the request is only recorded and executed once the current
   * backlog has been processed (see `finishGetInitialMessages()`).
   *
   * @param resetMaxReceivedId Forget the last received id, so the fetch is not limited by it.
   * @param meetingState The meeting state the backlog is fetched for.
   */
  private async getInitialMessages(
    resetMaxReceivedId: boolean,
    meetingState: MeetingState,
  ): Promise<void> {
    if (this.gettingInitialMessages) {
      this.shouldGetInitialMessagesForMeetingState = meetingState;
      this.shouldResetMaxReceivedId ||= resetMaxReceivedId;
      return;
    }

    logger.info(`Getting initial messages for meeting state "${meetingState}".`);
    this.gettingInitialMessages = true;
    this.fetchingInitialMessages = true;
    this.shouldGetInitialMessagesForMeetingState = null;
    this.shouldResetMaxReceivedId = false;
    if (resetMaxReceivedId) {
      this.maxReceivedId = null;
    }

    try {
      const eventsPromise = this.sessionBackendService.fetchEvents(this.maxReceivedId);
      if (meetingState === MeetingState.KNOCKING) {
        this.receiveQueue = await eventsPromise;
      } else if (meetingState === MeetingState.JOINED) {
        const [events, cachedStateEvent] = await Promise.all([
          eventsPromise,
          this.fetchCachedMeetingState(),
        ]);
        array.insertIntoOrderedArray(
          events,
          cachedStateEvent,
          (event1: IncomingEvent, event2: IncomingEvent) =>
            event1.datetime.getTime() > event2.datetime.getTime(),
        );
        this.receiveQueue = events;
      }
    } finally {
      // Reset even when the request fails, so `processReceiveQueue()` is never blocked from
      // finishing by a request that is no longer pending.
      this.fetchingInitialMessages = false;
    }
    this.processReceiveQueue();
  }

  /**
   * Fetch the cached meeting state and pretend it's an event, so we can process it at the right
   * time among the other events.
   */
  private async fetchCachedMeetingState(): Promise<IncomingEvent> {
    const meetingState = await this.sessionBackendService.fetchCachedMeetingState();
    logger.info(`Fetched cached meeting state: ${JSON.stringify(meetingState)}`);
    return {
      // A null id exempts this pseudo event from de-duplication and from the event counters.
      id: null,
      channel: 'cached-meeting-state',
      senderId: null,
      datetime: new Date(meetingState.timestamp),
      message: meetingState,
    };
  }

  /**
   * Called once the fetched backlog has been processed completely. Starts the next fetch if one
   * was requested in the meantime; otherwise releases the messages that were held back.
   */
  private finishGetInitialMessages(): void {
    this.gettingInitialMessages = false;

    if (this.shouldGetInitialMessagesForMeetingState != null) {
      this.getInitialMessages(
        this.shouldResetMaxReceivedId,
        this.shouldGetInitialMessagesForMeetingState,
      );
    } else {
      this.receiveQueue = this.heldMessages;
      this.heldMessages = [];
      this.processReceiveQueue();
    }
  }

  /**
   * Socket listener. Meta messages (channel names starting with `_`) are emitted immediately.
   * Regular messages carry `[id, senderId, datetime, message]` in `data`; they are processed right
   * away when no backlog is being handled and the meeting is in a legal state, and held otherwise.
   */
  private onMessage(channel: string, data: any[]): void {
    if (channel[0] === '_') {
      this.onMetaMessage(channel, ...data);

    } else {
      const item: IncomingEvent = {
        id: data[0],
        channel: channel,
        senderId: data[1],
        datetime: new Date(data[2]),
        message: data[3],
      };

      if (
        !this.gettingInitialMessages
        && LEGAL_MEETING_STATES.has(this.meetingService.state)
      ) {
        this.receiveQueue.push(item);
        this.processReceiveQueue();
      } else {
        this.heldMessages.push(item);
      }
    }
  }

  /** Emits a meta message to the listeners for remote messages of `channel`. */
  private onMetaMessage(channel: string, ...data: unknown[]): void {
    const emitterChannel = this.getEmitterChannel(channel, false);
    this.eventEmitter.emit(emitterChannel, ...data);
  }

  /**
   * Emits the queued inbound events in order, skipping events whose id was processed before.
   * Stops as soon as a send is in flight; `processSendQueue()` calls this again once the send
   * queue has drained. When the queue was emptied while a backlog was being handled, the backlog
   * handling is finished.
   */
  private processReceiveQueue(): void {
    while (this.receiveQueue.length > 0) {
      // Checked on every iteration: a listener may start a send while handling an event.
      if (this.sending) {
        return;
      }

      const item = this.receiveQueue.shift();
      assertOrThrow(item != null);
      const { id, channel, message, datetime, senderId } = item;

      if (id === null || !this.receivedIds.has(id)) {
        if (id !== null) {
          this.numReceived++;

          if (this.numReceived === SOFT_LIMIT_NUM_EVENTS) {
            this.onMetaMessage('_softLimitNumEvents');
          } else if (this.numReceived === HARD_LIMIT_NUM_EVENTS) {
            this.onMetaMessage('_hardLimitNumEvents');
          }
          this.receivedIds.add(id);
          this.maxReceivedId = id;
        }

        const sender = this.userService.getOrCreateSession(senderId);

        if (!this.gettingInitialMessages) {
          logger.debug('Got BM %s on channel "%s" from %s:', id, channel, sender.id, message);
        }

        const args = [channel, sender, datetime].concat(message);

        const emitterChannel = this.getEmitterChannel(channel, sender.isLocal);
        this.eventEmitter.emit(emitterChannel, ...args);

        if (channel === 'session-join' && sender.isLocal) {
          this.finishAfterInitialization();
        }
      }
    }

    // Only finish once the backlog has actually been loaded into the queue. This method is also
    // called when the send queue drains, which can happen while the backlog request is still
    // pending; finishing then would release the held messages before the backlog is processed.
    if (this.gettingInitialMessages && !this.fetchingInitialMessages) {
      this.finishGetInitialMessages();
    }
  }


  /*****************
   * Send messages *
   *****************/

  /**
   * Queues a message for sending. Messages are sent one at a time in the order they were queued.
   *
   * @param channel Channel to send the message on.
   * @param hostMessage Forwarded to the server as part of the payload.
   * @param receivers Forwarded to the server as part of the payload.
   * @param message The message contents.
   * @returns A promise that resolves with the response to the send, or rejects with its error.
   */
  public send(
    channel: string,
    hostMessage: boolean,
    receivers: unknown[],
    ...message: unknown[]
  ): Promise<unknown> {
    const deferred_ = deferred();
    this.sendQueue.push({
      deferred: deferred_,
      channel,
      hostMessage,
      receivers,
      message,
    });
    this.processSendQueue();
    return deferred_.promise;
  }

  /**
   * Whether any message is still queued for sending, ignoring messages on the channels listed in
   * `SAVING_EXCLUDED_EVENTS`.
   */
  public get isSaving(): boolean {
    return this.sendQueue.some(
      item => !SAVING_EXCLUDED_EVENTS.includes(item.channel)
    );
  }

  /**
   * Sends the message at the head of the send queue, unless a send is already in flight. The
   * message stays in the queue until its send has settled. Once the queue is empty, processing of
   * inbound events resumes.
   */
  private processSendQueue(): void {
    // Only send 1 request at a time.
    if (this.sending) {
      return;
    }

    if (this.sendQueue.length > 0) {
      this.sending = true;

      const item = this.sendQueue[0];
      const { channel, hostMessage, receivers, message, deferred: outcome } = item;

      let responsePromise: Promise<unknown>;
      if (CACHE_PERSISTENT_CHANNELS.includes(channel)) {
        responsePromise =
          this.sessionBackendService.sendCachePersistentEvent(channel);
      } else {
        responsePromise = this.meetingReliableSocketService
          .send(channel, [hostMessage, receivers, message]);
      }

      responsePromise
        .then(
          (response: unknown) => {
            outcome.resolve(response);
          },
          (error: unknown) => {
            logger.debug('BM rejected on channel "%s" with reason "%s":', channel, error, message);
            outcome.reject(error);
          },
        )
        .finally(() => {
          this.sendQueue.shift();
          this.sending = false;
          this.processSendQueue();
        });

      logger.debug('Sent BM on channel "%s" to %s:', channel, JSON.stringify(receivers), message);

    } else {
      this.processReceiveQueue();
    }
  }
}
