import { Observable, OperatorFunction, SchedulerLike, Subject, of, race, timer } from 'rxjs';
import { debounce, debounceTime, distinct, filter, map, share, take, tap } from 'rxjs/operators';

/**
 * Collects source values into an array and emits that array once the source
 * has been silent for `time` milliseconds.
 *
 * Every value restarts the debounce window. The emitted array contains all
 * values received since the previous emission, in arrival order. Per the RxJS
 * `debounceTime` contract, a pending buffer is also emitted when the source
 * completes.
 *
 * The buffer is created per subscription, so subscribing to the resulting
 * observable several times (or re-subscribing through `retry`/`repeat`) never
 * mixes values between subscriptions.
 *
 * @param time Quiet period, in milliseconds, that must elapse before the buffer is emitted.
 * @param scheduler Optional scheduler forwarded to `debounceTime`.
 * @returns An operator mapping a stream of `T` to a stream of `T[]` batches.
 */
export function bufferDebounceTime<T>(time: number = 0, scheduler?: SchedulerLike): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    new Observable<T[]>((subscriber) => {
      // State lives inside the subscribe callback so each subscription owns its buffer.
      let bufferedValues: T[] = [];

      return source
        .pipe(
          tap((value) => {
            bufferedValues.push(value);
          }),
          debounceTime(time, scheduler),
          map(() => {
            // Hand the current batch downstream and start a fresh one.
            const batch = bufferedValues;
            bufferedValues = [];
            return batch;
          })
        )
        .subscribe(subscriber);
    });
}

/**
 * Collects source values into an array and emits that array when either
 *  - the source has been silent for `time` milliseconds, or
 *  - the buffer has reached `limit` values,
 * whichever happens first.
 *
 * The source is subscribed exactly once per subscriber. The debounce window is
 * driven by a self-contained notifier instead of a second subscription to the
 * source, so the operator behaves the same for cold sources, plain subjects
 * and replaying sources. Per the RxJS `debounce` contract, a pending buffer is
 * also emitted when the source completes.
 *
 * The buffer is created per subscription, so several subscriptions to the
 * resulting observable never share buffered values.
 *
 * @param time Quiet period, in milliseconds, that must elapse before the buffer is emitted.
 * @param limit Buffer size at which the buffer is emitted immediately.
 * @param scheduler Optional scheduler used for the quiet-period timer.
 * @returns An operator mapping a stream of `T` to a stream of `T[]` batches.
 */
export function bufferDebounceTimeWithLimit<T>(
  time: number = 0,
  limit: number = 100,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    new Observable<T[]>((subscriber) => {
      // State lives inside the subscribe callback so each subscription owns its buffer.
      let bufferedValues: T[] = [];

      return source
        .pipe(
          tap((value) => {
            bufferedValues.push(value);
          }),
          // The value has already been pushed by the time the selector runs, so the
          // length check sees it. A full buffer is released synchronously; otherwise
          // wait for the quiet period (restarted by `debounce` on every new value).
          debounce(() => (bufferedValues.length >= limit ? of(undefined) : timer(time, scheduler))),
          map(() => {
            // Hand the current batch downstream and start a fresh one.
            const batch = bufferedValues;
            bufferedValues = [];
            return batch;
          })
        )
        .subscribe(subscriber);
    });
}

/**
 * Collects distinct source values into an array and emits that array once no
 * new distinct value has arrived for `time` milliseconds.
 *
 * Values are de-duplicated with `distinct` using `keySelector` (or the value
 * itself when no selector is given). Duplicates are dropped before they reach
 * the debounce, so they neither enter the buffer nor restart the quiet period.
 * After each emission the set of seen keys is flushed, so the next batch
 * starts de-duplicating from scratch.
 *
 * Both the buffer and the flush notifier are created per subscription, so
 * several subscriptions to the resulting observable do not reset each other's
 * distinct set or share buffered values.
 *
 * @param keySelector Optional function deriving the key used to compare values.
 * @param time Quiet period, in milliseconds, that must elapse before the buffer is emitted.
 * @param scheduler Optional scheduler forwarded to `debounceTime`.
 * @returns An operator mapping a stream of `T` to a stream of `T[]` batches.
 */
export function distinctBufferDebounceTime<T, K>(
  keySelector?: (value: T) => K,
  time: number = 0,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    new Observable<T[]>((subscriber) => {
      // State lives inside the subscribe callback so each subscription owns it.
      let bufferedValues: T[] = [];
      const flush = new Subject<void>();

      return source
        .pipe(
          distinct(keySelector, flush.asObservable()),
          tap((value) => {
            bufferedValues.push(value);
          }),
          debounceTime(time, scheduler),
          map(() => {
            // Hand the current batch downstream and start a fresh one.
            const batch = bufferedValues;
            bufferedValues = [];
            return batch;
          }),
          // Tell `distinct` to forget the keys of the batch that was just released.
          tap(() => flush.next())
        )
        .subscribe(subscriber);
    });
}
