[fabric/sqlite-store] Refactor event store and query builder to use the new AsyncResult

This commit is contained in:
Pablo Baleztena 2024-10-16 16:22:02 -03:00
parent 307a82d83c
commit 67921efac7
4 changed files with 103 additions and 92 deletions

View File

@ -1,4 +1,4 @@
import { PosixDate, Run } from "@fabric/core"; import { PosixDate } from "@fabric/core";
import { Event } from "@fabric/domain"; import { Event } from "@fabric/domain";
import { UUIDGeneratorMock } from "@fabric/domain/mocks"; import { UUIDGeneratorMock } from "@fabric/domain/mocks";
import { import {
@ -22,11 +22,11 @@ describe("Event Store", () => {
beforeEach(async () => { beforeEach(async () => {
store = new SQLiteEventStore(":memory:"); store = new SQLiteEventStore(":memory:");
await Run.UNSAFE(() => store.migrate()); await store.migrate().orThrow();
}); });
afterEach(async () => { afterEach(async () => {
await Run.UNSAFE(() => store.close()); await store.close().orThrow();
}); });
test("Should append an event", async () => { test("Should append an event", async () => {
@ -39,9 +39,9 @@ describe("Event Store", () => {
payload: { name: "test" }, payload: { name: "test" },
}; };
await Run.UNSAFE(() => store.append(userCreated)); await store.append(userCreated).orThrow();
const events = await Run.UNSAFE(() => store.getEventsFromStream(newUUID)); const events = await store.getEventsFromStream(newUUID).unwrapOrThrow();
expect(events).toHaveLength(1); expect(events).toHaveLength(1);
@ -69,7 +69,7 @@ describe("Event Store", () => {
store.subscribe(["UserCreated"], subscriber); store.subscribe(["UserCreated"], subscriber);
await Run.UNSAFE(() => store.append(userCreated)); await store.append(userCreated).orThrow();
expect(subscriber).toHaveBeenCalledTimes(1); expect(subscriber).toHaveBeenCalledTimes(1);
expect(subscriber).toHaveBeenCalledWith({ expect(subscriber).toHaveBeenCalledWith({

View File

@ -89,10 +89,7 @@ export class SQLiteEventStore<TEvents extends Event>
}), }),
(version) => this.storeEvent(event.streamId, version + 1n, event), (version) => this.storeEvent(event.streamId, version + 1n, event),
(storedEvent) => (storedEvent) =>
AsyncResult.from(async () => { this.notifySubscribers(storedEvent).map(() => storedEvent),
await this.notifySubscribers(storedEvent);
return storedEvent;
}),
); );
} }

View File

@ -5,13 +5,14 @@ import {
FilterOptions, FilterOptions,
ModelSchema, ModelSchema,
OrderByOptions, OrderByOptions,
QueryDefinition,
SelectableQuery, SelectableQuery,
StoreLimitableQuery, StoreLimitableQuery,
StoreQuery, StoreQuery,
StoreQueryDefinition,
StoreQueryError, StoreQueryError,
StoreSortableQuery, StoreSortableQuery,
} from "@fabric/domain"; } from "@fabric/domain";
import { NotFoundError } from "../../domain/models/store-query/store-query.ts";
import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.ts"; import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.ts";
import { transformRow } from "../sqlite/sql-to-value.ts"; import { transformRow } from "../sqlite/sql-to-value.ts";
import { SQLiteDatabase } from "../sqlite/sqlite-database.ts"; import { SQLiteDatabase } from "../sqlite/sqlite-database.ts";
@ -20,7 +21,7 @@ export class QueryBuilder<T> implements StoreQuery<T> {
constructor( constructor(
private db: SQLiteDatabase, private db: SQLiteDatabase,
private schema: ModelSchema, private schema: ModelSchema,
private query: QueryDefinition, private query: StoreQueryDefinition,
) {} ) {}
where(where: FilterOptions<T>): StoreSortableQuery<T> { where(where: FilterOptions<T>): StoreSortableQuery<T> {
@ -93,11 +94,42 @@ export class QueryBuilder<T> implements StoreQuery<T> {
(err) => new StoreQueryError(err.message), (err) => new StoreQueryError(err.message),
); );
} }
selectOneOrFail(): AsyncResult<T, StoreQueryError | NotFoundError>;
selectOneOrFail<K extends Extract<keyof T, string>>(
keys: K[],
): AsyncResult<Pick<T, K>, StoreQueryError | NotFoundError>;
selectOneOrFail<K extends Extract<keyof T, string>>(
keys?: K[],
): AsyncResult<any, StoreQueryError | NotFoundError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = getSelectStatement(
this.schema[this.query.from]!,
{
...this.query,
keys: keys!,
limit: 1,
},
);
const result = await this.db.onePrepared(
stmt,
params,
transformRow(this.schema[this.query.from]!),
);
if (!result) {
throw new NotFoundError();
}
return result;
},
(err) => new StoreQueryError(err.message),
);
}
} }
export function getSelectStatement( export function getSelectStatement(
collection: Collection, collection: Collection,
query: QueryDefinition, query: StoreQueryDefinition,
): [string, Record<string, any>] { ): [string, Record<string, any>] {
const selectFields = query.keys ? query.keys.join(", ") : "*"; const selectFields = query.keys ? query.keys.join(", ") : "*";

View File

@ -26,41 +26,37 @@ describe("State Store", () => {
beforeEach(async () => { beforeEach(async () => {
store = new SQLiteStateStore(":memory:", models); store = new SQLiteStateStore(":memory:", models);
await Run.UNSAFE(() => store.migrate()); await store.migrate().orThrow();
}); });
afterEach(async () => { afterEach(async () => {
await Run.UNSAFE(() => store.close()); await store.close().orThrow();
}); });
test("should insert a record", async () => { test("should insert a record", async () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() => await store.insertInto("users", {
store.insertInto("users", { id: newUUID,
id: newUUID, name: "test",
name: "test", streamId: newUUID,
streamId: newUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
}); });
test("should select all records", async () => { test("should select all records", async () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() => await store.insertInto("users", {
store.insertInto("users", { name: "test",
name: "test", id: newUUID,
id: newUUID, streamId: newUUID,
streamId: newUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
const result = await Run.UNSAFE(() => store.from("users").select()); const result = await store.from("users").select().unwrapOrThrow();
expectTypeOf(result).toEqualTypeOf< expectTypeOf(result).toEqualTypeOf<
{ {
@ -86,7 +82,7 @@ describe("State Store", () => {
test("should select records with a filter", async () => { test("should select records with a filter", async () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
await Run.seqUNSAFE( await Run.seqOrThrow(
() => () =>
store.insertInto("users", { store.insertInto("users", {
name: "test", name: "test",
@ -113,14 +109,12 @@ describe("State Store", () => {
}), }),
); );
const result = await Run.UNSAFE(() => const result = await store
store .from("users")
.from("users") .where({
.where({ name: isLike("te%"),
name: isLike("te%"), })
}) .select().unwrapOrThrow();
.select()
);
expectTypeOf(result).toEqualTypeOf< expectTypeOf(result).toEqualTypeOf<
{ {
@ -146,25 +140,20 @@ describe("State Store", () => {
test("should update a record", async () => { test("should update a record", async () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() => await store.insertInto("users", {
store.insertInto("users", { name: "test",
name: "test", id: newUUID,
id: newUUID, streamId: newUUID,
streamId: newUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
await Run.UNSAFE(() => await store.update("users", newUUID, {
store.update("users", newUUID, { name: "updated",
name: "updated", }).orThrow();
})
);
const result = await Run.UNSAFE(() => const result = await store.from("users").where({ id: newUUID }).selectOne()
store.from("users").where({ id: newUUID }).selectOne() .unwrapOrThrow();
);
expect(result).toEqual({ expect(result).toEqual({
id: newUUID, id: newUUID,
@ -178,21 +167,18 @@ describe("State Store", () => {
test("should delete a record", async () => { test("should delete a record", async () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() => await store.insertInto("users", {
store.insertInto("users", { name: "test",
name: "test", id: newUUID,
id: newUUID, streamId: newUUID,
streamId: newUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
await Run.UNSAFE(() => store.delete("users", newUUID)); await store.delete("users", newUUID).orThrow();
const result = await Run.UNSAFE(() => const result = await store.from("users").where({ id: newUUID }).selectOne()
store.from("users").where({ id: newUUID }).selectOne() .unwrapOrThrow();
);
expect(result).toBeUndefined(); expect(result).toBeUndefined();
}); });
@ -203,25 +189,21 @@ describe("State Store", () => {
const newUUID = UUIDGeneratorMock.generate(); const newUUID = UUIDGeneratorMock.generate();
const ownerUUID = UUIDGeneratorMock.generate(); const ownerUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() => await store.insertInto("users", {
store.insertInto("users", { id: ownerUUID,
id: ownerUUID, name: "test",
name: "test", streamId: ownerUUID,
streamId: ownerUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
await Run.UNSAFE(() => await store.insertInto("demo", {
store.insertInto("demo", { id: newUUID,
id: newUUID, value: 1.0,
value: 1.0, owner: ownerUUID,
owner: ownerUUID, streamId: newUUID,
streamId: newUUID, streamVersion: 1n,
streamVersion: 1n, deletedAt: null,
deletedAt: null, }).orThrow();
})
);
}); });
}); });