Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@
%% While at_least_one_credit_req_in_flight is true, we stash the
%% latest credit request from the receiving client.
stashed_credit_req :: none | #credit_req{},
%% Whether a consumer timeout warning was already logged for this link.
timeout_logged :: boolean()
%% Consumer timeout log level
timeout_log_level :: warning | debug
}).

-record(outgoing_unsettled, {
Expand Down Expand Up @@ -860,15 +860,15 @@ handle_stashed_consumer_timeout(#state{cfg = #cfg{container_id = ContainerId,
fun(CTag, true, Acc) ->
Handle = ctag_to_handle(CTag),
case Acc of
#{Handle := #outgoing_link{timeout_logged = false,
name = LinkName,
queue_name = QName} = Link} ->
?LOG_WARNING(
"released unsettled messages due to consumer timeout on "
"connection '~ts' for link '~ts' with handle ~b to "
"AMQP container '~ts' consuming from ~ts",
[ConnName, LinkName, Handle, ContainerId, rabbit_misc:rs(QName)]),
Acc#{Handle := Link#outgoing_link{timeout_logged = true}};
#{Handle := #outgoing_link{name = LinkName,
queue_name = QName,
timeout_log_level = Level} = Link} ->
?LOG(Level,
"released unsettled messages due to consumer timeout on "
"connection '~ts' for link '~ts' with handle ~b to "
"AMQP container '~ts' consuming from ~ts",
[ConnName, LinkName, Handle, ContainerId, rabbit_misc:rs(QName)]),
Acc#{Handle := Link#outgoing_link{timeout_log_level = debug}};
_ ->
Acc
end
Expand Down Expand Up @@ -1629,7 +1629,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
drain = false},
at_least_one_credit_req_in_flight = false,
stashed_credit_req = none,
timeout_logged = false},
timeout_log_level = warning},
OutgoingLinks = OutgoingLinks0#{HandleInt => L},
State1 = State0#state{queue_states = QStates,
outgoing_links = OutgoingLinks,
Expand Down Expand Up @@ -4151,9 +4151,15 @@ info_outgoing_management_links(Links) ->
credit = Credit} <- Links].

info_outgoing_links(Links) ->
[info_outgoing_link(Handle, Name, SourceAddress, QueueNameBin,
SendSettled, MaxMessageSize, Filter,
DeliveryCount, Credit, ConsumerTimeoutLogged)
[begin
ConsumerTimeout = case ConsumerTimeoutLogLevel of
warning -> false;
debug -> true
end,
info_outgoing_link(Handle, Name, SourceAddress, QueueNameBin,
SendSettled, MaxMessageSize, Filter,
DeliveryCount, Credit, ConsumerTimeout)
end
|| Handle := #outgoing_link{
name = Name,
source_address = SourceAddress,
Expand All @@ -4164,7 +4170,7 @@ info_outgoing_links(Links) ->
client_flow_ctl = #client_flow_ctl{
delivery_count = DeliveryCount,
credit = Credit},
timeout_logged = ConsumerTimeoutLogged}
timeout_log_level = ConsumerTimeoutLogLevel}
<- Links].

info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
Expand Down
Loading