
Commit 874ccb6

Re-apply some postgres changes.
1 parent 8447d1d commit 874ccb6

File tree

3 files changed: +47 -34 lines changed


modules/module-postgres/src/replication/WalStream.ts

Lines changed: 35 additions & 18 deletions
@@ -144,6 +144,8 @@ export class WalStream {
    */
   private isStartingReplication = true;
 
+  private initialSnapshotPromise: Promise<void> | null = null;
+
   constructor(options: WalStreamOptions) {
     this.logger = options.logger ?? defaultLogger;
     this.storage = options.storage;
@@ -442,6 +444,24 @@ WHERE oid = $1::regclass`,
         // This makes sure we don't skip any changes applied before starting this snapshot,
         // in the case of snapshot retries.
         // We could alternatively commit at the replication slot LSN.
+
+        // Get the current LSN.
+        // The data will only be consistent once incremental replication has passed that point.
+        // We have to get this LSN _after_ we have finished the table snapshots.
+        //
+        // There are basically two relevant LSNs here:
+        // A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side,
+        // but it is implicitly recorded in the replication slot.
+        // B: The LSN after the table snapshot is complete, which is what we get here.
+        // When we do the snapshot queries, the data that we get back for each chunk could match the state
+        // anywhere between A and B. To actually have a consistent state on our side, we need to:
+        // 1. Complete the snapshot.
+        // 2. Wait until logical replication has caught up with all the change between A and B.
+        // Calling `markSnapshotDone(LSN B)` covers that.
+        const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
+        const noCommitBefore = rs.rows[0][0];
+
+        await batch.markAllSnapshotDone(noCommitBefore);
         await batch.commit(ZERO_LSN);
       }
     );
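
Note on the added block: the comment distinguishes LSN A (implicitly recorded in the replication slot) from LSN B (read with pg_current_wal_lsn() once all table snapshots are done). The sketch below illustrates the "no commit before LSN B" idea only; the types and the SnapshotGate class are hypothetical simplifications, not the actual BucketStorageBatch API.

// Sketch only: hypothetical simplified types, not the real @powersync/service-core interfaces.
type Lsn = string; // Postgres LSN in 'X/Y' hex form, e.g. '0/16B3748'

function lsnToBigInt(lsn: Lsn): bigint {
  // High 32 bits before the slash, low 32 bits after it.
  const [hi, lo] = lsn.split('/');
  return (BigInt(`0x${hi}`) << 32n) | BigInt(`0x${lo}`);
}

class SnapshotGate {
  // LSN B, captured after all table snapshots have finished.
  private noCommitBefore: Lsn | null = null;

  markAllSnapshotDone(noCommitBefore: Lsn) {
    this.noCommitBefore = noCommitBefore;
  }

  // Incremental replication only commits a consistent checkpoint once the replayed
  // WAL position has reached LSN B, i.e. all changes between A and B are applied.
  canCommit(currentLsn: Lsn): boolean {
    if (this.noCommitBefore == null) {
      return true;
    }
    return lsnToBigInt(currentLsn) >= lsnToBigInt(this.noCommitBefore);
  }
}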
@@ -482,27 +502,11 @@ WHERE oid = $1::regclass`,
       // replication afterwards.
       await db.query('BEGIN');
       try {
-        let tableLsnNotBefore: string;
         await this.snapshotTable(batch, db, table, limited);
 
-        // Get the current LSN.
-        // The data will only be consistent once incremental replication has passed that point.
-        // We have to get this LSN _after_ we have finished the table snapshot.
-        //
-        // There are basically two relevant LSNs here:
-        // A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side,
-        // but it is implicitly recorded in the replication slot.
-        // B: The LSN after the table snapshot is complete, which is what we get here.
-        // When we do the snapshot queries, the data that we get back for each chunk could match the state
-        // anywhere between A and B. To actually have a consistent state on our side, we need to:
-        // 1. Complete the snapshot.
-        // 2. Wait until logical replication has caught up with all the change between A and B.
-        // Calling `markSnapshotDone(LSN B)` covers that.
-        const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-        tableLsnNotBefore = rs.rows[0][0];
         // Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
         await db.query('COMMIT');
-        const [resultTable] = await batch.markSnapshotDone([table], tableLsnNotBefore);
+        const [resultTable] = await batch.markTableSnapshotDone([table]);
         this.relationCache.update(resultTable);
         return resultTable;
       } catch (e) {
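
The per-table call no longer carries an LSN: markSnapshotDone([table], tableLsnNotBefore) becomes markTableSnapshotDone([table]), and LSN B is now recorded once for the whole snapshot by the markAllSnapshotDone call in the previous hunk. The interface below is inferred from these call sites only; it is an assumption, not the actual service-core definition.

// Assumed shapes, inferred from the call sites in this diff.
interface SourceTableLike {
  name: string;
}

interface SnapshotBatchLike {
  // Old: each table carried its own "no commit before" LSN.
  markSnapshotDone(tables: SourceTableLike[], noCommitBefore: string): Promise<SourceTableLike[]>;
  // New: tables are marked done without an LSN; the global LSN is recorded once,
  // via markAllSnapshotDone, after every table snapshot has completed.
  markTableSnapshotDone(tables: SourceTableLike[]): Promise<SourceTableLike[]>;
  markAllSnapshotDone(noCommitBefore: string): Promise<void>;
}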
@@ -818,7 +822,8 @@ WHERE oid = $1::regclass`,
     // If anything errors here, the entire replication process is halted, and
     // all connections automatically closed, including this one.
     const initReplicationConnection = await this.connections.replicationConnection();
-    await this.initReplication(initReplicationConnection);
+    this.initialSnapshotPromise = this.initReplication(initReplicationConnection);
+    await this.initialSnapshotPromise;
     await initReplicationConnection.end();
 
     // At this point, the above connection has often timed out, so we start a new one
@@ -831,6 +836,18 @@ WHERE oid = $1::regclass`,
     }
   }
 
+  /**
+   * After calling replicate(), call this to wait for the initial snapshot to complete.
+   *
+   * For tests only.
+   */
+  async waitForInitialSnapshot() {
+    if (this.initialSnapshotPromise == null) {
+      throw new ReplicationAssertionError(`Initial snapshot not started yet`);
+    }
+    return this.initialSnapshotPromise;
+  }
+
   async initReplication(replicationConnection: pgwire.PgConnection) {
     const result = await this.initSlot();
     if (result.needsInitialSync) {
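
waitForInitialSnapshot() is documented as test-only and relies on the initialSnapshotPromise assigned inside replicate(). A hedged sketch of how a test might use it follows; the startTestWalStream and stopTestWalStream helpers are hypothetical stand-ins, not part of this commit.

import { test } from 'vitest';
import { WalStream } from '@module/replication/WalStream.js';

// Hypothetical helpers, not part of this commit.
declare function startTestWalStream(): Promise<WalStream>;
declare function stopTestWalStream(stream: WalStream): Promise<void>;

test('initial snapshot completes before assertions', async () => {
  const walStream = await startTestWalStream();
  // replicate() keeps running until the stream is stopped; don't await it here.
  const replicationDone = walStream.replicate();
  // Blocks until initReplication(), and therefore the initial snapshot, has finished.
  await walStream.waitForInitialSnapshot();
  // ...assert on replicated data here...
  await stopTestWalStream(walStream);
  await replicationDone.catch(() => {});
});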

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 9 additions & 13 deletions
@@ -1,18 +1,16 @@
+import { WalStream } from '@module/replication/WalStream.js';
+import { PostgresTypeResolver } from '@module/types/resolver.js';
 import * as pgwire from '@powersync/service-jpgwire';
 import {
   applyRowContext,
   CompatibilityContext,
-  SqliteInputRow,
+  CompatibilityEdition,
   DateTimeValue,
-  TimeValue,
-  CompatibilityEdition
+  SqliteInputRow,
+  TimeValue
 } from '@powersync/service-sync-rules';
 import { describe, expect, test } from 'vitest';
 import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js';
-import { WalStream } from '@module/replication/WalStream.js';
-import { PostgresTypeResolver } from '@module/types/resolver.js';
-import { CustomTypeRegistry } from '@module/types/registry.js';
-import { PostgresSnapshotter } from '@module/replication/PostgresSnapshotter.js';
 
 describe('pg data types', () => {
   async function setupTable(db: pgwire.PgClient) {
@@ -304,7 +302,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
     await insert(db);
 
     const transformed = [
-      ...PostgresSnapshotter.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
+      ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
     ];
 
     checkResults(transformed);
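
These tests now call the static WalStream.getQueryData in place of PostgresSnapshotter.getQueryData. The wrapper below mirrors the call shape used above; the wrapper itself is illustrative and the exact return type of getQueryData is assumed to be an iterable of row records.

import { WalStream } from '@module/replication/WalStream.js';
import * as pgwire from '@powersync/service-jpgwire';

// Illustrative wrapper mirroring the test call sites above.
async function snapshotRows(db: pgwire.PgClient, sql: string) {
  const result = await db.query(sql);
  // getQueryData transforms raw pgwire rows into the replicated row representation.
  return [...WalStream.getQueryData(pgwire.pgwireRows(result))];
}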
@@ -323,7 +321,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
     await insert(db);
 
     const transformed = [
-      ...PostgresSnapshotter.getQueryData(
+      ...WalStream.getQueryData(
         pgwire.pgwireRows(
           await db.query({
             statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
@@ -347,9 +345,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
     await insertArrays(db);
 
     const transformed = [
-      ...PostgresSnapshotter.getQueryData(
-        pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`))
-      )
+      ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)))
     ].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
 
     checkResultArrays(transformed);
@@ -452,7 +448,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
 `);
 
     const [row] = [
-      ...PostgresSnapshotter.getQueryData(
+      ...WalStream.getQueryData(
         pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`))
       )
     ];

modules/module-postgres/test/src/util.ts

Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@ import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js'
 import * as types from '@module/types/types.js';
 import * as lib_postgres from '@powersync/lib-service-postgres';
 import { logger } from '@powersync/lib-services-framework';
-import { BucketStorageFactory, InternalOpId, TestStorageFactory } from '@powersync/service-core';
+import { BucketStorageFactory, InternalOpId, TestStorageConfig, TestStorageFactory } from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';
 import * as mongo_storage from '@powersync/service-module-mongodb-storage';
 import * as postgres_storage from '@powersync/service-module-postgres-storage';
@@ -16,11 +16,11 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.test_utils.mongoT
   isCI: env.CI
 });
 
-export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestStorageFactoryGenerator({
+export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestSetup({
   url: env.PG_STORAGE_TEST_URL
 });
 
-export function describeWithStorage(options: TestOptions, fn: (factory: TestStorageFactory) => void) {
+export function describeWithStorage(options: TestOptions, fn: (factory: TestStorageConfig) => void) {
   describe.skipIf(!env.TEST_MONGO_STORAGE)(`mongodb storage`, options, function () {
     fn(INITIALIZED_MONGO_STORAGE_FACTORY);
   });
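
describeWithStorage now hands the callback a TestStorageConfig (previously a TestStorageFactory), and the Postgres setup comes from postgresTestSetup. A hedged usage sketch follows; the options value and the inner test body are placeholders, not taken from this commit.

import { test } from 'vitest';
import type { TestStorageConfig } from '@powersync/service-core';
import { describeWithStorage } from './util.js';

// Placeholder test body; real tests would build their storage from `storage`.
describeWithStorage({ timeout: 30_000 }, (storage: TestStorageConfig) => {
  test('example', async () => {
    // ...use `storage` to set up a BucketStorageFactory and run assertions...
  });
});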
