File tree Expand file tree Collapse file tree 4 files changed +4
-14
lines changed
module-mongodb-storage/src/storage/implementation
module-mssql/src/replication
module-postgres-storage/src/storage/batch
packages/service-core/src/storage Expand file tree Collapse file tree 4 files changed +4
-14
lines changed Original file line number Diff line number Diff line change @@ -146,10 +146,6 @@ export class MongoBucketBatch
146146 return this . last_checkpoint_lsn ;
147147 }
148148
149- get noCheckpointBeforeLsn ( ) {
150- return this . no_checkpoint_before_lsn ;
151- }
152-
153149 async flush ( options ?: storage . BatchBucketFlushOptions ) : Promise < storage . FlushedResult | null > {
154150 let result : storage . FlushedResult | null = null ;
155151 // One flush may be split over multiple transactions.
Original file line number Diff line number Diff line change @@ -304,7 +304,7 @@ export class CDCStream {
304304 const postSnapshotLSN = await getLatestLSN ( this . connections ) ;
305305 // Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
306306 await transaction . commit ( ) ;
307- const [ updatedSourceTable ] = await batch . markSnapshotDone ( [ table . sourceTable ] , postSnapshotLSN . toString ( ) ) ;
307+ const [ updatedSourceTable ] = await batch . markTableSnapshotDone ( [ table . sourceTable ] , postSnapshotLSN . toString ( ) ) ;
308308 this . tableCache . updateSourceTable ( updatedSourceTable ) ;
309309 } catch ( e ) {
310310 await transaction . rollback ( ) ;
@@ -500,11 +500,11 @@ export class CDCStream {
500500
501501 // This will not create a consistent checkpoint yet, but will persist the op.
502502 // Actual checkpoint will be created when streaming replication caught up.
503+ const postSnapshotLSN = await getLatestLSN ( this . connections ) ;
504+ await batch . markAllSnapshotDone ( postSnapshotLSN . toString ( ) ) ;
503505 await batch . commit ( snapshotLSN ) ;
504506
505- this . logger . info (
506- `Snapshot done. Need to replicate from ${ snapshotLSN } to ${ batch . noCheckpointBeforeLsn } to be consistent`
507- ) ;
507+ this . logger . info ( `Snapshot done. Need to replicate from ${ snapshotLSN } to ${ postSnapshotLSN } to be consistent` ) ;
508508 }
509509 ) ;
510510 }
Original file line number Diff line number Diff line change @@ -111,10 +111,6 @@ export class PostgresBucketBatch
111111 return this . last_checkpoint_lsn ;
112112 }
113113
114- get noCheckpointBeforeLsn ( ) {
115- return this . no_checkpoint_before_lsn ;
116- }
117-
118114 async [ Symbol . asyncDispose ] ( ) {
119115 super . clearListeners ( ) ;
120116 }
Original file line number Diff line number Diff line change @@ -87,8 +87,6 @@ export interface BucketStorageBatch extends ObserverClient<BucketBatchStorageLis
8787 markTableSnapshotRequired ( table : SourceTable ) : Promise < void > ;
8888 markAllSnapshotDone ( no_checkpoint_before_lsn : string ) : Promise < void > ;
8989
90- noCheckpointBeforeLsn : string ;
91-
9290 updateTableProgress ( table : SourceTable , progress : Partial < TableSnapshotStatus > ) : Promise < SourceTable > ;
9391
9492 /**
You can’t perform that action at this time.
0 commit comments