@@ -315,13 +315,74 @@ bucket_definitions:
315315 await pool . query ( `UPDATE test_data SET description = 'updated'` ) ;
316316 await pool . query ( 'CREATE PUBLICATION powersync FOR ALL TABLES' ) ;
317317
318+ const serverVersion = await context . connectionManager . getServerVersion ( ) ;
319+
318320 await context . loadActiveSyncRules ( ) ;
319- await expect ( async ( ) => {
321+
322+ if ( serverVersion ! . compareMain ( '18.0.0' ) >= 0 ) {
320323 await context . replicateSnapshot ( ) ;
321- } ) . rejects . toThrowError ( MissingReplicationSlotError ) ;
324+ // No error expected in Postres 18
325+ // TODO: introduce new test scenario for Postgres 18 that _does_ invalidate the replication slot.
326+ } else {
327+ // Postgres < 18 invalidates the replication slot when the publication is re-created.
328+ // The error is handled on a higher level, which triggers
329+ // creating a new replication slot.
330+ await expect ( async ( ) => {
331+ await context . replicateSnapshot ( ) ;
332+ } ) . rejects . toThrowError ( MissingReplicationSlotError ) ;
333+ }
334+ }
335+ } ) ;
336+
337+ test ( 'dropped replication slot' , async ( ) => {
338+ {
339+ await using context = await WalStreamTestContext . open ( factory ) ;
340+ const { pool } = context ;
341+ await context . updateSyncRules ( `
342+ bucket_definitions:
343+ global:
344+ data:
345+ - SELECT id, description FROM "test_data"` ) ;
346+
347+ await pool . query (
348+ `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
349+ ) ;
350+ await pool . query (
351+ `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
352+ ) ;
353+ await context . replicateSnapshot ( ) ;
354+ await context . startStreaming ( ) ;
355+
356+ const data = await context . getBucketData ( 'global[]' ) ;
357+
358+ expect ( data ) . toMatchObject ( [
359+ putOp ( 'test_data' , {
360+ id : '8133cd37-903b-4937-a022-7c8294015a3a' ,
361+ description : 'test1'
362+ } )
363+ ] ) ;
364+
365+ expect ( await context . storage ! . getStatus ( ) ) . toMatchObject ( { active : true , snapshot_done : true } ) ;
366+ }
367+
368+ {
369+ await using context = await WalStreamTestContext . open ( factory , { doNotClear : true } ) ;
370+ const { pool } = context ;
371+ const storage = await context . factory . getActiveStorage ( ) ;
372+
373+ // Here we explicitly drop the replication slot, which should always be handled.
374+ await pool . query ( {
375+ statement : `SELECT pg_drop_replication_slot($1)` ,
376+ params : [ { type : 'varchar' , value : storage ?. slot_name ! } ]
377+ } ) ;
378+
379+ await context . loadActiveSyncRules ( ) ;
322380
323381 // The error is handled on a higher level, which triggers
324382 // creating a new replication slot.
383+ await expect ( async ( ) => {
384+ await context . replicateSnapshot ( ) ;
385+ } ) . rejects . toThrowError ( MissingReplicationSlotError ) ;
325386 }
326387 } ) ;
327388
0 commit comments