Skip to content

Commit 0049d54

Browse files
committed
Split out SyncStream into its components.
1 parent 011ccb4 commit 0049d54

File tree

5 files changed

+127
-101
lines changed

5 files changed

+127
-101
lines changed

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ export interface EvaluateRowOptions {
2626
table: SourceTableInterface;
2727
row: SqliteRow;
2828
bucketIds: (params: QueryParameters) => string[];
29-
bucketIdTransformer: BucketIdTransformer | null;
3029
}
3130

3231
export interface BaseSqlDataQueryOptions {
@@ -177,7 +176,7 @@ export class BaseSqlDataQuery {
177176
}
178177
}
179178

180-
evaluateRowWithOptions(options: Omit<EvaluateRowOptions, 'bucketIdTransformer'>): EvaluationResult[] {
179+
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
181180
try {
182181
const { table, row, bucketIds } = options;
183182

packages/sync-rules/src/BucketSource.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,6 @@ export interface BucketDataSourceDefinition {
8080
* This is only relevant for parameter queries that query tables.
8181
*/
8282
export interface BucketParameterLookupSourceDefinition {
83-
/**
84-
* For debug use only.
85-
*/
86-
readonly bucketParameters: string[];
87-
8883
getSourceTables(): Set<TablePattern>;
8984
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource;
9085

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition, BucketSo
124124
getSourceTables(): Set<TablePattern> {
125125
let result = new Set<TablePattern>();
126126
for (let query of this.parameterQueries) {
127-
result.add(query.sourceTable!);
127+
result.add(query.sourceTable);
128128
}
129129
for (let query of this.dataQueries) {
130-
result.add(query.sourceTable!);
130+
result.add(query.sourceTable);
131131
}
132132

133133
// Note: No physical tables for global_parameter_queries

packages/sync-rules/src/streams/stream.ts

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -27,47 +27,77 @@ import {
2727
} from '../types.js';
2828
import { StreamVariant } from './variant.js';
2929

30-
export class SyncStream
31-
implements
32-
BucketDataSourceDefinition,
33-
BucketParameterLookupSourceDefinition,
34-
BucketParameterQuerierSourceDefinition,
35-
BucketSource
36-
{
30+
export class SyncStream implements BucketSource {
3731
name: string;
3832
subscribedToByDefault: boolean;
3933
priority: BucketPriority;
4034
variants: StreamVariant[];
4135
data: BaseSqlDataQuery;
4236

37+
public readonly dataSource: BucketDataSourceDefinition;
38+
public readonly parameterLookupSources: BucketParameterLookupSourceDefinition[];
39+
public readonly parameterQuerierSources: BucketParameterQuerierSourceDefinition[];
40+
4341
constructor(name: string, data: BaseSqlDataQuery) {
4442
this.name = name;
4543
this.subscribedToByDefault = false;
4644
this.priority = DEFAULT_BUCKET_PRIORITY;
4745
this.variants = [];
4846
this.data = data;
47+
48+
this.dataSource = new SyncStreamDataSource(this, data);
49+
this.parameterQuerierSources = [new SyncStreamParameterQuerierSource(this)];
50+
this.parameterLookupSources = [new SyncStreamParameterLookupSource(this)];
4951
}
5052

5153
public get type(): BucketSourceType {
5254
return BucketSourceType.SYNC_STREAM;
5355
}
56+
}
5457

55-
public get bucketParameters(): string[] {
58+
export class SyncStreamDataSource implements BucketDataSourceDefinition {
59+
constructor(
60+
private stream: SyncStream,
61+
private data: BaseSqlDataQuery
62+
) {}
63+
64+
get bucketParameters() {
5665
// FIXME: check whether this is correct.
5766
// Could there be multiple variants with different bucket parameters?
5867
return this.data.bucketParameters;
5968
}
6069

61-
public get dataSource() {
62-
return this;
70+
getSourceTables(): Set<TablePattern> {
71+
return new Set<TablePattern>([this.data.sourceTable]);
6372
}
6473

65-
public get parameterLookupSources() {
66-
return [this];
74+
tableSyncsData(table: SourceTableInterface): boolean {
75+
return this.data.applies(table);
76+
}
77+
78+
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>): void {
79+
return this.data.resolveResultSets(schema, tables);
6780
}
6881

69-
public get parameterQuerierSources() {
70-
return [this];
82+
debugWriteOutputTables(result: Record<string, { query: string }[]>): void {
83+
result[this.data.table!.sqlName] ??= [];
84+
const r = {
85+
query: this.data.sql
86+
};
87+
88+
result[this.data.table!.sqlName].push(r);
89+
}
90+
91+
debugRepresentation() {
92+
return {
93+
name: this.stream.name,
94+
type: BucketSourceType[BucketSourceType.SYNC_STREAM],
95+
variants: this.stream.variants.map((v) => v.debugRepresentation()),
96+
data: {
97+
table: this.data.sourceTable,
98+
columns: this.data.columnOutputNames()
99+
}
100+
};
71101
}
72102

73103
createDataSource(params: CreateSourceParams): BucketDataSource {
@@ -79,7 +109,7 @@ export class SyncStream
79109
return [];
80110
}
81111

82-
const stream = this;
112+
const stream = this.stream;
83113
const row: TableRow = {
84114
sourceTable: options.sourceTable,
85115
record: options.record
@@ -100,31 +130,28 @@ export class SyncStream
100130
}
101131
};
102132
}
133+
}
103134

104-
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
105-
return {
106-
definition: this,
135+
export class SyncStreamParameterQuerierSource implements BucketParameterQuerierSourceDefinition {
136+
// We could eventually split this into a separate source per variant.
107137

108-
evaluateParameterRow: (sourceTable, row) => {
109-
const result: EvaluatedParametersResult[] = [];
138+
constructor(private stream: SyncStream) {}
110139

111-
for (const variant of this.variants) {
112-
variant.pushParameterRowEvaluation(result, sourceTable, row);
113-
}
114-
115-
return result;
116-
}
117-
};
140+
get bucketParameters(): string[] {
141+
// FIXME: check whether this is correct.
142+
// Could there be multiple variants with different bucket parameters?
143+
return this.stream.data.bucketParameters;
118144
}
119145

120146
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
147+
const stream = this.stream;
121148
return {
122149
definition: this,
123150

124151
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions): void => {
125-
const subscriptions = options.streams[this.name] ?? [];
152+
const subscriptions = options.streams[stream.name] ?? [];
126153

127-
if (!this.subscribedToByDefault && !subscriptions.length) {
154+
if (!stream.subscribedToByDefault && !subscriptions.length) {
128155
// The client is not subscribing to this stream, so don't query buckets related to it.
129156
return;
130157
}
@@ -143,7 +170,7 @@ export class SyncStream
143170

144171
// If the stream is subscribed to by default and there is no explicit subscription that would match the default
145172
// subscription, also include the default querier.
146-
if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) {
173+
if (stream.subscribedToByDefault && !hasExplicitDefaultSubscription) {
147174
this.queriersForSubscription(result, null, options.globalParameters, params.bucketIdTransformer);
148175
}
149176
}
@@ -160,8 +187,8 @@ export class SyncStream
160187
const queriers: BucketParameterQuerier[] = [];
161188

162189
try {
163-
for (const variant of this.variants) {
164-
const querier = variant.querier(this, reason, params, bucketIdTransformer);
190+
for (const variant of this.stream.variants) {
191+
const querier = variant.querier(this.stream, reason, params, bucketIdTransformer);
165192
if (querier) {
166193
queriers.push(querier);
167194
}
@@ -170,65 +197,55 @@ export class SyncStream
170197
result.queriers.push(...queriers);
171198
} catch (e) {
172199
result.errors.push({
173-
descriptor: this.name,
200+
descriptor: this.stream.name,
174201
message: `Error evaluating bucket ids: ${e.message}`,
175202
subscription: subscription ?? undefined
176203
});
177204
}
178205
}
206+
}
179207

180-
tableSyncsData(table: SourceTableInterface): boolean {
181-
return this.data.applies(table);
182-
}
183-
184-
tableSyncsParameters(table: SourceTableInterface): boolean {
185-
for (const variant of this.variants) {
186-
for (const subquery of variant.subqueries) {
187-
if (subquery.parameterTable.matches(table)) {
188-
return true;
189-
}
190-
}
191-
}
208+
export class SyncStreamParameterLookupSource implements BucketParameterLookupSourceDefinition {
209+
// We could eventually split this into a separate source per variant.
192210

193-
return false;
194-
}
211+
constructor(private stream: SyncStream) {}
195212

196213
getSourceTables(): Set<TablePattern> {
197214
let result = new Set<TablePattern>();
198-
result.add(this.data.sourceTable);
199-
for (let variant of this.variants) {
215+
for (let variant of this.stream.variants) {
200216
for (const subquery of variant.subqueries) {
201217
result.add(subquery.parameterTable);
202218
}
203219
}
204220

205-
// Note: No physical tables for global_parameter_queries
206-
207221
return result;
208222
}
209223

210-
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>) {
211-
this.data.resolveResultSets(schema, tables);
212-
}
213-
214-
debugRepresentation() {
224+
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
215225
return {
216-
name: this.name,
217-
type: BucketSourceType[this.type],
218-
variants: this.variants.map((v) => v.debugRepresentation()),
219-
data: {
220-
table: this.data.sourceTable,
221-
columns: this.data.columnOutputNames()
226+
definition: this,
227+
228+
evaluateParameterRow: (sourceTable, row) => {
229+
const result: EvaluatedParametersResult[] = [];
230+
231+
for (const variant of this.stream.variants) {
232+
variant.pushParameterRowEvaluation(result, sourceTable, row);
233+
}
234+
235+
return result;
222236
}
223237
};
224238
}
225239

226-
debugWriteOutputTables(result: Record<string, { query: string }[]>): void {
227-
result[this.data.table!.sqlName] ??= [];
228-
const r = {
229-
query: this.data.sql
230-
};
240+
tableSyncsParameters(table: SourceTableInterface): boolean {
241+
for (const variant of this.stream.variants) {
242+
for (const subquery of variant.subqueries) {
243+
if (subquery.parameterTable.matches(table)) {
244+
return true;
245+
}
246+
}
247+
}
231248

232-
result[this.data.table!.sqlName].push(r);
249+
return false;
233250
}
234251
}

0 commit comments

Comments
 (0)