Skip to content

Commit f103082

Browse files
authored
Merge pull request #19 from rabbitmq/various
Move child stop/start/delete to supervisor
2 parents 446e2f3 + d1a0d22 commit f103082

File tree

8 files changed

+170
-155
lines changed

8 files changed

+170
-155
lines changed

src/osiris.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@
7070
{error, term(), config()}.
7171
start_cluster(Config00 = #{name := Name}) ->
7272
true = osiris_util:validate_base64uri(Name),
73-
Config0 =
74-
Config00#{external_ref => maps:get(reference, Config00, Name)},
73+
%% ensure reference is set
74+
Config0 = maps:merge(#{reference => Name}, Config00),
7575
case osiris_writer:start(Config0) of
7676
{ok, Pid} ->
7777
Config = Config0#{leader_pid => Pid},

src/osiris.hrl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77

88
%% logging shim
99
-define(DEBUG(Fmt, Args), ?DISPATCH_LOG(debug, Fmt, Args)).
10+
-define(DEBUG_IF(Fmt, Args, Bool),
11+
if Bool ->
12+
?DISPATCH_LOG(debug, Fmt, Args);
13+
true -> ok
14+
end).
1015
-define(INFO(Fmt, Args), ?DISPATCH_LOG(info, Fmt, Args)).
1116
-define(NOTICE(Fmt, Args), ?DISPATCH_LOG(notice, Fmt, Args)).
1217
-define(WARN(Fmt, Args), ?DISPATCH_LOG(warning, Fmt, Args)).
@@ -40,3 +45,4 @@
4045
-define(CHNK_TRK_SNAPSHOT, 2).
4146
-define(CHNK_WRT_SNAPSHOT, 3).
4247

48+
-define(SUP, osiris_server_sup).

src/osiris_log.erl

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@
312312
#{dir := file:filename(),
313313
epoch => non_neg_integer(),
314314
max_segment_size => non_neg_integer(),
315-
counter_spec => {Tag :: atom(), Fields :: [atom()]}}.
315+
counter_spec => {Tag :: term(), Fields :: [atom()]}}.
316316
-type record() :: {offset(), iodata()}.
317317
-type offset_spec() :: osiris:offset_spec().
318318
-type retention_spec() :: osiris:retention_spec().
@@ -365,7 +365,8 @@
365365
{epoch :: epoch(),
366366
timestamp :: non_neg_integer(),
367367
id :: offset(),
368-
num :: non_neg_integer()}).
368+
num :: non_neg_integer()
369+
}).
369370
-record(seg_info,
370371
{file :: file:filename(),
371372
size = 0 :: non_neg_integer(),
@@ -381,15 +382,14 @@
381382

382383
% record/0,
383384

384-
-spec directory(osiris:config()) -> file:filename().
385-
directory(#{name := Name} = Config) ->
386-
Dir = case Config of
387-
#{dir := D} ->
388-
D;
389-
_ ->
390-
{ok, D} = application:get_env(osiris, data_dir),
391-
D
392-
end,
385+
-spec directory(osiris:config() | list()) -> file:filename().
386+
directory(#{name := Name, dir := Dir}) ->
387+
filename:join(Dir, Name);
388+
directory(#{name := Name}) ->
389+
{ok, Dir} = application:get_env(osiris, data_dir),
390+
filename:join(Dir, Name);
391+
directory(Name) when is_list(Name) ->
392+
{ok, Dir} = application:get_env(osiris, data_dir),
393393
filename:join(Dir, Name).
394394

395395
-spec init(config()) -> state().
@@ -439,6 +439,7 @@ init(#{dir := Dir,
439439
[#seg_info{file = Filename,
440440
index = IdxFilename,
441441
first = #chunk_info{id = FstChId},
442+
size = Size,
442443
last =
443444
#chunk_info{epoch = E,
444445
id = ChId,
@@ -463,10 +464,13 @@ init(#{dir := Dir,
463464
element(1, TailInfo)]),
464465
{ok, Fd} = open(Filename, ?FILE_OPTS_WRITE),
465466
{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
466-
%% recover tracking info
467-
{Tracking, Writers} = recover_tracking(Filename),
468-
{ok, _} = file:position(Fd, eof),
467+
{ok, Size} = file:position(Fd, Size),
468+
ok = file:truncate(Fd),
469469
{ok, _} = file:position(IdxFd, eof),
470+
%% recover tracking info
471+
{Tracking, Writers} = recover_tracking(Fd),
472+
{ok, Size} = file:position(Fd, Size),
473+
%% truncate segment to size in case there is trailing data
470474
#?MODULE{cfg = Cfg,
471475
mode =
472476
#write{type = WriterType,
@@ -483,6 +487,8 @@ init(#{dir := Dir,
483487
%% the empty log case
484488
{ok, Fd} = open(Filename, ?FILE_OPTS_WRITE),
485489
{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
490+
%% TODO: do we potentially need to truncate the segment
491+
%% here too?
486492
{ok, _} = file:position(Fd, eof),
487493
{ok, _} = file:position(IdxFd, eof),
488494
#?MODULE{cfg = Cfg,
@@ -1109,13 +1115,17 @@ read_chunk_parsed(#?MODULE{mode = #read{type = RType}} = State0) ->
11091115
end.
11101116

11111117
-spec send_file(gen_tcp:socket(), state()) ->
1112-
{ok, state()} | {end_of_stream, state()}.
1118+
{ok, state()} |
1119+
{error, term()} |
1120+
{end_of_stream, state()}.
11131121
send_file(Sock, State) ->
11141122
send_file(Sock, State, fun(_, S) -> S end).
11151123

11161124
-spec send_file(gen_tcp:socket(), state(),
11171125
fun((header_map(), non_neg_integer()) -> term())) ->
1118-
{ok, state()} | {end_of_stream, state()}.
1126+
{ok, state()} |
1127+
{error, term()} |
1128+
{end_of_stream, state()}.
11191129
send_file(Sock,
11201130
#?MODULE{cfg = #cfg{}, mode = #read{type = RType}} = State0,
11211131
Callback) ->
@@ -1175,8 +1185,10 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId}, fd = Fd}) ->
11751185
osiris_counters:delete(CntId)
11761186
end.
11771187

1178-
delete_directory(Config) ->
1179-
Dir = directory(Config),
1188+
delete_directory(#{name := Name} = Config) when is_map(Config) ->
1189+
delete_directory(Name);
1190+
delete_directory(Name) when is_list(Name) ->
1191+
Dir = directory(Name),
11801192
case file:list_dir(Dir) of
11811193
{ok, Files} ->
11821194
[ok =
@@ -1369,9 +1381,15 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
13691381
LastTs:64/signed,
13701382
LastEpoch:64/unsigned,
13711383
LastChId:64/unsigned,
1372-
_/binary>>} =
1384+
_LastCrc:32/integer,
1385+
LastSize:32/unsigned,
1386+
LastTSize:32/unsigned>>} =
13731387
file:read(Fd, ?HEADER_SIZE_B),
1374-
{ok, Size} = file:position(Fd, eof),
1388+
Size = LastChunkPos + LastSize + LastTSize + ?HEADER_SIZE_B,
1389+
{ok, Eof} = file:position(Fd, eof),
1390+
?DEBUG_IF("~s: segment ~s has trailing data ~w ~w",
1391+
[?MODULE, filename:basename(SegFile),
1392+
Size, Eof], Size =/= Eof),
13751393
_ = file:close(Fd),
13761394
[#seg_info{file = SegFile,
13771395
index = IdxFile,
@@ -1562,7 +1580,8 @@ make_chunk(Blobs, Writers, ChType, Timestamp, Epoch, Next) ->
15621580
Next:64/unsigned,
15631581
Crc:32/integer,
15641582
Size:32/unsigned,
1565-
TSize:32/unsigned>>,
1583+
TSize:32/unsigned
1584+
>>,
15661585
EData, TData],
15671586
NumRecords}.
15681587

@@ -1827,12 +1846,12 @@ part(Len, [B | L]) when Len > 0 ->
18271846
[binary:part(B, {0, Len})]
18281847
end.
18291848

1830-
recover_tracking(File) ->
1849+
recover_tracking(Fd) ->
18311850
%% TODO: if the first chunk in the segment isn't a tracking snapshot and
18321851
%% there are prior segments we could scan at least two segments increasing
18331852
%% the chance of encountering a snapshot and thus ensure we don't miss any
18341853
%% tracking entries
1835-
{ok, Fd} = file:open(File, [read, binary, raw]),
1854+
{ok, 0} = file:position(Fd, 0),
18361855
{ok, ?LOG_HEADER_SIZE} = file:position(Fd, ?LOG_HEADER_SIZE),
18371856
recover_tracking(Fd, #{}, #{}).
18381857

@@ -1879,7 +1898,6 @@ recover_tracking(Fd, Trk, Wrt) ->
18791898
Wrt))
18801899
end;
18811900
eof ->
1882-
file:close(Fd),
18831901
{Trk, Wrt}
18841902
end.
18851903

@@ -1999,7 +2017,24 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir},
19992017
{end_of_stream, State}
20002018
end
20012019
end;
2020+
{ok,
2021+
<<?MAGIC:4/unsigned,
2022+
?VERSION:4/unsigned,
2023+
_ChType:8/unsigned,
2024+
_NumEntries:16/unsigned,
2025+
_NumRecords:32/unsigned,
2026+
_Timestamp:64/signed,
2027+
_Epoch:64/unsigned,
2028+
UnexpectedChId:64/unsigned,
2029+
_Crc:32/integer,
2030+
_DataSize:32/unsigned,
2031+
_TrailerSize:32/unsigned>>} ->
2032+
%% TODO: we may need to return the new state here if
2033+
%% we've crossed segments
2034+
{ok, Pos} = file:position(Fd, Pos),
2035+
{error, {unexpected_chunk_id, UnexpectedChId, NextChId}};
20022036
Invalid ->
2037+
_ = file:position(Fd, Pos),
20032038
{error, {invalid_chunk_header, Invalid}}
20042039
end;
20052040
false ->

0 commit comments

Comments
 (0)