Remove storage-driver interface; Add state-store as driver-specific implementation

This commit is contained in:
Pablo Baleztena 2024-10-12 17:01:25 -03:00
parent 010e3eecfc
commit d443e9e395
16 changed files with 508 additions and 820 deletions

View File

@ -4,6 +4,5 @@ export * from "./files/index.js";
export * from "./models/index.js";
export * from "./security/index.js";
export * from "./services/index.js";
export * from "./storage/index.js";
export * from "./types/index.js";
export * from "./use-case/index.js";

View File

@ -1,7 +1,7 @@
import { Collection } from "./model.js";
import { Model } from "./model.js";
export type ModelSchema = Record<string, Collection>;
export type ModelSchema = Record<string, Model>;
export type ModelSchemaFromModels<TModels extends Collection> = {
export type ModelSchemaFromModels<TModels extends Model> = {
[K in TModels["name"]]: Extract<TModels, { name: K }>;
};

View File

@ -1,4 +1,3 @@
export * from "./filter-options.js";
export * from "./order-by-options.js";
export * from "./query-builder.js";
export * from "./query.js";

View File

@ -1,66 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof } from "@fabric/core";
import { StoreQueryError } from "../../errors/query-error.js";
import { StorageDriver } from "../../storage/storage-driver.js";
import { ModelSchema } from "../model-schema.js";
import { FilterOptions } from "./filter-options.js";
import { OrderByOptions } from "./order-by-options.js";
import {
QueryDefinition,
SelectableQuery,
StoreLimitableQuery,
StoreQuery,
StoreSortableQuery,
} from "./query.js";
export class QueryBuilder<T> implements StoreQuery<T> {
constructor(
private driver: StorageDriver,
private schema: ModelSchema,
private query: QueryDefinition,
) {}
where(where: FilterOptions<T>): StoreSortableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
where,
});
}
orderBy(opts: OrderByOptions<T>): StoreLimitableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
orderBy: opts,
});
}
limit(limit: number, offset?: number | undefined): SelectableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
limit,
offset,
});
}
select(): AsyncResult<T[], StoreQueryError>;
select<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
select<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return this.driver.select(this.schema, {
...this.query,
keys,
});
}
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K,
): AsyncResult<Pick<T, K>, StoreQueryError>;
selectOne<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return this.driver.selectOne(this.schema, {
...this.query,
keys,
});
}
}

View File

@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof } from "@fabric/core";
import { AsyncResult, Keyof, Optional } from "@fabric/core";
import { StoreQueryError } from "../../errors/query-error.js";
import { FilterOptions } from "./filter-options.js";
import { OrderByOptions } from "./order-by-options.js";
@ -14,10 +14,10 @@ export interface StoreQuery<T> {
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>, StoreQueryError>;
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
}
export interface StoreSortableQuery<T> {
@ -29,10 +29,10 @@ export interface StoreSortableQuery<T> {
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>, StoreQueryError>;
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
}
export interface StoreLimitableQuery<T> {
@ -43,10 +43,10 @@ export interface StoreLimitableQuery<T> {
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>, StoreQueryError>;
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
}
export interface SelectableQuery<T> {
@ -55,10 +55,10 @@ export interface SelectableQuery<T> {
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>, StoreQueryError>;
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
}
export interface QueryDefinition<K extends string = string> {

View File

@ -1,136 +0,0 @@
import { isError, Run } from "@fabric/core";
import { SQLiteStorageDriver } from "@fabric/sqlite-store";
import {
afterEach,
beforeEach,
describe,
expect,
expectTypeOf,
it,
} from "vitest";
import { UUIDGeneratorMock } from "../services/uuid-generator.mock.js";
import { UUID } from "../types/uuid.js";
import { Field } from "./fields/index.js";
import { defineModel } from "./model.js";
import { isLike } from "./query/filter-options.js";
import { StateStore } from "./state-store.js";
describe("State Store", () => {
const models = [
defineModel("users", {
name: Field.string(),
}),
];
let driver: SQLiteStorageDriver;
let store: StateStore<(typeof models)[number]>;
beforeEach(async () => {
driver = new SQLiteStorageDriver(":memory:");
store = new StateStore(driver, models);
const migrationResult = await store.migrate();
if (isError(migrationResult)) throw migrationResult;
});
afterEach(async () => {
await driver.close();
});
it("should insert a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
const insertResult = await store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
});
if (isError(insertResult)) throw insertResult;
});
it("should query with a basic select", async () => {
const newUUID = UUIDGeneratorMock.generate();
const insertResult = await store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
});
if (isError(insertResult)) throw insertResult;
const result = (await store.from("users").select()).unwrapOrThrow();
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should query with a where clause", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.seqUNSAFE(
() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName2",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
store
.from("users")
.where({
name: isLike("te%"),
})
.select(),
);
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
});

View File

@ -1,42 +1,19 @@
import { AsyncResult } from "@fabric/core";
import { CircularDependencyError } from "../errors/circular-dependency-error.js";
import { StoreQueryError } from "../errors/query-error.js";
import { StorageDriver } from "../storage/storage-driver.js";
import { ModelSchemaFromModels } from "./model-schema.js";
import { Model, ModelToType } from "./model.js";
import { QueryBuilder } from "./query/query-builder.js";
import { StoreQuery } from "./query/query.js";
export class StateStore<TModel extends Model> {
private schema: ModelSchemaFromModels<TModel>;
constructor(
private driver: StorageDriver,
models: TModel[],
) {
this.schema = models.reduce((acc, model: TModel) => {
return {
...acc,
[model.name]: model,
};
}, {} as ModelSchemaFromModels<TModel>);
}
migrate(): AsyncResult<void, StoreQueryError | CircularDependencyError> {
return this.driver.sync(this.schema);
}
async insertInto<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
record: ModelToType<ModelSchemaFromModels<TModel>[T]>,
): AsyncResult<void, StoreQueryError> {
return this.driver.insert(this.schema[collection], record);
}
export interface ReadonlyStateStore<TModel extends Model> {
from<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>> {
return new QueryBuilder(this.driver, this.schema, {
from: collection,
}) as StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>;
}
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>;
}
export interface WritableStateStore<TModel extends Model>
extends ReadonlyStateStore<TModel> {
insertInto<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
record: ModelToType<ModelSchemaFromModels<TModel>[T]>,
): AsyncResult<void, StoreQueryError>;
}

View File

@ -1 +0,0 @@
export * from "./storage-driver.js";

View File

@ -1,65 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, UnexpectedError } from "@fabric/core";
import { CircularDependencyError } from "../errors/circular-dependency-error.js";
import { StoreQueryError } from "../errors/query-error.js";
import { ModelSchema } from "../models/model-schema.js";
import { Collection } from "../models/model.js";
import { QueryDefinition } from "../models/query/query.js";
export interface StorageDriver {
/**
* Insert data into the store
*/
insert(
model: Collection,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError>;
/**
* Run a select query against the store.
*/
select(
model: ModelSchema,
query: QueryDefinition,
): AsyncResult<any[], StoreQueryError>;
/**
* Run a select query against the store.
*/
selectOne(
model: ModelSchema,
query: QueryDefinition,
): AsyncResult<any, StoreQueryError>;
/**
* Sincronice the store with the schema.
*/
sync(
schema: ModelSchema,
): AsyncResult<void, StoreQueryError | CircularDependencyError>;
/**
* Drop the store. This is a destructive operation.
*/
drop(): AsyncResult<void, StoreQueryError>;
/**
* Close the store.
*/
close(): AsyncResult<void, UnexpectedError>;
/**
* Update a record in the store.
*/
update(
model: Collection,
id: string,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError>;
/**
* Delete a record from the store.
*/
delete(model: Collection, id: string): AsyncResult<void, StoreQueryError>;
}

View File

@ -1 +1 @@
export * from "./sqlite-driver.js";
export * from "./state/index.js";

View File

@ -1,273 +0,0 @@
import { Run } from "@fabric/core";
import { defineModel, Field, isLike } from "@fabric/domain";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { SQLiteStorageDriver } from "./sqlite-driver.js";
describe("SQLite Store Driver", () => {
const schema = {
demo: defineModel("demo", {
value: Field.float(),
owner: Field.reference({ targetModel: "users" }),
}),
users: defineModel("users", {
name: Field.string(),
}),
};
let driver: SQLiteStorageDriver;
beforeEach(() => {
driver = new SQLiteStorageDriver(":memory:");
});
afterEach(async () => {
await Run.UNSAFE(() => driver.close());
});
it("should synchronize the store and insert a record", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
]);
});
it("should be update a record", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.update(schema.users, "1", { name: "updated" }),
);
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "updated", streamId: "1", streamVersion: 1n },
]);
});
it("should be able to delete a record", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() => driver.delete(schema.users, "1"));
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([]);
});
it("should be able to select records", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
}),
);
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
{ id: "2", name: "test", streamId: "2", streamVersion: 1n },
]);
});
it("should be able to select one record", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
}),
);
const record = await Run.UNSAFE(() =>
driver.selectOne(schema, { from: "users" }),
);
expect(record).toEqual({
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
});
it("should select a record with a where clause", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
where: { name: isLike("te%") },
}),
);
expect(result).toEqual([
{
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
},
]);
});
it("should select a record with a where clause of a specific type", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
where: { streamVersion: 1n },
}),
);
expect(result).toEqual([
{
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
},
{
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
},
]);
});
it("should select with a limit and offset", async () => {
await Run.UNSAFE(() => driver.sync(schema));
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
limit: 1,
offset: 1,
}),
);
expect(result).toEqual([
{
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
},
]);
});
});

View File

@ -1,231 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, UnexpectedError } from "@fabric/core";
import { unlink } from "fs/promises";
import {
CircularDependencyError,
Collection,
Model,
ModelSchema,
QueryDefinition,
StorageDriver,
StoreQueryError,
} from "@fabric/domain";
import { filterToParams, filterToSQL } from "./sqlite/filter-to-sql.js";
import { modelToSql } from "./sqlite/model-to-sql.js";
import {
keyToParam,
recordToSQLKeyParams,
recordToSQLKeys,
recordToSQLParams,
recordToSQLSet,
} from "./sqlite/record-utils.js";
import { transformRow } from "./sqlite/sql-to-value.js";
import { SQLiteDatabase } from "./sqlite/sqlite-wrapper.js";
export class SQLiteStorageDriver implements StorageDriver {
private db: SQLiteDatabase;
constructor(private path: string) {
this.db = new SQLiteDatabase(path);
}
private getSelectStatement(
collection: Collection,
query: QueryDefinition,
): [string, Record<string, any>] {
const selectFields = query.keys ? query.keys.join(", ") : "*";
const queryFilter = filterToSQL(query.where);
const limit = query.limit ? `LIMIT ${query.limit}` : "";
const offset = query.offset ? `OFFSET ${query.offset}` : "";
const sql = [
`SELECT ${selectFields}`,
`FROM ${query.from}`,
queryFilter,
limit,
offset,
].join(" ");
return [
sql,
{
...filterToParams(collection, query.where),
},
];
}
/**
* Insert data into the store
*/
async insert(
model: Model,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`,
recordToSQLParams(model, record),
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
/**
* Run a select query against the store.
*/
async select(
schema: ModelSchema,
query: QueryDefinition,
): AsyncResult<any[], StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [sql, params] = this.getSelectStatement(
schema[query.from],
query,
);
return this.db.allPrepared(
sql,
params,
transformRow(schema[query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query,
}),
);
}
/**
* Run a select query against the store.
*/
async selectOne(
schema: ModelSchema,
query: QueryDefinition,
): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = this.getSelectStatement(
schema[query.from],
query,
);
return await this.db.onePrepared(
stmt,
params,
transformRow(schema[query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query,
}),
);
}
/**
* Sincronice the store with the schema.
*/
async sync(
schema: ModelSchema,
): AsyncResult<void, StoreQueryError | CircularDependencyError> {
return AsyncResult.tryFrom(
async () => {
await this.db.withTransaction(async () => {
for (const modelKey in schema) {
const model = schema[modelKey];
await this.db.runPrepared(modelToSql(model));
}
});
},
(error) =>
new StoreQueryError(error.message, {
error,
schema,
}),
);
}
/**
* Drop the store. This is a destructive operation.
*/
async drop(): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
if (this.path === ":memory:") {
throw "Cannot drop in-memory database";
} else {
await unlink(this.path);
}
},
(error) =>
new StoreQueryError(error.message, {
error,
}),
);
}
async close(): AsyncResult<void, UnexpectedError> {
return AsyncResult.from(async () => {
this.db.close();
});
}
/**
* Update a record in the store.
*/
async update(
model: Model,
id: string,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const params = recordToSQLParams(model, {
...record,
id,
});
await this.db.runPrepared(
`UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`,
params,
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
/**
* Delete a record from the store.
*/
async delete(model: Model, id: string): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`,
{ $id: id },
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
id,
}),
);
}
}

View File

@ -0,0 +1 @@
export * from "./state-store.js";

View File

@ -0,0 +1,127 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof, Optional } from "@fabric/core";
import {
Collection,
FilterOptions,
ModelSchema,
OrderByOptions,
QueryDefinition,
SelectableQuery,
StoreLimitableQuery,
StoreQuery,
StoreQueryError,
StoreSortableQuery,
} from "@fabric/domain";
import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.js";
import { transformRow } from "../sqlite/sql-to-value.js";
import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js";
export class QueryBuilder<T> implements StoreQuery<T> {
constructor(
private db: SQLiteDatabase,
private schema: ModelSchema,
private query: QueryDefinition,
) {}
where(where: FilterOptions<T>): StoreSortableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
where,
});
}
orderBy(opts: OrderByOptions<T>): StoreLimitableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
orderBy: opts,
});
}
limit(limit: number, offset?: number | undefined): SelectableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
limit,
offset,
});
}
select(): AsyncResult<T[], StoreQueryError>;
select<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
select<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [sql, params] = getSelectStatement(this.schema[this.query.from], {
...this.query,
keys,
});
return this.db.allPrepared(
sql,
params,
transformRow(this.schema[this.query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query: this.query,
}),
);
}
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K,
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
selectOne<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = getSelectStatement(
this.schema[this.query.from],
{
...this.query,
keys,
limit: 1,
},
);
return await this.db.onePrepared(
stmt,
params,
transformRow(this.schema[this.query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query: this.query,
}),
);
}
}
export function getSelectStatement(
collection: Collection,
query: QueryDefinition,
): [string, Record<string, any>] {
const selectFields = query.keys ? query.keys.join(", ") : "*";
const queryFilter = filterToSQL(query.where);
const limit = query.limit ? `LIMIT ${query.limit}` : "";
const offset = query.offset ? `OFFSET ${query.offset}` : "";
const sql = [
`SELECT ${selectFields}`,
`FROM ${query.from}`,
queryFilter,
limit,
offset,
].join(" ");
return [
sql,
{
...filterToParams(collection, query.where),
},
];
}

View File

@ -0,0 +1,213 @@
import { Run } from "@fabric/core";
import { defineModel, Field, isLike, UUID } from "@fabric/domain";
import { UUIDGeneratorMock } from "@fabric/domain/mocks";
import {
afterEach,
beforeEach,
describe,
expect,
expectTypeOf,
it,
} from "vitest";
import { SQLiteStateStore } from "./state-store.js";
describe("State Store", () => {
const models = [
defineModel("demo", {
value: Field.float(),
owner: Field.reference({ targetModel: "users" }),
}),
defineModel("users", {
name: Field.string(),
}),
];
let store: SQLiteStateStore<(typeof models)[number]>;
beforeEach(async () => {
store = new SQLiteStateStore(":memory:", models);
await Run.UNSAFE(() => store.migrate());
});
afterEach(async () => {
await Run.UNSAFE(() => store.close());
});
it("should insert a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
id: newUUID,
name: "test",
streamId: newUUID,
streamVersion: 1n,
}),
);
});
it("should select all records", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() => store.from("users").select());
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should select records with a filter", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.seqUNSAFE(
() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName2",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
store
.from("users")
.where({
name: isLike("te%"),
})
.select(),
);
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should update a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
store.update("users", newUUID, {
name: "updated",
}),
);
const result = await Run.UNSAFE(() =>
store.from("users").where({ id: newUUID }).selectOne(),
);
expect(result).toEqual({
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "updated",
});
});
it("should delete a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() => store.delete("users", newUUID));
const result = await Run.UNSAFE(() =>
store.from("users").where({ id: newUUID }).selectOne(),
);
expect(result).toBeUndefined();
});
//test for inserting into a collection with a reference
it("should insert a record with a reference", async () => {
const newUUID = UUIDGeneratorMock.generate();
const ownerUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
id: ownerUUID,
name: "test",
streamId: ownerUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
store.insertInto("demo", {
id: newUUID,
value: 1.0,
owner: ownerUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
});
});

View File

@ -0,0 +1,144 @@
import { AsyncResult, UnexpectedError } from "@fabric/core";
import {
Model,
ModelSchemaFromModels,
ModelToType,
StoreQuery,
StoreQueryError,
UUID,
WritableStateStore,
} from "@fabric/domain";
import { modelToSql } from "../sqlite/model-to-sql.js";
import {
keyToParam,
recordToSQLKeyParams,
recordToSQLKeys,
recordToSQLParams,
recordToSQLSet,
} from "../sqlite/record-utils.js";
import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js";
import { QueryBuilder } from "./query-builder.js";
export class SQLiteStateStore<TModel extends Model>
implements WritableStateStore<TModel>
{
private schema: ModelSchemaFromModels<TModel>;
private db: SQLiteDatabase;
constructor(
private dbPath: string,
models: TModel[],
) {
this.schema = models.reduce((acc, model: TModel) => {
return {
...acc,
[model.name]: model,
};
}, {} as ModelSchemaFromModels<TModel>);
this.db = new SQLiteDatabase(dbPath);
}
async insertInto<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
record: ModelToType<ModelSchemaFromModels<TModel>[T]>,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`,
recordToSQLParams(model, record),
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
from<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>> {
return new QueryBuilder(this.db, this.schema, {
from: collection,
}) as StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>;
}
update<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
id: UUID,
record: Partial<ModelToType<ModelSchemaFromModels<TModel>[T]>>,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
const params = recordToSQLParams(model, {
...record,
id,
});
await this.db.runPrepared(
`UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`,
params,
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
delete<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
id: UUID,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`,
{ $id: id },
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
id,
}),
);
}
migrate(): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
await this.db.init();
await this.db.withTransaction(async () => {
for (const modelKey in this.schema) {
const model =
this.schema[modelKey as keyof ModelSchemaFromModels<TModel>];
await this.db.runPrepared(modelToSql(model));
}
});
},
(error) =>
new StoreQueryError(error.message, {
error,
schema: this.schema,
}),
);
}
async close(): AsyncResult<void, UnexpectedError> {
return AsyncResult.from(() => this.db.close());
}
}