Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/host/app/lib/browser-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {

private debouncedDrainJobs = debounce(() => {
this.drainJobs();
}, 250);
}, 1);

private async drainJobs() {
await this.flush();
Expand Down
340 changes: 281 additions & 59 deletions packages/host/app/lib/sqlite-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export default class SQLiteAdapter implements DBAdapter {
private _dbId: string | undefined;
private primaryKeys = new Map<string, string>();
private tables: string[] = [];
private snapshotCounter = 0;
private snapshotInfos = new Map<string, { filename: string; dbId: string }>();
#isClosed = false;
private started = this.#startClient();

Expand All @@ -42,55 +44,7 @@ export default class SQLiteAdapter implements DBAdapter {
});
this._sqlite = await waitForPromise(ready.promise, 'sqlite startup');

let response = await this.sqlite('open', {
// It is possible to write to the local
// filesystem via Origin Private Filesystem, but it requires _very_
// restrictive response headers that would cause our host app to break
// "Cross-Origin-Embedder-Policy: require-corp"
// "Cross-Origin-Opener-Policy: same-origin"
// https://webkit.org/blog/12257/the-file-system-access-api-with-origin-private-file-system/

// Otherwise, local storage and session storage are off limits to the
// worker (they are available in the synchronous interface), so only
// ephemeral memory storage is available
filename: ':memory:',
});
const { dbId } = response;
this._dbId = dbId;

if (this.schemaSQL) {
try {
await this.sqlite('exec', {
dbId: this.dbId,
sql: this.schemaSQL,
});
} catch (e: any) {
console.error(
`Error executing SQL: ${e.result.message}\n${this.schemaSQL}`,
e,
);
throw e;
}

this.tables = (
(await this.internalExecute(
`SELECT name FROM pragma_table_list WHERE schema = 'main' AND name != 'sqlite_schema'`,
)) as { name: string }[]
).map((r) => r.name);
let pks = (await this.internalExecute(
`
SELECT m.name AS table_name,
GROUP_CONCAT(p.name, ', ') AS primary_keys
FROM sqlite_master AS m
JOIN pragma_table_info(m.name) AS p ON m.type = 'table'
WHERE p.pk > 0
GROUP BY m.name;
`,
)) as { table_name: string; primary_keys: string }[];
for (let { table_name, primary_keys } of pks) {
this.primaryKeys.set(table_name, primary_keys);
}
}
await this.#openDatabase(':memory:', true);
}

async execute(sql: string, opts?: ExecuteOptions) {
Expand All @@ -107,6 +61,9 @@ export default class SQLiteAdapter implements DBAdapter {
async close() {
this.assertNotClosed();
await this.started;
for (let snapshotName of Array.from(this.snapshotInfos.keys())) {
await this.deleteSnapshot(snapshotName);
}
await this.sqlite('close', { dbId: this.dbId });
this.#isClosed = true;
}
Expand Down Expand Up @@ -209,6 +166,221 @@ export default class SQLiteAdapter implements DBAdapter {
return results;
}

async exportSnapshot(snapshotName?: string): Promise<string> {
this.assertNotClosed();
await this.started;
let alias = snapshotName ?? `snapshot_${++this.snapshotCounter}`;
if (!/^[A-Za-z][A-Za-z0-9_]*$/.test(alias)) {
throw new Error(
`Snapshot name '${alias}' is invalid. Snapshot names must match /^[A-Za-z][A-Za-z0-9_]*$/.`,
);
}
if (this.snapshotInfos.has(alias)) {
throw new Error(`Snapshot database '${alias}' already exists`);
}
let filename = `file:${alias}?mode=memory&cache=shared`;
let response = await this.sqlite('open', {
filename,
});
this.snapshotInfos.set(alias, { filename, dbId: response.dbId });
await this.sqlite('exec', {
dbId: this.dbId,
sql: `ATTACH DATABASE '${filename}' AS ${alias};`,
});
let schemaEntries = (await this.internalExecute(
`SELECT name, sql
FROM sqlite_schema
WHERE type = 'table'
AND sql IS NOT NULL
AND name NOT LIKE 'sqlite_%'
ORDER BY name;`,
)) as { name: string; sql: string }[];
for (let entry of schemaEntries) {
let rewritten = this.#rewriteSchemaSql(entry.sql, alias);
await this.sqlite('exec', { dbId: this.dbId, sql: rewritten });
await this.sqlite('exec', {
dbId: this.dbId,
sql: `DELETE FROM ${alias}.${this.#quoteIdentifier(entry.name)};`,
});
let columns = await this.getColumnNames(entry.name);
let copySql = this.#buildCopyIntoTableSql({
destinationSchema: alias,
sourceSchema: 'main',
tableName: entry.name,
columns,
});
try {
await this.sqlite('exec', {
dbId: this.dbId,
sql: copySql,
});
} catch (e: any) {
console.error(
`Snapshot export failed while copying table '${entry.name}': ${
e?.result?.message ?? e?.message ?? e
}\nSQL:\n${copySql}`,
e,
);
throw e;
}
}
return alias;
}

hasSnapshot(snapshotName: string): boolean {
return this.snapshotInfos.has(snapshotName);
}

async deleteSnapshotsByPrefix(snapshotNamePrefix: string): Promise<void> {
this.assertNotClosed();
await this.started;
let snapshotNames = Array.from(this.snapshotInfos.keys()).filter((name) =>
name.startsWith(snapshotNamePrefix),
);
for (let snapshotName of snapshotNames) {
await this.deleteSnapshot(snapshotName);
}
}

async deleteSnapshot(snapshotName: string): Promise<void> {
this.assertNotClosed();
await this.started;
let snapshotInfo = this.snapshotInfos.get(snapshotName);
if (!snapshotInfo) {
return;
}

let attached = (await this.internalExecute(
'SELECT name FROM pragma_database_list WHERE name = $1;',
{ bind: [snapshotName] },
)) as { name: string }[];

if (attached.length) {
await this.sqlite('exec', {
dbId: this.dbId,
sql: `DETACH DATABASE ${this.#quoteIdentifier(snapshotName)};`,
});
}

await this.sqlite('close', {
dbId: snapshotInfo.dbId,
});
this.snapshotInfos.delete(snapshotName);
}

async importSnapshot(snapshotName: string) {
this.assertNotClosed();
await this.started;
let snapshotInfo = this.snapshotInfos.get(snapshotName);
if (!snapshotInfo) {
throw new Error(`Unknown snapshot database '${snapshotName}'`);
}
let attached = (await this.internalExecute(
`SELECT name FROM pragma_database_list WHERE name = '${snapshotName}'`,
)) as { name: string }[];
if (!attached.length) {
await this.sqlite('exec', {
dbId: this.dbId,
sql: `ATTACH DATABASE '${snapshotInfo.filename}' AS ${snapshotName};`,
});
}
let tables = (await this.internalExecute(
`SELECT name
FROM ${snapshotName}.sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%';`,
)) as { name: string }[];
let tableNames = tables.map((t) => t.name);
let columnsByTable = await this.#getWritableColumnsByTable(tableNames);
let statements: string[] = ['BEGIN IMMEDIATE;'];
for (let tableName of tableNames) {
let columns = columnsByTable.get(tableName);
if (!columns?.length) {
columns = await this.getColumnNames(tableName);
}
statements.push(
`DELETE FROM main.${this.#quoteIdentifier(tableName)};`,
this.#buildCopyIntoTableSql({
destinationSchema: 'main',
sourceSchema: snapshotName,
tableName,
columns,
}),
);
}
statements.push('COMMIT;');
try {
await this.sqlite('exec', {
dbId: this.dbId,
sql: statements.join('\n'),
});
} catch (e: any) {
// Ensure we don't leave an open transaction after a failed batched import.
try {
await this.sqlite('exec', { dbId: this.dbId, sql: 'ROLLBACK;' });
} catch (_rollbackError) {
// noop
}
console.error(
`Snapshot import failed while executing batched restore: ${
e?.result?.message ?? e?.message ?? e
}`,
e,
);
throw e;
}
}

async #openDatabase(filename: string, initializeSchema = true) {
// It is possible to write to the local
// filesystem via Origin Private Filesystem, but it requires _very_
// restrictive response headers that would cause our host app to break
// so we always stick to in-memory files to keep behavior deterministic.
let response = await this.sqlite('open', {
filename,
});
const { dbId } = response;
this._dbId = dbId;

if (initializeSchema && this.schemaSQL) {
try {
await this.sqlite('exec', {
dbId: this.dbId,
sql: this.schemaSQL,
});
} catch (e: any) {
console.error(
`Error executing SQL: ${e.result.message}\n${this.schemaSQL}`,
e,
);
throw e;
}
}

await this.#loadSchemaMetadata();
}

async #loadSchemaMetadata() {
let tables = (await this.internalExecute(
`SELECT name FROM pragma_table_list WHERE schema = 'main' AND name != 'sqlite_schema'`,
)) as { name: string }[];
this.tables = tables.map((r) => r.name);
let pks = (await this.internalExecute(
`
SELECT m.name AS table_name,
GROUP_CONCAT(p.name, ', ') AS primary_keys
FROM sqlite_master AS m
JOIN pragma_table_info(m.name) AS p ON m.type = 'table'
WHERE p.pk > 0
GROUP BY m.name;
`,
)) as { table_name: string; primary_keys: string }[];
this.primaryKeys.clear();
for (let { table_name, primary_keys } of pks) {
this.primaryKeys.set(table_name, primary_keys);
}
}

private adjustSQL(sql: string): string {
return sql
.replace(/ON CONFLICT ON CONSTRAINT (\w*)\b/, (_, constraint) => {
Expand All @@ -227,16 +399,12 @@ export default class SQLiteAdapter implements DBAdapter {
.replace(/ILIKE/g, 'LIKE') // sqlite LIKE is case insensitive
.replace(/jsonb_array_elements_text\(/g, 'json_each(')
.replace(/jsonb_tree\(/g, 'json_tree(')
.replace(
/\b([A-Za-z0-9_]+)_array_element\b/g,
(match, _alias, offset, sql) => {
let preceding = sql.slice(0, offset);
if (/as\s+$/i.test(preceding)) {
return match;
}
return `${match}.value`;
},
)
.replace(/([^\s]+\s[^\s]+)_array_element/g, (match, group) => {
if (group.startsWith('as ')) {
return match;
}
return `${match}.value`;
})
.replace(/\.text_value/g, '.value')
.replace(/\.jsonb_value/g, '.value')
.replace(/= 'null'::jsonb/g, 'IS NULL')
Expand All @@ -252,6 +420,60 @@ export default class SQLiteAdapter implements DBAdapter {
);
}
}

#quoteIdentifier(identifier: string) {
return `"${identifier.replace(/"/g, '""')}"`;
}

#rewriteSchemaSql(sql: string, schema: string) {
return sql.replace(
/^(CREATE\s+(?:TEMP\s+)?TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+/i,
(match) => `${match} ${schema}.`,
);
}

#buildCopyIntoTableSql({
destinationSchema,
sourceSchema,
tableName,
columns,
}: {
destinationSchema: string;
sourceSchema: string;
tableName: string;
columns: string[];
}) {
let quotedColumns = columns.map((c) => this.#quoteIdentifier(c)).join(', ');
return `INSERT INTO ${destinationSchema}.${this.#quoteIdentifier(tableName)} (${quotedColumns})
SELECT ${quotedColumns} FROM ${sourceSchema}.${this.#quoteIdentifier(tableName)};`;
}

async #getWritableColumnsByTable(tableNames: string[]) {
if (!tableNames.length) {
return new Map<string, string[]>();
}
let tableNamesSql = tableNames
.map((name) => `'${name.replace(/'/g, "''")}'`)
.join(', ');
let rows = (await this.internalExecute(`
SELECT m.name AS table_name, p.name AS column_name
FROM main.sqlite_schema AS m
JOIN pragma_table_info(m.name) AS p
WHERE m.type = 'table'
AND m.name IN (${tableNamesSql})
ORDER BY m.name, p.cid;
`)) as { table_name: string; column_name: string }[];
let map = new Map<string, string[]>();
for (let { table_name, column_name } of rows) {
let columns = map.get(table_name);
if (!columns) {
columns = [];
map.set(table_name, columns);
}
columns.push(column_name);
}
return map;
}
}

function assertNever(value: never) {
Expand Down
Loading
Loading