Skip to content

Commit 223ae3e

Browse files
committed
Update APIs to use hydrated sync rules.
1 parent 604bc6a commit 223ae3e

File tree

29 files changed

+298
-319
lines changed

29 files changed

+298
-319
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
2-
import { SqlEventDescriptor, SqliteRow, SqliteValue, SqlSyncRules } from '@powersync/service-sync-rules';
2+
import { SqlEventDescriptor, SqliteRow, SqliteValue, HydratedSyncRules } from '@powersync/service-sync-rules';
33
import * as bson from 'bson';
44

55
import {
66
BaseObserver,
77
container,
8+
logger as defaultLogger,
89
ErrorCode,
910
errors,
1011
Logger,
11-
logger as defaultLogger,
1212
ReplicationAssertionError,
1313
ServiceError
1414
} from '@powersync/lib-services-framework';
@@ -22,13 +22,13 @@ import {
2222
utils
2323
} from '@powersync/service-core';
2424
import * as timers from 'node:timers/promises';
25+
import { idPrefixFilter } from '../../utils/util.js';
2526
import { PowerSyncMongo } from './db.js';
2627
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
2728
import { MongoIdSequence } from './MongoIdSequence.js';
2829
import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js';
2930
import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
3031
import { PersistedBatch } from './PersistedBatch.js';
31-
import { idPrefixFilter } from '../../utils/util.js';
3232

3333
/**
3434
* 15MB
@@ -44,7 +44,7 @@ const replicationMutex = new utils.Mutex();
4444

4545
export interface MongoBucketBatchOptions {
4646
db: PowerSyncMongo;
47-
syncRules: SqlSyncRules;
47+
syncRules: HydratedSyncRules;
4848
groupId: number;
4949
slotName: string;
5050
lastCheckpointLsn: string | null;
@@ -71,7 +71,7 @@ export class MongoBucketBatch
7171
private readonly client: mongo.MongoClient;
7272
public readonly db: PowerSyncMongo;
7373
public readonly session: mongo.ClientSession;
74-
private readonly sync_rules: SqlSyncRules;
74+
private readonly sync_rules: HydratedSyncRules;
7575

7676
private readonly group_id: number;
7777

@@ -474,8 +474,7 @@ export class MongoBucketBatch
474474
if (sourceTable.syncData) {
475475
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
476476
record: after,
477-
sourceTable,
478-
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
477+
sourceTable
479478
});
480479

481480
for (let error of syncErrors) {

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRules.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { SqlSyncRules } from '@powersync/service-sync-rules';
1+
import { SqlSyncRules, HydratedSyncRules } from '@powersync/service-sync-rules';
22

33
import { storage } from '@powersync/service-core';
44

@@ -13,4 +13,8 @@ export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
1313
) {
1414
this.slot_name = slot_name ?? `powersync_${id}`;
1515
}
16+
17+
hydratedSyncRules(): HydratedSyncRules {
18+
return this.sync_rules.hydrate({ bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.id}`) });
19+
}
1620
}

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
WatchWriteCheckpointOptions
2626
} from '@powersync/service-core';
2727
import { JSONBig } from '@powersync/service-jsonbig';
28-
import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
28+
import { ParameterLookup, SqliteJsonRow, SqlSyncRules, HydratedSyncRules } from '@powersync/service-sync-rules';
2929
import * as bson from 'bson';
3030
import { LRUCache } from 'lru-cache';
3131
import * as timers from 'timers/promises';
@@ -61,7 +61,7 @@ export class MongoSyncBucketStorage
6161
private readonly db: PowerSyncMongo;
6262
readonly checksums: MongoChecksums;
6363

64-
private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
64+
private parsedSyncRulesCache: { parsed: HydratedSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
6565
private writeCheckpointAPI: MongoWriteCheckpointAPI;
6666

6767
constructor(
@@ -101,14 +101,14 @@ export class MongoSyncBucketStorage
101101
});
102102
}
103103

104-
getParsedSyncRules(options: storage.ParseSyncRulesOptions): SqlSyncRules {
104+
getParsedSyncRules(options: storage.ParseSyncRulesOptions): HydratedSyncRules {
105105
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
106106
/**
107107
* Check if the cached sync rules, if present, had the same options.
108108
* Parse sync rules if the options are different or if there is no cached value.
109109
*/
110110
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
111-
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
111+
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
112112
}
113113

114114
return this.parsedSyncRulesCache!.parsed;
@@ -170,7 +170,7 @@ export class MongoSyncBucketStorage
170170
await using batch = new MongoBucketBatch({
171171
logger: options.logger,
172172
db: this.db,
173-
syncRules: this.sync_rules.parsed(options).sync_rules,
173+
syncRules: this.sync_rules.parsed(options).hydratedSyncRules(),
174174
groupId: this.group_id,
175175
slotName: this.slot_name,
176176
lastCheckpointLsn: checkpoint_lsn,

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@ import {
1010
ServiceError
1111
} from '@powersync/lib-services-framework';
1212
import {
13-
InternalOpId,
1413
MetricsEngine,
1514
RelationCache,
1615
SaveOperationTag,
1716
SourceEntityDescriptor,
1817
SourceTable,
1918
storage
2019
} from '@powersync/service-core';
21-
import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
20+
import {
21+
DatabaseInputRow,
22+
SqliteInputRow,
23+
SqliteRow,
24+
HydratedSyncRules,
25+
TablePattern
26+
} from '@powersync/service-sync-rules';
2227
import { ReplicationMetric } from '@powersync/service-types';
2328
import { MongoLSN } from '../common/MongoLSN.js';
2429
import { PostImagesOption } from '../types/types.js';
@@ -75,7 +80,7 @@ export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
7580
}
7681

7782
export class ChangeStream {
78-
sync_rules: SqlSyncRules;
83+
sync_rules: HydratedSyncRules;
7984
group_id: number;
8085

8186
connection_id = 1;

modules/module-mssql/src/replication/CDCStream.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@ import {
1010
} from '@powersync/lib-services-framework';
1111
import { getUuidReplicaIdentityBson, MetricsEngine, SourceEntityDescriptor, storage } from '@powersync/service-core';
1212

13-
import { SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
13+
import {
14+
SqliteInputRow,
15+
SqliteRow,
16+
SqlSyncRules,
17+
HydratedSyncRules,
18+
TablePattern
19+
} from '@powersync/service-sync-rules';
1420

1521
import { ReplicationMetric } from '@powersync/service-types';
1622
import { BatchedSnapshotQuery, MSSQLSnapshotQuery, SimpleSnapshotQuery } from './MSSQLSnapshotQuery.js';
@@ -82,7 +88,7 @@ export class CDCDataExpiredError extends DatabaseConnectionError {
8288
}
8389

8490
export class CDCStream {
85-
private readonly syncRules: SqlSyncRules;
91+
private readonly syncRules: HydratedSyncRules;
8692
private readonly storage: storage.SyncRulesBucketStorage;
8793
private readonly connections: MSSQLConnectionManager;
8894
private readonly abortSignal: AbortSignal;

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ function createTableId(schema: string, tableName: string): string {
6161
}
6262

6363
export class BinLogStream {
64-
private readonly syncRules: sync_rules.SqlSyncRules;
64+
private readonly syncRules: sync_rules.HydratedSyncRules;
6565
private readonly groupId: number;
6666

6767
private readonly storage: storage.SyncRulesBucketStorage;

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ export class PostgresSyncRulesStorage
6060
protected writeCheckpointAPI: PostgresWriteCheckpointAPI;
6161

6262
// TODO we might be able to share this in an abstract class
63-
private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
63+
private parsedSyncRulesCache:
64+
| { parsed: sync_rules.HydratedSyncRules; options: storage.ParseSyncRulesOptions }
65+
| undefined;
6466
private _checksumCache: storage.ChecksumCache | undefined;
6567

6668
constructor(protected options: PostgresSyncRulesStorageOptions) {
@@ -96,14 +98,14 @@ export class PostgresSyncRulesStorage
9698
}
9799

98100
// TODO we might be able to share this in an abstract class
99-
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.SqlSyncRules {
101+
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.HydratedSyncRules {
100102
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
101103
/**
102104
* Check if the cached sync rules, if present, had the same options.
103105
* Parse sync rules if the options are different or if there is no cached value.
104106
*/
105107
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
106-
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
108+
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
107109
}
108110

109111
return this.parsedSyncRulesCache!.parsed;
@@ -349,7 +351,7 @@ export class PostgresSyncRulesStorage
349351
const batch = new PostgresBucketBatch({
350352
logger: options.logger ?? framework.logger,
351353
db: this.db,
352-
sync_rules: this.sync_rules.parsed(options).sync_rules,
354+
sync_rules: this.sync_rules.parsed(options).hydratedSyncRules(),
353355
group_id: this.group_id,
354356
slot_name: this.slot_name,
355357
last_checkpoint_lsn: checkpoint_lsn,

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { PostgresPersistedBatch } from './PostgresPersistedBatch.js';
2424
export interface PostgresBucketBatchOptions {
2525
logger: Logger;
2626
db: lib_postgres.DatabaseClient;
27-
sync_rules: sync_rules.SqlSyncRules;
27+
sync_rules: sync_rules.HydratedSyncRules;
2828
group_id: number;
2929
slot_name: string;
3030
last_checkpoint_lsn: string | null;
@@ -72,7 +72,7 @@ export class PostgresBucketBatch
7272
protected persisted_op: InternalOpId | null;
7373

7474
protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[];
75-
protected readonly sync_rules: sync_rules.SqlSyncRules;
75+
protected readonly sync_rules: sync_rules.HydratedSyncRules;
7676
protected batch: OperationBatch | null;
7777
private lastWaitingLogThrottled = 0;
7878
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
@@ -840,8 +840,7 @@ export class PostgresBucketBatch
840840
if (sourceTable.syncData) {
841841
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
842842
record: after,
843-
sourceTable,
844-
bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
843+
sourceTable
845844
});
846845

847846
for (const error of syncErrors) {

modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR
3535
return {
3636
id: this.id,
3737
slot_name: this.slot_name,
38-
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options)
38+
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options),
39+
hydratedSyncRules() {
40+
return this.sync_rules.hydrate({
41+
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.id}`)
42+
});
43+
}
3944
};
4045
}
4146

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
SqliteInputValue,
2828
SqliteRow,
2929
SqlSyncRules,
30+
HydratedSyncRules,
3031
TablePattern,
3132
ToastableSqliteRow,
3233
toSyncRulesRow
@@ -107,7 +108,7 @@ export class MissingReplicationSlotError extends Error {
107108
}
108109

109110
export class WalStream {
110-
sync_rules: SqlSyncRules;
111+
sync_rules: HydratedSyncRules;
111112
group_id: number;
112113

113114
connection_id = 1;

0 commit comments

Comments
 (0)