Compare commits

..

9 Commits

20 changed files with 116 additions and 93 deletions

View File

@ -3,14 +3,15 @@ import { TaggedVariant, VariantTag } from "../variant/index.js";
/** /**
* A TaggedError is a tagged variant with an error message. * A TaggedError is a tagged variant with an error message.
*/ */
export class TaggedError<Tag extends string = string> export abstract class TaggedError<Tag extends string = string>
extends Error extends Error
implements TaggedVariant<Tag> implements TaggedVariant<Tag>
{ {
readonly [VariantTag]: Tag; readonly [VariantTag]: Tag;
constructor(tag: Tag) { constructor(tag: Tag, message?: string) {
super(); super(message);
this[VariantTag] = tag; this[VariantTag] = tag;
this.name = tag;
} }
} }

View File

@ -8,12 +8,7 @@ import { TaggedError } from "./tagged-error.js";
* we must be prepared to handle. * we must be prepared to handle.
*/ */
export class UnexpectedError extends TaggedError<"UnexpectedError"> { export class UnexpectedError extends TaggedError<"UnexpectedError"> {
constructor(readonly context: Record<string, unknown> = {}) { constructor(message?: string) {
super("UnexpectedError"); super("UnexpectedError", message);
this.message = "An unexpected error occurred";
}
toString() {
return `UnexpectedError: ${this.message}\n${JSON.stringify(this.context, null, 2)}`;
} }
} }

View File

@ -5,4 +5,5 @@ export * from "./result/index.js";
export * from "./run/index.js"; export * from "./run/index.js";
export * from "./time/index.js"; export * from "./time/index.js";
export * from "./types/index.js"; export * from "./types/index.js";
export * from "./utils/index.js";
export * from "./variant/index.js"; export * from "./variant/index.js";

View File

@ -3,3 +3,4 @@ export * from "./fn.js";
export * from "./keyof.js"; export * from "./keyof.js";
export * from "./maybe-promise.js"; export * from "./maybe-promise.js";
export * from "./optional.js"; export * from "./optional.js";
export * from "./record.js";

View File

@ -0,0 +1,8 @@
import { UnexpectedError } from "../error/unexpected-error.js";
export function ensureValue<T>(value?: T): T {
if (!value) {
throw new UnexpectedError("Value is undefined");
}
return value;
}

View File

@ -0,0 +1 @@
export * from "./ensure-value.js";

View File

@ -1,17 +1,21 @@
import { Fn } from "../types/fn.js"; import { Fn } from "../types/fn.js";
import { TaggedVariant, VariantFromTag, VariantTag } from "./variant.js"; import { TaggedVariant, VariantFromTag, VariantTag } from "./variant.js";
export type VariantMatcher<TVariant extends TaggedVariant<string>> = { export type VariantMatcher<TVariant extends TaggedVariant<string>, T> = {
[K in TVariant[VariantTag]]: Fn<VariantFromTag<TVariant, K>>; [K in TVariant[VariantTag]]: Fn<VariantFromTag<TVariant, K>, T>;
}; };
export function match<const TVariant extends TaggedVariant<string>>( export function match<const TVariant extends TaggedVariant<string>>(
v: TVariant, v: TVariant,
) { ) {
return { return {
case<const TMatcher extends VariantMatcher<TVariant>>( case<
cases: TMatcher, const TReturnType,
): ReturnType<TMatcher[TVariant[VariantTag]]> { const TMatcher extends VariantMatcher<
TVariant,
TReturnType
> = VariantMatcher<TVariant, TReturnType>,
>(cases: TMatcher): TReturnType {
if (!(v[VariantTag] in cases)) { if (!(v[VariantTag] in cases)) {
throw new Error("Non-exhaustive pattern match"); throw new Error("Non-exhaustive pattern match");
} }

View File

@ -7,7 +7,7 @@ export interface TaggedVariant<TTag extends string> {
export type VariantFromTag< export type VariantFromTag<
TVariant extends TaggedVariant<string>, TVariant extends TaggedVariant<string>,
TTag extends TVariant[typeof VariantTag], TTag extends TVariant[VariantTag],
> = Extract<TVariant, { [VariantTag]: TTag }>; > = Extract<TVariant, { [VariantTag]: TTag }>;
export namespace Variant { export namespace Variant {

View File

@ -1,12 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { TaggedError } from "@fabric/core"; import { TaggedError } from "@fabric/core";
export class StoreQueryError extends TaggedError<"StoreQueryError"> { export class StoreQueryError extends TaggedError<"StoreQueryError"> {
constructor( constructor(public message: string) {
public message: string, super("StoreQueryError", message);
public context: any,
) {
super("StoreQueryError");
} }
} }

View File

@ -1,7 +1,13 @@
import { AsyncResult, MaybePromise, PosixDate } from "@fabric/core"; import {
AsyncResult,
MaybePromise,
PosixDate,
VariantFromTag,
VariantTag,
} from "@fabric/core";
import { StoreQueryError } from "../errors/query-error.js"; import { StoreQueryError } from "../errors/query-error.js";
import { UUID } from "../types/uuid.js"; import { UUID } from "../types/uuid.js";
import { Event, EventFromKey } from "./event.js"; import { Event } from "./event.js";
import { StoredEvent } from "./stored-event.js"; import { StoredEvent } from "./stored-event.js";
export interface EventStore<TEvents extends Event> { export interface EventStore<TEvents extends Event> {
@ -16,9 +22,9 @@ export interface EventStore<TEvents extends Event> {
streamId: UUID, streamId: UUID,
): AsyncResult<StoredEvent<TEvents>[], StoreQueryError>; ): AsyncResult<StoredEvent<TEvents>[], StoreQueryError>;
subscribe<TEventKey extends TEvents["type"]>( subscribe<TEventKey extends TEvents[VariantTag]>(
events: TEventKey[], events: TEventKey[],
subscriber: EventSubscriber<EventFromKey<TEvents, TEventKey>>, subscriber: EventSubscriber<VariantFromTag<TEvents, TEventKey>>,
): void; ): void;
} }

View File

@ -1,11 +1,12 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import { VariantTag } from "@fabric/core";
import { UUID } from "../types/uuid.js"; import { UUID } from "../types/uuid.js";
/** /**
* An event is a tagged variant with a payload and a timestamp. * An event is a tagged variant with a payload and a timestamp.
*/ */
export interface Event<TTag extends string = string, TPayload = any> { export interface Event<TTag extends string = string, TPayload = any> {
readonly type: TTag; readonly [VariantTag]: TTag;
readonly id: UUID; readonly id: UUID;
readonly streamId: UUID; readonly streamId: UUID;
readonly payload: TPayload; readonly payload: TPayload;
@ -13,5 +14,5 @@ export interface Event<TTag extends string = string, TPayload = any> {
export type EventFromKey< export type EventFromKey<
TEvents extends Event, TEvents extends Event,
TKey extends TEvents["type"], TKey extends TEvents[VariantTag],
> = Extract<TEvents, { type: TKey }>; > = Extract<TEvents, { [VariantTag]: TKey }>;

View File

@ -19,6 +19,7 @@ export const DefaultModelFields = {
isUnsigned: true, isUnsigned: true,
hasArbitraryPrecision: true, hasArbitraryPrecision: true,
}), }),
deletedAt: Field.timestamp({ isOptional: true }),
}; };
export interface Model< export interface Model<

View File

@ -0,0 +1,13 @@
import { VariantTag } from "@fabric/core";
import { Event } from "../events/event.js";
import { StoredEvent } from "../events/stored-event.js";
import { Model, ModelToType } from "../models/model.js";
export interface Projection<TModel extends Model, TEvents extends Event> {
model: TModel;
events: TEvents[VariantTag][];
projection: (
event: StoredEvent<TEvents>,
model?: ModelToType<TModel>,
) => ModelToType<TModel>;
}

View File

@ -26,7 +26,7 @@ describe("Event Store", () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
const userCreated: UserCreated = { const userCreated: UserCreated = {
type: "UserCreated", _tag: "UserCreated",
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
payload: { name: "test" }, payload: { name: "test" },
@ -41,7 +41,7 @@ describe("Event Store", () => {
expect(events[0]).toEqual({ expect(events[0]).toEqual({
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
type: "UserCreated", _tag: "UserCreated",
version: BigInt(1), version: BigInt(1),
timestamp: expect.any(PosixDate), timestamp: expect.any(PosixDate),
payload: { name: "test" }, payload: { name: "test" },
@ -52,7 +52,7 @@ describe("Event Store", () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
const userCreated: UserCreated = { const userCreated: UserCreated = {
type: "UserCreated", _tag: "UserCreated",
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
payload: { name: "test" }, payload: { name: "test" },
@ -68,7 +68,7 @@ describe("Event Store", () => {
expect(subscriber).toHaveBeenCalledWith({ expect(subscriber).toHaveBeenCalledWith({
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
type: "UserCreated", _tag: "UserCreated",
version: BigInt(1), version: BigInt(1),
timestamp: expect.any(PosixDate), timestamp: expect.any(PosixDate),
payload: { name: "test" }, payload: { name: "test" },

View File

@ -1,4 +1,10 @@
import { AsyncResult, MaybePromise, PosixDate, Run } from "@fabric/core"; import {
AsyncResult,
MaybePromise,
PosixDate,
Run,
VariantTag,
} from "@fabric/core";
import { import {
Event, Event,
EventFromKey, EventFromKey,
@ -19,7 +25,7 @@ export class SQLiteEventStore<TEvents extends Event>
private streamVersions = new Map<UUID, bigint>(); private streamVersions = new Map<UUID, bigint>();
private eventSubscribers = new Map< private eventSubscribers = new Map<
TEvents["type"], TEvents[VariantTag],
EventSubscriber<TEvents>[] EventSubscriber<TEvents>[]
>(); >();
@ -34,7 +40,7 @@ export class SQLiteEventStore<TEvents extends Event>
await this.db.run( await this.db.run(
`CREATE TABLE IF NOT EXISTS events ( `CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
type TEXT NOT NULL, _tag TEXT NOT NULL,
streamId TEXT NOT NULL, streamId TEXT NOT NULL,
version INTEGER NOT NULL, version INTEGER NOT NULL,
timestamp NUMERIC NOT NULL, timestamp NUMERIC NOT NULL,
@ -43,7 +49,7 @@ export class SQLiteEventStore<TEvents extends Event>
)`, )`,
); );
}, },
(error) => new StoreQueryError(error.message, { error }), (error) => new StoreQueryError(error.message),
); );
} }
@ -57,18 +63,18 @@ export class SQLiteEventStore<TEvents extends Event>
{ {
$id: streamId, $id: streamId,
}, },
(event) => ({ (e) => ({
id: event.id, id: e.id,
streamId: event.streamId, streamId: e.streamId,
type: event.type, _tag: e._tag,
version: BigInt(event.version), version: BigInt(e.version),
timestamp: new PosixDate(event.timestamp), timestamp: new PosixDate(e.timestamp),
payload: JSONUtils.parse(event.payload), payload: JSONUtils.parse(e.payload),
}), }),
); );
return events; return events;
}, },
(error) => new StoreQueryError(error.message, { error }), (error) => new StoreQueryError(error.message),
); );
} }
@ -95,7 +101,7 @@ export class SQLiteEventStore<TEvents extends Event>
event: StoredEvent<TEvents>, event: StoredEvent<TEvents>,
): AsyncResult<void> { ): AsyncResult<void> {
return AsyncResult.from(async () => { return AsyncResult.from(async () => {
const subscribers = this.eventSubscribers.get(event.type) || []; const subscribers = this.eventSubscribers.get(event[VariantTag]) || [];
await Promise.all(subscribers.map((subscriber) => subscriber(event))); await Promise.all(subscribers.map((subscriber) => subscriber(event)));
}); });
} }
@ -114,20 +120,17 @@ export class SQLiteEventStore<TEvents extends Event>
return !lastVersion ? 0n : BigInt(lastVersion); return !lastVersion ? 0n : BigInt(lastVersion);
}, },
(error) => (error) => new StoreQueryError(error.message),
new StoreQueryError(`Error getting last version:${error.message}`, {
error,
}),
); );
} }
subscribe<TEventKey extends TEvents["type"]>( subscribe<TEventKey extends TEvents[VariantTag]>(
events: TEventKey[], eventNames: TEventKey[],
subscriber: ( subscriber: (
event: StoredEvent<EventFromKey<TEvents, TEventKey>>, event: StoredEvent<EventFromKey<TEvents, TEventKey>>,
) => MaybePromise<void>, ) => MaybePromise<void>,
): void { ): void {
events.forEach((event) => { eventNames.forEach((event) => {
const subscribers = this.eventSubscribers.get(event) || []; const subscribers = this.eventSubscribers.get(event) || [];
const newSubscribers = [ const newSubscribers = [
...subscribers, ...subscribers,
@ -140,7 +143,7 @@ export class SQLiteEventStore<TEvents extends Event>
close(): AsyncResult<void, StoreQueryError> { close(): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom( return AsyncResult.tryFrom(
() => this.db.close(), () => this.db.close(),
(error) => new StoreQueryError(error.message, { error }), (error) => new StoreQueryError(error.message),
); );
} }
@ -157,12 +160,12 @@ export class SQLiteEventStore<TEvents extends Event>
timestamp: new PosixDate(), timestamp: new PosixDate(),
}; };
await this.db.runPrepared( await this.db.runPrepared(
`INSERT INTO events (id, streamId, type, version, timestamp, payload) `INSERT INTO events (id, streamId, _tag, version, timestamp, payload)
VALUES ($id, $streamId, $type, $version, $timestamp, $payload)`, VALUES ($id, $streamId, $_tag, $version, $timestamp, $payload)`,
{ {
$id: storedEvent.id, $id: storedEvent.id,
$streamId: streamId, $streamId: streamId,
$type: storedEvent.type, $_tag: storedEvent[VariantTag],
$version: storedEvent.version.toString(), $version: storedEvent.version.toString(),
$timestamp: storedEvent.timestamp.timestamp, $timestamp: storedEvent.timestamp.timestamp,
$payload: JSON.stringify(storedEvent.payload), $payload: JSON.stringify(storedEvent.payload),
@ -170,7 +173,7 @@ export class SQLiteEventStore<TEvents extends Event>
); );
return storedEvent; return storedEvent;
}, },
(error) => new StoreQueryError("Error appending event", { error }), (error) => new StoreQueryError(error.message),
); );
} }
} }

View File

@ -14,6 +14,9 @@ export function transformRow(model: Collection) {
} }
function valueFromSQL(field: FieldDefinition, value: any): any { function valueFromSQL(field: FieldDefinition, value: any): any {
if (value === null) {
return null;
}
const r = FieldSQLInsertMap[field[VariantTag]]; const r = FieldSQLInsertMap[field[VariantTag]];
return r(field as any, value); return r(field as any, value);
} }

View File

@ -25,6 +25,9 @@ const FieldSQLInsertMap: FieldSQLInsertMap = {
}; };
export function fieldValueToSQL(field: FieldDefinition, value: any) { export function fieldValueToSQL(field: FieldDefinition, value: any) {
if (value === null) {
return null;
}
const r = FieldSQLInsertMap[field[VariantTag]] as any; const r = FieldSQLInsertMap[field[VariantTag]] as any;
return r(field as any, value); return r(field as any, value);
} }

View File

@ -62,11 +62,7 @@ export class QueryBuilder<T> implements StoreQuery<T> {
transformRow(this.schema[this.query.from]), transformRow(this.schema[this.query.from]),
); );
}, },
(err) => (err) => new StoreQueryError(err.message),
new StoreQueryError(err.message, {
err,
query: this.query,
}),
); );
} }
@ -91,11 +87,7 @@ export class QueryBuilder<T> implements StoreQuery<T> {
transformRow(this.schema[this.query.from]), transformRow(this.schema[this.query.from]),
); );
}, },
(err) => (err) => new StoreQueryError(err.message),
new StoreQueryError(err.message, {
err,
query: this.query,
}),
); );
} }
} }

View File

@ -1,4 +1,4 @@
import { Run } from "@fabric/core"; import { PosixDate, Run } from "@fabric/core";
import { defineModel, Field, isLike, UUID } from "@fabric/domain"; import { defineModel, Field, isLike, UUID } from "@fabric/domain";
import { UUIDGeneratorMock } from "@fabric/domain/mocks"; import { UUIDGeneratorMock } from "@fabric/domain/mocks";
import { import {
@ -42,6 +42,7 @@ describe("State Store", () => {
name: "test", name: "test",
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
}); });
@ -55,6 +56,7 @@ describe("State Store", () => {
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
@ -66,6 +68,7 @@ describe("State Store", () => {
streamId: UUID; streamId: UUID;
streamVersion: bigint; streamVersion: bigint;
name: string; name: string;
deletedAt: PosixDate | null;
}[] }[]
>(); >();
@ -75,6 +78,7 @@ describe("State Store", () => {
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
name: "test", name: "test",
deletedAt: null,
}, },
]); ]);
}); });
@ -89,6 +93,7 @@ describe("State Store", () => {
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
() => () =>
store.insertInto("users", { store.insertInto("users", {
@ -96,6 +101,7 @@ describe("State Store", () => {
id: UUIDGeneratorMock.generate(), id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(), streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
() => () =>
store.insertInto("users", { store.insertInto("users", {
@ -103,6 +109,7 @@ describe("State Store", () => {
id: UUIDGeneratorMock.generate(), id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(), streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
@ -121,6 +128,7 @@ describe("State Store", () => {
streamId: UUID; streamId: UUID;
streamVersion: bigint; streamVersion: bigint;
name: string; name: string;
deletedAt: PosixDate | null;
}[] }[]
>(); >();
@ -130,6 +138,7 @@ describe("State Store", () => {
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
name: "test", name: "test",
deletedAt: null,
}, },
]); ]);
}); });
@ -143,6 +152,7 @@ describe("State Store", () => {
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
@ -161,6 +171,7 @@ describe("State Store", () => {
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
name: "updated", name: "updated",
deletedAt: null,
}); });
}); });
@ -173,6 +184,7 @@ describe("State Store", () => {
id: newUUID, id: newUUID,
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
@ -197,6 +209,7 @@ describe("State Store", () => {
name: "test", name: "test",
streamId: ownerUUID, streamId: ownerUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
@ -207,6 +220,7 @@ describe("State Store", () => {
owner: ownerUUID, owner: ownerUUID,
streamId: newUUID, streamId: newUUID,
streamVersion: 1n, streamVersion: 1n,
deletedAt: null,
}), }),
); );
}); });

View File

@ -52,12 +52,7 @@ export class SQLiteStateStore<TModel extends Model>
recordToSQLParams(model, record), recordToSQLParams(model, record),
); );
}, },
(error) => (error) => new StoreQueryError(error.message),
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
); );
} }
@ -87,12 +82,7 @@ export class SQLiteStateStore<TModel extends Model>
params, params,
); );
}, },
(error) => (error) => new StoreQueryError(error.message),
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
); );
} }
@ -109,12 +99,7 @@ export class SQLiteStateStore<TModel extends Model>
{ $id: id }, { $id: id },
); );
}, },
(error) => (error) => new StoreQueryError(error.message),
new StoreQueryError(error.message, {
error,
collectionName: model.name,
id,
}),
); );
} }
@ -130,11 +115,7 @@ export class SQLiteStateStore<TModel extends Model>
} }
}); });
}, },
(error) => (error) => new StoreQueryError(error.message),
new StoreQueryError(error.message, {
error,
schema: this.schema,
}),
); );
} }