@@ -1179,6 +1179,88 @@ bucket_definitions:
11791179 } ) ;
11801180
11811181 testChecksumBatching ( generateStorageFactory ) ;
1182+
1183+ test . only ( 'empty checkpoints (1)' , async ( ) => {
1184+ await using factory = await generateStorageFactory ( ) ;
1185+ const syncRules = await factory . updateSyncRules ( {
1186+ content : `
1187+ bucket_definitions:
1188+ global:
1189+ data:
1190+ - SELECT id, description FROM "%"
1191+ `
1192+ } ) ;
1193+ const bucketStorage = factory . getInstance ( syncRules ) ;
1194+
1195+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch ) => {
1196+ await batch . markAllSnapshotDone ( '1/1' ) ;
1197+ await batch . commit ( '1/1' ) ;
1198+
1199+ const cp1 = await bucketStorage . getCheckpoint ( ) ;
1200+ expect ( cp1 . lsn ) . toEqual ( '1/1' ) ;
1201+
1202+ await batch . commit ( '2/1' , { createEmptyCheckpoints : true } ) ;
1203+ const cp2 = await bucketStorage . getCheckpoint ( ) ;
1204+ expect ( cp2 . lsn ) . toEqual ( '2/1' ) ;
1205+
1206+ await batch . keepalive ( '3/1' ) ;
1207+ const cp3 = await bucketStorage . getCheckpoint ( ) ;
1208+ expect ( cp3 . lsn ) . toEqual ( '3/1' ) ;
1209+
1210+ // For the last one, we skip creating empty checkpoints
1211+ // This means the LSN stays at 3/1.
1212+ await batch . commit ( '4/1' , { createEmptyCheckpoints : false } ) ;
1213+ const cp4 = await bucketStorage . getCheckpoint ( ) ;
1214+ expect ( cp4 . lsn ) . toEqual ( '3/1' ) ;
1215+ } ) ;
1216+ } ) ;
1217+
1218+ test . only ( 'empty checkpoints (2)' , async ( ) => {
1219+ await using factory = await generateStorageFactory ( ) ;
1220+ const syncRules = await factory . updateSyncRules ( {
1221+ content : `
1222+ bucket_definitions:
1223+ global:
1224+ data:
1225+ - SELECT id, description FROM "%"
1226+ `
1227+ } ) ;
1228+ const bucketStorage = factory . getInstance ( syncRules ) ;
1229+
1230+ const sourceTable = TEST_TABLE ;
1231+ // We simulate two concurrent batches, but nesting is the easiest way to do this.
1232+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch1 ) => {
1233+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch2 ) => {
1234+ await batch1 . markAllSnapshotDone ( '1/1' ) ;
1235+ await batch1 . commit ( '1/1' ) ;
1236+
1237+ await batch1 . commit ( '2/1' , { createEmptyCheckpoints : false } ) ;
1238+ const cp2 = await bucketStorage . getCheckpoint ( ) ;
1239+ expect ( cp2 . lsn ) . toEqual ( '1/1' ) ; // checkpoint 2/1 skipped
1240+
1241+ await batch2 . save ( {
1242+ sourceTable,
1243+ tag : storage . SaveOperationTag . INSERT ,
1244+ after : {
1245+ id : 'test1' ,
1246+ description : 'test1a'
1247+ } ,
1248+ afterReplicaId : test_utils . rid ( 'test1' )
1249+ } ) ;
1250+ // This simulates what happens on a snapshot processor.
1251+ // This may later change to a flush() rather than commit().
1252+ await batch2 . commit ( test_utils . BATCH_OPTIONS . zeroLSN ) ;
1253+
1254+ const cp3 = await bucketStorage . getCheckpoint ( ) ;
1255+ expect ( cp3 . lsn ) . toEqual ( '1/1' ) ; // Still unchanged
1256+
1257+ // This now needs to advance the LSN, despite {createEmptyCheckpoints: false}
1258+ await batch1 . commit ( '4/1' , { createEmptyCheckpoints : false } ) ;
1259+ const cp4 = await bucketStorage . getCheckpoint ( ) ;
1260+ expect ( cp4 . lsn ) . toEqual ( '4/1' ) ;
1261+ } ) ;
1262+ } ) ;
1263+ } ) ;
11821264}
11831265
11841266/**
0 commit comments