Skip to content

Commit 1c1d55f

Browse files
committed
Add a stream test with custom hydrationState.
1 parent ae08fd3 commit 1c1d55f

File tree

1 file changed

+127
-24
lines changed

1 file changed

+127
-24
lines changed

packages/sync-rules/test/src/streams.test.ts

Lines changed: 127 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
/// <reference path="../matchers.d.ts" />
22
import { describe, expect, test } from 'vitest';
3+
import { HydrationState, ParameterLookupScope, versionedHydrationState } from '../../src/HydrationState.js';
34
import {
45
BucketParameterQuerier,
56
CompatibilityContext,
67
CompatibilityEdition,
78
CreateSourceParams,
89
debugHydratedMergedSource,
910
DEFAULT_TAG,
11+
EvaluationResult,
1012
GetBucketParameterQuerierResult,
1113
GetQuerierOptions,
1214
mergeBucketParameterQueriers,
@@ -16,14 +18,12 @@ import {
1618
SourceTableInterface,
1719
SqliteJsonRow,
1820
SqliteRow,
19-
SqlSyncRules,
2021
StaticSchema,
2122
StreamParseOptions,
2223
SyncStream,
2324
syncStreamFromSql
2425
} from '../../src/index.js';
2526
import { normalizeQuerierOptions, PARSE_OPTIONS, TestSourceTable } from './util.js';
26-
import { ParameterLookupScope, versionedHydrationState } from '../../src/HydrationState.js';
2727

2828
describe('streams', () => {
2929
const STREAM_0: ParameterLookupScope = {
@@ -878,6 +878,108 @@ WHERE
878878
).toStrictEqual(['1#stream|0["foo"]']);
879879
});
880880
});
881+
882+
test('variants with custom hydrationState', async () => {
883+
// Convoluted example, but want to test specific variant usage.
884+
// This test that bucket prefix and lookup scope mappings are correctly applied for each variant.
885+
const desc = parseStream(`
886+
SELECT * FROM comments WHERE
887+
issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id()) OR -- stream|0
888+
issue_id IN (SELECT id FROM issues WHERE name = subscription.parameter('issue_name')) OR -- stream|1
889+
label = subscription.parameter('comment_label') OR -- stream|2
890+
auth.parameter('is_admin') -- stream|3
891+
`);
892+
893+
const hydrationState: HydrationState = {
894+
getBucketSourceState(source) {
895+
return { bucketPrefix: `${source.defaultBucketPrefix}.test` };
896+
},
897+
getParameterLookupScope(source) {
898+
return {
899+
lookupName: `${source.defaultLookupScope.lookupName}.test`,
900+
queryId: `${source.defaultLookupScope.queryId}.test`
901+
};
902+
}
903+
};
904+
905+
const hydrated = debugHydratedMergedSource(desc, { hydrationState });
906+
907+
expect(
908+
bucketIds(hydrated.evaluateRow({ sourceTable: COMMENTS, record: { id: 'c', issue_id: 'i1', label: 'l1' } }))
909+
).toStrictEqual(['stream|0.test["i1"]', 'stream|1.test["i1"]', 'stream|2.test["l1"]', 'stream|3.test[]']);
910+
911+
expect(
912+
hydrated.evaluateParameterRow(ISSUES, {
913+
id: 'i1',
914+
owner_id: 'u1',
915+
name: 'myname'
916+
})
917+
).toStrictEqual([
918+
{
919+
lookup: ParameterLookup.normalized({ lookupName: 'stream.test', queryId: '0.test' }, ['u1']),
920+
bucketParameters: [
921+
{
922+
result: 'i1'
923+
}
924+
]
925+
},
926+
927+
{
928+
lookup: ParameterLookup.normalized({ lookupName: 'stream.test', queryId: '1.test' }, ['myname']),
929+
bucketParameters: [
930+
{
931+
result: 'i1'
932+
}
933+
]
934+
}
935+
]);
936+
937+
expect(
938+
hydrated.evaluateParameterRow(ISSUES, {
939+
id: 'i1',
940+
owner_id: 'u1'
941+
})
942+
).toStrictEqual([
943+
{
944+
lookup: ParameterLookup.normalized({ lookupName: 'stream.test', queryId: '0.test' }, ['u1']),
945+
bucketParameters: [
946+
{
947+
result: 'i1'
948+
}
949+
]
950+
}
951+
]);
952+
953+
function getParameterSets(lookups: ParameterLookup[]) {
954+
return lookups.flatMap((lookup) => {
955+
if (JSON.stringify(lookup.values) == JSON.stringify(['stream.test', '1.test', null])) {
956+
return [];
957+
} else if (JSON.stringify(lookup.values) == JSON.stringify(['stream.test', '0.test', 'u1'])) {
958+
return [{ result: 'i1' }];
959+
} else if (JSON.stringify(lookup.values) == JSON.stringify(['stream.test', '1.test', 'myname'])) {
960+
return [{ result: 'i2' }];
961+
} else {
962+
throw new Error(`Unexpected lookup: ${JSON.stringify(lookup.values)}`);
963+
}
964+
});
965+
}
966+
967+
expect(
968+
await queryBucketIds(desc, {
969+
hydrationState,
970+
token: { sub: 'u1', is_admin: false },
971+
getParameterSets
972+
})
973+
).toStrictEqual(['stream|2.test[null]', 'stream|0.test["i1"]']);
974+
expect(
975+
await queryBucketIds(desc, {
976+
hydrationState,
977+
token: { sub: 'u1', is_admin: true },
978+
parameters: { comment_label: 'l1', issue_name: 'myname' },
979+
getParameterSets
980+
})
981+
).toStrictEqual(['stream|2.test["l1"]', 'stream|3.test[]', 'stream|0.test["i1"]', 'stream|1.test["i2"]']);
982+
});
881983
});
882984

883985
const USERS = new TestSourceTable('users');
@@ -941,24 +1043,28 @@ const options: StreamParseOptions = {
9411043
const hydrationParams: CreateSourceParams = { hydrationState: versionedHydrationState(1) };
9421044

9431045
function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) {
944-
return debugHydratedMergedSource(stream, hydrationParams)
945-
.evaluateRow({ sourceTable, record })
946-
.map((r) => {
947-
if ('error' in r) {
948-
throw new Error(`Unexpected error evaluating row: ${r.error}`);
949-
}
1046+
return bucketIds(debugHydratedMergedSource(stream, hydrationParams).evaluateRow({ sourceTable, record }));
1047+
}
9501048

951-
return r.bucket;
952-
});
1049+
function bucketIds(result: EvaluationResult[]): string[] {
1050+
return result.map((r) => {
1051+
if ('error' in r) {
1052+
throw new Error(`Unexpected error evaluating row: ${r.error}`);
1053+
}
1054+
1055+
return r.bucket;
1056+
});
9531057
}
9541058

1059+
interface TestQuerierOptions {
1060+
token?: Record<string, any>;
1061+
parameters?: Record<string, any>;
1062+
getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[];
1063+
hydrationState?: HydrationState;
1064+
}
9551065
async function createQueriers(
9561066
stream: SyncStream,
957-
options?: {
958-
token?: Record<string, any>;
959-
parameters?: Record<string, any>;
960-
getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[];
961-
}
1067+
options?: TestQuerierOptions
9621068
): Promise<GetBucketParameterQuerierResult> {
9631069
const queriers: BucketParameterQuerier[] = [];
9641070
const errors: QuerierError[] = [];
@@ -977,20 +1083,17 @@ async function createQueriers(
9771083
};
9781084

9791085
for (let querier of stream.parameterQuerierSources) {
980-
querier.createParameterQuerierSource(hydrationParams).pushBucketParameterQueriers(pending, querierOptions);
1086+
querier
1087+
.createParameterQuerierSource(
1088+
options?.hydrationState ? { hydrationState: options.hydrationState } : hydrationParams
1089+
)
1090+
.pushBucketParameterQueriers(pending, querierOptions);
9811091
}
9821092

9831093
return { querier: mergeBucketParameterQueriers(queriers), errors };
9841094
}
9851095

986-
async function queryBucketIds(
987-
stream: SyncStream,
988-
options?: {
989-
token?: Record<string, any>;
990-
parameters?: Record<string, any>;
991-
getParameterSets?: (lookups: ParameterLookup[]) => SqliteJsonRow[];
992-
}
993-
) {
1096+
async function queryBucketIds(stream: SyncStream, options?: TestQuerierOptions) {
9941097
const { querier, errors } = await createQueriers(stream, options);
9951098
expect(errors).toHaveLength(0);
9961099

0 commit comments

Comments
 (0)