From d443e9e3955790ec36a8071468b3e1f37448b24d Mon Sep 17 00:00:00 2001 From: Pablo Baleztena Date: Sat, 12 Oct 2024 17:01:25 -0300 Subject: [PATCH] Remove storage-driver interface; Add state-store as driver-specific implementation --- packages/fabric/domain/src/index.ts | 1 - .../fabric/domain/src/models/model-schema.ts | 6 +- .../fabric/domain/src/models/query/index.ts | 1 - .../domain/src/models/query/query-builder.ts | 66 ----- .../fabric/domain/src/models/query/query.ts | 18 +- .../domain/src/models/state-store.spec.ts | 136 --------- .../fabric/domain/src/models/state-store.ts | 43 +-- packages/fabric/domain/src/storage/index.ts | 1 - .../domain/src/storage/storage-driver.ts | 65 ----- packages/fabric/store-sqlite/src/index.ts | 2 +- .../store-sqlite/src/sqlite-driver.spec.ts | 273 ------------------ .../fabric/store-sqlite/src/sqlite-driver.ts | 231 --------------- .../fabric/store-sqlite/src/state/index.ts | 1 + .../store-sqlite/src/state/query-builder.ts | 127 ++++++++ .../src/state/state-store.spec.ts | 213 ++++++++++++++ .../store-sqlite/src/state/state-store.ts | 144 +++++++++ 16 files changed, 508 insertions(+), 820 deletions(-) delete mode 100644 packages/fabric/domain/src/models/query/query-builder.ts delete mode 100644 packages/fabric/domain/src/models/state-store.spec.ts delete mode 100644 packages/fabric/domain/src/storage/index.ts delete mode 100644 packages/fabric/domain/src/storage/storage-driver.ts delete mode 100644 packages/fabric/store-sqlite/src/sqlite-driver.spec.ts delete mode 100644 packages/fabric/store-sqlite/src/sqlite-driver.ts create mode 100644 packages/fabric/store-sqlite/src/state/index.ts create mode 100644 packages/fabric/store-sqlite/src/state/query-builder.ts create mode 100644 packages/fabric/store-sqlite/src/state/state-store.spec.ts create mode 100644 packages/fabric/store-sqlite/src/state/state-store.ts diff --git a/packages/fabric/domain/src/index.ts b/packages/fabric/domain/src/index.ts index af3bf8e..00fedea 100644 --- a/packages/fabric/domain/src/index.ts +++ b/packages/fabric/domain/src/index.ts @@ -4,6 +4,5 @@ export * from "./files/index.js"; export * from "./models/index.js"; export * from "./security/index.js"; export * from "./services/index.js"; -export * from "./storage/index.js"; export * from "./types/index.js"; export * from "./use-case/index.js"; diff --git a/packages/fabric/domain/src/models/model-schema.ts b/packages/fabric/domain/src/models/model-schema.ts index ca5ce05..8150980 100644 --- a/packages/fabric/domain/src/models/model-schema.ts +++ b/packages/fabric/domain/src/models/model-schema.ts @@ -1,7 +1,7 @@ -import { Collection } from "./model.js"; +import { Model } from "./model.js"; -export type ModelSchema = Record; +export type ModelSchema = Record; -export type ModelSchemaFromModels = { +export type ModelSchemaFromModels = { [K in TModels["name"]]: Extract; }; diff --git a/packages/fabric/domain/src/models/query/index.ts b/packages/fabric/domain/src/models/query/index.ts index 1aa861d..ff7581f 100644 --- a/packages/fabric/domain/src/models/query/index.ts +++ b/packages/fabric/domain/src/models/query/index.ts @@ -1,4 +1,3 @@ export * from "./filter-options.js"; export * from "./order-by-options.js"; -export * from "./query-builder.js"; export * from "./query.js"; diff --git a/packages/fabric/domain/src/models/query/query-builder.ts b/packages/fabric/domain/src/models/query/query-builder.ts deleted file mode 100644 index d8aa630..0000000 --- a/packages/fabric/domain/src/models/query/query-builder.ts +++ /dev/null @@ -1,66 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { AsyncResult, Keyof } from "@fabric/core"; -import { StoreQueryError } from "../../errors/query-error.js"; -import { StorageDriver } from "../../storage/storage-driver.js"; -import { ModelSchema } from "../model-schema.js"; -import { FilterOptions } from "./filter-options.js"; -import { OrderByOptions } from "./order-by-options.js"; -import { - QueryDefinition, - SelectableQuery, - StoreLimitableQuery, - StoreQuery, - StoreSortableQuery, -} from "./query.js"; - -export class QueryBuilder implements StoreQuery { - constructor( - private driver: StorageDriver, - private schema: ModelSchema, - private query: QueryDefinition, - ) {} - - where(where: FilterOptions): StoreSortableQuery { - return new QueryBuilder(this.driver, this.schema, { - ...this.query, - where, - }); - } - - orderBy(opts: OrderByOptions): StoreLimitableQuery { - return new QueryBuilder(this.driver, this.schema, { - ...this.query, - orderBy: opts, - }); - } - - limit(limit: number, offset?: number | undefined): SelectableQuery { - return new QueryBuilder(this.driver, this.schema, { - ...this.query, - limit, - offset, - }); - } - - select(): AsyncResult; - select>( - keys: K[], - ): AsyncResult[], StoreQueryError>; - select>(keys?: K[]): AsyncResult { - return this.driver.select(this.schema, { - ...this.query, - keys, - }); - } - - selectOne(): AsyncResult; - selectOne>( - keys: K, - ): AsyncResult, StoreQueryError>; - selectOne>(keys?: K[]): AsyncResult { - return this.driver.selectOne(this.schema, { - ...this.query, - keys, - }); - } -} diff --git a/packages/fabric/domain/src/models/query/query.ts b/packages/fabric/domain/src/models/query/query.ts index febc5d8..8f25a4d 100644 --- a/packages/fabric/domain/src/models/query/query.ts +++ b/packages/fabric/domain/src/models/query/query.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { AsyncResult, Keyof } from "@fabric/core"; +import { AsyncResult, Keyof, Optional } from "@fabric/core"; import { StoreQueryError } from "../../errors/query-error.js"; import { FilterOptions } from "./filter-options.js"; import { OrderByOptions } from "./order-by-options.js"; @@ -14,10 +14,10 @@ export interface StoreQuery { keys: K[], ): AsyncResult[], StoreQueryError>; - selectOne(): AsyncResult; + selectOne(): AsyncResult, StoreQueryError>; selectOne>( keys: K[], - ): AsyncResult, StoreQueryError>; + ): AsyncResult>, StoreQueryError>; } export interface StoreSortableQuery { @@ -29,10 +29,10 @@ export interface StoreSortableQuery { keys: K[], ): AsyncResult[], StoreQueryError>; - selectOne(): AsyncResult; + selectOne(): AsyncResult, StoreQueryError>; selectOne>( keys: K[], - ): AsyncResult, StoreQueryError>; + ): AsyncResult>, StoreQueryError>; } export interface StoreLimitableQuery { @@ -43,10 +43,10 @@ export interface StoreLimitableQuery { keys: K[], ): AsyncResult[], StoreQueryError>; - selectOne(): AsyncResult; + selectOne(): AsyncResult, StoreQueryError>; selectOne>( keys: K[], - ): AsyncResult, StoreQueryError>; + ): AsyncResult>, StoreQueryError>; } export interface SelectableQuery { @@ -55,10 +55,10 @@ export interface SelectableQuery { keys: K[], ): AsyncResult[], StoreQueryError>; - selectOne(): AsyncResult; + selectOne(): AsyncResult, StoreQueryError>; selectOne>( keys: K[], - ): AsyncResult, StoreQueryError>; + ): AsyncResult>, StoreQueryError>; } export interface QueryDefinition { diff --git a/packages/fabric/domain/src/models/state-store.spec.ts b/packages/fabric/domain/src/models/state-store.spec.ts deleted file mode 100644 index f42c899..0000000 --- a/packages/fabric/domain/src/models/state-store.spec.ts +++ /dev/null @@ -1,136 +0,0 @@ -import { isError, Run } from "@fabric/core"; -import { SQLiteStorageDriver } from "@fabric/sqlite-store"; -import { - afterEach, - beforeEach, - describe, - expect, - expectTypeOf, - it, -} from "vitest"; -import { UUIDGeneratorMock } from "../services/uuid-generator.mock.js"; -import { UUID } from "../types/uuid.js"; -import { Field } from "./fields/index.js"; -import { defineModel } from "./model.js"; -import { isLike } from "./query/filter-options.js"; -import { StateStore } from "./state-store.js"; - -describe("State Store", () => { - const models = [ - defineModel("users", { - name: Field.string(), - }), - ]; - - let driver: SQLiteStorageDriver; - let store: StateStore<(typeof models)[number]>; - - beforeEach(async () => { - driver = new SQLiteStorageDriver(":memory:"); - store = new StateStore(driver, models); - const migrationResult = await store.migrate(); - if (isError(migrationResult)) throw migrationResult; - }); - - afterEach(async () => { - await driver.close(); - }); - - it("should insert a record", async () => { - const newUUID = UUIDGeneratorMock.generate(); - const insertResult = await store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - }); - if (isError(insertResult)) throw insertResult; - }); - - it("should query with a basic select", async () => { - const newUUID = UUIDGeneratorMock.generate(); - const insertResult = await store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - }); - - if (isError(insertResult)) throw insertResult; - - const result = (await store.from("users").select()).unwrapOrThrow(); - - expectTypeOf(result).toEqualTypeOf< - { - id: UUID; - streamId: UUID; - streamVersion: bigint; - name: string; - }[] - >(); - - expect(result).toEqual([ - { - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - name: "test", - }, - ]); - }); - - it("should query with a where clause", async () => { - const newUUID = UUIDGeneratorMock.generate(); - - await Run.seqUNSAFE( - () => - store.insertInto("users", { - name: "test", - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - }), - () => - store.insertInto("users", { - name: "anotherName", - id: UUIDGeneratorMock.generate(), - streamId: UUIDGeneratorMock.generate(), - streamVersion: 1n, - }), - () => - store.insertInto("users", { - name: "anotherName2", - id: UUIDGeneratorMock.generate(), - streamId: UUIDGeneratorMock.generate(), - streamVersion: 1n, - }), - ); - - const result = await Run.UNSAFE(() => - store - .from("users") - .where({ - name: isLike("te%"), - }) - .select(), - ); - - expectTypeOf(result).toEqualTypeOf< - { - id: UUID; - streamId: UUID; - streamVersion: bigint; - name: string; - }[] - >(); - - expect(result).toEqual([ - { - id: newUUID, - streamId: newUUID, - streamVersion: 1n, - name: "test", - }, - ]); - }); -}); diff --git a/packages/fabric/domain/src/models/state-store.ts b/packages/fabric/domain/src/models/state-store.ts index fd6473c..c94359a 100644 --- a/packages/fabric/domain/src/models/state-store.ts +++ b/packages/fabric/domain/src/models/state-store.ts @@ -1,42 +1,19 @@ import { AsyncResult } from "@fabric/core"; -import { CircularDependencyError } from "../errors/circular-dependency-error.js"; import { StoreQueryError } from "../errors/query-error.js"; -import { StorageDriver } from "../storage/storage-driver.js"; import { ModelSchemaFromModels } from "./model-schema.js"; import { Model, ModelToType } from "./model.js"; -import { QueryBuilder } from "./query/query-builder.js"; import { StoreQuery } from "./query/query.js"; -export class StateStore { - private schema: ModelSchemaFromModels; - constructor( - private driver: StorageDriver, - models: TModel[], - ) { - this.schema = models.reduce((acc, model: TModel) => { - return { - ...acc, - [model.name]: model, - }; - }, {} as ModelSchemaFromModels); - } - - migrate(): AsyncResult { - return this.driver.sync(this.schema); - } - - async insertInto>( - collection: T, - record: ModelToType[T]>, - ): AsyncResult { - return this.driver.insert(this.schema[collection], record); - } - +export interface ReadonlyStateStore { from>( collection: T, - ): StoreQuery[T]>> { - return new QueryBuilder(this.driver, this.schema, { - from: collection, - }) as StoreQuery[T]>>; - } + ): StoreQuery[T]>>; +} + +export interface WritableStateStore + extends ReadonlyStateStore { + insertInto>( + collection: T, + record: ModelToType[T]>, + ): AsyncResult; } diff --git a/packages/fabric/domain/src/storage/index.ts b/packages/fabric/domain/src/storage/index.ts deleted file mode 100644 index 1d74826..0000000 --- a/packages/fabric/domain/src/storage/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "./storage-driver.js"; diff --git a/packages/fabric/domain/src/storage/storage-driver.ts b/packages/fabric/domain/src/storage/storage-driver.ts deleted file mode 100644 index 0e414b2..0000000 --- a/packages/fabric/domain/src/storage/storage-driver.ts +++ /dev/null @@ -1,65 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ - -import { AsyncResult, UnexpectedError } from "@fabric/core"; -import { CircularDependencyError } from "../errors/circular-dependency-error.js"; -import { StoreQueryError } from "../errors/query-error.js"; -import { ModelSchema } from "../models/model-schema.js"; -import { Collection } from "../models/model.js"; -import { QueryDefinition } from "../models/query/query.js"; - -export interface StorageDriver { - /** - * Insert data into the store - */ - insert( - model: Collection, - record: Record, - ): AsyncResult; - - /** - * Run a select query against the store. - */ - select( - model: ModelSchema, - query: QueryDefinition, - ): AsyncResult; - - /** - * Run a select query against the store. - */ - selectOne( - model: ModelSchema, - query: QueryDefinition, - ): AsyncResult; - - /** - * Sincronice the store with the schema. - */ - sync( - schema: ModelSchema, - ): AsyncResult; - - /** - * Drop the store. This is a destructive operation. - */ - drop(): AsyncResult; - - /** - * Close the store. - */ - close(): AsyncResult; - - /** - * Update a record in the store. - */ - update( - model: Collection, - id: string, - record: Record, - ): AsyncResult; - - /** - * Delete a record from the store. - */ - delete(model: Collection, id: string): AsyncResult; -} diff --git a/packages/fabric/store-sqlite/src/index.ts b/packages/fabric/store-sqlite/src/index.ts index af380ab..5dae270 100644 --- a/packages/fabric/store-sqlite/src/index.ts +++ b/packages/fabric/store-sqlite/src/index.ts @@ -1 +1 @@ -export * from "./sqlite-driver.js"; +export * from "./state/index.js"; diff --git a/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts b/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts deleted file mode 100644 index 375165d..0000000 --- a/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts +++ /dev/null @@ -1,273 +0,0 @@ -import { Run } from "@fabric/core"; -import { defineModel, Field, isLike } from "@fabric/domain"; -import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { SQLiteStorageDriver } from "./sqlite-driver.js"; - -describe("SQLite Store Driver", () => { - const schema = { - demo: defineModel("demo", { - value: Field.float(), - owner: Field.reference({ targetModel: "users" }), - }), - users: defineModel("users", { - name: Field.string(), - }), - }; - - let driver: SQLiteStorageDriver; - - beforeEach(() => { - driver = new SQLiteStorageDriver(":memory:"); - }); - - afterEach(async () => { - await Run.UNSAFE(() => driver.close()); - }); - - it("should synchronize the store and insert a record", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - - const records = await Run.UNSAFE(() => - driver.select(schema, { from: "users" }), - ); - - expect(records).toEqual([ - { id: "1", name: "test", streamId: "1", streamVersion: 1n }, - ]); - }); - - it("should be update a record", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - - await Run.UNSAFE(() => - driver.update(schema.users, "1", { name: "updated" }), - ); - - const records = await Run.UNSAFE(() => - driver.select(schema, { from: "users" }), - ); - - expect(records).toEqual([ - { id: "1", name: "updated", streamId: "1", streamVersion: 1n }, - ]); - }); - - it("should be able to delete a record", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - - await Run.UNSAFE(() => driver.delete(schema.users, "1")); - - const records = await Run.UNSAFE(() => - driver.select(schema, { from: "users" }), - ); - - expect(records).toEqual([]); - }); - - it("should be able to select records", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "2", - name: "test", - streamId: "2", - streamVersion: 1n, - }), - ); - - const records = await Run.UNSAFE(() => - driver.select(schema, { from: "users" }), - ); - - expect(records).toEqual([ - { id: "1", name: "test", streamId: "1", streamVersion: 1n }, - { id: "2", name: "test", streamId: "2", streamVersion: 1n }, - ]); - }); - - it("should be able to select one record", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "2", - name: "test", - streamId: "2", - streamVersion: 1n, - }), - ); - - const record = await Run.UNSAFE(() => - driver.selectOne(schema, { from: "users" }), - ); - - expect(record).toEqual({ - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - }); - - it("should select a record with a where clause", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }), - ); - - const result = await Run.UNSAFE(() => - driver.select(schema, { - from: "users", - where: { name: isLike("te%") }, - }), - ); - - expect(result).toEqual([ - { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }, - ]); - }); - - it("should select a record with a where clause of a specific type", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }), - ); - - const result = await Run.UNSAFE(() => - driver.select(schema, { - from: "users", - where: { streamVersion: 1n }, - }), - ); - - expect(result).toEqual([ - { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }, - { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }, - ]); - }); - - it("should select with a limit and offset", async () => { - await Run.UNSAFE(() => driver.sync(schema)); - - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }), - ); - await Run.UNSAFE(() => - driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }), - ); - - const result = await Run.UNSAFE(() => - driver.select(schema, { - from: "users", - limit: 1, - offset: 1, - }), - ); - - expect(result).toEqual([ - { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }, - ]); - }); -}); diff --git a/packages/fabric/store-sqlite/src/sqlite-driver.ts b/packages/fabric/store-sqlite/src/sqlite-driver.ts deleted file mode 100644 index 2bb4e16..0000000 --- a/packages/fabric/store-sqlite/src/sqlite-driver.ts +++ /dev/null @@ -1,231 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { AsyncResult, UnexpectedError } from "@fabric/core"; -import { unlink } from "fs/promises"; - -import { - CircularDependencyError, - Collection, - Model, - ModelSchema, - QueryDefinition, - StorageDriver, - StoreQueryError, -} from "@fabric/domain"; -import { filterToParams, filterToSQL } from "./sqlite/filter-to-sql.js"; -import { modelToSql } from "./sqlite/model-to-sql.js"; -import { - keyToParam, - recordToSQLKeyParams, - recordToSQLKeys, - recordToSQLParams, - recordToSQLSet, -} from "./sqlite/record-utils.js"; -import { transformRow } from "./sqlite/sql-to-value.js"; -import { SQLiteDatabase } from "./sqlite/sqlite-wrapper.js"; - -export class SQLiteStorageDriver implements StorageDriver { - private db: SQLiteDatabase; - - constructor(private path: string) { - this.db = new SQLiteDatabase(path); - } - - private getSelectStatement( - collection: Collection, - query: QueryDefinition, - ): [string, Record] { - const selectFields = query.keys ? query.keys.join(", ") : "*"; - - const queryFilter = filterToSQL(query.where); - const limit = query.limit ? `LIMIT ${query.limit}` : ""; - const offset = query.offset ? `OFFSET ${query.offset}` : ""; - - const sql = [ - `SELECT ${selectFields}`, - `FROM ${query.from}`, - queryFilter, - limit, - offset, - ].join(" "); - - return [ - sql, - { - ...filterToParams(collection, query.where), - }, - ]; - } - - /** - * Insert data into the store - */ - async insert( - model: Model, - record: Record, - ): AsyncResult { - return AsyncResult.tryFrom( - async () => { - await this.db.runPrepared( - `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`, - recordToSQLParams(model, record), - ); - }, - (error) => - new StoreQueryError(error.message, { - error, - collectionName: model.name, - record, - }), - ); - } - - /** - * Run a select query against the store. - */ - async select( - schema: ModelSchema, - query: QueryDefinition, - ): AsyncResult { - return AsyncResult.tryFrom( - async () => { - const [sql, params] = this.getSelectStatement( - schema[query.from], - query, - ); - return this.db.allPrepared( - sql, - params, - transformRow(schema[query.from]), - ); - }, - (err) => - new StoreQueryError(err.message, { - err, - query, - }), - ); - } - - /** - * Run a select query against the store. - */ - async selectOne( - schema: ModelSchema, - query: QueryDefinition, - ): AsyncResult { - return AsyncResult.tryFrom( - async () => { - const [stmt, params] = this.getSelectStatement( - schema[query.from], - query, - ); - return await this.db.onePrepared( - stmt, - params, - transformRow(schema[query.from]), - ); - }, - (err) => - new StoreQueryError(err.message, { - err, - query, - }), - ); - } - - /** - * Sincronice the store with the schema. - */ - async sync( - schema: ModelSchema, - ): AsyncResult { - return AsyncResult.tryFrom( - async () => { - await this.db.withTransaction(async () => { - for (const modelKey in schema) { - const model = schema[modelKey]; - await this.db.runPrepared(modelToSql(model)); - } - }); - }, - (error) => - new StoreQueryError(error.message, { - error, - schema, - }), - ); - } - - /** - * Drop the store. This is a destructive operation. - */ - async drop(): AsyncResult { - return AsyncResult.tryFrom( - async () => { - if (this.path === ":memory:") { - throw "Cannot drop in-memory database"; - } else { - await unlink(this.path); - } - }, - (error) => - new StoreQueryError(error.message, { - error, - }), - ); - } - - async close(): AsyncResult { - return AsyncResult.from(async () => { - this.db.close(); - }); - } - - /** - * Update a record in the store. - */ - async update( - model: Model, - id: string, - record: Record, - ): AsyncResult { - return AsyncResult.tryFrom( - async () => { - const params = recordToSQLParams(model, { - ...record, - id, - }); - await this.db.runPrepared( - `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`, - params, - ); - }, - (error) => - new StoreQueryError(error.message, { - error, - collectionName: model.name, - record, - }), - ); - } - - /** - * Delete a record from the store. - */ - async delete(model: Model, id: string): AsyncResult { - return AsyncResult.tryFrom( - async () => { - await this.db.runPrepared( - `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`, - { $id: id }, - ); - }, - (error) => - new StoreQueryError(error.message, { - error, - collectionName: model.name, - id, - }), - ); - } -} diff --git a/packages/fabric/store-sqlite/src/state/index.ts b/packages/fabric/store-sqlite/src/state/index.ts new file mode 100644 index 0000000..381f7cc --- /dev/null +++ b/packages/fabric/store-sqlite/src/state/index.ts @@ -0,0 +1 @@ +export * from "./state-store.js"; diff --git a/packages/fabric/store-sqlite/src/state/query-builder.ts b/packages/fabric/store-sqlite/src/state/query-builder.ts new file mode 100644 index 0000000..60b3438 --- /dev/null +++ b/packages/fabric/store-sqlite/src/state/query-builder.ts @@ -0,0 +1,127 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { AsyncResult, Keyof, Optional } from "@fabric/core"; +import { + Collection, + FilterOptions, + ModelSchema, + OrderByOptions, + QueryDefinition, + SelectableQuery, + StoreLimitableQuery, + StoreQuery, + StoreQueryError, + StoreSortableQuery, +} from "@fabric/domain"; +import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.js"; +import { transformRow } from "../sqlite/sql-to-value.js"; +import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js"; + +export class QueryBuilder implements StoreQuery { + constructor( + private db: SQLiteDatabase, + private schema: ModelSchema, + private query: QueryDefinition, + ) {} + + where(where: FilterOptions): StoreSortableQuery { + return new QueryBuilder(this.db, this.schema, { + ...this.query, + where, + }); + } + + orderBy(opts: OrderByOptions): StoreLimitableQuery { + return new QueryBuilder(this.db, this.schema, { + ...this.query, + orderBy: opts, + }); + } + + limit(limit: number, offset?: number | undefined): SelectableQuery { + return new QueryBuilder(this.db, this.schema, { + ...this.query, + limit, + offset, + }); + } + + select(): AsyncResult; + select>( + keys: K[], + ): AsyncResult[], StoreQueryError>; + select>(keys?: K[]): AsyncResult { + return AsyncResult.tryFrom( + async () => { + const [sql, params] = getSelectStatement(this.schema[this.query.from], { + ...this.query, + keys, + }); + return this.db.allPrepared( + sql, + params, + transformRow(this.schema[this.query.from]), + ); + }, + (err) => + new StoreQueryError(err.message, { + err, + query: this.query, + }), + ); + } + + selectOne(): AsyncResult, StoreQueryError>; + selectOne>( + keys: K, + ): AsyncResult>, StoreQueryError>; + selectOne>(keys?: K[]): AsyncResult { + return AsyncResult.tryFrom( + async () => { + const [stmt, params] = getSelectStatement( + this.schema[this.query.from], + { + ...this.query, + keys, + limit: 1, + }, + ); + return await this.db.onePrepared( + stmt, + params, + transformRow(this.schema[this.query.from]), + ); + }, + (err) => + new StoreQueryError(err.message, { + err, + query: this.query, + }), + ); + } +} + +export function getSelectStatement( + collection: Collection, + query: QueryDefinition, +): [string, Record] { + const selectFields = query.keys ? query.keys.join(", ") : "*"; + + const queryFilter = filterToSQL(query.where); + const limit = query.limit ? `LIMIT ${query.limit}` : ""; + const offset = query.offset ? `OFFSET ${query.offset}` : ""; + + const sql = [ + `SELECT ${selectFields}`, + `FROM ${query.from}`, + queryFilter, + limit, + offset, + ].join(" "); + + return [ + sql, + { + ...filterToParams(collection, query.where), + }, + ]; +} diff --git a/packages/fabric/store-sqlite/src/state/state-store.spec.ts b/packages/fabric/store-sqlite/src/state/state-store.spec.ts new file mode 100644 index 0000000..1818958 --- /dev/null +++ b/packages/fabric/store-sqlite/src/state/state-store.spec.ts @@ -0,0 +1,213 @@ +import { Run } from "@fabric/core"; +import { defineModel, Field, isLike, UUID } from "@fabric/domain"; +import { UUIDGeneratorMock } from "@fabric/domain/mocks"; +import { + afterEach, + beforeEach, + describe, + expect, + expectTypeOf, + it, +} from "vitest"; +import { SQLiteStateStore } from "./state-store.js"; + +describe("State Store", () => { + const models = [ + defineModel("demo", { + value: Field.float(), + owner: Field.reference({ targetModel: "users" }), + }), + defineModel("users", { + name: Field.string(), + }), + ]; + + let store: SQLiteStateStore<(typeof models)[number]>; + + beforeEach(async () => { + store = new SQLiteStateStore(":memory:", models); + await Run.UNSAFE(() => store.migrate()); + }); + + afterEach(async () => { + await Run.UNSAFE(() => store.close()); + }); + + it("should insert a record", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + await Run.UNSAFE(() => + store.insertInto("users", { + id: newUUID, + name: "test", + streamId: newUUID, + streamVersion: 1n, + }), + ); + }); + + it("should select all records", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + await Run.UNSAFE(() => + store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + }), + ); + + const result = await Run.UNSAFE(() => store.from("users").select()); + + expectTypeOf(result).toEqualTypeOf< + { + id: UUID; + streamId: UUID; + streamVersion: bigint; + name: string; + }[] + >(); + + expect(result).toEqual([ + { + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + name: "test", + }, + ]); + }); + + it("should select records with a filter", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + await Run.seqUNSAFE( + () => + store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + }), + () => + store.insertInto("users", { + name: "anotherName", + id: UUIDGeneratorMock.generate(), + streamId: UUIDGeneratorMock.generate(), + streamVersion: 1n, + }), + () => + store.insertInto("users", { + name: "anotherName2", + id: UUIDGeneratorMock.generate(), + streamId: UUIDGeneratorMock.generate(), + streamVersion: 1n, + }), + ); + + const result = await Run.UNSAFE(() => + store + .from("users") + .where({ + name: isLike("te%"), + }) + .select(), + ); + + expectTypeOf(result).toEqualTypeOf< + { + id: UUID; + streamId: UUID; + streamVersion: bigint; + name: string; + }[] + >(); + + expect(result).toEqual([ + { + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + name: "test", + }, + ]); + }); + + it("should update a record", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + await Run.UNSAFE(() => + store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + }), + ); + + await Run.UNSAFE(() => + store.update("users", newUUID, { + name: "updated", + }), + ); + + const result = await Run.UNSAFE(() => + store.from("users").where({ id: newUUID }).selectOne(), + ); + + expect(result).toEqual({ + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + name: "updated", + }); + }); + + it("should delete a record", async () => { + const newUUID = UUIDGeneratorMock.generate(); + + await Run.UNSAFE(() => + store.insertInto("users", { + name: "test", + id: newUUID, + streamId: newUUID, + streamVersion: 1n, + }), + ); + + await Run.UNSAFE(() => store.delete("users", newUUID)); + + const result = await Run.UNSAFE(() => + store.from("users").where({ id: newUUID }).selectOne(), + ); + + expect(result).toBeUndefined(); + }); + + //test for inserting into a collection with a reference + + it("should insert a record with a reference", async () => { + const newUUID = UUIDGeneratorMock.generate(); + const ownerUUID = UUIDGeneratorMock.generate(); + + await Run.UNSAFE(() => + store.insertInto("users", { + id: ownerUUID, + name: "test", + streamId: ownerUUID, + streamVersion: 1n, + }), + ); + + await Run.UNSAFE(() => + store.insertInto("demo", { + id: newUUID, + value: 1.0, + owner: ownerUUID, + streamId: newUUID, + streamVersion: 1n, + }), + ); + }); +}); diff --git a/packages/fabric/store-sqlite/src/state/state-store.ts b/packages/fabric/store-sqlite/src/state/state-store.ts new file mode 100644 index 0000000..e178e7f --- /dev/null +++ b/packages/fabric/store-sqlite/src/state/state-store.ts @@ -0,0 +1,144 @@ +import { AsyncResult, UnexpectedError } from "@fabric/core"; +import { + Model, + ModelSchemaFromModels, + ModelToType, + StoreQuery, + StoreQueryError, + UUID, + WritableStateStore, +} from "@fabric/domain"; +import { modelToSql } from "../sqlite/model-to-sql.js"; +import { + keyToParam, + recordToSQLKeyParams, + recordToSQLKeys, + recordToSQLParams, + recordToSQLSet, +} from "../sqlite/record-utils.js"; +import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js"; +import { QueryBuilder } from "./query-builder.js"; + +export class SQLiteStateStore + implements WritableStateStore +{ + private schema: ModelSchemaFromModels; + private db: SQLiteDatabase; + + constructor( + private dbPath: string, + models: TModel[], + ) { + this.schema = models.reduce((acc, model: TModel) => { + return { + ...acc, + [model.name]: model, + }; + }, {} as ModelSchemaFromModels); + + this.db = new SQLiteDatabase(dbPath); + } + + async insertInto>( + collection: T, + record: ModelToType[T]>, + ): AsyncResult { + const model = this.schema[collection]; + + return AsyncResult.tryFrom( + async () => { + await this.db.runPrepared( + `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`, + recordToSQLParams(model, record), + ); + }, + (error) => + new StoreQueryError(error.message, { + error, + collectionName: model.name, + record, + }), + ); + } + + from>( + collection: T, + ): StoreQuery[T]>> { + return new QueryBuilder(this.db, this.schema, { + from: collection, + }) as StoreQuery[T]>>; + } + + update>( + collection: T, + id: UUID, + record: Partial[T]>>, + ): AsyncResult { + const model = this.schema[collection]; + + return AsyncResult.tryFrom( + async () => { + const params = recordToSQLParams(model, { + ...record, + id, + }); + await this.db.runPrepared( + `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`, + params, + ); + }, + (error) => + new StoreQueryError(error.message, { + error, + collectionName: model.name, + record, + }), + ); + } + + delete>( + collection: T, + id: UUID, + ): AsyncResult { + const model = this.schema[collection]; + + return AsyncResult.tryFrom( + async () => { + await this.db.runPrepared( + `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`, + { $id: id }, + ); + }, + (error) => + new StoreQueryError(error.message, { + error, + collectionName: model.name, + id, + }), + ); + } + + migrate(): AsyncResult { + return AsyncResult.tryFrom( + async () => { + await this.db.init(); + await this.db.withTransaction(async () => { + for (const modelKey in this.schema) { + const model = + this.schema[modelKey as keyof ModelSchemaFromModels]; + await this.db.runPrepared(modelToSql(model)); + } + }); + }, + (error) => + new StoreQueryError(error.message, { + error, + schema: this.schema, + }), + ); + } + + async close(): AsyncResult { + return AsyncResult.from(() => this.db.close()); + } +}