Skip to content

Commit 97b9fc4

Browse files
authored
Merge pull request #366 from rabbitmq/support-multi-table-projections
khepri_projection: Support multi-table projections
2 parents f5613b5 + 9ec163b commit 97b9fc4

5 files changed

Lines changed: 670 additions & 113 deletions

File tree

doc/overview.edoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,3 +587,7 @@ Projections have some costs though. Expect some increased memory consumption
587587
since information in the projection tables is duplicated between the Khepri
588588
store and ETS. Khepri may also take longer to accomplish writes since
589589
projections are updated by Khepri synchronously when writing to the store.
590+
591+
There are several types of projection functions, ranging from the most
592+
efficient to the more flexible one capable of managing several ETS tables. See
593+
{@link khepri_projection} for more details.

src/khepri_machine.erl

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@
5757
%% </ul>
5858
%% </td>
5959
%% </tr>
60+
%% <tr>
61+
%% <td style="text-align: right; vertical-align: top;">2</td>
62+
%% <td>
63+
%% <ul>
64+
%% <li>Added support for multi-table projections (see {@link
65+
%% khepri_projection}).</li>
66+
%% </ul>
67+
%% </td>
68+
%% </tr>
6069
%% </table>
6170

6271
-module(khepri_machine).
@@ -668,14 +677,26 @@ register_projection(
668677
ets_options = EtsOptions} = Projection,
669678
Options)
670679
when is_atom(Name) andalso
671-
is_list(EtsOptions) andalso
680+
?ARE_PROJECTION_ETS_OPTIONS(EtsOptions) andalso
672681
(?IS_HORUS_STANDALONE_FUN(ProjectionFun) orelse
673682
ProjectionFun =:= copy) ->
674-
PathPattern = khepri_path:from_string(PathPattern0),
675-
khepri_path:ensure_is_valid(PathPattern),
676-
Command = #register_projection{pattern = PathPattern,
677-
projection = Projection},
678-
process_command(StoreId, Command, Options).
683+
Timeout = get_timeout(Options),
684+
T0 = khepri_utils:start_timeout_window(Timeout),
685+
Compatible = khepri_projection:check_compatibility_with_store(
686+
StoreId, Projection, Timeout),
687+
case Compatible of
688+
ok ->
689+
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
690+
Options1 = Options#{timeout => NewTimeout},
691+
692+
PathPattern = khepri_path:from_string(PathPattern0),
693+
khepri_path:ensure_is_valid(PathPattern),
694+
Command = #register_projection{pattern = PathPattern,
695+
projection = Projection},
696+
process_command(StoreId, Command, Options1);
697+
{error, _Reason} = Error ->
698+
Error
699+
end.
679700

680701
-spec unregister_projections(StoreId, Names, Options) -> Ret when
681702
StoreId :: khepri:store_id(),
@@ -1957,17 +1978,18 @@ overview(State) ->
19571978
keep_while_conds => KeepWhileConds}.
19581979

19591980
-spec version() -> MacVer when
1960-
MacVer :: 2.
1981+
MacVer :: 3.
19611982
%% @doc Returns the state machine version.
19621983

19631984
version() ->
1964-
2.
1985+
3.
19651986

19661987
-spec which_module(MacVer) -> Module when
1967-
MacVer :: 0..2,
1988+
MacVer :: 0..3,
19681989
Module :: ?MODULE.
19691990
%% @doc Returns the state machine module corresponding to the given version.
19701991

1992+
which_module(3) -> ?MODULE;
19711993
which_module(2) -> ?MODULE;
19721994
which_module(1) -> ?MODULE;
19731995
which_module(0) -> ?MODULE.
@@ -2041,6 +2063,7 @@ api_behaviour_to_machine_version(dedup_protection) -> 1;
20412063
api_behaviour_to_machine_version(delete_reason_in_node_props) -> 2;
20422064
api_behaviour_to_machine_version(indirect_deletes_in_ret) -> 2;
20432065
api_behaviour_to_machine_version(uniform_write_ret) -> 2;
2066+
api_behaviour_to_machine_version(multi_table_projections) -> 3;
20442067
api_behaviour_to_machine_version(Behaviour) when is_atom(Behaviour) ->
20452068
undefined.
20462069

@@ -2830,7 +2853,9 @@ convert_state1(State, 0, 1) ->
28302853
convert_state1(State, 1, 2) ->
28312854
Tree = get_tree(State),
28322855
Tree1 = khepri_tree:convert_tree(Tree, 1, 2),
2833-
set_tree(State, Tree1).
2856+
set_tree(State, Tree1);
2857+
convert_state1(State, 2, 3) ->
2858+
State.
28342859

28352860
-spec update_projections(OldState, NewState) -> ok when
28362861
OldState :: khepri_machine:state(),

0 commit comments

Comments
 (0)