Skip to content

Commit 3129507

Browse files
committed
Fix truncate on MongoDB storage.
1 parent ec82bb6 commit 3129507

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ export class MongoBucketBatch
557557
// Note that this is a soft delete.
558558
// We don't specifically need a new or unique op_id here, but it must be greater than the
559559
// last checkpoint, so we use next().
560-
batch.deleteCurrentData(before_key, opSeq.next());
560+
batch.softDeleteCurrentData(before_key, opSeq.next());
561561
}
562562
return result;
563563
}
@@ -992,7 +992,9 @@ export class MongoBucketBatch
992992
while (lastBatchCount == BATCH_LIMIT) {
993993
await this.withReplicationTransaction(`Truncate ${sourceTable.qualifiedName}`, async (session, opSeq) => {
994994
const current_data_filter: mongo.Filter<CurrentDataDocument> = {
995-
_id: idPrefixFilter<SourceKey>({ g: this.group_id, t: sourceTable.id }, ['k'])
995+
_id: idPrefixFilter<SourceKey>({ g: this.group_id, t: sourceTable.id }, ['k']),
996+
// Skip soft-deleted data
997+
pending_delete: { $exists: false }
996998
};
997999

9981000
const cursor = this.db.current_data.find(current_data_filter, {
@@ -1023,7 +1025,8 @@ export class MongoBucketBatch
10231025
sourceKey: value._id.k
10241026
});
10251027

1026-
persistedBatch.deleteCurrentData(value._id, opSeq.next());
1028+
// Since this is not from streaming replication, we can do a hard delete
1029+
persistedBatch.hardDeleteCurrentData(value._id);
10271030
}
10281031
await persistedBatch.flush(this.db, session);
10291032
lastBatchCount = batch.length;

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,17 @@ export class PersistedBatch {
243243
}
244244
}
245245

246-
deleteCurrentData(id: SourceKey, checkpointGreaterThan: bigint) {
246+
hardDeleteCurrentData(id: SourceKey) {
247+
const op: mongo.AnyBulkWriteOperation<CurrentDataDocument> = {
248+
deleteOne: {
249+
filter: { _id: id }
250+
}
251+
};
252+
this.currentData.push(op);
253+
this.currentSize += 50;
254+
}
255+
256+
softDeleteCurrentData(id: SourceKey, checkpointGreaterThan: bigint) {
247257
const op: mongo.AnyBulkWriteOperation<CurrentDataDocument> = {
248258
updateOne: {
249259
filter: { _id: id },

0 commit comments

Comments
 (0)