import { exhaustMap, map, switchMap, takeUntil } from 'rxjs/operators';
import orderBy from 'lodash-es/orderBy';
import { Injectable } from '@angular/core';
import {
  firstValueFrom,
  interval,
  Observable,
  of,
  Subject,
  throwError
} from 'rxjs';

import { FirebaseDbService } from 'src/app/shared/firebase-db.service';
import { JOBS_PATH } from '../shared/firebase-paths';
import { unwrapObject } from '../shared/utils/functions';
import { SkSyncHttpService } from '../shared/sksync-http.service';
import {
  Job,
  JobProgressStep,
  SKSyncJobStatusResponse,
  JobStatus
} from './job.interface';

/**
 * Reads and updates job records stored under `JOBS_PATH` through
 * `FirebaseDbService`, and calls the SKSync HTTP API to retry jobs and to
 * poll the status of queued jobs.
 */
@Injectable({ providedIn: 'root' })
export class JobService {
  constructor(
    private readonly angularFire: FirebaseDbService,
    private readonly skSyncHttp: SkSyncHttpService
  ) {}

  /**
   * Returns an observable of the job stored at `/${JOBS_PATH}/${jobId}`,
   * with its `progress` normalised by {@link transformJob}.
   *
   * @param jobId Key of the job under `JOBS_PATH`.
   */
  get(jobId: string): Observable<Job> {
    return this.angularFire
      .getObject(`/${JOBS_PATH}/${jobId}`)
      .pipe(map(job => this.transformJob(job)));
  }

  /**
   * Returns an observable of the jobs whose `status` child equals `status`.
   *
   * The query is limited to the last 20 matches; the result is then sorted by
   * `completedAt` descending, with `createdAt` descending as the tie-breaker.
   *
   * @param status Status value to filter on.
   */
  getByStatus(status: JobStatus): Observable<Job[]> {
    return this.angularFire
      .getList(`/${JOBS_PATH}`, ref =>
        ref.orderByChild('status').equalTo(status).limitToLast(20)
      )
      .pipe(
        // Array.prototype.map already yields [] for an empty list, so no
        // explicit empty-list branch is needed.
        map((jobs: Job[]) => jobs.map(job => this.transformJob(job))),
        map(jobs =>
          orderBy(jobs, ['completedAt', 'createdAt'], ['desc', 'desc'])
        )
      );
  }

  /**
   * Replaces `job.progress` with the sorted step list produced by
   * {@link transformJobProgress}.
   *
   * Note: the passed-in object is mutated in place and the same reference is
   * returned.
   *
   * @param job Job whose progress should be normalised.
   * @returns The same `job` instance.
   */
  transformJob(job: Job): Job {
    job.progress = this.transformJobProgress(job.progress);
    return job;
  }

  /**
   * Converts a raw progress value into a list of steps sorted ascending by
   * their `step` field.
   *
   * @param progress Raw progress value; it is passed through `unwrapObject`
   *   before sorting (presumably a keyed object of steps — confirm against
   *   `unwrapObject`).
   */
  transformJobProgress(progress): JobProgressStep[] {
    return orderBy(unwrapObject(progress), ['step'], ['asc']);
  }

  /**
   * Asks the SKSync API to retry a job by requesting
   * `/accounts/flow/${jobId}/retry`.
   *
   * @param jobId Id of the job to retry.
   * @returns Resolves with the same `jobId` once the request has emitted its
   *   first value; rejects if the request errors.
   */
  async retryJob(jobId: string): Promise<string> {
    await firstValueFrom(this.skSyncHttp.get(`/accounts/flow/${jobId}/retry`));

    return jobId;
  }

  /**
   * Returns an observable of the progress steps stored at
   * `/${JOBS_PATH}/${jobId}/progress`, sorted by
   * {@link transformJobProgress}.
   *
   * @param jobId Key of the job under `JOBS_PATH`.
   */
  getProgress(jobId: string): Observable<JobProgressStep[]> {
    return this.angularFire
      .getObject(`/${JOBS_PATH}/${jobId}/progress`)
      .pipe(map(progress => this.transformJobProgress(progress)));
  }

  /**
   * Writes `status` to `${JOBS_PATH}/${jobId}/progress/${stepId}/status`.
   *
   * @param jobId Key of the job under `JOBS_PATH`.
   * @param stepId Key of the progress step within the job.
   * @param status New status value for the step.
   * @returns The promise returned by the underlying `set` call.
   */
  updateStepStatus(
    jobId: string,
    stepId: string,
    status: JobStatus
  ): Promise<void> {
    return this.angularFire
      .object(`${JOBS_PATH}/${jobId}/progress/${stepId}/status`)
      .set(status);
  }

  /**
   * Polls `/jobs/${queue}/${jobId}` once per second and emits every status
   * response received.
   *
   * - On a `'completed'` status the response is emitted and the stream then
   *   completes.
   * - On a `'failed'` status the stream errors with the response object itself
   *   (not an `Error` instance).
   * - Any other status is emitted and polling continues.
   *
   * @param queue Queue name used in the request path.
   * @param jobId Job id used in the request path.
   */
  getSkSyncJobStatus(
    queue: string,
    jobId: number
  ): Observable<SKSyncJobStatusResponse> {
    // Signals the ticker to stop once a 'completed' status has been seen.
    // NOTE: created per call, not per subscription, so every subscriber of the
    // returned observable shares this stop signal.
    const destroySubject$ = new Subject<void>();
    return interval(1000).pipe(
      // Placed before the request so that stopping the ticker does not cancel
      // the in-flight response that triggered the stop.
      takeUntil(destroySubject$),
      // exhaustMap ignores ticks while a request is still pending. The
      // previous switchMap cancelled the pending request on every tick, so a
      // response slower than the 1s interval could never be delivered.
      // Assumes the HTTP observable completes after responding — TODO confirm.
      exhaustMap(() =>
        this.skSyncHttp.get<SKSyncJobStatusResponse>(`/jobs/${queue}/${jobId}`)
      ),
      switchMap(jobProgress => {
        if (jobProgress.status === 'failed') {
          // Factory form: passing the error value directly is deprecated.
          return throwError(() => jobProgress);
        }
        if (jobProgress.status === 'completed') {
          destroySubject$.next();
        }
        return of(jobProgress);
      })
    );
  }
}
