Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/khepri_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
members/0, members/1, members/2,
nodes/0, nodes/1, nodes/2,
wait_for_leader/0, wait_for_leader/1, wait_for_leader/2,
wait_for_effective_machine_version/2,
wait_for_effective_machine_version/3,
wait_for_effective_behaviour/2,
wait_for_effective_behaviour/3,
get_default_ra_system_or_data_dir/0,
get_default_store_id/0,
get_store_ids/0,
Expand Down Expand Up @@ -1384,6 +1388,109 @@ do_wait_for_leader(RaServer, WaitForProcToStart, Timeout) ->
Error
end.

-spec wait_for_effective_machine_version(StoreIdOrRaServer, MacVer) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
MacVer :: ra_machine:version() | latest,
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to run the given machine version.
%%
%% Calling this function is the same as calling
%% `wait_for_effective_machine_version(StoreId, MacVer, DefaultTimeout)' where
%% `DefaultTimeout' is returned by {@link khepri_app:get_default_timeout/0}.
%%
%% @see wait_for_effective_machine_version/3.

wait_for_effective_machine_version(StoreIdOrRaServer, MacVer) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_effective_machine_version(StoreIdOrRaServer, MacVer, Timeout).

-spec wait_for_effective_machine_version(StoreIdOrRaServer, MacVer, Timeout) ->
Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
MacVer :: ra_machine:version() | latest,
Timeout :: timeout(),
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to run the given machine version.
%%
%% `MacVer' can be an integer if the caller needs to wait for a specific state
%% machine version. It can also be the atom `latest'. In this case, the version
%% of the local state machine module is taken as this "latest" version. It is
%% possible that another node can support a greater version. Therefore, this
%% functions can wait for the latest version from the point of view of the
%% local version of Khepri, not the overnall latest version.
%%
%% @param StoreId the ID of the store.
%% @param MacVer the wanted machine version or `latest'.
%% @param Timeout the timeout.
%%
%% @returns `ok' when the state machine of the given store runs at least the
%% given version or an `{error, Reason}' tuple.
%%
%% @private

wait_for_effective_machine_version({StoreId, _Node}, MacVer, Timeout) ->
wait_for_effective_machine_version(StoreId, MacVer, Timeout);
wait_for_effective_machine_version(StoreId, MacVer, Timeout)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
khepri_machine:wait_for_effective_machine_version(StoreId, MacVer, Timeout).

-spec wait_for_effective_behaviour(StoreIdOrRaServer, Behaviour) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Behaviour :: khepri_machine:api_behaviour(),
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(unknown_api_hehaviour, #{behaviour := atom()}) |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to support the given API behaviour.
%%
%% Calling this function is the same as calling
%% `wait_for_effective_behaviour(StoreId, Behaviour, DefaultTimeout)' where
%% `DefaultTimeout' is returned by {@link khepri_app:get_default_timeout/0}.
%%
%% @see wait_for_effective_behaviour/3.

wait_for_effective_behaviour(StoreIdOrRaServer, Behaviour) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_effective_behaviour(StoreIdOrRaServer, Behaviour, Timeout).

-spec wait_for_effective_behaviour(StoreIdOrRaServer, Behaviour, Timeout) ->
Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Behaviour :: khepri_machine:api_behaviour(),
Timeout :: timeout(),
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(unknown_api_hehaviour, #{behaviour := atom()}) |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to support the given API behaviour.
%%
%% @param StoreId the ID of the store.
%% @param Behaviour the wanted behaviour.
%% @param Timeout the timeout.
%%
%% @returns `ok' when the state machine of the given store supports the given
%% API behaviour or an `{error, Reason}' tuple.
%%
%% @private

wait_for_effective_behaviour({StoreId, _Node}, Behaviour, Timeout) ->
wait_for_effective_behaviour(StoreId, Behaviour, Timeout);
wait_for_effective_behaviour(StoreId, Behaviour, Timeout)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
khepri_machine:wait_for_effective_behaviour(StoreId, Behaviour, Timeout).

-spec node_to_member(StoreId, Node) -> Member when
StoreId :: khepri:store_id(),
Node :: node(),
Expand Down
102 changes: 86 additions & 16 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@
handle_tx_exception/1,
process_query/3,
process_command/3,
does_api_comply_with/2]).
does_api_comply_with/2,
wait_for_effective_machine_version/3,
wait_for_effective_behaviour/3]).

%% Internal functions to access the opaque #khepri_machine{} state.
-export([is_state/1,
Expand Down Expand Up @@ -2027,6 +2029,21 @@ clear_cached_effective_machine_version(StoreId) ->
_ = persistent_term:erase(Key),
ok.

-spec api_behaviour_to_machine_version(Behaviour) -> Ret when
Behaviour :: khepri_machine:api_behaviour(),
Ret :: MacVer | undefined,
MacVer :: 1..2.
%% @doc Returns the state machine version that implemented the given API behaviour.
%%
%% If the behaviour is unknown to this implementation, `undefined' is returned.

api_behaviour_to_machine_version(dedup_protection) -> 1;
api_behaviour_to_machine_version(delete_reason_in_node_props) -> 2;
api_behaviour_to_machine_version(indirect_deletes_in_ret) -> 2;
api_behaviour_to_machine_version(uniform_write_ret) -> 2;
api_behaviour_to_machine_version(Behaviour) when is_atom(Behaviour) ->
undefined.

-spec does_api_comply_with(Behaviour, MacVer | StoreId) -> DoesUse when
Behaviour :: khepri_machine:api_behaviour(),
MacVer :: ra_machine:version(),
Expand All @@ -2047,28 +2064,81 @@ clear_cached_effective_machine_version(StoreId) ->
%% @returns true if the given behaviour is activated, false if it is not or if
%% the behaviour is unknown.

does_api_comply_with(dedup_protection, MacVer)
when is_integer(MacVer) ->
MacVer >= 1;
does_api_comply_with(delete_reason_in_node_props, MacVer)
when is_integer(MacVer) ->
MacVer >= 2;
does_api_comply_with(indirect_deletes_in_ret, MacVer)
when is_integer(MacVer) ->
MacVer >= 2;
does_api_comply_with(uniform_write_ret, MacVer)
when is_integer(MacVer) ->
MacVer >= 2;
does_api_comply_with(_Behaviour, MacVer)
when is_integer(MacVer) andalso MacVer >= 0 ->
false;
does_api_comply_with(Behaviour, MacVer) when is_integer(MacVer) ->
RequiredVersion = api_behaviour_to_machine_version(Behaviour),
is_integer(RequiredVersion) andalso MacVer >= RequiredVersion;
does_api_comply_with(Behaviour, StoreId)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
case effective_version(StoreId) of
{ok, MacVer} -> does_api_comply_with(Behaviour, MacVer);
_ -> false
end.

-spec wait_for_effective_machine_version(StoreId, MacVer, Timeout) -> Ret when
StoreId :: khepri:store_id(),
MacVer :: ra_machine:version() | latest,
Timeout :: timeout(),
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to run the given machine version.
%%
%% @private

wait_for_effective_machine_version(StoreId, MacVer, Timeout)
when MacVer =:= latest orelse
(is_integer(MacVer) andalso MacVer >= 0) ->
T0 = khepri_utils:start_timeout_window(Timeout),
ExpectedMacVer = case MacVer of
latest -> version();
_ -> MacVer
end,
case effective_version(StoreId) of
{ok, EffectiveMacVer} ->
case EffectiveMacVer >= ExpectedMacVer of
true ->
ok;
false when ?HAS_TIME_LEFT(Timeout) ->
timer:sleep(50),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
wait_for_effective_machine_version(
StoreId, ExpectedMacVer, NewTimeout);
false ->
{error, timeout}
end;
{error, _} when ?HAS_TIME_LEFT(Timeout) ->
timer:sleep(50),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
wait_for_effective_machine_version(
StoreId, ExpectedMacVer, NewTimeout);
{error, _} = Error ->
Error
end.

-spec wait_for_effective_behaviour(StoreId, Behaviour, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Behaviour :: khepri_machine:api_behaviour(),
Timeout :: timeout(),
Ret :: ok | {error, Reason},
Reason :: timeout |
?khepri_error(unknown_api_hehaviour, map()) |
?khepri_error(effective_machine_version_not_defined, map()).
%% @doc Waits for the store to support the given API behaviour.
%%
%% @private

wait_for_effective_behaviour(StoreId, Behaviour, Timeout) ->
case api_behaviour_to_machine_version(Behaviour) of
RequiredMacVer when is_integer(RequiredMacVer) ->
wait_for_effective_machine_version(
StoreId, RequiredMacVer, Timeout);
undefined ->
Reason = ?khepri_error(
unknown_api_hehaviour,
#{behaviour => Behaviour}),
{error, Reason}
end.

%% -------------------------------------------------------------------
%% Internal functions.
%% -------------------------------------------------------------------
Expand Down
57 changes: 54 additions & 3 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
projections_are_updated_when_a_snapshot_is_installed/1,
async_command_leader_change_in_three_node_cluster/1,
spam_txs_during_election/1,
spam_changes_during_unregister_projections/1]).
spam_changes_during_unregister_projections/1,
can_wait_for_effective_behaviour/1]).

all() ->
[
Expand Down Expand Up @@ -79,7 +80,8 @@ groups() ->
fail_to_start_with_bad_ra_server_config,
initial_members_are_ignored,
fail_to_join_non_existing_node,
can_set_snapshot_interval
can_set_snapshot_interval,
can_wait_for_effective_behaviour
]}
]},
{cluster, [],
Expand All @@ -104,7 +106,8 @@ groups() ->
projections_are_updated_when_a_snapshot_is_installed,
async_command_leader_change_in_three_node_cluster,
spam_txs_during_election,
spam_changes_during_unregister_projections
spam_changes_during_unregister_projections,
can_wait_for_effective_behaviour
]}
]}
]}
Expand Down Expand Up @@ -2630,3 +2633,51 @@ spam_async_changes(Config, Parent, Node, StoreId, Path, Runs) ->
spam_async_changes(
Config, Parent, Node, StoreId, Path, NewRuns)
end.

can_wait_for_effective_behaviour(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
[FirstNode | OtherNodes] = Nodes = maps:keys(PropsPerNode),

%% We assume all nodes are using the same Ra system name & store ID.
RaSystem = helpers:get_ra_system_name(Config),
StoreId = RaSystem,

ct:pal("Start database + cluster nodes"),
lists:foreach(
fun(Node) ->
ct:pal("- khepri:start() from node ~s", [Node]),
?assertEqual(
{ok, StoreId},
helpers:call(
Config, Node, khepri, start, [RaSystem, StoreId]))
end, Nodes),
lists:foreach(
fun(Node) ->
ct:pal("- khepri_cluster:join() from node ~s", [Node]),
?assertEqual(
ok,
helpers:call(
Config, Node, khepri_cluster, join, [StoreId, FirstNode]))
end, OtherNodes),

ct:pal("Wait for the `uniform_write_ret` behaviour"),
lists:foreach(
fun(Node) ->
ct:pal(
"- khepri_cluster:wait_for_effective_behaviour() from node ~s",
[Node]),
?assertEqual(
ok,
helpers:call(
Config, Node,
khepri_cluster, wait_for_effective_behaviour,
[{StoreId, Node}, uniform_write_ret])),

?assertMatch(
{error,
?khepri_error(unknown_api_hehaviour, _)},
helpers:call(
Config, Node,
khepri_cluster, wait_for_effective_behaviour,
[{StoreId, Node}, random_unknown_behaviour]))
end, Nodes).
Loading