import { Observable, OperatorFunction, SchedulerLike, race } from 'rxjs';
import { bufferWhen, delay, filter, map, share, tap } from 'rxjs/operators';

/**
 * Collects source values into arrays and emits an array whenever one of two
 * conditions is met first:
 *
 * - the number of collected values reaches `limit`, or
 * - a value received in the current window has been delayed by `time`
 *   milliseconds (implemented with `delay`, so the timer is driven by source
 *   values rather than by a fixed interval).
 *
 * All state (the shared source and the collecting array) is created per
 * subscription, so subscribing to the resulting observable several times
 * yields independent buffers instead of subscribers pushing into, and
 * resetting, one common array.
 *
 * @param time - Delay in milliseconds applied to source values to produce the
 * timeout notification. Defaults to `0`.
 * @param limit - Number of collected values that triggers an emission.
 * Defaults to `100`.
 * @param scheduler - Optional scheduler forwarded to `delay`.
 * @returns An operator that maps a stream of `T` to a stream of `T[]` batches.
 */
export function bufferUntilTimeWithLimit<T>(
  time: number = 0,
  limit: number = 100,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    new Observable<T[]>((subscriber) => {
      // One upstream subscription feeds both notifiers and the buffer itself.
      const sharedSource = source.pipe(share());

      // Values collected for the current window; owned by this subscription only.
      let bufferedValues: T[] = [];

      // Fires `time` ms after a source value was received.
      const notifierDebouncing$ = sharedSource.pipe(delay(time, scheduler));

      // Records every value and fires as soon as `limit` values are collected.
      const notifierLimiting$ = sharedSource.pipe(
        tap((value) => bufferedValues.push(value)),
        filter(() => bufferedValues.length >= limit)
      );

      // Whichever notifier emits first closes the current window.
      const notifier$ = race(notifierLimiting$, notifierDebouncing$);

      return sharedSource
        .pipe(
          bufferWhen(() => notifier$),
          // The array produced by `bufferWhen` is ignored on purpose: the
          // emitted batch is the one collected by the limiting notifier's
          // `tap`. The collector is swapped for a fresh array before the
          // batch is handed downstream.
          map(() => {
            const flushed = bufferedValues;
            bufferedValues = [];
            return flushed;
          })
        )
        .subscribe(subscriber);
    });
}
