Skip to content

Commit 7ad356e

Browse files
Merge pull request #15287 from rabbitmq/mergify/bp/v4.1.x/pr-15286
Check exclusive queue owner before deleting a queue (backport #15276) (backport #15286)
2 parents 76a4cd2 + ef0276e commit 7ad356e

5 files changed

Lines changed: 96 additions & 159 deletions

File tree

deps/rabbit/src/amqqueue.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
pattern_match_on_durable/1,
7373
pattern_match_on_type_and_durable/2,
7474
pattern_match_on_type_and_vhost/2,
75+
pattern_match_on_exclusive_owner/1,
7576
reset_decorators/1,
7677
set_immutable/1,
7778
qnode/1,
@@ -594,6 +595,11 @@ pattern_match_on_type_and_durable(Type, IsDurable) ->
594595
pattern_match_on_type_and_vhost(Type, VHost) ->
595596
#amqqueue{type = Type, vhost = VHost, _ = '_'}.
596597

598+
-spec pattern_match_on_exclusive_owner(pid() | none) -> amqqueue_pattern().
599+
600+
pattern_match_on_exclusive_owner(Owner) ->
601+
#amqqueue{exclusive_owner = Owner, _ = '_'}.
602+
597603
-spec reset_decorators(amqqueue()) -> amqqueue().
598604

599605
reset_decorators(#amqqueue{} = Queue) ->

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
%% Deprecated feature callback.
8080
-export([are_transient_nonexcl_used/1]).
8181

82+
-include_lib("khepri/include/khepri.hrl").
8283
-include_lib("rabbit_common/include/rabbit.hrl").
8384
-include_lib("stdlib/include/qlc.hrl").
8485
-include("amqqueue.hrl").
@@ -1863,7 +1864,14 @@ internal_delete(Queue, ActingUser) ->
18631864

18641865
internal_delete(Queue, ActingUser, Reason) ->
18651866
QueueName = amqqueue:get_name(Queue),
1866-
case rabbit_db_queue:delete(QueueName, Reason) of
1867+
Conditions = case amqqueue:get_exclusive_owner(Queue) of
1868+
none ->
1869+
[];
1870+
Owner ->
1871+
Pattern = amqqueue:pattern_match_on_exclusive_owner(Owner),
1872+
[#if_data_matches{pattern = Pattern}]
1873+
end,
1874+
case rabbit_db_queue:delete_if(QueueName, Conditions, Reason) of
18671875
ok ->
18681876
ok;
18691877
{error, timeout} = Err ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
create_or_get/1,
3131
set/1,
3232
delete/2,
33+
delete_if/3,
3334
update/2,
3435
update_decorators/2,
3536
exists/1
@@ -384,34 +385,51 @@ list_for_count_in_khepri(VHostName) ->
384385
rabbit_khepri:timeout_error().
385386

386387
delete(QueueName, Reason) ->
388+
delete_if(QueueName, [], Reason).
389+
390+
-spec delete_if(QName, Conditions, Reason) -> Ret when
391+
QName :: rabbit_amqqueue:name(),
392+
Conditions :: [khepri_condition:condition()],
393+
Reason :: atom(),
394+
Ret :: ok |
395+
Deletions :: rabbit_binding:deletions() |
396+
rabbit_khepri:timeout_error().
397+
398+
delete_if(QueueName, Conditions, Reason) ->
387399
rabbit_khepri:handle_fallback(
388-
#{mnesia => fun() -> delete_in_mnesia(QueueName, Reason) end,
389-
khepri => fun() -> delete_in_khepri(QueueName) end
400+
#{mnesia => fun() -> delete_in_mnesia(QueueName, Conditions, Reason) end,
401+
khepri => fun() -> delete_in_khepri(QueueName, Conditions) end
390402
}).
391403

392-
delete_in_mnesia(QueueName, Reason) ->
404+
delete_in_mnesia(QueueName, Conditions, Reason) ->
393405
rabbit_mnesia:execute_mnesia_transaction(
394406
fun () ->
395407
case {mnesia:wread({?MNESIA_TABLE, QueueName}),
396408
mnesia:wread({?MNESIA_DURABLE_TABLE, QueueName})} of
397409
{[], []} ->
398410
ok;
399-
_ ->
400-
OnlyDurable = case Reason of
401-
missing_owner -> true;
402-
_ -> false
403-
end,
404-
internal_delete_in_mnesia(QueueName, OnlyDurable, Reason)
411+
{Qs, DurableQs} ->
412+
case queue_matches_conditions(Qs ++ DurableQs, Conditions) of
413+
true ->
414+
OnlyDurable = case Reason of
415+
missing_owner -> true;
416+
_ -> false
417+
end,
418+
internal_delete_in_mnesia(QueueName, OnlyDurable, Reason);
419+
false ->
420+
ok
421+
end
405422
end
406423
end).
407424

408-
delete_in_khepri(QueueName) ->
409-
delete_in_khepri(QueueName, false).
425+
delete_in_khepri(QueueName, Conditions) ->
426+
delete_in_khepri(QueueName, Conditions, false).
410427

411-
delete_in_khepri(QueueName, OnlyDurable) ->
428+
delete_in_khepri(QueueName, Conditions, OnlyDurable) ->
429+
Path0 = khepri_queue_path(QueueName),
430+
Path = khepri_path:combine_with_conditions(Path0, Conditions),
412431
rabbit_khepri:transaction(
413432
fun () ->
414-
Path = khepri_queue_path(QueueName),
415433
case khepri_tx_adv:delete(Path) of
416434
{ok, #{data := _}} ->
417435
%% we want to execute some things, as decided by rabbit_exchange,
@@ -438,7 +456,7 @@ internal_delete(QueueName, OnlyDurable, Reason) ->
438456
%% HA queues are removed it can be removed.
439457
rabbit_khepri:handle_fallback(
440458
#{mnesia => fun() -> internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) end,
441-
khepri => fun() -> delete_in_khepri(QueueName, OnlyDurable) end
459+
khepri => fun() -> delete_in_khepri(QueueName, [], OnlyDurable) end
442460
}).
443461

444462
internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
@@ -1360,6 +1378,20 @@ clear_in_khepri() ->
13601378
%% Internal
13611379
%% --------------------------------------------------------------
13621380

1381+
queue_matches_conditions(_, []) ->
1382+
true;
1383+
queue_matches_conditions([], _Conditions) ->
1384+
false;
1385+
queue_matches_conditions([Q | _], [#if_data_matches{pattern = Pattern,
1386+
conditions = Conditions} | Rest]) ->
1387+
MatchSpec = ets:match_spec_compile([{Pattern, Conditions, [match]}]),
1388+
case ets:match_spec_run([Q], MatchSpec) of
1389+
[match] -> queue_matches_conditions([Q], Rest);
1390+
[] -> false
1391+
end;
1392+
queue_matches_conditions(_, [Condition | _]) ->
1393+
error({unsupported_condition, Condition}).
1394+
13631395
list_with_possible_retry_in_mnesia(Fun) ->
13641396
%% amqqueue migration:
13651397
%% The `rabbit_queue' or `rabbit_durable_queue' tables

deps/rabbit/test/rabbit_db_queue_SUITE.erl

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
-module(rabbit_db_queue_SUITE).
99

10+
-include_lib("khepri/include/khepri.hrl").
1011
-include_lib("rabbit_common/include/rabbit.hrl").
1112
-include_lib("eunit/include/eunit.hrl").
1213
-include("amqqueue.hrl").
@@ -42,6 +43,7 @@ all_tests() ->
4243
count_by_vhost,
4344
set,
4445
delete,
46+
delete_exclusive_queue,
4547
update,
4648
update_decorators,
4749
exists,
@@ -62,7 +64,8 @@ mnesia_tests() ->
6264
foreach_transient,
6365
delete_transient,
6466
update_in_mnesia_tx,
65-
get_durable_in_mnesia_tx
67+
get_durable_in_mnesia_tx,
68+
delete_exclusive_queue
6669
].
6770

6871
%% -------------------------------------------------------------------
@@ -323,6 +326,34 @@ delete1(_Config) ->
323326
?assertEqual({error, not_found}, rabbit_db_queue:get(QName)),
324327
passed.
325328

329+
delete_exclusive_queue(Config) ->
330+
passed = rabbit_ct_broker_helpers:rpc(
331+
Config, 0, ?MODULE, delete_exclusive_queue1, [Config]).
332+
333+
delete_exclusive_queue1(_Config) ->
334+
QName = rabbit_misc:r(?VHOST, queue, <<"test-exclusive-queue">>),
335+
Owner = spawn_link(fun() -> receive stop -> ok end end),
336+
NotOwner = spawn_link(fun() -> receive stop -> ok end end),
337+
Q = new_exclusive_queue(QName, rabbit_classic_queue, Owner),
338+
?assertEqual(ok, rabbit_db_queue:set(Q)),
339+
?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
340+
%% Delete with wrong owner should not delete the queue
341+
NotOwnerPattern = amqqueue:pattern_match_on_exclusive_owner(NotOwner),
342+
NotOwnerConditions = [#if_data_matches{pattern = NotOwnerPattern}],
343+
?assertEqual(ok, rabbit_db_queue:delete_if(QName, NotOwnerConditions, normal)),
344+
?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
345+
%% Delete with correct owner should delete the queue
346+
OwnerPattern = amqqueue:pattern_match_on_exclusive_owner(Owner),
347+
OwnerConditions = [#if_data_matches{pattern = OwnerPattern}],
348+
Deletions = rabbit_db_queue:delete_if(QName, OwnerConditions, normal),
349+
?assertEqual(rabbit_binding:new_deletions(), Deletions),
350+
?assertEqual({error, not_found}, rabbit_db_queue:get(QName)),
351+
unlink(Owner),
352+
unlink(NotOwner),
353+
Owner ! stop,
354+
NotOwner ! stop,
355+
passed.
356+
326357
update(Config) ->
327358
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, update1, [Config]).
328359

@@ -620,3 +651,6 @@ new_queue(QName, Type) ->
620651

621652
new_queue(#resource{virtual_host = VHost} = QName, Type, Pid) ->
622653
amqqueue:new(QName, Pid, true, false, none, [], VHost, #{}, Type).
654+
655+
new_exclusive_queue(#resource{virtual_host = VHost} = QName, Type, Owner) ->
656+
amqqueue:new(QName, none, true, false, Owner, [], VHost, #{}, Type).

deps/rabbitmq_federation/README-hacking

Lines changed: 0 additions & 143 deletions
This file was deleted.

0 commit comments

Comments
 (0)