Skip to content

Commit f09dea3

Browse files
committed
Refactor streaming promise management.
1 parent 85aaccc commit f09dea3

File tree

10 files changed

+102
-118
lines changed

10 files changed

+102
-118
lines changed

modules/module-postgres/src/replication/PostgresSnapshotter.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -344,23 +344,28 @@ export class PostgresSnapshotter {
344344
}
345345

346346
async replicationLoop() {
347-
if (this.queue.size == 0) {
348-
// Special case where we start with no tables to snapshot
349-
await this.markSnapshotDone();
350-
}
351-
while (!this.abort_signal.aborted) {
352-
const table = this.queue.values().next().value;
353-
if (table == null) {
354-
this.initialSnapshotDone.resolve();
355-
await timers.setTimeout(500, { signal: this.abort_signal });
356-
continue;
357-
}
358-
359-
await this.replicateTable(table);
360-
this.queue.delete(table);
347+
try {
361348
if (this.queue.size == 0) {
349+
// Special case where we start with no tables to snapshot
362350
await this.markSnapshotDone();
363351
}
352+
while (!this.abort_signal.aborted) {
353+
const table = this.queue.values().next().value;
354+
if (table == null) {
355+
this.initialSnapshotDone.resolve();
356+
await timers.setTimeout(500, { signal: this.abort_signal });
357+
continue;
358+
}
359+
360+
await this.replicateTable(table);
361+
this.queue.delete(table);
362+
if (this.queue.size == 0) {
363+
await this.markSnapshotDone();
364+
}
365+
}
366+
} catch (e) {
367+
this.initialSnapshotDone.reject(e);
368+
throw e;
364369
}
365370
}
366371

@@ -410,7 +415,7 @@ export class PostgresSnapshotter {
410415
* If (partial) replication was done before on this slot, this clears the state
411416
* and starts again from scratch.
412417
*/
413-
async startReplication(db: pgwire.PgConnection) {
418+
async queueSnapshotTables(db: pgwire.PgConnection) {
414419
const sourceTables = this.sync_rules.getSourceTables();
415420

416421
await this.storage.startBatch(

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ export class WalStream {
101101

102102
private connections: PgManager;
103103

104-
private abort_signal: AbortSignal;
104+
private abortController = new AbortController();
105+
private abortSignal: AbortSignal = this.abortController.signal;
105106

106-
private streamPromise: Promise<void> | null = null;
107+
private initPromise: Promise<void> | null = null;
107108
private snapshotter: PostgresSnapshotter;
108109

109110
private relationCache = new RelationCache((relation: number | SourceTable) => {
@@ -135,9 +136,16 @@ export class WalStream {
135136
this.slot_name = options.storage.slot_name;
136137
this.connections = options.connections;
137138

138-
this.abort_signal = options.abort_signal;
139-
this.snapshotter = new PostgresSnapshotter(options);
140-
this.abort_signal.addEventListener(
139+
// We wrap in our own abort controller so we can trigger abort internally.
140+
options.abort_signal.addEventListener('abort', () => {
141+
this.abortController.abort();
142+
});
143+
if (options.abort_signal.aborted) {
144+
this.abortController.abort();
145+
}
146+
147+
this.snapshotter = new PostgresSnapshotter({ ...options, abort_signal: this.abortSignal });
148+
this.abortSignal.addEventListener(
141149
'abort',
142150
() => {
143151
if (this.startedStreaming) {
@@ -159,7 +167,7 @@ export class WalStream {
159167
}
160168

161169
get stopped() {
162-
return this.abort_signal.aborted;
170+
return this.abortSignal.aborted;
163171
}
164172

165173
async handleRelation(options: {
@@ -328,52 +336,62 @@ export class WalStream {
328336
return null;
329337
}
330338

339+
/**
340+
* Start replication loop, and continue until aborted or error.
341+
*/
331342
async replicate() {
332343
try {
333-
await this.initReplication();
334-
335-
await this.streamChanges();
344+
this.initPromise = this.initReplication();
345+
await this.initPromise;
346+
const streamPromise = this.streamChanges();
347+
const loopPromise = this.snapshotter.replicationLoop();
348+
await Promise.race([loopPromise, streamPromise]);
336349
} catch (e) {
337350
await this.storage.reportError(e);
338351
throw e;
352+
} finally {
353+
this.abortController.abort();
354+
}
355+
}
356+
357+
public async waitForInitialSnapshot() {
358+
if (this.initPromise == null) {
359+
throw new ReplicationAssertionError('replicate() must be called before waitForInitialSnapshot()');
339360
}
361+
await this.initPromise;
362+
363+
await this.snapshotter.waitForInitialSnapshot();
340364
}
341365

342-
async initReplication() {
366+
/**
367+
* Initialize replication.
368+
* Start replication loop, and continue until aborted, error or initial snapshot completed.
369+
*/
370+
private async initReplication() {
343371
const result = await this.snapshotter.checkSlot();
344372
const db = await this.connections.snapshotConnection();
345373
try {
346374
await this.snapshotter.setupSlot(db, result);
347-
// Trigger here, but we await elsewhere
348-
// TODO: fail on the first error
349-
this.streamChanges().catch((_) => {});
350375
if (result.needsInitialSync) {
351-
await this.snapshotter.startReplication(db);
376+
await this.snapshotter.queueSnapshotTables(db);
352377
}
353-
354-
// FIXME: handle the background promise
355-
this.snapshotter.replicationLoop();
356-
await this.snapshotter.waitForInitialSnapshot();
357378
} finally {
358379
await db.end();
359380
}
360381
}
361382

362-
async streamChanges() {
363-
this.streamPromise ??= (async () => {
364-
const streamReplicationConnection = await this.connections.replicationConnection();
365-
try {
366-
await this.streamChangesInternal(streamReplicationConnection);
367-
} catch (e) {
368-
if (isReplicationSlotInvalidError(e)) {
369-
throw new MissingReplicationSlotError(e.message, e);
370-
}
371-
throw e;
372-
} finally {
373-
await streamReplicationConnection.end();
383+
private async streamChanges() {
384+
const streamReplicationConnection = await this.connections.replicationConnection();
385+
try {
386+
await this.streamChangesInternal(streamReplicationConnection);
387+
} catch (e) {
388+
if (isReplicationSlotInvalidError(e)) {
389+
throw new MissingReplicationSlotError(e.message, e);
374390
}
375-
})();
376-
await this.streamPromise;
391+
throw e;
392+
} finally {
393+
await streamReplicationConnection.end();
394+
}
377395
}
378396

379397
private async streamChangesInternal(replicationConnection: pgwire.PgConnection) {
@@ -451,7 +469,7 @@ export class WalStream {
451469
for await (const chunk of replicationStream.pgoutputDecode()) {
452470
this.touch();
453471

454-
if (this.abort_signal.aborted) {
472+
if (this.abortSignal.aborted) {
455473
break;
456474
}
457475

modules/module-postgres/test/src/checkpoints.test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@ const checkpointTests = (factory: TestStorageFactory) => {
3535

3636
await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`);
3737

38-
await context.replicateSnapshot();
39-
40-
context.startStreaming();
4138
// Wait for a consistent checkpoint before we start.
42-
await context.getCheckpoint();
39+
await context.initializeReplication();
40+
4341
const storage = context.storage!;
4442

4543
const controller = new AbortController();

modules/module-postgres/test/src/chunked_snapshots.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ function defineBatchTests(factory: TestStorageFactory) {
142142
await p;
143143

144144
// 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
145-
context.startStreaming();
145+
// Note: logical replication now runs concurrently with the snapshot.
146+
// TODO: re-check the test logic here.
146147

147148
// 6. If all went well, the "resnapshot" process would take care of this.
148149
const data = await context.getBucketData('global[]', undefined, {});

modules/module-postgres/test/src/large_batch.test.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) {
3939

4040
const start = Date.now();
4141

42-
context.startStreaming();
43-
4442
const checkpoint = await context.getCheckpoint({ timeout: 100_000 });
4543
const duration = Date.now() - start;
4644
const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
@@ -87,7 +85,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) {
8785
const start = Date.now();
8886

8987
await context.replicateSnapshot();
90-
context.startStreaming();
9188

9289
const checkpoint = await context.getCheckpoint({ timeout: 100_000 });
9390
const duration = Date.now() - start;
@@ -139,8 +136,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) {
139136

140137
const start = Date.now();
141138

142-
context.startStreaming();
143-
144139
const checkpoint = await context.getCheckpoint({ timeout: 50_000 });
145140
const duration = Date.now() - start;
146141
const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
@@ -226,8 +221,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) {
226221
});
227222
await context.replicateSnapshot();
228223

229-
context.startStreaming();
230-
231224
const checkpoint = await context.getCheckpoint({ timeout: 50_000 });
232225
const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']);
233226
expect(checksum.get('global[]')!.count).toEqual((numDocs + 2) * 4);

modules/module-postgres/test/src/resuming_snapshots.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n
104104
await context2.loadNextSyncRules();
105105
await context2.replicateSnapshot();
106106

107-
context2.startStreaming();
108107
const data = await context2.getBucketData('global[]', undefined, {});
109108

110109
const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === String(id1));

modules/module-postgres/test/src/schema_changes.test.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ function defineTests(factory: storage.TestStorageFactory) {
3737
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
3838

3939
await context.replicateSnapshot();
40-
context.startStreaming();
4140

4241
await pool.query(`INSERT INTO test_data(id, description) VALUES('t2', 'test2')`);
4342

@@ -68,7 +67,6 @@ function defineTests(factory: storage.TestStorageFactory) {
6867
const { pool } = context;
6968

7069
await context.replicateSnapshot();
71-
context.startStreaming();
7270

7371
await pool.query(`CREATE TABLE test_data(id text primary key, description text)`);
7472
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
@@ -103,7 +101,6 @@ function defineTests(factory: storage.TestStorageFactory) {
103101
await pool.query(`INSERT INTO test_data_old(id, description) VALUES('t1', 'test1')`);
104102

105103
await context.replicateSnapshot();
106-
context.startStreaming();
107104

108105
await pool.query(
109106
{ statement: `ALTER TABLE test_data_old RENAME TO test_data` },
@@ -143,7 +140,6 @@ function defineTests(factory: storage.TestStorageFactory) {
143140
await pool.query(`INSERT INTO test_data1(id, description) VALUES('t1', 'test1')`);
144141

145142
await context.replicateSnapshot();
146-
context.startStreaming();
147143

148144
await pool.query(
149145
{ statement: `ALTER TABLE test_data1 RENAME TO test_data2` },
@@ -186,7 +182,6 @@ function defineTests(factory: storage.TestStorageFactory) {
186182
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
187183

188184
await context.replicateSnapshot();
189-
context.startStreaming();
190185

191186
await pool.query(
192187
{ statement: `ALTER TABLE test_data RENAME TO test_data_na` },
@@ -216,7 +211,6 @@ function defineTests(factory: storage.TestStorageFactory) {
216211
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
217212

218213
await context.replicateSnapshot();
219-
context.startStreaming();
220214

221215
await pool.query(
222216
{ statement: `ALTER TABLE test_data REPLICA IDENTITY FULL` },
@@ -259,7 +253,6 @@ function defineTests(factory: storage.TestStorageFactory) {
259253
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
260254

261255
await context.replicateSnapshot();
262-
context.startStreaming();
263256

264257
await pool.query(
265258
{ statement: `ALTER TABLE test_data ADD COLUMN other TEXT` },
@@ -301,7 +294,6 @@ function defineTests(factory: storage.TestStorageFactory) {
301294
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
302295

303296
await context.replicateSnapshot();
304-
context.startStreaming();
305297

306298
await pool.query(
307299
{ statement: `ALTER TABLE test_data ALTER COLUMN id TYPE varchar` },
@@ -345,7 +337,6 @@ function defineTests(factory: storage.TestStorageFactory) {
345337
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
346338

347339
await context.replicateSnapshot();
348-
context.startStreaming();
349340

350341
await pool.query(`INSERT INTO test_data(id, description) VALUES('t2', 'test2')`);
351342

@@ -396,7 +387,6 @@ function defineTests(factory: storage.TestStorageFactory) {
396387
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
397388

398389
await context.replicateSnapshot();
399-
context.startStreaming();
400390

401391
await pool.query(`INSERT INTO test_data(id, description) VALUES('t2', 'test2')`);
402392

@@ -440,7 +430,6 @@ function defineTests(factory: storage.TestStorageFactory) {
440430
await pool.query(`INSERT INTO test_other(id, description) VALUES('t1', 'test1')`);
441431

442432
await context.replicateSnapshot();
443-
context.startStreaming();
444433

445434
await pool.query(`INSERT INTO test_other(id, description) VALUES('t2', 'test2')`);
446435

@@ -468,7 +457,6 @@ function defineTests(factory: storage.TestStorageFactory) {
468457
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
469458

470459
await context.replicateSnapshot();
471-
context.startStreaming();
472460

473461
await pool.query(`INSERT INTO test_data(id, description) VALUES('t2', 'test2')`);
474462

@@ -506,7 +494,6 @@ function defineTests(factory: storage.TestStorageFactory) {
506494
await pool.query(`INSERT INTO test_data(id, description) VALUES('t1', 'test1')`);
507495

508496
await context.replicateSnapshot();
509-
context.startStreaming();
510497

511498
await pool.query(`INSERT INTO test_data(id, description) VALUES('t2', 'test2')`);
512499

@@ -555,7 +542,6 @@ function defineTests(factory: storage.TestStorageFactory) {
555542
await pool.query(`INSERT INTO test_data_old(id, num) VALUES('t2', 0)`);
556543

557544
await context.replicateSnapshot();
558-
context.startStreaming();
559545

560546
await pool.query(
561547
{ statement: `ALTER TABLE test_data_old RENAME TO test_data` },
@@ -627,7 +613,6 @@ config:
627613
await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`);
628614

629615
await context.replicateSnapshot();
630-
context.startStreaming();
631616

632617
await pool.query(
633618
{ statement: `CREATE TYPE composite AS (foo bool, bar int4);` },

0 commit comments

Comments (0)