diff --git a/packages/fabric/sqlite-store/events/event-store.test.ts b/packages/fabric/sqlite-store/events/event-store.test.ts index ec797aa..22662ad 100644 --- a/packages/fabric/sqlite-store/events/event-store.test.ts +++ b/packages/fabric/sqlite-store/events/event-store.test.ts @@ -1,4 +1,4 @@ -import { PosixDate, Run } from "@fabric/core"; +import { PosixDate } from "@fabric/core"; import { Event } from "@fabric/domain"; import { UUIDGeneratorMock } from "@fabric/domain/mocks"; import { @@ -22,11 +22,11 @@ describe("Event Store", () => { beforeEach(async () => { store = new SQLiteEventStore(":memory:"); - await Run.UNSAFE(() => store.migrate()); + await store.migrate().orThrow(); }); afterEach(async () => { - await Run.UNSAFE(() => store.close()); + await store.close().orThrow(); }); test("Should append an event", async () => { @@ -39,9 +39,9 @@ describe("Event Store", () => { payload: { name: "test" }, }; - await Run.UNSAFE(() => store.append(userCreated)); + await store.append(userCreated).orThrow(); - const events = await Run.UNSAFE(() => store.getEventsFromStream(newUUID)); + const events = await store.getEventsFromStream(newUUID).unwrapOrThrow(); expect(events).toHaveLength(1); @@ -69,7 +69,7 @@ describe("Event Store", () => { store.subscribe(["UserCreated"], subscriber); - await Run.UNSAFE(() => store.append(userCreated)); + await store.append(userCreated).orThrow(); expect(subscriber).toHaveBeenCalledTimes(1); expect(subscriber).toHaveBeenCalledWith({ diff --git a/packages/fabric/sqlite-store/events/event-store.ts b/packages/fabric/sqlite-store/events/event-store.ts index 4516cbe..84d511d 100644 --- a/packages/fabric/sqlite-store/events/event-store.ts +++ b/packages/fabric/sqlite-store/events/event-store.ts @@ -89,10 +89,7 @@ export class SQLiteEventStore }), (version) => this.storeEvent(event.streamId, version + 1n, event), (storedEvent) => - AsyncResult.from(async () => { - await this.notifySubscribers(storedEvent); - return storedEvent; - }), + this.notifySubscribers(storedEvent).map(() => storedEvent), ); } diff --git a/packages/fabric/sqlite-store/state/query-builder.ts b/packages/fabric/sqlite-store/state/query-builder.ts index 3a555a6..8fd1ac8 100644 --- a/packages/fabric/sqlite-store/state/query-builder.ts +++ b/packages/fabric/sqlite-store/state/query-builder.ts @@ -5,13 +5,14 @@ import { FilterOptions, ModelSchema, OrderByOptions, - QueryDefinition, SelectableQuery, StoreLimitableQuery, StoreQuery, + StoreQueryDefinition, StoreQueryError, StoreSortableQuery, } from "@fabric/domain"; +import { NotFoundError } from "../../domain/models/store-query/store-query.ts"; import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.ts"; import { transformRow } from "../sqlite/sql-to-value.ts"; import { SQLiteDatabase } from "../sqlite/sqlite-database.ts"; @@ -20,7 +21,7 @@ export class QueryBuilder implements StoreQuery { constructor( private db: SQLiteDatabase, private schema: ModelSchema, - private query: QueryDefinition, + private query: StoreQueryDefinition, ) {} where(where: FilterOptions): StoreSortableQuery { @@ -93,11 +94,42 @@ export class QueryBuilder implements StoreQuery { (err) => new StoreQueryError(err.message), ); } + + selectOneOrFail(): AsyncResult; + selectOneOrFail>( + keys: K[], + ): AsyncResult, StoreQueryError | NotFoundError>; + selectOneOrFail>( + keys?: K[], + ): AsyncResult { + return AsyncResult.tryFrom( + async () => { + const [stmt, params] = getSelectStatement( + this.schema[this.query.from]!, + { + ...this.query, + keys: keys!, + limit: 1, + }, + ); + const result = await this.db.onePrepared( + stmt, + params, + transformRow(this.schema[this.query.from]!), + ); + if (!result) { + throw new NotFoundError(); + } + return result; + }, + (err) => new StoreQueryError(err.message), + ); + } } export function getSelectStatement( collection: Collection, - query: QueryDefinition, + query: StoreQueryDefinition, ): [string, Record] { const selectFields = query.keys ? query.keys.join(", ") : "*"; diff --git a/packages/fabric/sqlite-store/state/state-store.test.ts b/packages/fabric/sqlite-store/state/state-store.test.ts index 1521a00..3b8a1f7 100644 --- a/packages/fabric/sqlite-store/state/state-store.test.ts +++ b/packages/fabric/sqlite-store/state/state-store.test.ts @@ -26,41 +26,37 @@ describe("State Store", () => { beforeEach(async () => { store = new SQLiteStateStore(":memory:", models); - await Run.UNSAFE(() => store.migrate()); + await store.migrate().orThrow(); }); afterEach(async () => { - await Run.UNSAFE(() => store.close()); + await store.close().orThrow(); }); test("should insert a record", async () => { const newUUID = UUIDGeneratorMock.generate(); - await Run.UNSAFE(() => - store.insertInto("users", { - id: newUUID, - name: "test", - streamId: newUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("users", { + id: newUUID, + name: "test", + streamId: newUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); }); test("should select all records", async () => { const newUUID = UUIDGeneratorMock.generate(); - await Run.UNSAFE(() => - store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); - const result = await Run.UNSAFE(() => store.from("users").select()); + const result = await store.from("users").select().unwrapOrThrow(); expectTypeOf(result).toEqualTypeOf< { @@ -86,7 +82,7 @@ describe("State Store", () => { test("should select records with a filter", async () => { const newUUID = UUIDGeneratorMock.generate(); - await Run.seqUNSAFE( + await Run.seqOrThrow( () => store.insertInto("users", { name: "test", @@ -113,14 +109,12 @@ describe("State Store", () => { }), ); - const result = await Run.UNSAFE(() => - store - .from("users") - .where({ - name: isLike("te%"), - }) - .select() - ); + const result = await store + .from("users") + .where({ + name: isLike("te%"), + }) + .select().unwrapOrThrow(); expectTypeOf(result).toEqualTypeOf< { @@ -146,25 +140,20 @@ describe("State Store", () => { test("should update a record", async () => { const newUUID = UUIDGeneratorMock.generate(); - await Run.UNSAFE(() => - store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); - await Run.UNSAFE(() => - store.update("users", newUUID, { - name: "updated", - }) - ); + await store.update("users", newUUID, { + name: "updated", + }).orThrow(); - const result = await Run.UNSAFE(() => - store.from("users").where({ id: newUUID }).selectOne() - ); + const result = await store.from("users").where({ id: newUUID }).selectOne() + .unwrapOrThrow(); expect(result).toEqual({ id: newUUID, @@ -178,21 +167,18 @@ describe("State Store", () => { test("should delete a record", async () => { const newUUID = UUIDGeneratorMock.generate(); - await Run.UNSAFE(() => - store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); - await Run.UNSAFE(() => store.delete("users", newUUID)); + await store.delete("users", newUUID).orThrow(); - const result = await Run.UNSAFE(() => - store.from("users").where({ id: newUUID }).selectOne() - ); + const result = await store.from("users").where({ id: newUUID }).selectOne() + .unwrapOrThrow(); expect(result).toBeUndefined(); }); @@ -203,25 +189,21 @@ describe("State Store", () => { const newUUID = UUIDGeneratorMock.generate(); const ownerUUID = UUIDGeneratorMock.generate(); - await Run.UNSAFE(() => - store.insertInto("users", { - id: ownerUUID, - name: "test", - streamId: ownerUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("users", { + id: ownerUUID, + name: "test", + streamId: ownerUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); - await Run.UNSAFE(() => - store.insertInto("demo", { - id: newUUID, - value: 1.0, - owner: ownerUUID, - streamId: newUUID, - streamVersion: 1n, - deletedAt: null, - }) - ); + await store.insertInto("demo", { + id: newUUID, + value: 1.0, + owner: ownerUUID, + streamId: newUUID, + streamVersion: 1n, + deletedAt: null, + }).orThrow(); }); });