From 8f584b5b052748a6eb4b1d3ca31853a84f1febf3 Mon Sep 17 00:00:00 2001 From: Elizabeth Hunt Date: Sun, 11 Aug 2024 21:21:49 -0700 Subject: [PATCH] finish up the scheduler --- .prettierrc | 18 ++++ src/canary/email.ts | 129 ++++++++--------------- src/canary/job.ts | 2 +- src/canary/scheduler.ts | 190 +++++++++++++++++++++++++++++++++ src/canary/test.ts | 15 +-- src/index.ts | 5 +- src/publisher/discord.ts | 19 ++++ src/publisher/index.ts | 29 +++--- src/publisher/ntfy.ts | 11 ++ src/util/duration.ts | 114 +++++++++----------- src/util/logger.ts | 4 +- src/util/scheduler.ts | 93 ----------------- tst/canary/scheduler.spec.ts | 197 +++++++++++++++++++++++++++++++++++ 13 files changed, 554 insertions(+), 272 deletions(-) create mode 100644 .prettierrc create mode 100644 src/canary/scheduler.ts create mode 100644 src/publisher/discord.ts create mode 100644 src/publisher/ntfy.ts delete mode 100644 src/util/scheduler.ts create mode 100644 tst/canary/scheduler.spec.ts diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..24cb662 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,18 @@ +{ + "semi": true, + "singleQuote": false, + "trailingComma": "none", + "useTabs": false, + "printWidth": 120, + "endOfLine": "lf", + "overrides": [ + { + "files": ["**/*.scss"], + "options": { + "useTabs": false, + "tabWidth": 2, + "singleQuote": true + } + } + ] +} \ No newline at end of file diff --git a/src/canary/email.ts b/src/canary/email.ts index 1961c60..f20da8d 100644 --- a/src/canary/email.ts +++ b/src/canary/email.ts @@ -4,21 +4,13 @@ import * as O from "fp-ts/lib/Option"; import { createTransport } from "nodemailer"; import { toError } from "fp-ts/lib/Either"; import { pipe } from "fp-ts/lib/function"; -import { - ImapFlow, - type FetchMessageObject, - type FetchQueryObject, - type MailboxLockObject, -} from "imapflow"; +import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow"; import * as IO from "fp-ts/lib/IO"; import * as T from "fp-ts/lib/Task"; import { ConsoleLogger } from "../util"; interface ImapClientI { - fetchAll: ( - range: string, - options: FetchQueryObject, - ) => Promise; + fetchAll: (range: string, options: FetchQueryObject) => Promise; connect: () => Promise; getMailboxLock: (mailbox: string) => Promise; messageDelete: (uids: number[]) => Promise; @@ -40,10 +32,7 @@ class ErrorWithLock extends Error { } } const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) => - new ErrorWithLock( - error instanceof Error ? error.message : "Unknown error", - lock, - ); + new ErrorWithLock(error instanceof Error ? error.message : "Unknown error", lock); /** * Generate a unique email. @@ -51,42 +40,31 @@ const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) => * @param to is the email to send to. * @returns an {@link Email}. */ -type EmailGenerator = ( - from: EmailFromInstruction, - to: EmailToInstruction, -) => IO.IO; -const generateEmail: EmailGenerator = - (from: EmailFromInstruction, to: EmailToInstruction) => () => ({ - from: from.email, - to: to.email, - subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "), - text: crypto.randomUUID(), - }); +type EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => IO.IO; +const generateEmail: EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => () => ({ + from: from.email, + to: to.email, + subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "), + text: crypto.randomUUID() +}); /** * Get the transport layer for a mailbox to send a piece of mail. * @param param0 is the mailbox to send from. * @returns a function that takes an email and sends it. */ -type GetSendEmail = ( - from: EmailFromInstruction, -) => (email: Email) => TE.TaskEither; -const getSendTransport: GetSendEmail = ({ - username, - password, - server, - send_port, -}) => { +type GetSendEmail = (from: EmailFromInstruction) => (email: Email) => TE.TaskEither; +const getSendTransport: GetSendEmail = ({ username, password, server, send_port }) => { const transport = createTransport({ host: server, port: send_port, auth: { user: username, - pass: password, + pass: password }, tls: { - rejectUnauthorized: false, - }, + rejectUnauthorized: false + } }); return (email: Email) => TE.tryCatch( @@ -98,9 +76,9 @@ const getSendTransport: GetSendEmail = ({ } else { resolve(email); } - }), + }) ), - toError, + toError ); }; @@ -109,9 +87,7 @@ const getSendTransport: GetSendEmail = ({ * @param param0 is the mailbox to read from. * @returns a Right({@link ImapFlow}) if it connected, else an Left(error). */ -type GetImapClient = ( - to: EmailToInstruction, -) => TE.TaskEither; +type GetImapClient = (to: EmailToInstruction) => TE.TaskEither; const getImap: GetImapClient = ({ username, password, server, read_port }) => { const imap = new ImapFlow({ logger: false, @@ -120,8 +96,8 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => { secure: true, auth: { user: username, - pass: password, - }, + pass: password + } }); return TE.tryCatch(() => imap.connect().then(() => imap), toError); }; @@ -130,18 +106,16 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => { * @param imap is the Imap client to fetch messages from. * @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error). */ -const fetchMessages = ( - imap: ImapClientI, -): TE.TaskEither => +const fetchMessages = (imap: ImapClientI): TE.TaskEither => TE.tryCatch( () => imap.fetchAll("*", { uid: true, envelope: true, headers: true, - bodyParts: ["text"], + bodyParts: ["text"] }), - toError, + toError ); /** @@ -152,8 +126,7 @@ const fetchMessages = ( type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean; const matchesEmail: EmailMatcher = (email) => (message) => { const subjectMatches = email.subject === message.envelope.subject; - const bodyMatches = - message.bodyParts.get("text")?.toString().trim() === email.text.trim(); + const bodyMatches = message.bodyParts.get("text")?.toString().trim() === email.text.trim(); const headers = message.headers.toLocaleString(); const fromMatches = headers.includes(`Return-Path: <${email.from}>`); const toMatches = headers.includes(`Delivered-To: ${email.to}`); @@ -172,14 +145,9 @@ type FindEmailUidInInbox = ( imap: ImapClientI, equalsEmail: (message: FetchMessageObject) => boolean, retries: number, - pollIntervalMs: number, + pollIntervalMs: number ) => TE.TaskEither; -const findEmailUidInInbox: FindEmailUidInInbox = ( - imap, - equalsEmail, - retries, - pollIntervalMs, -) => +const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs) => pipe( fetchMessages(imap), TE.flatMap((messages) => { @@ -193,17 +161,11 @@ const findEmailUidInInbox: FindEmailUidInInbox = ( (e) => pipe( TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)), - TE.chain(() => - retries === 0 - ? TE.left(e) - : T.delay(pollIntervalMs)(TE.right(null)), - ), - TE.chain(() => - findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs), - ), + TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))), + TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs)) ), - TE.of, - ), + TE.of + ) ); export type EmailJobDependencies = { @@ -225,40 +187,31 @@ export const perform = ( getSendImpl = getSendTransport, getImapImpl = getImap, findEmailUidInInboxImpl = findEmailUidInInbox, - matchesEmailImpl = matchesEmail, - }: Partial = {}, + matchesEmailImpl = matchesEmail + }: Partial = {} ): TE.TaskEither => pipe( // arrange. TE.fromIO(generateEmailImpl(from, to)), TE.bindTo("email"), // act. - TE.tap(({ email }) => - pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock())), - ), + TE.tap(({ email }) => pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock()))), TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))), - TE.bind("mailboxLock", ({ imap }) => - TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock()), - ), + TE.bind("mailboxLock", ({ imap }) => TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock())), // "assert". TE.bind("uid", ({ imap, email, mailboxLock }) => pipe( - findEmailUidInInboxImpl( - imap, - matchesEmailImpl(email), - retries, - interval, - ), - TE.mapLeft(ToErrorWithLock(mailboxLock)), - ), + findEmailUidInInboxImpl(imap, matchesEmailImpl(email), retries, interval), + TE.mapLeft(ToErrorWithLock(mailboxLock)) + ) ), // cleanup. TE.bind("deleted", ({ imap, uid, mailboxLock }) => TE.tryCatch( // () => imap.messageDelete([uid], { uid: true }), () => imap.messageDelete([uid]), - ToErrorWithLock(mailboxLock), - ), + ToErrorWithLock(mailboxLock) + ) ), TE.fold( (e) => { @@ -270,6 +223,6 @@ export const perform = ( ({ mailboxLock, deleted }) => { mailboxLock.release(); return TE.right(deleted); - }, - ), + } + ) ); diff --git a/src/canary/job.ts b/src/canary/job.ts index 61af876..673e134 100644 --- a/src/canary/job.ts +++ b/src/canary/job.ts @@ -38,4 +38,4 @@ export interface EmailJob { readRetry: Retry; } -export type Job = EmailJob | PingJob | HealthCheckJob | DnsJob; +export type TestJob = EmailJob | PingJob | HealthCheckJob | DnsJob; diff --git a/src/canary/scheduler.ts b/src/canary/scheduler.ts new file mode 100644 index 0000000..2975991 --- /dev/null +++ b/src/canary/scheduler.ts @@ -0,0 +1,190 @@ +import * as TE from "fp-ts/lib/TaskEither"; +import * as IO from "fp-ts/lib/IO"; +import * as RA from "fp-ts/lib/ReadonlyArray"; +import * as RM from "fp-ts/lib/ReadonlyMap"; +import * as T from "fp-ts/Task"; +import * as S from "fp-ts/string"; +import { pipe, identity } from "fp-ts/lib/function"; +import type { Separated } from "fp-ts/lib/Separated"; +import type { Magma } from "fp-ts/lib/Magma"; +import { intercalate } from "fp-ts/lib/Foldable"; +import { nextSchedule, type Schedule } from "../canary"; +import { ConsoleLogger, type Logger } from "../util"; + +export interface Job { + id: string; + toString: () => string; + execute: () => TE.TaskEither; + schedule: Schedule; + maxRetries: number; +} + +export const logExecutingJobs = (jobs: ReadonlyArray, now: Date, logger: Logger = ConsoleLogger) => { + const stringifieds = pipe( + jobs, + RA.map(({ toString }) => toString()), + (stringified) => intercalate(S.Monoid, RA.Foldable)("|", stringified) + ); + return logger.log(`Executing ${stringifieds} at ${now.toUTCString()}`); +}; + +export const execute = ( + jobs: ReadonlyArray +): T.Task, ReadonlyArray<[Job, TResult]>>> => + pipe( + T.of(jobs), + T.map( + RA.map((job) => + pipe( + job.execute(), + TE.bimap( + (error) => [job, error] as [Job, Error], + (result) => [job, result] as [Job, TResult] + ) + ) + ) + ), + T.flatMap(RA.wilt(T.ApplicativePar)(identity)) + ); + +export type JobsTableRow = { + scheduled: Date; + retries: number; +}; +export type JobsTable = ReadonlyMap; + +export const jobsTableRowMagma: Magma = { + concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => { + if (a.scheduled <= b.scheduled) { + // always prefer the later schedule. + return b; + } + // if (a.retries <= b.retries) { + // return a; + // } + return b; + } +}; + +export const formatJobResults = ( + jobsTable: JobsTable, + { left, right }: Separated, ReadonlyArray<[Job, TResult]>> +) => { + const failures = left + .map(([job, err]) => ({ + job, + retries: jobsTable.get(job.id)?.retries ?? job.maxRetries, + err + })) + .filter(({ job, retries }) => retries >= job.maxRetries); + const retries = left + .map(([job, err]) => ({ + job, + retries: jobsTable.get(job.id)?.retries ?? 0, + err + })) + .filter(({ job, retries }) => retries < job.maxRetries); + + const failureMessages = failures + .map(({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (err) :( | ${err.message}`) + .join("\n"); + const retryMessages = retries + .map( + ({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (retry) :/ | ${err.message}` + ) + .join("\n"); + const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n"); + return `FAILURES:\n${failureMessages}\nRETRIES:\n${retryMessages}\nSUCCESSES:\n${successMessages}`; +}; + +export const perform = ( + jobsTable: JobsTable, + jobs: ReadonlyArray, + publishers: ReadonlyArray<(message: string) => TE.TaskEither>, + lastPingAck: Date, + publishAckEvery = 24 * (60 * 60 * 1_000), + logger = ConsoleLogger +) => + pipe( + T.of(jobs), + T.bindTo("jobs"), + T.bind( + "now", + T.fromIOK(() => IO.of(new Date())) + ), + T.bind("toExecute", ({ jobs, now }) => + pipe( + jobs, + RA.filter((job) => (jobsTable.get(job.id)?.scheduled ?? now) <= now), + T.of + ) + ), + T.tap(({ toExecute, now }) => T.fromIO(logExecutingJobs(toExecute, now))), + T.bind("results", ({ toExecute }) => execute(toExecute)), + T.bind("shouldAck", ({ now }) => T.of(now.getTime() - lastPingAck.getTime() >= publishAckEvery)), + T.tap(({ results, shouldAck }) => { + if (results.left.length === 0 && !shouldAck) { + return pipe( + T.of({ right: [] as ReadonlyArray }), + T.tap(() => T.fromIO(logger.log("not publishing"))) + ); + } + + return pipe( + publishers, + RA.map((publish) => publish(formatJobResults(jobsTable, results))), + RA.wilt(T.ApplicativePar)(identity), + T.tap(({ left }) => + pipe( + left + ? logger.error("Encountered publishing errors: " + left.toString()) + : logger.log("Published successfully"), + T.fromIO + ) + ) + ); + }), + T.map(({ now, results: { left, right }, shouldAck }) => { + const ack = shouldAck ? now : lastPingAck; + + const leftResults = pipe( + left, + RA.map(([job]): readonly [string, JobsTableRow] => { + const row = jobsTable.get(job.id); + if (!row) { + return [job.id, { retries: 1, scheduled: now }]; + } + if (row.retries >= job.maxRetries) { + logger.error(`Hit max retries; scheduling again ;-; ${job.toString()}`); + return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const; + } + return [ + job.id, + { + retries: row.retries + 1, + scheduled: now + } + ] as const; + }) + ); + const results = pipe( + right, + RA.map(([job]): readonly [string, JobsTableRow] => [ + job.id, + { + retries: 0, + scheduled: nextSchedule(job.schedule, now) + } + ]), + RA.concat(leftResults) + ); + + const newJobsTable = pipe( + jobsTable, + RM.toReadonlyArray(S.Ord), + RA.concat(results), + RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable) + ); + return { lastPingAck: ack, jobsTable: newJobsTable }; + }) + ); diff --git a/src/canary/test.ts b/src/canary/test.ts index c03fb30..d60d2ec 100644 --- a/src/canary/test.ts +++ b/src/canary/test.ts @@ -1,13 +1,13 @@ import * as TE from "fp-ts/lib/TaskEither"; import * as O from "fp-ts/lib/Option"; -import type { Job } from "./job"; +import type { TestJob } from "./job"; import type { D } from "../util"; export enum TestType { EMAIL = "email", PING = "ping", HEALTHCHECK = "healthcheck", - DNS = "dns", + DNS = "dns" } export interface Schedule { @@ -18,13 +18,14 @@ export interface Schedule { export interface Test { name: string; type: TestType; - job: Job; + job: TestJob; schedule: Schedule; } -export type Testable = (job: T) => TE.TaskEither; +export type Testable = (job: TJob) => TE.TaskEither; export const parseTestType = (testType: string): O.Option => - Object.values(TestType).includes(testType as TestType) - ? O.some(testType as TestType) - : O.none; + Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none; + +export const nextSchedule = (schedule: Schedule, date: Date) => + new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter); diff --git a/src/index.ts b/src/index.ts index 2b3e95e..c7d8438 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,9 +8,6 @@ const main: TE.TaskEither = pipe( TE.fromEither(parseArgs(Bun.argv)), TE.bindTo("args"), TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))), - TE.fold( - (e) => TE.left(toError(e)), - () => TE.right(undefined), - ), + TE.map(({ config }) => TE.tryCatch(() => {}, toError)) ); main(); diff --git a/src/publisher/discord.ts b/src/publisher/discord.ts new file mode 100644 index 0000000..02082b4 --- /dev/null +++ b/src/publisher/discord.ts @@ -0,0 +1,19 @@ +import { $ } from "bun"; + +export interface DiscordPost { + webhook: string; + role_id: string; +} + +export const publishDiscord = async (discordPost: DiscordPost, message: string) => { + console.log("Publishing to Discord"); + const ip = await $`dig +noall +short discord.com @1.1.1.1 A | shuf -n 1`.text(); + return fetch(discordPost.webhook.replace("discord.com", ip), { + headers: { + Host: "discord.com" + }, + method: "POST", + body: JSON.stringify({ message: `<@${discordPost.role_id}>\n${message}` }), + tls: { rejectUnauthorized: false } + }).then((r) => r.status); +}; diff --git a/src/publisher/index.ts b/src/publisher/index.ts index f37e108..5d17b5b 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -1,23 +1,26 @@ -import { D } from "../util"; +import { toError } from "fp-ts/lib/Either"; +import * as TE from "fp-ts/TaskEither"; +import { publishDiscord, type DiscordPost } from "./discord"; +import { publishNtfy, type NtfyPost } from "./ntfy"; export enum PublisherType { DISCORD = "discord", - NTFY = "ntfy", + NTFY = "ntfy" } - -export interface DiscordPost { - webhook: string; - role_id: string; -} - -export interface NtfyPost { - webhook: string; -} - export type PublisherPost = DiscordPost | NtfyPost; export interface Publisher { type: PublisherType; - at: D.Duration; post: PublisherPost; } + +export const publish = (publisher: Publisher, message: string): TE.TaskEither => { + switch (publisher.type) { + case PublisherType.DISCORD: + return TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError); + case PublisherType.NTFY: + return TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError); + default: + return TE.left(new Error("unknown publisher type: " + publisher.type)); + } +}; diff --git a/src/publisher/ntfy.ts b/src/publisher/ntfy.ts new file mode 100644 index 0000000..803cef3 --- /dev/null +++ b/src/publisher/ntfy.ts @@ -0,0 +1,11 @@ +export interface NtfyPost { + webhook: string; +} + +export const publishNtfy = async (ntfyPost: NtfyPost, message: string) => { + console.log("publishing to Ntfy"); + return fetch(ntfyPost.webhook, { + method: "POST", + body: message + }).then((r) => r.status); +}; diff --git a/src/util/duration.ts b/src/util/duration.ts index 3d1a44c..8a128d5 100644 --- a/src/util/duration.ts +++ b/src/util/duration.ts @@ -10,7 +10,7 @@ export enum DurationUnit { MILLISECOND, SECOND, MINUTE, - HOUR, + HOUR } const durationUnitMap: Record = { ms: DurationUnit.MILLISECOND, @@ -21,17 +21,14 @@ const durationUnitMap: Record = { minutes: DurationUnit.MINUTE, hr: DurationUnit.HOUR, hour: DurationUnit.HOUR, - hours: DurationUnit.HOUR, + hours: DurationUnit.HOUR }; -const getDurationUnit = (key: string): O.Option => - O.fromNullable(durationUnitMap[key.toLowerCase()]); +const getDurationUnit = (key: string): O.Option => O.fromNullable(durationUnitMap[key.toLowerCase()]); export const getMs = (duration: Duration): number => duration; export const getSeconds = (duration: Duration): number => duration / 1000; -export const getMinutes = (duration: Duration): number => - getSeconds(duration) / 60; -export const getHours = (duration: Duration): number => - getMinutes(duration) / 60; +export const getMinutes = (duration: Duration): number => getSeconds(duration) / 60; +export const getHours = (duration: Duration): number => getMinutes(duration) / 60; export const format = (duration: Duration): string => { const ms = getMs(duration) % 1000; const seconds = getSeconds(duration) % 60; @@ -39,9 +36,7 @@ export const format = (duration: Duration): string => { const hours = getHours(duration); return ( - [hours, minutes, seconds] - .map((x) => Math.floor(x).toString().padStart(2, "0")) - .join(":") + + [hours, minutes, seconds].map((x) => Math.floor(x).toString().padStart(2, "0")).join(":") + "." + ms.toString().padStart(3, "0") ); @@ -57,49 +52,40 @@ export const createDurationBuilder = (): DurationBuilder => ({ millis: 0, seconds: 0, minutes: 0, - hours: 0, + hours: 0 }); -export type DurationBuilderField = ( - arg: T, -) => (builder: DurationBuilder) => DurationBuilder; +export type DurationBuilderField = (arg: T) => (builder: DurationBuilder) => DurationBuilder; -export const withMillis: DurationBuilderField = - (millis) => (builder) => ({ - ...builder, - millis, - }); +export const withMillis: DurationBuilderField = (millis) => (builder) => ({ + ...builder, + millis +}); -export const withSeconds: DurationBuilderField = - (seconds) => (builder) => ({ - ...builder, - seconds, - }); +export const withSeconds: DurationBuilderField = (seconds) => (builder) => ({ + ...builder, + seconds +}); -export const withMinutes: DurationBuilderField = - (minutes) => (builder) => ({ - ...builder, - minutes, - }); +export const withMinutes: DurationBuilderField = (minutes) => (builder) => ({ + ...builder, + minutes +}); -export const withHours: DurationBuilderField = - (hours) => (builder) => ({ - ...builder, - hours, - }); +export const withHours: DurationBuilderField = (hours) => (builder) => ({ + ...builder, + hours +}); export const build = (builder: DurationBuilder): Duration => - builder.millis + - builder.seconds * 1000 + - builder.minutes * 60 * 1000 + - builder.hours * 60 * 60 * 1000; + builder.millis + builder.seconds * 1000 + builder.minutes * 60 * 1000 + builder.hours * 60 * 60 * 1000; export const parse = (duration: string): E.Either => { const parts = pipe( duration, S.split(" "), R.map(S.trim), - R.filter((part) => !S.isEmpty(part)), + R.filter((part) => !S.isEmpty(part)) ); const valueUnitPairs = pipe( @@ -120,36 +106,34 @@ export const parse = (duration: string): E.Either => { E.map( flow( R.filter(O.isSome), - R.map(({ value }) => value), - ), - ), + R.map(({ value }) => value) + ) + ) ); return pipe( valueUnitPairs, E.flatMap( - R.reduce( - E.of(createDurationBuilder()), - (builderEither, [unit, value]) => - pipe( - builderEither, - E.chain((builder) => { - switch (unit) { - case DurationUnit.MILLISECOND: - return E.right(withMillis(value)(builder)); - case DurationUnit.SECOND: - return E.right(withSeconds(value)(builder)); - case DurationUnit.MINUTE: - return E.right(withMinutes(value)(builder)); - case DurationUnit.HOUR: - return E.right(withHours(value)(builder)); - default: - return E.left(`unknown unit: ${unit}`); - } - }), - ), - ), + R.reduce(E.of(createDurationBuilder()), (builderEither, [unit, value]) => + pipe( + builderEither, + E.chain((builder) => { + switch (unit) { + case DurationUnit.MILLISECOND: + return E.right(withMillis(value)(builder)); + case DurationUnit.SECOND: + return E.right(withSeconds(value)(builder)); + case DurationUnit.MINUTE: + return E.right(withMinutes(value)(builder)); + case DurationUnit.HOUR: + return E.right(withHours(value)(builder)); + default: + return E.left(`unknown unit: ${unit}`); + } + }) + ) + ) ), - E.map(build), + E.map(build) ); }; diff --git a/src/util/logger.ts b/src/util/logger.ts index 28e38f0..3215844 100644 --- a/src/util/logger.ts +++ b/src/util/logger.ts @@ -2,8 +2,10 @@ import type { IO } from "fp-ts/lib/IO"; export interface Logger { log: (message: string) => IO; + error: (message: string) => IO; } export const ConsoleLogger: Logger = { - log: (message: string) => () => console.log(message), + log: (message: string) => () => console.log(new Date(), "[INFO]", message), + error: (message: string) => () => console.error(new Date(), "[ERROR]", message) }; diff --git a/src/util/scheduler.ts b/src/util/scheduler.ts deleted file mode 100644 index 3265c7c..0000000 --- a/src/util/scheduler.ts +++ /dev/null @@ -1,93 +0,0 @@ -import * as TE from "fp-ts/lib/TaskEither"; -import * as IO from "fp-ts/lib/IO"; -import * as RA from "fp-ts/lib/ReadonlyArray"; -import * as RM from "fp-ts/lib/ReadonlyMap"; -import * as T from "fp-ts/Task"; -import * as S from "fp-ts/string"; -import { contramap, type Eq } from "fp-ts/Eq"; -import { pipe, identity } from "fp-ts/lib/function"; -import type { Schedule } from "../canary"; -import { ConsoleLogger, type Logger } from "./logger"; -import type { Separated } from "fp-ts/lib/Separated"; -import type { Magma } from "fp-ts/lib/Magma"; -import { intercalate } from "fp-ts/lib/Foldable"; - -interface Unit {} -const Unit: Unit = {}; - -interface ScheduledJob { - id: string; - execute: () => TE.TaskEither; - at: Date; - schedule: Schedule; -} - -type SchedulerState = ReadonlyArray; - -const logState = ( - state: SchedulerState, - now: Date, - logger: Logger = ConsoleLogger, -) => { - const ids = pipe( - state, - RA.map(({ id }) => id), - (ids) => intercalate(S.Monoid, RA.Foldable)("|", ids), - ); - return logger.log(`Executing ${ids} at ${now.toUTCString()}`); -}; - -const executeDueJobs = ( - state: SchedulerState, -): IO.IO< - T.Task< - Separated, ReadonlyArray<[string, TResult]>> - > -> => - pipe( - IO.of(state), - IO.bindTo("state"), - IO.bind("now", () => () => new Date()), - IO.bind("toExecute", ({ now, state }) => - pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))), - ), - IO.flatMap(({ state, now }) => - pipe( - IO.of(state), - IO.tap((state) => logState(state, now)), - IO.map((state) => - pipe( - state, - RA.map(({ execute, id }) => - pipe( - execute(), - TE.bimap( - (error) => [id, error] as [string, Error], - (result) => [id, result] as [string, TResult], - ), - ), - ), - ), - ), - ), - ), - IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))), - ); - -const jobEq: Eq = pipe( - S.Eq, - contramap(({ id }) => id), -); -const jobMagma: Magma = { - concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({ - ...a, - ...b, - }), -}; - -export const schedulerLoop = (jobs: ReadonlyArray) => { - const jobsTable: Map< - string, - { retries: number; specification: ScheduledJob } - > = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable)); -}; diff --git a/tst/canary/scheduler.spec.ts b/tst/canary/scheduler.spec.ts new file mode 100644 index 0000000..dbfed19 --- /dev/null +++ b/tst/canary/scheduler.spec.ts @@ -0,0 +1,197 @@ +import { mock, test, expect } from "bun:test"; +import * as TE from "fp-ts/lib/TaskEither"; +import type { Logger } from "../../src/util"; +import { + perform, + type JobsTable, + execute, + formatJobResults, + logExecutingJobs, + type Job +} from "../../src/canary/scheduler"; + +const getMocks = () => { + const mockLogger: Logger = { + log: mock(), + error: mock() + }; + return { mockLogger }; +}; + +const schedule = { + every: 200, + jitter: 100 +}; + +test("logging", () => { + const { mockLogger } = getMocks(); + const jobs: Job[] = [ + { id: "1", toString: () => "Job 1", execute: mock(), schedule, maxRetries: 3 }, + { id: "2", toString: () => "Job 2", execute: mock(), schedule, maxRetries: 3 } + ]; + const now = new Date("2023-01-01T00:00:00Z"); + + logExecutingJobs(jobs, now, mockLogger); + + expect(mockLogger.log).toHaveBeenCalledWith("Executing Job 1|Job 2 at Sun, 01 Jan 2023 00:00:00 GMT"); +}); + +test("should separate jobs into successful and failed executions", async () => { + const job1: Job = { + id: "1", + toString: () => "Job 1", + execute: mock(() => TE.right("Result 1") as any), + schedule, + maxRetries: 3 + }; + const job2: Job = { + id: "2", + toString: () => "Job 2", + execute: mock(() => TE.left(new Error("Failure 2")) as any), + schedule, + maxRetries: 3 + }; + const jobs: Job[] = [job1, job2]; + + const result = await execute(jobs)(); + + expect(result.left).toEqual([[job2, new Error("Failure 2")]]); + expect(result.right).toEqual([[job1, "Result 1"]]); +}); + +test("should format job results correctly", () => { + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]); + const left = [ + [{ id: "1", toString: () => "Job 1", execute: mock(), schedule: {}, maxRetries: 3 }, new Error("Error 1")] + ]; + const right = [[{ id: "2", toString: () => "Job 2", execute: mock(), schedule: {}, maxRetries: 3 }, "Success 2"]]; + + const result = formatJobResults(jobsTable, { left, right } as any); + + expect(result).toContain("Job 1 | 1 / 3 | (retry) :/ | Error 1"); + expect(result).toContain("Job 2 | (success) :) | Success 2"); +}); + +test("should update jobsTable and lastPingAck correctly", async () => { + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]); + const jobs: Job[] = [ + { + id: "1", + toString: () => "Job 1", + execute: mock(() => TE.right("Result 1") as any), + schedule, + maxRetries: 3 + } + ]; + const publishers: any = []; + const lastPingAck = new Date("2023-01-01T00:00:00Z"); + + const result = await perform(jobsTable, jobs, publishers, lastPingAck)(); + + expect(result.lastPingAck).not.toEqual(lastPingAck); + expect(result.jobsTable.get("1")).toEqual({ + retries: 0, + scheduled: expect.any(Date) + }); +}); + +test("should update a job with retry count on failure", async () => { + // Create a mock job that fails the first time but succeeds the second time + const job1: Job = { + id: "1", + toString: () => "Job 1", + execute: mock(() => TE.left(new Error("Error 1")) as any), + schedule, + maxRetries: 2 + }; + + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]); + const jobs: Job[] = [job1]; + const publishers: any = []; + const lastPingAck = new Date("2023-01-01T00:00:00Z"); + + const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)(); + + // Assert the job was retried once and then succeeded + expect(job1.execute).toHaveBeenCalled(); + + // Check the jobsTable for the updated state + expect(result.jobsTable.get("1")).toEqual({ + retries: 1, + scheduled: expect.any(Date) + }); +}); + +test("should reschedule a job that hits max retries", async () => { + // Create a mock job that fails the first time but succeeds the second time + const job1: Job = { + id: "1", + toString: () => "Job 1", + execute: mock().mockReturnValue(TE.left(new Error("Error 1"))), + schedule, + maxRetries: 4 + }; + + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 4 }]]); + const jobs: Job[] = [job1]; + const publishers: any = [mock().mockReturnValue(TE.right(200))]; + + const lastPingAck = new Date("2023-01-01T00:00:00Z"); + + const now = Date.now(); + const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)(); + const delta = Date.now() - now; + + // Assert the job was retried once and then fail + expect(job1.execute).toHaveBeenCalled(); + + // Check the jobsTable for the updated state + const { retries, scheduled } = result.jobsTable.get("1")!; + expect(retries).toEqual(0); + expect(publishers[0]).toHaveBeenCalled(); + + expect(scheduled.getTime()).toBeGreaterThan(now - delta + schedule.every); + expect(scheduled.getTime()).toBeLessThan(now + delta + schedule.every + schedule.jitter); +}); + +test("should not publish only successes when should not ack", async () => { + // Create a mock job that fails the first time but succeeds the second time + const job1: Job = { + id: "1", + toString: () => "Job 1", + execute: mock().mockReturnValue(TE.right(new Error("Error 1"))), + schedule, + maxRetries: 4 + }; + + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]); + const jobs: Job[] = [job1]; + const publishers: any = [mock().mockReturnValue(TE.right(200))]; + const lastPingAck = new Date(); + + await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)(); + + expect(job1.execute).toHaveBeenCalled(); + expect(publishers[0]).toHaveBeenCalledTimes(0); +}); + +test("should publish when should ack", async () => { + // Create a mock job that fails the first time but succeeds the second time + const job1: Job = { + id: "1", + toString: () => "Job 1", + execute: mock().mockReturnValue(TE.right(new Error("Error 1"))), + schedule, + maxRetries: 4 + }; + + const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]); + const jobs: Job[] = [job1]; + const publishers: any = [mock().mockReturnValue(TE.right(200))]; + const lastPingAck = new Date("2023-01-01T00:00:00Z"); + + await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)(); + + expect(job1.execute).toHaveBeenCalled(); + expect(publishers[0]).toHaveBeenCalled(); +});