Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ca716a7
Initial split of data vs parameter sources.
rkistner Dec 8, 2025
604bc6a
Split out sync rule definitions from compiled ones.
rkistner Dec 9, 2025
223ae3e
Update APIs to use hydrated sync rules.
rkistner Dec 9, 2025
2b7787d
Refactor parameter queries.
rkistner Dec 9, 2025
308477a
Fix tests.
rkistner Dec 9, 2025
3f378db
Remove redundant functions.
rkistner Dec 9, 2025
c1fd10d
Split out parameter lookup sources from parameter querier sources.
rkistner Dec 9, 2025
028c1d9
Bring back BucketSource.
rkistner Dec 10, 2025
eb0b860
Cleanup.
rkistner Dec 10, 2025
011ccb4
Cleanup imports.
rkistner Dec 10, 2025
0049d54
Split out SyncStream into its components.
rkistner Dec 10, 2025
ec5851a
Support multiple data sources.
rkistner Dec 10, 2025
ddcb8a0
Use separate sources per sync stream variant.
rkistner Dec 10, 2025
18bc3bf
Move debugRepresentation() back to BucketSource.
rkistner Dec 10, 2025
bcd6f9b
Merge remote-tracking branch 'origin/main' into granular-sync-rules
rkistner Dec 10, 2025
3bddef9
Split out dataSource on SqlBucketDescriptor.
rkistner Dec 10, 2025
60290ae
Move SubqueryParameterLookupSource deeper to be specific to variants.
rkistner Dec 10, 2025
9d8f7fb
Use HydrationState for bucket names.
rkistner Dec 10, 2025
1075d04
Remove some unused descriptorName references.
rkistner Dec 10, 2025
fb6f76a
Refactor parameter lookups to make the names configurable.
rkistner Dec 10, 2025
bf7fb60
More minor refactoring.
rkistner Dec 10, 2025
8a6b40d
Some comments.
rkistner Dec 10, 2025
1eecae9
Import cleanup.
rkistner Dec 10, 2025
ae08fd3
Remove BucketIdTransformer.
rkistner Dec 11, 2025
1c1d55f
Add a stream test with custom hydrationState.
rkistner Dec 11, 2025
f919d20
Add changeset.
rkistner Dec 11, 2025
af096ea
Merge remote-tracking branch 'origin/main' into granular-sync-rules
rkistner Dec 12, 2025
1679f4a
Remove need for hydration on BucketDataSource.
rkistner Dec 12, 2025
448e84c
Rename defaultBucketPrefix -> uniqueName.
rkistner Dec 12, 2025
7f60a4b
Remove hydration for ParameterLookupSource.
rkistner Dec 12, 2025
68c69e6
Rename BucketParameterLookupSource -> ParameterIndexLookupCreator.
rkistner Dec 12, 2025
f8f432d
Merge BucketParameterQuerierSourceDefinition into HydratedBucketSource.
rkistner Dec 12, 2025
d130411
Merge remote-tracking branch 'origin/main' into granular-sync-rules
rkistner Dec 12, 2025
b3a5ec3
Remove HydrationState generics.
rkistner Dec 15, 2025
46fc9ac
Add some hydration tests.
rkistner Dec 15, 2025
ea0c1c0
Add "end-to-end" sync rules test.
rkistner Dec 15, 2025
8602a75
Merge remote-tracking branch 'origin/main' into granular-sync-rules
rkistner Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/fresh-geckos-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mssql': minor
'@powersync/service-module-mysql': minor
'@powersync/service-sync-rules': minor
---

[Internal] Refactor sync rule representation to split out the parsed definitions from the hydrated state.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { SqlEventDescriptor, SqliteRow, SqliteValue, SqlSyncRules } from '@powersync/service-sync-rules';
import { SqlEventDescriptor, SqliteRow, SqliteValue, HydratedSyncRules } from '@powersync/service-sync-rules';
import * as bson from 'bson';

import {
BaseObserver,
container,
logger as defaultLogger,
ErrorCode,
errors,
Logger,
logger as defaultLogger,
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
Expand All @@ -22,13 +22,13 @@ import {
utils
} from '@powersync/service-core';
import * as timers from 'node:timers/promises';
import { idPrefixFilter } from '../../utils/util.js';
import { PowerSyncMongo } from './db.js';
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
import { MongoIdSequence } from './MongoIdSequence.js';
import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js';
import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
import { PersistedBatch } from './PersistedBatch.js';
import { idPrefixFilter } from '../../utils/util.js';

/**
* 15MB
Expand All @@ -44,7 +44,7 @@ const replicationMutex = new utils.Mutex();

export interface MongoBucketBatchOptions {
db: PowerSyncMongo;
syncRules: SqlSyncRules;
syncRules: HydratedSyncRules;
groupId: number;
slotName: string;
lastCheckpointLsn: string | null;
Expand All @@ -71,7 +71,7 @@ export class MongoBucketBatch
private readonly client: mongo.MongoClient;
public readonly db: PowerSyncMongo;
public readonly session: mongo.ClientSession;
private readonly sync_rules: SqlSyncRules;
private readonly sync_rules: HydratedSyncRules;

private readonly group_id: number;

Expand Down Expand Up @@ -474,8 +474,7 @@ export class MongoBucketBatch
if (sourceTable.syncData) {
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
record: after,
sourceTable,
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
sourceTable
});

for (let error of syncErrors) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { SqlSyncRules, HydratedSyncRules } from '@powersync/service-sync-rules';

import { storage } from '@powersync/service-core';
import { versionedHydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';

export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
public readonly slot_name: string;
Expand All @@ -13,4 +14,8 @@ export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
) {
this.slot_name = slot_name ?? `powersync_${id}`;
}

hydratedSyncRules(): HydratedSyncRules {
return this.sync_rules.hydrate({ hydrationState: versionedHydrationState(this.id) });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
BroadcastIterable,
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
CompactOptions,
deserializeParameterLookup,
GetCheckpointChangesOptions,
InternalOpId,
Expand All @@ -25,10 +24,11 @@ import {
WatchWriteCheckpointOptions
} from '@powersync/service-core';
import { JSONBig } from '@powersync/service-jsonbig';
import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
import { HydratedSyncRules, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import { LRUCache } from 'lru-cache';
import * as timers from 'timers/promises';
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
Expand All @@ -37,7 +37,6 @@ import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';

export interface MongoSyncBucketStorageOptions {
checksumOptions?: MongoChecksumOptions;
Expand All @@ -61,7 +60,7 @@ export class MongoSyncBucketStorage
private readonly db: PowerSyncMongo;
readonly checksums: MongoChecksums;

private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
private parsedSyncRulesCache: { parsed: HydratedSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
private writeCheckpointAPI: MongoWriteCheckpointAPI;

constructor(
Expand Down Expand Up @@ -101,14 +100,14 @@ export class MongoSyncBucketStorage
});
}

getParsedSyncRules(options: storage.ParseSyncRulesOptions): SqlSyncRules {
getParsedSyncRules(options: storage.ParseSyncRulesOptions): HydratedSyncRules {
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
/**
* Check if the cached sync rules, if present, had the same options.
* Parse sync rules if the options are different or if there is no cached value.
*/
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
}

return this.parsedSyncRulesCache!.parsed;
Expand Down Expand Up @@ -170,7 +169,7 @@ export class MongoSyncBucketStorage
await using batch = new MongoBucketBatch({
logger: options.logger,
db: this.db,
syncRules: this.sync_rules.parsed(options).sync_rules,
syncRules: this.sync_rules.parsed(options).hydratedSyncRules(),
groupId: this.group_id,
slotName: this.slot_name,
lastCheckpointLsn: checkpoint_lsn,
Expand Down Expand Up @@ -293,7 +292,10 @@ export class MongoSyncBucketStorage
return result!;
}

async getParameterSets(checkpoint: MongoReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
async getParameterSets(
checkpoint: MongoReplicationCheckpoint,
lookups: ScopedParameterLookup[]
): Promise<SqliteJsonRow[]> {
return this.db.client.withSession({ snapshot: true }, async (session) => {
// Set the session's snapshot time to the checkpoint's snapshot time.
// An alternative would be to create the session when the checkpoint is created, but managing
Expand Down Expand Up @@ -1025,7 +1027,7 @@ class MongoReplicationCheckpoint implements ReplicationCheckpoint {
public snapshotTime: mongo.Timestamp
) {}

async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
return this.storage.getParameterSets(this, lookups);
}
}
Expand All @@ -1034,7 +1036,7 @@ class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
readonly checkpoint: InternalOpId = 0n;
readonly lsn: string | null = null;

async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
return [];
}
}
11 changes: 8 additions & 3 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import {
ServiceError
} from '@powersync/lib-services-framework';
import {
InternalOpId,
MetricsEngine,
RelationCache,
SaveOperationTag,
SourceEntityDescriptor,
SourceTable,
storage
} from '@powersync/service-core';
import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import {
DatabaseInputRow,
SqliteInputRow,
SqliteRow,
HydratedSyncRules,
TablePattern
} from '@powersync/service-sync-rules';
import { ReplicationMetric } from '@powersync/service-types';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
Expand Down Expand Up @@ -75,7 +80,7 @@ export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
}

export class ChangeStream {
sync_rules: SqlSyncRules;
sync_rules: HydratedSyncRules;
group_id: number;

connection_id = 1;
Expand Down
10 changes: 8 additions & 2 deletions modules/module-mssql/src/replication/CDCStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import {
} from '@powersync/lib-services-framework';
import { getUuidReplicaIdentityBson, MetricsEngine, SourceEntityDescriptor, storage } from '@powersync/service-core';

import { SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import {
SqliteInputRow,
SqliteRow,
SqlSyncRules,
HydratedSyncRules,
TablePattern
} from '@powersync/service-sync-rules';

import { ReplicationMetric } from '@powersync/service-types';
import { BatchedSnapshotQuery, MSSQLSnapshotQuery, SimpleSnapshotQuery } from './MSSQLSnapshotQuery.js';
Expand Down Expand Up @@ -82,7 +88,7 @@ export class CDCDataExpiredError extends DatabaseConnectionError {
}

export class CDCStream {
private readonly syncRules: SqlSyncRules;
private readonly syncRules: HydratedSyncRules;
private readonly storage: storage.SyncRulesBucketStorage;
private readonly connections: MSSQLConnectionManager;
private readonly abortSignal: AbortSignal;
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function createTableId(schema: string, tableName: string): string {
}

export class BinLogStream {
private readonly syncRules: sync_rules.SqlSyncRules;
private readonly syncRules: sync_rules.HydratedSyncRules;
private readonly groupId: number;

private readonly storage: storage.SyncRulesBucketStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
BucketChecksum,
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
CompactOptions,
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
Expand Down Expand Up @@ -60,7 +59,9 @@ export class PostgresSyncRulesStorage
protected writeCheckpointAPI: PostgresWriteCheckpointAPI;

// TODO we might be able to share this in an abstract class
private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
private parsedSyncRulesCache:
| { parsed: sync_rules.HydratedSyncRules; options: storage.ParseSyncRulesOptions }
| undefined;
private _checksumCache: storage.ChecksumCache | undefined;

constructor(protected options: PostgresSyncRulesStorageOptions) {
Expand Down Expand Up @@ -96,14 +97,14 @@ export class PostgresSyncRulesStorage
}

// TODO we might be able to share this in an abstract class
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.SqlSyncRules {
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.HydratedSyncRules {
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
/**
* Check if the cached sync rules, if present, had the same options.
* Parse sync rules if the options are different or if there is no cached value.
*/
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
}

return this.parsedSyncRulesCache!.parsed;
Expand Down Expand Up @@ -349,7 +350,7 @@ export class PostgresSyncRulesStorage
const batch = new PostgresBucketBatch({
logger: options.logger ?? framework.logger,
db: this.db,
sync_rules: this.sync_rules.parsed(options).sync_rules,
sync_rules: this.sync_rules.parsed(options).hydratedSyncRules(),
group_id: this.group_id,
slot_name: this.slot_name,
last_checkpoint_lsn: checkpoint_lsn,
Expand All @@ -374,7 +375,7 @@ export class PostgresSyncRulesStorage

async getParameterSets(
checkpoint: ReplicationCheckpoint,
lookups: sync_rules.ParameterLookup[]
lookups: sync_rules.ScopedParameterLookup[]
): Promise<sync_rules.SqliteJsonRow[]> {
const rows = await this.db.sql`
SELECT DISTINCT
Expand Down Expand Up @@ -879,7 +880,7 @@ class PostgresReplicationCheckpoint implements storage.ReplicationCheckpoint {
public readonly lsn: string | null
) {}

getParameterSets(lookups: sync_rules.ParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
getParameterSets(lookups: sync_rules.ScopedParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
return this.storage.getParameterSets(this, lookups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { PostgresPersistedBatch } from './PostgresPersistedBatch.js';
export interface PostgresBucketBatchOptions {
logger: Logger;
db: lib_postgres.DatabaseClient;
sync_rules: sync_rules.SqlSyncRules;
sync_rules: sync_rules.HydratedSyncRules;
group_id: number;
slot_name: string;
last_checkpoint_lsn: string | null;
Expand Down Expand Up @@ -72,7 +72,7 @@ export class PostgresBucketBatch
protected persisted_op: InternalOpId | null;

protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[];
protected readonly sync_rules: sync_rules.SqlSyncRules;
protected readonly sync_rules: sync_rules.HydratedSyncRules;
protected batch: OperationBatch | null;
private lastWaitingLogThrottled = 0;
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
Expand Down Expand Up @@ -840,8 +840,7 @@ export class PostgresBucketBatch
if (sourceTable.syncData) {
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
record: after,
sourceTable,
bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
sourceTable
});

for (const error of syncErrors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { storage } from '@powersync/service-core';
import { SqlSyncRules } from '@powersync/service-sync-rules';

import { models } from '../../types/types.js';
import { versionedHydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';

export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncRulesContent {
public readonly slot_name: string;
Expand Down Expand Up @@ -35,7 +36,12 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR
return {
id: this.id,
slot_name: this.slot_name,
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options)
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options),
hydratedSyncRules() {
return this.sync_rules.hydrate({
hydrationState: versionedHydrationState(this.id)
});
}
};
}

Expand Down
3 changes: 2 additions & 1 deletion modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
SqliteRow,
SqliteValue,
SqlSyncRules,
HydratedSyncRules,
TablePattern,
ToastableSqliteRow,
toSyncRulesRow,
Expand Down Expand Up @@ -111,7 +112,7 @@ export class MissingReplicationSlotError extends Error {
}

export class WalStream {
sync_rules: SqlSyncRules;
sync_rules: HydratedSyncRules;
group_id: number;

connection_id = 1;
Expand Down
Loading