add a console publisher

This commit is contained in:
Elizabeth Hunt 2024-08-26 17:55:04 -07:00
parent 8f584b5b05
commit 18456c13d4
7 changed files with 140 additions and 43 deletions

View File

@ -3,11 +3,11 @@ import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option"; import * as O from "fp-ts/lib/Option";
import { createTransport } from "nodemailer"; import { createTransport } from "nodemailer";
import { toError } from "fp-ts/lib/Either"; import { toError } from "fp-ts/lib/Either";
import { pipe } from "fp-ts/lib/function"; import { flow, pipe } from "fp-ts/lib/function";
import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow"; import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from "imapflow";
import * as IO from "fp-ts/lib/IO"; import * as IO from "fp-ts/lib/IO";
import * as T from "fp-ts/lib/Task"; import * as T from "fp-ts/lib/Task";
import { ConsoleLogger } from "../util"; import { ConsoleLogger, type Logger } from "../util";
interface ImapClientI { interface ImapClientI {
fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>; fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
@ -145,9 +145,10 @@ type FindEmailUidInInbox = (
imap: ImapClientI, imap: ImapClientI,
equalsEmail: (message: FetchMessageObject) => boolean, equalsEmail: (message: FetchMessageObject) => boolean,
retries: number, retries: number,
pollIntervalMs: number pollIntervalMs: number,
logger?: Logger
) => TE.TaskEither<Error, number>; ) => TE.TaskEither<Error, number>;
const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs) => const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, pollIntervalMs, logger = ConsoleLogger) =>
pipe( pipe(
fetchMessages(imap), fetchMessages(imap),
TE.flatMap((messages) => { TE.flatMap((messages) => {
@ -160,11 +161,16 @@ const findEmailUidInInbox: FindEmailUidInInbox = (imap, equalsEmail, retries, po
TE.fold( TE.fold(
(e) => (e) =>
pipe( pipe(
TE.fromIO(ConsoleLogger.log(`failed; ${retries} retries left.`)), TE.fromIO(logger.log(`email failed; ${retries} retries left.`)),
TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))), TE.chain(() => (retries === 0 ? TE.left(e) : T.delay(pollIntervalMs)(TE.right(null)))),
TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs)) TE.chain(() => findEmailUidInInbox(imap, equalsEmail, retries - 1, pollIntervalMs))
), ),
TE.of (s) =>
pipe(
s,
TE.of,
TE.tap(() => TE.fromIO(logger.log("Email succeeded")))
)
) )
); );
@ -207,11 +213,7 @@ export const perform = (
), ),
// cleanup. // cleanup.
TE.bind("deleted", ({ imap, uid, mailboxLock }) => TE.bind("deleted", ({ imap, uid, mailboxLock }) =>
TE.tryCatch( TE.tryCatch(() => imap.messageDelete([uid]), ToErrorWithLock(mailboxLock))
// () => imap.messageDelete([uid], { uid: true }),
() => imap.messageDelete([uid]),
ToErrorWithLock(mailboxLock)
)
), ),
TE.fold( TE.fold(
(e) => { (e) => {

View File

@ -14,7 +14,7 @@ import { ConsoleLogger, type Logger } from "../util";
export interface Job { export interface Job {
id: string; id: string;
toString: () => string; toString: () => string;
execute: <TResult>() => TE.TaskEither<Error, TResult>; execute: () => TE.TaskEither<Error, boolean>;
schedule: Schedule; schedule: Schedule;
maxRetries: number; maxRetries: number;
} }
@ -52,6 +52,13 @@ export type JobsTableRow = {
retries: number; retries: number;
}; };
export type JobsTable = ReadonlyMap<string, JobsTableRow>; export type JobsTable = ReadonlyMap<string, JobsTableRow>;
export const constructJobsTable = (jobs: ReadonlyArray<Job>, now: Date): JobsTable => {
return pipe(
jobs,
RA.map((job) => [job.id, { retries: 0, scheduled: now }] as readonly [string, JobsTableRow]),
RM.fromFoldable(S.Eq, jobsTableRowMagma, RA.Foldable)
);
};
export const jobsTableRowMagma: Magma<JobsTableRow> = { export const jobsTableRowMagma: Magma<JobsTableRow> = {
concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => { concat: (a: JobsTableRow, b: JobsTableRow): JobsTableRow => {
@ -94,7 +101,7 @@ export const formatJobResults = <TResult>(
) )
.join("\n"); .join("\n");
const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n"); const successMessages = right.map(([job, val]) => job.toString() + " | (success) :) | " + val).join("\n");
return `FAILURES:\n${failureMessages}\nRETRIES:\n${retryMessages}\nSUCCESSES:\n${successMessages}`; return `FAILURES:\n${failureMessages}\n\nRETRIES:\n${retryMessages}\n\nSUCCESSES:\n${successMessages}`;
}; };
export const perform = ( export const perform = (
@ -136,7 +143,7 @@ export const perform = (
RA.wilt(T.ApplicativePar)(identity), RA.wilt(T.ApplicativePar)(identity),
T.tap(({ left }) => T.tap(({ left }) =>
pipe( pipe(
left left.length
? logger.error("Encountered publishing errors: " + left.toString()) ? logger.error("Encountered publishing errors: " + left.toString())
: logger.log("Published successfully"), : logger.log("Published successfully"),
T.fromIO T.fromIO
@ -155,7 +162,7 @@ export const perform = (
return [job.id, { retries: 1, scheduled: now }]; return [job.id, { retries: 1, scheduled: now }];
} }
if (row.retries >= job.maxRetries) { if (row.retries >= job.maxRetries) {
logger.error(`Hit max retries; scheduling again ;-; ${job.toString()}`); logger.error(`Hit max retries for; scheduling again ;-; ${job.toString()}`);
return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const; return [job.id, { retries: 0, scheduled: nextSchedule(job.schedule, now) }] as const;
} }
return [ return [

View File

@ -1,7 +1,8 @@
import * as TE from "fp-ts/lib/TaskEither"; import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option"; import * as O from "fp-ts/lib/Option";
import type { TestJob } from "./job"; import type { EmailJob, TestJob } from "./job";
import type { D } from "../util"; import type { D } from "../util";
import { perform } from "./email";
export enum TestType { export enum TestType {
EMAIL = "email", EMAIL = "email",
@ -22,10 +23,16 @@ export interface Test {
schedule: Schedule; schedule: Schedule;
} }
export type Testable<TJob extends TestJob> = (job: TJob) => TE.TaskEither<Error, boolean>;
export const parseTestType = (testType: string): O.Option<TestType> => export const parseTestType = (testType: string): O.Option<TestType> =>
Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none; Object.values(TestType).includes(testType as TestType) ? O.some(testType as TestType) : O.none;
export const nextSchedule = (schedule: Schedule, date: Date) => export const nextSchedule = (schedule: Schedule, date: Date) =>
new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter); new Date(date.getTime() + schedule.every + Math.random() * schedule.jitter);
export const executorFor = (type: TestType, job: TestJob): (() => TE.TaskEither<Error, boolean>) => {
switch (type) {
case TestType.EMAIL:
return () => perform(job as EmailJob);
}
return () => TE.right(true);
};

View File

@ -1,7 +1,9 @@
import * as IO from "fp-ts/IO"; import * as E from "fp-ts/Either";
import type { Publisher } from "./publisher"; import type { Publisher } from "./publisher";
import { readFileSync } from "fs"; import { readFileSync } from "fs";
import type { Test } from "./canary"; import { TestType, type Test, type TestJob } from "./canary";
import * as D from "./util/duration";
import { pipe } from "fp-ts/lib/function";
export interface Config { export interface Config {
result_publishers: Publisher[]; result_publishers: Publisher[];
@ -10,9 +12,28 @@ export interface Config {
tests: Test[]; tests: Test[];
} }
export const readConfig = export const transformDurations = (obj: any): E.Either<string, any> => {
(filePath: string): IO.IO<Config> => const transform = (o: any): E.Either<string, any> => {
() => { const entries = Object.entries(o);
const confStr = readFileSync(filePath, "utf-8");
return JSON.parse(confStr); for (let [key, value] of entries) {
if (key === "duration" && typeof value === "string") {
return D.parse(value);
} else if (typeof value === "object" && value !== null) {
const result = transform(value);
if (E.isLeft(result)) {
return result;
} else {
o[key] = result.right;
}
}
}
return E.right(o);
}; };
return transform(obj);
};
export const readConfig = (filePath: string): E.Either<string, Config> =>
pipe(readFileSync(filePath, "utf-8"), JSON.parse, transformDurations) as E.Either<string, Config>;

View File

@ -1,13 +1,51 @@
import * as TE from "fp-ts/lib/TaskEither"; import * as TE from "fp-ts/lib/TaskEither";
import * as O from "fp-ts/lib/Option";
import * as RA from "fp-ts/ReadonlyArray";
import { pipe } from "fp-ts/lib/function"; import { pipe } from "fp-ts/lib/function";
import { toError } from "fp-ts/lib/Either";
import { readConfig } from "./config"; import { readConfig } from "./config";
import { parseArgs } from "./args"; import { parseArgs, type Args } from "./args";
import { executorFor, type Test, type TestType } from "./canary";
import { constructJobsTable, perform, type Job } from "./canary/scheduler";
import { publish, PublisherType } from "./publisher";
import { ConsoleLogger } from "./util";
const main: TE.TaskEither<Error, void> = pipe( const testFilters = (args: Args): ReadonlyArray<(test: Test) => boolean> =>
(
[
[args.testType, (testType: TestType) => (test: Test) => test.type === testType],
[args.testName, (testName: string) => (test: Test) => testName === test.name]
] as ReadonlyArray<[O.Option<any>, (t: any) => (test: Test) => boolean]>
)
.map(([o, pred]) =>
pipe(
o,
O.map((val) => pred(val))
)
)
.filter(O.isSome)
.map(({ value }) => value);
const main: TE.TaskEither<string | Error, any> = pipe(
TE.fromEither(parseArgs(Bun.argv)), TE.fromEither(parseArgs(Bun.argv)),
TE.bindTo("args"), TE.bindTo("args"),
TE.bind("config", ({ args }) => TE.fromIO(readConfig(args.testsFile))), TE.bind("now", () => TE.fromIO(() => new Date())),
TE.map(({ config }) => TE.tryCatch(() => {}, toError)) TE.bind("config", ({ args }) => TE.fromEither(readConfig(args.testsFile))),
TE.flatMap(({ config, args, now }) => {
const filters = testFilters(args);
const jobs = config.tests
.filter((test) => filters.every((filter) => filter(test)))
.map((test, i): Job => {
const toString = () => test.name;
const id = i.toString();
const schedule = test.schedule;
const maxRetries = 3;
const execute = executorFor(test.type, test.job);
return { toString, id, schedule, maxRetries, execute };
});
const jobsTable = constructJobsTable(jobs, now);
const publishers = pipe([{ publisherType: PublisherType.CONSOLE, post: ConsoleLogger }] as const, RA.map(publish));
return pipe(perform(jobsTable, jobs, publishers, new Date()), TE.fromTask);
})
); );
main(); main().then(() => process.exit());

View File

@ -2,25 +2,35 @@ import { toError } from "fp-ts/lib/Either";
import * as TE from "fp-ts/TaskEither"; import * as TE from "fp-ts/TaskEither";
import { publishDiscord, type DiscordPost } from "./discord"; import { publishDiscord, type DiscordPost } from "./discord";
import { publishNtfy, type NtfyPost } from "./ntfy"; import { publishNtfy, type NtfyPost } from "./ntfy";
import type { Logger } from "../util";
import { pipe } from "fp-ts/lib/function";
export enum PublisherType { export enum PublisherType {
DISCORD = "discord", DISCORD = "discord",
NTFY = "ntfy" NTFY = "ntfy",
CONSOLE = "console"
} }
export type PublisherPost = DiscordPost | NtfyPost; export type PublisherPost = DiscordPost | NtfyPost | Logger;
export interface Publisher { export interface Publisher {
type: PublisherType; publisherType: PublisherType;
post: PublisherPost; post: PublisherPost;
} }
export const publish = (publisher: Publisher, message: string): TE.TaskEither<Error, number> => { export const publish = (publisher: Publisher): ((message: string) => TE.TaskEither<Error, number>) => {
switch (publisher.type) { switch (publisher.publisherType) {
case PublisherType.DISCORD: case PublisherType.DISCORD:
return TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError); return (message) => TE.tryCatch(() => publishDiscord(publisher.post as DiscordPost, message), toError);
case PublisherType.NTFY: case PublisherType.NTFY:
return TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError); return (message) => TE.tryCatch(() => publishNtfy(publisher.post as NtfyPost, message), toError);
case PublisherType.CONSOLE:
return (message) =>
pipe(
(publisher.post as Logger).log("\n>=====<\n" + message + "\n>=====<"),
TE.fromIO,
TE.map(() => 200)
);
default: default:
return TE.left(new Error("unknown publisher type: " + publisher.type)); return (_message) => TE.left(new Error("unknown publisher type: " + publisher.publisherType));
} }
}; };

View File

@ -3,9 +3,21 @@ import type { IO } from "fp-ts/lib/IO";
export interface Logger { export interface Logger {
log: (message: string) => IO<void>; log: (message: string) => IO<void>;
error: (message: string) => IO<void>; error: (message: string) => IO<void>;
addPrefix: (prefix: string) => Logger;
} }
export const ConsoleLogger: Logger = { export class ConsoleLoggerI implements Logger {
log: (message: string) => () => console.log(new Date(), "[INFO]", message), constructor(private readonly prefix = "") {}
error: (message: string) => () => console.error(new Date(), "[ERROR]", message)
}; public log(message: string) {
return () => console.log(new Date(), "[INFO]", this.prefix, message);
}
public error(message: string) {
return () => console.error(new Date(), "[ERROR]", this.prefix, message);
}
public addPrefix(prefix: string): ConsoleLoggerI {
return new ConsoleLoggerI(this.prefix + prefix);
}
}
export const ConsoleLogger = new ConsoleLoggerI();