Feature: Basic Events, Models and Projections #2

Merged
piarrot merged 37 commits from feat-base-projections into main 2024-10-15 15:20:25 -03:00
4 changed files with 316 additions and 283 deletions
Showing only changes of commit 475ec309cb - Show all commits

View File

@ -1,4 +1,4 @@
import { isError } from "@fabric/core";
import { Run } from "@fabric/core";
import { defineModel, Field, isLike } from "@fabric/domain";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { SQLiteStorageDriver } from "./sqlite-driver.js";
@ -21,27 +21,24 @@ describe("SQLite Store Driver", () => {
});
afterEach(async () => {
const result = await driver.close();
if (isError(result)) throw result;
await Run.UNSAFE(() => driver.close());
});
it("should synchronize the store and insert a record", async () => {
const result = await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
if (isError(result)) throw result;
const insertResult = await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
}),
);
if (isError(insertResult)) throw insertResult;
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
@ -49,21 +46,24 @@ describe("SQLite Store Driver", () => {
});
it("should be update a record", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
}),
);
const err = await driver.update(schema.users, "1", { name: "updated" });
if (isError(err)) throw err;
await Run.UNSAFE(() =>
driver.update(schema.users, "1", { name: "updated" }),
);
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "updated", streamId: "1", streamVersion: 1n },
@ -71,43 +71,49 @@ describe("SQLite Store Driver", () => {
});
it("should be able to delete a record", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
}),
);
await driver.delete(schema.users, "1");
await Run.UNSAFE(() => driver.delete(schema.users, "1"));
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([]);
});
it("should be able to select records", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
});
}),
);
const records = (
await driver.select(schema, { from: "users" })
).unwrapOrThrow();
const records = await Run.UNSAFE(() =>
driver.select(schema, { from: "users" }),
);
expect(records).toEqual([
{ id: "1", name: "test", streamId: "1", streamVersion: 1n },
@ -116,24 +122,28 @@ describe("SQLite Store Driver", () => {
});
it("should be able to select one record", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "test",
streamId: "2",
streamVersion: 1n,
});
}),
);
const record = (
await driver.selectOne(schema, { from: "users" })
).unwrapOrThrow();
const record = await Run.UNSAFE(() =>
driver.selectOne(schema, { from: "users" }),
);
expect(record).toEqual({
id: "1",
@ -144,27 +154,31 @@ describe("SQLite Store Driver", () => {
});
it("should select a record with a where clause", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
}),
);
const result = (
await driver.select(schema, {
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
where: { name: isLike("te%") },
})
).unwrapOrThrow();
}),
);
expect(result).toEqual([
{
@ -177,27 +191,31 @@ describe("SQLite Store Driver", () => {
});
it("should select a record with a where clause of a specific type", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
}),
);
const result = (
await driver.select(schema, {
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
where: { streamVersion: 1n },
})
).unwrapOrThrow();
}),
);
expect(result).toEqual([
{
@ -216,28 +234,32 @@ describe("SQLite Store Driver", () => {
});
it("should select with a limit and offset", async () => {
await driver.sync(schema);
await Run.UNSAFE(() => driver.sync(schema));
await driver.insert(schema.users, {
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "1",
name: "test",
streamId: "1",
streamVersion: 1n,
});
await driver.insert(schema.users, {
}),
);
await Run.UNSAFE(() =>
driver.insert(schema.users, {
id: "2",
name: "jamón",
streamId: "2",
streamVersion: 1n,
});
}),
);
const result = (
await driver.select(schema, {
const result = await Run.UNSAFE(() =>
driver.select(schema, {
from: "users",
limit: 1,
offset: 1,
})
).unwrapOrThrow();
}),
);
expect(result).toEqual([
{

View File

@ -11,7 +11,6 @@ import {
StorageDriver,
StoreQueryError,
} from "@fabric/domain";
import { Database, Statement } from "sqlite3";
import { filterToParams, filterToSQL } from "./filter-to-sql.js";
import { modelToSql } from "./model-to-sql.js";
import {
@ -22,44 +21,19 @@ import {
recordToSQLSet,
} from "./record-utils.js";
import { transformRow } from "./sql-to-value.js";
import {
dbClose,
dbRun,
finalize,
getAll,
getOne,
prepare,
run,
} from "./sqlite-wrapper.js";
import { SQLiteDatabase } from "./sqlite/sqlite-wrapper.js";
export class SQLiteStorageDriver implements StorageDriver {
private db: Database;
private cachedStatements = new Map<string, Statement>();
private db: SQLiteDatabase;
constructor(private path: string) {
this.db = new Database(path);
this.db = new SQLiteDatabase(path);
}
/**
* Get a statement from the cache or prepare a new one.
*/
private async getOrCreatePreparedStatement(sql: string): Promise<Statement> {
if (this.cachedStatements.has(sql)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- We know it's there.
return this.cachedStatements.get(sql)!;
}
const stmt = await prepare(this.db, sql);
this.cachedStatements.set(sql, stmt);
return stmt;
}
private async getSelectStatement(
private getSelectStatement(
collection: Collection,
query: QueryDefinition,
): Promise<[Statement, Record<string, any>]> {
): [string, Record<string, any>] {
const selectFields = query.keys ? query.keys.join(", ") : "*";
const queryFilter = filterToSQL(query.where);
@ -75,7 +49,7 @@ export class SQLiteStorageDriver implements StorageDriver {
].join(" ");
return [
await this.getOrCreatePreparedStatement(sql),
sql,
{
...filterToParams(collection, query.where),
},
@ -91,9 +65,10 @@ export class SQLiteStorageDriver implements StorageDriver {
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`;
const stmt = await this.getOrCreatePreparedStatement(sql);
return await run(stmt, recordToSQLParams(model, record));
await this.db.runPrepared(
`INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`,
recordToSQLParams(model, record),
);
},
(error) =>
new StoreQueryError(error.message, {
@ -113,11 +88,15 @@ export class SQLiteStorageDriver implements StorageDriver {
): AsyncResult<any[], StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = await this.getSelectStatement(
const [sql, params] = this.getSelectStatement(
schema[query.from],
query,
);
return await getAll(stmt, params, transformRow(schema[query.from]));
return this.db.allPrepared(
sql,
params,
transformRow(schema[query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
@ -136,11 +115,15 @@ export class SQLiteStorageDriver implements StorageDriver {
): AsyncResult<any, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const [stmt, params] = await this.getSelectStatement(
const [stmt, params] = this.getSelectStatement(
schema[query.from],
query,
);
return await getOne(stmt, params, transformRow(schema[query.from]));
return await this.db.onePrepared(
stmt,
params,
transformRow(schema[query.from]),
);
},
(err) =>
new StoreQueryError(err.message, {
@ -158,19 +141,12 @@ export class SQLiteStorageDriver implements StorageDriver {
): AsyncResult<void, StoreQueryError | CircularDependencyError> {
return AsyncResult.tryFrom(
async () => {
// Enable Write-Ahead Logging, which is faster and more reliable.
await dbRun(this.db, "PRAGMA journal_mode = WAL;");
// Enable foreign key constraints.
await dbRun(this.db, "PRAGMA foreign_keys = ON;");
// Begin a transaction to create the schema.
await dbRun(this.db, "BEGIN TRANSACTION;");
await this.db.withTransaction(async () => {
for (const modelKey in schema) {
const model = schema[modelKey];
await dbRun(this.db, modelToSql(model));
await this.db.runPrepared(modelToSql(model));
}
await dbRun(this.db, "COMMIT;");
});
},
(error) =>
new StoreQueryError(error.message, {
@ -201,10 +177,7 @@ export class SQLiteStorageDriver implements StorageDriver {
async close(): AsyncResult<void, UnexpectedError> {
return AsyncResult.from(async () => {
for (const stmt of this.cachedStatements.values()) {
await finalize(stmt);
}
await dbClose(this.db);
this.db.close();
});
}
@ -218,13 +191,14 @@ export class SQLiteStorageDriver implements StorageDriver {
): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`;
const stmt = await this.getOrCreatePreparedStatement(sql);
const params = recordToSQLParams(model, {
...record,
id,
});
return await run(stmt, params);
await this.db.runPrepared(
`UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`,
params,
);
},
(error) =>
new StoreQueryError(error.message, {
@ -238,13 +212,13 @@ export class SQLiteStorageDriver implements StorageDriver {
/**
* Delete a record from the store.
*/
async delete(model: Model, id: string): AsyncResult<void, StoreQueryError> {
return AsyncResult.tryFrom(
async () => {
const sql = `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`;
const stmt = await this.getOrCreatePreparedStatement(sql);
return await run(stmt, { [keyToParam("id")]: id });
await this.db.runPrepared(
`DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`,
{ $id: id },
);
},
(error) =>
new StoreQueryError(error.message, {

View File

@ -1,97 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Database, Statement } from "sqlite3";
export function dbRun(db: Database, statement: string): Promise<any> {
return new Promise((resolve, reject) => {
db.all(statement, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
}
export function dbClose(db: Database): Promise<void> {
return new Promise((resolve, reject) => {
db.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
export function prepare(db: Database, statement: string): Promise<Statement> {
return new Promise((resolve, reject) => {
const stmt = db.prepare(statement, (err) => {
if (err) {
reject(err);
} else {
resolve(stmt);
}
});
});
}
export function run(
stmt: Statement,
params: Record<string, any>,
): Promise<void> {
return new Promise((resolve, reject) => {
stmt.run(params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
export function getAll(
stmt: Statement,
params: Record<string, any>,
transformer: (row: any) => any,
): Promise<Record<string, any>[]> {
return new Promise((resolve, reject) => {
stmt.all(params, (err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(rows.map(transformer));
}
});
});
}
export function getOne(
stmt: Statement,
params: Record<string, any>,
transformer: (row: any) => any,
): Promise<Record<string, any>> {
return new Promise((resolve, reject) => {
stmt.all(params, (err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(rows.map(transformer)[0]);
}
});
});
}
export function finalize(stmt: Statement): Promise<void> {
return new Promise((resolve, reject) => {
stmt.finalize((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

View File

@ -0,0 +1,134 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { MaybePromise } from "@fabric/core";
import SQLite from "sqlite3";
export class SQLiteDatabase {
db: SQLite.Database;
private cachedStatements = new Map<string, SQLite.Statement>();
constructor(private readonly path: string) {
this.db = new SQLite.Database(path);
}
async init() {
await this.run("PRAGMA journal_mode = WAL");
await this.run("PRAGMA foreign_keys = ON");
}
async close() {
await this.finalizeStatements();
await new Promise<void>((resolve, reject) => {
this.db.close((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
async withTransaction(fn: () => MaybePromise<void>) {
try {
await this.run("BEGIN TRANSACTION");
await fn();
await this.run("COMMIT");
} catch {
await this.run("ROLLBACK");
}
}
run(sql: string, params?: Record<string, any>) {
return new Promise<void>((resolve, reject) => {
this.db.run(sql, params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
runPrepared(sql: string, params?: Record<string, any>) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<void>((resolve, reject) => {
cachedStmt.run(params, (err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
allPrepared(
sql: string,
params?: Record<string, any>,
transformer?: (row: any) => any,
) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<any>((resolve, reject) => {
cachedStmt.all(
params,
(err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(transformer ? rows.map(transformer) : rows);
}
},
);
});
}
onePrepared(
sql: string,
params?: Record<string, any>,
transformer?: (row: any) => any,
) {
const cachedStmt = this.getCachedStatement(sql);
return new Promise<any>((resolve, reject) => {
cachedStmt.all(
params,
(err: Error | null, rows: Record<string, any>[]) => {
if (err) {
reject(err);
} else {
resolve(transformer ? rows.map(transformer)[0] : rows[0]);
}
},
);
});
}
private getCachedStatement(sql: string) {
let cached = this.cachedStatements.get(sql);
if (!cached) {
const stmt = this.db.prepare(sql);
this.cachedStatements.set(sql, stmt);
cached = stmt;
}
return cached;
}
private async finalizeStatements() {
for (const stmt of this.cachedStatements.values()) {
await new Promise<void>((resolve, reject) => {
stmt.finalize((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
}
}