Skip to content

Commit 604bc6a

Browse files
committed
Split out sync rule definitions from compiled ones.
1 parent ca716a7 commit 604bc6a

File tree

9 files changed

+434
-333
lines changed

9 files changed

+434
-333
lines changed

packages/service-core/src/routes/endpoints/sync-rules.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
202202

203203
return {
204204
valid: true,
205-
bucket_definitions: rules.bucketSources.map((source) => source.debugRepresentation()),
205+
bucket_definitions: rules.debugRepresentation(),
206206
source_tables: resolved_tables,
207207
data_tables: rules.debugGetOutputTables()
208208
};

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BucketDescription, ResolvedBucket } from './BucketDescription.js';
1+
import { ResolvedBucket } from './BucketDescription.js';
22
import { RequestedStream } from './SqlSyncRules.js';
33
import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js';
44
import { normalizeParameterValue } from './utils.js';

packages/sync-rules/src/BucketSource.ts

Lines changed: 63 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,32 @@ import { ColumnDefinition } from './ExpressionType.js';
33
import { SourceTableInterface } from './SourceTableInterface.js';
44
import { GetQuerierOptions } from './SqlSyncRules.js';
55
import { TablePattern } from './TablePattern.js';
6-
import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js';
6+
import {
7+
BucketIdTransformer,
8+
EvaluatedParametersResult,
9+
EvaluateRowOptions,
10+
EvaluationResult,
11+
SourceSchema,
12+
SqliteRow
13+
} from './types.js';
14+
15+
export interface CreateSourceParams {
16+
bucketIdTransformer: BucketIdTransformer;
17+
}
718

819
/**
9-
* An interface declaring
10-
*
11-
* - which buckets the sync service should create when processing change streams from the database.
12-
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
13-
* - which buckets a given connection has access to.
14-
*
15-
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
16-
* definitions that only consist of a single query.
20+
* Encodes a static definition of a bucket source, as parsed from sync rules or stream definitions.
1721
*/
18-
export interface BucketDataSource {
22+
export interface BucketDataSourceDefinition {
1923
readonly name: string;
2024
readonly type: BucketSourceType;
21-
2225
readonly subscribedToByDefault: boolean;
23-
24-
/**
25-
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
26-
* data for rows to add to buckets.
27-
*/
28-
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
29-
3026
/**
31-
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
32-
*
33-
* @param result The target array to insert queriers and errors into.
34-
* @param options Options, including parameters that may affect the buckets loaded by this source.
27+
* For debug use only.
3528
*/
36-
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void;
37-
29+
readonly bucketParameters: string[];
3830
getSourceTables(): Set<TablePattern>;
39-
40-
/** Whether the table possibly affects the contents of buckets resolved by this source. */
31+
createDataSource(params: CreateSourceParams): BucketDataSource;
4132
tableSyncsData(table: SourceTableInterface): boolean;
4233

4334
/**
@@ -53,6 +44,30 @@ export interface BucketDataSource {
5344
debugRepresentation(): any;
5445
}
5546

47+
export interface BucketParameterSourceDefinition {
48+
readonly name: string;
49+
readonly type: BucketSourceType;
50+
readonly subscribedToByDefault: boolean;
51+
52+
getSourceTables(): Set<TablePattern>;
53+
createParameterSource(params: CreateSourceParams): BucketParameterSource;
54+
55+
/**
56+
* Whether {@link pushBucketParameterQueriers} may include a querier where
57+
* {@link BucketParameterQuerier.hasDynamicBuckets} is true.
58+
*
59+
* This is mostly used for testing.
60+
*/
61+
hasDynamicBucketQueries(): boolean;
62+
63+
getSourceTables(): Set<TablePattern>;
64+
65+
/** Whether the table possibly affects the buckets resolved by this source. */
66+
tableSyncsParameters(table: SourceTableInterface): boolean;
67+
68+
debugRepresentation(): any;
69+
}
70+
5671
/**
5772
* An interface declaring
5873
*
@@ -63,12 +78,28 @@ export interface BucketDataSource {
6378
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
6479
* definitions that only consist of a single query.
6580
*/
66-
export interface BucketParameterSource {
67-
readonly name: string;
68-
readonly type: BucketSourceType;
81+
export interface BucketDataSource {
82+
readonly definition: BucketDataSourceDefinition;
6983

70-
readonly subscribedToByDefault: boolean;
84+
/**
85+
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
86+
* data for rows to add to buckets.
87+
*/
88+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
89+
}
7190

91+
/**
92+
* An interface declaring
93+
*
94+
* - which buckets the sync service should create when processing change streams from the database.
95+
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
96+
* - which buckets a given connection has access to.
97+
*
98+
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
99+
* definitions that only consist of a single query.
100+
*/
101+
export interface BucketParameterSource {
102+
readonly definition: BucketParameterSourceDefinition;
72103
/**
73104
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
74105
* be associated with.
@@ -87,21 +118,9 @@ export interface BucketParameterSource {
87118
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void;
88119

89120
/**
90-
* Whether {@link pushBucketParameterQueriers} may include a querier where
91-
* {@link BucketParameterQuerier.hasDynamicBuckets} is true.
92-
*
93-
* This is mostly used for testing.
121+
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
94122
*/
95-
hasDynamicBucketQueries(): boolean;
96-
97-
getSourceTables(): Set<TablePattern>;
98-
99-
/** Whether the table possibly affects the buckets resolved by this source. */
100-
tableSyncsParameters(table: SourceTableInterface): boolean;
101-
102-
debugWriteOutputTables(result: Record<string, { query: string }[]>): void;
103-
104-
debugRepresentation(): any;
123+
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier;
105124
}
106125

107126
export enum BucketSourceType {

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 82 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import { BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
22
import { BucketParameterQuerier, mergeBucketParameterQueriers, PendingQueriers } from './BucketParameterQuerier.js';
3-
import { BucketDataSource, BucketParameterSource, BucketSourceType, ResultSetDescription } from './BucketSource.js';
3+
import {
4+
BucketDataSource,
5+
BucketDataSourceDefinition,
6+
BucketParameterSource,
7+
BucketParameterSourceDefinition,
8+
BucketSourceType,
9+
CreateSourceParams,
10+
ResultSetDescription
11+
} from './BucketSource.js';
412
import { ColumnDefinition } from './ExpressionType.js';
513
import { IdSequence } from './IdSequence.js';
614
import { SourceTableInterface } from './SourceTableInterface.js';
@@ -32,9 +40,9 @@ export interface QueryParseResult {
3240
errors: SqlRuleError[];
3341
}
3442

35-
export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSource {
43+
export class SqlBucketDescriptor implements BucketDataSourceDefinition, BucketParameterSourceDefinition {
3644
name: string;
37-
bucketParameters?: string[];
45+
private bucketParametersInternal: string[] | null = null;
3846

3947
constructor(name: string) {
4048
this.name = name;
@@ -48,6 +56,10 @@ export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSou
4856
return true;
4957
}
5058

59+
public get bucketParameters(): string[] {
60+
return this.bucketParametersInternal ?? [];
61+
}
62+
5163
/**
5264
* source table -> queries
5365
*/
@@ -58,10 +70,10 @@ export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSou
5870
parameterIdSequence = new IdSequence();
5971

6072
addDataQuery(sql: string, options: SyncRulesOptions, compatibility: CompatibilityContext): QueryParseResult {
61-
if (this.bucketParameters == null) {
73+
if (this.bucketParametersInternal == null) {
6274
throw new Error('Bucket parameters must be defined');
6375
}
64-
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParameters, sql, options, compatibility);
76+
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParametersInternal, sql, options, compatibility);
6577

6678
this.dataQueries.push(dataRows);
6779

@@ -73,11 +85,12 @@ export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSou
7385

7486
addParameterQuery(sql: string, options: QueryParseOptions): QueryParseResult {
7587
const parameterQuery = SqlParameterQuery.fromSql(this.name, sql, options, this.parameterIdSequence.nextId());
76-
if (this.bucketParameters == null) {
77-
this.bucketParameters = parameterQuery.bucketParameters;
88+
if (this.bucketParametersInternal == null) {
89+
this.bucketParametersInternal = parameterQuery.bucketParameters;
7890
} else {
7991
if (
80-
new Set([...parameterQuery.bucketParameters!, ...this.bucketParameters]).size != this.bucketParameters.length
92+
new Set([...parameterQuery.bucketParameters!, ...this.bucketParametersInternal]).size !=
93+
this.bucketParametersInternal.length
8194
) {
8295
throw new Error('Bucket parameters must match for each parameter query within a bucket');
8396
}
@@ -94,61 +107,71 @@ export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSou
94107
};
95108
}
96109

97-
evaluateRow(options: EvaluateRowOptions): EvaluationResult[] {
98-
let results: EvaluationResult[] = [];
99-
for (let query of this.dataQueries) {
100-
if (!query.applies(options.sourceTable)) {
101-
continue;
110+
createDataSource(params: CreateSourceParams): BucketDataSource {
111+
return {
112+
definition: this,
113+
evaluateRow: (options) => {
114+
let results: EvaluationResult[] = [];
115+
for (let query of this.dataQueries) {
116+
if (!query.applies(options.sourceTable)) {
117+
continue;
118+
}
119+
120+
results.push(...query.evaluateRow(options.sourceTable, options.record, params.bucketIdTransformer));
121+
}
122+
return results;
102123
}
103-
104-
results.push(...query.evaluateRow(options.sourceTable, options.record, options.bucketIdTransformer));
105-
}
106-
return results;
124+
};
107125
}
108126

109-
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] {
110-
let results: EvaluatedParametersResult[] = [];
111-
for (let query of this.parameterQueries) {
112-
if (query.applies(sourceTable)) {
113-
results.push(...query.evaluateParameterRow(row));
127+
createParameterSource(params: CreateSourceParams): BucketParameterSource {
128+
return {
129+
definition: this,
130+
131+
evaluateParameterRow: (sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] => {
132+
let results: EvaluatedParametersResult[] = [];
133+
for (let query of this.parameterQueries) {
134+
if (query.applies(sourceTable)) {
135+
results.push(...query.evaluateParameterRow(row));
136+
}
137+
}
138+
return results;
139+
},
140+
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
141+
const reasons = [this.bucketInclusionReason()];
142+
const staticBuckets = this.getStaticBucketDescriptions(
143+
options.globalParameters,
144+
reasons,
145+
params.bucketIdTransformer
146+
);
147+
const staticQuerier = {
148+
staticBuckets,
149+
hasDynamicBuckets: false,
150+
parameterQueryLookups: [],
151+
queryDynamicBucketDescriptions: async () => []
152+
} satisfies BucketParameterQuerier;
153+
result.queriers.push(staticQuerier);
154+
155+
if (this.parameterQueries.length == 0) {
156+
return;
157+
}
158+
159+
const dynamicQueriers = this.parameterQueries.map((query) =>
160+
query.getBucketParameterQuerier(options.globalParameters, reasons, params.bucketIdTransformer)
161+
);
162+
result.queriers.push(...dynamicQueriers);
163+
},
164+
165+
/**
166+
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
167+
*/
168+
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier {
169+
const queriers: BucketParameterQuerier[] = [];
170+
this.pushBucketParameterQueriers({ queriers, errors: [] }, options);
171+
172+
return mergeBucketParameterQueriers(queriers);
114173
}
115-
}
116-
return results;
117-
}
118-
119-
/**
120-
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
121-
*/
122-
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier {
123-
const queriers: BucketParameterQuerier[] = [];
124-
this.pushBucketParameterQueriers({ queriers, errors: [] }, options);
125-
126-
return mergeBucketParameterQueriers(queriers);
127-
}
128-
129-
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions) {
130-
const reasons = [this.bucketInclusionReason()];
131-
const staticBuckets = this.getStaticBucketDescriptions(
132-
options.globalParameters,
133-
reasons,
134-
options.bucketIdTransformer
135-
);
136-
const staticQuerier = {
137-
staticBuckets,
138-
hasDynamicBuckets: false,
139-
parameterQueryLookups: [],
140-
queryDynamicBucketDescriptions: async () => []
141-
} satisfies BucketParameterQuerier;
142-
result.queriers.push(staticQuerier);
143-
144-
if (this.parameterQueries.length == 0) {
145-
return;
146-
}
147-
148-
const dynamicQueriers = this.parameterQueries.map((query) =>
149-
query.getBucketParameterQuerier(options.globalParameters, reasons, options.bucketIdTransformer)
150-
);
151-
result.queriers.push(...dynamicQueriers);
174+
};
152175
}
153176

154177
getStaticBucketDescriptions(

0 commit comments

Comments
 (0)