Skip to content

Commit db690a2

Browse files
committed
Merge branch 'rabbitmq-server-5308-stream-command-versions-exchange'
2 parents 0a44469 + fda9a77 commit db690a2

File tree

6 files changed

+301
-22
lines changed

6 files changed

+301
-22
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -67,8 +67,10 @@ public final class Constants {
6767
public static final short COMMAND_HEARTBEAT = 23;
6868
public static final short COMMAND_ROUTE = 24;
6969
public static final short COMMAND_PARTITIONS = 25;
70+
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
7071

7172
public static final short VERSION_1 = 1;
73+
public static final short VERSION_2 = 2;
7274

7375
private Constants() {}
7476
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
2020
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER;
2121
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM;
22+
import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS;
2223
import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT;
2324
import static com.rabbitmq.stream.Constants.COMMAND_METADATA;
2425
import static com.rabbitmq.stream.Constants.COMMAND_OPEN;
@@ -61,6 +62,7 @@
6162
import com.rabbitmq.stream.compression.CompressionCodecFactory;
6263
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
6364
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
65+
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
6466
import com.rabbitmq.stream.metrics.MetricsCollector;
6567
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
6668
import com.rabbitmq.stream.sasl.CredentialsProvider;
@@ -1302,7 +1304,7 @@ public List<String> partitions(String superStream) {
13021304
throw new IllegalArgumentException("stream must not be null");
13031305
}
13041306
int length =
1305-
2 + 2 + 4 + +2 + superStream.length(); // API code, version, correlation ID, 1 string
1307+
2 + 2 + 4 + 2 + superStream.length(); // API code, version, correlation ID, 1 string
13061308
int correlationId = correlationSequence.incrementAndGet();
13071309
try {
13081310
ByteBuf bb = allocate(length + 4);
@@ -1323,6 +1325,35 @@ public List<String> partitions(String superStream) {
13231325
}
13241326
}
13251327

1328+
List<FrameHandlerInfo> exchangeCommandVersions() {
1329+
List<FrameHandlerInfo> commandVersions = ServerFrameHandler.commandVersions();
1330+
int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, array size
1331+
length += commandVersions.size() * (2 + 2 + 2);
1332+
int correlationId = correlationSequence.incrementAndGet();
1333+
try {
1334+
ByteBuf bb = allocate(length + 4);
1335+
bb.writeInt(length);
1336+
bb.writeShort(encodeRequestCode(COMMAND_EXCHANGE_COMMAND_VERSIONS));
1337+
bb.writeShort(VERSION_1);
1338+
bb.writeInt(correlationId);
1339+
bb.writeInt(commandVersions.size());
1340+
for (FrameHandlerInfo commandVersion : commandVersions) {
1341+
bb.writeShort(commandVersion.getKey());
1342+
bb.writeShort(commandVersion.getMinVersion());
1343+
bb.writeShort(commandVersion.getMaxVersion());
1344+
}
1345+
OutstandingRequest<List<FrameHandlerInfo>> request =
1346+
new OutstandingRequest<>(this.rpcTimeout);
1347+
outstandingRequests.put(correlationId, request);
1348+
channel.writeAndFlush(bb);
1349+
request.block();
1350+
return request.response.get();
1351+
} catch (RuntimeException e) {
1352+
outstandingRequests.remove(correlationId);
1353+
throw new StreamException(e);
1354+
}
1355+
}
1356+
13261357
void shutdownReason(ShutdownReason reason) {
13271358
this.shutdownReason = reason;
13281359
}

0 commit comments

Comments
 (0)