Skip to content

Commit 336a003

Browse files
committed
WIP
1 parent 54e87d7 commit 336a003

2 files changed

Lines changed: 23 additions & 7 deletions

File tree

deps/rabbit/src/rabbit.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
pg_local_scope/1,
3434
pg_scope_amqp091_channel/0,
3535
pg_scope_amqp091_connection/0,
36-
pg_scope_non_amqp_connection/0]).
36+
pg_scope_non_amqp_connection/0,
37+
pg_scope_direct_connection/0]).
3738
%% For CLI, testing and mgmt-agent.
3839
-export([set_log_level/1, log_locations/0, config_files/0]).
3940
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
@@ -46,6 +47,7 @@
4647
pg_local_amqp091_channel/0,
4748
pg_local_amqp091_connection/0,
4849
pg_local_non_amqp_connection/0,
50+
pg_local_direct_connection/0,
4951
prevent_startup_if_node_was_reset/0]).
5052

5153
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
@@ -316,6 +318,12 @@
316318
{requires, kernel_ready},
317319
{enables, core_initialized}]}).
318320

321+
-rabbit_boot_step({pg_local_direct_connection,
322+
[{description, "local-only pg scope for direct connections"},
323+
{mfa, {rabbit, pg_local_direct_connection, []}},
324+
{requires, kernel_ready},
325+
{enables, core_initialized}]}).
326+
319327
%%---------------------------------------------------------------------------
320328

321329
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -1162,6 +1170,11 @@ pg_local_non_amqp_connection() ->
11621170
persistent_term:put(pg_scope_non_amqp_connection, PgScope),
11631171
rabbit_sup:start_child(pg_non_amqp_connection, pg, [PgScope]).
11641172

1173+
pg_local_direct_connection() ->
1174+
PgScope = pg_local_scope(direct_connection),
1175+
persistent_term:put(pg_scope_direct_connection, PgScope),
1176+
rabbit_sup:start_child(pg_direct_connection, pg, [PgScope]).
1177+
11651178
pg_local_scope(Prefix) ->
11661179
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11671180

@@ -1174,6 +1187,9 @@ pg_scope_amqp091_connection() ->
11741187
pg_scope_non_amqp_connection() ->
11751188
persistent_term:get(pg_scope_non_amqp_connection).
11761189

1190+
pg_scope_direct_connection() ->
1191+
persistent_term:get(pg_scope_direct_connection).
1192+
11771193
-spec update_cluster_tags() -> 'ok'.
11781194

11791195
update_cluster_tags() ->

deps/rabbit/src/rabbit_direct.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
-include_lib("rabbit_common/include/rabbit_misc.hrl").
2121
-include_lib("kernel/include/logger.hrl").
2222

23-
-define(TABLE, ?MODULE).
24-
2523
%%----------------------------------------------------------------------------
2624

25+
pg_scope() ->
26+
rabbit:pg_scope_direct_connection().
27+
2728
-spec boot() -> 'ok'.
2829

2930
boot() ->
30-
?TABLE = ets:new(?TABLE, [set, public, named_table]),
3131
rabbit_sup:start_supervisor_child(
3232
rabbit_direct_client_sup, rabbit_client_sup,
3333
[{local, rabbit_direct_client_sup},
@@ -42,7 +42,7 @@ force_event_refresh(Ref) ->
4242
-spec list_local() -> [pid()].
4343

4444
list_local() ->
45-
[Pid || {Pid} <- ets:tab2list(?TABLE)].
45+
pg:which_groups(pg_scope()).
4646

4747
-spec list() -> [pid()].
4848

@@ -190,7 +190,7 @@ connect1(User = #user{username = Username}, VHost, Pid, Infos) ->
190190
AuthzContext = proplists:get_value(variable_map, Infos, #{}),
191191
try rabbit_access_control:check_vhost_access(User, VHost,
192192
{ip, PeerHost}, AuthzContext) of
193-
ok -> ets:insert(?TABLE, {Pid}),
193+
ok -> ok = pg:join(pg_scope(), Pid, Pid),
194194
rabbit_core_metrics:connection_created(Pid, Infos),
195195
rabbit_event:notify(connection_created, Infos),
196196
_ = rabbit_alarm:register(
@@ -250,7 +250,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName,
250250
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
251251

252252
disconnect(Pid, Infos) ->
253-
ets:delete(?TABLE, Pid),
253+
pg:leave(pg_scope(), Pid, Pid),
254254
rabbit_core_metrics:connection_closed(Pid),
255255
rabbit_event:notify(connection_closed, Infos).
256256

0 commit comments

Comments
 (0)