
Commit eecbfdd

Writer tracking fixes
Ensuring that duplicate writes aren't confirmed earlier than the original write. Also adding osiris_log:read_header/1 to allow readers to read only the header.
1 parent e1d852b commit eecbfdd
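
As a rough illustration of the new read_header/1 API (this helper is not part of the commit; only the return shapes come from the diff below), a reader could walk a stream header by header without touching chunk bodies:

    %% Hypothetical helper, not part of this commit: count records by reading
    %% only chunk headers. read_header/1 skips each chunk's data and trailer,
    %% so record payloads are never read.
    count_records(Log0, Acc) ->
        case osiris_log:read_header(Log0) of
            {ok, #{num_records := NumRecords}, Log} ->
                count_records(Log, Acc + NumRecords);
            {end_of_stream, Log} ->
                {Acc, Log};
            {error, Reason} ->
                {error, Reason}
        end.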

5 files changed: +330 -222 lines


src/osiris.erl

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ fetch_writer_seq(Pid, WriterId) when is_pid(Pid) andalso is_binary(WriterId) ->
       fun(W) ->
               case maps:get(WriterId, W, undefined) of
                   undefined -> undefined;
-                  {_, Seq} ->
+                  {_, _, Seq} ->
                       Seq
               end
       end).

src/osiris_log.erl

Lines changed: 134 additions & 69 deletions
@@ -25,6 +25,7 @@

          init_data_reader/2,
          init_offset_reader/2,
+         read_header/1,
          read_chunk/1,
          read_chunk_parsed/1,
          committed_offset/1,
@@ -131,6 +132,17 @@
 -type record() :: {offset(), iodata()}.
 -type offset_spec() :: osiris:offset_spec().
 -type retention_spec() :: osiris:retention_spec().
+-type header_map() :: #{chunk_id => offset(),
+                        epoch => epoch(),
+                        type => chunk_type(),
+                        crc => integer(),
+                        num_records => non_neg_integer(),
+                        num_entries => non_neg_integer(),
+                        timestamp => osiris:milliseconds(),
+                        data_size => non_neg_integer(),
+                        trailer_size => non_neg_integer(),
+                        header_data => binary()}.
+

 %% holds static or rarely changing fields
 -record(cfg, {directory :: file:filename(),
@@ -158,7 +170,8 @@
              %% the current offset tracking state
              tracking = #{} :: #{tracking_id() => offset()},
              writers = #{} :: #{osiris:writer_id() =>
-                                {osiris:milliseconds(), non_neg_integer()}}
+                                {offset(), osiris:milliseconds(),
+                                 non_neg_integer()}}
             }).

 -record(?MODULE, {cfg :: #cfg{},
@@ -354,7 +367,7 @@ write_wrt_snapshot(Writers,
                    #?MODULE{cfg = #cfg{},
                             mode = #write{} = W0} = State0) ->
     WData = maps:fold(
-              fun (W, {T, S}, Acc) ->
+              fun (W, {_O, T, S}, Acc) ->
                       [<<(byte_size(W)):8/unsigned,
                          W/binary,
                          T:64/unsigned,
@@ -764,10 +777,35 @@ tracking(#?MODULE{mode = #write{tracking = Tracking}}) ->
     Tracking.

 -spec writers(state()) ->
-    #{osiris:writer_id() => {osiris:milliseconds(), non_neg_integer()}}.
+    #{osiris:writer_id() => {offset(), osiris:milliseconds(), non_neg_integer()}}.
 writers(#?MODULE{mode = #write{writers = Writers}}) ->
     Writers.

+-spec read_header(state()) ->
+    {ok, header_map(), state()} |
+    {end_of_stream, state()} |
+    {error, {invalid_chunk_header, term()}}.
+read_header(#?MODULE{cfg = #cfg{},
+                     mode = #read{} = Read,
+                     fd = Fd} = State) ->
+    %% reads the next chunk of entries, parsed
+    %% NB: this may return records before the requested index,
+    %% that is fine - the reading process can do the appropriate filtering
+    case read_header0(State) of
+        {ok, #{num_records := NumRecords,
+               data_size := DataSize,
+               trailer_size := TrailerSize} = Header} ->
+            %% skip data portion
+            {ok, _} = file:position(Fd, {cur, DataSize + TrailerSize}),
+            {ok, Header,
+             State#?MODULE{mode = incr_next_offset(NumRecords, Read)}};
+        {end_of_stream, _} = EOF ->
+            EOF;
+        {error, _} = Err ->
+            Err
+    end.
+
+
 -spec read_chunk(state()) ->
     {ok, {chunk_type(),
           offset(),
@@ -778,66 +816,30 @@ writers(#?MODULE{mode = #write{writers = Writers}}) ->
          }, state()} |
     {end_of_stream, state()} |
     {error, {invalid_chunk_header, term()}}.
-read_chunk(#?MODULE{cfg = #cfg{directory = Dir},
+read_chunk(#?MODULE{cfg = #cfg{},
                     mode = #read{last_offset = _Last,
                                  next_offset = Offs} = Read,
-                    current_file = CurFile,
                     fd = Fd} = State) ->
     %% reads the next chunk of entries, parsed
     %% NB: this may return records before the requested index,
     %% that is fine - the reading process can do the appropriate filtering
-    case can_read_next_offset(Read) of
-        true ->
-            case file:read(Fd, ?HEADER_SIZE_B) of
-                {ok, <<?MAGIC:4/unsigned,
-                       ?VERSION:4/unsigned,
-                       ChType:8/unsigned,
-                       _NumEntries:16/unsigned,
-                       NumRecords:32/unsigned,
-                       _Timestamp:64/signed,
-                       Epoch:64/unsigned,
-                       Offs:64/unsigned,
-                       Crc:32/integer,
-                       DataSize:32/unsigned,
-                       TrailerSize:32/unsigned>> = HeaderData} ->
-                    {ok, BlobData} = file:read(Fd, DataSize),
-                    %% position after trailer
-                    %% TODO: should we return trailer as well?
-                    {ok, TrailerData} = file:read(Fd, TrailerSize),
-                    validate_crc(Offs, Crc, BlobData),
-                    %% tracking data
-                    {ok, {ChType, Offs, Epoch, HeaderData, BlobData, TrailerData},
-                     State#?MODULE{mode = incr_next_offset(NumRecords, Read)}};
-                {ok, _} ->
-                    %% set the position back for the next read
-                    {ok, _} = file:position(Fd, {cur, -?HEADER_SIZE_B}),
-                    {end_of_stream, State};
-                eof ->
-                    %% open next segment file and start there if it exists
-                    SegFile = make_file_name(Offs, "segment"),
-                    case SegFile == CurFile of
-                        true ->
-                            %% the new filename is the same as the old one
-                            %% this should only really happen for an empty
-                            %% log but would cause an infinite loop if it does
-                            {end_of_stream, State};
-                        false ->
-                            case file:open(filename:join(Dir, SegFile),
-                                           [raw, binary, read]) of
-                                {ok, Fd2} ->
-                                    ok = file:close(Fd),
-                                    {ok, _} = file:position(Fd2, ?LOG_HEADER_SIZE),
-                                    read_chunk(State#?MODULE{current_file = SegFile,
-                                                             fd = Fd2});
-                                {error, enoent} ->
-                                    {end_of_stream, State}
-                            end
-                    end;
-                Invalid ->
-                    {error, {invalid_chunk_header, Invalid}}
-            end;
-        false ->
-            {end_of_stream, State}
+    case read_header0(State) of
+        {ok, #{type := ChType,
+               chunk_id := ChId,
+               epoch := Epoch,
+               crc := Crc,
+               num_records := NumRecords,
+               header_data := HeaderData,
+               data_size := DataSize,
+               trailer_size := TrailerSize}} ->
+            {ok, BlobData} = file:read(Fd, DataSize),
+            %% position after trailer
+            {ok, TrailerData} = file:read(Fd, TrailerSize),
+            validate_crc(Offs, Crc, BlobData),
+            {ok, {ChType, ChId, Epoch, HeaderData, BlobData, TrailerData},
+             State#?MODULE{mode = incr_next_offset(NumRecords, Read)}};
+        Other ->
+            Other
     end.

 -spec read_chunk_parsed(state()) ->
@@ -1332,7 +1334,7 @@ write_chunk(Chunk, NewWriters, Timestamp, Epoch, NumRecords,
     counters:add(CntRef, ?C_CHUNKS, 1),
     Writers = maps:fold(
                 fun(K, V, Acc) ->
-                        maps:put(K, {Timestamp, V}, Acc)
+                        maps:put(K, {Next, Timestamp, V}, Acc)
                 end, Writers0, NewWriters),
     case file:position(Fd, cur) of
         {ok, After} when After >= MaxSize ->
@@ -1562,7 +1564,7 @@ recover_tracking(Fd, Trk, Wrt) ->
               _NumRecords:32/unsigned,
               Timestamp:64/signed,
               _Epoch:64/unsigned,
-              _Next:64/unsigned,
+              ChunkId:64/unsigned,
               _Crc:32/integer,
               Size:32/unsigned,
               TSize:32/unsigned>>} ->
@@ -1582,11 +1584,11 @@ recover_tracking(Fd, Trk, Wrt) ->
                 ?CHNK_WRT_SNAPSHOT ->
                     {ok, <<0:1, S:31, Data:S/binary>>} = file:read(Fd, Size),
                     {ok, _} = file:read(Fd, TSize),
-                    recover_tracking(Fd, Trk, parse_writers_snapshot(Data, #{}));
+                    recover_tracking(Fd, Trk, parse_writers_snapshot(Data, ChunkId, #{}));
                 ?CHNK_USER ->
                     {ok, _} = file:position(Fd, {cur, Size}),
                     {ok, TData} = file:read(Fd, TSize),
-                    recover_tracking(Fd, Trk, parse_writers(TData, Timestamp, Wrt))
+                    recover_tracking(Fd, Trk, parse_writers(TData, ChunkId, Timestamp, Wrt))
             end;
         eof ->
             file:close(Fd),
@@ -1601,41 +1603,104 @@ parse_tracking(<<Size:8/unsigned,
                 Rem/binary>>, Acc) ->
     parse_tracking(Rem, Acc#{Id => Offs}).

-parse_writers(<<>>, _, Acc) ->
+parse_writers(<<>>, _, _, Acc) ->
     Acc;
 parse_writers(<<Size:8/unsigned,
                 Id:Size/binary,
                 Seq:64/unsigned,
-                Rem/binary>>, Ts, Acc) ->
-    parse_writers(Rem, Ts, Acc#{Id => {Ts, Seq}}).
+                Rem/binary>>, ChunkId, Ts, Acc) ->
+    parse_writers(Rem, ChunkId, Ts, Acc#{Id => {ChunkId, Ts, Seq}}).

-parse_writers_snapshot(<<>>, Acc) ->
+parse_writers_snapshot(<<>>, _ChId, Acc) ->
     Acc;
 parse_writers_snapshot(<<Size:8/unsigned,
                          Id:Size/binary,
                          Ts:64/unsigned,
                          Seq:64/unsigned,
-                         Rem/binary>>, Acc) ->
-    parse_writers_snapshot(Rem, Acc#{Id => {Ts, Seq}}).
+                         Rem/binary>>, ChunkId, Acc) ->
+    parse_writers_snapshot(Rem, ChunkId, Acc#{Id => {ChunkId, Ts, Seq}}).

 trim_writers(Max, Writers)
   when map_size(Writers) =< Max ->
     Writers;
 trim_writers(Max, Writers) ->
     %% remove oldest
     {ToRemove, _} = maps:fold(
-                      fun (K, {Ts, _}, {_, PrevTs} = Prev) ->
+                      fun (K, {_ChId, Ts, _}, {_, PrevTs} = Prev) ->
                               case Ts < PrevTs of
                                   true ->
                                       {K, Ts};
                                   false ->
                                       Prev
                               end;
-                          (K, {Ts, _}, undefined) ->
+                          (K, {_ChId, Ts, _}, undefined) ->
                               {K, Ts}
                       end, undefined, Writers),
     trim_writers(Max, maps:remove(ToRemove, Writers)).

+read_header0(#?MODULE{cfg = #cfg{directory = Dir},
+                      mode = #read{next_offset = NextChId} = Read,
+                      current_file = CurFile,
+                      fd = Fd} = State) ->
+    %% reads the next header if permitted
+    case can_read_next_offset(Read) of
+        true ->
+            case file:read(Fd, ?HEADER_SIZE_B) of
+                {ok, <<?MAGIC:4/unsigned,
+                       ?VERSION:4/unsigned,
+                       ChType:8/unsigned,
+                       NumEntries:16/unsigned,
+                       NumRecords:32/unsigned,
+                       Timestamp:64/signed,
+                       Epoch:64/unsigned,
+                       NextChId:64/unsigned,
+                       Crc:32/integer,
+                       DataSize:32/unsigned,
+                       TrailerSize:32/unsigned>> = HeaderData} ->
+                    {ok, #{chunk_id => NextChId,
+                           epoch => Epoch,
+                           type => ChType,
+                           crc => Crc,
+                           num_records => NumRecords,
+                           num_entries => NumEntries,
+                           timestamp => Timestamp,
+                           data_size => DataSize,
+                           trailer_size => TrailerSize,
+                           header_data => HeaderData}};
+                {ok, _} ->
+                    %% set the position back for the next read
+                    %% TODO: should it be an exception if the next chunk is not
+                    %% the expected next chunk id??
+                    {ok, _} = file:position(Fd, {cur, -?HEADER_SIZE_B}),
+                    {end_of_stream, State};
+                eof ->
+                    %% open next segment file and start there if it exists
+                    SegFile = make_file_name(NextChId, "segment"),
+                    case SegFile == CurFile of
+                        true ->
+                            %% the new filename is the same as the old one
+                            %% this should only really happen for an empty
+                            %% log but would cause an infinite loop if it does
+                            {end_of_stream, State};
+                        false ->
+                            case file:open(filename:join(Dir, SegFile),
+                                           [raw, binary, read]) of
+                                {ok, Fd2} ->
+                                    ok = file:close(Fd),
+                                    {ok, _} = file:position(Fd2, ?LOG_HEADER_SIZE),
+                                    read_chunk(State#?MODULE{current_file = SegFile,
+                                                             fd = Fd2});
+                                {error, enoent} ->
+                                    {end_of_stream, State}
+                            end
+                    end;
+                Invalid ->
+                    {error, {invalid_chunk_header, Invalid}}
+            end;
+        false ->
+            {end_of_stream, State}
+    end.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").

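Writer entries now carry the chunk id of the last chunk containing a write from that writer, i.e. {ChunkId, Timestamp, Seq} instead of {Timestamp, Seq}. As a hedged sketch of the behaviour the commit message describes (this is not code from this commit, and maybe_confirm_duplicate/4 is a hypothetical name), a writer process could hold back confirmation of a duplicate publish until the chunk that held the original write has been committed:

    %% Illustrative only; osiris's actual writer process is not shown in this diff.
    maybe_confirm_duplicate(WriterId, Seq, Writers, CommittedChunkId) ->
        case maps:get(WriterId, Writers, undefined) of
            {ChunkId, _Ts, LastSeq} when Seq =< LastSeq,
                                         ChunkId =< CommittedChunkId ->
                %% the chunk containing the original write is already committed,
                %% so the duplicate can be confirmed straight away
                confirm;
            {_ChunkId, _Ts, LastSeq} when Seq =< LastSeq ->
                %% duplicate of a write whose chunk is not yet committed:
                %% defer confirmation until the committed offset catches up
                defer;
            _ ->
                not_duplicate
        end.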