Skip to content

Commit ca716a7

Browse files
committed
Initial split of data vs parameter sources.
1 parent b77bb2c commit ca716a7

File tree

6 files changed

+102
-55
lines changed

6 files changed

+102
-55
lines changed

packages/sync-rules/src/BucketSource.ts

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,55 @@ import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, Source
1515
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
1616
* definitions that only consist of a single query.
1717
*/
18-
export interface BucketSource {
18+
export interface BucketDataSource {
19+
readonly name: string;
20+
readonly type: BucketSourceType;
21+
22+
readonly subscribedToByDefault: boolean;
23+
24+
/**
25+
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
26+
* data for rows to add to buckets.
27+
*/
28+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
29+
30+
/**
31+
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
32+
*
33+
* @param result The target array to insert queriers and errors into.
34+
* @param options Options, including parameters that may affect the buckets loaded by this source.
35+
*/
36+
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void;
37+
38+
getSourceTables(): Set<TablePattern>;
39+
40+
/** Whether the table possibly affects the contents of buckets resolved by this source. */
41+
tableSyncsData(table: SourceTableInterface): boolean;
42+
43+
/**
44+
* Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
45+
* source.
46+
*
47+
* This is use to generate the client-side schema.
48+
*/
49+
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>): void;
50+
51+
debugWriteOutputTables(result: Record<string, { query: string }[]>): void;
52+
53+
debugRepresentation(): any;
54+
}
55+
56+
/**
57+
* An interface declaring
58+
*
59+
* - which buckets the sync service should create when processing change streams from the database.
60+
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
61+
* - which buckets a given connection has access to.
62+
*
63+
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
64+
* definitions that only consist of a single query.
65+
*/
66+
export interface BucketParameterSource {
1967
readonly name: string;
2068
readonly type: BucketSourceType;
2169

@@ -30,12 +78,6 @@ export interface BucketSource {
3078
*/
3179
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];
3280

33-
/**
34-
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
35-
* data for rows to add to buckets.
36-
*/
37-
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
38-
3981
/**
4082
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
4183
*
@@ -57,17 +99,6 @@ export interface BucketSource {
5799
/** Whether the table possibly affects the buckets resolved by this source. */
58100
tableSyncsParameters(table: SourceTableInterface): boolean;
59101

60-
/** Whether the table possibly affects the contents of buckets resolved by this source. */
61-
tableSyncsData(table: SourceTableInterface): boolean;
62-
63-
/**
64-
* Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
65-
* source.
66-
*
67-
* This is use to generate the client-side schema.
68-
*/
69-
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>): void;
70-
71102
debugWriteOutputTables(result: Record<string, { query: string }[]>): void;
72103

73104
debugRepresentation(): any;

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
22
import { BucketParameterQuerier, mergeBucketParameterQueriers, PendingQueriers } from './BucketParameterQuerier.js';
3-
import { BucketSource, BucketSourceType, ResultSetDescription } from './BucketSource.js';
3+
import { BucketDataSource, BucketParameterSource, BucketSourceType, ResultSetDescription } from './BucketSource.js';
44
import { ColumnDefinition } from './ExpressionType.js';
55
import { IdSequence } from './IdSequence.js';
66
import { SourceTableInterface } from './SourceTableInterface.js';
@@ -32,7 +32,7 @@ export interface QueryParseResult {
3232
errors: SqlRuleError[];
3333
}
3434

35-
export class SqlBucketDescriptor implements BucketSource {
35+
export class SqlBucketDescriptor implements BucketDataSource, BucketParameterSource {
3636
name: string;
3737
bucketParameters?: string[];
3838

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { isScalar, LineCounter, parseDocument, Scalar, YAMLMap, YAMLSeq } from 'yaml';
22
import { isValidPriority } from './BucketDescription.js';
33
import { BucketParameterQuerier, mergeBucketParameterQueriers, QuerierError } from './BucketParameterQuerier.js';
4+
import { BucketDataSource, BucketParameterSource } from './BucketSource.js';
5+
import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js';
46
import { SqlRuleError, SyncRulesErrors, YamlError } from './errors.js';
57
import { SqlEventDescriptor } from './events/SqlEventDescriptor.js';
68
import { validateSyncRulesSchema } from './json_schema.js';
79
import { SourceTableInterface } from './SourceTableInterface.js';
810
import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js';
11+
import { syncStreamFromSql } from './streams/from_sql.js';
912
import { TablePattern } from './TablePattern.js';
1013
import {
1114
BucketIdTransformer,
@@ -28,9 +31,6 @@ import {
2831
StreamParseOptions,
2932
SyncRules
3033
} from './types.js';
31-
import { BucketSource } from './BucketSource.js';
32-
import { syncStreamFromSql } from './streams/from_sql.js';
33-
import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js';
3434
import { applyRowContext } from './utils.js';
3535

3636
const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES');
@@ -94,7 +94,9 @@ export interface GetBucketParameterQuerierResult {
9494
}
9595

9696
export class SqlSyncRules implements SyncRules {
97-
bucketSources: BucketSource[] = [];
97+
bucketDataSources: BucketDataSource[] = [];
98+
bucketParameterSources: BucketParameterSource[] = [];
99+
98100
eventDescriptors: SqlEventDescriptor[] = [];
99101
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
100102

@@ -245,7 +247,8 @@ export class SqlSyncRules implements SyncRules {
245247
return descriptor.addDataQuery(q, queryOptions, compatibility);
246248
});
247249
}
248-
rules.bucketSources.push(descriptor);
250+
rules.bucketDataSources.push(descriptor);
251+
rules.bucketParameterSources.push(descriptor);
249252
}
250253

251254
for (const entry of streamMap?.items ?? []) {
@@ -270,7 +273,8 @@ export class SqlSyncRules implements SyncRules {
270273
if (data instanceof Scalar) {
271274
rules.withScalar(data, (q) => {
272275
const [parsed, errors] = syncStreamFromSql(key, q, queryOptions);
273-
rules.bucketSources.push(parsed);
276+
rules.bucketDataSources.push(parsed);
277+
rules.bucketParameterSources.push(parsed);
274278
return {
275279
parsed: true,
276280
errors
@@ -412,7 +416,7 @@ export class SqlSyncRules implements SyncRules {
412416
};
413417

414418
let rawResults: EvaluationResult[] = [];
415-
for (let source of this.bucketSources) {
419+
for (let source of this.bucketDataSources) {
416420
rawResults.push(...source.evaluateRow(resolvedOptions));
417421
}
418422

@@ -438,7 +442,7 @@ export class SqlSyncRules implements SyncRules {
438442
row: SqliteRow
439443
): { results: EvaluatedParameters[]; errors: EvaluationError[] } {
440444
let rawResults: EvaluatedParametersResult[] = [];
441-
for (let source of this.bucketSources) {
445+
for (let source of this.bucketParameterSources) {
442446
rawResults.push(...source.evaluateParameterRow(table, row));
443447
}
444448

@@ -460,7 +464,7 @@ export class SqlSyncRules implements SyncRules {
460464
const errors: QuerierError[] = [];
461465
const pending = { queriers, errors };
462466

463-
for (const source of this.bucketSources) {
467+
for (const source of this.bucketParameterSources) {
464468
if (
465469
(source.subscribedToByDefault && resolvedOptions.hasDefaultStreams) ||
466470
source.name in resolvedOptions.streams
@@ -474,12 +478,18 @@ export class SqlSyncRules implements SyncRules {
474478
}
475479

476480
hasDynamicBucketQueries() {
477-
return this.bucketSources.some((s) => s.hasDynamicBucketQueries());
481+
return this.bucketParameterSources.some((s) => s.hasDynamicBucketQueries());
478482
}
479483

480484
getSourceTables(): TablePattern[] {
481485
const sourceTables = new Map<String, TablePattern>();
482-
for (const bucket of this.bucketSources) {
486+
for (const bucket of this.bucketDataSources) {
487+
for (const r of bucket.getSourceTables()) {
488+
const key = `${r.connectionTag}.${r.schema}.${r.tablePattern}`;
489+
sourceTables.set(key, r);
490+
}
491+
}
492+
for (const bucket of this.bucketParameterSources) {
483493
for (const r of bucket.getSourceTables()) {
484494
const key = `${r.connectionTag}.${r.schema}.${r.tablePattern}`;
485495
sourceTables.set(key, r);
@@ -513,16 +523,19 @@ export class SqlSyncRules implements SyncRules {
513523
}
514524

515525
tableSyncsData(table: SourceTableInterface): boolean {
516-
return this.bucketSources.some((b) => b.tableSyncsData(table));
526+
return this.bucketDataSources.some((b) => b.tableSyncsData(table));
517527
}
518528

519529
tableSyncsParameters(table: SourceTableInterface): boolean {
520-
return this.bucketSources.some((b) => b.tableSyncsParameters(table));
530+
return this.bucketParameterSources.some((b) => b.tableSyncsParameters(table));
521531
}
522532

523533
debugGetOutputTables() {
524534
let result: Record<string, any[]> = {};
525-
for (let bucket of this.bucketSources) {
535+
for (let bucket of this.bucketDataSources) {
536+
bucket.debugWriteOutputTables(result);
537+
}
538+
for (let bucket of this.bucketParameterSources) {
526539
bucket.debugWriteOutputTables(result);
527540
}
528541
return result;

packages/sync-rules/src/schema-generators/SchemaGenerator.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export abstract class SchemaGenerator {
1010
protected getAllTables(source: SqlSyncRules, schema: SourceSchema) {
1111
let tables: Record<string, Record<string, ColumnDefinition>> = {};
1212

13-
for (let descriptor of source.bucketSources) {
13+
for (let descriptor of source.bucketDataSources) {
1414
descriptor.resolveResultSets(schema, tables);
1515
}
1616

packages/sync-rules/src/streams/stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { BaseSqlDataQuery } from '../BaseSqlDataQuery.js';
22
import { BucketInclusionReason, BucketPriority, DEFAULT_BUCKET_PRIORITY } from '../BucketDescription.js';
33
import { BucketParameterQuerier, PendingQueriers } from '../BucketParameterQuerier.js';
4-
import { BucketSource, BucketSourceType, ResultSetDescription } from '../BucketSource.js';
4+
import { BucketDataSource, BucketParameterSource, BucketSourceType, ResultSetDescription } from '../BucketSource.js';
55
import { ColumnDefinition } from '../ExpressionType.js';
66
import { SourceTableInterface } from '../SourceTableInterface.js';
77
import { GetQuerierOptions, RequestedStream } from '../SqlSyncRules.js';
@@ -18,7 +18,7 @@ import {
1818
} from '../types.js';
1919
import { StreamVariant } from './variant.js';
2020

21-
export class SyncStream implements BucketSource {
21+
export class SyncStream implements BucketDataSource, BucketParameterSource {
2222
name: string;
2323
subscribedToByDefault: boolean;
2424
priority: BucketPriority;

packages/sync-rules/test/src/sync_rules.test.ts

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ describe('sync rules', () => {
1717

1818
test('parse empty sync rules', () => {
1919
const rules = SqlSyncRules.fromYaml('bucket_definitions: {}', PARSE_OPTIONS);
20-
expect(rules.bucketSources).toEqual([]);
20+
expect(rules.bucketParameterSources).toEqual([]);
21+
expect(rules.bucketDataSources).toEqual([]);
2122
});
2223

2324
test('parse global sync rules', () => {
@@ -30,7 +31,7 @@ bucket_definitions:
3031
`,
3132
PARSE_OPTIONS
3233
);
33-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
34+
const bucket = rules.bucketDataSources[0] as SqlBucketDescriptor;
3435
expect(bucket.name).toEqual('mybucket');
3536
expect(bucket.bucketParameters).toEqual([]);
3637
const dataQuery = bucket.dataQueries[0];
@@ -70,7 +71,7 @@ bucket_definitions:
7071
`,
7172
PARSE_OPTIONS
7273
);
73-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
74+
const bucket = rules.bucketParameterSources[0] as SqlBucketDescriptor;
7475
expect(bucket.bucketParameters).toEqual([]);
7576
const param_query = bucket.globalParameterQueries[0];
7677

@@ -102,7 +103,7 @@ bucket_definitions:
102103
`,
103104
PARSE_OPTIONS
104105
);
105-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
106+
const bucket = rules.bucketParameterSources[0] as SqlBucketDescriptor;
106107
expect(bucket.bucketParameters).toEqual([]);
107108
const param_query = bucket.parameterQueries[0];
108109
expect(param_query.bucketParameters).toEqual([]);
@@ -126,9 +127,10 @@ bucket_definitions:
126127
`,
127128
PARSE_OPTIONS
128129
);
129-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
130-
expect(bucket.bucketParameters).toEqual(['user_id', 'device_id']);
131-
const param_query = bucket.globalParameterQueries[0];
130+
const bucketParameters = rules.bucketParameterSources[0] as SqlBucketDescriptor;
131+
const bucketData = rules.bucketDataSources[0] as SqlBucketDescriptor;
132+
expect(bucketParameters.bucketParameters).toEqual(['user_id', 'device_id']);
133+
const param_query = bucketParameters.globalParameterQueries[0];
132134
expect(param_query.bucketParameters).toEqual(['user_id', 'device_id']);
133135
expect(
134136
rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' }, { device_id: 'device1' })).querier
@@ -137,7 +139,7 @@ bucket_definitions:
137139
{ bucket: 'mybucket["user1","device1"]', definition: 'mybucket', inclusion_reasons: ['default'], priority: 3 }
138140
]);
139141

140-
const data_query = bucket.dataQueries[0];
142+
const data_query = bucketData.dataQueries[0];
141143
expect(data_query.bucketParameters).toEqual(['user_id', 'device_id']);
142144
expect(
143145
rules.evaluateRow({
@@ -176,15 +178,16 @@ bucket_definitions:
176178
`,
177179
PARSE_OPTIONS
178180
);
179-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
180-
expect(bucket.bucketParameters).toEqual(['user_id']);
181-
const param_query = bucket.globalParameterQueries[0];
181+
const bucketParameters = rules.bucketParameterSources[0] as SqlBucketDescriptor;
182+
const bucketData = rules.bucketDataSources[0] as SqlBucketDescriptor;
183+
expect(bucketParameters.bucketParameters).toEqual(['user_id']);
184+
const param_query = bucketParameters.globalParameterQueries[0];
182185
expect(param_query.bucketParameters).toEqual(['user_id']);
183186
expect(
184187
rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier.staticBuckets
185188
).toEqual([{ bucket: 'mybucket["user1"]', definition: 'mybucket', inclusion_reasons: ['default'], priority: 3 }]);
186189

187-
const data_query = bucket.dataQueries[0];
190+
const data_query = bucketData.dataQueries[0];
188191
expect(data_query.bucketParameters).toEqual(['user_id']);
189192
expect(
190193
rules.evaluateRow({
@@ -322,8 +325,8 @@ bucket_definitions:
322325
`,
323326
PARSE_OPTIONS
324327
);
325-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
326-
expect(bucket.bucketParameters).toEqual(['user_id']);
328+
const bucketParameters = rules.bucketParameterSources[0] as SqlBucketDescriptor;
329+
expect(bucketParameters.bucketParameters).toEqual(['user_id']);
327330
expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({
328331
staticBuckets: [{ bucket: 'mybucket["USER1"]', priority: 3 }],
329332
hasDynamicBuckets: false
@@ -360,8 +363,8 @@ bucket_definitions:
360363
`,
361364
PARSE_OPTIONS
362365
);
363-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
364-
expect(bucket.bucketParameters).toEqual(['user_id']);
366+
const bucketParameters = rules.bucketParameterSources[0] as SqlBucketDescriptor;
367+
expect(bucketParameters.bucketParameters).toEqual(['user_id']);
365368
expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({
366369
staticBuckets: [{ bucket: 'mybucket["USER1"]', priority: 3 }],
367370
hasDynamicBuckets: false
@@ -956,8 +959,8 @@ bucket_definitions:
956959
`,
957960
PARSE_OPTIONS
958961
);
959-
const bucket = rules.bucketSources[0] as SqlBucketDescriptor;
960-
expect(bucket.bucketParameters).toEqual(['user_id']);
962+
const bucketParameters = rules.bucketParameterSources[0] as SqlBucketDescriptor;
963+
expect(bucketParameters.bucketParameters).toEqual(['user_id']);
961964
expect(rules.hasDynamicBucketQueries()).toBe(true);
962965

963966
expect(rules.getBucketParameterQuerier(normalizeQuerierOptions({ user_id: 'user1' })).querier).toMatchObject({

0 commit comments

Comments
 (0)