From 3e7def3a26eed5d44fab1d13ea222dabddee5c0e Mon Sep 17 00:00:00 2001 From: Elizabeth Hunt Date: Tue, 6 Aug 2024 23:56:14 -0700 Subject: [PATCH] scheduler pt 1 --- src/util/scheduler.ts | 94 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 13 deletions(-) diff --git a/src/util/scheduler.ts b/src/util/scheduler.ts index 9f9f594..3265c7c 100644 --- a/src/util/scheduler.ts +++ b/src/util/scheduler.ts @@ -1,25 +1,93 @@ import * as TE from "fp-ts/lib/TaskEither"; -import { pipe } from "fp-ts/lib/function"; +import * as IO from "fp-ts/lib/IO"; +import * as RA from "fp-ts/lib/ReadonlyArray"; +import * as RM from "fp-ts/lib/ReadonlyMap"; +import * as T from "fp-ts/Task"; +import * as S from "fp-ts/string"; +import { contramap, type Eq } from "fp-ts/Eq"; +import { pipe, identity } from "fp-ts/lib/function"; import type { Schedule } from "../canary"; +import { ConsoleLogger, type Logger } from "./logger"; +import type { Separated } from "fp-ts/lib/Separated"; +import type { Magma } from "fp-ts/lib/Magma"; +import { intercalate } from "fp-ts/lib/Foldable"; + +interface Unit {} +const Unit: Unit = {}; interface ScheduledJob { id: string; - execute: TE.TaskEither; + execute: () => TE.TaskEither; at: Date; schedule: Schedule; } type SchedulerState = ReadonlyArray; -export const schedulerLoop = - (state: SchedulerState): TE.TaskEither => - () => { - const loop = (currentState: SchedulerState): TE.TaskEither => - pipe( - executeDueJobs(currentState), - delay(1000), // Delay for 1 second - map((newState) => loop(newState)), - ); +const logState = ( + state: SchedulerState, + now: Date, + logger: Logger = ConsoleLogger, +) => { + const ids = pipe( + state, + RA.map(({ id }) => id), + (ids) => intercalate(S.Monoid, RA.Foldable)("|", ids), + ); + return logger.log(`Executing ${ids} at ${now.toUTCString()}`); +}; - return loop(state)(); - }; +const executeDueJobs = ( + state: SchedulerState, +): IO.IO< + T.Task< + Separated, ReadonlyArray<[string, TResult]>> + > +> => + pipe( + IO.of(state), + IO.bindTo("state"), + IO.bind("now", () => () => new Date()), + IO.bind("toExecute", ({ now, state }) => + pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))), + ), + IO.flatMap(({ state, now }) => + pipe( + IO.of(state), + IO.tap((state) => logState(state, now)), + IO.map((state) => + pipe( + state, + RA.map(({ execute, id }) => + pipe( + execute(), + TE.bimap( + (error) => [id, error] as [string, Error], + (result) => [id, result] as [string, TResult], + ), + ), + ), + ), + ), + ), + ), + IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))), + ); + +const jobEq: Eq = pipe( + S.Eq, + contramap(({ id }) => id), +); +const jobMagma: Magma = { + concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({ + ...a, + ...b, + }), +}; + +export const schedulerLoop = (jobs: ReadonlyArray) => { + const jobsTable: Map< + string, + { retries: number; specification: ScheduledJob } + > = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable)); +};