
Commit ce500a1

Refactor commit logic to better handle concurrent replication.
1 parent df0d4cb commit ce500a1

File tree

4 files changed: +154 -131 lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 76 additions & 58 deletions

```diff
@@ -661,32 +661,7 @@ export class MongoBucketBatch
 
     await this.flush(options);
 
-    if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
-      // When re-applying transactions, don't create a new checkpoint until
-      // we are past the last transaction.
-      this.logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
-      // Cannot create a checkpoint yet - return false
-      return false;
-    }
-
-    if (!createEmptyCheckpoints && this.persisted_op == null) {
-      // Nothing to commit - also return true
-      await this.autoActivate(lsn);
-      return true;
-    }
-
     const now = new Date();
-    const update: Partial<SyncRuleDocument> = {
-      last_checkpoint_lsn: lsn,
-      last_checkpoint_ts: now,
-      last_keepalive_ts: now,
-      last_fatal_error: null,
-      keepalive_op: null
-    };
-
-    if (this.persisted_op != null) {
-      update.last_checkpoint = this.persisted_op;
-    }
 
     // Mark relevant write checkpoints as "processed".
     // This makes it easier to identify write checkpoints that are "valid" in order.
@@ -705,58 +680,101 @@ export class MongoBucketBatch
       }
     );
 
-    const updateResult = await this.db.sync_rules.updateOne(
+    const updateResult = await this.db.sync_rules.findOneAndUpdate(
       {
-        _id: this.group_id,
-        snapshot_done: true,
-        $or: [{ no_checkpoint_before: null }, { no_checkpoint_before: { $lte: lsn } }]
+        _id: this.group_id
       },
       [
         {
           $set: {
-            last_checkpoint_lsn: { $literal: lsn },
-            last_checkpoint_ts: { $literal: now },
+            _can_checkpoint: {
+              $and: [
+                { $eq: ['$snapshot_done', true] },
+                {
+                  $or: [{ $eq: ['$last_checkpoint_lsn', null] }, { $lte: ['$last_checkpoint_lsn', { $literal: lsn }] }]
+                },
+                {
+                  $or: [
+                    { $eq: ['$no_checkpoint_before', null] },
+                    { $lte: ['$no_checkpoint_before', { $literal: lsn }] }
+                  ]
+                }
+              ]
+            }
+          }
+        },
+        {
+          $set: {
+            last_checkpoint_lsn: {
+              $cond: ['$_can_checkpoint', { $literal: lsn }, '$last_checkpoint_lsn']
+            },
+            last_checkpoint_ts: {
+              $cond: ['$_can_checkpoint', { $literal: now }, '$last_checkpoint_ts']
+            },
             last_keepalive_ts: { $literal: now },
-            last_fatal_error: { $literal: null },
-            keepalive_op: { $literal: null },
+            last_fatal_error: {
+              $cond: ['$_can_checkpoint', { $literal: null }, '$last_fatal_error']
+            },
+            keepalive_op: {
+              $cond: [
+                '$_can_checkpoint',
+                // Checkpoint: set to null
+                { $literal: null },
+                // Not checkpoint: update to max of existing keepalive_op and persisted_op
+                {
+                  $toString: {
+                    $max: [{ $toLong: '$keepalive_op' }, { $literal: this.persisted_op }]
+                  }
+                }
+              ]
+            },
             last_checkpoint: {
-              $max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
+              $cond: [
+                '$_can_checkpoint',
+                {
+                  $max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
+                },
+                '$last_checkpoint'
+              ]
             }
           }
+        },
+        {
+          $unset: '_can_checkpoint'
         }
       ],
-      { session: this.session }
+      {
+        session: this.session,
+        returnDocument: 'after',
+        projection: {
+          snapshot_done: 1,
+          last_checkpoint_lsn: 1,
+          no_checkpoint_before: 1,
+          keepalive_op: 1
+        }
+      }
     );
-    if (updateResult.matchedCount == 0) {
+    if (updateResult == null) {
+      throw new ReplicationAssertionError('Failed to load sync_rules document during checkpoint update');
+    }
+    const checkpointCreated = updateResult.snapshot_done === true && updateResult.last_checkpoint_lsn === lsn;
+
+    if (!checkpointCreated) {
       // Failed on snapshot_done or no_checkpoint_before.
-      if (Date.now() - this.lastWaitingLogThottled > 5_000) {
+      if (Date.now() - this.lastWaitingLogThottled > 5_000 || true) {
         this.logger.info(
-          `Waiting until before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
-        );
-        this.lastWaitingLogThottled = Date.now();
-      }
-
-      if (this.persisted_op != null) {
-        await this.db.sync_rules.updateOne(
-          {
-            _id: this.group_id
-          },
-          [
-            // KLUDGE: the string format is a pain here, not sure why we ever had it as a string
+          `Waiting before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}. Current state: ${JSON.stringify(
            {
-              $set: {
-                keepalive_op: {
-                  $toString: {
-                    $max: [{ $toLong: '$keepalive_op' }, { $literal: this.persisted_op }]
-                  }
-                }
-              }
+              snapshot_done: updateResult.snapshot_done,
+              last_checkpoint_lsn: updateResult.last_checkpoint_lsn,
+              no_checkpoint_before: updateResult.no_checkpoint_before
            }
-          ],
-          { session: this.session }
+          )}`
        );
+        this.lastWaitingLogThottled = Date.now();
      }
    } else {
+      this.logger.info(`Created checkpoint at ${lsn}. Persisted op: ${this.persisted_op}`);
      await this.autoActivate(lsn);
      await this.db.notifyCheckpoint();
      this.persisted_op = null;
```
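
The new pipeline computes a temporary `_can_checkpoint` flag in its first stage, branches every field on it with `$cond`, and reads back the post-update document to see which branch was taken. This replaces the old flow (a filter that may simply not match, followed by a separate fallback update of keepalive_op) with a single atomic round trip. A rough standalone sketch of the same pattern follows; the sync_rules field names come from the diff, while the database name, the document typing and the `tryCommitCheckpoint` helper are assumptions for illustration.

```ts
import { Long, MongoClient } from 'mongodb';

interface SyncRulesDoc {
  _id: number;
  snapshot_done: boolean;
  no_checkpoint_before: string | null;
  last_checkpoint_lsn: string | null;
  last_checkpoint_ts: Date | null;
  last_keepalive_ts: Date | null;
  last_checkpoint: Long | null;
  keepalive_op: string | null;
}

/**
 * Attempt to create a checkpoint at `lsn`. Returns true if the checkpoint was created,
 * false if the preconditions (snapshot_done / no_checkpoint_before / last_checkpoint_lsn
 * ordering) were not met and only keepalive progress was recorded.
 */
export async function tryCommitCheckpoint(
  client: MongoClient,
  groupId: number,
  lsn: string,
  persistedOp: Long | null
): Promise<boolean> {
  // Database name is a placeholder for the sketch.
  const syncRules = client.db('powersync').collection<SyncRulesDoc>('sync_rules');
  const now = new Date();

  // mongodb driver v6+: findOneAndUpdate returns the document (or null) directly.
  const doc = await syncRules.findOneAndUpdate(
    { _id: groupId },
    [
      {
        // Stage 1: decide whether a checkpoint may be created at this LSN.
        $set: {
          _can_checkpoint: {
            $and: [
              { $eq: ['$snapshot_done', true] },
              { $or: [{ $eq: ['$last_checkpoint_lsn', null] }, { $lte: ['$last_checkpoint_lsn', { $literal: lsn }] }] },
              { $or: [{ $eq: ['$no_checkpoint_before', null] }, { $lte: ['$no_checkpoint_before', { $literal: lsn }] }] }
            ]
          }
        }
      },
      {
        // Stage 2: either advance the checkpoint, or only record keepalive progress.
        $set: {
          last_checkpoint_lsn: { $cond: ['$_can_checkpoint', { $literal: lsn }, '$last_checkpoint_lsn'] },
          last_checkpoint_ts: { $cond: ['$_can_checkpoint', { $literal: now }, '$last_checkpoint_ts'] },
          last_keepalive_ts: { $literal: now },
          keepalive_op: {
            $cond: [
              '$_can_checkpoint',
              { $literal: null },
              { $toString: { $max: [{ $toLong: '$keepalive_op' }, { $literal: persistedOp }] } }
            ]
          },
          last_checkpoint: {
            $cond: [
              '$_can_checkpoint',
              { $max: ['$last_checkpoint', { $literal: persistedOp }, { $toLong: '$keepalive_op' }] },
              '$last_checkpoint'
            ]
          }
        }
      },
      // Stage 3: the scratch flag is never persisted.
      { $unset: '_can_checkpoint' }
    ],
    { returnDocument: 'after', projection: { snapshot_done: 1, last_checkpoint_lsn: 1 } }
  );

  if (doc == null) {
    throw new Error(`sync_rules document ${groupId} not found`);
  }
  // The returned (post-update) document tells us which branch the pipeline took.
  return doc.snapshot_done === true && doc.last_checkpoint_lsn === lsn;
}
```

Folding the old follow-up keepalive_op update into the same statement means a batch that cannot checkpoint yet still records its progress in the same atomic write, which appears to be the point of the refactor for concurrent replication.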

modules/module-postgres/src/replication/PostgresSnapshotter.ts

Lines changed: 8 additions & 29 deletions

```diff
@@ -190,7 +190,6 @@ export class PostgresSnapshotter {
         objectId: relid,
         replicaIdColumns: cresult.replicationColumns
       } as SourceEntityDescriptor,
-      snapshot: false,
       referencedTypeIds: columnTypes
     });
 
@@ -329,8 +328,12 @@ export class PostgresSnapshotter {
         },
         async (batch) => {
           await this.snapshotTableInTx(batch, db, table);
+          // This commit ensures we set keepalive_op.
+          // It may be better if that is automatically set when flushing.
+          await batch.commit(ZERO_LSN);
         }
       );
+      this.logger.info(`Flushed snapshot at ${flushResults?.flushed_op}`);
     } finally {
       await db.end();
     }
@@ -442,7 +445,8 @@ export class PostgresSnapshotter {
     }
   }
 
-  public async queueSnapshot(table: storage.SourceTable) {
+  public async queueSnapshot(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
+    await batch.markTableSnapshotRequired(table);
     this.queue.add(table);
   }
 
@@ -477,6 +481,7 @@ export class PostgresSnapshotter {
     tableLsnNotBefore = rs.rows[0][0];
     // Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
     await db.query('COMMIT');
+    this.logger.info(`Snapshot complete for table ${table.qualifiedName}, resume at ${tableLsnNotBefore}`);
     const [resultTable] = await batch.markTableSnapshotDone([table], tableLsnNotBefore);
     this.relationCache.update(resultTable);
     return resultTable;
@@ -614,10 +619,9 @@ export class PostgresSnapshotter {
   async handleRelation(options: {
     batch: storage.BucketStorageBatch;
     descriptor: SourceEntityDescriptor;
-    snapshot: boolean;
     referencedTypeIds: number[];
   }) {
-    const { batch, descriptor, snapshot, referencedTypeIds } = options;
+    const { batch, descriptor, referencedTypeIds } = options;
 
     if (!descriptor.objectId && typeof descriptor.objectId != 'number') {
       throw new ReplicationAssertionError(`objectId expected, got ${typeof descriptor.objectId}`);
@@ -637,31 +641,6 @@ export class PostgresSnapshotter {
     // Ensure we have a description for custom types referenced in the table.
     await this.connections.types.fetchTypes(referencedTypeIds);
 
-    // Snapshot if:
-    // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
-    // 2. Snapshot is not already done, AND:
-    // 3. The table is used in sync rules.
-    const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny;
-
-    if (shouldSnapshot) {
-      // Truncate this table, in case a previous snapshot was interrupted.
-      await batch.truncate([result.table]);
-
-      // Start the snapshot inside a transaction.
-      // We use a dedicated connection for this.
-      const db = await this.connections.snapshotConnection();
-      try {
-        const table = await this.snapshotTableInTx(batch, db, result.table);
-        // After the table snapshot, we wait for replication to catch up.
-        // To make sure there is actually something to replicate, we send a keepalive
-        // message.
-        await sendKeepAlive(db);
-        return table;
-      } finally {
-        await db.end();
-      }
-
-    }
     return result.table;
   }
 
```
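
The new `queueSnapshot(batch, table)` makes a pending snapshot durable before it becomes an in-memory queue entry: `markTableSnapshotRequired` is recorded through the batch first, so a table whose snapshot was queued but never completed can be rediscovered after a restart, and the `batch.commit(ZERO_LSN)` after each table snapshot ensures keepalive_op is set even though replication has not advanced. A minimal sketch of that ordering follows; the interfaces below are hypothetical stand-ins for the real storage types, and the ZERO_LSN value is a placeholder.

```ts
// Hypothetical stand-ins for storage.SourceTable / storage.BucketStorageBatch;
// only the members this sketch needs.
interface SourceTable {
  qualifiedName: string;
}

interface BucketStorageBatch {
  // Persist "this table still needs a snapshot" so a restart can resume it.
  markTableSnapshotRequired(table: SourceTable): Promise<void>;
  commit(lsn: string): Promise<boolean>;
}

// Placeholder value for the sketch; the real constant lives in the postgres module.
const ZERO_LSN = '00000000/00000000';

export class SnapshotQueue {
  private queue = new Set<SourceTable>();

  constructor(private snapshotTable: (batch: BucketStorageBatch, table: SourceTable) => Promise<void>) {}

  // Mirrors the new queueSnapshot(batch, table): record the requirement first,
  // then enqueue in memory.
  async queueSnapshot(batch: BucketStorageBatch, table: SourceTable): Promise<void> {
    await batch.markTableSnapshotRequired(table);
    this.queue.add(table);
  }

  // Drain the queue. After each table the batch is committed at ZERO_LSN so
  // keepalive_op is updated even though replication has not advanced yet.
  async drain(batch: BucketStorageBatch): Promise<void> {
    for (const table of this.queue) {
      await this.snapshotTable(batch, table);
      await batch.commit(ZERO_LSN);
      this.queue.delete(table);
    }
  }
}
```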

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 6 additions & 2 deletions

```diff
@@ -183,7 +183,10 @@ export class WalStream {
     this.relationCache.update(result.table);
 
     // Drop conflicting tables. This includes for example renamed tables.
-    await batch.drop(result.dropTables);
+    if (result.dropTables.length > 0) {
+      this.logger.info(`Dropping conflicting tables: ${result.dropTables.map((t) => t.qualifiedName).join(', ')}`);
+      await batch.drop(result.dropTables);
+    }
 
     // Ensure we have a description for custom types referenced in the table.
     await this.connections.types.fetchTypes(referencedTypeIds);
@@ -195,7 +198,8 @@ export class WalStream {
     const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny;
 
     if (shouldSnapshot) {
-      await this.snapshotter.queueSnapshot(result.table);
+      this.logger.info(`Queuing snapshot for new table ${result.table.qualifiedName}`);
+      await this.snapshotter.queueSnapshot(batch, result.table);
     }
 
     return result.table;
```
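
On the caller's side, `handleRelation` now only drops conflicting tables when there are any (with a log line), and defers a needed snapshot to the snapshotter's queue instead of running it inline. A condensed, self-contained sketch of that control flow follows; every type and the logger here are hypothetical stand-ins rather than the real WalStream signatures.

```ts
// Hypothetical minimal shapes for the sketch.
interface Table {
  qualifiedName: string;
  snapshotComplete: boolean;
  syncAny: boolean;
}

interface Batch {
  drop(tables: Table[]): Promise<void>;
}

interface Snapshotter {
  queueSnapshot(batch: Batch, table: Table): Promise<void>;
}

export async function handleRelationSketch(
  batch: Batch,
  snapshotter: Snapshotter,
  table: Table,
  dropTables: Table[],
  snapshot: boolean,
  log: (msg: string) => void
): Promise<Table> {
  // Only drop (and log) when there are actually conflicting tables.
  if (dropTables.length > 0) {
    log(`Dropping conflicting tables: ${dropTables.map((t) => t.qualifiedName).join(', ')}`);
    await batch.drop(dropTables);
  }

  // Instead of snapshotting inline, defer to the snapshot queue.
  const shouldSnapshot = snapshot && !table.snapshotComplete && table.syncAny;
  if (shouldSnapshot) {
    log(`Queuing snapshot for new table ${table.qualifiedName}`);
    await snapshotter.queueSnapshot(batch, table);
  }

  return table;
}
```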
