Compare commits
No commits in common. "main" and "scheduler" have entirely different histories.
18
.prettierrc
18
.prettierrc
|
@ -1,18 +0,0 @@
|
||||||
{
|
|
||||||
"semi": true,
|
|
||||||
"singleQuote": false,
|
|
||||||
"trailingComma": "none",
|
|
||||||
"useTabs": false,
|
|
||||||
"printWidth": 120,
|
|
||||||
"endOfLine": "lf",
|
|
||||||
"overrides": [
|
|
||||||
{
|
|
||||||
"files": ["**/*.scss"],
|
|
||||||
"options": {
|
|
||||||
"useTabs": false,
|
|
||||||
"tabWidth": 2,
|
|
||||||
"singleQuote": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
|
@ -3,14 +3,22 @@ import * as TE from "fp-ts/lib/TaskEither";
|
||||||
import * as O from "fp-ts/lib/Option";
|
import * as O from "fp-ts/lib/Option";
|
||||||
import { createTransport } from "nodemailer";
|
import { createTransport } from "nodemailer";
|
||||||
import { toError } from "fp-ts/lib/Either";
|
import { toError } from "fp-ts/lib/Either";
|
||||||
import { flow, pipe } from "fp-ts/lib/function";
|
import { pipe } from "fp-ts/lib/function";
|
||||||
import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow";
|
import {
|
||||||
|
ImapFlow,
|
||||||
|
type FetchMessageObject,
|
||||||
|
type FetchQueryObject,
|
||||||
|
type MailboxLockObject,
|
||||||
|
} from "imapflow";
|
||||||
import * as IO from "fp-ts/lib/IO";
|
import * as IO from "fp-ts/lib/IO";
|
||||||
import * as T from "fp-ts/lib/Task";
|
import * as T from "fp-ts/lib/Task";
|
||||||
import { ConsoleLogger, type Logger } from "../util";
|
import { ConsoleLogger } from "../util";
|
||||||
|
|
||||||
interface ImapClientI {
|
interface ImapClientI {
|
||||||
fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
|
fetchAll: (
|
||||||
|
range: string,
|
||||||
|
options: FetchQueryObject,
|
||||||
|
) => Promise<FetchMessageObject[]>;
|
||||||
connect: () => Promise<void>;
|
connect: () => Promise<void>;
|
||||||
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
|
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
|
||||||
messageDelete: (uids: number[]) => Promise<boolean>;
|
messageDelete: (uids: number[]) => Promise<boolean>;
|
||||||
|
@ -32,7 +40,10 @@ class ErrorWithLock extends Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
||||||
new ErrorWithLock(error instanceof Error ? error.message : "Unknown error", lock);
|
new ErrorWithLock(
|
||||||
|
error instanceof Error ? error.message : "Unknown error",
|
||||||
|
lock,
|
||||||
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate a unique email.
|
* Generate a unique email.
|
||||||
|
@ -40,31 +51,42 @@ const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
||||||
* @param to is the email to send to.
|
* @param to is the email to send to.
|
||||||
* @returns an {@link Email}.
|
* @returns an {@link Email}.
|
||||||
*/
|
*/
|
||||||
type EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => IO.IO<Email>;
|
type EmailGenerator = (
|
||||||
const generateEmail: EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => () => ({
|
from: EmailFromInstruction,
|
||||||
from: from.email,
|
to: EmailToInstruction,
|
||||||
to: to.email,
|
) => IO.IO<Email>;
|
||||||
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
|
const generateEmail: EmailGenerator =
|
||||||
text: crypto.randomUUID()
|
(from: EmailFromInstruction, to: EmailToInstruction) => () => ({
|
||||||
});
|
from: from.email,
|
||||||
|
to: to.email,
|
||||||
|
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
|
||||||
|
text: crypto.randomUUID(),
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the transport layer for a mailbox to send a piece of mail.
|
* Get the transport layer for a mailbox to send a piece of mail.
|
||||||
* @param param0 is the mailbox to send from.
|
* @param param0 is the mailbox to send from.
|
||||||
* @returns a function that takes an email and sends it.
|
* @returns a function that takes an email and sends it.
|
||||||
*/
|
*/
|
||||||
type GetSendEmail = (from: EmailFromInstruction) => (email: Email) => TE.TaskEither<Error, Email>;
|
type GetSendEmail = (
|
||||||
const getSendTransport: GetSendEmail = ({ username, password, server, send_port }) => {
|
from: EmailFromInstruction,
|
||||||
|
) => (email: Email) => TE.TaskEither<Error, Email>;
|
||||||
|
const getSendTransport: GetSendEmail = ({
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
server,
|
||||||
|
send_port,
|
||||||
|
}) => {
|
||||||
const transport = createTransport({
|
const transport = createTransport({
|
||||||
host: server,
|
host: server,
|
||||||
port: send_port,
|
port: send_port,
|
||||||
auth: {
|
auth: {
|
||||||
user: username,
|
user: username,
|
||||||
pass: password
|
pass: password,
|
||||||
},
|
},
|
||||||
tls: {
|
tls: {
|
||||||
rejectUnauthorized: false
|
rejectUnauthorized: false,
|
||||||
}
|
},
|
||||||
});
|
});
|
||||||
return (email: Email) =>
|
return (email: Email) =>
|
||||||
TE.tryCatch(
|
TE.tryCatch(
|
||||||
|
@ -76,9 +98,9 @@ const getSendTransport: GetSendEmail = ({ username, password, server, send_port
|
||||||
} else {
|
} else {
|
||||||
resolve(email);
|
resolve(email);
|
||||||
}
|
}
|
||||||
})
|
}),
|
||||||
),
|
),
|
||||||
toError
|
toError,
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -87,7 +109,9 @@ const getSendTransport: GetSendEmail = ({ username, password, server, send_port
|
||||||
* @param param0 is the mailbox to read from.
|
* @param param0 is the mailbox to read from.
|
||||||
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
|
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
|
||||||
*/
|
*/
|
||||||
type GetImapClient = (to: EmailToInstruction) => TE.TaskEither<Error, ImapClientI>;
|
type GetImapClient = (
|
||||||
|
to: EmailToInstruction,
|
||||||
|
) => TE.TaskEither<Error, ImapClientI>;
|
||||||
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
const imap = new ImapFlow({
|
const imap = new ImapFlow({
|
||||||
logger: false,
|
logger: false,
|
||||||
|
@ -96,8 +120,8 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
secure: true,
|
secure: true,
|
||||||
auth: {
|
auth: {
|
||||||
user: username,
|
user: username,
|
||||||
pass: password
|
pass: password,
|
||||||
}
|
},
|
||||||
});
|
});
|
||||||
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
|
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
|
||||||
};
|
};
|
||||||
|
@ -106,16 +130,18 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
* @param imap is the Imap client to fetch messages from.
|
* @param imap is the Imap client to fetch messages from.
|
||||||
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
|
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
|
||||||
*/
|
*/
|
||||||
const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObject[]> =>
|
const fetchMessages = (
|
||||||
|
imap: ImapClientI,
|
||||||
|
): TE.TaskEither<Error, FetchMessageObject[]> =>
|
||||||
TE.tryCatch(
|
TE.tryCatch(
|
||||||
() =>
|
() =>
|
||||||
imap.fetchAll("*", {
|
imap.fetchAll("*", {
|
||||||
uid: true,
|
uid: true,
|
||||||
envelope: true,
|
envelope: true,
|
||||||
headers: true,
|
headers: true,
|
||||||
bodyParts: ["text"]
|
bodyParts: ["text"],
|
||||||
}),
|
}),
|
||||||
toError
|
toError,
|
||||||
);
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -126,7 +152,8 @@ const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObje
|
||||||
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
|
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
|
||||||
const matchesEmail: EmailMatcher = (email) => (message) => {
|
const matchesEmail: EmailMatcher = (email) => (message) => {
|
||||||
const subjectMatches = email.subject === message.envelope.subject;
|
const subjectMatches = email.subject === message.envelope.subject;
|
||||||
const bodyMatches = message.bodyParts.get("text")?.toString().trim() === email.text.trim();
|
const bodyMatches =
|
||||||
|
message.bodyParts.get("text")?.toString().trim() === email.text.trim();
|
||||||
const headers = message.headers.toLocaleString();
|
const headers = message.headers.toLocaleString();
|
||||||
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
|
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
|
||||||
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
|
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
|
||||||
|
@ -146,9 +173,13 @@ type FindEmailUidInInbox = (
|
||||||
equalsEmail: (message: FetchMessageObject) => boolean,
|
equalsEmail: (message: FetchMessageObject) => boolean,
|
||||||
retries: number,
|
retries: number,
|
||||||
pollIntervalMs: number,
|
pollIntervalMs: number,
|
||||||
logger?: Logger
|
|
||||||
) => TE.TaskEither<Error, number>;
|
) => TE.TaskEither<Error, number>;
|
||||||
const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs, logger = ConsoleLogger) =>
|
const findEmailUidInInbox: FindEmailUidInInbox = (
|
||||||
|
imap,
|
||||||
|
equalsEmail,
|
||||||
|
retries,
|
||||||
|
pollIntervalMs,
|
||||||
|
) =>
|
||||||
pipe(
|
pipe(
|
||||||
fetchMessages(imap),
|
fetchMessages(imap),
|
||||||
TE.flatMap((messages) => {
|
TE.flatMap((messages) => {
|
||||||
|
@ -161,17 +192,18 @@ const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, po
|
||||||
TE.fold(
|
TE.fold(
|
||||||
(e) =>
|
(e) =>
|
||||||
pipe(
|
pipe(
|
||||||
TE.fromIO(logger.log(`email failed; ${retries} retries left.`)),
|
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)),
|
||||||
TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))),
|
TE.chain(() =>
|
||||||
TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs))
|
retries === 0
|
||||||
|
? TE.left(e)
|
||||||
|
: T.delay(pollIntervalMs)(TE.right(null)),
|
||||||
|
),
|
||||||
|
TE.chain(() =>
|
||||||
|
findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
(s) =>
|
TE.of,
|
||||||
pipe(
|
),
|
||||||
s,
|
|
||||||
TE.of,
|
|
||||||
TE.tap(() => TE.fromIO(logger.log("Email succeeded")))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
export type EmailJobDependencies = {
|
export type EmailJobDependencies = {
|
||||||
|
@ -193,27 +225,40 @@ export const perform = (
|
||||||
getSendImpl = getSendTransport,
|
getSendImpl = getSendTransport,
|
||||||
getImapImpl = getImap,
|
getImapImpl = getImap,
|
||||||
findEmailUidInInboxImpl = findEmailUidInInbox,
|
findEmailUidInInboxImpl = findEmailUidInInbox,
|
||||||
matchesEmailImpl = matchesEmail
|
matchesEmailImpl = matchesEmail,
|
||||||
}: Partial<EmailJobDependencies> = {}
|
}: Partial<EmailJobDependencies> = {},
|
||||||
): TE.TaskEither<Error, boolean> =>
|
): TE.TaskEither<Error, boolean> =>
|
||||||
pipe(
|
pipe(
|
||||||
// arrange.
|
// arrange.
|
||||||
TE.fromIO(generateEmailImpl(from, to)),
|
TE.fromIO(generateEmailImpl(from, to)),
|
||||||
TE.bindTo("email"),
|
TE.bindTo("email"),
|
||||||
// act.
|
// act.
|
||||||
TE.tap(({ email }) => pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock()))),
|
TE.tap(({ email }) =>
|
||||||
|
pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock())),
|
||||||
|
),
|
||||||
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
|
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
|
||||||
TE.bind("mailboxLock", ({ imap }) => TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock())),
|
TE.bind("mailboxLock", ({ imap }) =>
|
||||||
|
TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock()),
|
||||||
|
),
|
||||||
// "assert".
|
// "assert".
|
||||||
TE.bind("uid", ({ imap, email, mailboxLock }) =>
|
TE.bind("uid", ({ imap, email, mailboxLock }) =>
|
||||||
pipe(
|
pipe(
|
||||||
findEmailUidInInboxImpl(imap, matchesEmailImpl(email), retries, interval),
|
findEmailUidInInboxImpl(
|
||||||
TE.mapLeft(ToErrorWithLock(mailboxLock))
|
imap,
|
||||||
)
|
matchesEmailImpl(email),
|
||||||
|
retries,
|
||||||
|
interval,
|
||||||
|
),
|
||||||
|
TE.mapLeft(ToErrorWithLock(mailboxLock)),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
// cleanup.
|
// cleanup.
|
||||||
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
|
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
|
||||||
TE.tryCatch(() => imap.messageDelete([uid]), ToErrorWithLock(mailboxLock))
|
TE.tryCatch(
|
||||||
|
// () => imap.messageDelete([uid], { uid: true }),
|
||||||
|
() => imap.messageDelete([uid]),
|
||||||
|
ToErrorWithLock(mailboxLock),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
TE.fold(
|
TE.fold(
|
||||||
(e) => {
|
(e) => {
|
||||||
|
@ -225,6 +270,6 @@ export const perform = (
|
||||||
({ mailboxLock, deleted }) => {
|
({ mailboxLock, deleted }) => {
|
||||||
mailboxLock.release();
|
mailboxLock.release();
|
||||||
return TE.right(deleted);
|
return TE.right(deleted);
|
||||||
}
|
},
|
||||||
)
|
),
|
||||||
);
|
);
|
||||||
|
|
|
@ -38,4 +38,4 @@ export interface EmailJob {
|
||||||
readRetry: Retry;
|
readRetry: Retry;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type TestJob = EmailJob | PingJob | HealthCheckJob | DnsJob;
|
export type Job = EmailJob | PingJob | HealthCheckJob | DnsJob;
|
||||||
|
|
|
@ -1,197 +0,0 @@
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
|
||||||
import * as IO from "fp-ts/lib/IO";
|
|
||||||
import * as RA from "fp-ts/lib/ReadonlyArray";
|
|
||||||
import * as RM from "fp-ts/lib/ReadonlyMap";
|
|
||||||
import * as T from "fp-ts/Task";
|
|
||||||
import * as S from "fp-ts/string";
|
|
||||||
import { pipe, identity } from "fp-ts/lib/function";
|
|
||||||
import type { Separated } from "fp-ts/lib/Separated";
|
|
||||||
import type { Magma } from "fp-ts/lib/Magma";
|
|
||||||
import { intercalate } from "fp-ts/lib/Foldable";
|
|
||||||
import { nextSchedule, type Schedule } from "../canary";
|
|
||||||
import { ConsoleLogger, type Logger } from "../util";
|
|
||||||
|
|
||||||
export interface Job {
|
|
||||||
id: string;
|
|
||||||
toString: () => string;
|
|
||||||
execute: () => TE.TaskEither<Error, boolean>;
|
|
||||||
schedule: Schedule;
|
|
||||||
maxRetries: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const logExecutingJobs = (jobs: ReadonlyArray<Job>, now: Date, logger: Logger = ConsoleLogger) => {
|
|
||||||
const stringifieds = pipe(
|
|
||||||
jobs,
|
|
||||||
RA.map(({ toString }) => toString()),
|
|
||||||
(stringified) => intercalate(S.Monoid, RA.Foldable)("|", stringified)
|
|
||||||
);
|
|
||||||
return logger.log(`Executing ${stringifieds} at ${now.toUTCString()}`);
|
|
||||||
};
|
|
||||||
|
|
||||||
export const execute = <TResult>(
|
|
||||||
jobs: ReadonlyArray<Job>
|
|
||||||
): T.Task<Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>> =>
|
|
||||||
pipe(
|
|
||||||
T.of(jobs),
|
|
||||||
T.map(
|
|
||||||
RA.map((job) =>
|
|
||||||
pipe(
|
|
||||||
job.execute(),
|
|
||||||
TE.bimap(
|
|
||||||
(error) => [job, error] as [Job, Error],
|
|
||||||
(result) => [job, result] as [Job, TResult]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
),
|
|
||||||
T.flatMap(RA.wilt(T.ApplicativePar)(identity))
|
|
||||||
);
|
|
||||||
|
|
||||||
export type JobsTableRow = {
|
|
||||||
scheduled: Date;
|
|
||||||
retries: number;
|
|
||||||
};
|
|
||||||
export type JobsTable = ReadonlyMap<string, JobsTableRow>;
|
|
||||||
export const constructJobsTable = (jobs: ReadonlyArray<Job>, now: Date): JobsTable => {
|
|
||||||
return pipe(
|
|
||||||
jobs,
|
|
||||||
RA.map((job) => [job.id, { retries: 0, scheduled: now }] as readonly [string, JobsTableRow]),
|
|
||||||
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
export const jobsTableRowMagma: Magma<JobsTableRow> = {
|
|
||||||
concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => {
|
|
||||||
if (a.scheduled <= b.scheduled) {
|
|
||||||
// always prefer the later schedule.
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
// if (a.retries <= b.retries) {
|
|
||||||
// return a;
|
|
||||||
// }
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export const formatJobResults = <TResult>(
|
|
||||||
jobsTable: JobsTable,
|
|
||||||
{ left, right }: Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>
|
|
||||||
) => {
|
|
||||||
const failures = left
|
|
||||||
.map(([job, err]) => ({
|
|
||||||
job,
|
|
||||||
retries: jobsTable.get(job.id)?.retries ?? job.maxRetries,
|
|
||||||
err
|
|
||||||
}))
|
|
||||||
.filter(({ job, retries }) => retries >= job.maxRetries);
|
|
||||||
const retries = left
|
|
||||||
.map(([job, err]) => ({
|
|
||||||
job,
|
|
||||||
retries: jobsTable.get(job.id)?.retries ?? 0,
|
|
||||||
err
|
|
||||||
}))
|
|
||||||
.filter(({ job, retries }) => retries < job.maxRetries);
|
|
||||||
|
|
||||||
const failureMessages = failures
|
|
||||||
.map(({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (err) :( | ${err.message}`)
|
|
||||||
.join("\n");
|
|
||||||
const retryMessages = retries
|
|
||||||
.map(
|
|
||||||
({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (retry) :/ | ${err.message}`
|
|
||||||
)
|
|
||||||
.join("\n");
|
|
||||||
const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n");
|
|
||||||
return `FAILURES:\n${failureMessages}\n\nRETRIES:\n${retryMessages}\n\nSUCCESSES:\n${successMessages}`;
|
|
||||||
};
|
|
||||||
|
|
||||||
export const perform = (
|
|
||||||
jobsTable: JobsTable,
|
|
||||||
jobs: ReadonlyArray<Job>,
|
|
||||||
publishers: ReadonlyArray<(message: string) => TE.TaskEither<Error, number>>,
|
|
||||||
lastPingAck: Date,
|
|
||||||
publishAckEvery = 24 * (60 * 60 * 1_000),
|
|
||||||
logger = ConsoleLogger
|
|
||||||
) =>
|
|
||||||
pipe(
|
|
||||||
T.of(jobs),
|
|
||||||
T.bindTo("jobs"),
|
|
||||||
T.bind(
|
|
||||||
"now",
|
|
||||||
T.fromIOK(() => IO.of(new Date()))
|
|
||||||
),
|
|
||||||
T.bind("toExecute", ({ jobs, now }) =>
|
|
||||||
pipe(
|
|
||||||
jobs,
|
|
||||||
RA.filter((job) => (jobsTable.get(job.id)?.scheduled ?? now) <= now),
|
|
||||||
T.of
|
|
||||||
)
|
|
||||||
),
|
|
||||||
T.tap(({ toExecute, now }) => T.fromIO(logExecutingJobs(toExecute, now))),
|
|
||||||
T.bind("results", ({ toExecute }) => execute(toExecute)),
|
|
||||||
T.bind("shouldAck", ({ now }) => T.of(now.getTime() - lastPingAck.getTime() >= publishAckEvery)),
|
|
||||||
T.tap(({ results, shouldAck }) => {
|
|
||||||
if (results.left.length === 0 && !shouldAck) {
|
|
||||||
return pipe(
|
|
||||||
T.of({ right: [] as ReadonlyArray<number> }),
|
|
||||||
T.tap(() => T.fromIO(logger.log("not publishing")))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pipe(
|
|
||||||
publishers,
|
|
||||||
RA.map((publish) => publish(formatJobResults(jobsTable, results))),
|
|
||||||
RA.wilt(T.ApplicativePar)(identity),
|
|
||||||
T.tap(({ left }) =>
|
|
||||||
pipe(
|
|
||||||
left.length
|
|
||||||
? logger.error("Encountered publishing errors: " + left.toString())
|
|
||||||
: logger.log("Published successfully"),
|
|
||||||
T.fromIO
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}),
|
|
||||||
T.map(({ now, results: { left, right }, shouldAck }) => {
|
|
||||||
const ack = shouldAck ? now : lastPingAck;
|
|
||||||
|
|
||||||
const leftResults = pipe(
|
|
||||||
left,
|
|
||||||
RA.map(([job]): readonly [string, JobsTableRow] => {
|
|
||||||
const row = jobsTable.get(job.id);
|
|
||||||
if (!row) {
|
|
||||||
return [job.id, { retries: 1, scheduled: now }];
|
|
||||||
}
|
|
||||||
if (row.retries >= job.maxRetries) {
|
|
||||||
logger.error(`Hit max retries for; scheduling again ;-; ${job.toString()}`);
|
|
||||||
return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const;
|
|
||||||
}
|
|
||||||
return [
|
|
||||||
job.id,
|
|
||||||
{
|
|
||||||
retries: row.retries + 1,
|
|
||||||
scheduled: now
|
|
||||||
}
|
|
||||||
] as const;
|
|
||||||
})
|
|
||||||
);
|
|
||||||
const results = pipe(
|
|
||||||
right,
|
|
||||||
RA.map(([job]): readonly [string, JobsTableRow] => [
|
|
||||||
job.id,
|
|
||||||
{
|
|
||||||
retries: 0,
|
|
||||||
scheduled: nextSchedule(job.schedule, now)
|
|
||||||
}
|
|
||||||
]),
|
|
||||||
RA.concat(leftResults)
|
|
||||||
);
|
|
||||||
|
|
||||||
const newJobsTable = pipe(
|
|
||||||
jobsTable,
|
|
||||||
RM.toReadonlyArray(S.Ord),
|
|
||||||
RA.concat(results),
|
|
||||||
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
|
|
||||||
);
|
|
||||||
return { lastPingAck: ack, jobsTable: newJobsTable };
|
|
||||||
})
|
|
||||||
);
|
|
|
@ -1,14 +1,13 @@
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
import * as O from "fp-ts/lib/Option";
|
import * as O from "fp-ts/lib/Option";
|
||||||
import type { EmailJob, TestJob } from "./job";
|
import type { Job } from "./job";
|
||||||
import type { D } from "../util";
|
import type { D } from "../util";
|
||||||
import { perform } from "./email";
|
|
||||||
|
|
||||||
export enum TestType {
|
export enum TestType {
|
||||||
EMAIL = "email",
|
EMAIL = "email",
|
||||||
PING = "ping",
|
PING = "ping",
|
||||||
HEALTHCHECK = "healthcheck",
|
HEALTHCHECK = "healthcheck",
|
||||||
DNS = "dns"
|
DNS = "dns",
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface Schedule {
|
export interface Schedule {
|
||||||
|
@ -19,20 +18,13 @@ export interface Schedule {
|
||||||
export interface Test {
|
export interface Test {
|
||||||
name: string;
|
name: string;
|
||||||
type: TestType;
|
type: TestType;
|
||||||
job: TestJob;
|
job: Job;
|
||||||
schedule: Schedule;
|
schedule: Schedule;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type Testable<T extends Job> = (job: T) => TE.TaskEither<Error, boolean>;
|
||||||
|
|
||||||
export const parseTestType = (testType: string): O.Option<TestType> =>
|
export const parseTestType = (testType: string): O.Option<TestType> =>
|
||||||
Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none;
|
Object.values(TestType).includes(testType as TestType)
|
||||||
|
? O.some(testType as TestType)
|
||||||
export const nextSchedule = (schedule: Schedule, date: Date) =>
|
: O.none;
|
||||||
new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter);
|
|
||||||
|
|
||||||
export const executorFor = (type: TestType, job: TestJob): (() => TE.TaskEither<Error, boolean>) => {
|
|
||||||
switch (type) {
|
|
||||||
case TestType.EMAIL:
|
|
||||||
return () => perform(job as EmailJob);
|
|
||||||
}
|
|
||||||
return () => TE.right(true);
|
|
||||||
};
|
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
import * as E from "fp-ts/Either";
|
import * as IO from "fp-ts/IO";
|
||||||
import type { Publisher } from "./publisher";
|
import type { Publisher } from "./publisher";
|
||||||
import { readFileSync } from "fs";
|
import { readFileSync } from "fs";
|
||||||
import { TestType, type Test, type TestJob } from "./canary";
|
import type { Test } from "./canary";
|
||||||
import * as D from "./util/duration";
|
|
||||||
import { pipe } from "fp-ts/lib/function";
|
|
||||||
|
|
||||||
export interface Config {
|
export interface Config {
|
||||||
result_publishers: Publisher[];
|
result_publishers: Publisher[];
|
||||||
|
@ -12,28 +10,9 @@ export interface Config {
|
||||||
tests: Test[];
|
tests: Test[];
|
||||||
}
|
}
|
||||||
|
|
||||||
export const transformDurations = (obj: any): E.Either<string, any> => {
|
export const readConfig =
|
||||||
const transform = (o: any): E.Either<string, any> => {
|
(filePath: string): IO.IO<Config> =>
|
||||||
const entries = Object.entries(o);
|
() => {
|
||||||
|
const confStr = readFileSync(filePath, "utf-8");
|
||||||
for (let [key, value] of entries) {
|
return JSON.parse(confStr);
|
||||||
if (key === "duration" && typeof value === "string") {
|
|
||||||
return D.parse(value);
|
|
||||||
} else if (typeof value === "object" && value !== null) {
|
|
||||||
const result = transform(value);
|
|
||||||
if (E.isLeft(result)) {
|
|
||||||
return result;
|
|
||||||
} else {
|
|
||||||
o[key] = result.right;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return E.right(o);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
return transform(obj);
|
|
||||||
};
|
|
||||||
|
|
||||||
export const readConfig = (filePath: string): E.Either<string, Config> =>
|
|
||||||
pipe(readFileSync(filePath, "utf-8"), JSON.parse, transformDurations) as E.Either<string, Config>;
|
|
||||||
|
|
53
src/index.ts
53
src/index.ts
|
@ -1,51 +1,16 @@
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
import * as O from "fp-ts/lib/Option";
|
|
||||||
import * as RA from "fp-ts/ReadonlyArray";
|
|
||||||
import { pipe } from "fp-ts/lib/function";
|
import { pipe } from "fp-ts/lib/function";
|
||||||
|
import { toError } from "fp-ts/lib/Either";
|
||||||
import { readConfig } from "./config";
|
import { readConfig } from "./config";
|
||||||
import { parseArgs, type Args } from "./args";
|
import { parseArgs } from "./args";
|
||||||
import { executorFor, type Test, type TestType } from "./canary";
|
|
||||||
import { constructJobsTable, perform, type Job } from "./canary/scheduler";
|
|
||||||
import { publish, PublisherType } from "./publisher";
|
|
||||||
import { ConsoleLogger } from "./util";
|
|
||||||
|
|
||||||
const testFilters = (args: Args): ReadonlyArray<(test: Test) => boolean> =>
|
const main: TE.TaskEither<Error, void> = pipe(
|
||||||
(
|
|
||||||
[
|
|
||||||
[args.testType, (testType: TestType) => (test: Test) => test.type === testType],
|
|
||||||
[args.testName, (testName: string) => (test: Test) => testName === test.name]
|
|
||||||
] as ReadonlyArray<[O.Option<any>, (t: any) => (test: Test) => boolean]>
|
|
||||||
)
|
|
||||||
.map(([o, pred]) =>
|
|
||||||
pipe(
|
|
||||||
o,
|
|
||||||
O.map((val) => pred(val))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.filter(O.isSome)
|
|
||||||
.map(({ value }) => value);
|
|
||||||
|
|
||||||
const main: TE.TaskEither<string | Error, any> = pipe(
|
|
||||||
TE.fromEither(parseArgs(Bun.argv)),
|
TE.fromEither(parseArgs(Bun.argv)),
|
||||||
TE.bindTo("args"),
|
TE.bindTo("args"),
|
||||||
TE.bind("now", () => TE.fromIO(() => new Date())),
|
TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))),
|
||||||
TE.bind("config", ({ args }) => TE.fromEither(readConfig(args.testsFile))),
|
TE.fold(
|
||||||
TE.flatMap(({ config, args, now }) => {
|
(e) => TE.left(toError(e)),
|
||||||
const filters = testFilters(args);
|
() => TE.right(undefined),
|
||||||
const jobs = config.tests
|
),
|
||||||
.filter((test) => filters.every((filter) => filter(test)))
|
|
||||||
.map((test, i): Job => {
|
|
||||||
const toString = () => test.name;
|
|
||||||
const id = i.toString();
|
|
||||||
const schedule = test.schedule;
|
|
||||||
const maxRetries = 3;
|
|
||||||
const execute = executorFor(test.type, test.job);
|
|
||||||
return { toString, id, schedule, maxRetries, execute };
|
|
||||||
});
|
|
||||||
const jobsTable = constructJobsTable(jobs, now);
|
|
||||||
const publishers = pipe([{ publisherType: PublisherType.CONSOLE, post: ConsoleLogger }] as const, RA.map(publish));
|
|
||||||
|
|
||||||
return pipe(perform(jobsTable, jobs, publishers, new Date()), TE.fromTask);
|
|
||||||
})
|
|
||||||
);
|
);
|
||||||
main().then(() => process.exit());
|
main();
|
||||||
|
|
|
@ -1,19 +0,0 @@
|
||||||
import { $ } from "bun";
|
|
||||||
|
|
||||||
export interface DiscordPost {
|
|
||||||
webhook: string;
|
|
||||||
role_id: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const publishDiscord = async (discordPost: DiscordPost, message: string) => {
|
|
||||||
console.log("Publishing to Discord");
|
|
||||||
const ip = await $`dig +noall +short discord.com @1.1.1.1 A | shuf -n 1`.text();
|
|
||||||
return fetch(discordPost.webhook.replace("discord.com", ip), {
|
|
||||||
headers: {
|
|
||||||
Host: "discord.com"
|
|
||||||
},
|
|
||||||
method: "POST",
|
|
||||||
body: JSON.stringify({ message: `<@${discordPost.role_id}>\n${message}` }),
|
|
||||||
tls: { rejectUnauthorized: false }
|
|
||||||
}).then((r) => r.status);
|
|
||||||
};
|
|
|
@ -1,36 +1,23 @@
|
||||||
import { toError } from "fp-ts/lib/Either";
|
import { D } from "../util";
|
||||||
import * as TE from "fp-ts/TaskEither";
|
|
||||||
import { publishDiscord, type DiscordPost } from "./discord";
|
|
||||||
import { publishNtfy, type NtfyPost } from "./ntfy";
|
|
||||||
import type { Logger } from "../util";
|
|
||||||
import { pipe } from "fp-ts/lib/function";
|
|
||||||
|
|
||||||
export enum PublisherType {
|
export enum PublisherType {
|
||||||
DISCORD = "discord",
|
DISCORD = "discord",
|
||||||
NTFY = "ntfy",
|
NTFY = "ntfy",
|
||||||
CONSOLE = "console"
|
|
||||||
}
|
}
|
||||||
export type PublisherPost = DiscordPost | NtfyPost | Logger;
|
|
||||||
|
export interface DiscordPost {
|
||||||
|
webhook: string;
|
||||||
|
role_id: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface NtfyPost {
|
||||||
|
webhook: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type PublisherPost = DiscordPost | NtfyPost;
|
||||||
|
|
||||||
export interface Publisher {
|
export interface Publisher {
|
||||||
publisherType: PublisherType;
|
type: PublisherType;
|
||||||
|
at: D.Duration;
|
||||||
post: PublisherPost;
|
post: PublisherPost;
|
||||||
}
|
}
|
||||||
|
|
||||||
export const publish = (publisher: Publisher): ((message: string) => TE.TaskEither<Error, number>) => {
|
|
||||||
switch (publisher.publisherType) {
|
|
||||||
case PublisherType.DISCORD:
|
|
||||||
return (message) => TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError);
|
|
||||||
case PublisherType.NTFY:
|
|
||||||
return (message) => TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError);
|
|
||||||
case PublisherType.CONSOLE:
|
|
||||||
return (message) =>
|
|
||||||
pipe(
|
|
||||||
(publisher.post as Logger).log("\n>=====<\n" + message + "\n>=====<"),
|
|
||||||
TE.fromIO,
|
|
||||||
TE.map(() => 200)
|
|
||||||
);
|
|
||||||
default:
|
|
||||||
return (_message) => TE.left(new Error("unknown publisher type: " + publisher.publisherType));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
export interface NtfyPost {
|
|
||||||
webhook: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const publishNtfy = async (ntfyPost: NtfyPost, message: string) => {
|
|
||||||
console.log("publishing to Ntfy");
|
|
||||||
return fetch(ntfyPost.webhook, {
|
|
||||||
method: "POST",
|
|
||||||
body: message
|
|
||||||
}).then((r) => r.status);
|
|
||||||
};
|
|
|
@ -10,7 +10,7 @@ export enum DurationUnit {
|
||||||
MILLISECOND,
|
MILLISECOND,
|
||||||
SECOND,
|
SECOND,
|
||||||
MINUTE,
|
MINUTE,
|
||||||
HOUR
|
HOUR,
|
||||||
}
|
}
|
||||||
const durationUnitMap: Record<string, DurationUnit> = {
|
const durationUnitMap: Record<string, DurationUnit> = {
|
||||||
ms: DurationUnit.MILLISECOND,
|
ms: DurationUnit.MILLISECOND,
|
||||||
|
@ -21,14 +21,17 @@ const durationUnitMap: Record<string, DurationUnit> = {
|
||||||
minutes: DurationUnit.MINUTE,
|
minutes: DurationUnit.MINUTE,
|
||||||
hr: DurationUnit.HOUR,
|
hr: DurationUnit.HOUR,
|
||||||
hour: DurationUnit.HOUR,
|
hour: DurationUnit.HOUR,
|
||||||
hours: DurationUnit.HOUR
|
hours: DurationUnit.HOUR,
|
||||||
};
|
};
|
||||||
const getDurationUnit = (key: string): O.Option<DurationUnit> => O.fromNullable(durationUnitMap[key.toLowerCase()]);
|
const getDurationUnit = (key: string): O.Option<DurationUnit> =>
|
||||||
|
O.fromNullable(durationUnitMap[key.toLowerCase()]);
|
||||||
|
|
||||||
export const getMs = (duration: Duration): number => duration;
|
export const getMs = (duration: Duration): number => duration;
|
||||||
export const getSeconds = (duration: Duration): number => duration / 1000;
|
export const getSeconds = (duration: Duration): number => duration / 1000;
|
||||||
export const getMinutes = (duration: Duration): number => getSeconds(duration) / 60;
|
export const getMinutes = (duration: Duration): number =>
|
||||||
export const getHours = (duration: Duration): number => getMinutes(duration) / 60;
|
getSeconds(duration) / 60;
|
||||||
|
export const getHours = (duration: Duration): number =>
|
||||||
|
getMinutes(duration) / 60;
|
||||||
export const format = (duration: Duration): string => {
|
export const format = (duration: Duration): string => {
|
||||||
const ms = getMs(duration) % 1000;
|
const ms = getMs(duration) % 1000;
|
||||||
const seconds = getSeconds(duration) % 60;
|
const seconds = getSeconds(duration) % 60;
|
||||||
|
@ -36,7 +39,9 @@ export const format = (duration: Duration): string => {
|
||||||
const hours = getHours(duration);
|
const hours = getHours(duration);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
[hours, minutes, seconds].map((x) => Math.floor(x).toString().padStart(2, "0")).join(":") +
|
[hours, minutes, seconds]
|
||||||
|
.map((x) => Math.floor(x).toString().padStart(2, "0"))
|
||||||
|
.join(":") +
|
||||||
"." +
|
"." +
|
||||||
ms.toString().padStart(3, "0")
|
ms.toString().padStart(3, "0")
|
||||||
);
|
);
|
||||||
|
@ -52,40 +57,49 @@ export const createDurationBuilder = (): DurationBuilder => ({
|
||||||
millis: 0,
|
millis: 0,
|
||||||
seconds: 0,
|
seconds: 0,
|
||||||
minutes: 0,
|
minutes: 0,
|
||||||
hours: 0
|
hours: 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
export type DurationBuilderField<T> = (arg: T) => (builder: DurationBuilder) => DurationBuilder;
|
export type DurationBuilderField<T> = (
|
||||||
|
arg: T,
|
||||||
|
) => (builder: DurationBuilder) => DurationBuilder;
|
||||||
|
|
||||||
export const withMillis: DurationBuilderField<number> = (millis) => (builder) => ({
|
export const withMillis: DurationBuilderField<number> =
|
||||||
...builder,
|
(millis) => (builder) => ({
|
||||||
millis
|
...builder,
|
||||||
});
|
millis,
|
||||||
|
});
|
||||||
|
|
||||||
export const withSeconds: DurationBuilderField<number> = (seconds) => (builder) => ({
|
export const withSeconds: DurationBuilderField<number> =
|
||||||
...builder,
|
(seconds) => (builder) => ({
|
||||||
seconds
|
...builder,
|
||||||
});
|
seconds,
|
||||||
|
});
|
||||||
|
|
||||||
export const withMinutes: DurationBuilderField<number> = (minutes) => (builder) => ({
|
export const withMinutes: DurationBuilderField<number> =
|
||||||
...builder,
|
(minutes) => (builder) => ({
|
||||||
minutes
|
...builder,
|
||||||
});
|
minutes,
|
||||||
|
});
|
||||||
|
|
||||||
export const withHours: DurationBuilderField<number> = (hours) => (builder) => ({
|
export const withHours: DurationBuilderField<number> =
|
||||||
...builder,
|
(hours) => (builder) => ({
|
||||||
hours
|
...builder,
|
||||||
});
|
hours,
|
||||||
|
});
|
||||||
|
|
||||||
export const build = (builder: DurationBuilder): Duration =>
|
export const build = (builder: DurationBuilder): Duration =>
|
||||||
builder.millis + builder.seconds * 1000 + builder.minutes * 60 * 1000 + builder.hours * 60 * 60 * 1000;
|
builder.millis +
|
||||||
|
builder.seconds * 1000 +
|
||||||
|
builder.minutes * 60 * 1000 +
|
||||||
|
builder.hours * 60 * 60 * 1000;
|
||||||
|
|
||||||
export const parse = (duration: string): E.Either<string, Duration> => {
|
export const parse = (duration: string): E.Either<string, Duration> => {
|
||||||
const parts = pipe(
|
const parts = pipe(
|
||||||
duration,
|
duration,
|
||||||
S.split(" "),
|
S.split(" "),
|
||||||
R.map(S.trim),
|
R.map(S.trim),
|
||||||
R.filter((part) => !S.isEmpty(part))
|
R.filter((part) => !S.isEmpty(part)),
|
||||||
);
|
);
|
||||||
|
|
||||||
const valueUnitPairs = pipe(
|
const valueUnitPairs = pipe(
|
||||||
|
@ -106,34 +120,36 @@ export const parse = (duration: string): E.Either<string, Duration> => {
|
||||||
E.map(
|
E.map(
|
||||||
flow(
|
flow(
|
||||||
R.filter(O.isSome),
|
R.filter(O.isSome),
|
||||||
R.map(({ value }) => value)
|
R.map(({ value }) => value),
|
||||||
)
|
),
|
||||||
)
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
return pipe(
|
return pipe(
|
||||||
valueUnitPairs,
|
valueUnitPairs,
|
||||||
E.flatMap(
|
E.flatMap(
|
||||||
R.reduce(E.of<string, DurationBuilder>(createDurationBuilder()), (builderEither, [unit, value]) =>
|
R.reduce(
|
||||||
pipe(
|
E.of<string, DurationBuilder>(createDurationBuilder()),
|
||||||
builderEither,
|
(builderEither, [unit, value]) =>
|
||||||
E.chain((builder) => {
|
pipe(
|
||||||
switch (unit) {
|
builderEither,
|
||||||
case DurationUnit.MILLISECOND:
|
E.chain((builder) => {
|
||||||
return E.right(withMillis(value)(builder));
|
switch (unit) {
|
||||||
case DurationUnit.SECOND:
|
case DurationUnit.MILLISECOND:
|
||||||
return E.right(withSeconds(value)(builder));
|
return E.right(withMillis(value)(builder));
|
||||||
case DurationUnit.MINUTE:
|
case DurationUnit.SECOND:
|
||||||
return E.right(withMinutes(value)(builder));
|
return E.right(withSeconds(value)(builder));
|
||||||
case DurationUnit.HOUR:
|
case DurationUnit.MINUTE:
|
||||||
return E.right(withHours(value)(builder));
|
return E.right(withMinutes(value)(builder));
|
||||||
default:
|
case DurationUnit.HOUR:
|
||||||
return E.left(`unknown unit: ${unit}`);
|
return E.right(withHours(value)(builder));
|
||||||
}
|
default:
|
||||||
})
|
return E.left(`unknown unit: ${unit}`);
|
||||||
)
|
}
|
||||||
)
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
E.map(build)
|
E.map(build),
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,22 +2,8 @@ import type { IO } from "fp-ts/lib/IO";
|
||||||
|
|
||||||
export interface Logger {
|
export interface Logger {
|
||||||
log: (message: string) => IO<void>;
|
log: (message: string) => IO<void>;
|
||||||
error: (message: string) => IO<void>;
|
|
||||||
addPrefix: (prefix: string) => Logger;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class ConsoleLoggerI implements Logger {
|
export const ConsoleLogger: Logger = {
|
||||||
constructor(private readonly prefix = "") {}
|
log: (message: string) => () => console.log(message),
|
||||||
|
};
|
||||||
public log(message: string) {
|
|
||||||
return () => console.log(new Date(), "[INFO]", this.prefix, message);
|
|
||||||
}
|
|
||||||
public error(message: string) {
|
|
||||||
return () => console.error(new Date(), "[ERROR]", this.prefix, message);
|
|
||||||
}
|
|
||||||
public addPrefix(prefix: string): ConsoleLoggerI {
|
|
||||||
return new ConsoleLoggerI(this.prefix + prefix);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export const ConsoleLogger = new ConsoleLoggerI();
|
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
|
import * as IO from "fp-ts/lib/IO";
|
||||||
|
import * as RA from "fp-ts/lib/ReadonlyArray";
|
||||||
|
import * as RM from "fp-ts/lib/ReadonlyMap";
|
||||||
|
import * as T from "fp-ts/Task";
|
||||||
|
import * as S from "fp-ts/string";
|
||||||
|
import { contramap, type Eq } from "fp-ts/Eq";
|
||||||
|
import { pipe, identity } from "fp-ts/lib/function";
|
||||||
|
import type { Schedule } from "../canary";
|
||||||
|
import { ConsoleLogger, type Logger } from "./logger";
|
||||||
|
import type { Separated } from "fp-ts/lib/Separated";
|
||||||
|
import type { Magma } from "fp-ts/lib/Magma";
|
||||||
|
import { intercalate } from "fp-ts/lib/Foldable";
|
||||||
|
|
||||||
|
interface Unit {}
|
||||||
|
const Unit: Unit = {};
|
||||||
|
|
||||||
|
interface ScheduledJob {
|
||||||
|
id: string;
|
||||||
|
execute: <TResult>() => TE.TaskEither<Error, TResult>;
|
||||||
|
at: Date;
|
||||||
|
schedule: Schedule;
|
||||||
|
}
|
||||||
|
|
||||||
|
type SchedulerState = ReadonlyArray<ScheduledJob>;
|
||||||
|
|
||||||
|
const logState = (
|
||||||
|
state: SchedulerState,
|
||||||
|
now: Date,
|
||||||
|
logger: Logger = ConsoleLogger,
|
||||||
|
) => {
|
||||||
|
const ids = pipe(
|
||||||
|
state,
|
||||||
|
RA.map(({ id }) => id),
|
||||||
|
(ids) => intercalate(S.Monoid, RA.Foldable)("|", ids),
|
||||||
|
);
|
||||||
|
return logger.log(`Executing ${ids} at ${now.toUTCString()}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
const executeDueJobs = <TResult>(
|
||||||
|
state: SchedulerState,
|
||||||
|
): IO.IO<
|
||||||
|
T.Task<
|
||||||
|
Separated<ReadonlyArray<[string, Error]>, ReadonlyArray<[string, TResult]>>
|
||||||
|
>
|
||||||
|
> =>
|
||||||
|
pipe(
|
||||||
|
IO.of(state),
|
||||||
|
IO.bindTo("state"),
|
||||||
|
IO.bind("now", () => () => new Date()),
|
||||||
|
IO.bind("toExecute", ({ now, state }) =>
|
||||||
|
pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))),
|
||||||
|
),
|
||||||
|
IO.flatMap(({ state, now }) =>
|
||||||
|
pipe(
|
||||||
|
IO.of(state),
|
||||||
|
IO.tap((state) => logState(state, now)),
|
||||||
|
IO.map((state) =>
|
||||||
|
pipe(
|
||||||
|
state,
|
||||||
|
RA.map(({ execute, id }) =>
|
||||||
|
pipe(
|
||||||
|
execute(),
|
||||||
|
TE.bimap(
|
||||||
|
(error) => [id, error] as [string, Error],
|
||||||
|
(result) => [id, result] as [string, TResult],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))),
|
||||||
|
);
|
||||||
|
|
||||||
|
const jobEq: Eq<ScheduledJob> = pipe(
|
||||||
|
S.Eq,
|
||||||
|
contramap(({ id }) => id),
|
||||||
|
);
|
||||||
|
const jobMagma: Magma<ScheduledJob> = {
|
||||||
|
concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({
|
||||||
|
...a,
|
||||||
|
...b,
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
export const schedulerLoop = (jobs: ReadonlyArray<ScheduledJob>) => {
|
||||||
|
const jobsTable: Map<
|
||||||
|
string,
|
||||||
|
{ retries: number; specification: ScheduledJob }
|
||||||
|
> = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable));
|
||||||
|
};
|
|
@ -1,197 +0,0 @@
|
||||||
import { mock, test, expect } from "bun:test";
|
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
|
||||||
import type { Logger } from "../../src/util";
|
|
||||||
import {
|
|
||||||
perform,
|
|
||||||
type JobsTable,
|
|
||||||
execute,
|
|
||||||
formatJobResults,
|
|
||||||
logExecutingJobs,
|
|
||||||
type Job
|
|
||||||
} from "../../src/canary/scheduler";
|
|
||||||
|
|
||||||
const getMocks = () => {
|
|
||||||
const mockLogger: Logger = {
|
|
||||||
log: mock(),
|
|
||||||
error: mock()
|
|
||||||
};
|
|
||||||
return { mockLogger };
|
|
||||||
};
|
|
||||||
|
|
||||||
const schedule = {
|
|
||||||
every: 200,
|
|
||||||
jitter: 100
|
|
||||||
};
|
|
||||||
|
|
||||||
test("logging", () => {
|
|
||||||
const { mockLogger } = getMocks();
|
|
||||||
const jobs: Job[] = [
|
|
||||||
{ id: "1", toString: () => "Job 1", execute: mock(), schedule, maxRetries: 3 },
|
|
||||||
{ id: "2", toString: () => "Job 2", execute: mock(), schedule, maxRetries: 3 }
|
|
||||||
];
|
|
||||||
const now = new Date("2023-01-01T00:00:00Z");
|
|
||||||
|
|
||||||
logExecutingJobs(jobs, now, mockLogger);
|
|
||||||
|
|
||||||
expect(mockLogger.log).toHaveBeenCalledWith("Executing Job 1|Job 2 at Sun, 01 Jan 2023 00:00:00 GMT");
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should separate jobs into successful and failed executions", async () => {
|
|
||||||
const job1: Job = {
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock(() => TE.right("Result 1") as any),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 3
|
|
||||||
};
|
|
||||||
const job2: Job = {
|
|
||||||
id: "2",
|
|
||||||
toString: () => "Job 2",
|
|
||||||
execute: mock(() => TE.left(new Error("Failure 2")) as any),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 3
|
|
||||||
};
|
|
||||||
const jobs: Job[] = [job1, job2];
|
|
||||||
|
|
||||||
const result = await execute(jobs)();
|
|
||||||
|
|
||||||
expect(result.left).toEqual([[job2, new Error("Failure 2")]]);
|
|
||||||
expect(result.right).toEqual([[job1, "Result 1"]]);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should format job results correctly", () => {
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
|
|
||||||
const left = [
|
|
||||||
[{ id: "1", toString: () => "Job 1", execute: mock(), schedule: {}, maxRetries: 3 }, new Error("Error 1")]
|
|
||||||
];
|
|
||||||
const right = [[{ id: "2", toString: () => "Job 2", execute: mock(), schedule: {}, maxRetries: 3 }, "Success 2"]];
|
|
||||||
|
|
||||||
const result = formatJobResults(jobsTable, { left, right } as any);
|
|
||||||
|
|
||||||
expect(result).toContain("Job 1 | 1 / 3 | (retry) :/ | Error 1");
|
|
||||||
expect(result).toContain("Job 2 | (success) :) | Success 2");
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should update jobsTable and lastPingAck correctly", async () => {
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
|
|
||||||
const jobs: Job[] = [
|
|
||||||
{
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock(() => TE.right("Result 1") as any),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 3
|
|
||||||
}
|
|
||||||
];
|
|
||||||
const publishers: any = [];
|
|
||||||
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
|
||||||
|
|
||||||
const result = await perform(jobsTable, jobs, publishers, lastPingAck)();
|
|
||||||
|
|
||||||
expect(result.lastPingAck).not.toEqual(lastPingAck);
|
|
||||||
expect(result.jobsTable.get("1")).toEqual({
|
|
||||||
retries: 0,
|
|
||||||
scheduled: expect.any(Date)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should update a job with retry count on failure", async () => {
|
|
||||||
// Create a mock job that fails the first time but succeeds the second time
|
|
||||||
const job1: Job = {
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock(() => TE.left(new Error("Error 1")) as any),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 2
|
|
||||||
};
|
|
||||||
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
|
||||||
const jobs: Job[] = [job1];
|
|
||||||
const publishers: any = [];
|
|
||||||
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
|
||||||
|
|
||||||
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
|
||||||
|
|
||||||
// Assert the job was retried once and then succeeded
|
|
||||||
expect(job1.execute).toHaveBeenCalled();
|
|
||||||
|
|
||||||
// Check the jobsTable for the updated state
|
|
||||||
expect(result.jobsTable.get("1")).toEqual({
|
|
||||||
retries: 1,
|
|
||||||
scheduled: expect.any(Date)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should reschedule a job that hits max retries", async () => {
|
|
||||||
// Create a mock job that fails the first time but succeeds the second time
|
|
||||||
const job1: Job = {
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock().mockReturnValue(TE.left(new Error("Error 1"))),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 4
|
|
||||||
};
|
|
||||||
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 4 }]]);
|
|
||||||
const jobs: Job[] = [job1];
|
|
||||||
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
|
||||||
|
|
||||||
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
|
||||||
|
|
||||||
const now = Date.now();
|
|
||||||
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
|
||||||
const delta = Date.now() - now;
|
|
||||||
|
|
||||||
// Assert the job was retried once and then fail
|
|
||||||
expect(job1.execute).toHaveBeenCalled();
|
|
||||||
|
|
||||||
// Check the jobsTable for the updated state
|
|
||||||
const { retries, scheduled } = result.jobsTable.get("1")!;
|
|
||||||
expect(retries).toEqual(0);
|
|
||||||
expect(publishers[0]).toHaveBeenCalled();
|
|
||||||
|
|
||||||
expect(scheduled.getTime()).toBeGreaterThan(now - delta + schedule.every);
|
|
||||||
expect(scheduled.getTime()).toBeLessThan(now + delta + schedule.every + schedule.jitter);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should not publish only successes when should not ack", async () => {
|
|
||||||
// Create a mock job that fails the first time but succeeds the second time
|
|
||||||
const job1: Job = {
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 4
|
|
||||||
};
|
|
||||||
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
|
||||||
const jobs: Job[] = [job1];
|
|
||||||
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
|
||||||
const lastPingAck = new Date();
|
|
||||||
|
|
||||||
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
|
||||||
|
|
||||||
expect(job1.execute).toHaveBeenCalled();
|
|
||||||
expect(publishers[0]).toHaveBeenCalledTimes(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should publish when should ack", async () => {
|
|
||||||
// Create a mock job that fails the first time but succeeds the second time
|
|
||||||
const job1: Job = {
|
|
||||||
id: "1",
|
|
||||||
toString: () => "Job 1",
|
|
||||||
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
|
|
||||||
schedule,
|
|
||||||
maxRetries: 4
|
|
||||||
};
|
|
||||||
|
|
||||||
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
|
||||||
const jobs: Job[] = [job1];
|
|
||||||
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
|
||||||
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
|
||||||
|
|
||||||
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
|
||||||
|
|
||||||
expect(job1.execute).toHaveBeenCalled();
|
|
||||||
expect(publishers[0]).toHaveBeenCalled();
|
|
||||||
});
|
|
Loading…
Reference in New Issue