Skip to content

Commit 20caa4d

Browse files
committed
Add OSS changes for rabbitmq_jms plugin
This commit adds OSS changes required for the new VMware Tanzu RabbitMQ `rabbitmq_jms` plugin. The deleted tests in this commit moved to `~/tanzu-rabbitmq/deps/rabbitmq_jms/test/`. (cherry picked from commit 7fd4b16)
1 parent a6ef316 commit 20caa4d

50 files changed

Lines changed: 704 additions & 2076 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@
8383
-type target_def() :: #{address => terminus_address(),
8484
durable => terminus_durability()}.
8585
-type source_def() :: #{address => terminus_address(),
86-
durable => terminus_durability()}.
86+
durable => terminus_durability(),
87+
distribution_mode => distribution_mode()}.
8788

8889
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8990

@@ -818,12 +819,19 @@ make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
818819
_ ->
819820
undefined
820821
end,
822+
DistMode = case Source of
823+
#{distribution_mode := Mode} ->
824+
amqp10_util:dist_mode_from_atom(Mode);
825+
_ ->
826+
undefined
827+
end,
821828
Dynamic = maps:get(dynamic, Source, false),
822829
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
823830
#'v1_0.source'{address = make_address(Source),
824831
durable = {uint, Durable},
825832
expiry_policy = ExpiryPolicy,
826833
dynamic = Dynamic,
834+
distribution_mode = DistMode,
827835
filter = TranslatedFilter,
828836
capabilities = make_capabilities(Source)}.
829837

deps/amqp10_common/include/amqp10_filter.hrl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,27 @@
55
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66

77
%% AMQP Filter Expressions Version 1.0 Committee Specification Draft 01
8+
%% https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929253
9+
-define(CAP_FILTEX_SQL, <<"AMQP_FILTEX_SQL_V1_0">>).
10+
-define(CAP_FILTEX_PROP, <<"AMQP_FILTEX_PROP_V1_0">>).
811
%% https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929266
912
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
1013
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
1114
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
1215
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).
13-
1416
%% A filter with this name contains an AMQP SQL expression.
1517
-define(FILTER_NAME_SQL, <<"sql-filter">>).
1618
%% https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276
1719
-define(DESCRIPTOR_NAME_SQL_FILTER, <<"amqp:sql-filter">>).
1820
-define(DESCRIPTOR_CODE_SQL_FILTER, 16#120).
21+
22+
%% A filter with this name contains a JMS message selector.
23+
%% We use the same name as Qpid JMS in
24+
%% https://github.com/apache/qpid-jms/blob/2.10.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
25+
-define(FILTER_NAME_SELECTOR, <<"jms-selector">>).
26+
%% SQL-based filtering syntax
27+
%% This capability and these descriptors are defined in
28+
%% https://www.amqp.org/specification/1.0/filters
29+
-define(CAP_SELECTOR, <<"APACHE.ORG:SELECTOR">>).
30+
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
31+
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).

deps/amqp10_common/include/amqp10_types.hrl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@
2626

2727
% [3.2.16]
2828
-define(MESSAGE_FORMAT, 0).
29+
30+
% [3.5.7]
31+
-type distribution_mode() :: move | copy.

deps/amqp10_common/src/amqp10_util.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77

88
-module(amqp10_util).
99
-include_lib("amqp10_common/include/amqp10_types.hrl").
10-
-export([link_credit_snd/3]).
10+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
11+
12+
-export([link_credit_snd/3,
13+
dist_mode_to_atom/1,
14+
dist_mode_from_atom/1]).
1115

1216
%% AMQP 1.0 §2.6.7
1317
-spec link_credit_snd(sequence_no(), uint(), sequence_no()) -> uint().
@@ -18,3 +22,15 @@ link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd) ->
1822
%% LinkCreditSnd can be negative when receiver decreases credits
1923
%% while messages are in flight. Maintain a floor of zero.
2024
max(0, LinkCreditSnd).
25+
26+
-spec dist_mode_to_atom({symbol, binary()}) -> distribution_mode().
27+
dist_mode_to_atom(?V_1_0_STD_DIST_MODE_MOVE) ->
28+
move;
29+
dist_mode_to_atom(?V_1_0_STD_DIST_MODE_COPY) ->
30+
copy.
31+
32+
-spec dist_mode_from_atom(distribution_mode()) -> {symbol, binary()}.
33+
dist_mode_from_atom(move) ->
34+
?V_1_0_STD_DIST_MODE_MOVE;
35+
dist_mode_from_atom(copy) ->
36+
?V_1_0_STD_DIST_MODE_COPY.

deps/rabbit/Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ define PROJECT_ENV
8383
%% See https://www.rabbitmq.com/docs/consumers#acknowledgement-timeout
8484
%% 30 minutes
8585
{consumer_timeout, 1800000},
86+
{consumer_disconnected_timeout, 60000},
8687

8788
%% used by rabbit_peer_discovery_classic_config
8889
{cluster_nodes, {[], disc}},
@@ -248,7 +249,7 @@ define ct_master.erl
248249
endef
249250

250251
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
251-
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
252+
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
252253
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control
253254
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
254255

@@ -352,8 +353,8 @@ ifdef TRACE_SUPERVISOR2
352353
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
353354
endif
354355

355-
# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
356356
ifndef NON_DETERMINISTIC
357+
# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
357358
YRL_ERLC_OPTS += +deterministic
358359
endif
359360

deps/rabbit/include/rabbit_global_counters.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
%% Dead Letter counters:
44
%%
55
%% The following two counters are mutually exclusive because
6-
%% quorum queue dead-letter-strategy at-least-once is incompatible with overflow drop-head.
6+
%% dead-letter-strategy at-least-once is incompatible with overflow drop-head.
77
-define(MESSAGES_DEAD_LETTERED_MAXLEN, 1).
88
-define(MESSAGES_DEAD_LETTERED_CONFIRMED, 1).
99
-define(MESSAGES_DEAD_LETTERED_EXPIRED, 2).

deps/rabbit/src/amqqueue.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
% type_state
5353
get_type_state/1,
5454
set_type_state/2,
55+
update_type_state/2,
5556
% state
5657
get_state/1,
5758
set_state/2,
@@ -465,11 +466,10 @@ set_pid(#amqqueue{} = Queue, Pid) ->
465466
% policy
466467

467468
-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.
468-
469469
get_policy(#amqqueue{policy = Policy}) -> Policy.
470470

471-
-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().
472-
471+
-spec set_policy(amqqueue(), proplists:proplist() | none | undefined) ->
472+
amqqueue().
473473
set_policy(#amqqueue{} = Queue, Policy) ->
474474
Queue#amqqueue{policy = Policy}.
475475

@@ -499,6 +499,11 @@ set_type_state(#amqqueue{} = Queue, TState) ->
499499
set_type_state(Queue, _TState) ->
500500
Queue.
501501

502+
-spec update_type_state(amqqueue(), fun((map()) -> map())) -> amqqueue().
503+
update_type_state(#amqqueue{} = Queue, Fun) ->
504+
TState = get_type_state(Queue),
505+
set_type_state(Queue, Fun(TState)).
506+
502507
% state
503508

504509
-spec get_state(amqqueue()) -> atom() | none.

deps/rabbit/src/mc.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
message_id/1,
2424
property/2,
2525
timestamp/1,
26+
received_timestamp/1,
2627
priority/1,
2728
set_ttl/2,
2829
x_header/2,
@@ -287,6 +288,11 @@ timestamp(#?MODULE{annotations = Anns}) ->
287288
timestamp(BasicMsg) ->
288289
mc_compat:timestamp(BasicMsg).
289290

291+
-spec received_timestamp(state()) -> non_neg_integer().
292+
received_timestamp(#?MODULE{annotations = #{?ANN_RECEIVED_AT_TIMESTAMP := Ts}})
293+
when is_integer(Ts) ->
294+
Ts.
295+
290296
-spec priority(state()) -> undefined | non_neg_integer().
291297
priority(#?MODULE{annotations = Anns}) ->
292298
maps:get(?ANN_PRIORITY, Anns, undefined);

0 commit comments

Comments
 (0)