4040import java .net .URI ;
4141import java .net .URLDecoder ;
4242import java .time .Duration ;
43+ import java .time .LocalDateTime ;
4344import java .util .Collections ;
4445import java .util .List ;
4546import java .util .Map ;
4647import java .util .Optional ;
47- import java .util .concurrent .CopyOnWriteArrayList ;
48- import java .util .concurrent .ExecutionException ;
49- import java .util .concurrent .Executors ;
50- import java .util .concurrent .ScheduledExecutorService ;
51- import java .util .concurrent .ScheduledFuture ;
52- import java .util .concurrent .TimeoutException ;
48+ import java .util .concurrent .*;
5349import java .util .concurrent .atomic .AtomicBoolean ;
5450import java .util .concurrent .atomic .AtomicReference ;
5551import java .util .function .BiFunction ;
@@ -167,8 +163,11 @@ class StreamEnvironment implements Environment {
167163 }
168164 ScheduledExecutorService executorService ;
169165 if (scheduledExecutorService == null ) {
170- executorService =
171- Executors .newScheduledThreadPool (Runtime .getRuntime ().availableProcessors ());
166+ int threads = Runtime .getRuntime ().availableProcessors ();
167+ LOGGER .debug ("Creating scheduled executor service with {} thread(s)" , threads );
168+ ThreadFactory threadFactory =
169+ new Utils .NamedThreadFactory ("rabbitmq-stream-environment-scheduler-" );
170+ executorService = Executors .newScheduledThreadPool (threads , threadFactory );
172171 this .privateScheduleExecutorService = true ;
173172 } else {
174173 executorService = scheduledExecutorService ;
@@ -255,7 +254,9 @@ private ShutdownListener shutdownListener(
255254 shutdownContext -> {
256255 if (shutdownContext .isShutdownUnexpected ()) {
257256 locator .client (null );
258- LOGGER .debug ("Unexpected locator disconnection, trying to reconnect" );
257+ LOGGER .debug (
258+ "Unexpected locator disconnection for locator on '{}', trying to reconnect" ,
259+ locator .label ());
259260 try {
260261 Client .ClientParameters newLocatorParameters =
261262 this .locatorParametersCopy ().shutdownListener (shutdownListenerReference .get ());
@@ -292,6 +293,8 @@ private ShutdownListener shutdownListener(
292293 } catch (Exception e ) {
293294 LOGGER .debug ("Error while scheduling locator reconnection" , e );
294295 }
296+ } else {
297+ LOGGER .debug ("Locator connection '{}' closing normally" , locator .label ());
295298 }
296299 };
297300 shutdownListenerReference .set (shutdownListener );
@@ -705,9 +708,14 @@ static <T> T locatorOperation(
705708 executed = true ;
706709 break ;
707710 } catch (LocatorNotAvailableException e ) {
711+ Duration waitTime = backOffDelayPolicy .delay (attempt );
712+ LOGGER .debug (
713+ "No locator available for operation '{}', waiting for {} before retrying" ,
714+ operation ,
715+ waitTime );
708716 attempt ++;
709717 try {
710- Thread .sleep (backOffDelayPolicy . delay ( attempt ) .toMillis ());
718+ Thread .sleep (waitTime .toMillis ());
711719 } catch (InterruptedException ex ) {
712720 lastException = ex ;
713721 Thread .currentThread ().interrupt ();
@@ -876,14 +884,28 @@ private static class Locator {
876884
877885 private final Address address ;
878886 private volatile Optional <Client > client ;
887+ private volatile LocalDateTime lastChanged ;
879888
880889 private Locator (Address address ) {
881890 this .address = address ;
882891 this .client = Optional .empty ();
892+ lastChanged = LocalDateTime .now ();
893+ LOGGER .debug (
894+ "Locator wrapper '{}' created with no connection at {}" , this .label (), lastChanged );
883895 }
884896
885897 Locator client (Client client ) {
898+ Client previous = this .nullableClient ();
886899 this .client = Optional .ofNullable (client );
900+ LocalDateTime now = LocalDateTime .now ();
901+ LOGGER .debug (
902+ "Locator wrapper '{}' updated from {} to {}, last changed {}, {} ago" ,
903+ this .label (),
904+ previous ,
905+ client ,
906+ this .lastChanged ,
907+ Duration .between (this .lastChanged , now ));
908+ lastChanged = now ;
887909 return this ;
888910 }
889911
@@ -906,5 +928,14 @@ private Client nullableClient() {
906928 private Address address () {
907929 return this .address ;
908930 }
931+
932+ private String label () {
933+ return address .host () + ":" + address .port ();
934+ }
935+
936+ @ Override
937+ public String toString () {
938+ return "Locator{" + "address=" + address + ", client=" + client + '}' ;
939+ }
909940 }
910941}
0 commit comments