182182 supports_rabbit_khepri_topic_trie_version /0 ]).
183183
184184% % Called locally to determine which projection to use.
185- -export ([get_effective_topic_binding_projection_version /0 ]).
185+ -export ([get_effective_topic_binding_projection_version /0 ,
186+ topic_trie_table_names /1 ]).
186187
187188% % Used during topic binding projections related feature flags handling.
188189-export ([topic_binding_projection_enable /1 ,
189190 topic_binding_projection_post_enable /1 ]).
191+ -export ([topic_binding_projection_v5_enable /1 ,
192+ topic_binding_projection_v5_post_enable /1 ]).
190193
191194-ifdef (TEST ).
192195-export ([register_projections /0 ,
@@ -1564,8 +1567,8 @@ projection_fun_for_sets(MapFun) ->
15641567
15651568register_rabbit_topic_binding_projection () ->
15661569 case get_effective_topic_binding_projection_version () of
1567- V when V >= 4 -> register_rabbit_topic_trie_projection ();
1568- _ -> register_rabbit_topic_graph_projection ()
1570+ V when V >= 4 -> register_rabbit_topic_trie_projection (V );
1571+ _ -> register_rabbit_topic_graph_projection ()
15691572 end .
15701573
15711574% % Topic routing via trie ETS projection (v3).
@@ -1648,7 +1651,7 @@ register_rabbit_topic_graph_projection() ->
16481651 _ = unregister_old_rabbit_topic_trie_projections (),
16491652 khepri :register_projection (? STORE_ID , PathPattern , Projection ).
16501653
1651- % % Topic routing via trie + ordered_set ETS projections (v4) .
1654+ % % Topic routing via trie + ordered_set ETS projections.
16521655% % Uses a single Khepri projection with two ETS tables:
16531656% %
16541657% % Trie edges table (set) for fast trie navigation during routing:
@@ -1662,10 +1665,7 @@ register_rabbit_topic_graph_projection() ->
16621665% % Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>)
16631666% % ChildCount = non_neg_integer() (number of outgoing edges)
16641667% % Dest = #resource{}
1665- % %
1666- % % This projection is only used once the `topic_binding_projection_v4' feature
1667- % % flag is enabled.
1668- register_rabbit_topic_trie_projection () ->
1668+ register_rabbit_topic_trie_projection (Vsn ) ->
16691669 ShouldProcessFun =
16701670 fun (rabbit_db_topic_exchange , split_topic_key_binary , 1 , _From ) ->
16711671 % % This function uses `persistent_term' to store a lazily compiled
@@ -1681,27 +1681,30 @@ register_rabbit_topic_trie_projection() ->
16811681 (M , F , A , From ) ->
16821682 khepri_tx_adv :should_process_function (M , F , A , From )
16831683 end ,
1684- Opts = #{tables => #{rabbit_khepri_topic_trie_v4 =>
1685- #{type => set },
1686- rabbit_khepri_topic_binding_v4 =>
1687- #{type => ordered_set }},
1684+ {TrieTabName , BindingTabName } = topic_trie_table_names (Vsn ),
1685+ Opts = #{tables => #{TrieTabName => #{type => set },
1686+ BindingTabName => #{type => ordered_set }},
16881687 keypos => 1 ,
16891688 read_concurrency => true ,
16901689 standalone_fun_options => #{should_process_function => ShouldProcessFun }},
16911690 PFun = fun (Tables , Path , OldProps , NewProps ) ->
1692- #{rabbit_khepri_topic_trie_v4 := TrieTab ,
1693- rabbit_khepri_topic_binding_v4 := BindingTab } = Tables ,
1691+ #{TrieTabName := TrieTab ,
1692+ BindingTabName := BindingTab } = Tables ,
16941693 {VHost , ExchangeName , Kind , DstName , BindingKey } =
16951694 rabbit_db_binding :khepri_route_path_to_args (Path ),
16961695 XSrc = {VHost , ExchangeName },
16971696 Dest = rabbit_misc :r (VHost , Kind , DstName ),
16981697 Words = rabbit_db_topic_exchange :split_topic_key_binary (BindingKey ),
1698+ Root = case Vsn of
1699+ 4 -> root ;
1700+ _ -> {root , XSrc }
1701+ end ,
16991702 case {OldProps , NewProps } of
17001703 {_ , #{data := _ }} ->
1701- LeafNodeId = trie_follow_down_create (TrieTab , XSrc , Words ),
1704+ LeafNodeId = trie_follow_down_create (TrieTab , XSrc , Root , Words ),
17021705 ets :insert (BindingTab , {{LeafNodeId , BindingKey , Dest }});
17031706 {#{data := _ }, _ } ->
1704- case trie_follow_down_get_path (TrieTab , XSrc , Words ) of
1707+ case trie_follow_down_get_path (TrieTab , XSrc , Root , Words ) of
17051708 {ok , LeafNodeId , TriePath } ->
17061709 ets :delete (BindingTab , {LeafNodeId , BindingKey , Dest }),
17071710 trie_gc_path (TrieTab , BindingTab , TriePath );
@@ -1712,7 +1715,7 @@ register_rabbit_topic_trie_projection() ->
17121715 ok
17131716 end
17141717 end ,
1715- Projection = khepri_projection :new (rabbit_khepri_topic_trie_v4 , PFun , Opts ),
1718+ Projection = khepri_projection :new (TrieTabName , PFun , Opts ),
17161719 PathPattern = topic_binding_path_pattern (),
17171720 unregister_old_rabbit_topic_trie_projections (),
17181721 khepri :register_projection (? STORE_ID , PathPattern , Projection ).
@@ -1732,8 +1735,8 @@ topic_binding_path_pattern() ->
17321735% % Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}.
17331736% % ChildCount tracks the number of outgoing edges from ChildNodeId.
17341737% % It is incremented when a new edge is created, decremented during GC.
1735- trie_follow_down_create (TrieTab , XSrc , Words ) ->
1736- trie_follow_down_create (TrieTab , XSrc , { root , XSrc } , none , Words ).
1738+ trie_follow_down_create (TrieTab , XSrc , Root , Words ) ->
1739+ trie_follow_down_create (TrieTab , XSrc , Root , none , Words ).
17371740
17381741trie_follow_down_create (_TrieTab , _XSrc , NodeId , _ParentKey , []) ->
17391742 NodeId ;
@@ -1756,8 +1759,8 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) ->
17561759
17571760% % Walk down the trie following the given words, collecting the path
17581761% % for later GC. Returns {ok, LeafNodeId, Path} or error.
1759- trie_follow_down_get_path (TrieTab , XSrc , Words ) ->
1760- trie_follow_down_get_path (TrieTab , XSrc , { root , XSrc } , none , Words , []).
1762+ trie_follow_down_get_path (TrieTab , XSrc , Root , Words ) ->
1763+ trie_follow_down_get_path (TrieTab , XSrc , Root , none , Words , []).
17611764
17621765trie_follow_down_get_path (_TrieTab , _XSrc , NodeId , _ParentKey , [], Path ) ->
17631766 {ok , NodeId , Path };
@@ -1811,14 +1814,44 @@ supports_rabbit_khepri_topic_trie_v2() ->
18111814
18121815- spec supports_rabbit_khepri_topic_trie_version () -> non_neg_integer ().
18131816supports_rabbit_khepri_topic_trie_version () ->
1814- 4 .
1817+ 5 .
1818+
1819+ - spec topic_trie_table_names (non_neg_integer ()) -> {atom (), atom ()}.
1820+ topic_trie_table_names (V ) when V >= 5 ->
1821+ {rabbit_khepri_topic_trie_v5 ,
1822+ rabbit_khepri_topic_binding_v5 };
1823+ topic_trie_table_names (4 ) ->
1824+ {rabbit_khepri_topic_trie_v4 ,
1825+ rabbit_khepri_topic_binding_v4 }.
18151826
18161827get_effective_topic_binding_projection_version () ->
1817- IsEnabled = rabbit_feature_flags :is_enabled (
1818- topic_binding_projection_v4 , non_blocking ),
1819- case IsEnabled of
1820- true -> 4 ;
1821- _ -> 3
1828+ case rabbit_feature_flags :is_enabled (topic_binding_projection_v5 ,
1829+ non_blocking ) of
1830+ true ->
1831+ 5 ;
1832+ _ ->
1833+ case rabbit_feature_flags :is_enabled (topic_binding_projection_v4 ,
1834+ non_blocking ) of
1835+ true ->
1836+ 4 ;
1837+ _ ->
1838+ 3
1839+ end
1840+ end .
1841+
1842+ topic_binding_projection_v5_enable (
1843+ #{feature_name := topic_binding_projection_v5 = FeatureName }) ->
1844+ ? LOG_DEBUG (
1845+ " Feature flag `~s `: register topic binding projection v5" ,
1846+ [FeatureName ],
1847+ #{domain => ? RMQLOG_DOMAIN_DB }),
1848+ case register_rabbit_topic_trie_projection (5 ) of
1849+ ok ->
1850+ ok ;
1851+ {error , {khepri , projection_already_exists , _Info }} ->
1852+ ok ;
1853+ {error , _ } = Error ->
1854+ Error
18221855 end .
18231856
18241857topic_binding_projection_enable (
@@ -1827,7 +1860,7 @@ topic_binding_projection_enable(
18271860 " Feature flag `~s `: register topic binding projection v4" ,
18281861 [FeatureName ],
18291862 #{domain => ? RMQLOG_DOMAIN_DB }),
1830- case register_rabbit_topic_trie_projection () of
1863+ case register_rabbit_topic_trie_projection (4 ) of
18311864 ok ->
18321865 ok ;
18331866 {error , {khepri , projection_already_exists , _Info }} ->
@@ -1836,6 +1869,15 @@ topic_binding_projection_enable(
18361869 Error
18371870 end .
18381871
1872+ topic_binding_projection_v5_post_enable (
1873+ #{feature_name := topic_binding_projection_v5 = FeatureName }) ->
1874+ ? LOG_DEBUG (
1875+ " Feature flag `~s `: unregister old topic binding projections" ,
1876+ [FeatureName ],
1877+ #{domain => ? RMQLOG_DOMAIN_DB }),
1878+ unregister_old_rabbit_topic_trie_projections (),
1879+ ok .
1880+
18391881topic_binding_projection_post_enable (
18401882 #{feature_name := topic_binding_projection_v4 = FeatureName }) ->
18411883 ? LOG_DEBUG (
@@ -1849,7 +1891,11 @@ unregister_old_rabbit_topic_trie_projections() ->
18491891 OldProjections0 = [{1 , rabbit_khepri_topic_trie },
18501892 {2 , rabbit_khepri_topic_trie_v2 }],
18511893 OldProjections1 = case get_effective_topic_binding_projection_version () of
1852- V when V >= 4 ->
1894+ V when V >= 5 ->
1895+ OldProjections0 ++
1896+ [{3 , rabbit_khepri_topic_trie_v3 },
1897+ {4 , rabbit_khepri_topic_trie_v4 }];
1898+ 4 ->
18531899 OldProjections0 ++
18541900 [{3 , rabbit_khepri_topic_trie_v3 }];
18551901 _ ->
0 commit comments