Skip to content

Commit a609190

Browse files
committed
Handle AMQP Value in message body
Fixes #70
1 parent d86f4ed commit a609190

File tree

4 files changed

+93
-16
lines changed

4 files changed

+93
-16
lines changed

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.UUID;
2525
import java.util.function.Function;
2626
import org.apache.qpid.proton.amqp.*;
27+
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
2728
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
2829
import org.apache.qpid.proton.amqp.messaging.Data;
2930
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -464,7 +465,20 @@ public long getPublishingId() {
464465

465466
@Override
466467
public byte[] getBodyAsBinary() {
467-
return message.getBody() == null ? null : ((Data) message.getBody()).getValue().getArray();
468+
if (message.getBody() == null) {
469+
return null;
470+
} else if (message.getBody() instanceof Data) {
471+
return ((Data) message.getBody()).getValue().getArray();
472+
} else if (message.getBody() instanceof AmqpValue) {
473+
AmqpValue value = (AmqpValue) message.getBody();
474+
if (value.getValue() instanceof byte[]) {
475+
return (byte[]) value.getValue();
476+
}
477+
}
478+
throw new IllegalStateException(
479+
"Body cannot by returned as array of bytes: "
480+
+ message.getBody()
481+
+ ". Use #getBody() to get native representation.");
468482
}
469483

470484
@Override

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,45 @@ public long getPublishingId() {
395395

396396
@Override
397397
public byte[] getBodyAsBinary() {
398-
return amqpMessage.getData().get(0).getValue();
398+
if (amqpMessage.getData() != null) {
399+
return amqpMessage.getData().get(0).getValue();
400+
} else if (amqpMessage.getAmqpValue() != null) {
401+
AMQPType value = amqpMessage.getAmqpValue().getValue();
402+
if (value instanceof AMQPBinary) {
403+
return ((AMQPBinary) value).getValue();
404+
} else if (value instanceof AMQPArray) {
405+
try {
406+
AMQPType[] array = ((AMQPArray) value).getValue();
407+
if (array.length > 0) {
408+
// far-fetch
409+
if (array[0] instanceof AMQPByte) {
410+
byte[] result = new byte[array.length];
411+
for (int i = 0; i < array.length; i++) {
412+
result[i] = ((AMQPByte) array[i]).getValue();
413+
}
414+
return result;
415+
}
416+
}
417+
} catch (IOException e) {
418+
throw new RuntimeException(e);
419+
}
420+
}
421+
} else if (amqpMessage.getData() == null && amqpMessage.getAmqpValue() == null) {
422+
return null;
423+
}
424+
throw new IllegalStateException(
425+
"Body cannot by returned as array of bytes. Use #getBody() to get native representation.");
399426
}
400427

401428
@Override
402429
public Object getBody() {
403-
return body;
430+
if (amqpMessage.getData() != null) {
431+
return amqpMessage.getData();
432+
} else if (amqpMessage.getAmqpValue() != null) {
433+
return amqpMessage.getAmqpValue();
434+
} else {
435+
return null;
436+
}
404437
}
405438

406439
@Override

src/test/java/com/rabbitmq/stream/codec/CodecsTest.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,28 @@
1919
import static org.mockito.Mockito.when;
2020

2121
import com.rabbitmq.stream.Codec;
22+
import com.rabbitmq.stream.Codec.EncodedMessage;
2223
import com.rabbitmq.stream.Message;
2324
import com.rabbitmq.stream.MessageBuilder;
2425
import com.rabbitmq.stream.amqp.UnsignedByte;
2526
import com.rabbitmq.stream.amqp.UnsignedInteger;
2627
import com.rabbitmq.stream.amqp.UnsignedLong;
2728
import com.rabbitmq.stream.amqp.UnsignedShort;
29+
import com.rabbitmq.stream.codec.QpidProtonCodec.QpidProtonAmqpMessageWrapper;
2830
import java.math.BigDecimal;
2931
import java.math.BigInteger;
3032
import java.nio.charset.Charset;
3133
import java.nio.charset.StandardCharsets;
32-
import java.util.*;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
36+
import java.util.Date;
37+
import java.util.List;
38+
import java.util.UUID;
3339
import java.util.function.Consumer;
3440
import java.util.function.Function;
3541
import java.util.function.Supplier;
3642
import java.util.stream.Stream;
43+
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
3744
import org.assertj.core.api.InstanceOfAssertFactories;
3845
import org.assertj.core.api.ThrowableAssert;
3946
import org.junit.jupiter.params.ParameterizedTest;
@@ -71,6 +78,18 @@ static Iterable<Codec> readCreatedMessage() {
7178
new SwiftMqCodec());
7279
}
7380

81+
static Stream<Codec> codecs() {
82+
return Stream.of(new QpidProtonCodec(), new SwiftMqCodec());
83+
}
84+
85+
static Stream<MessageBuilder> messageBuilders() {
86+
return Stream.of(
87+
new QpidProtonMessageBuilder(),
88+
new SwiftMqMessageBuilder(),
89+
new WrapperMessageBuilder(),
90+
new SimpleCodec().messageBuilder());
91+
}
92+
7493
@ParameterizedTest
7594
@MethodSource("codecsCouples")
7695
void codecs(CodecCouple codecCouple) {
@@ -493,12 +512,28 @@ void notSupportedTypes(Supplier<MessageBuilder> messageBuilderSupplier) {
493512
action -> assertThatThrownBy(action).isInstanceOf(UnsupportedOperationException.class));
494513
}
495514

496-
static Stream<MessageBuilder> messageBuilders() {
497-
return Stream.of(
498-
new QpidProtonMessageBuilder(),
499-
new SwiftMqMessageBuilder(),
500-
new WrapperMessageBuilder(),
501-
new SimpleCodec().messageBuilder());
515+
@ParameterizedTest
516+
@MethodSource("codecs")
517+
void supportAmqpValueBody(Codec codec) {
518+
Function<Object, Message> encodeDecode =
519+
content -> {
520+
org.apache.qpid.proton.message.Message nativeMessage =
521+
org.apache.qpid.proton.message.Message.Factory.create();
522+
nativeMessage.setBody(new AmqpValue(content));
523+
QpidProtonAmqpMessageWrapper wrapper =
524+
new QpidProtonAmqpMessageWrapper(true, 1L, nativeMessage);
525+
EncodedMessage encoded = new QpidProtonCodec().encode(wrapper);
526+
byte[] encodedData = new byte[encoded.getSize()];
527+
System.arraycopy(encoded.getData(), 0, encodedData, 0, encoded.getSize());
528+
Message decodedMessage = codec.decode(encodedData);
529+
return decodedMessage;
530+
};
531+
532+
Message m1 = encodeDecode.apply("hello".getBytes(StandardCharsets.UTF_8));
533+
assertThat(m1.getBodyAsBinary()).asString(StandardCharsets.UTF_8).isEqualTo("hello");
534+
535+
Message m2 = encodeDecode.apply("a string is not an array of byte");
536+
assertThatThrownBy(() -> m2.getBodyAsBinary()).isInstanceOf(IllegalStateException.class);
502537
}
503538

504539
@ParameterizedTest

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,7 @@ void compressionWithoutSubEntriesShouldNotStart() throws Exception {
318318
void monitoringShouldReturnValidEndpoint() throws Exception {
319319
int monitoringPort = randomNetworkPort();
320320
Future<?> run =
321-
run(
322-
builder()
323-
.deleteStreams()
324-
.monitoring()
325-
.monitoringPort(monitoringPort)
326-
.prometheus());
321+
run(builder().deleteStreams().monitoring().monitoringPort(monitoringPort).prometheus());
327322
waitUntilStreamExists(s);
328323
waitOneSecond();
329324

0 commit comments

Comments
 (0)