import { ofType } from 'redux-observable';
import { NEVER, concat } from 'rxjs';
import {
  delay,
  groupBy,
  mergeMap,
  exhaustMap,
  scan,
  filter,
  bufferWhen,
  debounceTime,
  map,
  takeUntil,
  takeLast,
  share,
} from 'rxjs/operators';

/**
 * Builds a function that turns a stream of actions into a stream of "balanced" signals.
 *
 * A running balance starts at 0, goes up by one for every action whose `type` equals the
 * given `type`, and goes down by one for every other action, never dropping below zero.
 * The returned stream emits that balance each time it is 0 after an action was processed.
 *
 * @param {*} type - Action type that increments the balance.
 * @returns {Function} `action$ => Observable` emitting `0` whenever the balance is zero.
 */
const equalizeTypes = type => action$ => {
  // +1 for a matching action, -1 for anything else; clamp so the balance cannot go negative.
  const trackBalance = (balance, action) => {
    const step = action.type === type ? 1 : -1;
    return Math.max(0, balance + step);
  };

  const isBalanced = balance => balance === 0;

  return action$.pipe(scan(trackBalance, 0), filter(isBalanced));
};

/**
 * Creates an epic that opens a buffered data stream for each group of subscribe actions.
 *
 * Only actions of the two given types are considered. They are partitioned with
 * `groupBySelector`, and `equalizeTypes(SUBSCRIBE_ACTION)` is handed to `groupBy` as the
 * duration selector that decides when a group ends.
 *
 * @param {Array} types - Pair `[SUBSCRIBE_ACTION, UNSUBSCRIBE_ACTION]` of action types.
 * @param {Function} groupBySelector - Key selector passed to `groupBy`.
 * @param {Function} streamSelector - Called as `streamSelector(action, state$)`; its result
 *   is the first source given to `concat`.
 * @param {Function} actionSelector - Called as `actionSelector(action, state$)`; its result
 *   is the projection given to `map`, which receives each buffer emitted by `bufferWhen`.
 * @returns {Function} Epic of the form `(action$, state$) => Observable`.
 * @throws {Error} From the `exhaustMap` projection when the action it receives is an
 *   UNSUBSCRIBE_ACTION (for example when it is the first action of its group).
 */
const factorySubscribeEpic =
  ([SUBSCRIBE_ACTION, UNSUBSCRIBE_ACTION], groupBySelector, streamSelector, actionSelector) =>
  (action$, state$) => {
    // Turns the action that opens a group into its buffered output stream.
    const openSubscription = action => {
      if (action.type === UNSUBSCRIBE_ACTION) {
        throw new Error(
          `You need to dispatch ${SUBSCRIBE_ACTION} before ${UNSUBSCRIBE_ACTION}`,
        );
      }

      // Appending NEVER keeps the stream open after the selected source completes.
      //
      // The delay(1) is needed so both subscriptions always have time to be set up.
      // If the stream is too quick, the debounceTime triggers before the data pipe
      // is subscribed to.
      const source$ = streamSelector(action, state$);
      const shared$ = concat(source$, NEVER).pipe(delay(1), share());

      // A buffer is closed each time the shared stream passes through debounceTime(0).
      const closeBuffer = () => shared$.pipe(debounceTime(0));

      return shared$.pipe(bufferWhen(closeBuffer), map(actionSelector(action, state$)));
    };

    // While an inner stream is active, exhaustMap ignores further actions of the group;
    // takeLast(1) only emits when group$ completes, which then stops the output.
    const runGroup = group$ =>
      group$.pipe(exhaustMap(openSubscription), takeUntil(group$.pipe(takeLast(1))));

    return action$.pipe(
      ofType(SUBSCRIBE_ACTION, UNSUBSCRIBE_ACTION),
      groupBy(groupBySelector, null, equalizeTypes(SUBSCRIBE_ACTION)),
      mergeMap(runGroup),
    );
  };

export default factorySubscribeEpic;
