@@ -304,7 +304,8 @@ lookup_dlx(#state{exchange_ref = DLXRef} = State0) ->
304304 State = log_missing_dlx_once (State0 ),
305305 {not_found , State };
306306 {ok , X } ->
307- {X , State0 }
307+ State = clear_log_missing_dlx_once (State0 ),
308+ {X , State }
308309 end .
309310
310311-spec forward (mc :state (), non_neg_integer (), rabbit_amqqueue :name (),
@@ -348,7 +349,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
348349 [] ->
349350 log_no_route_once (State1 );
350351 _ ->
351- State1
352+ clear_log_no_route_once ( State1 )
352353 end ,
353354 {RouteToQs , State2 }
354355 end ,
@@ -503,8 +504,9 @@ redeliver0(#pending{delivery = Msg0,
503504 % % Routes changed dynamically so that we don't await any publisher confirms anymore.
504505 % % Since we also received at least one publisher confirm (mandatory flag semantics),
505506 % % we can ack the message to the source quorum queue.
506- State0 # state {pendings = maps :remove (OutSeq , Pendings ),
507- settled_ids = [ConsumedId | SettledIds ]};
507+ State = State0 # state {pendings = maps :remove (OutSeq , Pendings ),
508+ settled_ids = [ConsumedId | SettledIds ]},
509+ clear_log_no_route_once (State );
508510 _ ->
509511 % % Do not redeliver message to a target queue
510512 % % 1. for which we already received a publisher confirm, or
@@ -517,7 +519,7 @@ redeliver0(#pending{delivery = Msg0,
517519 State1 = log_cycles (Cycles , DLRKeys , State0 ),
518520 case RouteToQs of
519521 [] ->
520- State1 ;
522+ log_no_route_once ( State1 ) ;
521523 _ ->
522524 Pend = Pend0 # pending {publish_count = PublishCount + 1 ,
523525 last_published_at = os :system_time (millisecond ),
@@ -527,7 +529,7 @@ redeliver0(#pending{delivery = Msg0,
527529 % % Any target queue that rejected previously and still need
528530 % % to be routed to is moved back to 'unsettled'.
529531 rejected = []},
530- State = State0 # state {pendings = maps :update (OutSeq , Pend , Pendings )},
532+ State = clear_log_no_route_once ( State0 # state {pendings = maps :update (OutSeq , Pend , Pendings )}) ,
531533 Options = #{correlation => OutSeq },
532534 deliver_to_queues (Msg ,
533535 Options ,
@@ -637,6 +639,19 @@ log_missing_dlx_once(#state{exchange_ref = DlxResource,
637639 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource )]),
638640 State # state {logged = maps :put (missing_dlx , DlxResource , Logged )}.
639641
642+ clear_log_missing_dlx_once (# state {exchange_ref = DlxResource ,
643+ queue_ref = QueueResource ,
644+ pendings = Pendings ,
645+ logged = #{missing_dlx := MissingDlx } = Logged } = State ) ->
646+ ? LOG_INFO (" Dead-letter-exchange ~ts found for quorum ~ts . Forwarding was previously "
647+ " blocked since the configured dead-letter-exchange ~ts could not be found. "
648+ " Forwarding of ~b pending dead-letter messages will be attempted." ,
649+ [rabbit_misc :rs (DlxResource ), rabbit_misc :rs (QueueResource ),
650+ rabbit_misc :rs (MissingDlx ), maps :size (Pendings )]),
651+ State # state {logged = maps :remove (missing_dlx , Logged )};
652+ clear_log_missing_dlx_once (State ) ->
653+ State .
654+
640655log_no_route_once (# state {exchange_ref = SameDlx ,
641656 routing_key = SameRoutingKey ,
642657 logged = #{no_route := {SameDlx , SameRoutingKey }}} = State ) ->
@@ -657,6 +672,22 @@ log_no_route_once(#state{queue_ref = QueueResource,
657672 [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ), RoutingKey ]),
658673 State # state {logged = maps :put (no_route , {DlxResource , RoutingKey }, Logged )}.
659674
675+ clear_log_no_route_once (# state {exchange_ref = DlxResource ,
676+ routing_key = RoutingKey ,
677+ queue_ref = QueueResource ,
678+ pendings = Pendings ,
679+ logged = #{no_route := {OldDlx , OldRoutingKey }} = Logged } = State ) ->
680+ ? LOG_INFO (" Discovered a route to forward dead-letter messages from quorum ~ts on "
681+ " configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
682+ " Previously dead-letter messages could not be forwarded on configured "
683+ " dead-letter-exchange ~ts and dead-letter-routing-key '~ts '. "
684+ " Forwarding of ~b pending dead-letter messages will be attempted." ,
685+ [rabbit_misc :rs (QueueResource ), rabbit_misc :rs (DlxResource ),
686+ RoutingKey , rabbit_misc :rs (OldDlx ), OldRoutingKey , maps :size (Pendings )]),
687+ State # state {logged = maps :remove (no_route , Logged )};
688+ clear_log_no_route_once (State ) ->
689+ State .
690+
660691log_cycles (Cycles , RoutingKeys , State ) ->
661692 lists :foldl (fun (Cycle , S ) -> log_cycle_once (Cycle , RoutingKeys , S ) end , State , Cycles ).
662693
0 commit comments