diff --git a/packages/fabric/sqlite-store/src/events/event-store.spec.ts b/packages/fabric/sqlite-store/src/events/event-store.spec.ts new file mode 100644 index 0000000..4ca0888 --- /dev/null +++ b/packages/fabric/sqlite-store/src/events/event-store.spec.ts @@ -0,0 +1,77 @@ +import { PosixDate, Run } from "@fabric/core"; +import { Event } from "@fabric/domain"; +import { UUIDGeneratorMock } from "@fabric/domain/mocks"; +import { afterEach, beforeEach, describe, expect, it, vitest } from "vitest"; +import { SQLiteEventStore } from "./event-store.js"; + +describe("Event Store", () => { + type UserCreated = Event<"UserCreated", { name: string }>; + type UserUpdated = Event<"UserUpdated", { name: string }>; + type UserDeleted = Event<"UserDeleted", void>; + + type UserEvents = UserCreated | UserUpdated | UserDeleted; + + let store: SQLiteEventStore; + + beforeEach(async () => { + store = new SQLiteEventStore(":memory:"); + await Run.UNSAFE(() => store.migrate()); + }); + + afterEach(async () => { + await Run.UNSAFE(() => store.close()); + }); + + it("Should append an event", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + const userCreated: UserCreated = { + type: "UserCreated", + id: newUUID, + streamId: newUUID, + payload: { name: "test" }, + }; + + await Run.UNSAFE(() => store.append(userCreated)); + + const events = await Run.UNSAFE(() => store.getEventsFromStream(newUUID)); + + expect(events).toHaveLength(1); + + expect(events[0]).toEqual({ + id: newUUID, + streamId: newUUID, + type: "UserCreated", + version: BigInt(1), + timestamp: expect.any(PosixDate), + payload: { name: "test" }, + }); + }); + + it("should notify subscribers on append", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + const userCreated: UserCreated = { + type: "UserCreated", + id: newUUID, + streamId: newUUID, + payload: { name: "test" }, + }; + + const subscriber = vitest.fn(); + + store.subscribe(["UserCreated"], subscriber); + + await Run.UNSAFE(() => store.append(userCreated)); + + expect(subscriber).toHaveBeenCalledTimes(1); + expect(subscriber).toHaveBeenCalledWith({ + id: newUUID, + streamId: newUUID, + type: "UserCreated", + version: BigInt(1), + timestamp: expect.any(PosixDate), + payload: { name: "test" }, + }); + }); +}); diff --git a/packages/fabric/sqlite-store/src/events/event-store.ts b/packages/fabric/sqlite-store/src/events/event-store.ts new file mode 100644 index 0000000..8700f30 --- /dev/null +++ b/packages/fabric/sqlite-store/src/events/event-store.ts @@ -0,0 +1,176 @@ +import { AsyncResult, MaybePromise, PosixDate, Run } from "@fabric/core"; +import { + Event, + EventFromKey, + EventStore, + EventSubscriber, + JSONUtils, + StoredEvent, + StoreQueryError, + UUID, +} from "@fabric/domain"; +import { SQLiteDatabase } from "../sqlite/sqlite-database.js"; + +export class SQLiteEventStore + implements EventStore +{ + private db: SQLiteDatabase; + + private streamVersions = new Map(); + + private eventSubscribers = new Map< + TEvents["type"], + EventSubscriber[] + >(); + + constructor(private readonly dbPath: string) { + this.db = new SQLiteDatabase(dbPath); + } + + async migrate(): AsyncResult { + return AsyncResult.tryFrom( + async () => { + await this.db.init(); + await this.db.run( + `CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + streamId TEXT NOT NULL, + version INTEGER NOT NULL, + timestamp NUMERIC NOT NULL, + payload TEXT NOT NULL, + UNIQUE(streamId, version) + )`, + ); + }, + (error) => new StoreQueryError(error.message, { error }), + ); + } + + async getEventsFromStream( + streamId: UUID, + ): AsyncResult[], StoreQueryError> { + return AsyncResult.tryFrom( + async () => { + const events = await this.db.allPrepared( + `SELECT * FROM events WHERE streamId = $id`, + { + $id: streamId, + }, + (event) => ({ + id: event.id, + streamId: event.streamId, + type: event.type, + version: BigInt(event.version), + timestamp: new PosixDate(event.timestamp), + payload: JSONUtils.parse(event.payload), + }), + ); + return events; + }, + (error) => new StoreQueryError(error.message, { error }), + ); + } + + async append( + event: T, + ): AsyncResult, StoreQueryError> { + return Run.seq( + () => this.getLastVersion(event.streamId), + (version) => + AsyncResult.from(() => { + this.streamVersions.set(event.streamId, version + 1n); + return version; + }), + (version) => this.storeEvent(event.streamId, version + 1n, event), + (storedEvent) => + AsyncResult.from(async () => { + await this.notifySubscribers(storedEvent); + return storedEvent; + }), + ); + } + + private async notifySubscribers( + event: StoredEvent, + ): AsyncResult { + return AsyncResult.from(async () => { + const subscribers = this.eventSubscribers.get(event.type) || []; + await Promise.all(subscribers.map((subscriber) => subscriber(event))); + }); + } + + private async getLastVersion( + streamId: UUID, + ): AsyncResult { + return AsyncResult.tryFrom( + async () => { + const { lastVersion } = await this.db.onePrepared( + `SELECT max(version) as lastVersion FROM events WHERE streamId = $id`, + { + $id: streamId, + }, + ); + + return !lastVersion ? 0n : BigInt(lastVersion); + }, + (error) => + new StoreQueryError(`Error getting last version:${error.message}`, { + error, + }), + ); + } + + subscribe( + events: TEventKey[], + subscriber: ( + event: StoredEvent>, + ) => MaybePromise, + ): void { + events.forEach((event) => { + const subscribers = this.eventSubscribers.get(event) || []; + const newSubscribers = [ + ...subscribers, + subscriber, + ] as EventSubscriber[]; + this.eventSubscribers.set(event, newSubscribers); + }); + } + + close(): AsyncResult { + return AsyncResult.tryFrom( + () => this.db.close(), + (error) => new StoreQueryError(error.message, { error }), + ); + } + + private storeEvent( + streamId: UUID, + version: bigint, + event: T, + ): AsyncResult, StoreQueryError> { + return AsyncResult.tryFrom( + async () => { + const storedEvent: StoredEvent = { + ...event, + version: version, + timestamp: new PosixDate(), + }; + await this.db.runPrepared( + `INSERT INTO events (id, streamId, type, version, timestamp, payload) + VALUES ($id, $streamId, $type, $version, $timestamp, $payload)`, + { + $id: storedEvent.id, + $streamId: streamId, + $type: storedEvent.type, + $version: storedEvent.version.toString(), + $timestamp: storedEvent.timestamp.timestamp, + $payload: JSON.stringify(storedEvent.payload), + }, + ); + return storedEvent; + }, + (error) => new StoreQueryError("Error appending event", { error }), + ); + } +} diff --git a/packages/fabric/sqlite-store/src/events/index.ts b/packages/fabric/sqlite-store/src/events/index.ts new file mode 100644 index 0000000..84a156c --- /dev/null +++ b/packages/fabric/sqlite-store/src/events/index.ts @@ -0,0 +1 @@ +export * from "./event-store.js"; diff --git a/packages/fabric/sqlite-store/src/sqlite/sqlite-wrapper.ts b/packages/fabric/sqlite-store/src/sqlite/sqlite-database.ts similarity index 90% rename from packages/fabric/sqlite-store/src/sqlite/sqlite-wrapper.ts rename to packages/fabric/sqlite-store/src/sqlite/sqlite-database.ts index 7424a0f..7daecb7 100644 --- a/packages/fabric/sqlite-store/src/sqlite/sqlite-wrapper.ts +++ b/packages/fabric/sqlite-store/src/sqlite/sqlite-database.ts @@ -79,7 +79,11 @@ export class SQLiteDatabase { if (err) { reject(err); } else { - resolve(transformer ? rows.map(transformer) : rows); + try { + resolve(transformer ? rows.map(transformer) : rows); + } catch (e) { + reject(e); + } } }, ); @@ -100,7 +104,11 @@ export class SQLiteDatabase { if (err) { reject(err); } else { - resolve(transformer ? rows.map(transformer)[0] : rows[0]); + try { + resolve(transformer ? rows.map(transformer)[0] : rows[0]); + } catch (e) { + reject(e); + } } }, ); diff --git a/packages/fabric/sqlite-store/src/state/query-builder.ts b/packages/fabric/sqlite-store/src/state/query-builder.ts index 60b3438..ef29134 100644 --- a/packages/fabric/sqlite-store/src/state/query-builder.ts +++ b/packages/fabric/sqlite-store/src/state/query-builder.ts @@ -14,7 +14,7 @@ import { } from "@fabric/domain"; import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.js"; import { transformRow } from "../sqlite/sql-to-value.js"; -import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js"; +import { SQLiteDatabase } from "../sqlite/sqlite-database.js"; export class QueryBuilder implements StoreQuery { constructor( diff --git a/packages/fabric/sqlite-store/src/state/state-store.ts b/packages/fabric/sqlite-store/src/state/state-store.ts index e178e7f..2c097c1 100644 --- a/packages/fabric/sqlite-store/src/state/state-store.ts +++ b/packages/fabric/sqlite-store/src/state/state-store.ts @@ -16,7 +16,7 @@ import { recordToSQLParams, recordToSQLSet, } from "../sqlite/record-utils.js"; -import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js"; +import { SQLiteDatabase } from "../sqlite/sqlite-database.js"; import { QueryBuilder } from "./query-builder.js"; export class SQLiteStateStore @@ -26,7 +26,7 @@ export class SQLiteStateStore private db: SQLiteDatabase; constructor( - private dbPath: string, + private readonly dbPath: string, models: TModel[], ) { this.schema = models.reduce((acc, model: TModel) => {