Skip to content

Commit f847caf

Browse files
authored
Merge pull request #9 from rabbitmq/writer_dedupe
writer deduplication
2 parents 728969a + 3e234b8 commit f847caf

File tree

8 files changed

+847
-359
lines changed

8 files changed

+847
-359
lines changed

src/osiris.erl

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99

1010
-include("osiris.hrl").
1111
-export([
12-
write/3,
12+
write/4,
1313
write_tracking/3,
1414
read_tracking/2,
15+
fetch_writer_seq/2,
1516
init_reader/2,
1617
register_offset_listener/2,
1718
register_offset_listener/3,
@@ -50,14 +51,21 @@
5051
-type retention_spec() :: {max_bytes, non_neg_integer()} |
5152
{max_age, milliseconds()}.
5253

54+
-type writer_id() :: binary().
55+
-type data() :: iodata() |
56+
{batch, non_neg_integer(), 0, iodata()}.
57+
5358
-export_type([
5459
state/0,
5560
config/0,
5661
offset/0,
5762
epoch/0,
5863
tail_info/0,
5964
offset_spec/0,
60-
retention_spec/0
65+
retention_spec/0,
66+
milliseconds/0,
67+
writer_id/0,
68+
data/0
6169
]).
6270

6371
-spec start_cluster(config()) ->
@@ -97,8 +105,12 @@ start_writer(Config) ->
97105
start_replica(Replica, Config) ->
98106
osiris_replica:start(Replica, Config).
99107

100-
write(Pid, Corr, Data) ->
101-
osiris_writer:write(Pid, self(), Corr, Data).
108+
-spec write(Pid :: pid(),
109+
WriterId :: binary() | undefined,
110+
CorrOrSeq :: non_neg_integer() | term(),
111+
Data :: data()) -> ok.
112+
write(Pid, WriterId, Corr, Data) ->
113+
osiris_writer:write(Pid, self(), WriterId, Corr, Data).
102114

103115
-spec write_tracking(pid(), binary(), offset()) -> ok.
104116
write_tracking(Pid, TrackingId, Offset) ->
@@ -108,6 +120,18 @@ write_tracking(Pid, TrackingId, Offset) ->
108120
read_tracking(Pid, TrackingId) ->
109121
osiris_writer:read_tracking(Pid, TrackingId).
110122

123+
-spec fetch_writer_seq(pid(), binary()) ->
124+
non_neg_integer() | undefined.
125+
fetch_writer_seq(Pid, WriterId) when is_pid(Pid) andalso is_binary(WriterId) ->
126+
osiris_writer:query_writers(Pid,
127+
fun(W) ->
128+
case maps:get(WriterId, W, undefined) of
129+
undefined -> undefined;
130+
{_, _, Seq} ->
131+
Seq
132+
end
133+
end).
134+
111135
%% @doc Initialise a new offset reader
112136
%% @param Pid the pid of a writer or replica process
113137
%% @param OffsetSpec specifies where in the log to attach the reader

src/osiris.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
-define(MAGIC, 5).
3030
%% chunk format version
3131
-define(VERSION, 0).
32-
-define(HEADER_SIZE_B, 40).
32+
-define(HEADER_SIZE_B, 44).
3333
-define(FILE_OPTS_WRITE, [raw, binary, write, read]).
3434

3535

3636
%% chunk types
3737
-define(CHNK_USER, 0).
3838
-define(CHNK_TRK_DELTA, 1).
3939
-define(CHNK_TRK_SNAPSHOT, 2).
40+
-define(CHNK_WRT_SNAPSHOT, 3).
4041

src/osiris_bench.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ do_publish0(Conf, 0) ->
8383
end;
8484
do_publish0(#{leader := Leader} = Conf, InFlight) ->
8585
Ref = make_ref(),
86-
ok = osiris:write(Leader, Ref, <<"datadata">>),
86+
ok = osiris:write(Leader, undefined, Ref, <<"datadata">>),
8787
do_publish0(Conf, InFlight - 1).
8888

8989
start_metrics_gatherer(Node) ->

0 commit comments

Comments
 (0)