import { queueScheduler, BehaviorSubject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import { retry, filter, take } from 'rxjs/operators';
import { error } from 'znipe-logger';
import isBrowser from 'znipe-utils/web/isBrowser';
import registerZnipeGlobal from 'znipe-utils/web/registerZnipeGlobal';
import checkOptions from './utils/checkOptions';
import createErrorList from './utils/createErrorList';
import apps from './apps';
import config from './config';

// Websocket state obtained through `registerZnipeGlobal` under the key
// 'websocketInfo' (presumably so every copy of this module on the page shares
// one socket — confirm against registerZnipeGlobal).
const socketInfo = registerZnipeGlobal('websocketInfo', {
  // Per-channel bookkeeping, keyed by `${moduleIdentifier}_${path}`.
  subscriptions: {},
  // The rxjs WebSocketSubject; created lazily, and only in the browser.
  socket: null,
  // The currently active AUTH subscription and the token it was created with.
  authSubscription: {
    sub: null,
    token: undefined,
  },
});

// Emits whether the socket is currently authorized. Starts as `false`, is set
// from the server's AUTH response and reset on a full `unauthenticate()`.
// Channels that require auth wait for this to emit `true` before subscribing.
const authorizedSubject = new BehaviorSubject(false);

/**
 * Creates a client for one of the websocket apps declared in `./apps`.
 *
 * The underlying socket lives in `socketInfo` and is only created when
 * `isBrowser()` is true. `on`, `send` and `authenticate` dereference it without
 * a null check, so they are expected to be called in the browser only.
 *
 * @param {string} appName - Key of the app in `./apps`.
 * @returns {{ref: Function, authenticate: Function, unauthenticate: Function}}
 *   `ref(path, options)` builds a channel reference; `authenticate(token)` and
 *   `unauthenticate(fullReset)` manage the socket's AUTH subscription.
 * @throws {Error} If no app named `appName` exists.
 */
const cj = appName => {
  const app = apps[appName];
  if (!app) {
    throw new Error(
      `No app with name '${appName}' found. Available apps are: ${createErrorList(
        Object.keys(apps),
      )}`,
    );
  }

  const { requiredOptions, moduleIdentifier } = app;

  // Lazily open the socket the first time any app is requested in a browser.
  if (!socketInfo.socket && isBrowser()) {
    socketInfo.socket = webSocket(`${config.BIGSMOKE_URL}`);
  }

  // Only the 'value' event is supported by `on` / `off`.
  const assertValueEvent = event => {
    if (event !== 'value') {
      throw new Error(`Invalid event '${event}'`);
    }
  };

  /**
   * Builds a reference to a single channel of this app.
   *
   * @param {string} [path=''] - Channel name within the app's module.
   * @param {Object} [options={}] - Must contain every option listed in the
   *   app's `requiredOptions`.
   * @returns {{on: Function, off: Function, send: Function}}
   * @throws {Error} If a required option is missing.
   */
  const ref = (path = '', options = {}) => {
    const missingOptions = checkOptions(options, requiredOptions);
    if (missingOptions.length > 0) {
      throw new Error(
        `Missing required options for app '${appName}': ${createErrorList(missingOptions)}`,
      );
    }

    const requiresAuth = requiredOptions.includes('authToken');

    const target = `${moduleIdentifier}_${path}`;

    // Remembers the latest message per `type` (replayed to late listeners in
    // `on`) and fans the message out to every registered callback.
    const onMessage = msg => {
      if (msg?.type) socketInfo.subscriptions[target].latestMessages[msg.type] = msg;
      Object.values(socketInfo.subscriptions[target].callbacks).forEach(cb => cb(msg));
    };

    if (!socketInfo.subscriptions[target]) {
      socketInfo.subscriptions[target] = {
        onMessage,
        subscription: null,
        // Handle for a setup that is still waiting for authorization.
        pendingSetup: null,
        callbacks: [],
        latestMessages: {},
      };
    }

    // Subscribes to the channel once, as soon as authorization allows it.
    const setup = () => {
      const state = socketInfo.subscriptions[target];
      // One socket subscription per target: skip when it is already active or
      // already waiting for authorization. Without the second check every
      // `on()` made before authorization would add another subscription.
      if (state.subscription || state.pendingSetup) return;

      const observable$ = socketInfo.socket.multiplex(
        () => ({ type: 'SUBSCRIBE', channel: path, module: moduleIdentifier }),
        () => ({ type: 'UNSUBSCRIBE', channel: path, module: moduleIdentifier }),
        message => message.module === moduleIdentifier && message.channel === path,
      );

      // `authorizedSubject` is a BehaviorSubject, so the handler below may run
      // synchronously inside `subscribe()`; the flag records that case so an
      // already finished wait is not stored as pending.
      let established = false;
      const pending = authorizedSubject
        .pipe(
          filter(authorized => !requiresAuth || authorized),
          take(1),
        )
        .subscribe(() => {
          established = true;
          const current = socketInfo.subscriptions[target];
          current.pendingSetup = null;
          current.subscription = observable$.pipe(retry({ delay: 5000 })).subscribe(onMessage);
        });

      if (!established) state.pendingSetup = pending;
    };

    // Tears the channel down once the last listener is gone.
    const cleanup = () => {
      const state = socketInfo.subscriptions[target];
      if (state.callbacks.length > 0) return;

      // Cancel a setup that is still waiting for authorization; otherwise the
      // channel would be subscribed later with no listeners and never released.
      state.pendingSetup?.unsubscribe();
      state.pendingSetup = null;

      if (!state.subscription) return;
      state.subscription.unsubscribe();
      state.subscription = null;
      state.latestMessages = {};
    };

    return {
      /**
       * Registers `callback` for messages on this channel. The latest message
       * of each type seen so far is replayed to it first.
       *
       * @param {string} event - Must be 'value'.
       * @param {Function} callback - Called with each message; ignored if falsy.
       * @throws {Error} If `event` is not 'value'.
       */
      on: (event, callback) => {
        assertValueEvent(event);
        if (!callback) return;
        queueScheduler.schedule(() => {
          const index = socketInfo.subscriptions[target].callbacks.indexOf(callback);
          if (index > -1) {
            error('Callback is already registered on reference');
            return;
          }
          const { latestMessages } = socketInfo.subscriptions[target];
          Object.values(latestMessages).forEach(msg => callback(msg));

          socketInfo.subscriptions[target].callbacks.push(callback);
          setup();
        });
      },
      /**
       * Removes a callback previously registered with `on`; unsubscribes from
       * the channel when it was the last one.
       *
       * @param {string} event - Must be 'value'.
       * @param {Function} callback - The callback to remove; ignored if falsy.
       * @throws {Error} If `event` is not 'value'.
       */
      off: (event, callback) => {
        assertValueEvent(event);
        if (!callback) return;
        queueScheduler.schedule(() => {
          const index = socketInfo.subscriptions[target].callbacks.indexOf(callback);
          if (index < 0) return;
          socketInfo.subscriptions[target].callbacks.splice(index, 1);

          cleanup();
        });
      },
      /**
       * Pushes `event` to the shared socket.
       *
       * @param {*} event - Payload handed to the socket's `next`.
       */
      send: event => {
        queueScheduler.schedule(() => {
          socketInfo.socket.next(event);
        });
      },
    };
  };

  /**
   * Drops the active AUTH subscription and forgets its token. Does nothing
   * when there is no active AUTH subscription.
   *
   * @param {boolean} [fullReset=true] - When true, also marks the socket as
   *   unauthorized. `authenticate` passes false when swapping one token for
   *   another so the authorized state is left untouched during the swap.
   */
  const unauthenticate = (fullReset = true) => {
    if (!socketInfo.authSubscription.sub) return;
    socketInfo.authSubscription.sub.unsubscribe();
    socketInfo.authSubscription.sub = null;
    socketInfo.authSubscription.token = undefined;
    if (fullReset) authorizedSubject.next(false);
  };

  /**
   * Authenticates the socket with `authToken`. Calling it again with the token
   * that is already stored is a no-op; a different token replaces the current
   * AUTH subscription; a falsy token only clears the current one.
   *
   * @param {string} [authToken] - Token sent in the AUTH message.
   */
  const authenticate = authToken => {
    if (socketInfo.authSubscription.sub && socketInfo.authSubscription.token !== authToken) {
      // Fully reset the authorized flag only when no new token follows.
      unauthenticate(!authToken);
    }
    if (socketInfo.authSubscription.token === authToken) return;

    socketInfo.authSubscription.token = authToken;
    if (!authToken) return;
    socketInfo.authSubscription.sub = socketInfo.socket
      .multiplex(
        () => ({ type: 'AUTH', token: authToken }),
        // No unsubscribe message is supplied for AUTH.
        null,
        message => message.type === 'AUTH',
      )
      .pipe(retry({ delay: 5000 }))
      .subscribe(response => authorizedSubject.next(response?.status === 'OK'));
  };

  return {
    ref,
    authenticate,
    unauthenticate,
  };
};

export default cj;
