Compare commits

..

No commits in common. "d443e9e3955790ec36a8071468b3e1f37448b24d" and "4fff9f91f5fcadb935242b051b04394d5e0ca4d8" have entirely different histories.

35 changed files with 981 additions and 716 deletions

View File

@ -1,10 +1,14 @@
{ {
"name": "@fabric/core", "name": "@fabric/core",
"type": "module", "type": "module",
"main": "./dist/index.js", "module": "dist/index.js",
"types": "./dist/index.d.ts", "main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": { "exports": {
".": "./dist/index.js" ".": "./dist/index.js",
"./domain": "./dist/domain.js",
"./validation": "./dist/validation.js",
"./validation/fields": "./dist/validation/fields/index.js"
}, },
"files": [ "files": [
"dist" "dist"

View File

@ -1,19 +1,15 @@
{ {
"name": "@fabric/domain", "name": "@fabric/domain",
"type": "module", "type": "module",
"main": "./dist/index.js", "module": "dist/index.js",
"types": "./dist/index.d.ts", "main": "dist/index.js",
"exports": {
".": "./dist/index.js",
"./mocks": "./dist/mocks.js"
},
"files": [ "files": [
"dist" "dist"
], ],
"private": true, "private": true,
"packageManager": "yarn@4.1.1", "packageManager": "yarn@4.1.1",
"devDependencies": { "devDependencies": {
"@fabric/sqlite-store": "workspace:^", "@fabric/store-sqlite": "workspace:^",
"typescript": "^5.6.2", "typescript": "^5.6.2",
"vitest": "^2.1.1" "vitest": "^2.1.1"
}, },

View File

@ -1,26 +1,36 @@
import { AsyncResult, PosixDate } from "@fabric/core"; import { AsyncResult, MaybePromise, PosixDate } from "@fabric/core";
import { StoreQueryError } from "../errors/query-error.js"; import { StoreQueryError } from "../errors/query-error.js";
import { EventsFromStream, EventStream } from "./event-stream.js"; import { UUID } from "../types/uuid.js";
import { StoredEvent } from "./stored-event.js"; import { Event, StoredEvent } from "./event.js";
export interface EventStore<TEventStream extends EventStream> { export interface EventStore<TEvent extends Event = Event> {
/** getStream<TEventStreamEvent extends TEvent>(
* Store a new event in the event store. streamId: UUID,
*/ ): AsyncResult<EventStream<TEventStreamEvent>, StoreQueryError>;
append<
TStreamKey extends TEventStream["name"], appendToStream<TEvent extends Event>(
T extends EventsFromStream<TEventStream, TStreamKey>, streamId: UUID,
>( events: TEvent,
streamName: TStreamKey, ): AsyncResult<void, StoreQueryError>;
event: T, }
): AsyncResult<StoredEvent<T>, StoreQueryError>;
export interface EventStream<TEvent extends Event = Event> {
getCurrentVersion(): bigint;
append(events: TEvent): AsyncResult<StoredEvent<TEvent>, StoreQueryError>;
subscribe(callback: (event: StoredEvent<TEvent>) => MaybePromise<void>): void;
getEvents(
opts?: EventFilterOptions,
): AsyncResult<StoredEvent<TEvent>[], StoreQueryError>;
} }
export interface EventFilterOptions { export interface EventFilterOptions {
fromDate?: PosixDate; fromDate?: PosixDate;
toDate?: PosixDate; toDate?: PosixDate;
fromVersion?: bigint; fromVersion?: number;
toVersion?: bigint; toVersion?: number;
limit?: number; limit?: number;
offset?: number; offset?: number;
} }

View File

@ -1,16 +0,0 @@
import { AsyncResult } from "@fabric/core";
import { UUID } from "../types/uuid.js";
import { Event } from "./event.js";
import { StoredEvent } from "./stored-event.js";
export interface EventStream<
TName extends string = string,
TEvent extends Event = Event,
> {
id: UUID;
name: TName;
append<T extends TEvent>(event: Event): AsyncResult<StoredEvent<T>>;
}
export type EventsFromStream<T extends EventStream, TKey extends string> =
T extends EventStream<TKey, infer TEvent> ? TEvent : never;

View File

@ -1,12 +1,20 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import { PosixDate, TaggedVariant } from "@fabric/core";
import { UUID } from "../types/uuid.js"; import { UUID } from "../types/uuid.js";
/** /**
* An event is a tagged variant with a payload and a timestamp. * An event is a tagged variant with a payload and a timestamp.
*/ */
export interface Event<TTag extends string = string, TPayload = any> { export interface Event<TTag extends string = string, TPayload = any>
readonly type: TTag; extends TaggedVariant<TTag> {
readonly id: UUID; streamId: UUID;
readonly streamId: UUID; payload: TPayload;
readonly payload: TPayload;
} }
/**
* A stored event is an inmutable event, already stored, with it's version in the stream and timestamp.
*/
export type StoredEvent<TEvent extends Event> = Readonly<TEvent> & {
readonly version: number;
readonly timestamp: PosixDate;
};

View File

@ -1,4 +1,2 @@
export * from "./event-store.js"; export * from "./event-store.js";
export * from "./event-stream.js";
export * from "./event.js"; export * from "./event.js";
export * from "./stored-event.js";

View File

@ -1,10 +0,0 @@
import { PosixDate } from "@fabric/core";
import { Event } from "./event.js";
/**
* A stored event is an inmutable event, already stored, with it's version in the stream and timestamp.
*/
export type StoredEvent<TEvent extends Event> = TEvent & {
readonly version: bigint;
readonly timestamp: PosixDate;
};

View File

@ -4,5 +4,6 @@ export * from "./files/index.js";
export * from "./models/index.js"; export * from "./models/index.js";
export * from "./security/index.js"; export * from "./security/index.js";
export * from "./services/index.js"; export * from "./services/index.js";
export * from "./storage/index.js";
export * from "./types/index.js"; export * from "./types/index.js";
export * from "./use-case/index.js"; export * from "./use-case/index.js";

View File

@ -1,7 +1,7 @@
import { Model } from "./model.js"; import { Collection } from "./model.js";
export type ModelSchema = Record<string, Model>; export type ModelSchema = Record<string, Collection>;
export type ModelSchemaFromModels<TModels extends Model> = { export type ModelSchemaFromModels<TModels extends Collection> = {
[K in TModels["name"]]: Extract<TModels, { name: K }>; [K in TModels["name"]]: Extract<TModels, { name: K }>;
}; };

View File

@ -1,3 +1,4 @@
export * from "./filter-options.js"; export * from "./filter-options.js";
export * from "./order-by-options.js"; export * from "./order-by-options.js";
export * from "./query-builder.js";
export * from "./query.js"; export * from "./query.js";

View File

@ -0,0 +1,66 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof } from "@fabric/core";
import { StoreQueryError } from "../../errors/query-error.js";
import { StorageDriver } from "../../storage/storage-driver.js";
import { ModelSchema } from "../model-schema.js";
import { FilterOptions } from "./filter-options.js";
import { OrderByOptions } from "./order-by-options.js";
import {
QueryDefinition,
SelectableQuery,
StoreLimitableQuery,
StoreQuery,
StoreSortableQuery,
} from "./query.js";
export class QueryBuilder<T> implements StoreQuery<T> {
constructor(
private driver: StorageDriver,
private schema: ModelSchema,
private query: QueryDefinition,
) {}
where(where: FilterOptions<T>): StoreSortableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
where,
});
}
orderBy(opts: OrderByOptions<T>): StoreLimitableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
orderBy: opts,
});
}
limit(limit: number, offset?: number | undefined): SelectableQuery<T> {
return new QueryBuilder(this.driver, this.schema, {
...this.query,
limit,
offset,
});
}
select(): AsyncResult<T[], StoreQueryError>;
select<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
select<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return this.driver.select(this.schema, {
...this.query,
keys,
});
}
selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K,
): AsyncResult<Pick<T, K>, StoreQueryError>;
selectOne<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return this.driver.selectOne(this.schema, {
...this.query,
keys,
});
}
}

View File

@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof, Optional } from "@fabric/core"; import { AsyncResult, Keyof } from "@fabric/core";
import { StoreQueryError } from "../../errors/query-error.js"; import { StoreQueryError } from "../../errors/query-error.js";
import { FilterOptions } from "./filter-options.js"; import { FilterOptions } from "./filter-options.js";
import { OrderByOptions } from "./order-by-options.js"; import { OrderByOptions } from "./order-by-options.js";
@ -14,10 +14,10 @@ export interface StoreQuery<T> {
keys: K[], keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>; ): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>; selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>( selectOne<K extends Keyof<T>>(
keys: K[], keys: K[],
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>; ): AsyncResult<Pick<T, K>, StoreQueryError>;
} }
export interface StoreSortableQuery<T> { export interface StoreSortableQuery<T> {
@ -29,10 +29,10 @@ export interface StoreSortableQuery<T> {
keys: K[], keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>; ): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>; selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>( selectOne<K extends Keyof<T>>(
keys: K[], keys: K[],
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>; ): AsyncResult<Pick<T, K>, StoreQueryError>;
} }
export interface StoreLimitableQuery<T> { export interface StoreLimitableQuery<T> {
@ -43,10 +43,10 @@ export interface StoreLimitableQuery<T> {
keys: K[], keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>; ): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>; selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>( selectOne<K extends Keyof<T>>(
keys: K[], keys: K[],
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>; ): AsyncResult<Pick<T, K>, StoreQueryError>;
} }
export interface SelectableQuery<T> { export interface SelectableQuery<T> {
@ -55,10 +55,10 @@ export interface SelectableQuery<T> {
keys: K[], keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>; ): AsyncResult<Pick<T, K>[], StoreQueryError>;
selectOne(): AsyncResult<Optional<T>, StoreQueryError>; selectOne(): AsyncResult<T, StoreQueryError>;
selectOne<K extends Keyof<T>>( selectOne<K extends Keyof<T>>(
keys: K[], keys: K[],
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>; ): AsyncResult<Pick<T, K>, StoreQueryError>;
} }
export interface QueryDefinition<K extends string = string> { export interface QueryDefinition<K extends string = string> {

View File

@ -0,0 +1,136 @@
import { isError, Run } from "@fabric/core";
import { SQLiteStorageDriver } from "@fabric/store-sqlite";
import {
afterEach,
beforeEach,
describe,
expect,
expectTypeOf,
it,
} from "vitest";
import { UUIDGeneratorMock } from "../services/uuid-generator.mock.js";
import { UUID } from "../types/uuid.js";
import { Field } from "./fields/index.js";
import { defineModel } from "./model.js";
import { isLike } from "./query/filter-options.js";
import { StateStore } from "./state-store.js";
describe("State Store", () => {
const models = [
defineModel("users", {
name: Field.string(),
}),
];
let driver: SQLiteStorageDriver;
let store: StateStore<(typeof models)[number]>;
beforeEach(async () => {
driver = new SQLiteStorageDriver(":memory:");
store = new StateStore(driver, models);
const migrationResult = await store.migrate();
if (isError(migrationResult)) throw migrationResult;
});
afterEach(async () => {
await driver.close();
});
it("should insert a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
const insertResult = await store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
});
if (isError(insertResult)) throw insertResult;
});
it("should query with a basic select", async () => {
const newUUID = UUIDGeneratorMock.generate();
const insertResult = await store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
});
if (isError(insertResult)) throw insertResult;
const result = (await store.from("users").select()).unwrapOrThrow();
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should query with a where clause", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.seqUNSAFE(
() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName2",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
store
.from("users")
.where({
name: isLike("te%"),
})
.select(),
);
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
});

View File

@ -1,19 +1,42 @@
import { AsyncResult } from "@fabric/core"; import { AsyncResult } from "@fabric/core";
import { CircularDependencyError } from "../errors/circular-dependency-error.js";
import { StoreQueryError } from "../errors/query-error.js"; import { StoreQueryError } from "../errors/query-error.js";
import { StorageDriver } from "../storage/storage-driver.js";
import { ModelSchemaFromModels } from "./model-schema.js"; import { ModelSchemaFromModels } from "./model-schema.js";
import { Model, ModelToType } from "./model.js"; import { Model, ModelToType } from "./model.js";
import { QueryBuilder } from "./query/query-builder.js";
import { StoreQuery } from "./query/query.js"; import { StoreQuery } from "./query/query.js";
export interface ReadonlyStateStore<TModel extends Model> { export class StateStore<TModel extends Model> {
from<T extends keyof ModelSchemaFromModels<TModel>>( private schema: ModelSchemaFromModels<TModel>;
collection: T, constructor(
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>; private driver: StorageDriver,
} models: TModel[],
) {
this.schema = models.reduce((acc, model: TModel) => {
return {
...acc,
[model.name]: model,
};
}, {} as ModelSchemaFromModels<TModel>);
}
export interface WritableStateStore<TModel extends Model> migrate(): AsyncResult<void, StoreQueryError | CircularDependencyError> {
extends ReadonlyStateStore<TModel> { return this.driver.sync(this.schema);
insertInto<T extends keyof ModelSchemaFromModels<TModel>>( }
async insertInto<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T, collection: T,
record: ModelToType<ModelSchemaFromModels<TModel>[T]>, record: ModelToType<ModelSchemaFromModels<TModel>[T]>,
): AsyncResult<void, StoreQueryError>; ): AsyncResult<void, StoreQueryError> {
return this.driver.insert(this.schema[collection], record);
}
from<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>> {
return new QueryBuilder(this.driver, this.schema, {
from: collection,
}) as StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>;
}
} }

View File

@ -0,0 +1 @@
export * from "./storage-driver.js";

View File

@ -0,0 +1,65 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, UnexpectedError } from "@fabric/core";
import { CircularDependencyError } from "../errors/circular-dependency-error.js";
import { StoreQueryError } from "../errors/query-error.js";
import { ModelSchema } from "../models/model-schema.js";
import { Collection } from "../models/model.js";
import { QueryDefinition } from "../models/query/query.js";
export interface StorageDriver {
/**
* Insert data into the store
*/
insert(
model: Collection,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError>;
/**
* Run a select query against the store.
*/
select(
model: ModelSchema,
query: QueryDefinition,
): AsyncResult<any[], StoreQueryError>;
/**
* Run a select query against the store.
*/
selectOne(
model: ModelSchema,
query: QueryDefinition,
): AsyncResult<any, StoreQueryError>;
/**
* Sincronice the store with the schema.
*/
sync(
schema: ModelSchema,
): AsyncResult<void, StoreQueryError | CircularDependencyError>;
/**
* Drop the store. This is a destructive operation.
*/
drop(): AsyncResult<void, StoreQueryError>;
/**
* Close the store.
*/
close(): AsyncResult<void, UnexpectedError>;
/**
* Update a record in the store.
*/
update(
model: Collection,
id: string,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError>;
/**
* Delete a record from the store.
*/
delete(model: Collection, id: string): AsyncResult<void, StoreQueryError>;
}

View File

@ -1,11 +1,8 @@
{ {
"name": "@fabric/sqlite-store", "name": "@fabric/store-sqlite",
"type": "module", "type": "module",
"main": "./dist/index.js", "module": "dist/index.js",
"types": "./dist/index.d.ts", "main": "dist/index.js",
"exports": {
".": "./dist/index.js"
},
"files": [ "files": [
"dist" "dist"
], ],

View File

@ -1 +1 @@
export * from "./state/index.js"; export * from "./sqlite-driver.js";

View File

@ -1,6 +1,8 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import { Variant, VariantTag } from "@fabric/core"; import { Variant, VariantTag } from "@fabric/core";
import { Collection, FieldDefinition, getTargetKey } from "@fabric/domain"; import { Collection, FieldDefinition, getTargetKey } from "@fabric/domain";
import { EmbeddedField } from "@fabric/domain/dist/models/fields/embedded.js";
import { TimestampField } from "@fabric/domain/dist/models/fields/timestamp.js";
type FieldSQLDefinitionMap = { type FieldSQLDefinitionMap = {
[K in FieldDefinition[VariantTag]]: ( [K in FieldDefinition[VariantTag]]: (
@ -40,10 +42,10 @@ const FieldSQLDefinitionMap: FieldSQLDefinitionMap = {
DecimalField: (n, f): string => { DecimalField: (n, f): string => {
return [n, "REAL", modifiersFromOpts(f)].join(" "); return [n, "REAL", modifiersFromOpts(f)].join(" ");
}, },
TimestampField: (n, f): string => { TimestampField: (n, f: TimestampField): string => {
return [n, "NUMERIC", modifiersFromOpts(f)].join(" "); return [n, "NUMERIC", modifiersFromOpts(f)].join(" ");
}, },
EmbeddedField: (n, f): string => { EmbeddedField: (n, f: EmbeddedField): string => {
return [n, "TEXT", modifiersFromOpts(f)].join(" "); return [n, "TEXT", modifiersFromOpts(f)].join(" ");
}, },
}; };

View File

@ -0,0 +1,251 @@
import { isError } from "@fabric/core";
import { defineModel, Field, isLike } from "@fabric/domain";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { SQLiteStorageDriver } from "./sqlite-driver.js";
describe("SQLite Store Driver", () => {
const schema = {
demo: defineModel("demo", {
value: Field.float(),
owner: Field.reference({ targetModel: "users" }),
}),
users: defineModel("users", {
name: Field.string(),
}),
};
let driver: SQLiteStorageDriver;
beforeEach(() => {
driver = new SQLiteStorageDriver(":memory:");
});
afterEach(async () => {
const result = await driver.close();
if (isError(result)) throw result;
});
it("should synchronize the store and insert a record", async () => {
const result = await driver.sync(schema);
if (isError(result)) throw result;
const insertResult = await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
if (isError(insertResult)) throw insertResult;
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
]);
});
it("should be update a record", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
const err = await driver.update(schema.users, "1", { name: "updated" });
if (isError(err)) throw err;
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
expect(records).toEqual([
{ id: "1", name: "updated", streamId: "1", streamVersion: 1n },
]);
});
it("should be able to delete a record", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.delete(schema.users, "1");
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
expect(records).toEqual([]);
});
it("should be able to select records", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
});
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
{ id: "2", name: "test", streamId: "2", streamVersion: 1n },
]);
});
it("should be able to select one record", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
});
const record = (
await driver.selectOne(schema, { from: "users" })
).unwrapOrThrow();
expect(record).toEqual({
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
});
it("should select a record with a where clause", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
const result = (
await driver.select(schema, {
from: "users",
where: { name: isLike("te%") },
})
).unwrapOrThrow();
expect(result).toEqual([
{
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
},
]);
});
it("should select a record with a where clause of a specific type", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
const result = (
await driver.select(schema, {
from: "users",
where: { streamVersion: 1n },
})
).unwrapOrThrow();
expect(result).toEqual([
{
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
},
{
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
},
]);
});
it("should select with a limit and offset", async () => {
await driver.sync(schema);
await driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
const result = (
await driver.select(schema, {
from: "users",
limit: 1,
offset: 1,
})
).unwrapOrThrow();
expect(result).toEqual([
{
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
},
]);
});
});

View File

@ -0,0 +1,257 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, UnexpectedError } from "@fabric/core";
import { unlink } from "fs/promises";
import {
CircularDependencyError,
Collection,
Model,
ModelSchema,
QueryDefinition,
StorageDriver,
StoreQueryError,
} from "@fabric/domain";
import { Database, Statement } from "sqlite3";
import { filterToParams, filterToSQL } from "./filter-to-sql.js";
import { modelToSql } from "./model-to-sql.js";
import {
keyToParam,
recordToSQLKeyParams,
recordToSQLKeys,
recordToSQLParams,
recordToSQLSet,
} from "./record-utils.js";
import { transformRow } from "./sql-to-value.js";
import {
dbClose,
dbRun,
finalize,
getAll,
getOne,
prepare,
run,
} from "./sqlite-wrapper.js";
export class SQLiteStorageDriver implements StorageDriver {
private db: Database;
private cachedStatements = new Map<string, Statement>();
constructor(private path: string) {
this.db = new Database(path);
}
/**
* Get a statement from the cache or prepare a new one.
*/
private async getOrCreatePreparedStatement(sql: string): Promise<Statement> {
if (this.cachedStatements.has(sql)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- We know it's there.
return this.cachedStatements.get(sql)!;
}
const stmt = await prepare(this.db, sql);
this.cachedStatements.set(sql, stmt);
return stmt;
}
private async getSelectStatement(
collection: Collection,
query: QueryDefinition,
): Promise<[Statement, Record<string, any>]> {
const selectFields = query.keys ? query.keys.join(", ") : "*";
const queryFilter = filterToSQL(query.where);
const limit = query.limit ? `LIMIT ${query.limit}` : "";
const offset = query.offset ? `OFFSET ${query.offset}` : "";
const sql = [
`SELECT ${selectFields}`,
`FROM ${query.from}`,
queryFilter,
limit,
offset,
].join(" ");
return [
await this.getOrCreatePreparedStatement(sql),
{
...filterToParams(collection, query.where),
},
];
}
/**
* Insert data into the store
*/
async insert(
model: Model,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`;
const stmt = await this.getOrCreatePreparedStatement(sql);
return await run(stmt, recordToSQLParams(model, record));
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
/**
* Run a select query against the store.
*/
async select(
schema: ModelSchema,
query: QueryDefinition,
): AsyncResult<any[], StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = await this.getSelectStatement(
schema[query.from],
query,
);
return await getAll(stmt, params, transformRow(schema[query.from]));
},
(err) =>
new StoreQueryError(err.message, {
err,
query,
}),
);
}
/**
* Run a select query against the store.
*/
async selectOne(
schema: ModelSchema,
query: QueryDefinition,
): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = await this.getSelectStatement(
schema[query.from],
query,
);
return await getOne(stmt, params, transformRow(schema[query.from]));
},
(err) =>
new StoreQueryError(err.message, {
err,
query,
}),
);
}
/**
* Sincronice the store with the schema.
*/
async sync(
schema: ModelSchema,
): AsyncResult<void, StoreQueryError | CircularDependencyError> {
return AsyncResult.tryFrom(
async () => {
// Enable Write-Ahead Logging, which is faster and more reliable.
await dbRun(this.db, "PRAGMA journal_mode = WAL;");
// Enable foreign key constraints.
await dbRun(this.db, "PRAGMA foreign_keys = ON;");
// Begin a transaction to create the schema.
await dbRun(this.db, "BEGIN TRANSACTION;");
for (const modelKey in schema) {
const model = schema[modelKey];
await dbRun(this.db, modelToSql(model));
}
await dbRun(this.db, "COMMIT;");
},
(error) =>
new StoreQueryError(error.message, {
error,
schema,
}),
);
}
/**
* Drop the store. This is a destructive operation.
*/
async drop(): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
if (this.path === ":memory:") {
throw "Cannot drop in-memory database";
} else {
await unlink(this.path);
}
},
(error) =>
new StoreQueryError(error.message, {
error,
}),
);
}
async close(): AsyncResult<void, UnexpectedError> {
return AsyncResult.from(async () => {
for (const stmt of this.cachedStatements.values()) {
await finalize(stmt);
}
await dbClose(this.db);
});
}
/**
* Update a record in the store.
*/
async update(
model: Model,
id: string,
record: Record<string, any>,
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`;
const stmt = await this.getOrCreatePreparedStatement(sql);
const params = recordToSQLParams(model, {
...record,
id,
});
return await run(stmt, params);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
/**
* Delete a record from the store.
*/
async delete(model: Model, id: string): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`;
const stmt = await this.getOrCreatePreparedStatement(sql);
return await run(stmt, { [keyToParam("id")]: id });
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
id,
}),
);
}
}

View File

@ -0,0 +1,97 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Database, Statement } from "sqlite3";
export function dbRun(db: Database, statement: string): Promise<any> {
return new Promise((resolve, reject) => {
db.all(statement, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
}
export function dbClose(db: Database): Promise<void> {
return new Promise((resolve, reject) => {
db.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
export function prepare(db: Database, statement: string): Promise<Statement> {
return new Promise((resolve, reject) => {
const stmt = db.prepare(statement, (err) => {
if (err) {
reject(err);
} else {
resolve(stmt);
}
});
});
}
export function run(
stmt: Statement,
params: Record<string, any>,
): Promise<void> {
return new Promise((resolve, reject) => {
stmt.run(params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
export function getAll(
stmt: Statement,
params: Record<string, any>,
transformer: (row: any) => any,
): Promise<Record<string, any>[]> {
return new Promise((resolve, reject) => {
stmt.all(params, (err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(rows.map(transformer));
}
});
});
}
export function getOne(
stmt: Statement,
params: Record<string, any>,
transformer: (row: any) => any,
): Promise<Record<string, any>> {
return new Promise((resolve, reject) => {
stmt.all(params, (err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(rows.map(transformer)[0]);
}
});
});
}
export function finalize(stmt: Statement): Promise<void> {
return new Promise((resolve, reject) => {
stmt.finalize((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

View File

@ -1,134 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { MaybePromise } from "@fabric/core";
import SQLite from "sqlite3";
export class SQLiteDatabase {
db: SQLite.Database;
private cachedStatements = new Map<string, SQLite.Statement>();
constructor(private readonly path: string) {
this.db = new SQLite.Database(path);
}
async init() {
await this.run("PRAGMA journal_mode = WAL");
await this.run("PRAGMA foreign_keys = ON");
}
async close() {
await this.finalizeStatements();
await new Promise<void>((resolve, reject) => {
this.db.close((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
async withTransaction(fn: () => MaybePromise<void>) {
try {
await this.run("BEGIN TRANSACTION");
await fn();
await this.run("COMMIT");
} catch {
await this.run("ROLLBACK");
}
}
run(sql: string, params?: Record<string, any>) {
return new Promise<void>((resolve, reject) => {
this.db.run(sql, params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
runPrepared(sql: string, params?: Record<string, any>) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<void>((resolve, reject) => {
cachedStmt.run(params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
allPrepared(
sql: string,
params?: Record<string, any>,
transformer?: (row: any) => any,
) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<any>((resolve, reject) => {
cachedStmt.all(
params,
(err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(transformer ? rows.map(transformer) : rows);
}
},
);
});
}
onePrepared(
sql: string,
params?: Record<string, any>,
transformer?: (row: any) => any,
) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<any>((resolve, reject) => {
cachedStmt.all(
params,
(err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(transformer ? rows.map(transformer)[0] : rows[0]);
}
},
);
});
}
private getCachedStatement(sql: string) {
let cached = this.cachedStatements.get(sql);
if (!cached) {
const stmt = this.db.prepare(sql);
this.cachedStatements.set(sql, stmt);
cached = stmt;
}
return cached;
}
private async finalizeStatements() {
for (const stmt of this.cachedStatements.values()) {
await new Promise<void>((resolve, reject) => {
stmt.finalize((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
}
}

View File

@ -1 +0,0 @@
export * from "./state-store.js";

View File

@ -1,127 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { AsyncResult, Keyof, Optional } from "@fabric/core";
import {
Collection,
FilterOptions,
ModelSchema,
OrderByOptions,
QueryDefinition,
SelectableQuery,
StoreLimitableQuery,
StoreQuery,
StoreQueryError,
StoreSortableQuery,
} from "@fabric/domain";
import { filterToParams, filterToSQL } from "../sqlite/filter-to-sql.js";
import { transformRow } from "../sqlite/sql-to-value.js";
import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js";
export class QueryBuilder<T> implements StoreQuery<T> {
constructor(
private db: SQLiteDatabase,
private schema: ModelSchema,
private query: QueryDefinition,
) {}
where(where: FilterOptions<T>): StoreSortableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
where,
});
}
orderBy(opts: OrderByOptions<T>): StoreLimitableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
orderBy: opts,
});
}
limit(limit: number, offset?: number | undefined): SelectableQuery<T> {
return new QueryBuilder(this.db, this.schema, {
...this.query,
limit,
offset,
});
}
select(): AsyncResult<T[], StoreQueryError>;
select<K extends Keyof<T>>(
keys: K[],
): AsyncResult<Pick<T, K>[], StoreQueryError>;
select<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [sql, params] = getSelectStatement(this.schema[this.query.from], {
...this.query,
keys,
});
return this.db.allPrepared(
sql,
params,
transformRow(this.schema[this.query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query: this.query,
}),
);
}
selectOne(): AsyncResult<Optional<T>, StoreQueryError>;
selectOne<K extends Keyof<T>>(
keys: K,
): AsyncResult<Optional<Pick<T, K>>, StoreQueryError>;
selectOne<K extends Keyof<T>>(keys?: K[]): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = getSelectStatement(
this.schema[this.query.from],
{
...this.query,
keys,
limit: 1,
},
);
return await this.db.onePrepared(
stmt,
params,
transformRow(this.schema[this.query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
err,
query: this.query,
}),
);
}
}
export function getSelectStatement(
collection: Collection,
query: QueryDefinition,
): [string, Record<string, any>] {
const selectFields = query.keys ? query.keys.join(", ") : "*";
const queryFilter = filterToSQL(query.where);
const limit = query.limit ? `LIMIT ${query.limit}` : "";
const offset = query.offset ? `OFFSET ${query.offset}` : "";
const sql = [
`SELECT ${selectFields}`,
`FROM ${query.from}`,
queryFilter,
limit,
offset,
].join(" ");
return [
sql,
{
...filterToParams(collection, query.where),
},
];
}

View File

@ -1,213 +0,0 @@
import { Run } from "@fabric/core";
import { defineModel, Field, isLike, UUID } from "@fabric/domain";
import { UUIDGeneratorMock } from "@fabric/domain/mocks";
import {
afterEach,
beforeEach,
describe,
expect,
expectTypeOf,
it,
} from "vitest";
import { SQLiteStateStore } from "./state-store.js";
describe("State Store", () => {
const models = [
defineModel("demo", {
value: Field.float(),
owner: Field.reference({ targetModel: "users" }),
}),
defineModel("users", {
name: Field.string(),
}),
];
let store: SQLiteStateStore<(typeof models)[number]>;
beforeEach(async () => {
store = new SQLiteStateStore(":memory:", models);
await Run.UNSAFE(() => store.migrate());
});
afterEach(async () => {
await Run.UNSAFE(() => store.close());
});
it("should insert a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
id: newUUID,
name: "test",
streamId: newUUID,
streamVersion: 1n,
}),
);
});
it("should select all records", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() => store.from("users").select());
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should select records with a filter", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.seqUNSAFE(
() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
() =>
store.insertInto("users", {
name: "anotherName2",
id: UUIDGeneratorMock.generate(),
streamId: UUIDGeneratorMock.generate(),
streamVersion: 1n,
}),
);
const result = await Run.UNSAFE(() =>
store
.from("users")
.where({
name: isLike("te%"),
})
.select(),
);
expectTypeOf(result).toEqualTypeOf<
{
id: UUID;
streamId: UUID;
streamVersion: bigint;
name: string;
}[]
>();
expect(result).toEqual([
{
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "test",
},
]);
});
it("should update a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
store.update("users", newUUID, {
name: "updated",
}),
);
const result = await Run.UNSAFE(() =>
store.from("users").where({ id: newUUID }).selectOne(),
);
expect(result).toEqual({
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
name: "updated",
});
});
it("should delete a record", async () => {
const newUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
name: "test",
id: newUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() => store.delete("users", newUUID));
const result = await Run.UNSAFE(() =>
store.from("users").where({ id: newUUID }).selectOne(),
);
expect(result).toBeUndefined();
});
//test for inserting into a collection with a reference
it("should insert a record with a reference", async () => {
const newUUID = UUIDGeneratorMock.generate();
const ownerUUID = UUIDGeneratorMock.generate();
await Run.UNSAFE(() =>
store.insertInto("users", {
id: ownerUUID,
name: "test",
streamId: ownerUUID,
streamVersion: 1n,
}),
);
await Run.UNSAFE(() =>
store.insertInto("demo", {
id: newUUID,
value: 1.0,
owner: ownerUUID,
streamId: newUUID,
streamVersion: 1n,
}),
);
});
});

View File

@ -1,144 +0,0 @@
import { AsyncResult, UnexpectedError } from "@fabric/core";
import {
Model,
ModelSchemaFromModels,
ModelToType,
StoreQuery,
StoreQueryError,
UUID,
WritableStateStore,
} from "@fabric/domain";
import { modelToSql } from "../sqlite/model-to-sql.js";
import {
keyToParam,
recordToSQLKeyParams,
recordToSQLKeys,
recordToSQLParams,
recordToSQLSet,
} from "../sqlite/record-utils.js";
import { SQLiteDatabase } from "../sqlite/sqlite-wrapper.js";
import { QueryBuilder } from "./query-builder.js";
export class SQLiteStateStore<TModel extends Model>
implements WritableStateStore<TModel>
{
private schema: ModelSchemaFromModels<TModel>;
private db: SQLiteDatabase;
constructor(
private dbPath: string,
models: TModel[],
) {
this.schema = models.reduce((acc, model: TModel) => {
return {
...acc,
[model.name]: model,
};
}, {} as ModelSchemaFromModels<TModel>);
this.db = new SQLiteDatabase(dbPath);
}
async insertInto<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
record: ModelToType<ModelSchemaFromModels<TModel>[T]>,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`,
recordToSQLParams(model, record),
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
from<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
): StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>> {
return new QueryBuilder(this.db, this.schema, {
from: collection,
}) as StoreQuery<ModelToType<ModelSchemaFromModels<TModel>[T]>>;
}
update<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
id: UUID,
record: Partial<ModelToType<ModelSchemaFromModels<TModel>[T]>>,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
const params = recordToSQLParams(model, {
...record,
id,
});
await this.db.runPrepared(
`UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`,
params,
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
record,
}),
);
}
delete<T extends keyof ModelSchemaFromModels<TModel>>(
collection: T,
id: UUID,
): AsyncResult<void, StoreQueryError> {
const model = this.schema[collection];
return AsyncResult.tryFrom(
async () => {
await this.db.runPrepared(
`DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`,
{ $id: id },
);
},
(error) =>
new StoreQueryError(error.message, {
error,
collectionName: model.name,
id,
}),
);
}
migrate(): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
await this.db.init();
await this.db.withTransaction(async () => {
for (const modelKey in this.schema) {
const model =
this.schema[modelKey as keyof ModelSchemaFromModels<TModel>];
await this.db.runPrepared(modelToSql(model));
}
});
},
(error) =>
new StoreQueryError(error.message, {
error,
schema: this.schema,
}),
);
}
async close(): AsyncResult<void, UnexpectedError> {
return AsyncResult.from(() => this.db.close());
}
}

View File

@ -1,11 +1,8 @@
{ {
"name": "@ulthar/lib-template", "name": "@ulthar/lib-template",
"type": "module", "type": "module",
"main": "./dist/index.js", "module": "dist/index.js",
"types": "./dist/index.d.ts", "main": "dist/index.js",
"exports": {
".": "./dist/index.js"
},
"files": [ "files": [
"dist" "dist"
], ],

View File

@ -419,16 +419,16 @@ __metadata:
resolution: "@fabric/domain@workspace:packages/fabric/domain" resolution: "@fabric/domain@workspace:packages/fabric/domain"
dependencies: dependencies:
"@fabric/core": "workspace:^" "@fabric/core": "workspace:^"
"@fabric/sqlite-store": "workspace:^" "@fabric/store-sqlite": "workspace:^"
decimal.js: "npm:^10.4.3" decimal.js: "npm:^10.4.3"
typescript: "npm:^5.6.2" typescript: "npm:^5.6.2"
vitest: "npm:^2.1.1" vitest: "npm:^2.1.1"
languageName: unknown languageName: unknown
linkType: soft linkType: soft
"@fabric/sqlite-store@workspace:^, @fabric/sqlite-store@workspace:packages/fabric/store-sqlite": "@fabric/store-sqlite@workspace:^, @fabric/store-sqlite@workspace:packages/fabric/store-sqlite":
version: 0.0.0-use.local version: 0.0.0-use.local
resolution: "@fabric/sqlite-store@workspace:packages/fabric/store-sqlite" resolution: "@fabric/store-sqlite@workspace:packages/fabric/store-sqlite"
dependencies: dependencies:
"@fabric/core": "workspace:^" "@fabric/core": "workspace:^"
"@fabric/domain": "workspace:^" "@fabric/domain": "workspace:^"