Skip to content

Commit b68ffa6

Browse files
committed
Improve handling of abort errors.
1 parent d3bfdd6 commit b68ffa6

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

modules/module-postgres/src/replication/PostgresSnapshotter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ export class PostgresSnapshotter {
363363
await this.markSnapshotDone();
364364
}
365365
}
366-
throw new ReplicationAbortedError();
366+
throw new ReplicationAbortedError(`Replication loop aborted`, this.abortSignal.reason);
367367
} catch (e) {
368368
// If initial snapshot already completed, this has no effect
369369
this.initialSnapshotDone.reject(e);
@@ -622,7 +622,7 @@ export class PostgresSnapshotter {
622622

623623
if (this.abortSignal.aborted) {
624624
// We only abort after flushing
625-
throw new ReplicationAbortedError(`Table snapshot interrupted`);
625+
throw new ReplicationAbortedError(`Table snapshot interrupted`, this.abortSignal.reason);
626626
}
627627
}
628628
}

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,23 @@ export class WalStream {
347347
this.initPromise = this.initReplication();
348348
await this.initPromise;
349349
// These Promises are both expected to run until aborted or error.
350-
streamPromise = this.streamChanges().finally(() => {
351-
this.abortController.abort();
352-
});
353-
loopPromise = this.snapshotter.replicationLoop().finally(() => {
354-
this.abortController.abort();
355-
});
350+
streamPromise = this.streamChanges()
351+
.then(() => {
352+
throw new ReplicationAssertionError(`Replication stream exited unexpectedly`);
353+
})
354+
.catch((e) => {
355+
this.abortController.abort(e);
356+
throw e;
357+
});
358+
loopPromise = this.snapshotter
359+
.replicationLoop()
360+
.then(() => {
361+
throw new ReplicationAssertionError(`Replication snapshotter exited unexpectedly`);
362+
})
363+
.catch((e) => {
364+
this.abortController.abort(e);
365+
throw e;
366+
});
356367
const results = await Promise.allSettled([loopPromise, streamPromise]);
357368
// First, prioritize non-aborted errors
358369
for (let result of results) {
@@ -366,16 +377,21 @@ export class WalStream {
366377
throw result.reason;
367378
}
368379
}
380+
369381
// If we get here, both Promises completed successfully, which is unexpected.
370382
throw new ReplicationAssertionError(`Replication loop exited unexpectedly`);
371383
} catch (e) {
372384
await this.storage.reportError(e);
373385
throw e;
374386
} finally {
387+
// Just to make sure
375388
this.abortController.abort();
376389
}
377390
}
378391

392+
/**
393+
* For tests: Wait until the initial snapshot is complete.
394+
*/
379395
public async waitForInitialSnapshot() {
380396
if (this.initPromise == null) {
381397
throw new ReplicationAssertionError('replicate() must be called before waitForInitialSnapshot()');

packages/service-errors/src/errors.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,13 @@ export class ServiceAssertionError extends ServiceError {
151151
export class ReplicationAbortedError extends ServiceError {
152152
static readonly CODE = ErrorCode.PSYNC_S1103;
153153

154-
constructor(description?: string) {
154+
constructor(description?: string, cause?: any) {
155155
super({
156156
code: ReplicationAbortedError.CODE,
157157
description: description ?? 'Replication aborted'
158158
});
159+
160+
this.cause = cause;
159161
}
160162
}
161163

0 commit comments

Comments
 (0)