diff --git a/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts b/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts index 89bc226..375165d 100644 --- a/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts +++ b/packages/fabric/store-sqlite/src/sqlite-driver.spec.ts @@ -1,4 +1,4 @@ -import { isError } from "@fabric/core"; +import { Run } from "@fabric/core"; import { defineModel, Field, isLike } from "@fabric/domain"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { SQLiteStorageDriver } from "./sqlite-driver.js"; @@ -21,27 +21,24 @@ describe("SQLite Store Driver", () => { }); afterEach(async () => { - const result = await driver.close(); - if (isError(result)) throw result; + await Run.UNSAFE(() => driver.close()); }); it("should synchronize the store and insert a record", async () => { - const result = await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - if (isError(result)) throw result; + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); - const insertResult = await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - - if (isError(insertResult)) throw insertResult; - - const records = ( - await driver.select(schema, { from: "users" }) - ).unwrapOrThrow(); + const records = await Run.UNSAFE(() => + driver.select(schema, { from: "users" }), + ); expect(records).toEqual([ { id: "1", name: "test", streamId: "1", streamVersion: 1n }, @@ -49,21 +46,24 @@ describe("SQLite Store Driver", () => { }); it("should be update a record", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); - const err = await driver.update(schema.users, "1", { name: "updated" }); - if (isError(err)) throw err; + await Run.UNSAFE(() => + driver.update(schema.users, "1", { name: "updated" }), + ); - const records = ( - await driver.select(schema, { from: "users" }) - ).unwrapOrThrow(); + const records = await Run.UNSAFE(() => + driver.select(schema, { from: "users" }), + ); expect(records).toEqual([ { id: "1", name: "updated", streamId: "1", streamVersion: 1n }, @@ -71,43 +71,49 @@ describe("SQLite Store Driver", () => { }); it("should be able to delete a record", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); - await driver.delete(schema.users, "1"); + await Run.UNSAFE(() => driver.delete(schema.users, "1")); - const records = ( - await driver.select(schema, { from: "users" }) - ).unwrapOrThrow(); + const records = await Run.UNSAFE(() => + driver.select(schema, { from: "users" }), + ); expect(records).toEqual([]); }); it("should be able to select records", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - await driver.insert(schema.users, { - id: "2", - name: "test", - streamId: "2", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "2", + name: "test", + streamId: "2", + streamVersion: 1n, + }), + ); - const records = ( - await driver.select(schema, { from: "users" }) - ).unwrapOrThrow(); + const records = await Run.UNSAFE(() => + driver.select(schema, { from: "users" }), + ); expect(records).toEqual([ { id: "1", name: "test", streamId: "1", streamVersion: 1n }, @@ -116,24 +122,28 @@ describe("SQLite Store Driver", () => { }); it("should be able to select one record", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - await driver.insert(schema.users, { - id: "2", - name: "test", - streamId: "2", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "2", + name: "test", + streamId: "2", + streamVersion: 1n, + }), + ); - const record = ( - await driver.selectOne(schema, { from: "users" }) - ).unwrapOrThrow(); + const record = await Run.UNSAFE(() => + driver.selectOne(schema, { from: "users" }), + ); expect(record).toEqual({ id: "1", @@ -144,27 +154,31 @@ describe("SQLite Store Driver", () => { }); it("should select a record with a where clause", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - await driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "2", + name: "jamón", + streamId: "2", + streamVersion: 1n, + }), + ); - const result = ( - await driver.select(schema, { + const result = await Run.UNSAFE(() => + driver.select(schema, { from: "users", where: { name: isLike("te%") }, - }) - ).unwrapOrThrow(); + }), + ); expect(result).toEqual([ { @@ -177,27 +191,31 @@ describe("SQLite Store Driver", () => { }); it("should select a record with a where clause of a specific type", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - await driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "2", + name: "jamón", + streamId: "2", + streamVersion: 1n, + }), + ); - const result = ( - await driver.select(schema, { + const result = await Run.UNSAFE(() => + driver.select(schema, { from: "users", where: { streamVersion: 1n }, - }) - ).unwrapOrThrow(); + }), + ); expect(result).toEqual([ { @@ -216,28 +234,32 @@ describe("SQLite Store Driver", () => { }); it("should select with a limit and offset", async () => { - await driver.sync(schema); + await Run.UNSAFE(() => driver.sync(schema)); - await driver.insert(schema.users, { - id: "1", - name: "test", - streamId: "1", - streamVersion: 1n, - }); - await driver.insert(schema.users, { - id: "2", - name: "jamón", - streamId: "2", - streamVersion: 1n, - }); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "1", + name: "test", + streamId: "1", + streamVersion: 1n, + }), + ); + await Run.UNSAFE(() => + driver.insert(schema.users, { + id: "2", + name: "jamón", + streamId: "2", + streamVersion: 1n, + }), + ); - const result = ( - await driver.select(schema, { + const result = await Run.UNSAFE(() => + driver.select(schema, { from: "users", limit: 1, offset: 1, - }) - ).unwrapOrThrow(); + }), + ); expect(result).toEqual([ { diff --git a/packages/fabric/store-sqlite/src/sqlite-driver.ts b/packages/fabric/store-sqlite/src/sqlite-driver.ts index f5d4806..1ceee8f 100644 --- a/packages/fabric/store-sqlite/src/sqlite-driver.ts +++ b/packages/fabric/store-sqlite/src/sqlite-driver.ts @@ -11,7 +11,6 @@ import { StorageDriver, StoreQueryError, } from "@fabric/domain"; -import { Database, Statement } from "sqlite3"; import { filterToParams, filterToSQL } from "./filter-to-sql.js"; import { modelToSql } from "./model-to-sql.js"; import { @@ -22,44 +21,19 @@ import { recordToSQLSet, } from "./record-utils.js"; import { transformRow } from "./sql-to-value.js"; -import { - dbClose, - dbRun, - finalize, - getAll, - getOne, - prepare, - run, -} from "./sqlite-wrapper.js"; +import { SQLiteDatabase } from "./sqlite/sqlite-wrapper.js"; export class SQLiteStorageDriver implements StorageDriver { - private db: Database; - - private cachedStatements = new Map(); + private db: SQLiteDatabase; constructor(private path: string) { - this.db = new Database(path); + this.db = new SQLiteDatabase(path); } - /** - * Get a statement from the cache or prepare a new one. - */ - private async getOrCreatePreparedStatement(sql: string): Promise { - if (this.cachedStatements.has(sql)) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- We know it's there. - return this.cachedStatements.get(sql)!; - } - - const stmt = await prepare(this.db, sql); - this.cachedStatements.set(sql, stmt); - - return stmt; - } - - private async getSelectStatement( + private getSelectStatement( collection: Collection, query: QueryDefinition, - ): Promise<[Statement, Record]> { + ): [string, Record] { const selectFields = query.keys ? query.keys.join(", ") : "*"; const queryFilter = filterToSQL(query.where); @@ -75,7 +49,7 @@ export class SQLiteStorageDriver implements StorageDriver { ].join(" "); return [ - await this.getOrCreatePreparedStatement(sql), + sql, { ...filterToParams(collection, query.where), }, @@ -91,9 +65,10 @@ export class SQLiteStorageDriver implements StorageDriver { ): AsyncResult { return AsyncResult.tryFrom( async () => { - const sql = `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`; - const stmt = await this.getOrCreatePreparedStatement(sql); - return await run(stmt, recordToSQLParams(model, record)); + await this.db.runPrepared( + `INSERT INTO ${model.name} (${recordToSQLKeys(record)}) VALUES (${recordToSQLKeyParams(record)})`, + recordToSQLParams(model, record), + ); }, (error) => new StoreQueryError(error.message, { @@ -113,11 +88,15 @@ export class SQLiteStorageDriver implements StorageDriver { ): AsyncResult { return AsyncResult.tryFrom( async () => { - const [stmt, params] = await this.getSelectStatement( + const [sql, params] = this.getSelectStatement( schema[query.from], query, ); - return await getAll(stmt, params, transformRow(schema[query.from])); + return this.db.allPrepared( + sql, + params, + transformRow(schema[query.from]), + ); }, (err) => new StoreQueryError(err.message, { @@ -136,11 +115,15 @@ export class SQLiteStorageDriver implements StorageDriver { ): AsyncResult { return AsyncResult.tryFrom( async () => { - const [stmt, params] = await this.getSelectStatement( + const [stmt, params] = this.getSelectStatement( schema[query.from], query, ); - return await getOne(stmt, params, transformRow(schema[query.from])); + return await this.db.onePrepared( + stmt, + params, + transformRow(schema[query.from]), + ); }, (err) => new StoreQueryError(err.message, { @@ -158,19 +141,12 @@ export class SQLiteStorageDriver implements StorageDriver { ): AsyncResult { return AsyncResult.tryFrom( async () => { - // Enable Write-Ahead Logging, which is faster and more reliable. - await dbRun(this.db, "PRAGMA journal_mode = WAL;"); - - // Enable foreign key constraints. - await dbRun(this.db, "PRAGMA foreign_keys = ON;"); - - // Begin a transaction to create the schema. - await dbRun(this.db, "BEGIN TRANSACTION;"); - for (const modelKey in schema) { - const model = schema[modelKey]; - await dbRun(this.db, modelToSql(model)); - } - await dbRun(this.db, "COMMIT;"); + await this.db.withTransaction(async () => { + for (const modelKey in schema) { + const model = schema[modelKey]; + await this.db.runPrepared(modelToSql(model)); + } + }); }, (error) => new StoreQueryError(error.message, { @@ -201,10 +177,7 @@ export class SQLiteStorageDriver implements StorageDriver { async close(): AsyncResult { return AsyncResult.from(async () => { - for (const stmt of this.cachedStatements.values()) { - await finalize(stmt); - } - await dbClose(this.db); + this.db.close(); }); } @@ -218,13 +191,14 @@ export class SQLiteStorageDriver implements StorageDriver { ): AsyncResult { return AsyncResult.tryFrom( async () => { - const sql = `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`; - const stmt = await this.getOrCreatePreparedStatement(sql); const params = recordToSQLParams(model, { ...record, id, }); - return await run(stmt, params); + await this.db.runPrepared( + `UPDATE ${model.name} SET ${recordToSQLSet(record)} WHERE id = ${keyToParam("id")}`, + params, + ); }, (error) => new StoreQueryError(error.message, { @@ -238,13 +212,13 @@ export class SQLiteStorageDriver implements StorageDriver { /** * Delete a record from the store. */ - async delete(model: Model, id: string): AsyncResult { return AsyncResult.tryFrom( async () => { - const sql = `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`; - const stmt = await this.getOrCreatePreparedStatement(sql); - return await run(stmt, { [keyToParam("id")]: id }); + await this.db.runPrepared( + `DELETE FROM ${model.name} WHERE id = ${keyToParam("id")}`, + { $id: id }, + ); }, (error) => new StoreQueryError(error.message, { diff --git a/packages/fabric/store-sqlite/src/sqlite-wrapper.ts b/packages/fabric/store-sqlite/src/sqlite-wrapper.ts deleted file mode 100644 index 4a58620..0000000 --- a/packages/fabric/store-sqlite/src/sqlite-wrapper.ts +++ /dev/null @@ -1,97 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { Database, Statement } from "sqlite3"; - -export function dbRun(db: Database, statement: string): Promise { - return new Promise((resolve, reject) => { - db.all(statement, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); -} - -export function dbClose(db: Database): Promise { - return new Promise((resolve, reject) => { - db.close((err) => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - -export function prepare(db: Database, statement: string): Promise { - return new Promise((resolve, reject) => { - const stmt = db.prepare(statement, (err) => { - if (err) { - reject(err); - } else { - resolve(stmt); - } - }); - }); -} - -export function run( - stmt: Statement, - params: Record, -): Promise { - return new Promise((resolve, reject) => { - stmt.run(params, (err: Error | null) => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - -export function getAll( - stmt: Statement, - params: Record, - transformer: (row: any) => any, -): Promise[]> { - return new Promise((resolve, reject) => { - stmt.all(params, (err: Error | null, rows: Record[]) => { - if (err) { - reject(err); - } else { - resolve(rows.map(transformer)); - } - }); - }); -} - -export function getOne( - stmt: Statement, - params: Record, - transformer: (row: any) => any, -): Promise> { - return new Promise((resolve, reject) => { - stmt.all(params, (err: Error | null, rows: Record[]) => { - if (err) { - reject(err); - } else { - resolve(rows.map(transformer)[0]); - } - }); - }); -} - -export function finalize(stmt: Statement): Promise { - return new Promise((resolve, reject) => { - stmt.finalize((err) => { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); -} diff --git a/packages/fabric/store-sqlite/src/sqlite/sqlite-wrapper.ts b/packages/fabric/store-sqlite/src/sqlite/sqlite-wrapper.ts new file mode 100644 index 0000000..7424a0f --- /dev/null +++ b/packages/fabric/store-sqlite/src/sqlite/sqlite-wrapper.ts @@ -0,0 +1,134 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { MaybePromise } from "@fabric/core"; +import SQLite from "sqlite3"; + +export class SQLiteDatabase { + db: SQLite.Database; + + private cachedStatements = new Map(); + + constructor(private readonly path: string) { + this.db = new SQLite.Database(path); + } + + async init() { + await this.run("PRAGMA journal_mode = WAL"); + await this.run("PRAGMA foreign_keys = ON"); + } + + async close() { + await this.finalizeStatements(); + await new Promise((resolve, reject) => { + this.db.close((err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + async withTransaction(fn: () => MaybePromise) { + try { + await this.run("BEGIN TRANSACTION"); + await fn(); + await this.run("COMMIT"); + } catch { + await this.run("ROLLBACK"); + } + } + + run(sql: string, params?: Record) { + return new Promise((resolve, reject) => { + this.db.run(sql, params, (err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + runPrepared(sql: string, params?: Record) { + const cachedStmt = this.getCachedStatement(sql); + + return new Promise((resolve, reject) => { + cachedStmt.run(params, (err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + allPrepared( + sql: string, + params?: Record, + transformer?: (row: any) => any, + ) { + const cachedStmt = this.getCachedStatement(sql); + + return new Promise((resolve, reject) => { + cachedStmt.all( + params, + (err: Error | null, rows: Record[]) => { + if (err) { + reject(err); + } else { + resolve(transformer ? rows.map(transformer) : rows); + } + }, + ); + }); + } + + onePrepared( + sql: string, + params?: Record, + transformer?: (row: any) => any, + ) { + const cachedStmt = this.getCachedStatement(sql); + + return new Promise((resolve, reject) => { + cachedStmt.all( + params, + (err: Error | null, rows: Record[]) => { + if (err) { + reject(err); + } else { + resolve(transformer ? rows.map(transformer)[0] : rows[0]); + } + }, + ); + }); + } + + private getCachedStatement(sql: string) { + let cached = this.cachedStatements.get(sql); + + if (!cached) { + const stmt = this.db.prepare(sql); + this.cachedStatements.set(sql, stmt); + cached = stmt; + } + return cached; + } + + private async finalizeStatements() { + for (const stmt of this.cachedStatements.values()) { + await new Promise((resolve, reject) => { + stmt.finalize((err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + } +}