Skip to content

Commit ff32dab

Browse files
committed
Re-apply more test changes.
1 parent 874ccb6 commit ff32dab

File tree

3 files changed

+57
-50
lines changed

3 files changed

+57
-50
lines changed

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -445,19 +445,8 @@ WHERE oid = $1::regclass`,
445445
// in the case of snapshot retries.
446446
// We could alternatively commit at the replication slot LSN.
447447

448-
// Get the current LSN.
449-
// The data will only be consistent once incremental replication has passed that point.
450-
// We have to get this LSN _after_ we have finished the table snapshots.
451-
//
452-
// There are basically two relevant LSNs here:
453-
// A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side,
454-
// but it is implicitly recorded in the replication slot.
455-
// B: The LSN after the table snapshot is complete, which is what we get here.
456-
// When we do the snapshot queries, the data that we get back for each chunk could match the state
457-
// anywhere between A and B. To actually have a consistent state on our side, we need to:
458-
// 1. Complete the snapshot.
459-
// 2. Wait until logical replication has caught up with all the change between A and B.
460-
// Calling `markSnapshotDone(LSN B)` covers that.
448+
// Get the current LSN for hte snapshot.
449+
// We could also use the LSN from the last table snapshto.
461450
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
462451
const noCommitBefore = rs.rows[0][0];
463452

@@ -504,9 +493,25 @@ WHERE oid = $1::regclass`,
504493
try {
505494
await this.snapshotTable(batch, db, table, limited);
506495

496+
// Get the current LSN.
497+
// The data will only be consistent once incremental replication has passed that point.
498+
// We have to get this LSN _after_ we have finished the table snapshot.
499+
//
500+
// There are basically two relevant LSNs here:
501+
// A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side,
502+
// but it is implicitly recorded in the replication slot.
503+
// B: The LSN after the table snapshot is complete, which is what we get here.
504+
// When we do the snapshot queries, the data that we get back for each chunk could match the state
505+
// anywhere between A and B. To actually have a consistent state on our side, we need to:
506+
// 1. Complete the snapshot.
507+
// 2. Wait until logical replication has caught up with all the change between A and B.
508+
// Calling `markSnapshotDone(LSN B)` covers that.
509+
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
510+
const tableLsnNotBefore = rs.rows[0][0];
511+
507512
// Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
508513
await db.query('COMMIT');
509-
const [resultTable] = await batch.markTableSnapshotDone([table]);
514+
const [resultTable] = await batch.markTableSnapshotDone([table], tableLsnNotBefore);
510515
this.relationCache.update(resultTable);
511516
return resultTable;
512517
} catch (e) {
@@ -821,10 +826,13 @@ WHERE oid = $1::regclass`,
821826
try {
822827
// If anything errors here, the entire replication process is halted, and
823828
// all connections automatically closed, including this one.
824-
const initReplicationConnection = await this.connections.replicationConnection();
825-
this.initialSnapshotPromise = this.initReplication(initReplicationConnection);
829+
this.initialSnapshotPromise = (async () => {
830+
const initReplicationConnection = await this.connections.replicationConnection();
831+
await this.initReplication(initReplicationConnection);
832+
await initReplicationConnection.end();
833+
})();
834+
826835
await this.initialSnapshotPromise;
827-
await initReplicationConnection.end();
828836

829837
// At this point, the above connection has often timed out, so we start a new one
830838
const streamReplicationConnection = await this.connections.replicationConnection();

modules/module-postgres/test/src/schema_changes.test.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,17 @@ function defineTests(config: storage.TestStorageConfig) {
5656
// Truncate - order doesn't matter
5757
expect(data.slice(2, 4).sort(compareIds)).toMatchObject([REMOVE_T1, REMOVE_T2]);
5858

59-
expect(data.slice(4)).toMatchObject([
59+
expect(data.slice(4, 5)).toMatchObject([
6060
// Snapshot and/or replication insert
6161
PUT_T3
6262
]);
63+
64+
if (data.length > 5) {
65+
expect(data.slice(5)).toMatchObject([
66+
// Replicated insert (optional duplication)
67+
PUT_T3
68+
]);
69+
}
6370
});
6471

6572
test('add table', async () => {

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@ import {
66
initializeCoreReplicationMetrics,
77
InternalOpId,
88
OplogEntry,
9+
settledPromise,
910
storage,
10-
SyncRulesBucketStorage
11+
SyncRulesBucketStorage,
12+
unsettledPromise
1113
} from '@powersync/service-core';
1214
import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
1315
import * as pgwire from '@powersync/service-jpgwire';
1416
import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js';
1517
import { CustomTypeRegistry } from '@module/types/registry.js';
18+
import { ReplicationAbortedError } from '@powersync/lib-services-framework';
1619

1720
export class WalStreamTestContext implements AsyncDisposable {
1821
private _walStream?: WalStream;
1922
private abortController = new AbortController();
20-
private streamPromise?: Promise<void>;
2123
public storage?: SyncRulesBucketStorage;
22-
private replicationConnection?: pgwire.PgConnection;
23-
private snapshotPromise?: Promise<void>;
24+
private settledReplicationPromise?: Promise<PromiseSettledResult<void>>;
2425

2526
/**
2627
* Tests operating on the wal stream need to configure the stream and manage asynchronous
@@ -55,21 +56,10 @@ export class WalStreamTestContext implements AsyncDisposable {
5556
await this.dispose();
5657
}
5758

58-
/**
59-
* Clear any errors from startStream, to allow for a graceful dispose when streaming errors
60-
* were expected.
61-
*/
62-
async clearStreamError() {
63-
if (this.streamPromise != null) {
64-
this.streamPromise = this.streamPromise.catch((e) => {});
65-
}
66-
}
67-
6859
async dispose() {
6960
this.abortController.abort();
7061
try {
71-
await this.snapshotPromise;
72-
await this.streamPromise;
62+
await this.settledReplicationPromise;
7363
await this.connectionManager.destroy();
7464
await this.factory?.[Symbol.asyncDispose]();
7565
} catch (e) {
@@ -143,36 +133,38 @@ export class WalStreamTestContext implements AsyncDisposable {
143133
*/
144134
async initializeReplication() {
145135
await this.replicateSnapshot();
146-
this.startStreaming();
147136
// Make sure we're up to date
148137
await this.getCheckpoint();
149138
}
150139

140+
/**
141+
* Replicate the initial snapshot, and start streaming.
142+
*/
151143
async replicateSnapshot() {
152-
const promise = (async () => {
153-
this.replicationConnection = await this.connectionManager.replicationConnection();
154-
await this.walStream.initReplication(this.replicationConnection);
155-
})();
156-
this.snapshotPromise = promise.catch((e) => e);
157-
await promise;
158-
}
159-
160-
startStreaming() {
161-
if (this.replicationConnection == null) {
162-
throw new Error('Call replicateSnapshot() before startStreaming()');
144+
// Use a settledPromise to avoid unhandled rejections
145+
this.settledReplicationPromise = settledPromise(this.walStream.replicate());
146+
try {
147+
await Promise.race([unsettledPromise(this.settledReplicationPromise), this.walStream.waitForInitialSnapshot()]);
148+
} catch (e) {
149+
if (e instanceof ReplicationAbortedError && e.cause != null) {
150+
// Edge case for tests: replicate() can throw an error, but we'd receive the ReplicationAbortedError from
151+
// waitForInitialSnapshot() first. In that case, prioritize the cause, e.g. MissingReplicationSlotError.
152+
// This is not a concern for production use, since we only use waitForInitialSnapshot() in tests.
153+
throw e.cause;
154+
}
155+
throw e;
163156
}
164-
this.streamPromise = this.walStream.streamChanges(this.replicationConnection!);
165157
}
166158

167159
async getCheckpoint(options?: { timeout?: number }) {
168160
let checkpoint = await Promise.race([
169161
getClientCheckpoint(this.pool, this.factory, { timeout: options?.timeout ?? 15_000 }),
170-
this.streamPromise
162+
unsettledPromise(this.settledReplicationPromise!)
171163
]);
172164
if (checkpoint == null) {
173-
// This indicates an issue with the test setup - streamingPromise completed instead
165+
// This indicates an issue with the test setup - replicationPromise completed instead
174166
// of getClientCheckpoint()
175-
throw new Error('Test failure - streamingPromise completed');
167+
throw new Error('Test failure - replicationPromise completed');
176168
}
177169
return checkpoint;
178170
}

0 commit comments

Comments
 (0)