Skip to content

Commit 854be17

Browse files
updated DB adapters with utils
1 parent 9adc0ba commit 854be17

File tree

7 files changed

+174
-149
lines changed

7 files changed

+174
-149
lines changed

packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 38 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
6868
this.closed = true;
6969
this.options = { ...DEFAULT_POWERSYNC_DB_OPTIONS, ...options };
7070
this.bucketStorageAdapter = this.generateBucketStorageAdapter();
71-
this.sdkVersion = this.options.database.execute('SELECT powersync_rs_version()').rows?.item(0)[
72-
'powersync_rs_version()'
73-
];
71+
this.sdkVersion = '';
7472
}
7573

7674
get schema() {
@@ -96,7 +94,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
9694
this.initialized = (async () => {
9795
await this._init();
9896
await this.bucketStorageAdapter.init();
99-
await this.database.executeAsync('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
97+
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
98+
const version = await this.options.database.execute('SELECT powersync_rs_version()');
99+
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
100100
})();
101101
await this.initialized;
102102
}
@@ -109,7 +109,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
109109
await this.disconnect();
110110

111111
await this.initialized;
112-
113112
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
114113
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
115114
statusChanged: (status) => {
@@ -140,20 +139,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
140139
await this.disconnect();
141140

142141
// TODO DB name, verify this is necessary with extension
143-
await this.database.transaction(async (tx) => {
144-
await tx.executeAsync('DELETE FROM ps_oplog WHERE 1');
145-
await tx.executeAsync('DELETE FROM ps_crud WHERE 1');
146-
await tx.executeAsync('DELETE FROM ps_buckets WHERE 1');
142+
await this.database.writeTransaction(async (tx) => {
143+
await tx.execute('DELETE FROM ps_oplog WHERE 1');
144+
await tx.execute('DELETE FROM ps_crud WHERE 1');
145+
await tx.execute('DELETE FROM ps_buckets WHERE 1');
147146

148-
const existingTableRows = await tx.executeAsync(
147+
const existingTableRows = await tx.execute(
149148
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
150149
);
151150

152151
if (!existingTableRows.rows.length) {
153152
return;
154153
}
155154
for (const row of existingTableRows.rows._array) {
156-
await tx.executeAsync(`DELETE FROM ${row.name} WHERE 1`);
155+
await tx.execute(`DELETE FROM ${row.name} WHERE 1`);
157156
}
158157
});
159158
}
@@ -179,14 +178,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
179178
async getUploadQueueStats(includeSize?: boolean): Promise<UploadQueueStats> {
180179
return this.readTransaction(async (tx) => {
181180
if (includeSize) {
182-
const result = await tx.executeAsync(
183-
'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'
184-
);
181+
const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');
185182

186183
const row = result.rows.item(0);
187184
return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0);
188185
} else {
189-
const result = await tx.executeAsync('SELECT count(*) as count FROM ps_crud');
186+
const result = await tx.execute('SELECT count(*) as count FROM ps_crud');
190187
const row = result.rows.item(0);
191188
return new UploadQueueStats(row?.count ?? 0);
192189
}
@@ -211,7 +208,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
211208
* and a single transaction may be split over multiple batches.
212209
*/
213210
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
214-
const result = await this.database.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
211+
const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
215212
limit + 1
216213
]);
217214

@@ -229,11 +226,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
229226
const last = all[all.length - 1];
230227
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
231228
await this.writeTransaction(async (tx) => {
232-
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
233-
if (writeCheckpoint != null && (await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
234-
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
229+
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
230+
if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
231+
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
235232
} else {
236-
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
233+
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
237234
this.bucketStorageAdapter.getMaxOpId()
238235
]);
239236
}
@@ -256,7 +253,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
256253
*/
257254
async getNextCrudTransaction(): Promise<CrudTransaction> {
258255
return await this.readTransaction(async (tx) => {
259-
const first = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
256+
const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
260257

261258
if (!first.rows.length) {
262259
return null;
@@ -267,9 +264,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
267264
if (!txId) {
268265
all = [CrudEntry.fromRow(first.rows.item(0))];
269266
} else {
270-
const result = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [
271-
txId
272-
]);
267+
const result = await tx.execute('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [txId]);
273268
all = result.rows._array.map((row) => CrudEntry.fromRow(row));
274269
}
275270

@@ -279,14 +274,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
279274
all,
280275
async (writeCheckpoint?: string) => {
281276
await this.writeTransaction(async (tx) => {
282-
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
277+
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
283278
if (writeCheckpoint) {
284-
const check = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1');
279+
const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
285280
if (!check.rows?.length) {
286-
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
281+
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
287282
}
288283
} else {
289-
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
284+
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
290285
this.bucketStorageAdapter.getMaxOpId()
291286
]);
292287
}
@@ -301,36 +296,32 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
301296
* Execute a statement and optionally return results
302297
*/
303298
async execute(sql: string, parameters?: any[]) {
304-
const res = await this.writeLock((tx) => tx.executeAsync(sql, parameters));
305-
return res;
299+
await this.initialized;
300+
return this.database.execute(sql, parameters);
306301
}
307302

308303
/**
309304
* Execute a read-only query and return results
310305
*/
311306
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
312-
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
313-
return res.rows?._array ?? [];
307+
await this.initialized;
308+
return this.database.getAll(sql, parameters);
314309
}
315310

316311
/**
317312
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
318313
*/
319314
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
320-
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
321-
return res.rows?.item(0) ?? null;
315+
await this.initialized;
316+
return this.database.getOptional(sql, parameters);
322317
}
323318

324319
/**
325320
* Execute a read-only query and return the first result, error if the ResultSet is empty.
326321
*/
327322
async get<T>(sql: string, parameters?: any[]): Promise<T> {
328-
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
329-
const first = res.rows?.item(0);
330-
if (!first) {
331-
throw new Error('Result set is empty');
332-
}
333-
return first;
323+
await this.initialized;
324+
return this.database.get(sql, parameters);
334325
}
335326

336327
/**
@@ -358,28 +349,26 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
358349

359350
async readTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
360351
await this.initialized;
361-
return this.runLockedTransaction(
362-
AbstractPowerSyncDatabase.transactionMutex,
352+
return this.database.readTransaction(
363353
async (tx) => {
364-
const res = await callback(tx);
365-
await tx.rollbackAsync();
354+
const res = await callback({ ...tx });
355+
await tx.rollback();
366356
return res;
367357
},
368-
lockTimeout
358+
{ timeoutMs: lockTimeout }
369359
);
370360
}
371361

372362
async writeTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
373363
await this.initialized;
374-
return this.runLockedTransaction(
375-
AbstractPowerSyncDatabase.transactionMutex,
364+
return this.database.writeTransaction(
376365
async (tx) => {
377366
const res = await callback(tx);
378-
await tx.commitAsync();
367+
await tx.commit();
379368
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
380369
return res;
381370
},
382-
lockTimeout
371+
{ timeoutMs: lockTimeout }
383372
);
384373
}
385374

@@ -456,27 +445,4 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
456445
return () => dispose();
457446
});
458447
}
459-
460-
private runLockedTransaction<T>(
461-
mutex: Mutex,
462-
callback: (tx: Transaction) => Promise<T>,
463-
lockTimeout?: number
464-
): Promise<T> {
465-
return mutexRunExclusive(
466-
mutex,
467-
() => {
468-
return new Promise<T>(async (resolve, reject) => {
469-
try {
470-
await this.database.transaction(async (tx) => {
471-
const r = await callback(tx);
472-
resolve(r);
473-
});
474-
} catch (ex) {
475-
reject(ex);
476-
}
477-
});
478-
},
479-
{ timeoutMs: lockTimeout }
480-
);
481-
}
482448
}

0 commit comments

Comments
 (0)