scheduler pt 1

This commit is contained in:
Elizabeth Hunt 2024-08-06 23:56:14 -07:00
parent 5dbd168785
commit 3e7def3a26
Signed by: simponic
GPG Key ID: 2909B9A7FF6213EE
1 changed files with 81 additions and 13 deletions

View File

@ -1,25 +1,93 @@
import * as TE from "fp-ts/lib/TaskEither"; import * as TE from "fp-ts/lib/TaskEither";
import { pipe } from "fp-ts/lib/function"; import * as IO from "fp-ts/lib/IO";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as RM from "fp-ts/lib/ReadonlyMap";
import * as T from "fp-ts/Task";
import * as S from "fp-ts/string";
import { contramap, type Eq } from "fp-ts/Eq";
import { pipe, identity } from "fp-ts/lib/function";
import type { Schedule } from "../canary"; import type { Schedule } from "../canary";
import { ConsoleLogger, type Logger } from "./logger";
import type { Separated } from "fp-ts/lib/Separated";
import type { Magma } from "fp-ts/lib/Magma";
import { intercalate } from "fp-ts/lib/Foldable";
interface Unit {}
const Unit: Unit = {};
interface ScheduledJob { interface ScheduledJob {
id: string; id: string;
execute: TE.TaskEither<Error, boolean>; execute: <TResult>() => TE.TaskEither<Error, TResult>;
at: Date; at: Date;
schedule: Schedule; schedule: Schedule;
} }
type SchedulerState = ReadonlyArray<ScheduledJob>; type SchedulerState = ReadonlyArray<ScheduledJob>;
export const schedulerLoop = const logState = (
(state: SchedulerState): TE.TaskEither<Error, void> => state: SchedulerState,
() => { now: Date,
const loop = (currentState: SchedulerState): TE.TaskEither<Error, void> => logger: Logger = ConsoleLogger,
) => {
const ids = pipe(
state,
RA.map(({ id }) => id),
(ids) => intercalate(S.Monoid, RA.Foldable)("|", ids),
);
return logger.log(`Executing ${ids} at ${now.toUTCString()}`);
};
const executeDueJobs = <TResult>(
state: SchedulerState,
): IO.IO<
T.Task<
Separated<ReadonlyArray<[string, Error]>, ReadonlyArray<[string, TResult]>>
>
> =>
pipe( pipe(
executeDueJobs(currentState), IO.of(state),
delay(1000), // Delay for 1 second IO.bindTo("state"),
map((newState) => loop(newState)), IO.bind("now", () => () => new Date()),
IO.bind("toExecute", ({ now, state }) =>
pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))),
),
IO.flatMap(({ state, now }) =>
pipe(
IO.of(state),
IO.tap((state) => logState(state, now)),
IO.map((state) =>
pipe(
state,
RA.map(({ execute, id }) =>
pipe(
execute(),
TE.bimap(
(error) => [id, error] as [string, Error],
(result) => [id, result] as [string, TResult],
),
),
),
),
),
),
),
IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))),
); );
return loop(state)(); const jobEq: Eq<ScheduledJob> = pipe(
}; S.Eq,
contramap(({ id }) => id),
);
const jobMagma: Magma<ScheduledJob> = {
concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({
...a,
...b,
}),
};
export const schedulerLoop = (jobs: ReadonlyArray<ScheduledJob>) => {
const jobsTable: Map<
string,
{ retries: number; specification: ScheduledJob }
> = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable));
};