Skip to content

Commit 20282d9

Browse files
authored
Merge pull request #437 from rabbitmq/do-not-unsubscribe-with-closed-connection
Do not unsubscribe with closed client
2 parents 3f51f48 + 3fa5a50 commit 20282d9

File tree

6 files changed

+55
-8
lines changed

6 files changed

+55
-8
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,8 +2029,14 @@ public void done() {
20292029
}
20302030
}
20312031

2032+
static Response responseOk() {
2033+
return Response.OK;
2034+
}
2035+
20322036
public static class Response {
20332037

2038+
private static final Response OK = new Response(RESPONSE_CODE_OK);
2039+
20342040
private final short responseCode;
20352041

20362042
public Response(short responseCode) {

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,10 @@ private void assignConsumersToStream(
814814
boolean maybeCloseClient) {
815815
Runnable consumersClosingCallback =
816816
() -> {
817+
LOGGER.debug(
818+
"Running consumer closing callback after recovery failure, "
819+
+ "closing {} subscription(s)",
820+
subscriptions.size());
817821
for (SubscriptionTracker affectedSubscription : subscriptions) {
818822
try {
819823
affectedSubscription.consumer.closeAfterStreamDeletion();
@@ -1078,7 +1082,13 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
10781082
try {
10791083
Client.Response unsubscribeResponse =
10801084
Utils.callAndMaybeRetry(
1081-
() -> client.unsubscribe(subscriptionIdInClient),
1085+
() -> {
1086+
if (client.isOpen()) {
1087+
return client.unsubscribe(subscriptionIdInClient);
1088+
} else {
1089+
return Client.responseOk();
1090+
}
1091+
},
10821092
RETRY_ON_TIMEOUT,
10831093
"Unsubscribe request for consumer %d on stream '%s'",
10841094
subscriptionTracker.consumer.id(),

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ static <T> T callAndMaybeRetry(
230230
while (keepTrying) {
231231
try {
232232
attempt++;
233+
LOGGER.debug("Starting attempt #{} for operation '{}'", attempt, description);
233234
T result = operation.call();
234235
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
235236
LOGGER.debug(

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@
2929
import static org.mockito.ArgumentMatchers.anyMap;
3030
import static org.mockito.ArgumentMatchers.anyString;
3131
import static org.mockito.ArgumentMatchers.isNull;
32-
import static org.mockito.Mockito.mock;
33-
import static org.mockito.Mockito.times;
34-
import static org.mockito.Mockito.verify;
35-
import static org.mockito.Mockito.when;
32+
import static org.mockito.Mockito.*;
3633

3734
import com.rabbitmq.stream.*;
3835
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
@@ -459,6 +456,42 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc
459456
assertThat(messageHandlerCalls.get()).isEqualTo(1);
460457
}
461458

459+
@Test
460+
void shouldNotUnsubscribeIfClientIsClosed() {
461+
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
462+
463+
when(clientFactory.client(any())).thenReturn(client);
464+
when(client.subscribe(
465+
subscriptionIdCaptor.capture(),
466+
anyString(),
467+
any(OffsetSpecification.class),
468+
anyInt(),
469+
anyMap()))
470+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
471+
472+
Runnable closingRunnable =
473+
coordinator.subscribe(
474+
consumer,
475+
"stream",
476+
OffsetSpecification.first(),
477+
null,
478+
NO_OP_SUBSCRIPTION_LISTENER,
479+
() -> {},
480+
(offset, message) -> {},
481+
Collections.emptyMap(),
482+
flowStrategy());
483+
verify(clientFactory, times(1)).client(any());
484+
verify(client, times(1))
485+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
486+
487+
when(client.isOpen()).thenReturn(false);
488+
when(client.unsubscribe(subscriptionIdCaptor.getValue()))
489+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
490+
491+
closingRunnable.run();
492+
verify(client, never()).unsubscribe(subscriptionIdCaptor.getValue());
493+
}
494+
462495
@Test
463496
void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() {
464497
when(locator.metadata("stream")).thenReturn(metadata(leader(), null));

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ void consumerShouldBeClosedWhenStreamGetsDeleted(TestInfo info) throws Exception
357357
environment.deleteStream(s);
358358

359359
TestUtils.waitAtMost(10, () -> !consumer.isOpen());
360-
361360
assertThat(consumer.isOpen()).isFalse();
362361
}
363362

src/test/resources/logback-test.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
</appender>
77

88
<logger name="com.rabbitmq.stream" level="warn" />
9-
<logger name="com.rabbitmq.stream.perf" level="info" />
109

1110
<logger name="com.rabbitmq.stream.perf.Version" level="error" />
12-
<logger name="org.eclipse.jetty" level="warn" />
1311

1412
<root level="info">
1513
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)