Skip to content

Commit da0eb09

Browse files
authored
Merge pull request #15075 from rabbitmq/reject
Provide queue name and reason to publisher for rejected messages
2 parents b0ce844 + c4fdca4 commit da0eb09

File tree

11 files changed

+229
-124
lines changed

11 files changed

+229
-124
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,10 @@
373373

374374
%% Queue actions that we will process later such that we can confirm and reject
375375
%% delivery IDs in ranges to reduce the number of DISPOSITION frames sent to the client.
376-
stashed_rejected = [] :: [{rejected, rabbit_amqqueue:name(), [delivery_number(),...]}],
376+
stashed_rejected = [] :: [{rejected,
377+
rabbit_amqqueue:name(),
378+
rabbit_queue_type:reject_reason(),
379+
[delivery_number(),...]}],
377380
stashed_settled = [] :: [{settled, rabbit_amqqueue:name(), [delivery_number(),...]}],
378381
%% Classic queues that are down.
379382
stashed_down = []:: [rabbit_amqqueue:name()],
@@ -692,7 +695,17 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
692695
%% Order is important:
693696
%% 1. Process queue rejections.
694697
{RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0),
695-
send_dispositions(RejectedIds, #'v1_0.rejected'{}, Writer, ChannelNum),
698+
maps:foreach(
699+
fun({QNameBin, Reason}, Ids) ->
700+
Info = {map,
701+
[{{symbol, <<"queue">>}, {utf8, QNameBin}},
702+
{{symbol, <<"reason">>}, {symbol, reject_reason_to_binary(Reason)}}]},
703+
Rej = #'v1_0.rejected'{
704+
error = #'v1_0.error'{
705+
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
706+
info = Info}},
707+
send_dispositions(Ids, Rej, Writer, ChannelNum)
708+
end, RejectedIds),
696709
%% 2. Process queue confirmations.
697710
{AcceptedIds0, GrantCredits1, State2} = handle_stashed_settled(GrantCredits0, State1),
698711
%% 3. Process unavailable classic queues.
@@ -716,22 +729,28 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
716729
State.
717730

718731
handle_stashed_rejected(#state{stashed_rejected = []} = State) ->
719-
{[], #{}, State};
732+
{#{}, #{}, State};
720733
handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
721734
stashed_rejected = Actions,
722735
incoming_links = Links} = State0) ->
723736
{Ids, GrantCredits, Ls} =
724737
lists:foldl(
725-
fun({rejected, _QName, Correlations}, Accum) ->
738+
fun({rejected, #resource{name = QNameBin}, Reason, Correlations}, Accum) ->
726739
lists:foldl(
727740
fun({HandleInt, DeliveryId}, {Ids0, GrantCreds0, Links0} = Acc) ->
728741
case Links0 of
729742
#{HandleInt := Link0 = #incoming_link{incoming_unconfirmed_map = U0}} ->
730743
case maps:take(DeliveryId, U0) of
731744
{{_, Settled, _}, U} ->
732745
Ids1 = case Settled of
733-
true -> Ids0;
734-
false -> [DeliveryId | Ids0]
746+
true ->
747+
Ids0;
748+
false ->
749+
maps:update_with(
750+
{QNameBin, Reason},
751+
fun(L) -> [DeliveryId | L] end,
752+
[DeliveryId],
753+
Ids0)
735754
end,
736755
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
737756
{Link, GrantCreds} = maybe_grant_link_credit(
@@ -745,7 +764,7 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
745764
Acc
746765
end
747766
end, Accum, Correlations)
748-
end, {[], #{}, Links}, Actions),
767+
end, {#{}, #{}, Links}, Actions),
749768

750769
State = State0#state{stashed_rejected = [],
751770
incoming_links = Ls},
@@ -2116,7 +2135,7 @@ handle_queue_actions(Actions, State) ->
21162135
lists:foldl(
21172136
fun ({settled, _QName, _DelIds} = Action, #state{stashed_settled = As} = S) ->
21182137
S#state{stashed_settled = [Action | As]};
2119-
({rejected, _QName, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
2138+
({rejected, _QName, _Reason, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
21202139
S#state{stashed_rejected = [Action | As]};
21212140
({deliver, CTag, AckRequired, Msgs}, S0) ->
21222141
lists:foldl(fun(Msg, S) ->
@@ -2584,6 +2603,11 @@ rejected(DeliveryId, Error) ->
25842603
settled = true,
25852604
state = #'v1_0.rejected'{error = Error}}.
25862605

2606+
reject_reason_to_binary(maxlen) ->
2607+
<<"maxlen">>;
2608+
reject_reason_to_binary(down) ->
2609+
<<"unavailable">>.
2610+
25872611
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
25882612
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
25892613
true ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2787,7 +2787,7 @@ handle_queue_actions(Actions, State) ->
27872787
lists:foldl(
27882788
fun({settled, QRef, MsgSeqNos}, S0) ->
27892789
confirm(MsgSeqNos, QRef, S0);
2790-
({rejected, _QRef, MsgSeqNos}, S0) ->
2790+
({rejected, _QRef, _Reason, MsgSeqNos}, S0) ->
27912791
{U, Rej} =
27922792
lists:foldr(
27932793
fun(SeqNo, {U1, Acc}) ->

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ handle_event(QName, {reject_publish, SeqNo, _QPid},
390390
%% It does not matter which queue rejected the message,
391391
%% if any queue did, it should not be confirmed.
392392
{U, Rejected} = reject_seq_no(SeqNo, U0),
393-
Actions = [{rejected, QName, Rejected}],
393+
Actions = [{rejected, QName, maxlen, Rejected}],
394394
{ok, State#?STATE{unconfirmed = U}, Actions};
395395
handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
396396
unconfirmed = U0} = State0) ->
@@ -405,11 +405,19 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
405405
maps:filter(fun (_, #msg_status{pending = Pids}) ->
406406
lists:member(Pid, Pids)
407407
end, U0)),
408-
{Unconfirmed, Settled, Rejected} =
409-
settle_seq_nos(MsgSeqNos, Pid, U0, down),
410-
Actions = settlement_action(
411-
settled, QName, Settled,
412-
settlement_action(rejected, QName, Rejected, Actions0)),
408+
{Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down),
409+
Actions1 = case Rejected of
410+
[] ->
411+
Actions0;
412+
_ ->
413+
[{rejected, QName, down, Rejected} | Actions0]
414+
end,
415+
Actions = case Settled of
416+
[] ->
417+
Actions1;
418+
_ ->
419+
[{settled, QName, Settled} | Actions1]
420+
end,
413421
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
414422
true ->
415423
%% any abnormal exit should be considered a full reject of the
@@ -425,17 +433,12 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
425433
end, [], U0),
426434
U = maps:without(MsgIds, U0),
427435
{ok, State#?STATE{unconfirmed = U},
428-
[{rejected, QName, MsgIds} | Actions0]}
436+
[{rejected, QName, down, MsgIds} | Actions0]}
429437
end;
430438
handle_event(_QName, Action, State)
431439
when element(1, Action) =:= credit_reply ->
432440
{ok, State, [Action]}.
433441

434-
settlement_action(_Type, _QRef, [], Acc) ->
435-
Acc;
436-
settlement_action(Type, QRef, MsgSeqs, Acc) ->
437-
[{Type, QRef, MsgSeqs} | Acc].
438-
439442
supports_stateful_delivery() -> true.
440443

441444
-spec deliver([{amqqueue:target(), state()}],

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ wait_for_queue_deleted(QRef, N) ->
232232
end.
233233

234234
-spec lookup_topology(state()) -> state().
235-
lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) ->
235+
lookup_topology(#state{queue_ref = #resource{virtual_host = Vhost,
236+
kind = queue} = QRef} = State) ->
236237
{ok, Q} = rabbit_amqqueue:lookup(QRef),
237238
DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>,
238239
fun(_Pol, QArg) -> QArg end, Q),
@@ -253,7 +254,7 @@ handle_queue_actions(Actions, State0) ->
253254
S1 = handle_settled(QRef, MsgSeqs, S0),
254255
S2 = ack(S1),
255256
maybe_cancel_timer(S2);
256-
({rejected, QRef, MsgSeqs}, S0) ->
257+
({rejected, QRef, _Reason, MsgSeqs}, S0) ->
257258
handle_rejected(QRef, MsgSeqs, S0);
258259
({queue_down, _QRef}, S0) ->
259260
%% target classic queue is down, but not deleted
@@ -291,8 +292,8 @@ rejected(SeqNo, Qs, Pendings)
291292
Pendings);
292293
false ->
293294
?LOG_DEBUG("Ignoring rejection for unknown sequence number ~b "
294-
"from target dead letter queues ~tp",
295-
[SeqNo, Qs]),
295+
"from target dead letter queues ~tp",
296+
[SeqNo, Qs]),
296297
Pendings
297298
end.
298299

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181

8282
-type queue_name() :: rabbit_amqqueue:name().
8383
-type queue_state() :: term().
84+
-type reject_reason() :: maxlen | down.
8485
%% sequence number typically
8586
-type correlation() :: term().
8687
-type arguments() :: queue_arguments | consumer_arguments.
@@ -101,6 +102,7 @@
101102
%% indicate to the queue type module that a message has been delivered
102103
%% fully to the queue
103104
{settled, queue_name(), [correlation()]} |
105+
{rejected, queue_name(), reject_reason(), [correlation()]} |
104106
{deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} |
105107
{block | unblock, QueueName :: term()} |
106108
credit_reply_action().
@@ -158,6 +160,7 @@
158160
credit_reply_action/0,
159161
action/0,
160162
actions/0,
163+
reject_reason/0,
161164
settle_op/0,
162165
queue_type/0,
163166
credit/0,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1125,7 +1125,7 @@ deliver(QSs, Msg0, Options) ->
11251125
case deliver0(QName, Correlation, Msg, S0) of
11261126
{reject_publish, S} ->
11271127
{[{Q, S} | Qs],
1128-
[{rejected, QName, [Correlation]} | Actions]};
1128+
[{rejected, QName, maxlen, [Correlation]} | Actions]};
11291129
{ok, S, As} ->
11301130
{[{Q, S} | Qs], As ++ Actions}
11311131
end

0 commit comments

Comments
 (0)