Compare commits

..

No commits in common. "main" and "scheduler" have entirely different histories.

14 changed files with 293 additions and 672 deletions

View File

@ -1,18 +0,0 @@
{
"semi": true,
"singleQuote": false,
"trailingComma": "none",
"useTabs": false,
"printWidth": 120,
"endOfLine": "lf",
"overrides": [
{
"files": ["**/*.scss"],
"options": {
"useTabs": false,
"tabWidth": 2,
"singleQuote": true
}
}
]
}

View File

@ -3,14 +3,22 @@ import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option";
import { createTransport } from "nodemailer";
import { toError } from "fp-ts/lib/Either";
import { flow, pipe } from "fp-ts/lib/function";
import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow";
import { pipe } from "fp-ts/lib/function";
import {
ImapFlow,
type FetchMessageObject,
type FetchQueryObject,
type MailboxLockObject,
} from "imapflow";
import * as IO from "fp-ts/lib/IO";
import * as T from "fp-ts/lib/Task";
import { ConsoleLogger, type Logger } from "../util";
import { ConsoleLogger } from "../util";
interface ImapClientI {
fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
fetchAll: (
range: string,
options: FetchQueryObject,
) => Promise<FetchMessageObject[]>;
connect: () => Promise<void>;
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
messageDelete: (uids: number[]) => Promise<boolean>;
@ -32,7 +40,10 @@ class ErrorWithLock extends Error {
}
}
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
new ErrorWithLock(error instanceof Error ? error.message : "Unknown error", lock);
new ErrorWithLock(
error instanceof Error ? error.message : "Unknown error",
lock,
);
/**
* Generate a unique email.
@ -40,31 +51,42 @@ const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
* @param to is the email to send to.
* @returns an {@link Email}.
*/
type EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => IO.IO<Email>;
const generateEmail: EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => () => ({
from: from.email,
to: to.email,
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
text: crypto.randomUUID()
});
type EmailGenerator = (
from: EmailFromInstruction,
to: EmailToInstruction,
) => IO.IO<Email>;
const generateEmail: EmailGenerator =
(from: EmailFromInstruction, to: EmailToInstruction) => () => ({
from: from.email,
to: to.email,
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
text: crypto.randomUUID(),
});
/**
* Get the transport layer for a mailbox to send a piece of mail.
* @param param0 is the mailbox to send from.
* @returns a function that takes an email and sends it.
*/
type GetSendEmail = (from: EmailFromInstruction) => (email: Email) => TE.TaskEither<Error, Email>;
const getSendTransport: GetSendEmail = ({ username, password, server, send_port }) => {
type GetSendEmail = (
from: EmailFromInstruction,
) => (email: Email) => TE.TaskEither<Error, Email>;
const getSendTransport: GetSendEmail = ({
username,
password,
server,
send_port,
}) => {
const transport = createTransport({
host: server,
port: send_port,
auth: {
user: username,
pass: password
pass: password,
},
tls: {
rejectUnauthorized: false
}
rejectUnauthorized: false,
},
});
return (email: Email) =>
TE.tryCatch(
@ -76,9 +98,9 @@ const getSendTransport: GetSendEmail = ({ username, password, server, send_port
} else {
resolve(email);
}
})
}),
),
toError
toError,
);
};
@ -87,7 +109,9 @@ const getSendTransport: GetSendEmail = ({ username, password, server, send_port
* @param param0 is the mailbox to read from.
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
*/
type GetImapClient = (to: EmailToInstruction) => TE.TaskEither<Error, ImapClientI>;
type GetImapClient = (
to: EmailToInstruction,
) => TE.TaskEither<Error, ImapClientI>;
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
const imap = new ImapFlow({
logger: false,
@ -96,8 +120,8 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
secure: true,
auth: {
user: username,
pass: password
}
pass: password,
},
});
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
};
@ -106,16 +130,18 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
* @param imap is the Imap client to fetch messages from.
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
*/
const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObject[]> =>
const fetchMessages = (
imap: ImapClientI,
): TE.TaskEither<Error, FetchMessageObject[]> =>
TE.tryCatch(
() =>
imap.fetchAll("*", {
uid: true,
envelope: true,
headers: true,
bodyParts: ["text"]
bodyParts: ["text"],
}),
toError
toError,
);
/**
@ -126,7 +152,8 @@ const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObje
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
const matchesEmail: EmailMatcher = (email) => (message) => {
const subjectMatches = email.subject === message.envelope.subject;
const bodyMatches = message.bodyParts.get("text")?.toString().trim() === email.text.trim();
const bodyMatches =
message.bodyParts.get("text")?.toString().trim() === email.text.trim();
const headers = message.headers.toLocaleString();
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
@ -146,9 +173,13 @@ type FindEmailUidInInbox = (
equalsEmail: (message: FetchMessageObject) => boolean,
retries: number,
pollIntervalMs: number,
logger?: Logger
) => TE.TaskEither<Error, number>;
const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs, logger = ConsoleLogger) =>
const findEmailUidInInbox: FindEmailUidInInbox = (
imap,
equalsEmail,
retries,
pollIntervalMs,
) =>
pipe(
fetchMessages(imap),
TE.flatMap((messages) => {
@ -161,17 +192,18 @@ const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, po
TE.fold(
(e) =>
pipe(
TE.fromIO(logger.log(`email failed; ${retries} retries left.`)),
TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))),
TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs))
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)),
TE.chain(() =>
retries === 0
? TE.left(e)
: T.delay(pollIntervalMs)(TE.right(null)),
),
TE.chain(() =>
findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs),
),
),
(s) =>
pipe(
s,
TE.of,
TE.tap(() => TE.fromIO(logger.log("Email succeeded")))
)
)
TE.of,
),
);
export type EmailJobDependencies = {
@ -193,27 +225,40 @@ export const perform = (
getSendImpl = getSendTransport,
getImapImpl = getImap,
findEmailUidInInboxImpl = findEmailUidInInbox,
matchesEmailImpl = matchesEmail
}: Partial<EmailJobDependencies> = {}
matchesEmailImpl = matchesEmail,
}: Partial<EmailJobDependencies> = {},
): TE.TaskEither<Error, boolean> =>
pipe(
// arrange.
TE.fromIO(generateEmailImpl(from, to)),
TE.bindTo("email"),
// act.
TE.tap(({ email }) => pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock()))),
TE.tap(({ email }) =>
pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock())),
),
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
TE.bind("mailboxLock", ({ imap }) => TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock())),
TE.bind("mailboxLock", ({ imap }) =>
TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock()),
),
// "assert".
TE.bind("uid", ({ imap, email, mailboxLock }) =>
pipe(
findEmailUidInInboxImpl(imap, matchesEmailImpl(email), retries, interval),
TE.mapLeft(ToErrorWithLock(mailboxLock))
)
findEmailUidInInboxImpl(
imap,
matchesEmailImpl(email),
retries,
interval,
),
TE.mapLeft(ToErrorWithLock(mailboxLock)),
),
),
// cleanup.
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
TE.tryCatch(() => imap.messageDelete([uid]), ToErrorWithLock(mailboxLock))
TE.tryCatch(
// () => imap.messageDelete([uid], { uid: true }),
() => imap.messageDelete([uid]),
ToErrorWithLock(mailboxLock),
),
),
TE.fold(
(e) => {
@ -225,6 +270,6 @@ export const perform = (
({ mailboxLock, deleted }) => {
mailboxLock.release();
return TE.right(deleted);
}
)
},
),
);

View File

@ -38,4 +38,4 @@ export interface EmailJob {
readRetry: Retry;
}
export type TestJob = EmailJob | PingJob | HealthCheckJob | DnsJob;
export type Job = EmailJob | PingJob | HealthCheckJob | DnsJob;

View File

@ -1,197 +0,0 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as IO from "fp-ts/lib/IO";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as RM from "fp-ts/lib/ReadonlyMap";
import * as T from "fp-ts/Task";
import * as S from "fp-ts/string";
import { pipe, identity } from "fp-ts/lib/function";
import type { Separated } from "fp-ts/lib/Separated";
import type { Magma } from "fp-ts/lib/Magma";
import { intercalate } from "fp-ts/lib/Foldable";
import { nextSchedule, type Schedule } from "../canary";
import { ConsoleLogger, type Logger } from "../util";
export interface Job {
id: string;
toString: () => string;
execute: () => TE.TaskEither<Error, boolean>;
schedule: Schedule;
maxRetries: number;
}
export const logExecutingJobs = (jobs: ReadonlyArray<Job>, now: Date, logger: Logger = ConsoleLogger) => {
const stringifieds = pipe(
jobs,
RA.map(({ toString }) => toString()),
(stringified) => intercalate(S.Monoid, RA.Foldable)("|", stringified)
);
return logger.log(`Executing ${stringifieds} at ${now.toUTCString()}`);
};
export const execute = <TResult>(
jobs: ReadonlyArray<Job>
): T.Task<Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>> =>
pipe(
T.of(jobs),
T.map(
RA.map((job) =>
pipe(
job.execute(),
TE.bimap(
(error) => [job, error] as [Job, Error],
(result) => [job, result] as [Job, TResult]
)
)
)
),
T.flatMap(RA.wilt(T.ApplicativePar)(identity))
);
export type JobsTableRow = {
scheduled: Date;
retries: number;
};
export type JobsTable = ReadonlyMap<string, JobsTableRow>;
export const constructJobsTable = (jobs: ReadonlyArray<Job>, now: Date): JobsTable => {
return pipe(
jobs,
RA.map((job) => [job.id, { retries: 0, scheduled: now }] as readonly [string, JobsTableRow]),
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
);
};
export const jobsTableRowMagma: Magma<JobsTableRow> = {
concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => {
if (a.scheduled <= b.scheduled) {
// always prefer the later schedule.
return b;
}
// if (a.retries <= b.retries) {
// return a;
// }
return b;
}
};
export const formatJobResults = <TResult>(
jobsTable: JobsTable,
{ left, right }: Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>
) => {
const failures = left
.map(([job, err]) => ({
job,
retries: jobsTable.get(job.id)?.retries ?? job.maxRetries,
err
}))
.filter(({ job, retries }) => retries >= job.maxRetries);
const retries = left
.map(([job, err]) => ({
job,
retries: jobsTable.get(job.id)?.retries ?? 0,
err
}))
.filter(({ job, retries }) => retries < job.maxRetries);
const failureMessages = failures
.map(({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (err) :( | ${err.message}`)
.join("\n");
const retryMessages = retries
.map(
({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (retry) :/ | ${err.message}`
)
.join("\n");
const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n");
return `FAILURES:\n${failureMessages}\n\nRETRIES:\n${retryMessages}\n\nSUCCESSES:\n${successMessages}`;
};
export const perform = (
jobsTable: JobsTable,
jobs: ReadonlyArray<Job>,
publishers: ReadonlyArray<(message: string) => TE.TaskEither<Error, number>>,
lastPingAck: Date,
publishAckEvery = 24 * (60 * 60 * 1_000),
logger = ConsoleLogger
) =>
pipe(
T.of(jobs),
T.bindTo("jobs"),
T.bind(
"now",
T.fromIOK(() => IO.of(new Date()))
),
T.bind("toExecute", ({ jobs, now }) =>
pipe(
jobs,
RA.filter((job) => (jobsTable.get(job.id)?.scheduled ?? now) <= now),
T.of
)
),
T.tap(({ toExecute, now }) => T.fromIO(logExecutingJobs(toExecute, now))),
T.bind("results", ({ toExecute }) => execute(toExecute)),
T.bind("shouldAck", ({ now }) => T.of(now.getTime() - lastPingAck.getTime() >= publishAckEvery)),
T.tap(({ results, shouldAck }) => {
if (results.left.length === 0 && !shouldAck) {
return pipe(
T.of({ right: [] as ReadonlyArray<number> }),
T.tap(() => T.fromIO(logger.log("not publishing")))
);
}
return pipe(
publishers,
RA.map((publish) => publish(formatJobResults(jobsTable, results))),
RA.wilt(T.ApplicativePar)(identity),
T.tap(({ left }) =>
pipe(
left.length
? logger.error("Encountered publishing errors: " + left.toString())
: logger.log("Published successfully"),
T.fromIO
)
)
);
}),
T.map(({ now, results: { left, right }, shouldAck }) => {
const ack = shouldAck ? now : lastPingAck;
const leftResults = pipe(
left,
RA.map(([job]): readonly [string, JobsTableRow] => {
const row = jobsTable.get(job.id);
if (!row) {
return [job.id, { retries: 1, scheduled: now }];
}
if (row.retries >= job.maxRetries) {
logger.error(`Hit max retries for; scheduling again ;-; ${job.toString()}`);
return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const;
}
return [
job.id,
{
retries: row.retries + 1,
scheduled: now
}
] as const;
})
);
const results = pipe(
right,
RA.map(([job]): readonly [string, JobsTableRow] => [
job.id,
{
retries: 0,
scheduled: nextSchedule(job.schedule, now)
}
]),
RA.concat(leftResults)
);
const newJobsTable = pipe(
jobsTable,
RM.toReadonlyArray(S.Ord),
RA.concat(results),
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
);
return { lastPingAck: ack, jobsTable: newJobsTable };
})
);

View File

@ -1,14 +1,13 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option";
import type { EmailJob, TestJob } from "./job";
import type { Job } from "./job";
import type { D } from "../util";
import { perform } from "./email";
export enum TestType {
EMAIL = "email",
PING = "ping",
HEALTHCHECK = "healthcheck",
DNS = "dns"
DNS = "dns",
}
export interface Schedule {
@ -19,20 +18,13 @@ export interface Schedule {
export interface Test {
name: string;
type: TestType;
job: TestJob;
job: Job;
schedule: Schedule;
}
export type Testable<T extends Job> = (job: T) => TE.TaskEither<Error, boolean>;
export const parseTestType = (testType: string): O.Option<TestType> =>
Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none;
export const nextSchedule = (schedule: Schedule, date: Date) =>
new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter);
export const executorFor = (type: TestType, job: TestJob): (() => TE.TaskEither<Error, boolean>) => {
switch (type) {
case TestType.EMAIL:
return () => perform(job as EmailJob);
}
return () => TE.right(true);
};
Object.values(TestType).includes(testType as TestType)
? O.some(testType as TestType)
: O.none;

View File

@ -1,9 +1,7 @@
import * as E from "fp-ts/Either";
import * as IO from "fp-ts/IO";
import type { Publisher } from "./publisher";
import { readFileSync } from "fs";
import { TestType, type Test, type TestJob } from "./canary";
import * as D from "./util/duration";
import { pipe } from "fp-ts/lib/function";
import type { Test } from "./canary";
export interface Config {
result_publishers: Publisher[];
@ -12,28 +10,9 @@ export interface Config {
tests: Test[];
}
export const transformDurations = (obj: any): E.Either<string, any> => {
const transform = (o: any): E.Either<string, any> => {
const entries = Object.entries(o);
for (let [key, value] of entries) {
if (key === "duration" && typeof value === "string") {
return D.parse(value);
} else if (typeof value === "object" && value !== null) {
const result = transform(value);
if (E.isLeft(result)) {
return result;
} else {
o[key] = result.right;
}
}
}
return E.right(o);
export const readConfig =
(filePath: string): IO.IO<Config> =>
() => {
const confStr = readFileSync(filePath, "utf-8");
return JSON.parse(confStr);
};
return transform(obj);
};
export const readConfig = (filePath: string): E.Either<string, Config> =>
pipe(readFileSync(filePath, "utf-8"), JSON.parse, transformDurations) as E.Either<string, Config>;

View File

@ -1,51 +1,16 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option";
import * as RA from "fp-ts/ReadonlyArray";
import { pipe } from "fp-ts/lib/function";
import { toError } from "fp-ts/lib/Either";
import { readConfig } from "./config";
import { parseArgs, type Args } from "./args";
import { executorFor, type Test, type TestType } from "./canary";
import { constructJobsTable, perform, type Job } from "./canary/scheduler";
import { publish, PublisherType } from "./publisher";
import { ConsoleLogger } from "./util";
import { parseArgs } from "./args";
const testFilters = (args: Args): ReadonlyArray<(test: Test) => boolean> =>
(
[
[args.testType, (testType: TestType) => (test: Test) => test.type === testType],
[args.testName, (testName: string) => (test: Test) => testName === test.name]
] as ReadonlyArray<[O.Option<any>, (t: any) => (test: Test) => boolean]>
)
.map(([o, pred]) =>
pipe(
o,
O.map((val) => pred(val))
)
)
.filter(O.isSome)
.map(({ value }) => value);
const main: TE.TaskEither<string | Error, any> = pipe(
const main: TE.TaskEither<Error, void> = pipe(
TE.fromEither(parseArgs(Bun.argv)),
TE.bindTo("args"),
TE.bind("now", () => TE.fromIO(() => new Date())),
TE.bind("config", ({ args }) => TE.fromEither(readConfig(args.testsFile))),
TE.flatMap(({ config, args, now }) => {
const filters = testFilters(args);
const jobs = config.tests
.filter((test) => filters.every((filter) => filter(test)))
.map((test, i): Job => {
const toString = () => test.name;
const id = i.toString();
const schedule = test.schedule;
const maxRetries = 3;
const execute = executorFor(test.type, test.job);
return { toString, id, schedule, maxRetries, execute };
});
const jobsTable = constructJobsTable(jobs, now);
const publishers = pipe([{ publisherType: PublisherType.CONSOLE, post: ConsoleLogger }] as const, RA.map(publish));
return pipe(perform(jobsTable, jobs, publishers, new Date()), TE.fromTask);
})
TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))),
TE.fold(
(e) => TE.left(toError(e)),
() => TE.right(undefined),
),
);
main().then(() => process.exit());
main();

View File

@ -1,19 +0,0 @@
import { $ } from "bun";
export interface DiscordPost {
webhook: string;
role_id: string;
}
export const publishDiscord = async (discordPost: DiscordPost, message: string) => {
console.log("Publishing to Discord");
const ip = await $`dig +noall +short discord.com @1.1.1.1 A | shuf -n 1`.text();
return fetch(discordPost.webhook.replace("discord.com", ip), {
headers: {
Host: "discord.com"
},
method: "POST",
body: JSON.stringify({ message: `<@${discordPost.role_id}>\n${message}` }),
tls: { rejectUnauthorized: false }
}).then((r) => r.status);
};

View File

@ -1,36 +1,23 @@
import { toError } from "fp-ts/lib/Either";
import * as TE from "fp-ts/TaskEither";
import { publishDiscord, type DiscordPost } from "./discord";
import { publishNtfy, type NtfyPost } from "./ntfy";
import type { Logger } from "../util";
import { pipe } from "fp-ts/lib/function";
import { D } from "../util";
export enum PublisherType {
DISCORD = "discord",
NTFY = "ntfy",
CONSOLE = "console"
}
export type PublisherPost = DiscordPost | NtfyPost | Logger;
export interface DiscordPost {
webhook: string;
role_id: string;
}
export interface NtfyPost {
webhook: string;
}
export type PublisherPost = DiscordPost | NtfyPost;
export interface Publisher {
publisherType: PublisherType;
type: PublisherType;
at: D.Duration;
post: PublisherPost;
}
export const publish = (publisher: Publisher): ((message: string) => TE.TaskEither<Error, number>) => {
switch (publisher.publisherType) {
case PublisherType.DISCORD:
return (message) => TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError);
case PublisherType.NTFY:
return (message) => TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError);
case PublisherType.CONSOLE:
return (message) =>
pipe(
(publisher.post as Logger).log("\n>=====<\n" + message + "\n>=====<"),
TE.fromIO,
TE.map(() => 200)
);
default:
return (_message) => TE.left(new Error("unknown publisher type: " + publisher.publisherType));
}
};

View File

@ -1,11 +0,0 @@
export interface NtfyPost {
webhook: string;
}
export const publishNtfy = async (ntfyPost: NtfyPost, message: string) => {
console.log("publishing to Ntfy");
return fetch(ntfyPost.webhook, {
method: "POST",
body: message
}).then((r) => r.status);
};

View File

@ -10,7 +10,7 @@ export enum DurationUnit {
MILLISECOND,
SECOND,
MINUTE,
HOUR
HOUR,
}
const durationUnitMap: Record<string, DurationUnit> = {
ms: DurationUnit.MILLISECOND,
@ -21,14 +21,17 @@ const durationUnitMap: Record<string, DurationUnit> = {
minutes: DurationUnit.MINUTE,
hr: DurationUnit.HOUR,
hour: DurationUnit.HOUR,
hours: DurationUnit.HOUR
hours: DurationUnit.HOUR,
};
const getDurationUnit = (key: string): O.Option<DurationUnit> => O.fromNullable(durationUnitMap[key.toLowerCase()]);
const getDurationUnit = (key: string): O.Option<DurationUnit> =>
O.fromNullable(durationUnitMap[key.toLowerCase()]);
export const getMs = (duration: Duration): number => duration;
export const getSeconds = (duration: Duration): number => duration / 1000;
export const getMinutes = (duration: Duration): number => getSeconds(duration) / 60;
export const getHours = (duration: Duration): number => getMinutes(duration) / 60;
export const getMinutes = (duration: Duration): number =>
getSeconds(duration) / 60;
export const getHours = (duration: Duration): number =>
getMinutes(duration) / 60;
export const format = (duration: Duration): string => {
const ms = getMs(duration) % 1000;
const seconds = getSeconds(duration) % 60;
@ -36,7 +39,9 @@ export const format = (duration: Duration): string => {
const hours = getHours(duration);
return (
[hours, minutes, seconds].map((x) => Math.floor(x).toString().padStart(2, "0")).join(":") +
[hours, minutes, seconds]
.map((x) => Math.floor(x).toString().padStart(2, "0"))
.join(":") +
"." +
ms.toString().padStart(3, "0")
);
@ -52,40 +57,49 @@ export const createDurationBuilder = (): DurationBuilder => ({
millis: 0,
seconds: 0,
minutes: 0,
hours: 0
hours: 0,
});
export type DurationBuilderField<T> = (arg: T) => (builder: DurationBuilder) => DurationBuilder;
export type DurationBuilderField<T> = (
arg: T,
) => (builder: DurationBuilder) => DurationBuilder;
export const withMillis: DurationBuilderField<number> = (millis) => (builder) => ({
...builder,
millis
});
export const withMillis: DurationBuilderField<number> =
(millis) => (builder) => ({
...builder,
millis,
});
export const withSeconds: DurationBuilderField<number> = (seconds) => (builder) => ({
...builder,
seconds
});
export const withSeconds: DurationBuilderField<number> =
(seconds) => (builder) => ({
...builder,
seconds,
});
export const withMinutes: DurationBuilderField<number> = (minutes) => (builder) => ({
...builder,
minutes
});
export const withMinutes: DurationBuilderField<number> =
(minutes) => (builder) => ({
...builder,
minutes,
});
export const withHours: DurationBuilderField<number> = (hours) => (builder) => ({
...builder,
hours
});
export const withHours: DurationBuilderField<number> =
(hours) => (builder) => ({
...builder,
hours,
});
export const build = (builder: DurationBuilder): Duration =>
builder.millis + builder.seconds * 1000 + builder.minutes * 60 * 1000 + builder.hours * 60 * 60 * 1000;
builder.millis +
builder.seconds * 1000 +
builder.minutes * 60 * 1000 +
builder.hours * 60 * 60 * 1000;
export const parse = (duration: string): E.Either<string, Duration> => {
const parts = pipe(
duration,
S.split(" "),
R.map(S.trim),
R.filter((part) => !S.isEmpty(part))
R.filter((part) => !S.isEmpty(part)),
);
const valueUnitPairs = pipe(
@ -106,34 +120,36 @@ export const parse = (duration: string): E.Either<string, Duration> => {
E.map(
flow(
R.filter(O.isSome),
R.map(({ value }) => value)
)
)
R.map(({ value }) => value),
),
),
);
return pipe(
valueUnitPairs,
E.flatMap(
R.reduce(E.of<string, DurationBuilder>(createDurationBuilder()), (builderEither, [unit, value]) =>
pipe(
builderEither,
E.chain((builder) => {
switch (unit) {
case DurationUnit.MILLISECOND:
return E.right(withMillis(value)(builder));
case DurationUnit.SECOND:
return E.right(withSeconds(value)(builder));
case DurationUnit.MINUTE:
return E.right(withMinutes(value)(builder));
case DurationUnit.HOUR:
return E.right(withHours(value)(builder));
default:
return E.left(`unknown unit: ${unit}`);
}
})
)
)
R.reduce(
E.of<string, DurationBuilder>(createDurationBuilder()),
(builderEither, [unit, value]) =>
pipe(
builderEither,
E.chain((builder) => {
switch (unit) {
case DurationUnit.MILLISECOND:
return E.right(withMillis(value)(builder));
case DurationUnit.SECOND:
return E.right(withSeconds(value)(builder));
case DurationUnit.MINUTE:
return E.right(withMinutes(value)(builder));
case DurationUnit.HOUR:
return E.right(withHours(value)(builder));
default:
return E.left(`unknown unit: ${unit}`);
}
}),
),
),
),
E.map(build)
E.map(build),
);
};

View File

@ -2,22 +2,8 @@ import type { IO } from "fp-ts/lib/IO";
export interface Logger {
log: (message: string) => IO<void>;
error: (message: string) => IO<void>;
addPrefix: (prefix: string) => Logger;
}
export class ConsoleLoggerI implements Logger {
constructor(private readonly prefix = "") {}
public log(message: string) {
return () => console.log(new Date(), "[INFO]", this.prefix, message);
}
public error(message: string) {
return () => console.error(new Date(), "[ERROR]", this.prefix, message);
}
public addPrefix(prefix: string): ConsoleLoggerI {
return new ConsoleLoggerI(this.prefix + prefix);
}
}
export const ConsoleLogger = new ConsoleLoggerI();
export const ConsoleLogger: Logger = {
log: (message: string) => () => console.log(message),
};

93
src/util/scheduler.ts Normal file
View File

@ -0,0 +1,93 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as IO from "fp-ts/lib/IO";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as RM from "fp-ts/lib/ReadonlyMap";
import * as T from "fp-ts/Task";
import * as S from "fp-ts/string";
import { contramap, type Eq } from "fp-ts/Eq";
import { pipe, identity } from "fp-ts/lib/function";
import type { Schedule } from "../canary";
import { ConsoleLogger, type Logger } from "./logger";
import type { Separated } from "fp-ts/lib/Separated";
import type { Magma } from "fp-ts/lib/Magma";
import { intercalate } from "fp-ts/lib/Foldable";
interface Unit {}
const Unit: Unit = {};
interface ScheduledJob {
id: string;
execute: <TResult>() => TE.TaskEither<Error, TResult>;
at: Date;
schedule: Schedule;
}
type SchedulerState = ReadonlyArray<ScheduledJob>;
const logState = (
state: SchedulerState,
now: Date,
logger: Logger = ConsoleLogger,
) => {
const ids = pipe(
state,
RA.map(({ id }) => id),
(ids) => intercalate(S.Monoid, RA.Foldable)("|", ids),
);
return logger.log(`Executing ${ids} at ${now.toUTCString()}`);
};
const executeDueJobs = <TResult>(
state: SchedulerState,
): IO.IO<
T.Task<
Separated<ReadonlyArray<[string, Error]>, ReadonlyArray<[string, TResult]>>
>
> =>
pipe(
IO.of(state),
IO.bindTo("state"),
IO.bind("now", () => () => new Date()),
IO.bind("toExecute", ({ now, state }) =>
pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))),
),
IO.flatMap(({ state, now }) =>
pipe(
IO.of(state),
IO.tap((state) => logState(state, now)),
IO.map((state) =>
pipe(
state,
RA.map(({ execute, id }) =>
pipe(
execute(),
TE.bimap(
(error) => [id, error] as [string, Error],
(result) => [id, result] as [string, TResult],
),
),
),
),
),
),
),
IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))),
);
const jobEq: Eq<ScheduledJob> = pipe(
S.Eq,
contramap(({ id }) => id),
);
const jobMagma: Magma<ScheduledJob> = {
concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({
...a,
...b,
}),
};
export const schedulerLoop = (jobs: ReadonlyArray<ScheduledJob>) => {
const jobsTable: Map<
string,
{ retries: number; specification: ScheduledJob }
> = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable));
};

View File

@ -1,197 +0,0 @@
import { mock, test, expect } from "bun:test";
import * as TE from "fp-ts/lib/TaskEither";
import type { Logger } from "../../src/util";
import {
perform,
type JobsTable,
execute,
formatJobResults,
logExecutingJobs,
type Job
} from "../../src/canary/scheduler";
const getMocks = () => {
const mockLogger: Logger = {
log: mock(),
error: mock()
};
return { mockLogger };
};
const schedule = {
every: 200,
jitter: 100
};
test("logging", () => {
const { mockLogger } = getMocks();
const jobs: Job[] = [
{ id: "1", toString: () => "Job 1", execute: mock(), schedule, maxRetries: 3 },
{ id: "2", toString: () => "Job 2", execute: mock(), schedule, maxRetries: 3 }
];
const now = new Date("2023-01-01T00:00:00Z");
logExecutingJobs(jobs, now, mockLogger);
expect(mockLogger.log).toHaveBeenCalledWith("Executing Job 1|Job 2 at Sun, 01 Jan 2023 00:00:00 GMT");
});
test("should separate jobs into successful and failed executions", async () => {
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.right("Result 1") as any),
schedule,
maxRetries: 3
};
const job2: Job = {
id: "2",
toString: () => "Job 2",
execute: mock(() => TE.left(new Error("Failure 2")) as any),
schedule,
maxRetries: 3
};
const jobs: Job[] = [job1, job2];
const result = await execute(jobs)();
expect(result.left).toEqual([[job2, new Error("Failure 2")]]);
expect(result.right).toEqual([[job1, "Result 1"]]);
});
test("should format job results correctly", () => {
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
const left = [
[{ id: "1", toString: () => "Job 1", execute: mock(), schedule: {}, maxRetries: 3 }, new Error("Error 1")]
];
const right = [[{ id: "2", toString: () => "Job 2", execute: mock(), schedule: {}, maxRetries: 3 }, "Success 2"]];
const result = formatJobResults(jobsTable, { left, right } as any);
expect(result).toContain("Job 1 | 1 / 3 | (retry) :/ | Error 1");
expect(result).toContain("Job 2 | (success) :) | Success 2");
});
test("should update jobsTable and lastPingAck correctly", async () => {
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
const jobs: Job[] = [
{
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.right("Result 1") as any),
schedule,
maxRetries: 3
}
];
const publishers: any = [];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const result = await perform(jobsTable, jobs, publishers, lastPingAck)();
expect(result.lastPingAck).not.toEqual(lastPingAck);
expect(result.jobsTable.get("1")).toEqual({
retries: 0,
scheduled: expect.any(Date)
});
});
test("should update a job with retry count on failure", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock(() => TE.left(new Error("Error 1")) as any),
schedule,
maxRetries: 2
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
// Assert the job was retried once and then succeeded
expect(job1.execute).toHaveBeenCalled();
// Check the jobsTable for the updated state
expect(result.jobsTable.get("1")).toEqual({
retries: 1,
scheduled: expect.any(Date)
});
});
test("should reschedule a job that hits max retries", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.left(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 4 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
const now = Date.now();
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
const delta = Date.now() - now;
// Assert the job was retried once and then fail
expect(job1.execute).toHaveBeenCalled();
// Check the jobsTable for the updated state
const { retries, scheduled } = result.jobsTable.get("1")!;
expect(retries).toEqual(0);
expect(publishers[0]).toHaveBeenCalled();
expect(scheduled.getTime()).toBeGreaterThan(now - delta + schedule.every);
expect(scheduled.getTime()).toBeLessThan(now + delta + schedule.every + schedule.jitter);
});
test("should not publish only successes when should not ack", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date();
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
expect(job1.execute).toHaveBeenCalled();
expect(publishers[0]).toHaveBeenCalledTimes(0);
});
test("should publish when should ack", async () => {
// Create a mock job that fails the first time but succeeds the second time
const job1: Job = {
id: "1",
toString: () => "Job 1",
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
schedule,
maxRetries: 4
};
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
const jobs: Job[] = [job1];
const publishers: any = [mock().mockReturnValue(TE.right(200))];
const lastPingAck = new Date("2023-01-01T00:00:00Z");
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
expect(job1.execute).toHaveBeenCalled();
expect(publishers[0]).toHaveBeenCalled();
});