/* eslint-disable no-loop-func */
/* eslint-disable no-await-in-loop */

import { fetchEventSource } from '@microsoft/fetch-event-source';
import { restApi } from '@icp/settings';
import { getLanguage } from '@icp/i18n';

class RetriableError extends Error {}

const createAbortError = (reason) => {
  const error = Error(reason);
  error.name = 'abort';
  return error;
};

export default async function* fetchSSE({ method = 'POST', url, body, retry = 3, signal }) {
  // fetchEventSource 内部只监听signal的abort事件，不会因传入时已经abort而直接终止，所以这里需要额外判断
  if (signal?.aborted) throw createAbortError(signal.reason);
  let resolve = null;
  let reject = null;
  let done = false;
  let retryTimes = 0;

  const queue = [];

  const enqueue = (msg) => {
    queue.push(msg);
    resolve?.();
  };

  fetchEventSource(url, {
    signal,
    method,
    headers: {
      'Content-Type': 'application/json',
      Accept: '*/*, text/event-stream',
      'Accept-Language': getLanguage(),
      Authorization: `Bearer ${restApi.auth.getAccessToken()}`,
    },
    body,
    openWhenHidden: true,
    // Bypass basic validation: content-type === text/event-stream
    // https://github.com/Azure/fetch-event-source/blob/main/src/fetch.ts#L19
    onopen: async (response) => {
      if (response.status >= 200 && response.status < 300) {
        return response;
      }
      if (retryTimes < retry) {
        retryTimes += 1;
        throw new RetriableError();
      }
      const responseBody = await response.json();
      throw Error(responseBody.message || response.statusText);
    },
    onmessage: enqueue,
    onerror: (err) => {
      if (err instanceof RetriableError) {
        return 1000; // retry after 1000 ms
      }
      reject?.(err);
      // Rethrow the error to stop the entire operation.
      // https://github.com/Azure/fetch-event-source/blob/main/src/fetch.ts#L43
      throw err;
    },
  }).finally(() => {
    done = true;
    resolve?.();
  });

  while (!done) {
    if (queue.length === 0) {
      await new Promise((res, rej) => {
        resolve = res;
        reject = rej;
      });
    }
    const size = queue.length;
    for (let i = 0; i < size; i += 1) {
      yield queue.shift();
    }
  }
  // 外部 abort 时 fetchEventSource 不reject而是resolve，这里手动抛一下
  if (signal?.aborted) throw createAbortError(signal.reason);
}
