Merged
28 commits
9ad650c
feat(merge_table): Add schema handling for merge tables
ieQu1 Apr 14, 2026
943401f
feat(merge_table): Bump protocol version
ieQu1 Apr 14, 2026
87a4d06
refactor(replica): Register replica process via gproc
ieQu1 Apr 14, 2026
5e29c6d
refactor(merge_table): Rename core_shard_sup to shard_upstream_sup
ieQu1 Apr 14, 2026
678caf0
refactor: Rename replicant_shard_sup to shard_downstream_sup
ieQu1 Apr 14, 2026
50235d0
feat(merged_table): Implement bootstrap/cleanup for merged tables
ieQu1 Apr 14, 2026
bb9558f
fix(merge_table): Add local_content property to merge tables
ieQu1 Apr 14, 2026
638b511
test: Start test_shard explicitly
ieQu1 Apr 14, 2026
2dfed3d
feat(schema): Cache information about merge shards
ieQu1 Apr 14, 2026
3a64e60
feat(trans): Implement mria_rlog:shard_writes function
ieQu1 Apr 14, 2026
5b93586
test: Add wait_tables/2 function
ieQu1 Apr 15, 2026
9b3c243
test(merge_table): Verify schema persistent term
ieQu1 Apr 15, 2026
0530517
feat(merge_table): Add upstream as parameter to replica worker
ieQu1 Apr 15, 2026
4b7b046
feat(merge_table): Draft of implementation
ieQu1 Apr 15, 2026
d0ec2e0
fix(bootstrapper): Import merge_table batches directly to ETS
ieQu1 Apr 15, 2026
dcf303d
feat(merge_table): Import via ETS
ieQu1 Apr 16, 2026
bc77027
test(merge_table): Fix test
ieQu1 Apr 16, 2026
b5869c9
fix(merge_table): Minor fixes
ieQu1 Apr 16, 2026
c763062
feat(merge_table): Verify merge table via single match spec
ieQu1 Apr 16, 2026
e6341ad
fix(merge_table): Simplify node enforcement
ieQu1 Apr 16, 2026
bc58223
fix(merge_table): Add transaction interceptor on replicant
ieQu1 Apr 16, 2026
c3d7dc7
test(merge_table): Test dirty operations
ieQu1 Apr 16, 2026
721e62c
test(merge_table): Test bootstrapping
ieQu1 Apr 16, 2026
6cfc37b
fix(membership): Do not reply to pings unless local member present
ieQu1 Apr 16, 2026
b311a1a
feat(merge_table): Autoclean
ieQu1 Apr 16, 2026
8f16439
fix: Fix return value of start_shard
ieQu1 Apr 16, 2026
04a971a
test(fault-tolerance): Fix getting pid of replica worker
ieQu1 Apr 16, 2026
239a926
feat(merge_table): Support node pattern with multiple clauses
ieQu1 Apr 17, 2026
34 changes: 34 additions & 0 deletions README.md
@@ -57,6 +57,40 @@ Transactions for each shard are replicated independently.
Currently transaction can only modify tables in one shard.
Usually it is a good idea to group all tables that belong to a particular OTP application in one shard.

## Merge tables

Tables where every record includes the ID of the node that created it are called "merge tables".

How to create such tables:

```erlang
mria:create_table(my_merged, [ {type, ordered_set}
, {rlog_shard, shard_id}
, {node_pattern, #my_record{key = {'_', '$1'}, _ = '_'}}
, {merge_table, true}
, {auto_clean, boolean()} %% Optional, default = false
])
```

1. All tables in the shard must have the `merge_table` property set to `true`.
2. The `node_pattern` property is mandatory.
Its value must be an ETS match pattern with exactly one free variable, `'$1'`, marking the position of the owner node's ID in the record.
Mria verifies that this position is set to `node()` for each record.
3. `auto_clean` is an optional property that allows a downstream node to clean all records owned by an upstream node when the latter disconnects.
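For illustration, a record layout compatible with the `node_pattern` used in the example above might look like the following sketch (this exact record definition is an assumption for illustration, not part of the library):

```erlang
%% Hypothetical record matching node_pattern = #my_record{key = {'_', '$1'}, _ = '_'}:
%% the second element of the key holds the owner node's ID.
-record(my_record,
        { key   :: {term(), node()} %% {ActualKey, OwnerNode}
        , value :: term()
        }).
```

With this layout, `'$1'` binds to the second element of the key, so Mria can check that it equals `node()` on the node performing the write.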

Unlike regular tables, both cores and replicants use local writes to update merge tables.
In a clustered setup, the contents of a merge table consist of records from all reachable peer nodes.
Records from remote nodes are read-only.
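Putting it together, a local write to a merge table could look like the sketch below; `my_record` is the hypothetical record layout implied by the `node_pattern` above, and the call shapes follow the regular-table API:

```erlang
%% Write a record owned by the local node. The node ID embedded in the
%% key must be node(); otherwise Mria rejects the write:
{atomic, ok} = mria:transaction(shard_id,
                                fun() ->
                                    mnesia:write(my_merged,
                                                 #my_record{key = {my_key, node()},
                                                            value = 42},
                                                 write)
                                end).
```

The same write works identically on core and replicant nodes, since merge tables are updated locally on both.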

Comment on lines +75 to +93

Copilot AI Apr 16, 2026


The README states that Mria verifies node_pattern is set to node() for each record, but the enforcement added in this PR appears to happen only for transactional ops (in mria_upstream:transactional_wrapper/3) and only for write/delete_object. Dirty operations (and transactional delete) currently bypass this check. Consider clarifying the doc to match current behavior, or extending enforcement so the statement is accurate.

@ieQu1 (Member, Author) Apr 17, 2026

That is the intent; the lack of enforcement for dirty writes is an implementation detail. Yes, currently unsound dirty operations can corrupt the local state, but dirty operations are called dirty for a reason.

### Limitations

- Only `ram_copies` storage is currently supported.

- Importing of data from remote nodes is always done using dirty operations.
`mria:transaction` and `mria:ro_transaction` interfaces do not guarantee atomicity when reading data from the remote nodes.

## Enabling RLOG in your application

It is important to make the application code compatible with the RLOG feature by using the correct APIs.
3 changes: 2 additions & 1 deletion rebar.config
Expand Up @@ -6,7 +6,8 @@
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.1"}}},
{replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.6"}}},
{mnesia_rocksdb, {git, "https://github.com/emqx/mnesia_rocksdb", {tag, "0.1.17"}}},
{optvar, {git, "https://github.com/emqx/optvar", {tag, "1.0.5"}}}
{optvar, {git, "https://github.com/emqx/optvar", {tag, "1.0.5"}}},
{gproc, {git, "https://github.com/uwiger/gproc", {tag, "1.1.0"}}}
]}.

{erl_opts,
3 changes: 2 additions & 1 deletion src/mria.app.src
Expand Up @@ -19,7 +19,8 @@
gen_rpc,
replayq,
snabbkaffe,
optvar
optvar,
gproc
]},
{modules, []},
{licenses, ["Apache 2.0"]},
127 changes: 60 additions & 67 deletions src/mria.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2026 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -347,34 +347,35 @@ wait_for_tables(Tables) ->
end.

-spec ro_transaction(mria_rlog:shard(), fun(() -> A)) -> t_result(A).
ro_transaction(?LOCAL_CONTENT_SHARD, Fun) ->
maybe_middleman(mnesia, transaction, [fun ro_transaction/1, [Fun]]);
ro_transaction(Shard, Fun) ->
case mria_rlog:role() of
core ->
maybe_middleman(mnesia, transaction, [fun ro_transaction/1, [Fun]]);
replicant ->
?tp(mria_ro_transaction, #{role => replicant}),
case mria_status:upstream(Shard) of
{ok, AgentPid} ->
Ret = maybe_middleman(mnesia, transaction, [fun ro_transaction/1, [Fun]]),
%% Now we check that the agent pid is still the
%% same, meaning the replicant node haven't gone
%% through bootstrapping process while running the
%% transaction and it didn't have a chance to
%% observe the stale writes.
case mria_status:upstream(Shard) of
{ok, AgentPid} ->
Ret;
_ ->
%% Restart transaction. If the shard is
%% still disconnected, it will become an
%% RPC call to a core node:
ro_transaction(Shard, Fun)
end;
disconnected ->
ro_trans_rpc(Shard, Fun)
end
maybe
{ok, Writes} ?= mria_rlog:shard_writes(Shard),
case Writes of
remote ->
?tp(mria_ro_transaction, #{role => replicant}),
case mria_status:upstream(Shard) of
{ok, AgentPid} ->
Ret = maybe_middleman(mnesia, transaction, [fun ro_transaction/1, [Fun]]),
%% Now we check that the agent pid is still the
%% same, meaning the replicant node haven't gone
%% through bootstrapping process while running the
%% transaction and it didn't have a chance to
%% observe the stale writes.
case mria_status:upstream(Shard) of
{ok, AgentPid} ->
Ret;
_ ->
%% Restart transaction. If the shard is
%% still disconnected, it will become an
%% RPC call to a core node:
ro_transaction(Shard, Fun)
end;
disconnected ->
ro_trans_rpc(Shard, Fun)
end;
_ ->
maybe_middleman(mnesia, transaction, [fun ro_transaction/1, [Fun]])
end
end.

%% @doc Synchronous transaction.
@@ -392,15 +393,16 @@ ro_transaction(Shard, Fun) ->
-spec sync_transaction(mria_rlog:shard(), fun((...) -> A), list(), timeout()) ->
t_result(A) | {timeout, t_result(A)} | {timeout, {error, shard_not_ready}}.
sync_transaction(Shard, Function, Args, ReplTimeout) ->
case {mria_config:whoami(), Shard} of
{mnesia, _} ->
maybe_middleman(mnesia, transaction, [Function, Args]);
{_, ?LOCAL_CONTENT_SHARD} ->
maybe_middleman(mria_upstream, transactional_wrapper, [?LOCAL_CONTENT_SHARD, Function, Args]);
{core, _} ->
maybe_middleman(mria_upstream, transactional_wrapper, [Shard, Function, Args]);
{replicant, _} ->
sync_replicant_trans(Shard, Function, Args, ReplTimeout)
maybe
{ok, Writes} ?= mria_rlog:shard_writes(Shard),
case Writes of
mnesia ->
maybe_middleman(mnesia, transaction, [Function, Args]);
local ->
maybe_middleman(mria_upstream, transactional_wrapper, [Shard, Function, Args]);
remote ->
sync_replicant_trans(Shard, Function, Args, ReplTimeout)
end
end.

-spec sync_transaction(mria_rlog:shard(), fun((...) -> A), list()) ->
@@ -415,34 +417,35 @@ sync_transaction(Shard, Fun) ->

-spec transaction(mria_rlog:shard(), fun((...) -> A), list()) -> t_result(A).
transaction(Shard, Function, Args) ->
case {mria_config:whoami(), Shard} of
{mnesia, _} ->
maybe_middleman(mnesia, transaction, [Function, Args]);
{_, ?LOCAL_CONTENT_SHARD} ->
maybe_middleman(mria_upstream, transactional_wrapper, [?LOCAL_CONTENT_SHARD, Function, Args]);
{core, _} ->
maybe_middleman(mria_upstream, transactional_wrapper, [Shard, Function, Args]);
{replicant, _} ->
rpc_to_core_node(Shard, mria_upstream, transactional_wrapper, [Shard, Function, Args])
maybe
{ok, Writes} ?= mria_rlog:shard_writes(Shard),
case Writes of
mnesia ->
maybe_middleman(mnesia, transaction, [Function, Args]);
local ->
maybe_middleman(mria_upstream, transactional_wrapper, [Shard, Function, Args]);
remote ->
rpc_to_core_node(Shard, mria_upstream, transactional_wrapper, [Shard, Function, Args])
end
end.

-spec transaction(mria_rlog:shard(), fun(() -> A)) -> t_result(A).
transaction(Shard, Fun) ->
transaction(Shard, Fun, []).

-spec async_dirty(mria_rlog:shard(), fun((...) -> A), list()) -> A | no_return().
-spec async_dirty(mria_rlog:shard(), fun((...) -> A), list()) -> A.
async_dirty(Shard, Fun, Args) ->
call_backend_rw(Shard, mnesia, async_dirty, [Fun, Args]).

-spec async_dirty(mria_rlog:shard(), fun(() -> A)) -> A | no_return().
-spec async_dirty(mria_rlog:shard(), fun(() -> A)) -> A.
async_dirty(Shard, Fun) ->
async_dirty(Shard, Fun, []).

-spec sync_dirty(mria_rlog:shard(), fun((...) -> A), list()) -> A | no_return().
-spec sync_dirty(mria_rlog:shard(), fun((...) -> A), list()) -> A.
sync_dirty(Shard, Fun, Args) ->
call_backend_rw(Shard, mnesia, sync_dirty, [Fun, Args]).

-spec sync_dirty(mria_rlog:shard(), fun(() -> A)) -> A | no_return().
-spec sync_dirty(mria_rlog:shard(), fun(() -> A)) -> A.
sync_dirty(Shard, Fun) ->
sync_dirty(Shard, Fun, []).

@@ -519,11 +522,14 @@ call_backend_rw_dirty(Function, Table, Args) ->

-spec call_backend_rw(mria_rlog:shard(), module(), atom(), list()) -> term().
call_backend_rw(Shard, Module, Function, Args) ->
case is_upstream(Shard) of
true ->
case mria_rlog:shard_writes(Shard) of
{ok, remote} ->
rpc_to_core_node(Shard, Module, Function, Args);
{ok, _} ->
%% Core or mnesia:
maybe_middleman(Module, Function, Args);
false ->
rpc_to_core_node(Shard, Module, Function, Args)
Badshard ->
exit(Badshard)
end.

-spec maybe_middleman(module(), atom(), list()) -> term().
@@ -663,19 +669,6 @@ do_assert_ro_trans() ->
Ops -> error({transaction_is_not_readonly, Ops})
end.

%% @doc Return `true' if the local node is the upstream for the shard.
-spec is_upstream(mria_rlog:shard()) -> boolean().
is_upstream(Shard) ->
case mria_config:whoami() of
replicant ->
case Shard of
?LOCAL_CONTENT_SHARD -> true;
_ -> false
end;
_ -> % core or mnesia
true
end.

%% Stop the application and reload the basic config from scratch.
-spec prep_restart(stop_reason()) -> ok.
prep_restart(Reason) ->
73 changes: 52 additions & 21 deletions src/mria_bootstrapper.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021-2026 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -45,16 +45,18 @@
%%================================================================================

-define(clear_table, clear_table).
-define(clear_table(NODE), {clear_table, NODE}).

-type batch() :: { _From :: pid()
, _Table :: mria:table()
, _Records :: [tuple()] | ?clear_table
, _Records :: [tuple()] | ?clear_table | ?clear_table(node())
}.

-record(iter,
{ table :: mria:table()
, storage :: atom() | {ext, _, _}
, state :: _
{ table :: mria:table()
, storage :: atom() | {ext, _, _}
, state :: _
, is_merge_shard :: boolean()
}).

-record(server,
@@ -65,9 +67,10 @@
}).

-record(client,
{ shard :: mria_rlog:shard()
, server :: pid()
, parent :: pid()
{ shard :: mria_rlog:shard()
, server :: pid()
, parent :: pid()
, is_merge_shard :: boolean()
}).

%%================================================================================
@@ -123,9 +126,11 @@ init({client, Shard, RemoteNode, Parent}) ->
}),
mria_status:notify_replicant_bootstrap_start(Shard),
{ok, Pid} = mria_rlog_server:bootstrap_me(RemoteNode, Shard),
{ok, #client{ parent = Parent
, shard = Shard
, server = Pid
{ok, IsMerge} = mria_schema:is_merge_shard(Shard),
{ok, #client{ parent = Parent
, shard = Shard
, server = Pid
, is_merge_shard = IsMerge
}}.

handle_info(loop, St = #server{}) ->
@@ -144,8 +149,8 @@ handle_call({complete, Server, Checkpoint}, From, St = #client{server = Server,
gen_server:reply(From, ok),
mria_status:notify_replicant_bootstrap_complete(Shard),
{stop, normal, St};
handle_call({batch, {Server, Table, Records}}, _From, St = #client{server = Server, shard = Shard}) ->
handle_batch(Server, Table, Records),
handle_call({batch, {Server, Table, Records}}, _From, St = #client{server = Server, shard = Shard, is_merge_shard = IsMerge}) ->
handle_batch(IsMerge, Server, Table, Records),
mria_status:notify_replicant_bootstrap_import(Shard),
{reply, ok, St};
handle_call(Call, From, St) ->
@@ -166,7 +171,8 @@ terminate(_Reason, St = #client{}) ->
%% Internal functions
%%================================================================================

-spec push_records(mria_lib:subscriber(), mria:table(), [tuple()] | ?clear_table) -> ok | {badrpc, _}.
-spec push_records(mria_lib:subscriber(), mria:table(), Commands) -> ok | {badrpc, _}
when Commands :: [tuple()] | ?clear_table | ?clear_table(node()).
push_records(Subscriber, Table, Records) ->
push_batch(Subscriber, {self(), Table, Records}).

@@ -178,12 +184,18 @@ push_batch({Node, Pid}, Batch = {_, _, _}) ->
complete({Node, Pid}, Server, Checkpoint) ->
mria_lib:rpc_call_nothrow(Node, ?MODULE, do_complete, [Pid, Server, Checkpoint]).

handle_batch(_Server, Table, ?clear_table) ->
handle_batch(_IsMerge, _Server, Table, ?clear_table) ->
mria_schema:ensure_local_table(Table),
{atomic, ok} = mnesia:clear_table(Table),
ok;
handle_batch(_Server, Table, Records) ->
lists:foreach(fun(I) -> mnesia:dirty_write(Table, I) end, Records).
handle_batch(_IsMerge, _Server, Table, ?clear_table(Node)) ->
mria_schema:ensure_local_table(Table),
mria_rlog_replica:clean_merge_table(Table, Node),
ok;
handle_batch(false, _Server, Table, Records) ->
lists:foreach(fun(I) -> mnesia:dirty_write(Table, I) end, Records);
handle_batch(true, _Server, Table, Records) ->
ets:insert(Table, Records).

server_loop(St = #server{tables = [], subscriber = Subscriber, iterator = undefined}) ->
%% All tables and chunks have been sent:
@@ -237,22 +249,36 @@ iter_start(Subscriber, Table, BatchSize) ->
%% Push an empty batch to the replica to make sure it created the
%% local table before we start actual iteration and the receiving
%% table is empty:
push_records(Subscriber, Table, ?clear_table),
{ok, IsMerge} = mria_schema:is_merge_table(Table),
ClearCommand = case IsMerge of
true -> ?clear_table(node());
false -> ?clear_table
end,
push_records(Subscriber, Table, ClearCommand),
%% Start iteration over records:
mnesia_lib:db_fixtable(Storage, Table, true),
Iter0 = #iter{ table = Table
, storage = Storage
, is_merge_shard = IsMerge
},
case mnesia_lib:db_init_chunk(Storage, Table, BatchSize) of
InitChunk = case IsMerge of
true ->
{ok, NodePattern} = mria_schema:get_merged_table_node_pattern(Table),
MS = {NodePattern, [{'==', '$1', node()}], ['$_']},
ets:select(Table, [MS], BatchSize);
false ->
mnesia_lib:db_init_chunk(Storage, Table, BatchSize)
end,
case InitChunk of
{Matches, Cont} ->
{Iter0#iter{state = Cont}, Matches};
?end_of_table ->
{Iter0, ?end_of_table}
end.

-spec iter_next(#iter{}) -> {#iter{}, [tuple()] | ?end_of_table}.
iter_next(Iter0 = #iter{storage = Storage, state = State}) ->
case mnesia_lib:db_chunk(Storage, State) of
iter_next(Iter0 = #iter{storage = Storage, state = State, is_merge_shard = IsMerge}) ->
case next_chunk(IsMerge, Storage, State) of
{Matches, Cont} ->
{Iter0#iter{state = Cont}, Matches};
?end_of_table ->
@@ -262,3 +288,8 @@
-spec iter_end(#iter{}) -> ok.
iter_end(#iter{table = Table, storage = Storage}) ->
mnesia_lib:db_fixtable(Storage, Table, false).

next_chunk(false, Storage, Iter) ->
mnesia_lib:db_chunk(Storage, Iter);
next_chunk(true, ram_copies, Iter) ->
ets:select(Iter).