@@ -327,10 +327,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
327327
328328 await insert ( db ) ;
329329
330- const transformed = [
331- ...WalStream . getQueryData ( pgwire . pgwireRows ( await db . query ( `SELECT * FROM test_data ORDER BY id` ) ) )
332- ] ;
333-
330+ const transformed = await queryAll ( db , `SELECT * FROM test_data ORDER BY id` ) ;
334331 checkResults ( transformed ) ;
335332 } finally {
336333 await db . end ( ) ;
@@ -346,17 +343,11 @@ VALUES(10, ARRAY['null']::TEXT[]);
346343
347344 await insert ( db ) ;
348345
349- const transformed = [
350- ...WalStream . getQueryData (
351- pgwire . pgwireRows (
352- await db . query ( {
353- statement : `SELECT * FROM test_data WHERE $1 ORDER BY id` ,
354- params : [ { type : 'bool' , value : true } ]
355- } )
356- )
357- )
358- ] ;
359-
346+ const raw = await db . query ( {
347+ statement : `SELECT * FROM test_data WHERE $1 ORDER BY id` ,
348+ params : [ { type : 'bool' , value : true } ]
349+ } ) ;
350+ const transformed = await interpretResults ( db , raw ) ;
360351 checkResults ( transformed ) ;
361352 } finally {
362353 await db . end ( ) ;
@@ -370,9 +361,9 @@ VALUES(10, ARRAY['null']::TEXT[]);
370361
371362 await insertArrays ( db ) ;
372363
373- const transformed = [
374- ... WalStream . getQueryData ( pgwire . pgwireRows ( await db . query ( `SELECT * FROM test_data_arrays ORDER BY id` ) ) )
375- ] . map ( ( e ) => applyRowContext ( e , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ) ;
364+ const transformed = ( await queryAll ( db , `SELECT * FROM test_data_arrays ORDER BY id` ) ) . map ( ( e ) =>
365+ applyRowContext ( e , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY )
366+ ) ;
376367
377368 checkResultArrays ( transformed ) ;
378369 } finally {
@@ -465,19 +456,15 @@ VALUES(10, ARRAY['null']::TEXT[]);
465456 } ) ;
466457
467458 test ( 'date formats' , async ( ) => {
468- const db = await connectPgWire ( ) ;
459+ const db = await connectPgPool ( ) ;
469460 try {
470461 await setupTable ( db ) ;
471462
472463 await db . query ( `
473464INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02');
474465` ) ;
475466
476- const [ row ] = [
477- ...WalStream . getQueryData (
478- pgwire . pgwireRows ( await db . query ( `SELECT time, timestamp, timestamptz FROM test_data` ) )
479- )
480- ] ;
467+ const [ row ] = await queryAll ( db , `SELECT time, timestamp, timestamptz FROM test_data` ) ;
481468
482469 const oldFormat = applyRowContext ( row , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
483470 expect ( oldFormat ) . toMatchObject ( {
@@ -515,17 +502,18 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
515502 try {
516503 await clearTestDb ( db ) ;
517504 await db . query ( `CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5);` ) ;
518- await db . query ( `CREATE TYPE composite AS (foo rating_value[], bar TEXT);` ) ;
519- await db . query ( `CREATE TYPE nested_composite AS (a BOOLEAN, b composite);` ) ;
520505 await db . query ( `CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')` ) ;
506+ await db . query ( `CREATE TYPE composite AS (foo rating_value[], bar TEXT, mood mood);` ) ;
507+ await db . query ( `CREATE TYPE nested_composite AS (a BOOLEAN, b composite);` ) ;
521508
522509 await db . query ( `CREATE TABLE test_custom(
523510 id serial primary key,
524511 rating rating_value,
525512 composite composite,
526513 nested_composite nested_composite,
527514 boxes box[],
528- mood mood
515+ mood mood,
516+ moods mood[]
529517 );` ) ;
530518
531519 const slotName = 'test_slot' ;
@@ -542,13 +530,14 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
542530
543531 await db . query ( `
544532 INSERT INTO test_custom
545- (rating, composite, nested_composite, boxes, mood)
533+ (rating, composite, nested_composite, boxes, mood, moods )
546534 VALUES (
547535 1,
548- (ARRAY[2,3], 'bar'),
549- (TRUE, (ARRAY[2,3], 'bar')),
536+ (ARRAY[2,3], 'bar', 'sad'::mood ),
537+ (TRUE, (ARRAY[2,3], 'bar', 'sad'::mood )),
550538 ARRAY[box(point '(1,2)', point '(3,4)'), box(point '(5, 6)', point '(7,8)')],
551- 'happy'
539+ 'happy',
540+ ARRAY['sad'::mood, 'happy'::mood]
552541 );
553542 ` ) ;
554543
@@ -562,27 +551,53 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
562551 } ) ;
563552
564553 const [ transformed ] = await getReplicationTx ( db , replicationStream ) ;
554+ const [ queried ] = await queryAll ( db , `SELECT * FROM test_custom` ) ;
565555 await pg . end ( ) ;
566556
567- const oldFormat = applyRowContext ( transformed , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
568- expect ( oldFormat ) . toMatchObject ( {
557+ const oldFormatStreamed = applyRowContext ( transformed , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
558+ expect ( oldFormatStreamed ) . toMatchObject ( {
569559 rating : '1' ,
570- composite : '("{2,3}",bar)' ,
571- nested_composite : '(t,"(""{2,3}"",bar)")' ,
560+ composite : '("{2,3}",bar,sad)' ,
561+ nested_composite : '(t,"(""{2,3}"",bar,sad)")' ,
562+ boxes : '["(3","4)","(1","2);(7","8)","(5","6)"]' ,
563+ mood : 'happy' ,
564+ moods : '{sad,happy}'
565+ } ) ;
566+
567+ const oldFormatQueried = applyRowContext ( queried , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
568+ expect ( oldFormatQueried ) . toMatchObject ( {
569+ rating : 1 ,
570+ composite : '("{2,3}",bar,sad)' ,
571+ nested_composite : '(t,"(""{2,3}"",bar,sad)")' ,
572572 boxes : '["(3","4)","(1","2);(7","8)","(5","6)"]' ,
573- mood : 'happy'
573+ mood : 'happy' ,
574+ moods : '{sad,happy}'
574575 } ) ;
575576
576- const newFormat = applyRowContext (
577+ const newFormatStreamed = applyRowContext (
577578 transformed ,
578579 new CompatibilityContext ( { edition : CompatibilityEdition . SYNC_STREAMS } )
579580 ) ;
580- expect ( newFormat ) . toMatchObject ( {
581+ expect ( newFormatStreamed ) . toMatchObject ( {
581582 rating : 1 ,
582- composite : '{"foo":[2.0,3.0],"bar":"bar"}' ,
583- nested_composite : '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar"}}' ,
583+ composite : '{"foo":[2.0,3.0],"bar":"bar","mood":"sad" }' ,
584+ nested_composite : '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad" }}' ,
584585 boxes : JSON . stringify ( [ '(3,4),(1,2)' , '(7,8),(5,6)' ] ) ,
585- mood : 'happy'
586+ mood : 'happy' ,
587+ moods : '["sad","happy"]'
588+ } ) ;
589+
590+ const newFormatQueried = applyRowContext (
591+ queried ,
592+ new CompatibilityContext ( { edition : CompatibilityEdition . SYNC_STREAMS } )
593+ ) ;
594+ expect ( newFormatQueried ) . toMatchObject ( {
595+ rating : 1 ,
596+ composite : '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}' ,
597+ nested_composite : '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}' ,
598+ boxes : JSON . stringify ( [ '(3,4),(1,2)' , '(7,8),(5,6)' ] ) ,
599+ mood : 'happy' ,
600+ moods : '["sad","happy"]'
586601 } ) ;
587602 } finally {
588603 await db . end ( ) ;
@@ -635,18 +650,36 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
635650 } ) ;
636651
637652 const [ transformed ] = await getReplicationTx ( db , replicationStream ) ;
653+ const [ queried ] = await queryAll ( db , `SELECT ranges FROM test_custom` ) ;
638654 await pg . end ( ) ;
639655
640- const oldFormat = applyRowContext ( transformed , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
641- expect ( oldFormat ) . toMatchObject ( {
656+ const oldFormatStreamed = applyRowContext ( transformed , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
657+ expect ( oldFormatStreamed ) . toMatchObject ( {
658+ ranges : '{"{[2,4),[6,8)}"}'
659+ } ) ;
660+ const oldFormatQueried = applyRowContext ( queried , CompatibilityContext . FULL_BACKWARDS_COMPATIBILITY ) ;
661+ expect ( oldFormatQueried ) . toMatchObject ( {
642662 ranges : '{"{[2,4),[6,8)}"}'
643663 } ) ;
644664
645- const newFormat = applyRowContext (
665+ const newFormatStreamed = applyRowContext (
646666 transformed ,
647667 new CompatibilityContext ( { edition : CompatibilityEdition . SYNC_STREAMS } )
648668 ) ;
649- expect ( newFormat ) . toMatchObject ( {
669+ expect ( newFormatStreamed ) . toMatchObject ( {
670+ ranges : JSON . stringify ( [
671+ [
672+ { lower : 2 , upper : 4 , lower_exclusive : 0 , upper_exclusive : 1 } ,
673+ { lower : 6 , upper : 8 , lower_exclusive : 0 , upper_exclusive : 1 }
674+ ]
675+ ] )
676+ } ) ;
677+
678+ const newFormatQueried = applyRowContext (
679+ queried ,
680+ new CompatibilityContext ( { edition : CompatibilityEdition . SYNC_STREAMS } )
681+ ) ;
682+ expect ( newFormatQueried ) . toMatchObject ( {
650683 ranges : JSON . stringify ( [
651684 [
652685 { lower : 2 , upper : 4 , lower_exclusive : 0 , upper_exclusive : 1 } ,
@@ -679,3 +712,18 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
679712 }
680713 return transformed ;
681714}
715+
716+ /**
717+ * Simulates what WalStream does for initial snapshots.
718+ */
719+ async function queryAll ( db : pgwire . PgClient , sql : string ) {
720+ const raw = await db . query ( sql ) ;
721+ return await interpretResults ( db , raw ) ;
722+ }
723+
724+ async function interpretResults ( db : pgwire . PgClient , results : pgwire . PgResult ) {
725+ const typeCache = new PostgresTypeResolver ( db ) ;
726+ await typeCache . fetchTypesForSchema ( ) ;
727+
728+ return results . rows . map ( ( row ) => WalStream . decodeRow ( row , typeCache ) ) ;
729+ }
0 commit comments