finish up the scheduler
This commit is contained in:
parent
3e7def3a26
commit
8f584b5b05
|
@ -0,0 +1,18 @@
|
||||||
|
{
|
||||||
|
"semi": true,
|
||||||
|
"singleQuote": false,
|
||||||
|
"trailingComma": "none",
|
||||||
|
"useTabs": false,
|
||||||
|
"printWidth": 120,
|
||||||
|
"endOfLine": "lf",
|
||||||
|
"overrides": [
|
||||||
|
{
|
||||||
|
"files": ["**/*.scss"],
|
||||||
|
"options": {
|
||||||
|
"useTabs": false,
|
||||||
|
"tabWidth": 2,
|
||||||
|
"singleQuote": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -4,21 +4,13 @@ import * as O from "fp-ts/lib/Option";
|
||||||
import { createTransport } from "nodemailer";
|
import { createTransport } from "nodemailer";
|
||||||
import { toError } from "fp-ts/lib/Either";
|
import { toError } from "fp-ts/lib/Either";
|
||||||
import { pipe } from "fp-ts/lib/function";
|
import { pipe } from "fp-ts/lib/function";
|
||||||
import {
|
import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow";
|
||||||
ImapFlow,
|
|
||||||
type FetchMessageObject,
|
|
||||||
type FetchQueryObject,
|
|
||||||
type MailboxLockObject,
|
|
||||||
} from "imapflow";
|
|
||||||
import * as IO from "fp-ts/lib/IO";
|
import * as IO from "fp-ts/lib/IO";
|
||||||
import * as T from "fp-ts/lib/Task";
|
import * as T from "fp-ts/lib/Task";
|
||||||
import { ConsoleLogger } from "../util";
|
import { ConsoleLogger } from "../util";
|
||||||
|
|
||||||
interface ImapClientI {
|
interface ImapClientI {
|
||||||
fetchAll: (
|
fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
|
||||||
range: string,
|
|
||||||
options: FetchQueryObject,
|
|
||||||
) => Promise<FetchMessageObject[]>;
|
|
||||||
connect: () => Promise<void>;
|
connect: () => Promise<void>;
|
||||||
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
|
getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
|
||||||
messageDelete: (uids: number[]) => Promise<boolean>;
|
messageDelete: (uids: number[]) => Promise<boolean>;
|
||||||
|
@ -40,10 +32,7 @@ class ErrorWithLock extends Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
||||||
new ErrorWithLock(
|
new ErrorWithLock(error instanceof Error ? error.message : "Unknown error", lock);
|
||||||
error instanceof Error ? error.message : "Unknown error",
|
|
||||||
lock,
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate a unique email.
|
* Generate a unique email.
|
||||||
|
@ -51,42 +40,31 @@ const ToErrorWithLock = (lock?: MailboxLockObject) => (error: unknown) =>
|
||||||
* @param to is the email to send to.
|
* @param to is the email to send to.
|
||||||
* @returns an {@link Email}.
|
* @returns an {@link Email}.
|
||||||
*/
|
*/
|
||||||
type EmailGenerator = (
|
type EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => IO.IO<Email>;
|
||||||
from: EmailFromInstruction,
|
const generateEmail: EmailGenerator = (from: EmailFromInstruction, to: EmailToInstruction) => () => ({
|
||||||
to: EmailToInstruction,
|
from: from.email,
|
||||||
) => IO.IO<Email>;
|
to: to.email,
|
||||||
const generateEmail: EmailGenerator =
|
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
|
||||||
(from: EmailFromInstruction, to: EmailToInstruction) => () => ({
|
text: crypto.randomUUID()
|
||||||
from: from.email,
|
});
|
||||||
to: to.email,
|
|
||||||
subject: [new Date().toISOString(), crypto.randomUUID()].join(" | "),
|
|
||||||
text: crypto.randomUUID(),
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the transport layer for a mailbox to send a piece of mail.
|
* Get the transport layer for a mailbox to send a piece of mail.
|
||||||
* @param param0 is the mailbox to send from.
|
* @param param0 is the mailbox to send from.
|
||||||
* @returns a function that takes an email and sends it.
|
* @returns a function that takes an email and sends it.
|
||||||
*/
|
*/
|
||||||
type GetSendEmail = (
|
type GetSendEmail = (from: EmailFromInstruction) => (email: Email) => TE.TaskEither<Error, Email>;
|
||||||
from: EmailFromInstruction,
|
const getSendTransport: GetSendEmail = ({ username, password, server, send_port }) => {
|
||||||
) => (email: Email) => TE.TaskEither<Error, Email>;
|
|
||||||
const getSendTransport: GetSendEmail = ({
|
|
||||||
username,
|
|
||||||
password,
|
|
||||||
server,
|
|
||||||
send_port,
|
|
||||||
}) => {
|
|
||||||
const transport = createTransport({
|
const transport = createTransport({
|
||||||
host: server,
|
host: server,
|
||||||
port: send_port,
|
port: send_port,
|
||||||
auth: {
|
auth: {
|
||||||
user: username,
|
user: username,
|
||||||
pass: password,
|
pass: password
|
||||||
},
|
},
|
||||||
tls: {
|
tls: {
|
||||||
rejectUnauthorized: false,
|
rejectUnauthorized: false
|
||||||
},
|
}
|
||||||
});
|
});
|
||||||
return (email: Email) =>
|
return (email: Email) =>
|
||||||
TE.tryCatch(
|
TE.tryCatch(
|
||||||
|
@ -98,9 +76,9 @@ const getSendTransport: GetSendEmail = ({
|
||||||
} else {
|
} else {
|
||||||
resolve(email);
|
resolve(email);
|
||||||
}
|
}
|
||||||
}),
|
})
|
||||||
),
|
),
|
||||||
toError,
|
toError
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -109,9 +87,7 @@ const getSendTransport: GetSendEmail = ({
|
||||||
* @param param0 is the mailbox to read from.
|
* @param param0 is the mailbox to read from.
|
||||||
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
|
* @returns a Right({@link ImapFlow}) if it connected, else an Left(error).
|
||||||
*/
|
*/
|
||||||
type GetImapClient = (
|
type GetImapClient = (to: EmailToInstruction) => TE.TaskEither<Error, ImapClientI>;
|
||||||
to: EmailToInstruction,
|
|
||||||
) => TE.TaskEither<Error, ImapClientI>;
|
|
||||||
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
const imap = new ImapFlow({
|
const imap = new ImapFlow({
|
||||||
logger: false,
|
logger: false,
|
||||||
|
@ -120,8 +96,8 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
secure: true,
|
secure: true,
|
||||||
auth: {
|
auth: {
|
||||||
user: username,
|
user: username,
|
||||||
pass: password,
|
pass: password
|
||||||
},
|
}
|
||||||
});
|
});
|
||||||
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
|
return TE.tryCatch(() => imap.connect().then(() => imap), toError);
|
||||||
};
|
};
|
||||||
|
@ -130,18 +106,16 @@ const getImap: GetImapClient = ({ username, password, server, read_port }) => {
|
||||||
* @param imap is the Imap client to fetch messages from.
|
* @param imap is the Imap client to fetch messages from.
|
||||||
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
|
* @returns a Right({@link FetchMessageObject}[]) if successful, else a Left(error).
|
||||||
*/
|
*/
|
||||||
const fetchMessages = (
|
const fetchMessages = (imap: ImapClientI): TE.TaskEither<Error, FetchMessageObject[]> =>
|
||||||
imap: ImapClientI,
|
|
||||||
): TE.TaskEither<Error, FetchMessageObject[]> =>
|
|
||||||
TE.tryCatch(
|
TE.tryCatch(
|
||||||
() =>
|
() =>
|
||||||
imap.fetchAll("*", {
|
imap.fetchAll("*", {
|
||||||
uid: true,
|
uid: true,
|
||||||
envelope: true,
|
envelope: true,
|
||||||
headers: true,
|
headers: true,
|
||||||
bodyParts: ["text"],
|
bodyParts: ["text"]
|
||||||
}),
|
}),
|
||||||
toError,
|
toError
|
||||||
);
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -152,8 +126,7 @@ const fetchMessages = (
|
||||||
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
|
type EmailMatcher = (email: Email) => (message: FetchMessageObject) => boolean;
|
||||||
const matchesEmail: EmailMatcher = (email) => (message) => {
|
const matchesEmail: EmailMatcher = (email) => (message) => {
|
||||||
const subjectMatches = email.subject === message.envelope.subject;
|
const subjectMatches = email.subject === message.envelope.subject;
|
||||||
const bodyMatches =
|
const bodyMatches = message.bodyParts.get("text")?.toString().trim() === email.text.trim();
|
||||||
message.bodyParts.get("text")?.toString().trim() === email.text.trim();
|
|
||||||
const headers = message.headers.toLocaleString();
|
const headers = message.headers.toLocaleString();
|
||||||
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
|
const fromMatches = headers.includes(`Return-Path: <${email.from}>`);
|
||||||
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
|
const toMatches = headers.includes(`Delivered-To: ${email.to}`);
|
||||||
|
@ -172,14 +145,9 @@ type FindEmailUidInInbox = (
|
||||||
imap: ImapClientI,
|
imap: ImapClientI,
|
||||||
equalsEmail: (message: FetchMessageObject) => boolean,
|
equalsEmail: (message: FetchMessageObject) => boolean,
|
||||||
retries: number,
|
retries: number,
|
||||||
pollIntervalMs: number,
|
pollIntervalMs: number
|
||||||
) => TE.TaskEither<Error, number>;
|
) => TE.TaskEither<Error, number>;
|
||||||
const findEmailUidInInbox: FindEmailUidInInbox = (
|
const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs) =>
|
||||||
imap,
|
|
||||||
equalsEmail,
|
|
||||||
retries,
|
|
||||||
pollIntervalMs,
|
|
||||||
) =>
|
|
||||||
pipe(
|
pipe(
|
||||||
fetchMessages(imap),
|
fetchMessages(imap),
|
||||||
TE.flatMap((messages) => {
|
TE.flatMap((messages) => {
|
||||||
|
@ -193,17 +161,11 @@ const findEmailUidInInbox: FindEmailUidInInbox = (
|
||||||
(e) =>
|
(e) =>
|
||||||
pipe(
|
pipe(
|
||||||
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)),
|
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)),
|
||||||
TE.chain(() =>
|
TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))),
|
||||||
retries === 0
|
TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs))
|
||||||
? TE.left(e)
|
|
||||||
: T.delay(pollIntervalMs)(TE.right(null)),
|
|
||||||
),
|
|
||||||
TE.chain(() =>
|
|
||||||
findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs),
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
TE.of,
|
TE.of
|
||||||
),
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
export type EmailJobDependencies = {
|
export type EmailJobDependencies = {
|
||||||
|
@ -225,40 +187,31 @@ export const perform = (
|
||||||
getSendImpl = getSendTransport,
|
getSendImpl = getSendTransport,
|
||||||
getImapImpl = getImap,
|
getImapImpl = getImap,
|
||||||
findEmailUidInInboxImpl = findEmailUidInInbox,
|
findEmailUidInInboxImpl = findEmailUidInInbox,
|
||||||
matchesEmailImpl = matchesEmail,
|
matchesEmailImpl = matchesEmail
|
||||||
}: Partial<EmailJobDependencies> = {},
|
}: Partial<EmailJobDependencies> = {}
|
||||||
): TE.TaskEither<Error, boolean> =>
|
): TE.TaskEither<Error, boolean> =>
|
||||||
pipe(
|
pipe(
|
||||||
// arrange.
|
// arrange.
|
||||||
TE.fromIO(generateEmailImpl(from, to)),
|
TE.fromIO(generateEmailImpl(from, to)),
|
||||||
TE.bindTo("email"),
|
TE.bindTo("email"),
|
||||||
// act.
|
// act.
|
||||||
TE.tap(({ email }) =>
|
TE.tap(({ email }) => pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock()))),
|
||||||
pipe(getSendImpl(from)(email), TE.mapLeft(ToErrorWithLock())),
|
|
||||||
),
|
|
||||||
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
|
TE.bind("imap", () => pipe(getImapImpl(to), TE.mapLeft(ToErrorWithLock()))),
|
||||||
TE.bind("mailboxLock", ({ imap }) =>
|
TE.bind("mailboxLock", ({ imap }) => TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock())),
|
||||||
TE.tryCatch(() => imap.getMailboxLock("INBOX"), ToErrorWithLock()),
|
|
||||||
),
|
|
||||||
// "assert".
|
// "assert".
|
||||||
TE.bind("uid", ({ imap, email, mailboxLock }) =>
|
TE.bind("uid", ({ imap, email, mailboxLock }) =>
|
||||||
pipe(
|
pipe(
|
||||||
findEmailUidInInboxImpl(
|
findEmailUidInInboxImpl(imap, matchesEmailImpl(email), retries, interval),
|
||||||
imap,
|
TE.mapLeft(ToErrorWithLock(mailboxLock))
|
||||||
matchesEmailImpl(email),
|
)
|
||||||
retries,
|
|
||||||
interval,
|
|
||||||
),
|
|
||||||
TE.mapLeft(ToErrorWithLock(mailboxLock)),
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
// cleanup.
|
// cleanup.
|
||||||
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
|
TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
|
||||||
TE.tryCatch(
|
TE.tryCatch(
|
||||||
// () => imap.messageDelete([uid], { uid: true }),
|
// () => imap.messageDelete([uid], { uid: true }),
|
||||||
() => imap.messageDelete([uid]),
|
() => imap.messageDelete([uid]),
|
||||||
ToErrorWithLock(mailboxLock),
|
ToErrorWithLock(mailboxLock)
|
||||||
),
|
)
|
||||||
),
|
),
|
||||||
TE.fold(
|
TE.fold(
|
||||||
(e) => {
|
(e) => {
|
||||||
|
@ -270,6 +223,6 @@ export const perform = (
|
||||||
({ mailboxLock, deleted }) => {
|
({ mailboxLock, deleted }) => {
|
||||||
mailboxLock.release();
|
mailboxLock.release();
|
||||||
return TE.right(deleted);
|
return TE.right(deleted);
|
||||||
},
|
}
|
||||||
),
|
)
|
||||||
);
|
);
|
||||||
|
|
|
@ -38,4 +38,4 @@ export interface EmailJob {
|
||||||
readRetry: Retry;
|
readRetry: Retry;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type Job = EmailJob | PingJob | HealthCheckJob | DnsJob;
|
export type TestJob = EmailJob | PingJob | HealthCheckJob | DnsJob;
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
|
import * as IO from "fp-ts/lib/IO";
|
||||||
|
import * as RA from "fp-ts/lib/ReadonlyArray";
|
||||||
|
import * as RM from "fp-ts/lib/ReadonlyMap";
|
||||||
|
import * as T from "fp-ts/Task";
|
||||||
|
import * as S from "fp-ts/string";
|
||||||
|
import { pipe, identity } from "fp-ts/lib/function";
|
||||||
|
import type { Separated } from "fp-ts/lib/Separated";
|
||||||
|
import type { Magma } from "fp-ts/lib/Magma";
|
||||||
|
import { intercalate } from "fp-ts/lib/Foldable";
|
||||||
|
import { nextSchedule, type Schedule } from "../canary";
|
||||||
|
import { ConsoleLogger, type Logger } from "../util";
|
||||||
|
|
||||||
|
export interface Job {
|
||||||
|
id: string;
|
||||||
|
toString: () => string;
|
||||||
|
execute: <TResult>() => TE.TaskEither<Error, TResult>;
|
||||||
|
schedule: Schedule;
|
||||||
|
maxRetries: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const logExecutingJobs = (jobs: ReadonlyArray<Job>, now: Date, logger: Logger = ConsoleLogger) => {
|
||||||
|
const stringifieds = pipe(
|
||||||
|
jobs,
|
||||||
|
RA.map(({ toString }) => toString()),
|
||||||
|
(stringified) => intercalate(S.Monoid, RA.Foldable)("|", stringified)
|
||||||
|
);
|
||||||
|
return logger.log(`Executing ${stringifieds} at ${now.toUTCString()}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
export const execute = <TResult>(
|
||||||
|
jobs: ReadonlyArray<Job>
|
||||||
|
): T.Task<Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>> =>
|
||||||
|
pipe(
|
||||||
|
T.of(jobs),
|
||||||
|
T.map(
|
||||||
|
RA.map((job) =>
|
||||||
|
pipe(
|
||||||
|
job.execute(),
|
||||||
|
TE.bimap(
|
||||||
|
(error) => [job, error] as [Job, Error],
|
||||||
|
(result) => [job, result] as [Job, TResult]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
T.flatMap(RA.wilt(T.ApplicativePar)(identity))
|
||||||
|
);
|
||||||
|
|
||||||
|
export type JobsTableRow = {
|
||||||
|
scheduled: Date;
|
||||||
|
retries: number;
|
||||||
|
};
|
||||||
|
export type JobsTable = ReadonlyMap<string, JobsTableRow>;
|
||||||
|
|
||||||
|
export const jobsTableRowMagma: Magma<JobsTableRow> = {
|
||||||
|
concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => {
|
||||||
|
if (a.scheduled <= b.scheduled) {
|
||||||
|
// always prefer the later schedule.
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
// if (a.retries <= b.retries) {
|
||||||
|
// return a;
|
||||||
|
// }
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const formatJobResults = <TResult>(
|
||||||
|
jobsTable: JobsTable,
|
||||||
|
{ left, right }: Separated<ReadonlyArray<[Job, Error]>, ReadonlyArray<[Job, TResult]>>
|
||||||
|
) => {
|
||||||
|
const failures = left
|
||||||
|
.map(([job, err]) => ({
|
||||||
|
job,
|
||||||
|
retries: jobsTable.get(job.id)?.retries ?? job.maxRetries,
|
||||||
|
err
|
||||||
|
}))
|
||||||
|
.filter(({ job, retries }) => retries >= job.maxRetries);
|
||||||
|
const retries = left
|
||||||
|
.map(([job, err]) => ({
|
||||||
|
job,
|
||||||
|
retries: jobsTable.get(job.id)?.retries ?? 0,
|
||||||
|
err
|
||||||
|
}))
|
||||||
|
.filter(({ job, retries }) => retries < job.maxRetries);
|
||||||
|
|
||||||
|
const failureMessages = failures
|
||||||
|
.map(({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (err) :( | ${err.message}`)
|
||||||
|
.join("\n");
|
||||||
|
const retryMessages = retries
|
||||||
|
.map(
|
||||||
|
({ job, err, retries }) => job.toString() + ` | ${retries} / ${job.maxRetries} | (retry) :/ | ${err.message}`
|
||||||
|
)
|
||||||
|
.join("\n");
|
||||||
|
const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n");
|
||||||
|
return `FAILURES:\n${failureMessages}\nRETRIES:\n${retryMessages}\nSUCCESSES:\n${successMessages}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const perform = (
|
||||||
|
jobsTable: JobsTable,
|
||||||
|
jobs: ReadonlyArray<Job>,
|
||||||
|
publishers: ReadonlyArray<(message: string) => TE.TaskEither<Error, number>>,
|
||||||
|
lastPingAck: Date,
|
||||||
|
publishAckEvery = 24 * (60 * 60 * 1_000),
|
||||||
|
logger = ConsoleLogger
|
||||||
|
) =>
|
||||||
|
pipe(
|
||||||
|
T.of(jobs),
|
||||||
|
T.bindTo("jobs"),
|
||||||
|
T.bind(
|
||||||
|
"now",
|
||||||
|
T.fromIOK(() => IO.of(new Date()))
|
||||||
|
),
|
||||||
|
T.bind("toExecute", ({ jobs, now }) =>
|
||||||
|
pipe(
|
||||||
|
jobs,
|
||||||
|
RA.filter((job) => (jobsTable.get(job.id)?.scheduled ?? now) <= now),
|
||||||
|
T.of
|
||||||
|
)
|
||||||
|
),
|
||||||
|
T.tap(({ toExecute, now }) => T.fromIO(logExecutingJobs(toExecute, now))),
|
||||||
|
T.bind("results", ({ toExecute }) => execute(toExecute)),
|
||||||
|
T.bind("shouldAck", ({ now }) => T.of(now.getTime() - lastPingAck.getTime() >= publishAckEvery)),
|
||||||
|
T.tap(({ results, shouldAck }) => {
|
||||||
|
if (results.left.length === 0 && !shouldAck) {
|
||||||
|
return pipe(
|
||||||
|
T.of({ right: [] as ReadonlyArray<number> }),
|
||||||
|
T.tap(() => T.fromIO(logger.log("not publishing")))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pipe(
|
||||||
|
publishers,
|
||||||
|
RA.map((publish) => publish(formatJobResults(jobsTable, results))),
|
||||||
|
RA.wilt(T.ApplicativePar)(identity),
|
||||||
|
T.tap(({ left }) =>
|
||||||
|
pipe(
|
||||||
|
left
|
||||||
|
? logger.error("Encountered publishing errors: " + left.toString())
|
||||||
|
: logger.log("Published successfully"),
|
||||||
|
T.fromIO
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}),
|
||||||
|
T.map(({ now, results: { left, right }, shouldAck }) => {
|
||||||
|
const ack = shouldAck ? now : lastPingAck;
|
||||||
|
|
||||||
|
const leftResults = pipe(
|
||||||
|
left,
|
||||||
|
RA.map(([job]): readonly [string, JobsTableRow] => {
|
||||||
|
const row = jobsTable.get(job.id);
|
||||||
|
if (!row) {
|
||||||
|
return [job.id, { retries: 1, scheduled: now }];
|
||||||
|
}
|
||||||
|
if (row.retries >= job.maxRetries) {
|
||||||
|
logger.error(`Hit max retries; scheduling again ;-; ${job.toString()}`);
|
||||||
|
return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const;
|
||||||
|
}
|
||||||
|
return [
|
||||||
|
job.id,
|
||||||
|
{
|
||||||
|
retries: row.retries + 1,
|
||||||
|
scheduled: now
|
||||||
|
}
|
||||||
|
] as const;
|
||||||
|
})
|
||||||
|
);
|
||||||
|
const results = pipe(
|
||||||
|
right,
|
||||||
|
RA.map(([job]): readonly [string, JobsTableRow] => [
|
||||||
|
job.id,
|
||||||
|
{
|
||||||
|
retries: 0,
|
||||||
|
scheduled: nextSchedule(job.schedule, now)
|
||||||
|
}
|
||||||
|
]),
|
||||||
|
RA.concat(leftResults)
|
||||||
|
);
|
||||||
|
|
||||||
|
const newJobsTable = pipe(
|
||||||
|
jobsTable,
|
||||||
|
RM.toReadonlyArray(S.Ord),
|
||||||
|
RA.concat(results),
|
||||||
|
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
|
||||||
|
);
|
||||||
|
return { lastPingAck: ack, jobsTable: newJobsTable };
|
||||||
|
})
|
||||||
|
);
|
|
@ -1,13 +1,13 @@
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
import * as O from "fp-ts/lib/Option";
|
import * as O from "fp-ts/lib/Option";
|
||||||
import type { Job } from "./job";
|
import type { TestJob } from "./job";
|
||||||
import type { D } from "../util";
|
import type { D } from "../util";
|
||||||
|
|
||||||
export enum TestType {
|
export enum TestType {
|
||||||
EMAIL = "email",
|
EMAIL = "email",
|
||||||
PING = "ping",
|
PING = "ping",
|
||||||
HEALTHCHECK = "healthcheck",
|
HEALTHCHECK = "healthcheck",
|
||||||
DNS = "dns",
|
DNS = "dns"
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface Schedule {
|
export interface Schedule {
|
||||||
|
@ -18,13 +18,14 @@ export interface Schedule {
|
||||||
export interface Test {
|
export interface Test {
|
||||||
name: string;
|
name: string;
|
||||||
type: TestType;
|
type: TestType;
|
||||||
job: Job;
|
job: TestJob;
|
||||||
schedule: Schedule;
|
schedule: Schedule;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type Testable<T extends Job> = (job: T) => TE.TaskEither<Error, boolean>;
|
export type Testable<TJob extends TestJob> = (job: TJob) => TE.TaskEither<Error, boolean>;
|
||||||
|
|
||||||
export const parseTestType = (testType: string): O.Option<TestType> =>
|
export const parseTestType = (testType: string): O.Option<TestType> =>
|
||||||
Object.values(TestType).includes(testType as TestType)
|
Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none;
|
||||||
? O.some(testType as TestType)
|
|
||||||
: O.none;
|
export const nextSchedule = (schedule: Schedule, date: Date) =>
|
||||||
|
new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter);
|
||||||
|
|
|
@ -8,9 +8,6 @@ const main: TE.TaskEither<Error, void> = pipe(
|
||||||
TE.fromEither(parseArgs(Bun.argv)),
|
TE.fromEither(parseArgs(Bun.argv)),
|
||||||
TE.bindTo("args"),
|
TE.bindTo("args"),
|
||||||
TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))),
|
TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))),
|
||||||
TE.fold(
|
TE.map(({ config }) => TE.tryCatch(() => {}, toError))
|
||||||
(e) => TE.left(toError(e)),
|
|
||||||
() => TE.right(undefined),
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
main();
|
main();
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
import { $ } from "bun";
|
||||||
|
|
||||||
|
export interface DiscordPost {
|
||||||
|
webhook: string;
|
||||||
|
role_id: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const publishDiscord = async (discordPost: DiscordPost, message: string) => {
|
||||||
|
console.log("Publishing to Discord");
|
||||||
|
const ip = await $`dig +noall +short discord.com @1.1.1.1 A | shuf -n 1`.text();
|
||||||
|
return fetch(discordPost.webhook.replace("discord.com", ip), {
|
||||||
|
headers: {
|
||||||
|
Host: "discord.com"
|
||||||
|
},
|
||||||
|
method: "POST",
|
||||||
|
body: JSON.stringify({ message: `<@${discordPost.role_id}>\n${message}` }),
|
||||||
|
tls: { rejectUnauthorized: false }
|
||||||
|
}).then((r) => r.status);
|
||||||
|
};
|
|
@ -1,23 +1,26 @@
|
||||||
import { D } from "../util";
|
import { toError } from "fp-ts/lib/Either";
|
||||||
|
import * as TE from "fp-ts/TaskEither";
|
||||||
|
import { publishDiscord, type DiscordPost } from "./discord";
|
||||||
|
import { publishNtfy, type NtfyPost } from "./ntfy";
|
||||||
|
|
||||||
export enum PublisherType {
|
export enum PublisherType {
|
||||||
DISCORD = "discord",
|
DISCORD = "discord",
|
||||||
NTFY = "ntfy",
|
NTFY = "ntfy"
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DiscordPost {
|
|
||||||
webhook: string;
|
|
||||||
role_id: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface NtfyPost {
|
|
||||||
webhook: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export type PublisherPost = DiscordPost | NtfyPost;
|
export type PublisherPost = DiscordPost | NtfyPost;
|
||||||
|
|
||||||
export interface Publisher {
|
export interface Publisher {
|
||||||
type: PublisherType;
|
type: PublisherType;
|
||||||
at: D.Duration;
|
|
||||||
post: PublisherPost;
|
post: PublisherPost;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const publish = (publisher: Publisher, message: string): TE.TaskEither<Error, number> => {
|
||||||
|
switch (publisher.type) {
|
||||||
|
case PublisherType.DISCORD:
|
||||||
|
return TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError);
|
||||||
|
case PublisherType.NTFY:
|
||||||
|
return TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError);
|
||||||
|
default:
|
||||||
|
return TE.left(new Error("unknown publisher type: " + publisher.type));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
export interface NtfyPost {
|
||||||
|
webhook: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const publishNtfy = async (ntfyPost: NtfyPost, message: string) => {
|
||||||
|
console.log("publishing to Ntfy");
|
||||||
|
return fetch(ntfyPost.webhook, {
|
||||||
|
method: "POST",
|
||||||
|
body: message
|
||||||
|
}).then((r) => r.status);
|
||||||
|
};
|
|
@ -10,7 +10,7 @@ export enum DurationUnit {
|
||||||
MILLISECOND,
|
MILLISECOND,
|
||||||
SECOND,
|
SECOND,
|
||||||
MINUTE,
|
MINUTE,
|
||||||
HOUR,
|
HOUR
|
||||||
}
|
}
|
||||||
const durationUnitMap: Record<string, DurationUnit> = {
|
const durationUnitMap: Record<string, DurationUnit> = {
|
||||||
ms: DurationUnit.MILLISECOND,
|
ms: DurationUnit.MILLISECOND,
|
||||||
|
@ -21,17 +21,14 @@ const durationUnitMap: Record<string, DurationUnit> = {
|
||||||
minutes: DurationUnit.MINUTE,
|
minutes: DurationUnit.MINUTE,
|
||||||
hr: DurationUnit.HOUR,
|
hr: DurationUnit.HOUR,
|
||||||
hour: DurationUnit.HOUR,
|
hour: DurationUnit.HOUR,
|
||||||
hours: DurationUnit.HOUR,
|
hours: DurationUnit.HOUR
|
||||||
};
|
};
|
||||||
const getDurationUnit = (key: string): O.Option<DurationUnit> =>
|
const getDurationUnit = (key: string): O.Option<DurationUnit> => O.fromNullable(durationUnitMap[key.toLowerCase()]);
|
||||||
O.fromNullable(durationUnitMap[key.toLowerCase()]);
|
|
||||||
|
|
||||||
export const getMs = (duration: Duration): number => duration;
|
export const getMs = (duration: Duration): number => duration;
|
||||||
export const getSeconds = (duration: Duration): number => duration / 1000;
|
export const getSeconds = (duration: Duration): number => duration / 1000;
|
||||||
export const getMinutes = (duration: Duration): number =>
|
export const getMinutes = (duration: Duration): number => getSeconds(duration) / 60;
|
||||||
getSeconds(duration) / 60;
|
export const getHours = (duration: Duration): number => getMinutes(duration) / 60;
|
||||||
export const getHours = (duration: Duration): number =>
|
|
||||||
getMinutes(duration) / 60;
|
|
||||||
export const format = (duration: Duration): string => {
|
export const format = (duration: Duration): string => {
|
||||||
const ms = getMs(duration) % 1000;
|
const ms = getMs(duration) % 1000;
|
||||||
const seconds = getSeconds(duration) % 60;
|
const seconds = getSeconds(duration) % 60;
|
||||||
|
@ -39,9 +36,7 @@ export const format = (duration: Duration): string => {
|
||||||
const hours = getHours(duration);
|
const hours = getHours(duration);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
[hours, minutes, seconds]
|
[hours, minutes, seconds].map((x) => Math.floor(x).toString().padStart(2, "0")).join(":") +
|
||||||
.map((x) => Math.floor(x).toString().padStart(2, "0"))
|
|
||||||
.join(":") +
|
|
||||||
"." +
|
"." +
|
||||||
ms.toString().padStart(3, "0")
|
ms.toString().padStart(3, "0")
|
||||||
);
|
);
|
||||||
|
@ -57,49 +52,40 @@ export const createDurationBuilder = (): DurationBuilder => ({
|
||||||
millis: 0,
|
millis: 0,
|
||||||
seconds: 0,
|
seconds: 0,
|
||||||
minutes: 0,
|
minutes: 0,
|
||||||
hours: 0,
|
hours: 0
|
||||||
});
|
});
|
||||||
|
|
||||||
export type DurationBuilderField<T> = (
|
export type DurationBuilderField<T> = (arg: T) => (builder: DurationBuilder) => DurationBuilder;
|
||||||
arg: T,
|
|
||||||
) => (builder: DurationBuilder) => DurationBuilder;
|
|
||||||
|
|
||||||
export const withMillis: DurationBuilderField<number> =
|
export const withMillis: DurationBuilderField<number> = (millis) => (builder) => ({
|
||||||
(millis) => (builder) => ({
|
...builder,
|
||||||
...builder,
|
millis
|
||||||
millis,
|
});
|
||||||
});
|
|
||||||
|
|
||||||
export const withSeconds: DurationBuilderField<number> =
|
export const withSeconds: DurationBuilderField<number> = (seconds) => (builder) => ({
|
||||||
(seconds) => (builder) => ({
|
...builder,
|
||||||
...builder,
|
seconds
|
||||||
seconds,
|
});
|
||||||
});
|
|
||||||
|
|
||||||
export const withMinutes: DurationBuilderField<number> =
|
export const withMinutes: DurationBuilderField<number> = (minutes) => (builder) => ({
|
||||||
(minutes) => (builder) => ({
|
...builder,
|
||||||
...builder,
|
minutes
|
||||||
minutes,
|
});
|
||||||
});
|
|
||||||
|
|
||||||
export const withHours: DurationBuilderField<number> =
|
export const withHours: DurationBuilderField<number> = (hours) => (builder) => ({
|
||||||
(hours) => (builder) => ({
|
...builder,
|
||||||
...builder,
|
hours
|
||||||
hours,
|
});
|
||||||
});
|
|
||||||
|
|
||||||
export const build = (builder: DurationBuilder): Duration =>
|
export const build = (builder: DurationBuilder): Duration =>
|
||||||
builder.millis +
|
builder.millis + builder.seconds * 1000 + builder.minutes * 60 * 1000 + builder.hours * 60 * 60 * 1000;
|
||||||
builder.seconds * 1000 +
|
|
||||||
builder.minutes * 60 * 1000 +
|
|
||||||
builder.hours * 60 * 60 * 1000;
|
|
||||||
|
|
||||||
export const parse = (duration: string): E.Either<string, Duration> => {
|
export const parse = (duration: string): E.Either<string, Duration> => {
|
||||||
const parts = pipe(
|
const parts = pipe(
|
||||||
duration,
|
duration,
|
||||||
S.split(" "),
|
S.split(" "),
|
||||||
R.map(S.trim),
|
R.map(S.trim),
|
||||||
R.filter((part) => !S.isEmpty(part)),
|
R.filter((part) => !S.isEmpty(part))
|
||||||
);
|
);
|
||||||
|
|
||||||
const valueUnitPairs = pipe(
|
const valueUnitPairs = pipe(
|
||||||
|
@ -120,36 +106,34 @@ export const parse = (duration: string): E.Either<string, Duration> => {
|
||||||
E.map(
|
E.map(
|
||||||
flow(
|
flow(
|
||||||
R.filter(O.isSome),
|
R.filter(O.isSome),
|
||||||
R.map(({ value }) => value),
|
R.map(({ value }) => value)
|
||||||
),
|
)
|
||||||
),
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
return pipe(
|
return pipe(
|
||||||
valueUnitPairs,
|
valueUnitPairs,
|
||||||
E.flatMap(
|
E.flatMap(
|
||||||
R.reduce(
|
R.reduce(E.of<string, DurationBuilder>(createDurationBuilder()), (builderEither, [unit, value]) =>
|
||||||
E.of<string, DurationBuilder>(createDurationBuilder()),
|
pipe(
|
||||||
(builderEither, [unit, value]) =>
|
builderEither,
|
||||||
pipe(
|
E.chain((builder) => {
|
||||||
builderEither,
|
switch (unit) {
|
||||||
E.chain((builder) => {
|
case DurationUnit.MILLISECOND:
|
||||||
switch (unit) {
|
return E.right(withMillis(value)(builder));
|
||||||
case DurationUnit.MILLISECOND:
|
case DurationUnit.SECOND:
|
||||||
return E.right(withMillis(value)(builder));
|
return E.right(withSeconds(value)(builder));
|
||||||
case DurationUnit.SECOND:
|
case DurationUnit.MINUTE:
|
||||||
return E.right(withSeconds(value)(builder));
|
return E.right(withMinutes(value)(builder));
|
||||||
case DurationUnit.MINUTE:
|
case DurationUnit.HOUR:
|
||||||
return E.right(withMinutes(value)(builder));
|
return E.right(withHours(value)(builder));
|
||||||
case DurationUnit.HOUR:
|
default:
|
||||||
return E.right(withHours(value)(builder));
|
return E.left(`unknown unit: ${unit}`);
|
||||||
default:
|
}
|
||||||
return E.left(`unknown unit: ${unit}`);
|
})
|
||||||
}
|
)
|
||||||
}),
|
)
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
E.map(build),
|
E.map(build)
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,8 +2,10 @@ import type { IO } from "fp-ts/lib/IO";
|
||||||
|
|
||||||
export interface Logger {
|
export interface Logger {
|
||||||
log: (message: string) => IO<void>;
|
log: (message: string) => IO<void>;
|
||||||
|
error: (message: string) => IO<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export const ConsoleLogger: Logger = {
|
export const ConsoleLogger: Logger = {
|
||||||
log: (message: string) => () => console.log(message),
|
log: (message: string) => () => console.log(new Date(), "[INFO]", message),
|
||||||
|
error: (message: string) => () => console.error(new Date(), "[ERROR]", message)
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,93 +0,0 @@
|
||||||
import * as TE from "fp-ts/lib/TaskEither";
|
|
||||||
import * as IO from "fp-ts/lib/IO";
|
|
||||||
import * as RA from "fp-ts/lib/ReadonlyArray";
|
|
||||||
import * as RM from "fp-ts/lib/ReadonlyMap";
|
|
||||||
import * as T from "fp-ts/Task";
|
|
||||||
import * as S from "fp-ts/string";
|
|
||||||
import { contramap, type Eq } from "fp-ts/Eq";
|
|
||||||
import { pipe, identity } from "fp-ts/lib/function";
|
|
||||||
import type { Schedule } from "../canary";
|
|
||||||
import { ConsoleLogger, type Logger } from "./logger";
|
|
||||||
import type { Separated } from "fp-ts/lib/Separated";
|
|
||||||
import type { Magma } from "fp-ts/lib/Magma";
|
|
||||||
import { intercalate } from "fp-ts/lib/Foldable";
|
|
||||||
|
|
||||||
interface Unit {}
|
|
||||||
const Unit: Unit = {};
|
|
||||||
|
|
||||||
interface ScheduledJob {
|
|
||||||
id: string;
|
|
||||||
execute: <TResult>() => TE.TaskEither<Error, TResult>;
|
|
||||||
at: Date;
|
|
||||||
schedule: Schedule;
|
|
||||||
}
|
|
||||||
|
|
||||||
type SchedulerState = ReadonlyArray<ScheduledJob>;
|
|
||||||
|
|
||||||
const logState = (
|
|
||||||
state: SchedulerState,
|
|
||||||
now: Date,
|
|
||||||
logger: Logger = ConsoleLogger,
|
|
||||||
) => {
|
|
||||||
const ids = pipe(
|
|
||||||
state,
|
|
||||||
RA.map(({ id }) => id),
|
|
||||||
(ids) => intercalate(S.Monoid, RA.Foldable)("|", ids),
|
|
||||||
);
|
|
||||||
return logger.log(`Executing ${ids} at ${now.toUTCString()}`);
|
|
||||||
};
|
|
||||||
|
|
||||||
const executeDueJobs = <TResult>(
|
|
||||||
state: SchedulerState,
|
|
||||||
): IO.IO<
|
|
||||||
T.Task<
|
|
||||||
Separated<ReadonlyArray<[string, Error]>, ReadonlyArray<[string, TResult]>>
|
|
||||||
>
|
|
||||||
> =>
|
|
||||||
pipe(
|
|
||||||
IO.of(state),
|
|
||||||
IO.bindTo("state"),
|
|
||||||
IO.bind("now", () => () => new Date()),
|
|
||||||
IO.bind("toExecute", ({ now, state }) =>
|
|
||||||
pipe(IO.of(state), IO.map(RA.filter(({ at }) => at <= now))),
|
|
||||||
),
|
|
||||||
IO.flatMap(({ state, now }) =>
|
|
||||||
pipe(
|
|
||||||
IO.of(state),
|
|
||||||
IO.tap((state) => logState(state, now)),
|
|
||||||
IO.map((state) =>
|
|
||||||
pipe(
|
|
||||||
state,
|
|
||||||
RA.map(({ execute, id }) =>
|
|
||||||
pipe(
|
|
||||||
execute(),
|
|
||||||
TE.bimap(
|
|
||||||
(error) => [id, error] as [string, Error],
|
|
||||||
(result) => [id, result] as [string, TResult],
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
IO.map((jobs) => pipe(jobs, RA.wilt(T.ApplicativePar)(identity))),
|
|
||||||
);
|
|
||||||
|
|
||||||
const jobEq: Eq<ScheduledJob> = pipe(
|
|
||||||
S.Eq,
|
|
||||||
contramap(({ id }) => id),
|
|
||||||
);
|
|
||||||
const jobMagma: Magma<ScheduledJob> = {
|
|
||||||
concat: (a: ScheduledJob, b: ScheduledJob): ScheduledJob => ({
|
|
||||||
...a,
|
|
||||||
...b,
|
|
||||||
}),
|
|
||||||
};
|
|
||||||
|
|
||||||
export const schedulerLoop = (jobs: ReadonlyArray<ScheduledJob>) => {
|
|
||||||
const jobsTable: Map<
|
|
||||||
string,
|
|
||||||
{ retries: number; specification: ScheduledJob }
|
|
||||||
> = pipe(jobs, RM.fromFoldable(jobEq, jobMagma, RA.Foldable));
|
|
||||||
};
|
|
|
@ -0,0 +1,197 @@
|
||||||
|
import { mock, test, expect } from "bun:test";
|
||||||
|
import * as TE from "fp-ts/lib/TaskEither";
|
||||||
|
import type { Logger } from "../../src/util";
|
||||||
|
import {
|
||||||
|
perform,
|
||||||
|
type JobsTable,
|
||||||
|
execute,
|
||||||
|
formatJobResults,
|
||||||
|
logExecutingJobs,
|
||||||
|
type Job
|
||||||
|
} from "../../src/canary/scheduler";
|
||||||
|
|
||||||
|
const getMocks = () => {
|
||||||
|
const mockLogger: Logger = {
|
||||||
|
log: mock(),
|
||||||
|
error: mock()
|
||||||
|
};
|
||||||
|
return { mockLogger };
|
||||||
|
};
|
||||||
|
|
||||||
|
const schedule = {
|
||||||
|
every: 200,
|
||||||
|
jitter: 100
|
||||||
|
};
|
||||||
|
|
||||||
|
test("logging", () => {
|
||||||
|
const { mockLogger } = getMocks();
|
||||||
|
const jobs: Job[] = [
|
||||||
|
{ id: "1", toString: () => "Job 1", execute: mock(), schedule, maxRetries: 3 },
|
||||||
|
{ id: "2", toString: () => "Job 2", execute: mock(), schedule, maxRetries: 3 }
|
||||||
|
];
|
||||||
|
const now = new Date("2023-01-01T00:00:00Z");
|
||||||
|
|
||||||
|
logExecutingJobs(jobs, now, mockLogger);
|
||||||
|
|
||||||
|
expect(mockLogger.log).toHaveBeenCalledWith("Executing Job 1|Job 2 at Sun, 01 Jan 2023 00:00:00 GMT");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should separate jobs into successful and failed executions", async () => {
|
||||||
|
const job1: Job = {
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock(() => TE.right("Result 1") as any),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 3
|
||||||
|
};
|
||||||
|
const job2: Job = {
|
||||||
|
id: "2",
|
||||||
|
toString: () => "Job 2",
|
||||||
|
execute: mock(() => TE.left(new Error("Failure 2")) as any),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 3
|
||||||
|
};
|
||||||
|
const jobs: Job[] = [job1, job2];
|
||||||
|
|
||||||
|
const result = await execute(jobs)();
|
||||||
|
|
||||||
|
expect(result.left).toEqual([[job2, new Error("Failure 2")]]);
|
||||||
|
expect(result.right).toEqual([[job1, "Result 1"]]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should format job results correctly", () => {
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
|
||||||
|
const left = [
|
||||||
|
[{ id: "1", toString: () => "Job 1", execute: mock(), schedule: {}, maxRetries: 3 }, new Error("Error 1")]
|
||||||
|
];
|
||||||
|
const right = [[{ id: "2", toString: () => "Job 2", execute: mock(), schedule: {}, maxRetries: 3 }, "Success 2"]];
|
||||||
|
|
||||||
|
const result = formatJobResults(jobsTable, { left, right } as any);
|
||||||
|
|
||||||
|
expect(result).toContain("Job 1 | 1 / 3 | (retry) :/ | Error 1");
|
||||||
|
expect(result).toContain("Job 2 | (success) :) | Success 2");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should update jobsTable and lastPingAck correctly", async () => {
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 1 }]]);
|
||||||
|
const jobs: Job[] = [
|
||||||
|
{
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock(() => TE.right("Result 1") as any),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 3
|
||||||
|
}
|
||||||
|
];
|
||||||
|
const publishers: any = [];
|
||||||
|
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
||||||
|
|
||||||
|
const result = await perform(jobsTable, jobs, publishers, lastPingAck)();
|
||||||
|
|
||||||
|
expect(result.lastPingAck).not.toEqual(lastPingAck);
|
||||||
|
expect(result.jobsTable.get("1")).toEqual({
|
||||||
|
retries: 0,
|
||||||
|
scheduled: expect.any(Date)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should update a job with retry count on failure", async () => {
|
||||||
|
// Create a mock job that fails the first time but succeeds the second time
|
||||||
|
const job1: Job = {
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock(() => TE.left(new Error("Error 1")) as any),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 2
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
||||||
|
const jobs: Job[] = [job1];
|
||||||
|
const publishers: any = [];
|
||||||
|
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
||||||
|
|
||||||
|
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
||||||
|
|
||||||
|
// Assert the job was retried once and then succeeded
|
||||||
|
expect(job1.execute).toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Check the jobsTable for the updated state
|
||||||
|
expect(result.jobsTable.get("1")).toEqual({
|
||||||
|
retries: 1,
|
||||||
|
scheduled: expect.any(Date)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should reschedule a job that hits max retries", async () => {
|
||||||
|
// Create a mock job that fails the first time but succeeds the second time
|
||||||
|
const job1: Job = {
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock().mockReturnValue(TE.left(new Error("Error 1"))),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 4
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 4 }]]);
|
||||||
|
const jobs: Job[] = [job1];
|
||||||
|
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
||||||
|
|
||||||
|
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
const result = await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
||||||
|
const delta = Date.now() - now;
|
||||||
|
|
||||||
|
// Assert the job was retried once and then fail
|
||||||
|
expect(job1.execute).toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Check the jobsTable for the updated state
|
||||||
|
const { retries, scheduled } = result.jobsTable.get("1")!;
|
||||||
|
expect(retries).toEqual(0);
|
||||||
|
expect(publishers[0]).toHaveBeenCalled();
|
||||||
|
|
||||||
|
expect(scheduled.getTime()).toBeGreaterThan(now - delta + schedule.every);
|
||||||
|
expect(scheduled.getTime()).toBeLessThan(now + delta + schedule.every + schedule.jitter);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should not publish only successes when should not ack", async () => {
|
||||||
|
// Create a mock job that fails the first time but succeeds the second time
|
||||||
|
const job1: Job = {
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 4
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
||||||
|
const jobs: Job[] = [job1];
|
||||||
|
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
||||||
|
const lastPingAck = new Date();
|
||||||
|
|
||||||
|
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
||||||
|
|
||||||
|
expect(job1.execute).toHaveBeenCalled();
|
||||||
|
expect(publishers[0]).toHaveBeenCalledTimes(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should publish when should ack", async () => {
|
||||||
|
// Create a mock job that fails the first time but succeeds the second time
|
||||||
|
const job1: Job = {
|
||||||
|
id: "1",
|
||||||
|
toString: () => "Job 1",
|
||||||
|
execute: mock().mockReturnValue(TE.right(new Error("Error 1"))),
|
||||||
|
schedule,
|
||||||
|
maxRetries: 4
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobsTable: JobsTable = new Map([["1", { scheduled: new Date("2023-01-01T00:00:00Z"), retries: 0 }]]);
|
||||||
|
const jobs: Job[] = [job1];
|
||||||
|
const publishers: any = [mock().mockReturnValue(TE.right(200))];
|
||||||
|
const lastPingAck = new Date("2023-01-01T00:00:00Z");
|
||||||
|
|
||||||
|
await perform(jobsTable, jobs, publishers, lastPingAck, 24 * 60 * 60 * 1000)();
|
||||||
|
|
||||||
|
expect(job1.execute).toHaveBeenCalled();
|
||||||
|
expect(publishers[0]).toHaveBeenCalled();
|
||||||
|
});
|
Loading…
Reference in New Issue