Skip to content

Conversation

@rkistner
Copy link
Contributor

@rkistner rkistner commented Dec 9, 2025

This is an internal factoring in the sync-rules package, and should have no visible impact on the service on its own. This makes no change to storage format or functionality, but does bring us closer to being able to implement Incremental reprocessing.

This changes the sync rule structure to split:

  1. The old SqlSyncRules class is the sync rules "definition", as parsed from the sync rules yaml file.
  2. A new HydratedSyncRules class is used to represent the combination of the parsed sync rules, with any relevant state in bucket storage. In a way, this is similar to how VersionedSyncRules was used before. This now uses HydrationState as a more flexible representation of the old BucketIdTransformer.

Right now, the only relevant state is the sync rules "version", used in bucket names if versioned_bucket_ids is enabled. This avoids the need to pass in the bucketIdTransformer in specific functions, instead keeping it as state in HydratedSyncRules.

The second big change is to split out some functionality from BucketSource:

  1. BucketDataSource: Represents data queries, more specifically the transform of replicated data -> bucket data.
  2. ParameterIndexLookupCreator: Represents table index lookups for parameter queries, more specifically the transform of replicated data -> parameter lookup data.

The BucketSource itself still exists, handling stream subscriptions and queries.

Now, along with the split, the BucketDataSources and ParameterIndexLookupCreators are also pulled up into a flat structure in the top-level. What this means is that when a new version of sync rules is deployed, each of the BucketDataSource and ParameterIndexLookupCreator instances can be compared with the ones in the old version of sync rules. If there is overlap, we can re-use the existing data, instead of re-replicating that from scratch. That is the core of the Incremental reprocessing project.

Some specifics in how data is persisted and linked for each:

  1. BucketDataSource represents the combination of data queries in a bucket definition. If any of those queries changes, we need a new set of buckets. Each unique BucketDataSource needs a unique bucket name prefix (the part before the bucket parameters).
  2. ParameterIndexLookupCreator represents a single parameter query that references a source table.
    a. Each unique ParameterIndexLookupCreator needs an unique identifier in the bucket_parameters lookup table, but it does not have to correlate with the bucket name.
    b. ParameterIndexLookupCreator does not need any reference to the bucket name prefix. The same ParameterIndexLookupCreator can be re-used in different bucket definitions in different sync rules versions, and even in different bucket definitions in the same sync rules version.
    c. In theory, if the query is the same, the same ParameterIndexLookupCreator can be used across multiple bucket definitions.
  3. BucketParameterQuerierSource (internal concept) does not have any persisted data.
    a. At runtime, the hydrated BucketParameterQuerierSource needs a reference to the ParameterIndexLookupCreator(s) to evaluate table lookups (if relevant). In theory, this can be different in different sync rules version.
    b. At runtime, the hydrated BucketParameterQuerierSource needs a reference to the bucket name prefixes for the related BucketDataSource. This can be different in different sync rules version that use the same BucketParameterQuerierSource.

Note that even though we can now split out a sync rules definition into these different parts, the plan is not to persist the definitions for these parts separately. We would however store mappings of definition -> bucket name prefix or parameter lookup source name, so that these could be re-used across sync rules versions.

Further work required in this PR:

  • Split out the implementations for sync streams.
  • Check that the new generated bucketPrefix is used everywhere
  • Use HydrationState for parameter lookup values (name and queryId).
  • Implement logic for comparing source entities, and for merging entities from multiple sync rules into one. - Later PR
  • Create a diagram to indicate the relationships between the different source entities.

Bucket definitions

erDiagram
    BucketSource ||--|| BucketDefinitionDataSource : dataSource
    BucketSource ||--o{ SqlParameterQuery : parameterLookupSources
    BucketSource ||--o{ SqlParameterQuery : parameterQuerierSources
    BucketSource ||--o{ StaticSqlParameterQuery : parameterQuerierSources
    SqlParameterQuery }o--|| BucketDefinitionDataSource : querierDataSource
    StaticSqlParameterQuery }o--|| BucketDefinitionDataSource : querierDataSource

    BucketDefinitionDataSource ||--o{ SqlDataQuery : contains
    BucketSource["SqlBucketDescriptor (BucketSource)"] {
        string name
        SYNC_RULE type
        true subscribedToByDefault
    }
    BucketDefinitionDataSource["BucketDefinitionDataSource (BucketDefinitionDataSource)"] {
        string unqiueName "BucketSource.name"
        string[] bucketParameters "debug use only"
    }
    SqlDataQuery {
        TablePattern table
        string sql
    }
    SqlParameterQuery["SqlParameterQuery (ParameterIndexLookupCreator)"] {
        TablePattern sourceTable
        string sql
        ParameterLookupScope defaultLookupScope
    }
    StaticSqlParameterQuery["StaticSqlParameterQuery"] {
        string sql
    }
Loading

Sync streams

Each sync stream can have multiple variants, where each variant has a specific bucket parameter "shape" (more details here that I'm ignoring for now).
Each variant can have multiple subqueries, which each references a single source table.
A variant can also use request parameters directly.
The ParameterIndexLookupCreator is tied to a specific subquery, so that it's one per source table. In code there may be multiple per subquery, but in reality it would only be one per subquery at the moment.
The BucketDataSource is tied to a variant - there is effectively one set of buckets for each variant.

The diagram here is slightly simplified from reality.

erDiagram
    BucketSource ||--o{ StreamVariant : variants
    StreamVariant ||--|| SyncStreamDataSource : dataSources
    StreamVariant ||--|| SyncStreamParameterQuerierSource : parameterQuerierSources
    
    SyncStreamParameterQuerierSource }o--|| SyncStreamDataSource : querierDataSource
    StreamVariant ||--o{ SubqueryEvaluator : subqueries
    SubqueryEvaluator ||--o{ SubqueryParameterLookupSource : lookupSources
    SyncStreamParameterQuerierSource ||--o{ SubqueryParameterLookupSource : "implicit lookup relationship via subqueries"

    BucketSource["SyncStream (BucketSource)"] {
        string name
        SYNC_STREAM type
        boolean subscribedToByDefault
    }
    SyncStreamDataSource["SyncStreamDataSource (BucketDataSource)"] {
        TablePattern table
        string uniqueName "BucketSource.name|variant.id"
    }
    StreamVariant {

    }
    SubqueryEvaluator {
        TablePattern table
    }
    SyncStreamParameterQuerierSource["createParameterQuerierSource (internal)"] {

    }
    SubqueryParameterLookupSource["SubqueryParameterLookupSource"] {
        TablePattern parameterTable
        ParameterLookupScope defaultLookupScope
    }
Loading

@changeset-bot
Copy link

changeset-bot bot commented Dec 9, 2025

🦋 Changeset detected

Latest commit: d130411

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 18 packages
Name Type
@powersync/service-module-postgres-storage Minor
@powersync/service-module-mongodb-storage Minor
@powersync/service-core-tests Minor
@powersync/service-module-postgres Minor
@powersync/service-module-mongodb Minor
@powersync/service-core Minor
@powersync/service-module-mssql Minor
@powersync/service-module-mysql Minor
@powersync/service-sync-rules Minor
@powersync/service-schema Minor
@powersync/service-image Minor
@powersync/service-module-core Patch
test-client Patch
@powersync/service-jpgwire Patch
@powersync/lib-services-framework Patch
@powersync/lib-service-postgres Patch
@powersync/service-rsocket-router Patch
@powersync/lib-service-mongodb Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@rkistner rkistner requested a review from simolus3 December 10, 2025 20:09
@rkistner rkistner changed the title [WIP] Granular sync rules Granular sync rule parsing Dec 11, 2025
@rkistner rkistner marked this pull request as ready for review December 11, 2025 08:46
@rkistner
Copy link
Contributor Author

@simolus3 It would be good if you could specifically check the changes to sync streams

Copy link
Contributor

@simolus3 simolus3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm starting to understand the concept of this, and I think this is a helpful refactoring. I'll start with some very high level questions and comments, and I'll take a look at the sync streams port of this afterwards.

First, a conceptual question: As discussed offline, the idea is that BucketDataSourceDefinition and BucketParameterLookupSourceDefinitions are hoisted into a top-level structure which allows diffing them easily. Assuming that we can actually diff them, am I right to assume that the overal flow once this feature is complete would be something like this:

  1. State A: We have two streams (say SELECT * FROM a and SELECT * FROM b WHERE foo).
    1. In the hydration state, getBucketSourceState(a) == 1#a and getBucketSourceState(b) == 1#b.
  2. State B: We have two streams (say SELECT * FROM a and SELECT * FROM b WHERE NOT foo).
    1. We realize that the BucketDataSourceDefinition for stream a is identical, and doesn't need reprocessing.
    2. This is reflected by getBucketSourceState(a) == 1#a while getBucketSourceState(b) == 2#b.
    3. Because the querier looks up this prefix from the hydrated state, it finds the right buckets for both streams.

Is that correct?

To scaffold an API of diffing these definitions (even though a proper implementation will be another hard problem), I wonder if we should start with a trivial diff that always considers two definitions to be incompatible if they're not the same JS object. Maybe the API for that would look like this?

interface BucketDataSourceDefinition {
  // Whether this definition takes the same input rows and is guaranteed to generate the
  // exact same evaluated row for an input row as the other definition
  isIdenticalTo(other: BucketDataSourceDefinition): boolean {
    // our only implementation for now:
    return other === this;
  }

  // or maybe we instead want to expose compatibility like this?
  identityHashCode(): string {
    // our onlny implementation for now: a random uuid generated on instance creation
  }

  // ...
}

Having this in place would be a reminder that we need to eventually implement this. Since it's partial equality, it would also allow us to do it piece-by-piece: Maybe we eventually add an implementation comparing input texts and later move to a more accurate semantic AST diff.
Either way, fixing the interface that determines this is a necessary first step, and it would let us think about how to react to this (maybe SqlSyncRules can already remove duplicates on creation).

*
* This is only relevant for parameter queries that query tables.
*/
export interface BucketParameterLookupSourceDefinition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe it's just me, but these names don't sound particularly descriptive or helpful to me (and I keep getting BucketParameterLookupSourceDefinition and BucketParameterQuerierSourceDefinition mixed up).

Since sync streams already make the definition of parameters a bit blurry, I wonder if other terms could be more intuitive here:

  • For BucketParameterQuerierSource, maybe something like ConnectionBucketsQuerier or ConnectionBucketsResolver makes it clearer what this is actually doing.
  • For BucketParameterLookupSource, maybe something like IndexLookupCreator since that's really what parameter lookups are? Then if we need different kind of indexes in the future (say a range index for IVM with order by), we could potentially add those more easily in the future by introducing super interfaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not a fan of the names either. Will look at these options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with ParameterIndexLookupCreator here now. I still want to keep the "parameter" part: Even though "parameter query" not a user-facing concept in sync streams, the name is still used in the implementation in many places, and the concept still translates to sync stream subqueries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the concept of BucketParameterQuerierSourceDefinition now. These don't persist any data, and we never need to evaluate them individually, so it made more sense to have the functionality on HydratedBucketSource.

I kept the implementation for these queriers mostly the same, but the structure is an implementation detail now, rather than a public API.

/**
* Given a bucket data source definition, get the bucket prefix to use for it.
*/
getBucketSourceState(source: BucketDataSourceDefinition): T;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this comment which would potentially split this into a function for creating evaluated rows and a function for looking up bucket names (if we want to be fancy, only giving each step access to the function it needs).

Following that, I wonder if we should add a utility like lookupBucketName(source: BucketDataSourceDefinition, parameters: SqliteJsonValue[]) that queriers would use instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the BucketDataSource part now (see my other comments), but kept this the same aside from some renaming. For queriers, I specifically want to keep it a multipart process for BucketDataSource -> BucketDataScope -> bucketName, since that first part of computing the scope could be more expensive, and we may need to compute the list of referenced scopes upfront.

* The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage
* system to find buckets.
*/
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I'm also wondering if passing a consumer accepting the results might be easier. That would create EvaluatedParameters by applying the one and only scope accessible to this lookup source, making it impossible to get this wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented the same refactoring as for BucketDataSource here:

  1. Merged BucketParameterQuerierSourceDefinition + BucketParameterQuerierSource into a single ParameterIndexLookupCreator - no hydration required anymore.
  2. The ParameterIndexLookupCreator now returns results without a scope. The HydratedSyncRules then adds the scope.

@rkistner
Copy link
Contributor Author

I think I'm starting to understand the concept of this, and I think this is a helpful refactoring. I'll start with some very high level questions and comments, and I'll take a look at the sync streams port of this afterwards.

First, a conceptual question: As discussed offline, the idea is that BucketDataSourceDefinition and BucketParameterLookupSourceDefinitions are hoisted into a top-level structure which allows diffing them easily. Assuming that we can actually diff them, am I right to assume that the overal flow once this feature is complete would be something like this:

  1. State A: We have two streams (say SELECT * FROM a and SELECT * FROM b WHERE foo).

    1. In the hydration state, getBucketSourceState(a) == 1#a and getBucketSourceState(b) == 1#b.
  2. State B: We have two streams (say SELECT * FROM a and SELECT * FROM b WHERE NOT foo).

    1. We realize that the BucketDataSourceDefinition for stream a is identical, and doesn't need reprocessing.
    2. This is reflected by getBucketSourceState(a) == 1#a while getBucketSourceState(b) == 2#b.
    3. Because the querier looks up this prefix from the hydrated state, it finds the right buckets for both streams.

Is that correct?

Yes, that's correct. But it's about more than just diffing: It's about knowing when to re-use the existing persisted data for a source versus removing it, and being able to combine multiple SyncRules into one replication stream, without duplicating work when there is overlap.

To scaffold an API of diffing these definitions (even though a proper implementation will be another hard problem), I wonder if we should start with a trivial diff that always considers two definitions to be incompatible if they're not the same JS object. Maybe the API for that would look like this?

interface BucketDataSourceDefinition {
  // Whether this definition takes the same input rows and is guaranteed to generate the
  // exact same evaluated row for an input row as the other definition
  isIdenticalTo(other: BucketDataSourceDefinition): boolean {
    // our only implementation for now:
    return other === this;
  }

  // or maybe we instead want to expose compatibility like this?
  identityHashCode(): string {
    // our onlny implementation for now: a random uuid generated on instance creation
  }

  // ...
}

Having this in place would be a reminder that we need to eventually implement this. Since it's partial equality, it would also allow us to do it piece-by-piece: Maybe we eventually add an implementation comparing input texts and later move to a more accurate semantic AST diff. Either way, fixing the interface that determines this is a necessary first step, and it would let us think about how to react to this (maybe SqlSyncRules can already remove duplicates on creation).

I don't think we gain much by having the scaffolding if the definitions will never be considered equal. I also think we can get by with a pretty simple equality check to get started with: Check if the source SQL matches. It isn't quite optimal: a change to which request parameter is used in a query should not need to affect the storage at all, but we can implement more granular checks for that later. Just comparing the SQL should already help to cover changes where you're only adding one new steam, for example.

Either way, there is still a lot of work to do on the storage and replication layer before we can actually do incremental reprocessing. This PR was already getting big, so I thought it's better to delay that implementation for the next PR.

@rkistner rkistner force-pushed the granular-sync-rules branch from dab9c47 to 1679f4a Compare December 12, 2025 11:31
@rkistner rkistner requested a review from simolus3 December 12, 2025 14:11
Copy link
Contributor

@simolus3 simolus3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sync-rules refactoring and integration into the service looks great to me, I only have a few minor comments.

Comment on lines +20 to +23
export interface HydrationState<
T extends BucketDataScope = BucketDataScope,
U extends ParameterLookupScope = ParameterLookupScope
> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for these being generic? If we ever need to refer to an implementation returning specific subtypes, I assume we could introduce a subinterface returning those?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was that the storage layer can attach additional data. But that requires a lot more work, and I'm not even sure generics here would actually help, so I'll remove it for now.

Comment on lines +35 to +39
// TODO: Is there a better design here?
// This is used for parameter _queries_. But the queries need to know which lookup scopes to
// use, and each querier may use multiple lookup sources, each with their own scope.
// This implementation here does "hydration" on each subquery, which gives us hydrated function call.
// Should this maybe be part of a higher-level class instead of just a function, i.e. a hydrated subquery?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point, I'll revisit this for the planned sync stream improvements.

import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js';
import { BASIC_SCHEMA, EMPTY_DATA_SOURCE, normalizeTokenParameters, PARSE_OPTIONS } from './util.js';

describe('parameter queries', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also add a test similar to variants with custom hydrationState in streams.test.ts here and in data_queries.test.ts?

Or at least add some e2e tests in sync_rules.test.ts to ensure sync rules also work correctly under custom bucket source and parameter index lookup scopes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants