finish up the scheduler

This commit is contained in:
Elizabeth Hunt 2024-08-11 20:46:58 -07:00
parent 3e7def3a26
commit f0dc72c60b
Signed by: simponic
GPG Key ID: 2909B9A7FF6213EE
12 changed files with 549 additions and 268 deletions

18
.prettierrc Normal file
View File

@ -0,0 +1,18 @@
{
"semi": true,
"singleQuote": false,
"trailingComma": "none",
"useTabs": false,
"printWidth": 120,
"endOfLine": "lf",
"overrides": [
{
"files": ["**/*.scss"],
"options": {
"useTabs": false,
"tabWidth": 2,
"singleQuote": true
}
}
]
}

View File

@ -4,21 +4,13 @@ import * as O from "fp-ts/lib/Option";
import { createTransport } from "nodemailer";
import { toError } from "fp-ts/lib/Either";
import { pipe } from "fp-ts/lib/function";
import {
ImapFlow,
type FetchMessageObject,
type FetchQueryObject,
type MailboxLockObject,
} from "imapflow";
import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow";
import * as IO from "fp-ts/lib/IO";
import * as T from "fp-ts/lib/Task";
import { ConsoleLogger } from "../util";
interface ImapClientI {
fetchAll: (
range: string,
options: FetchQueryObject,
) => Promise<FetchMessageObject[]>;
fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
connect: () => Promise<void>;
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
messageDelete: (uids: number[]) => Promise<boolean>;
@ -40,10 +32,7 @@ class ErrorWithLock extends Error {
}
}
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
new ErrorWithLock(
error instanceof Error ? error.message : "Unknown error",
lock,
);
new ErrorWithLock(error instanceof Error ? error.message : "Unknown error", lock);
/**
* Generate a unique email.
@ -51,42 +40,31 @@ const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
* @param to is the email to send to.
* @returns an {@link Email}.
*/
type EmailGenerator = (
from: EmailFromInstruction,
to: EmailToInstruction,
) => IO.IO<Email>;
const generateEmail: EmailGenerator =
(from: EmailFromInstruction, to: EmailToInstruction) => () => ({
type EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => IO.IO<Email>;
const generateEmail: EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => () => ({
from: from.email,
to: to.email,
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
text: crypto.randomUUID(),
});
text: crypto.randomUUID()
});
/**
* Get the transport layer for a mailbox to send a piece of mail.
* @param param0 is the mailbox to send from.
* @returns a function that takes an email and sends it.
*/
type GetSendEmail = (
from: EmailFromInstruction,
) => (email: Email) => TE.TaskEither<Error, Email>;
const getSendTransport: GetSendEmail = ({
username,
password,
server,
send_port,
}) => {
type GetSendEmail = (from: EmailFromInstruction) => (email: Email) => TE.TaskEither<Error, Email>;
const getSendTransport: GetSendEmail = ({ username, password, server, send_port }) => {
const transport = createTransport({
host: server,
port: send_port,
auth: {
user: username,
pass: password,
pass: password
},
tls: {
rejectUnauthorized: false,
},
rejectUnauthorized: false
}
});
return (email: Email) =>
TE.tryCatch(
@ -98,9 +76,9 @@ const getSendTransport: GetSendEmail = ({
} else {
resolve(email);
}
}),
})
),
toError,
toError
);
};
@ -109,9 +87,7 @@ const getSendTransport: GetSendEmail = ({
* @param param0 is the mailbox to read from.
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
*/
type GetImapClient = (
to: EmailToInstruction,
) => TE.TaskEither<Error, ImapClientI>;
type GetImapClient = (to: EmailToInstruction) => TE.TaskEither<Error, ImapClientI>;
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
const imap = new ImapFlow({
logger: false,
@ -120,8 +96,8 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
secure: true,
auth: {
user: username,
pass: password,
},
pass: password
}
});
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
};
@ -130,18 +106,16 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
* @param imap is the Imap client to fetch messages from.
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
*/
const fetchMessages = (
imap: ImapClientI,
): TE.TaskEither<Error, FetchMessageObject[]> =>
const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObject[]> =>
TE.tryCatch(
() =>
imap.fetchAll("*", {
uid: true,
envelope: true,
headers: true,
bodyParts: ["text"],
bodyParts: ["text"]
}),
toError,
toError
);
/**
@ -152,8 +126,7 @@ const fetchMessages = (
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
const matchesEmail: EmailMatcher = (email) => (message) => {
const subjectMatches = email.subject === message.envelope.subject;
const bodyMatches =
message.bodyParts.get("text")?.toString().trim() === email.text.trim();
const bodyMatches = message.bodyParts.get("text")?.toString().trim() === email.text.trim();
const headers = message.headers.toLocaleString();
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
@ -172,14 +145,9 @@ type FindEmailUidInInbox = (
imap: ImapClientI,
equalsEmail: (message: FetchMessageObject) => boolean,
retries: number,
pollIntervalMs: number,
pollIntervalMs: number
) => TE.TaskEither<Error, number>;
const findEmailUidInInbox: FindEmailUidInInbox = (
imap,
equalsEmail,
retries,
pollIntervalMs,
) =>
const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs) =>
pipe(
fetchMessages(imap),
TE.flatMap((messages) => {
@ -193,17 +161,11 @@ const findEmailUidInInbox: FindEmailUidInInbox = (
(e) =>
pipe(
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)),
TE.chain(() =>
retries === 0
? TE.left(e)
: T.delay(pollIntervalMs)(TE.right(null)),
),
TE.chain(() =>
findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs),
),
),
TE.of,
TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))),
TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs))
),
TE.of
)
);
export type EmailJobDependencies = {
@ -225,40 +187,31 @@ export const perform = (
getSendImpl = getSendTransport,
getImapImpl = getImap,
findEmailUidInInboxImpl = findEmailUidInInbox,
matchesEmailImpl = matchesEmail,
}: Partial<EmailJobDependencies> = {},
matchesEmailImpl = matchesEmail
}: Partial<EmailJobDependencies> = {}
): TE.TaskEither<Error, boolean> =>
pipe(
// arrange.
TE.fromIO(generateEmailImpl(from, to)),
TE.bindTo("email"),
// act.
TE.tap(({ email }) =>
pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock())),
),
TE.tap(({ email }) => pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock()))),
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
TE.bind("mailboxLock", ({ imap }) =>
TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock()),
),
TE.bind("mailboxLock", ({ imap }) => TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock())),
// "assert".
TE.bind("uid", ({ imap, email, mailboxLock }) =>
pipe(
findEmailUidInInboxImpl(
imap,
matchesEmailImpl(email),
retries,
interval,
),
TE.mapLeft(ToErrorWithLock(mailboxLock)),
),
findEmailUidInInboxImpl(imap, matchesEmailImpl(email), retries, interval),
TE.mapLeft(ToErrorWithLock(mailboxLock))
)
),
// cleanup.
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
TE.tryCatch(
// () => imap.messageDelete([uid], { uid: true }),
() => imap.messageDelete([uid]),
ToErrorWithLock(mailboxLock),
),
ToErrorWithLock(mailboxLock)
)
),
TE.fold(
(e) => {
@ -270,6 +223,6 @@ export const perform = (
({ mailboxLock, deleted }) => {
mailboxLock.release();
return TE.right(deleted);
},
),
}
)
);

View File

@ -38,4 +38,4 @@ export interface EmailJob {
readRetry: Retry;
}
export type Job = EmailJob | PingJob | HealthCheckJob | DnsJob;
export type TestJob = EmailJob | PingJob | HealthCheckJob | DnsJob;

193
src/canary/scheduler.ts Normal file
View File

@ -0,0 +1,193 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as IO from "fp-ts/lib/IO";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as RM from "fp-ts/lib/ReadonlyMap";
import * as T from "fp-ts/Task";
import * as S from "fp-ts/string";
import { pipe, identity } from "fp-ts/lib/function";
import type { Separated } from "fp-ts/lib/Separated";
import type { Magma } from "fp-ts/lib/Magma";
import { intercalate } from "fp-ts/lib/Foldable";
import { nextSchedule, type Schedule } from "../canary";
import { ConsoleLogger, type Logger } from "../util";
export interface Job {
id: string;
toString: () => string;
execute: <TResult>() => TE.TaskEither<Error, TResult>;
schedule: Schedule;
maxRetries: number;
}
export const logExecutingJobs = (jobs: ReadonlyArray<Job>, now: Date, logger: Logger = ConsoleLogger) => {
const stringifieds = pipe(
jobs,
RA.map(({ toString }) => toString()),
(stringified) => intercalate(S.Monoid, RA.Foldable)("|", stringified)
);
return logger.log(`Executing ${stringifieds} at ${now.toUTCString()}`);
};
export const execute = <TResult>(
jobs: ReadonlyArray<Job>
): T.Task<Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>> =>
pipe(
T.of(jobs),
T.map(
RA.map((job) =>
pipe(
job.execute(),
TE.bimap(
(error) => [job, error] as [Job, Error],
(result) => [job, result] as [Job, TResult]
)
)
)
),
T.flatMap(RA.wilt(T.ApplicativePar)(identity))
);
export type JobsTableRow = {
scheduled: Date;
retries: number;
};
export type JobsTable = ReadonlyMap<string, JobsTableRow>;
export const jobsTableRowMagma: Magma<JobsTableRow> = {
concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => {
if (a.scheduled <= b.scheduled) {
// always prefer the later schedule.
return b;
}
// if (a.retries <= b.retries) {
// return a;
// }
return b;
}
};
export const formatJobResults = <TResult>(
jobsTable: JobsTable,
{ left, right }: Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>
) => {
const failures = left
.map(([job, err]) => ({
job,
retries: jobsTable.get(job.id)?.retries ?? job.maxRetries,
err
}))
.filter(({ job, retries }) => retries >= job.maxRetries);
const retries = left
.map(([job, err]) => ({
job,
retries: jobsTable.get(job.id)?.retries ?? 0,
err
}))
.filter(({ job, retries }) => retries < job.maxRetries);
const failureMessages = failures
.map(({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (err) :( | ${err.message}`)
.join("\n");
const retryMessages = retries
.map(
({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (retry) :/ | ${err.message}`
)
.join("\n");
const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n");
return `FAILURES:\n${failureMessages}\nRETRIES:\n${retryMessages}\nSUCCESSES:\n${successMessages}`;
};
export const perform = (
jobsTable: JobsTable,
jobs: ReadonlyArray<Job>,
publishers: ReadonlyArray<(message: string) => TE.TaskEither<Error, number>>,
lastPingAck: Date,
publishAckEvery = 24 * (60 * 60 * 1_000),
logger = ConsoleLogger
) =>
pipe(
T.of(jobs),
T.bindTo("jobs"),
T.bind(
"now",
T.fromIOK(() => IO.of(new Date()))
),
T.bind("toExecute", ({ jobs, now }) =>
pipe(
jobs,
RA.filter((job) => (jobsTable.get(job.id)?.scheduled ?? now) <= now),
T.of
)
),
T.tap(({ toExecute, now }) => T.fromIO(logExecutingJobs(toExecute, now))),
T.bind("results", ({ toExecute }) => execute(toExecute)),
T.bind("shouldAck", ({ now }) => T.of(now.getTime() - lastPingAck.getTime() >= publishAckEvery)),
T.tap(({ results, shouldAck }) => {
if (results.left.length === 0 && !shouldAck) {
return pipe(
T.of({ right: [] as ReadonlyArray<number> }),
T.tap(() => T.fromIO(logger.log("not publishing")))
);
}
return pipe(
publishers,
RA.map((publish) => publish(formatJobResults(jobsTable, results))),
RA.wilt(T.ApplicativePar)(identity),
T.tap(({ left }) =>
pipe(
left
? logger.error("Encountered publishing errors: " + left.toString())
: logger.log("Published successfully"),
T.fromIO
)
)
);
}),
T.map(({ now, results: { left, right }, shouldAck }) => {
const result = { lastPingAck, jobsTable };
if (shouldAck) {
result.lastPingAck = now;
}
const leftResults = pipe(
left,
RA.map(([job]): readonly [string, JobsTableRow] => {
const row = jobsTable.get(job.id);
if (!row) {
return [job.id, { retries: 1, scheduled: now }];
}
if (row.retries >= job.maxRetries) {
logger.error(`Hit max retries; scheduling again ;-; ${job.toString()}`);
return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const;
}
return [
job.id,
{
retries: row.retries + 1,
scheduled: now
}
] as const;
})
);
const results = pipe(
right,
RA.map(([job]): readonly [string, JobsTableRow] => [
job.id,
{
retries: 0,
scheduled: nextSchedule(job.schedule, now)
}
]),
RA.concat(leftResults)
);
result.jobsTable = pipe(
jobsTable,
RM.toReadonlyArray(S.Ord),
RA.concat(results),
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
);
return result;
})
);

View File

@ -1,13 +1,13 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option";
import type { Job } from "./job";
import type { TestJob } from "./job";
import type { D } from "../util";
export enum TestType {
EMAIL = "email",
PING = "ping",
HEALTHCHECK = "healthcheck",
DNS = "dns",
DNS = "dns"
}
export interface Schedule {
@ -18,13 +18,14 @@ export interface Schedule {
export interface Test {
name: string;
type: TestType;
job: Job;
job: TestJob;
schedule: Schedule;
}
export type Testable<T extends Job> = (job: T) => TE.TaskEither<Error, boolean>;
export type Testable<TJob extends TestJob> = (job: TJob) => TE.TaskEither<Error, boolean>;
export const parseTestType = (testType: string): O.Option<TestType> =>
Object.values(TestType).includes(testType as TestType)
? O.some(testType as TestType)
: O.none;
Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none;
export const nextSchedule = (schedule: Schedule, date: Date) =>
new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter);

12
src/publisher/discord.ts Normal file
View File

@ -0,0 +1,12 @@
export interface DiscordPost {
webhook: string;
role_id: string;
}
export const publishDiscord = async (discordPost: DiscordPost, message: string) => {
console.log("Publishing to Discord");
return fetch(discordPost.webhook, {
method: "POST",
body: JSON.stringify({ message: `<@${discordPost.role_id}>\n${message}` })
}).then((r) => r.status);
};

View File

@ -1,23 +1,26 @@
import { D } from "../util";
import { toError } from "fp-ts/lib/Either";
import * as TE from "fp-ts/TaskEither";
import { publishDiscord, type DiscordPost } from "./discord";
import { publishNtfy, type NtfyPost } from "./ntfy";
export enum PublisherType {
DISCORD = "discord",
NTFY = "ntfy",
NTFY = "ntfy"
}
export interface DiscordPost {
webhook: string;
role_id: string;
}
export interface NtfyPost {
webhook: string;
}
export type PublisherPost = DiscordPost | NtfyPost;
export interface Publisher {
type: PublisherType;
at: D.Duration;
post: PublisherPost;
}
export const publish = (publisher: Publisher, message: string): TE.TaskEither<Error, number> => {
switch (publisher.type) {
case PublisherType.DISCORD:
return TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError);
case PublisherType.NTFY:
return TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError);
default:
return TE.left(new Error("unknown publisher type: " + publisher.type));
}
};

11
src/publisher/ntfy.ts Normal file
View File

@ -0,0 +1,11 @@
export interface NtfyPost {
webhook: string;
}
export const publishNtfy = async (ntfyPost: NtfyPost, message: string) => {
console.log("publishing to Ntfy");
return fetch(ntfyPost.webhook, {
method: "POST",
body: message
}).then((r) => r.status);
};

View File

@ -10,7 +10,7 @@ export enum DurationUnit {
MILLISECOND,
SECOND,
MINUTE,
HOUR,
HOUR
}
const durationUnitMap: Record<string, DurationUnit> = {
ms: DurationUnit.MILLISECOND,
@ -21,17 +21,14 @@ const durationUnitMap: Record<string, DurationUnit> = {
minutes: DurationUnit.MINUTE,
hr: DurationUnit.HOUR,
hour: DurationUnit.HOUR,
hours: DurationUnit.HOUR,
hours: DurationUnit.HOUR
};
const getDurationUnit = (key: string): O.Option<DurationUnit> =>
O.fromNullable(durationUnitMap[key.toLowerCase()]);
const getDurationUnit = (key: string): O.Option<DurationUnit> => O.fromNullable(durationUnitMap[key.toLowerCase()]);
export const getMs = (duration: Duration): number => duration;
export const getSeconds = (duration: Duration): number => duration / 1000;
export const getMinutes = (duration: Duration): number =>
getSeconds(duration) / 60;
export const getHours = (duration: Duration): number =>
getMinutes(duration) / 60;
export const getMinutes = (duration: Duration): number => getSeconds(duration) / 60;
export const getHours = (duration: Duration): number => getMinutes(duration) / 60;
export const format = (duration: Duration): string => {
const ms = getMs(duration) % 1000;
const seconds = getSeconds(duration) % 60;
@ -39,9 +36,7 @@ export const format = (duration: Duration): string => {
const hours = getHours(duration);
return (
[hours, minutes, seconds]
.map((x) => Math.floor(x).toString().padStart(2, "0"))
.join(":") +
[hours, minutes, seconds].map((x) => Math.floor(x).toString().padStart(2, "0")).join(":") +
"." +
ms.toString().padStart(3, "0")
);
@ -57,49 +52,40 @@ export const createDurationBuilder = (): DurationBuilder => ({
millis: 0,
seconds: 0,
minutes: 0,
hours: 0,
hours: 0
});
export type DurationBuilderField<T> = (
arg: T,
) => (builder: DurationBuilder) => DurationBuilder;
export type DurationBuilderField<T> = (arg: T) => (builder: DurationBuilder) => DurationBuilder;
export const withMillis: DurationBuilderField<number> =
(millis) => (builder) => ({
export const withMillis: DurationBuilderField<number> = (millis) => (builder) => ({
...builder,
millis,
});
millis
});
export const withSeconds: DurationBuilderField<number> =
(seconds) => (builder) => ({
export const withSeconds: DurationBuilderField<number> = (seconds) => (builder) => ({
...builder,
seconds,
});
seconds
});
export const withMinutes: DurationBuilderField<number> =
(minutes) => (builder) => ({
export const withMinutes: DurationBuilderField<number> = (minutes) => (builder) => ({
...builder,
minutes,
});
minutes
});
export const withHours: DurationBuilderField<number> =
(hours) => (builder) => ({
export const withHours: DurationBuilderField<number> = (hours) => (builder) => ({
...builder,
hours,
});
hours
});
export const build = (builder: DurationBuilder): Duration =>
builder.millis +
builder.seconds * 1000 +
builder.minutes * 60 * 1000 +
builder.hours * 60 * 60 * 1000;
builder.millis + builder.seconds * 1000 + builder.minutes * 60 * 1000 + builder.hours * 60 * 60 * 1000;
export const parse = (duration: string): E.Either<string, Duration> => {
const parts = pipe(
duration,
S.split(" "),
R.map(S.trim),
R.filter((part) => !S.isEmpty(part)),
R.filter((part) => !S.isEmpty(part))
);
const valueUnitPairs = pipe(
@ -120,17 +106,15 @@ export const parse = (duration: string): E.Either<string, Duration> => {
E.map(
flow(
R.filter(O.isSome),
R.map(({ value }) => value),
),
),
R.map(({ value }) => value)
)
)
);
return pipe(
valueUnitPairs,
E.flatMap(
R.reduce(
E.of<string, DurationBuilder>(createDurationBuilder()),
(builderEither, [unit, value]) =>
R.reduce(E.of<string, DurationBuilder>(createDurationBuilder()), (builderEither, [unit, value]) =>
pipe(
builderEither,
E.chain((builder) => {
@ -146,10 +130,10 @@ export const parse = (duration: string): E.Either<string, Duration> => {
default:
return E.left(`unknown unit: ${unit}`);
}
}),
})
)
)
),
),
),
E.map(build),
E.map(build)
);
};

View File

@ -2,8 +2,10 @@ import type { IO } from "fp-ts/lib/IO";
export interface Logger {
log: (message: string) => IO<void>;
error: (message: string) => IO<void>;
}
export const ConsoleLogger: Logger = {
log: (message: string) => () => console.log(message),
log: (message: string) => () => console.log(new Date(), "[INFO]", message),
error: (message: string) => () => console.error(new Date(), "[ERROR]", message)
};

View File

@ -1,93 +0,0 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as IO from "fp-ts/lib/IO";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as RM from "fp-ts/lib/ReadonlyMap";
import * as T from "fp-ts/Task";
import * as S from "fp-ts/string";
import { contramap, type Eq } from "fp-ts/Eq";
import { pipe, identity } from "fp-ts/lib/function";
import type { Schedule } from "../canary";
import { ConsoleLogger, type Logger } from "./logger";
import type { Separated } from "fp-ts/lib/Separated";
import type { Magma } from "fp-ts/lib/Magma";
import { intercalate } from "fp-ts/lib/Foldable";
interface Unit {}
const Unit: Unit = {};
interface ScheduledJob {
id: string;
execute: <TResult>() => TE.TaskEither<Error, TResult>;
at: Date;
schedule: Schedule;
}
type SchedulerState = ReadonlyArray<ScheduledJob>;
const logState = (
state: SchedulerState,
now: Date,
logger: Logger = ConsoleLogger,
) => {
const ids = pipe(
state,
RA.map(({ id }) => id),
(ids) => intercalate(S.Monoid, RA.Foldable)("|", ids),
);
return logger.log(`Executing ${ids} at ${now.toUTCString()}`);
};
const executeDueJobs = <TResult>(
state: SchedulerState,
): IO.IO<
T.Task<
Separated<ReadonlyArray<[string, Error]>, ReadonlyArray<[string, TResult]>>
>
> =>
pipe(
IO.of(state),
IO.bindTo("state"),
IO.bind("now", () => () => new Date()),
IO.bind("toExecute", ({ now, state }) =>
pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))),
),
IO.flatMap(({ state, now }) =>
pipe(
IO.of(state),
IO.tap((state) => logState(state, now)),
IO.map((state) =>
pipe(
state,
RA.map(({ execute, id }) =>
pipe(
execute(),
TE.bimap(
(error) => [id, error] as [string, Error],
(result) => [id, result] as [string, TResult],
),
),
),
),
),
),
),
IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))),
);
const jobEq: Eq<ScheduledJob> = pipe(
S.Eq,
contramap(({ id }) => id),
);
const jobMagma: Magma<ScheduledJob> = {
concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({
...a,
...b,
}),
};
export const schedulerLoop = (jobs: ReadonlyArray<ScheduledJob>) => {
const jobsTable: Map<
string,
{ retries: number; specification: ScheduledJob }
> = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable));
};

View File

@ -0,0 +1,197 @@
import { mock, test, expect } from "bun:test";
import * as TE from "fp-ts/lib/TaskEither";
import type { Logger } from "../../src/util";
import {
perform,
type JobsTable,
execute,
formatJobResults,
logExecutingJobs,
type Job
} from "../../src/canary/scheduler";
const getMocks = () => {
const mockLogger: Logger = {
log: mock(),
error: mock()
};
return { mockLogger };
};
const schedule = {
every: 200,
jitter: 100
};
test("logging", () => {
const { mockLogger } = getMocks();
const jobs: Job[] = [
{ id: "1", toString: () => "Job 1", execute: mock(), schedule, maxRetries: 3 },
{ id: "2", toString: () => "Job 2", execute: mock(), schedule, maxRetries: 3 }
];
const now = new Date("2023-01-01T00:00:00Z");
logExecutingJobs(jobs, now, mockLogger);
expect(mockLogger.log).toHaveBeenCalledWith("Executing Job 1|Job 2 at Sun, 01 Jan 2023 00:00:00 GMT");
});
test("should separate jobs into successful and failed executions", async () => {
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.right("Result 1") as any),
schedule,
maxRetries: 3
};
const job2: Job = {
id: "2",
toString: () => "Job 2",
execute: mock(() => TE.left(new Error("Failure 2")) as any),
schedule,
maxRetries: 3
};
const jobs: Job[] = [job1, job2];
const result = await execute(jobs)();
expect(result.left).toEqual([[job2, new Error("Failure 2")]]);
expect(result.right).toEqual([[job1, "Result 1"]]);
});
test("should format job results correctly", () => {
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
const left = [
[{ id: "1", toString: () => "Job 1", execute: mock(), schedule: {}, maxRetries: 3 }, new Error("Error 1")]
];
const right = [[{ id: "2", toString: () => "Job 2", execute: mock(), schedule: {}, maxRetries: 3 }, "Success 2"]];
const result = formatJobResults(jobsTable, { left, right } as any);
expect(result).toContain("Job 1 | 1 / 3 | (retry) :/ | Error 1");
expect(result).toContain("Job 2 | (success) :) | Success 2");
});
test("should update jobsTable and lastPingAck correctly", async () => {
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
const jobs: Job[] = [
{
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.right("Result 1") as any),
schedule,
maxRetries: 3
}
];
const publishers: any = [];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const result = await perform(jobsTable, jobs, publishers, lastPingAck)();
expect(result.lastPingAck).not.toEqual(lastPingAck);
expect(result.jobsTable.get("1")).toEqual({
retries: 0,
scheduled: expect.any(Date)
});
});
test("should update a job with retry count on failure", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.left(new Error("Error 1")) as any),
schedule,
maxRetries: 2
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
// Assert the job was retried once and then succeeded
expect(job1.execute).toHaveBeenCalled();
// Check the jobsTable for the updated state
expect(result.jobsTable.get("1")).toEqual({
retries: 1,
scheduled: expect.any(Date)
});
});
test("should reschedule a job that hits max retries", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.left(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 4 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const now = Date.now();
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
const delta = Date.now() - now;
// Assert the job was retried once and then fail
expect(job1.execute).toHaveBeenCalled();
// Check the jobsTable for the updated state
const { retries, scheduled } = result.jobsTable.get("1")!;
expect(retries).toEqual(0);
expect(publishers[0]).toHaveBeenCalled();
expect(scheduled.getTime()).toBeGreaterThan(now - delta + schedule.every);
expect(scheduled.getTime()).toBeLessThan(now + delta + schedule.every + schedule.jitter);
});
test("should not publish only successes when should not ack", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date();
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
expect(job1.execute).toHaveBeenCalled();
expect(publishers[0]).toHaveBeenCalledTimes(0);
});
test("should publish when should ack", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
expect(job1.execute).toHaveBeenCalled();
expect(publishers[0]).toHaveBeenCalled();
});