1616
1717import static com .rabbitmq .stream .impl .Utils .*;
1818import static java .lang .String .format ;
19+ import static java .util .stream .Collectors .toList ;
1920
2021import com .rabbitmq .stream .*;
2122import com .rabbitmq .stream .Consumer ;
3536import java .util .Map .Entry ;
3637import java .util .NavigableSet ;
3738import java .util .Objects ;
38- import java .util .Random ;
3939import java .util .Set ;
4040import java .util .concurrent .Callable ;
4141import java .util .concurrent .ConcurrentHashMap ;
5353import org .slf4j .Logger ;
5454import org .slf4j .LoggerFactory ;
5555
56- class ConsumersCoordinator {
56+ final class ConsumersCoordinator implements AutoCloseable {
5757
5858 static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256 ;
5959 static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5 ;
@@ -62,7 +62,6 @@ class ConsumersCoordinator {
6262 static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification .next ();
6363
6464 private static final Logger LOGGER = LoggerFactory .getLogger (ConsumersCoordinator .class );
65- private final Random random = new Random ();
6665 private final StreamEnvironment environment ;
6766 private final ClientFactory clientFactory ;
6867 private final int maxConsumersByConnection ;
@@ -115,8 +114,8 @@ Runnable subscribe(
115114 return lock (
116115 this .coordinatorLock ,
117116 () -> {
118- List <Client . Broker > candidates = findBrokersForStream (stream , forceReplica );
119- Client . Broker newNode = pickBroker (candidates );
117+ List <BrokerWrapper > candidates = findCandidateNodes (stream , forceReplica );
118+ Broker newNode = pickBroker (this . brokerPicker , candidates );
120119 if (newNode == null ) {
121120 throw new IllegalStateException ("No available node to subscribe to" );
122121 }
@@ -161,7 +160,7 @@ Runnable subscribe(
161160
162161 private void addToManager (
163162 Broker node ,
164- List <Broker > candidates ,
163+ List <BrokerWrapper > candidates ,
165164 SubscriptionTracker tracker ,
166165 OffsetSpecification offsetSpecification ,
167166 boolean isInitialSubscription ) {
@@ -231,7 +230,7 @@ int managerCount() {
231230 }
232231
233232 // package protected for testing
234- List <Client . Broker > findBrokersForStream (String stream , boolean forceReplica ) {
233+ List <BrokerWrapper > findCandidateNodes (String stream , boolean forceReplica ) {
235234 LOGGER .debug (
236235 "Candidate lookup to consumer from '{}', forcing replica? {}" , stream , forceReplica );
237236 Map <String , Client .StreamMetadata > metadata =
@@ -254,12 +253,13 @@ List<Client.Broker> findBrokersForStream(String stream, boolean forceReplica) {
254253 }
255254 }
256255
257- List <Client .Broker > replicas = streamMetadata .getReplicas ();
258- if ((replicas == null || replicas .isEmpty ()) && streamMetadata .getLeader () == null ) {
256+ Broker leader = streamMetadata .getLeader ();
257+ List <Broker > replicas = streamMetadata .getReplicas ();
258+ if ((replicas == null || replicas .isEmpty ()) && leader == null ) {
259259 throw new IllegalStateException ("No node available to consume from stream " + stream );
260260 }
261261
262- List <Client . Broker > brokers ;
262+ List <BrokerWrapper > brokers ;
263263 if (replicas == null || replicas .isEmpty ()) {
264264 if (forceReplica ) {
265265 throw new IllegalStateException (
@@ -268,21 +268,26 @@ List<Client.Broker> findBrokersForStream(String stream, boolean forceReplica) {
268268 + "consuming from leader has been deactivated for this consumer" ,
269269 stream ));
270270 } else {
271- brokers = Collections .singletonList (streamMetadata .getLeader ());
272- LOGGER .debug (
273- "Only leader node {} for consuming from {}" , streamMetadata .getLeader (), stream );
271+ brokers = Collections .singletonList (new BrokerWrapper (leader , true ));
272+ LOGGER .debug ("Only leader node {} for consuming from {}" , leader , stream );
274273 }
275274 } else {
276275 LOGGER .debug ("Replicas for consuming from {}: {}" , stream , replicas );
277- brokers = new ArrayList <>(replicas );
276+ brokers =
277+ replicas .stream ()
278+ .map (b -> new BrokerWrapper (b , false ))
279+ .collect (Collectors .toCollection (ArrayList ::new ));
280+ if (!forceReplica && leader != null ) {
281+ brokers .add (new BrokerWrapper (leader , true ));
282+ }
278283 }
279284
280285 LOGGER .debug ("Candidates to consume from {}: {}" , stream , brokers );
281286
282287 return brokers ;
283288 }
284289
285- private Callable <List <Broker >> findBrokersForStream (String stream ) {
290+ private Callable <List <BrokerWrapper >> findCandidateNodes (String stream ) {
286291 AtomicInteger attemptNumber = new AtomicInteger ();
287292 return () -> {
288293 boolean mustUseReplica ;
@@ -294,20 +299,10 @@ private Callable<List<Broker>> findBrokersForStream(String stream) {
294299 }
295300 LOGGER .debug (
296301 "Looking for broker(s) for stream {}, forcing replica {}" , stream , mustUseReplica );
297- return findBrokersForStream (stream , mustUseReplica );
302+ return findCandidateNodes (stream , mustUseReplica );
298303 };
299304 }
300305
301- private Client .Broker pickBroker (List <Client .Broker > brokers ) {
302- if (brokers .isEmpty ()) {
303- return null ;
304- } else if (brokers .size () == 1 ) {
305- return brokers .get (0 );
306- } else {
307- return brokers .get (random .nextInt (brokers .size ()));
308- }
309- }
310-
311306 public void close () {
312307 Iterator <ClientSubscriptionsManager > iterator = this .managers .iterator ();
313308 while (iterator .hasNext ()) {
@@ -584,7 +579,9 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
584579 private final AtomicBoolean closed = new AtomicBoolean (false );
585580
586581 private ClientSubscriptionsManager (
587- Broker targetNode , List <Broker > candidates , Client .ClientParameters clientParameters ) {
582+ Broker targetNode ,
583+ List <BrokerWrapper > candidates ,
584+ Client .ClientParameters clientParameters ) {
588585 this .id = managerIdSequence .getAndIncrement ();
589586 this .trackerCount = 0 ;
590587 AtomicReference <String > nameReference = new AtomicReference <>();
@@ -804,7 +801,7 @@ private ClientSubscriptionsManager(
804801 .metadataListener (metadataListener )
805802 .consumerUpdateListener (consumerUpdateListener ),
806803 keyForNode (targetNode ),
807- candidates );
804+ candidates . stream (). map ( BrokerWrapper :: broker ). collect ( toList ()) );
808805 this .client = clientFactory .client (clientFactoryContext );
809806 this .node = brokerFromClient (this .client );
810807 this .name = keyForNode (this .node );
@@ -834,15 +831,15 @@ private void assignConsumersToStream(
834831 }
835832 };
836833
837- AsyncRetry .asyncRetry (findBrokersForStream (stream ))
834+ AsyncRetry .asyncRetry (findCandidateNodes (stream ))
838835 .description ("Candidate lookup to consume from '%s'" , stream )
839836 .scheduler (environment .scheduledExecutorService ())
840837 .retry (ex -> !(ex instanceof StreamDoesNotExistException ))
841838 .delayPolicy (delayPolicy )
842839 .build ()
843840 .thenAccept (
844841 candidateNodes -> {
845- List <Broker > candidates = candidateNodes ;
842+ List <BrokerWrapper > candidates = candidateNodes ;
846843 if (candidates == null ) {
847844 LOGGER .debug ("No candidate nodes to consume from '{}'" , stream );
848845 consumersClosingCallback .run ();
@@ -876,7 +873,8 @@ private List<SubscriptionTracker> createSubscriptionTrackerList() {
876873 return newSubscriptions ;
877874 }
878875
879- private void maybeRecoverSubscription (List <Broker > candidates , SubscriptionTracker tracker ) {
876+ private void maybeRecoverSubscription (
877+ List <BrokerWrapper > candidates , SubscriptionTracker tracker ) {
880878 if (tracker .compareAndSet (SubscriptionState .ACTIVE , SubscriptionState .RECOVERING )) {
881879 try {
882880 recoverSubscription (candidates , tracker );
@@ -897,12 +895,12 @@ private void maybeRecoverSubscription(List<Broker> candidates, SubscriptionTrack
897895 }
898896 }
899897
900- private void recoverSubscription (List <Broker > candidates , SubscriptionTracker tracker ) {
898+ private void recoverSubscription (List <BrokerWrapper > candidates , SubscriptionTracker tracker ) {
901899 boolean reassignmentCompleted = false ;
902900 while (!reassignmentCompleted ) {
903901 try {
904902 if (tracker .consumer .isOpen ()) {
905- Broker broker = pickBroker (candidates );
903+ Broker broker = pickBroker (brokerPicker , candidates );
906904 LOGGER .debug ("Using {} to resume consuming from {}" , broker , tracker .stream );
907905 synchronized (tracker .consumer ) {
908906 if (tracker .consumer .isOpen ()) {
@@ -933,7 +931,7 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
933931 // maybe not a good candidate, let's refresh and retry for this one
934932 candidates =
935933 Utils .callAndMaybeRetry (
936- findBrokersForStream (tracker .stream ),
934+ findCandidateNodes (tracker .stream ),
937935 ex -> !(ex instanceof StreamDoesNotExistException ),
938936 recoveryBackOffDelayPolicy (),
939937 "Candidate lookup to consume from '%s' (subscription recovery)" ,
@@ -1301,4 +1299,20 @@ static <T> int pickSlot(List<T> list, AtomicInteger sequence) {
13011299 }
13021300 return index ;
13031301 }
1302+
1303+ private static List <Broker > keepReplicasIfPossible (Collection <BrokerWrapper > brokers ) {
1304+ if (brokers .size () > 1 ) {
1305+ return brokers .stream ()
1306+ .filter (w -> !w .isLeader ())
1307+ .map (BrokerWrapper ::broker )
1308+ .collect (toList ());
1309+ } else {
1310+ return brokers .stream ().map (BrokerWrapper ::broker ).collect (toList ());
1311+ }
1312+ }
1313+
1314+ static Broker pickBroker (
1315+ Function <List <Broker >, Broker > picker , Collection <BrokerWrapper > candidates ) {
1316+ return picker .apply (keepReplicasIfPossible (candidates ));
1317+ }
13041318}
0 commit comments