Skip to content

Commit 29f4302

Browse files
committed
Fix snapshot_lsn handling.
1 parent 333bdad commit 29f4302

File tree

3 files changed

+14
-8
lines changed

3 files changed

+14
-8
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,10 @@ export class MongoBucketBatch
736736
},
737737
'$last_checkpoint'
738738
]
739+
},
740+
// Unset snapshot_lsn on checkpoint
741+
snapshot_lsn: {
742+
$cond: ['$_can_checkpoint', { $literal: null }, '$snapshot_lsn']
739743
}
740744
}
741745
},
@@ -1025,9 +1029,6 @@ export class MongoBucketBatch
10251029
},
10261030
$max: {
10271031
no_checkpoint_before: no_checkpoint_before_lsn
1028-
},
1029-
$unset: {
1030-
snapshot_lsn: 1
10311032
}
10321033
},
10331034
{ session: this.session }

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js';
2222
export class ChangeStreamTestContext {
2323
private _walStream?: ChangeStream;
2424
private abortController = new AbortController();
25-
private streamPromise?: Promise<void>;
25+
private streamPromise?: Promise<any>;
2626
public storage?: SyncRulesBucketStorage;
2727

2828
/**
@@ -103,7 +103,7 @@ export class ChangeStreamTestContext {
103103
return this.storage!;
104104
}
105105

106-
get walStream() {
106+
get streamer() {
107107
if (this.storage == null) {
108108
throw new Error('updateSyncRules() first');
109109
}
@@ -125,7 +125,7 @@ export class ChangeStreamTestContext {
125125
}
126126

127127
async replicateSnapshot() {
128-
await this.walStream.initReplication();
128+
await this.streamer.initReplication();
129129
}
130130

131131
/**
@@ -143,13 +143,17 @@ export class ChangeStreamTestContext {
143143
}
144144

145145
startStreaming() {
146-
return (this.streamPromise = this.walStream.streamChanges());
146+
this.streamPromise = this.streamer.streamChanges().catch((e) => e);
147147
}
148148

149149
async getCheckpoint(options?: { timeout?: number }) {
150150
let checkpoint = await Promise.race([
151151
getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 15_000 }),
152-
this.streamPromise
152+
this.streamPromise?.then((e) => {
153+
if (e != null) {
154+
throw e;
155+
}
156+
})
153157
]);
154158
if (checkpoint == null) {
155159
// This indicates an issue with the test setup - streamingPromise completed instead

packages/service-core-tests/src/tests/register-data-storage-data-tests.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,7 @@ bucket_definitions:
10851085
const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false });
10861086
const storage = f.getInstance(r.persisted_sync_rules!);
10871087
await storage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1088+
await batch.markAllSnapshotDone('1/0');
10881089
await batch.keepalive('1/0');
10891090
});
10901091

0 commit comments

Comments
 (0)