Skip to content

Commit 7eb7957

Browse files
authored
[Postgres] Remove usage of pg_logical_slot_peek_binary_changes (#390)
* Remove usage of pg_logical_slot_peek_binary_changes. * More comments. * Clean up error handling. * Add changeset.
1 parent d0a3cc3 commit 7eb7957

File tree

4 files changed

+116
-127
lines changed

4 files changed

+116
-127
lines changed

.changeset/serious-icons-drop.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
[Postgres] Remove usage of pg_logical_slot_peek_binary_changes due to performance issues in some cases

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 71 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
DatabaseConnectionError,
55
logger as defaultLogger,
66
ErrorCode,
7-
errors,
87
Logger,
98
ReplicationAbortedError,
109
ReplicationAssertionError
@@ -100,8 +99,10 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => {
10099
};
101100

102101
export class MissingReplicationSlotError extends Error {
103-
constructor(message: string) {
102+
constructor(message: string, cause?: any) {
104103
super(message);
104+
105+
this.cause = cause;
105106
}
106107
}
107108

@@ -304,135 +305,54 @@ export class WalStream {
304305
})
305306
)[0];
306307

308+
// Previously we also used pg_catalog.pg_logical_slot_peek_binary_changes to confirm that we can query the slot.
309+
// However, there were some edge cases where the query timed out and was repeated, ultimately
310+
// causing high load on the source database and never recovering automatically.
311+
// We now instead jump straight to replication if the wal_status is not "lost", detecting those
312+
// errors during streaming replication, which is a little more robust.
313+
314+
// We can have:
315+
// 1. needsInitialSync: true, lost slot -> MissingReplicationSlotError (starts new sync rules version).
316+
// Theoretically we could handle this the same as (2).
317+
// 2. needsInitialSync: true, no slot -> create new slot
318+
// 3. needsInitialSync: true, valid slot -> resume initial sync
319+
// 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version)
320+
// 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version)
321+
// 6. needsInitialSync: false, valid slot -> resume streaming replication
322+
// The main advantages of MissingReplicationSlotError are:
323+
// 1. If there was a complete snapshot already (cases 4/5), users can still sync from that snapshot while
324+
// we do the reprocessing under a new slot name.
325+
// 2. If there was a partial snapshot (case 1), we can start with the new slot faster by not waiting for
326+
// the partial data to be cleared.
307327
if (slot != null) {
308328
// This checks that the slot is still valid
309-
const r = await this.checkReplicationSlot(slot as any);
310-
if (snapshotDone && r.needsNewSlot) {
311-
// We keep the current snapshot, and create a new replication slot
312-
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
329+
330+
// wal_status is present in postgres 13+
331+
// invalidation_reason is present in postgres 17+
332+
const lost = slot.wal_status == 'lost';
333+
if (lost) {
334+
// Case 1 / 4
335+
throw new MissingReplicationSlotError(
336+
`Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
337+
);
313338
}
314-
// We can have:
315-
// needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
316-
// needsInitialSync: true, needsNewSlot: false -> resume initial sync
317-
// needsInitialSync: false, needsNewSlot: true -> handled above
318-
// needsInitialSync: false, needsNewSlot: false -> resume streaming replication
339+
// Case 3 / 6
319340
return {
320341
needsInitialSync: !snapshotDone,
321-
needsNewSlot: r.needsNewSlot
342+
needsNewSlot: false
322343
};
323344
} else {
324345
if (snapshotDone) {
346+
// Case 5
325347
// This will create a new slot, while keeping the current sync rules active
326348
throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
327349
}
328-
// This will clear data and re-create the same slot
350+
// Case 2
351+
// This will clear data (if any) and re-create the same slot
329352
return { needsInitialSync: true, needsNewSlot: true };
330353
}
331354
}
332355

333-
/**
334-
* If a replication slot exists, check that it is healthy.
335-
*/
336-
private async checkReplicationSlot(slot: {
337-
// postgres 13+
338-
wal_status?: string;
339-
// postgres 17+
340-
invalidation_reason?: string | null;
341-
}): Promise<{ needsNewSlot: boolean }> {
342-
// Start with a placeholder error, should be replaced if there is an actual issue.
343-
let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);
344-
345-
const slotName = this.slot_name;
346-
347-
const lost = slot.wal_status == 'lost';
348-
if (lost) {
349-
this.logger.warn(
350-
`Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
351-
);
352-
return {
353-
needsNewSlot: true
354-
};
355-
}
356-
357-
// Check that replication slot exists, trying for up to 2 minutes.
358-
const startAt = performance.now();
359-
while (performance.now() - startAt < 120_000) {
360-
this.touch();
361-
362-
try {
363-
// We peek a large number of changes here, to make it more likely to pick up replication slot errors.
364-
// For example, "publication does not exist" only occurs here if the peek actually includes changes related
365-
// to the slot.
366-
this.logger.info(`Checking ${slotName}`);
367-
368-
// The actual results can be quite large, so we don't actually return everything
369-
// due to memory and processing overhead that would create.
370-
const cursor = await this.connections.pool.stream({
371-
statement: `SELECT 1 FROM pg_catalog.pg_logical_slot_peek_binary_changes($1, NULL, 1000, 'proto_version', '1', 'publication_names', $2)`,
372-
params: [
373-
{ type: 'varchar', value: slotName },
374-
{ type: 'varchar', value: PUBLICATION_NAME }
375-
]
376-
});
377-
378-
for await (let _chunk of cursor) {
379-
// No-op, just exhaust the cursor
380-
}
381-
382-
// Success
383-
this.logger.info(`Slot ${slotName} appears healthy`);
384-
return { needsNewSlot: false };
385-
} catch (e) {
386-
last_error = e;
387-
this.logger.warn(`Replication slot error`, e);
388-
389-
if (this.stopped) {
390-
throw e;
391-
}
392-
393-
if (
394-
/incorrect prev-link/.test(e.message) ||
395-
/replication slot.*does not exist/.test(e.message) ||
396-
/publication.*does not exist/.test(e.message) ||
397-
// Postgres 18 - exceeded max_slot_wal_keep_size
398-
/can no longer access replication slot/.test(e.message) ||
399-
// Postgres 17 - exceeded max_slot_wal_keep_size
400-
/can no longer get changes from replication slot/.test(e.message)
401-
) {
402-
// Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this
403-
// works as a fallback.
404-
405-
container.reporter.captureException(e, {
406-
level: errors.ErrorSeverity.WARNING,
407-
metadata: {
408-
replication_slot: slotName
409-
}
410-
});
411-
// Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
412-
// Seen during development. Some internal error, fixed by re-creating slot.
413-
//
414-
// Sample: publication "powersync" does not exist
415-
// Happens when publication deleted or never created.
416-
// Slot must be re-created in this case.
417-
this.logger.info(`${slotName} is not valid anymore`);
418-
419-
return { needsNewSlot: true };
420-
}
421-
// Try again after a pause
422-
await new Promise((resolve) => setTimeout(resolve, 1000));
423-
}
424-
}
425-
426-
container.reporter.captureException(last_error, {
427-
level: errors.ErrorSeverity.ERROR,
428-
metadata: {
429-
replication_slot: slotName
430-
}
431-
});
432-
433-
throw last_error;
434-
}
435-
436356
async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {
437357
const results = await db.query({
438358
statement: `SELECT reltuples::bigint AS estimate
@@ -915,6 +835,17 @@ WHERE oid = $1::regclass`,
915835
}
916836

917837
async streamChanges(replicationConnection: pgwire.PgConnection) {
838+
try {
839+
await this.streamChangesInternal(replicationConnection);
840+
} catch (e) {
841+
if (isReplicationSlotInvalidError(e)) {
842+
throw new MissingReplicationSlotError(e.message, e);
843+
}
844+
throw e;
845+
}
846+
}
847+
848+
private async streamChangesInternal(replicationConnection: pgwire.PgConnection) {
918849
// When changing any logic here, check /docs/wal-lsns.md.
919850
const { createEmptyCheckpoints } = await this.ensureStorageCompatibility();
920851

@@ -1179,3 +1110,27 @@ WHERE oid = $1::regclass`,
11791110
});
11801111
}
11811112
}
1113+
1114+
function isReplicationSlotInvalidError(e: any) {
1115+
// We could access the error code from pgwire using this:
1116+
// e[Symbol.for('pg.ErrorCode')]
1117+
// However, we typically get a generic code such as 42704 (undefined_object), which does not
1118+
// help much. So we check the actual error message.
1119+
const message = e.message ?? '';
1120+
1121+
// Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
1122+
// Seen during development. Some internal error, fixed by re-creating slot.
1123+
//
1124+
// Sample: publication "powersync" does not exist
1125+
// Happens when publication deleted or never created.
1126+
// Slot must be re-created in this case.
1127+
return (
1128+
/incorrect prev-link/.test(message) ||
1129+
/replication slot.*does not exist/.test(message) ||
1130+
/publication.*does not exist/.test(message) ||
1131+
// Postgres 18 - exceeded max_slot_wal_keep_size
1132+
/can no longer access replication slot/.test(message) ||
1133+
// Postgres 17 - exceeded max_slot_wal_keep_size
1134+
/can no longer get changes from replication slot/.test(message)
1135+
);
1136+
}

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ bucket_definitions:
295295
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
296296
);
297297
await context.replicateSnapshot();
298-
await context.startStreaming();
298+
context.startStreaming();
299299

300300
const data = await context.getBucketData('global[]');
301301

@@ -320,17 +320,25 @@ bucket_definitions:
320320

321321
await context.loadActiveSyncRules();
322322

323+
// Previously, the `replicateSnapshot` call picked up on this error.
324+
// Now, we have removed that check, this only comes up when we start actually streaming.
325+
// We don't get the streaming response directly here, but getCheckpoint() checks for that.
326+
await context.replicateSnapshot();
327+
context.startStreaming();
328+
323329
if (serverVersion!.compareMain('18.0.0') >= 0) {
324-
await context.replicateSnapshot();
325330
// No error expected in Postgres 18. Replication keeps on working despite the
326331
// publication being re-created.
332+
await context.getCheckpoint();
327333
} else {
334+
// await context.getCheckpoint();
328335
// Postgres < 18 invalidates the replication slot when the publication is re-created.
329-
// The error is handled on a higher level, which triggers
336+
// In the service, this error is handled in WalStreamReplicationJob,
330337
// creating a new replication slot.
331338
await expect(async () => {
332-
await context.replicateSnapshot();
339+
await context.getCheckpoint();
333340
}).rejects.toThrowError(MissingReplicationSlotError);
341+
context.clearStreamError();
334342
}
335343
}
336344
});
@@ -352,7 +360,7 @@ bucket_definitions:
352360
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
353361
);
354362
await context.replicateSnapshot();
355-
await context.startStreaming();
363+
context.startStreaming();
356364

357365
const data = await context.getBucketData('global[]');
358366

@@ -415,7 +423,7 @@ bucket_definitions:
415423
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
416424
);
417425
await context.replicateSnapshot();
418-
await context.startStreaming();
426+
context.startStreaming();
419427

420428
const data = await context.getBucketData('global[]');
421429

@@ -572,7 +580,7 @@ config:
572580
);
573581

574582
await context.replicateSnapshot();
575-
await context.startStreaming();
583+
context.startStreaming();
576584

577585
await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);
578586

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,31 @@ export class WalStreamTestContext implements AsyncDisposable {
5555
await this.dispose();
5656
}
5757

58+
/**
59+
* Clear any errors from startStream, to allow for a graceful dispose when streaming errors
60+
* were expected.
61+
*/
62+
async clearStreamError() {
63+
if (this.streamPromise != null) {
64+
this.streamPromise = this.streamPromise.catch((e) => {});
65+
}
66+
}
67+
5868
async dispose() {
5969
this.abortController.abort();
60-
await this.snapshotPromise;
61-
await this.streamPromise;
62-
await this.connectionManager.destroy();
63-
await this.factory?.[Symbol.asyncDispose]();
70+
try {
71+
await this.snapshotPromise;
72+
await this.streamPromise;
73+
await this.connectionManager.destroy();
74+
await this.factory?.[Symbol.asyncDispose]();
75+
} catch (e) {
76+
// Throwing here may result in SuppressedError. The underlying errors often don't show up
77+
// in the test output, so we log it here.
78+
// If we could get vitest to log SuppressedError.error and SuppressedError.suppressed, we
79+
// could remove this.
80+
console.error('Error during WalStreamTestContext dispose', e);
81+
throw e;
82+
}
6483
}
6584

6685
get pool() {

0 commit comments

Comments
 (0)