Skip to content

Commit 3c77f43

Browse files
committed
Fix postgres storage truncate; typed SourceTable id.
1 parent 3129507 commit 3c77f43

File tree

9 files changed

+128
-47
lines changed

9 files changed

+128
-47
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { MongoIdSequence } from './MongoIdSequence.js';
2929
import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js';
3030
import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
3131
import { PersistedBatch } from './PersistedBatch.js';
32-
import { idPrefixFilter } from '../../utils/util.js';
32+
import { idPrefixFilter, mongoTableId } from '../../utils/util.js';
3333

3434
/**
3535
* 15MB
@@ -211,7 +211,7 @@ export class MongoBucketBatch
211211
// the order of processing, which then becomes really tricky to manage.
212212
// This now takes 2+ queries, but doesn't have any issues with order of operations.
213213
const sizeLookups: SourceKey[] = batch.batch.map((r) => {
214-
return { g: this.group_id, t: r.record.sourceTable.id, k: r.beforeId };
214+
return { g: this.group_id, t: mongoTableId(r.record.sourceTable.id), k: r.beforeId };
215215
});
216216

217217
sizes = new Map<string, number>();
@@ -252,7 +252,7 @@ export class MongoBucketBatch
252252
continue;
253253
}
254254
const lookups: SourceKey[] = b.map((r) => {
255-
return { g: this.group_id, t: r.record.sourceTable.id, k: r.beforeId };
255+
return { g: this.group_id, t: mongoTableId(r.record.sourceTable.id), k: r.beforeId };
256256
});
257257
let current_data_lookup = new Map<string, CurrentDataDocument>();
258258
// With skipExistingRows, we only need to know whether or not the row exists.
@@ -326,7 +326,7 @@ export class MongoBucketBatch
326326
let existing_lookups: bson.Binary[] = [];
327327
let new_lookups: bson.Binary[] = [];
328328

329-
const before_key: SourceKey = { g: this.group_id, t: record.sourceTable.id, k: beforeId };
329+
const before_key: SourceKey = { g: this.group_id, t: mongoTableId(record.sourceTable.id), k: beforeId };
330330

331331
if (this.skipExistingRows) {
332332
if (record.tag == SaveOperationTag.INSERT) {
@@ -538,7 +538,7 @@ export class MongoBucketBatch
538538
// 5. TOAST: Update current data and bucket list.
539539
if (afterId) {
540540
// Insert or update
541-
const after_key: SourceKey = { g: this.group_id, t: sourceTable.id, k: afterId };
541+
const after_key: SourceKey = { g: this.group_id, t: sourceTable.id as bson.ObjectId, k: afterId };
542542
batch.upsertCurrentData(after_key, {
543543
data: afterData,
544544
buckets: new_buckets,
@@ -957,7 +957,7 @@ export class MongoBucketBatch
957957

958958
await this.withTransaction(async () => {
959959
for (let table of sourceTables) {
960-
await this.db.source_tables.deleteOne({ _id: table.id });
960+
await this.db.source_tables.deleteOne({ _id: mongoTableId(table.id) });
961961
}
962962
});
963963
return result;
@@ -992,7 +992,7 @@ export class MongoBucketBatch
992992
while (lastBatchCount == BATCH_LIMIT) {
993993
await this.withReplicationTransaction(`Truncate ${sourceTable.qualifiedName}`, async (session, opSeq) => {
994994
const current_data_filter: mongo.Filter<CurrentDataDocument> = {
995-
_id: idPrefixFilter<SourceKey>({ g: this.group_id, t: sourceTable.id }, ['k']),
995+
_id: idPrefixFilter<SourceKey>({ g: this.group_id, t: mongoTableId(sourceTable.id) }, ['k']),
996996
// Skip soft-deleted data
997997
pending_delete: { $exists: false }
998998
};
@@ -1052,7 +1052,7 @@ export class MongoBucketBatch
10521052

10531053
await this.withTransaction(async () => {
10541054
await this.db.source_tables.updateOne(
1055-
{ _id: table.id },
1055+
{ _id: mongoTableId(table.id) },
10561056
{
10571057
$set: {
10581058
snapshot_status: {
@@ -1103,7 +1103,7 @@ export class MongoBucketBatch
11031103

11041104
async markTableSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn?: string) {
11051105
const session = this.session;
1106-
const ids = tables.map((table) => table.id);
1106+
const ids = tables.map((table) => mongoTableId(table.id));
11071107

11081108
await this.withTransaction(async () => {
11091109
await this.db.source_tables.updateMany(

modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { ToastableSqliteRow } from '@powersync/service-sync-rules';
22
import * as bson from 'bson';
33

44
import { storage } from '@powersync/service-core';
5+
import { mongoTableId } from '../storage-index.js';
56

67
/**
78
* Maximum number of operations in a batch.
@@ -86,8 +87,8 @@ export class RecordOperation {
8687
const beforeId = record.beforeReplicaId ?? record.afterReplicaId;
8788
this.afterId = afterId;
8889
this.beforeId = beforeId;
89-
this.internalBeforeKey = cacheKey(record.sourceTable.id, beforeId);
90-
this.internalAfterKey = afterId ? cacheKey(record.sourceTable.id, afterId) : null;
90+
this.internalBeforeKey = cacheKey(mongoTableId(record.sourceTable.id), beforeId);
91+
this.internalAfterKey = afterId ? cacheKey(mongoTableId(record.sourceTable.id), afterId) : null;
9192

9293
this.estimatedSize = estimateRowSize(record.before) + estimateRowSize(record.after);
9394
}

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
CurrentDataDocument,
1717
SourceKey
1818
} from './models.js';
19-
import { replicaIdToSubkey } from '../../utils/util.js';
19+
import { mongoTableId, replicaIdToSubkey } from '../../utils/util.js';
2020

2121
/**
2222
* Maximum size of operations we write in a single transaction.
@@ -132,7 +132,7 @@ export class PersistedBatch {
132132
o: op_id
133133
},
134134
op: 'PUT',
135-
source_table: options.table.id,
135+
source_table: mongoTableId(options.table.id),
136136
source_key: options.sourceKey,
137137
table: k.table,
138138
row_id: k.id,
@@ -159,7 +159,7 @@ export class PersistedBatch {
159159
o: op_id
160160
},
161161
op: 'REMOVE',
162-
source_table: options.table.id,
162+
source_table: mongoTableId(options.table.id),
163163
source_key: options.sourceKey,
164164
table: bd.table,
165165
row_id: bd.id,
@@ -208,7 +208,7 @@ export class PersistedBatch {
208208
_id: op_id,
209209
key: {
210210
g: this.group_id,
211-
t: sourceTable.id,
211+
t: mongoTableId(sourceTable.id),
212212
k: sourceKey
213213
},
214214
lookup: binLookup,
@@ -230,7 +230,7 @@ export class PersistedBatch {
230230
_id: op_id,
231231
key: {
232232
g: this.group_id,
233-
t: sourceTable.id,
233+
t: mongoTableId(sourceTable.id),
234234
k: sourceKey
235235
},
236236
lookup: lookup,

modules/module-mongodb-storage/src/utils/util.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,17 +92,32 @@ export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry {
9292
}
9393
}
9494

95-
export function replicaIdToSubkey(table: bson.ObjectId, id: storage.ReplicaId): string {
95+
export function replicaIdToSubkey(table: storage.SourceTableId, id: storage.ReplicaId): string {
9696
if (storage.isUUID(id)) {
9797
// Special case for UUID for backwards-compatiblity
98-
return `${table.toHexString()}/${id.toHexString()}`;
98+
return `${tableIdString(table)}/${id.toHexString()}`;
9999
} else {
100100
// Hashed UUID from the table and id
101101
const repr = bson.serialize({ table, id });
102102
return uuid.v5(repr, utils.ID_NAMESPACE);
103103
}
104104
}
105105

106+
export function mongoTableId(table: storage.SourceTableId): bson.ObjectId {
107+
if (typeof table == 'string') {
108+
throw new ServiceAssertionError(`Got string table id, expected ObjectId`);
109+
}
110+
return table;
111+
}
112+
113+
function tableIdString(table: storage.SourceTableId) {
114+
if (typeof table == 'string') {
115+
return table;
116+
} else {
117+
return table.toHexString();
118+
}
119+
}
120+
106121
export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson.Timestamp) {
107122
// This is a workaround for the lack of direct support for snapshot reads in the MongoDB driver.
108123
if (!session.snapshotEnabled) {

modules/module-postgres-storage/src/storage/batch/OperationBatch.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import { storage, utils } from '@powersync/service-core';
77
import { RequiredOperationBatchLimits } from '../../types/types.js';
8+
import { postgresTableId } from './PostgresPersistedBatch.js';
89

910
/**
1011
* Batch of input operations.
@@ -89,13 +90,13 @@ export class RecordOperation {
8990
/**
9091
* In-memory cache key - must not be persisted.
9192
*/
92-
export function cacheKey(sourceTableId: string, id: storage.ReplicaId) {
93+
export function cacheKey(sourceTableId: storage.SourceTableId, id: storage.ReplicaId) {
9394
return encodedCacheKey(sourceTableId, storage.serializeReplicaId(id));
9495
}
9596

9697
/**
9798
* Calculates a cache key for a stored ReplicaId. This is usually stored as a bytea/Buffer.
9899
*/
99-
export function encodedCacheKey(sourceTableId: string, storedKey: Buffer) {
100-
return `${sourceTableId}.${storedKey.toString('base64')}`;
100+
export function encodedCacheKey(sourceTableId: storage.SourceTableId, storedKey: Buffer) {
101+
return `${postgresTableId(sourceTableId)}.${storedKey.toString('base64')}`;
101102
}

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { NOTIFICATION_CHANNEL, sql } from '../../utils/db.js';
1919
import { pick } from '../../utils/ts-codec.js';
2020
import { batchCreateCustomWriteCheckpoints } from '../checkpoints/PostgresWriteCheckpointAPI.js';
2121
import { cacheKey, encodedCacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
22-
import { PostgresPersistedBatch } from './PostgresPersistedBatch.js';
22+
import { PostgresPersistedBatch, postgresTableId } from './PostgresPersistedBatch.js';
2323
import { bigint } from '../../types/codecs.js';
2424

2525
export interface PostgresBucketBatchOptions {
@@ -196,6 +196,7 @@ export class PostgresBucketBatch
196196
WHERE
197197
group_id = ${{ type: 'int4', value: this.group_id }}
198198
AND source_table = ${{ type: 'varchar', value: sourceTable.id }}
199+
AND pending_delete IS NULL
199200
LIMIT
200201
${{ type: 'int4', value: BATCH_LIMIT }}
201202
FOR NO KEY UPDATE
@@ -220,7 +221,9 @@ export class PostgresBucketBatch
220221
persistedBatch.deleteCurrentData({
221222
// This is serialized since we got it from a DB query
222223
serialized_source_key: value.source_key,
223-
source_table_id: sourceTable.id
224+
source_table_id: postgresTableId(sourceTable.id),
225+
// No need for soft delete, since this is not streaming replication
226+
soft: false
224227
});
225228
}
226229
}
@@ -630,7 +633,7 @@ export class PostgresBucketBatch
630633
// exceeding memory limits.
631634
const sizeLookups = batch.batch.map((r) => {
632635
return {
633-
source_table: r.record.sourceTable.id.toString(),
636+
source_table: postgresTableId(r.record.sourceTable.id),
634637
/**
635638
* Encode to hex in order to pass a jsonb
636639
*/
@@ -1012,7 +1015,7 @@ export class PostgresBucketBatch
10121015
source_key: afterId,
10131016
group_id: this.group_id,
10141017
data: afterData!,
1015-
source_table: sourceTable.id,
1018+
source_table: postgresTableId(sourceTable.id),
10161019
buckets: newBuckets,
10171020
lookups: newLookups,
10181021
pending_delete: null
@@ -1023,8 +1026,9 @@ export class PostgresBucketBatch
10231026
if (afterId == null || !storage.replicaIdEquals(beforeId, afterId)) {
10241027
// Either a delete (afterId == null), or replaced the old replication id
10251028
persistedBatch.deleteCurrentData({
1026-
source_table_id: sourceTable.id,
1027-
source_key: beforeId!
1029+
source_table_id: postgresTableId(sourceTable.id),
1030+
source_key: beforeId!,
1031+
soft: true
10281032
});
10291033
}
10301034

0 commit comments

Comments
 (0)