Skip to content

Commit ec82bb6

Browse files
committed
More abort error improvements.
1 parent 8cc27bd commit ec82bb6

File tree

4 files changed

+10
-5
lines changed

4 files changed

+10
-5
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ export class MongoSyncBucketStorage
561561
async clear(options?: storage.ClearStorageOptions): Promise<void> {
562562
while (true) {
563563
if (options?.signal?.aborted) {
564-
throw new ReplicationAbortedError('Aborted clearing data');
564+
throw new ReplicationAbortedError('Aborted clearing data', options.signal.reason);
565565
}
566566
try {
567567
await this.clearIteration();

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ export class ChangeStream {
497497
}
498498

499499
if (this.abort_signal.aborted) {
500-
throw new ReplicationAbortedError(`Aborted initial replication`);
500+
throw new ReplicationAbortedError(`Aborted initial replication`, this.abort_signal.reason);
501501
}
502502

503503
// Pre-fetch next batch, so that we can read and write concurrently

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,10 @@ export class BinLogStream {
324324

325325
for await (let row of stream) {
326326
if (this.stopped) {
327-
throw new ReplicationAbortedError('Abort signal received - initial replication interrupted.');
327+
throw new ReplicationAbortedError(
328+
'Abort signal received - initial replication interrupted.',
329+
this.abortSignal.reason
330+
);
328331
}
329332

330333
if (columns == null) {

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ export class WalStream {
139139

140140
// We wrap in our own abort controller so we can trigger abort internally.
141141
options.abort_signal.addEventListener('abort', () => {
142-
this.abortController.abort();
142+
this.abortController.abort(options.abort_signal.reason);
143143
});
144144
if (options.abort_signal.aborted) {
145-
this.abortController.abort();
145+
this.abortController.abort(options.abort_signal.reason);
146146
}
147147

148148
this.snapshotter = new PostgresSnapshotter({ ...options, abort_signal: this.abortSignal });
@@ -620,6 +620,8 @@ export class WalStream {
620620
}
621621
}
622622
);
623+
624+
throw new ReplicationAbortedError(`Replication stream aborted`, this.abortSignal.reason);
623625
}
624626

625627
async ack(lsn: string, replicationStream: pgwire.ReplicationStream) {

0 commit comments

Comments
 (0)