4141import java .util .concurrent .CountDownLatch ;
4242import java .util .concurrent .atomic .AtomicReference ;
4343import java .util .function .Consumer ;
44+ import java .util .function .Predicate ;
4445import java .util .function .Supplier ;
4546import java .util .stream .IntStream ;
4647import java .util .stream .Stream ;
@@ -57,6 +58,7 @@ public class AmqpInteroperabilityTest {
5758
5859 String stream ;
5960 TestUtils .ClientFactory cf ;
61+ String brokerVersion ;
6062
6163 static Stream <Codec > codecs () {
6264 return Stream .of (new QpidProtonCodec (), new SwiftMqCodec ());
@@ -67,14 +69,28 @@ private static HeaderTestConfiguration htc(
6769 return new HeaderTestConfiguration (headerValue , assertion );
6870 }
6971
72+ private static PropertiesTestConfiguration ptc (
73+ Predicate <String > condition ,
74+ Consumer <AMQP .BasicProperties .Builder > builder ,
75+ Consumer <Message > assertion ) {
76+ return new PropertiesTestConfiguration (builder , assertion , condition );
77+ }
78+
7079 private static PropertiesTestConfiguration ptc (
7180 Consumer <AMQP .BasicProperties .Builder > builder , Consumer <Message > assertion ) {
72- return new PropertiesTestConfiguration (builder , assertion );
81+ return new PropertiesTestConfiguration (builder , assertion , ignored -> true );
82+ }
83+
84+ static MessageOperation mo (
85+ Predicate <String > brokerVersionCondition ,
86+ Consumer <MessageBuilder > messageBuilderConsumer ,
87+ Consumer <Delivery > deliveryConsumer ) {
88+ return new MessageOperation (messageBuilderConsumer , deliveryConsumer , brokerVersionCondition );
7389 }
7490
7591 static MessageOperation mo (
7692 Consumer <MessageBuilder > messageBuilderConsumer , Consumer <Delivery > deliveryConsumer ) {
77- return new MessageOperation (messageBuilderConsumer , deliveryConsumer );
93+ return new MessageOperation (messageBuilderConsumer , deliveryConsumer , ignored -> true );
7894 }
7995
8096 @ ParameterizedTest
@@ -88,10 +104,15 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
88104 () ->
89105 Stream .of (
90106 ptc (
107+ BEFORE_MESSAGE_CONTAINERS ,
91108 b -> b .appId ("application id" ),
92109 m ->
93110 assertThat (m .getApplicationProperties ().get ("x-basic-app-id" ))
94111 .isEqualTo ("application id" )),
112+ ptc (
113+ AFTER_MESSAGE_CONTAINERS ,
114+ b -> b .appId ("application id" ),
115+ m -> assertThat (m .getProperties ().getGroupId ()).isEqualTo ("application id" )),
95116 ptc (
96117 b -> b .contentEncoding ("content encoding" ),
97118 m ->
@@ -137,10 +158,17 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
137158 * 1000 )), // in seconds in 091, in ms in 1.0, so losing some
138159 // precision
139160 ptc (
161+ BEFORE_MESSAGE_CONTAINERS ,
140162 b -> b .type ("the type" ),
141163 m ->
142164 assertThat (m .getApplicationProperties ().get ("x-basic-type" ))
143165 .isEqualTo ("the type" )),
166+ ptc (
167+ AFTER_MESSAGE_CONTAINERS ,
168+ b -> b .type ("the type" ),
169+ m ->
170+ assertThat (m .getMessageAnnotations ().get ("x-basic-type" ))
171+ .isEqualTo ("the type" )),
144172 ptc (
145173 b -> b .userId ("guest" ),
146174 m ->
@@ -199,6 +227,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
199227 AMQP .BasicProperties .Builder builder = new AMQP .BasicProperties .Builder ();
200228 propertiesTestConfigurations
201229 .get ()
230+ .filter (configuration -> configuration .brokerVersionCondition .test (brokerVersion ))
202231 .forEach (configuration -> configuration .builder .accept (builder ));
203232
204233 AMQP .BasicProperties properties = builder .headers (headers ).build ();
@@ -234,7 +263,10 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
234263 .forEach (i -> assertThat (messageBodies .contains ("amqp " + i )).isTrue ());
235264 Message message = messages .iterator ().next ();
236265
237- propertiesTestConfigurations .get ().forEach (c -> c .assertion .accept (message ));
266+ propertiesTestConfigurations
267+ .get ()
268+ .filter (c -> c .brokerVersionCondition .test (brokerVersion ))
269+ .forEach (c -> c .assertion .accept (message ));
238270
239271 assertThat (message .getMessageAnnotations ().get ("x-exchange" )).isEqualTo ("" );
240272 assertThat (message .getMessageAnnotations ().get ("x-routing-key" )).isEqualTo (stream );
@@ -285,6 +317,7 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
285317 LongStringHelper .asLongString (StringUtils .repeat ("*" , 300 )));
286318 }),
287319 mo (
320+ BEFORE_MESSAGE_CONTAINERS ,
288321 mb -> {
289322 mb .properties ().messageId (messageIdUuid );
290323 mb .properties ().correlationId (correlationIdUuid );
@@ -300,6 +333,19 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
300333 "x-correlation-id-type" , LongStringHelper .asLongString ("uuid" ));
301334 }),
302335 mo (
336+ AFTER_MESSAGE_CONTAINERS ,
337+ mb -> {
338+ mb .properties ().messageId (messageIdUuid );
339+ mb .properties ().correlationId (correlationIdUuid );
340+ },
341+ d -> {
342+ assertThat (d .getProperties ().getMessageId ())
343+ .isEqualTo ("urn:uuid:" + messageIdUuid );
344+ assertThat (d .getProperties ().getCorrelationId ())
345+ .isEqualTo ("urn:uuid:" + correlationIdUuid );
346+ }),
347+ mo (
348+ BEFORE_MESSAGE_CONTAINERS ,
303349 mb -> {
304350 mb .properties ().messageId (10 );
305351 mb .properties ().correlationId (20 );
@@ -314,6 +360,19 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
314360 "x-correlation-id-type" , LongStringHelper .asLongString ("ulong" ));
315361 }),
316362 mo (
363+ AFTER_MESSAGE_CONTAINERS ,
364+ mb -> {
365+ mb .properties ().messageId (10 );
366+ mb .properties ().correlationId (20 );
367+ },
368+ d -> {
369+ assertThat (d .getProperties ().getMessageId ()).isEqualTo ("10" );
370+ assertThat (d .getProperties ().getCorrelationId ()).isEqualTo ("20" );
371+ assertThat (d .getProperties ().getHeaders ())
372+ .doesNotContainKeys ("x-message-id-type" , "x-correlation-id-type" );
373+ }),
374+ mo (
375+ BEFORE_MESSAGE_CONTAINERS ,
317376 mb -> {
318377 mb .properties ().messageId ("the message ID" .getBytes (UTF8 ));
319378 mb .properties ().correlationId ("the correlation ID" .getBytes (UTF8 ));
@@ -329,6 +388,30 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
329388 .containsEntry (
330389 "x-correlation-id-type" , LongStringHelper .asLongString ("binary" ));
331390 }),
391+ mo (
392+ AFTER_MESSAGE_CONTAINERS ,
393+ mb -> {
394+ mb .properties ().messageId ("the message ID" .getBytes (UTF8 ));
395+ mb .properties ().correlationId ("the correlation ID" .getBytes (UTF8 ));
396+ },
397+ d -> {
398+ assertThat (d .getProperties ().getMessageId ()).isNull ();
399+ assertThat (d .getProperties ().getCorrelationId ()).isNull ();
400+ assertThat (
401+ d .getProperties ()
402+ .getHeaders ()
403+ .get ("x-message-id" )
404+ .toString ()
405+ .getBytes (UTF8 ))
406+ .isEqualTo ("the message ID" .getBytes (UTF8 ));
407+ assertThat (
408+ d .getProperties ()
409+ .getHeaders ()
410+ .get ("x-correlation-id" )
411+ .toString ()
412+ .getBytes (UTF8 ))
413+ .isEqualTo ("the correlation ID" .getBytes (UTF8 ));
414+ }),
332415 mo (
333416 mb -> {
334417 mb .properties ()
@@ -356,6 +439,8 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
356439
357440 testMessageOperations
358441 .get ()
442+ .filter (
443+ testMessageOperation -> testMessageOperation .brokerVersionCondition .test (brokerVersion ))
359444 .forEach (
360445 testMessageOperation -> {
361446 CountDownLatch confirmLatch = new CountDownLatch (messageCount );
@@ -476,6 +561,9 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
476561
477562 messageOperations
478563 .get ()
564+ .filter (
565+ messageOperation ->
566+ messageOperation .brokerVersionCondition .test (brokerVersion ))
479567 .forEach (
480568 messageOperation ->
481569 messageOperation .messageBuilderConsumer .accept (messageBuilder ));
@@ -518,6 +606,9 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) {
518606
519607 messageOperations
520608 .get ()
609+ .filter (
610+ messageOperation ->
611+ messageOperation .brokerVersionCondition .test (brokerVersion ))
521612 .forEach (messageOperation -> messageOperation .deliveryConsumer .accept (message ));
522613
523614 } catch (Exception e ) {
@@ -562,11 +653,15 @@ void messageWithEmptyBodyAndPropertiesShouldBeConvertedInAmqp(Codec codec) throw
562653 private static class PropertiesTestConfiguration {
563654 final Consumer <AMQP .BasicProperties .Builder > builder ;
564655 final Consumer <Message > assertion ;
656+ final Predicate <String > brokerVersionCondition ;
565657
566658 private PropertiesTestConfiguration (
567- Consumer <AMQP .BasicProperties .Builder > builder , Consumer <Message > assertion ) {
659+ Consumer <AMQP .BasicProperties .Builder > builder ,
660+ Consumer <Message > assertion ,
661+ Predicate <String > brokerVersionCondition ) {
568662 this .builder = builder ;
569663 this .assertion = assertion ;
664+ this .brokerVersionCondition = brokerVersionCondition ;
570665 }
571666 }
572667
@@ -584,11 +679,21 @@ private static class HeaderTestConfiguration {
584679 private static class MessageOperation {
585680 final Consumer <MessageBuilder > messageBuilderConsumer ;
586681 final Consumer <Delivery > deliveryConsumer ;
682+ final Predicate <String > brokerVersionCondition ;
587683
588684 MessageOperation (
589- Consumer <MessageBuilder > messageBuilderConsumer , Consumer <Delivery > deliveryConsumer ) {
685+ Consumer <MessageBuilder > messageBuilderConsumer ,
686+ Consumer <Delivery > deliveryConsumer ,
687+ Predicate <String > brokerVersionCondition ) {
590688 this .messageBuilderConsumer = messageBuilderConsumer ;
591689 this .deliveryConsumer = deliveryConsumer ;
690+ this .brokerVersionCondition = brokerVersionCondition ;
592691 }
593692 }
693+
694+ private static final Predicate <String > BEFORE_MESSAGE_CONTAINERS =
695+ TestUtils ::beforeMessageContainers ;
696+
697+ private static final Predicate <String > AFTER_MESSAGE_CONTAINERS =
698+ TestUtils ::afterMessageContainers ;
594699}
0 commit comments