Skip to content

Commit 1679f4a

Browse files
committed
Remove need for hydration on BucketDataSource.
1 parent af096ea commit 1679f4a

22 files changed

+367
-282
lines changed

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import { AvailableTable, SqlTools } from './sql_filters.js';
66
import { castAsText } from './sql_functions.js';
77
import { TablePattern } from './TablePattern.js';
88
import {
9-
EvaluationResult,
109
QueryParameters,
1110
QuerySchema,
11+
SourceEvaluatedRow,
12+
SourceEvaluationResult,
1213
SourceSchema,
1314
SourceSchemaTable,
1415
SqliteJsonRow,
@@ -24,7 +25,7 @@ export interface RowValueExtractor {
2425
export interface EvaluateRowOptions {
2526
table: SourceTableInterface;
2627
row: SqliteRow;
27-
bucketIds: (params: QueryParameters) => string[];
28+
serializedBucketParameters: (params: QueryParameters) => string[];
2829
}
2930

3031
export interface BaseSqlDataQueryOptions {
@@ -169,13 +170,14 @@ export class BaseSqlDataQuery {
169170
}
170171
}
171172

172-
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
173+
evaluateRowWithOptions(options: EvaluateRowOptions): SourceEvaluationResult[] {
173174
try {
174-
const { table, row, bucketIds } = options;
175+
const { table, row, serializedBucketParameters } = options;
175176

176177
const tables = { [this.table.nameInSchema]: this.addSpecialParameters(table, row) };
177-
const resolvedBucketIds = bucketIds(tables);
178-
if (resolvedBucketIds.length == 0) {
178+
// Array of _serialized_ parameters, one per output result.
179+
const resolvedBucketParameters = serializedBucketParameters(tables);
180+
if (resolvedBucketParameters.length == 0) {
179181
// Short-circuit: No need to transform the row if there are no matching buckets.
180182
return [];
181183
}
@@ -193,13 +195,13 @@ export class BaseSqlDataQuery {
193195
}
194196
const outputTable = this.getOutputName(table.name);
195197

196-
return resolvedBucketIds.map((bucketId) => {
198+
return resolvedBucketParameters.map((serializedBucketParameters) => {
197199
return {
198-
bucket: bucketId,
200+
serializedBucketParameters,
199201
table: outputTable,
200202
id: id,
201203
data
202-
} as EvaluationResult;
204+
} satisfies SourceEvaluatedRow;
203205
});
204206
} catch (e) {
205207
return [{ error: e.message ?? `Evaluating data query failed` }];

packages/sync-rules/src/BucketSource.ts

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,17 @@ import { DEFAULT_HYDRATION_STATE, HydrationState, ParameterLookupScope } from '.
44
import { SourceTableInterface } from './SourceTableInterface.js';
55
import { GetQuerierOptions } from './SqlSyncRules.js';
66
import { TablePattern } from './TablePattern.js';
7-
import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js';
7+
import {
8+
EvaluatedParametersResult,
9+
EvaluatedRow,
10+
EvaluateRowOptions,
11+
EvaluationResult,
12+
isEvaluationError,
13+
SourceEvaluationResult,
14+
SourceSchema,
15+
SqliteRow
16+
} from './types.js';
17+
import { buildBucketName } from './utils.js';
818

919
export interface CreateSourceParams {
1020
hydrationState: HydrationState;
@@ -30,7 +40,7 @@ export interface BucketSource {
3040
* Specifically, bucket definitions would always have a single data source, while stream definitions may have
3141
* one per variant.
3242
*/
33-
readonly dataSources: BucketDataSourceDefinition[];
43+
readonly dataSources: BucketDataSource[];
3444

3545
/**
3646
* BucketParameterQuerierSource describing the parameter queries / stream subqueries in this bucket/stream definition.
@@ -57,8 +67,11 @@ export interface HydratedBucketSource {
5767

5868
/**
5969
* Encodes a static definition of a bucket source, as parsed from sync rules or stream definitions.
70+
*
71+
* This does not require any "hydration" itself: All results are independent of bucket names.
72+
* The higher-level HydratedSyncRules will use a HydrationState to generate bucket names.
6073
*/
61-
export interface BucketDataSourceDefinition {
74+
export interface BucketDataSource {
6275
/**
6376
* Bucket prefix if no transformations are defined.
6477
*
@@ -70,10 +83,16 @@ export interface BucketDataSourceDefinition {
7083
* For debug use only.
7184
*/
7285
readonly bucketParameters: string[];
86+
7387
getSourceTables(): Set<TablePattern>;
74-
createDataSource(params: CreateSourceParams): BucketDataSource;
7588
tableSyncsData(table: SourceTableInterface): boolean;
7689

90+
/**
91+
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
92+
* data for rows to add to buckets.
93+
*/
94+
evaluateRow(options: EvaluateRowOptions): SourceEvaluationResult[];
95+
7796
/**
7897
* Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
7998
* source.
@@ -121,29 +140,11 @@ export interface BucketParameterQuerierSourceDefinition {
121140
*
122141
* Note that queriers do not persist data themselves; they only resolve which buckets to load based on request parameters.
123142
*/
124-
readonly querierDataSource: BucketDataSourceDefinition;
143+
readonly querierDataSource: BucketDataSource;
125144

126145
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource;
127146
}
128147

129-
/**
130-
* An interface declaring
131-
*
132-
* - which buckets the sync service should create when processing change streams from the database.
133-
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
134-
* - which buckets a given connection has access to.
135-
*
136-
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
137-
* definitions that only consist of a single query.
138-
*/
139-
export interface BucketDataSource {
140-
/**
141-
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
142-
* data for rows to add to buckets.
143-
*/
144-
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
145-
}
146-
147148
export interface BucketParameterLookupSource {
148149
/**
149150
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
@@ -165,10 +166,9 @@ export interface BucketParameterQuerierSource {
165166
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void;
166167
}
167168

168-
export interface DebugMergedSource
169-
extends BucketDataSource,
170-
BucketParameterLookupSource,
171-
BucketParameterQuerierSource {}
169+
export interface DebugMergedSource extends BucketParameterLookupSource, BucketParameterQuerierSource {
170+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
171+
}
172172

173173
export enum BucketSourceType {
174174
SYNC_RULE,
@@ -177,14 +177,31 @@ export enum BucketSourceType {
177177

178178
export type ResultSetDescription = { name: string; columns: ColumnDefinition[] };
179179

180-
export function mergeDataSources(sources: BucketDataSource[]): BucketDataSource {
180+
export function hydrateEvaluateRow(
181+
hydrationState: HydrationState,
182+
source: BucketDataSource
183+
): (options: EvaluateRowOptions) => EvaluationResult[] {
184+
const scope = hydrationState.getBucketSourceScope(source);
185+
return (options: EvaluateRowOptions): EvaluationResult[] => {
186+
return source.evaluateRow(options).map((result) => {
187+
if (isEvaluationError(result)) {
188+
return result;
189+
}
190+
return {
191+
bucket: buildBucketName(scope, result.serializedBucketParameters),
192+
id: result.id,
193+
table: result.table,
194+
data: result.data
195+
} satisfies EvaluatedRow;
196+
});
197+
};
198+
}
199+
200+
export function mergeDataSources(hydrationState: HydrationState, sources: BucketDataSource[]) {
201+
const withScope = sources.map((source) => hydrateEvaluateRow(hydrationState, source));
181202
return {
182203
evaluateRow(options: EvaluateRowOptions): EvaluationResult[] {
183-
let results: EvaluationResult[] = [];
184-
for (let source of sources) {
185-
results.push(...source.evaluateRow(options));
186-
}
187-
return results;
204+
return withScope.flatMap((source) => source(options));
188205
}
189206
};
190207
}
@@ -218,9 +235,7 @@ export function mergeParameterQuerierSources(sources: BucketParameterQuerierSour
218235
export function debugHydratedMergedSource(bucketSource: BucketSource, params?: CreateSourceParams): DebugMergedSource {
219236
const hydrationState = params?.hydrationState ?? DEFAULT_HYDRATION_STATE;
220237
const resolvedParams = { hydrationState };
221-
const dataSource = mergeDataSources(
222-
bucketSource.dataSources.map((source) => source.createDataSource(resolvedParams))
223-
);
238+
const dataSource = mergeDataSources(hydrationState, bucketSource.dataSources);
224239
const parameterLookupSource = mergeParameterLookupSources(
225240
bucketSource.parameterLookupSources.map((source) => source.createParameterLookupSource(resolvedParams))
226241
);

packages/sync-rules/src/SyncRules.ts renamed to packages/sync-rules/src/HydratedSyncRules.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import {
22
BucketDataSource,
33
BucketParameterLookupSource,
4-
BucketParameterQuerierSource,
5-
BucketParameterQuerierSourceDefinition,
64
CreateSourceParams,
75
HydratedBucketSource
86
} from './BucketSource.js';
7+
import { BucketDataScope } from './HydrationState.js';
98
import {
109
BucketParameterQuerier,
1110
CompatibilityContext,
@@ -35,6 +34,7 @@ export class HydratedSyncRules {
3534
bucketSources: HydratedBucketSource[] = [];
3635
bucketDataSources: BucketDataSource[];
3736
bucketParameterLookupSources: BucketParameterLookupSource[];
37+
bucketSourceHydration: Map<BucketDataSource, BucketDataScope> = new Map();
3838

3939
eventDescriptors: SqlEventDescriptor[] = [];
4040
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
@@ -53,6 +53,13 @@ export class HydratedSyncRules {
5353
this.bucketParameterLookupSources = params.bucketParameterLookupSources;
5454
this.definition = params.definition;
5555

56+
const hydrationState = params.createParams.hydrationState;
57+
58+
for (let source of this.bucketDataSources) {
59+
const state = hydrationState.getBucketSourceScope(source);
60+
this.bucketSourceHydration.set(source, state);
61+
}
62+
5663
if (params.eventDescriptors) {
5764
this.eventDescriptors = params.eventDescriptors;
5865
}
@@ -107,7 +114,24 @@ export class HydratedSyncRules {
107114
evaluateRowWithErrors(options: EvaluateRowOptions): { results: EvaluatedRow[]; errors: EvaluationError[] } {
108115
let rawResults: EvaluationResult[] = [];
109116
for (let source of this.bucketDataSources) {
110-
rawResults.push(...source.evaluateRow(options));
117+
const sourceResults = source.evaluateRow(options);
118+
if (sourceResults.length == 0) {
119+
continue;
120+
}
121+
const bucketPrefix = this.bucketSourceHydration.get(source)!.bucketPrefix;
122+
rawResults.push(
123+
...sourceResults.map((sourceRow) => {
124+
if (isEvaluationError(sourceRow)) {
125+
return sourceRow;
126+
}
127+
return {
128+
bucket: bucketPrefix + sourceRow.serializedBucketParameters,
129+
id: sourceRow.id,
130+
table: sourceRow.table,
131+
data: sourceRow.data
132+
} satisfies EvaluatedRow;
133+
})
134+
);
111135
}
112136

113137
const results = rawResults.filter(isEvaluatedRow) as EvaluatedRow[];

packages/sync-rules/src/HydrationState.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { BucketDataSourceDefinition, BucketParameterLookupSourceDefinition } from './BucketSource.js';
1+
import { BucketDataSource, BucketParameterLookupSourceDefinition } from './BucketSource.js';
22

3-
export interface BucketSourceState {
3+
export interface BucketDataScope {
44
/** The prefix is the bucket name before the parameters. */
55
bucketPrefix: string;
66
}
@@ -18,13 +18,13 @@ export interface ParameterLookupScope {
1818
* both to re-use mappings across hydrations of different sync rule versions, or to generate new mappings.
1919
*/
2020
export interface HydrationState<
21-
T extends BucketSourceState = BucketSourceState,
21+
T extends BucketDataScope = BucketDataScope,
2222
U extends ParameterLookupScope = ParameterLookupScope
2323
> {
2424
/**
2525
* Given a bucket data source definition, get the bucket prefix to use for it.
2626
*/
27-
getBucketSourceState(source: BucketDataSourceDefinition): T;
27+
getBucketSourceScope(source: BucketDataSource): T;
2828

2929
/**
3030
* Given a bucket parameter lookup definition, get the persistence name to use.
@@ -38,7 +38,7 @@ export interface HydrationState<
3838
* This is the legacy default behavior with no bucket versioning.
3939
*/
4040
export const DEFAULT_HYDRATION_STATE: HydrationState = {
41-
getBucketSourceState(source: BucketDataSourceDefinition) {
41+
getBucketSourceScope(source: BucketDataSource) {
4242
return {
4343
bucketPrefix: source.defaultBucketPrefix
4444
};
@@ -62,7 +62,7 @@ export const DEFAULT_HYDRATION_STATE: HydrationState = {
6262
*/
6363
export function versionedHydrationState(version: number): HydrationState {
6464
return {
65-
getBucketSourceState(source: BucketDataSourceDefinition): BucketSourceState {
65+
getBucketSourceScope(source: BucketDataSource): BucketDataScope {
6666
return {
6767
bucketPrefix: `${version}#${source.defaultBucketPrefix}`
6868
};

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
import {
2-
BucketDataSource,
3-
BucketDataSourceDefinition,
4-
BucketSource,
5-
BucketSourceType,
6-
CreateSourceParams
7-
} from './BucketSource.js';
1+
import { BucketDataSource, BucketSource, BucketSourceType } from './BucketSource.js';
82
import { ColumnDefinition } from './ExpressionType.js';
93
import { IdSequence } from './IdSequence.js';
104
import { SourceTableInterface } from './SourceTableInterface.js';
@@ -16,7 +10,7 @@ import { TablePattern } from './TablePattern.js';
1610
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
1711
import { CompatibilityContext } from './compatibility.js';
1812
import { SqlRuleError } from './errors.js';
19-
import { EvaluationResult, QueryParseOptions, SourceSchema } from './types.js';
13+
import { EvaluateRowOptions, QueryParseOptions, SourceEvaluationResult, SourceSchema } from './types.js';
2014

2115
export interface QueryParseResult {
2216
/**
@@ -142,7 +136,7 @@ export class SqlBucketDescriptor implements BucketSource {
142136
}
143137
}
144138

145-
export class BucketDefinitionDataSource implements BucketDataSourceDefinition {
139+
export class BucketDefinitionDataSource implements BucketDataSource {
146140
constructor(private descriptor: SqlBucketDescriptor) {}
147141

148142
/**
@@ -156,22 +150,16 @@ export class BucketDefinitionDataSource implements BucketDataSourceDefinition {
156150
return this.descriptor.name;
157151
}
158152

159-
createDataSource(params: CreateSourceParams): BucketDataSource {
160-
const hydrationState = params.hydrationState;
161-
const bucketPrefix = hydrationState.getBucketSourceState(this).bucketPrefix;
162-
return {
163-
evaluateRow: (options) => {
164-
let results: EvaluationResult[] = [];
165-
for (let query of this.descriptor.dataQueries) {
166-
if (!query.applies(options.sourceTable)) {
167-
continue;
168-
}
169-
170-
results.push(...query.evaluateRow(options.sourceTable, options.record, bucketPrefix));
171-
}
172-
return results;
153+
evaluateRow(options: EvaluateRowOptions) {
154+
let results: SourceEvaluationResult[] = [];
155+
for (let query of this.descriptor.dataQueries) {
156+
if (!query.applies(options.sourceTable)) {
157+
continue;
173158
}
174-
};
159+
160+
results.push(...query.evaluateRow(options.sourceTable, options.record));
161+
}
162+
return results;
175163
}
176164

177165
getSourceTables(): Set<TablePattern> {

0 commit comments

Comments
 (0)