Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/riak_cs_copy_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ start_put_fsm(Bucket, Key, ContentLength, ContentType, Metadata, Acl, RcPid) ->

%% @doc check "x-amz-copy-source" to know whether it requests copying
%% from another object
-spec get_copy_source(#wm_reqdata{}) -> undefined | {binary(), binary()} |
-spec get_copy_source(#wm_reqdata{}) -> undefined |
{binary(), binary()} |
{error, atom()}.
get_copy_source(RD) ->
%% for oos (TODO)
Expand All @@ -159,21 +160,28 @@ get_copy_source(RD) ->

handle_copy_source(undefined) ->
undefined;
handle_copy_source([$/, $/ | _Path]) ->
{error, bad_request};
handle_copy_source([$/|Path]) ->
handle_copy_source(Path);
handle_copy_source(Path0) when is_list(Path0) ->
Path = mochiweb_util:unquote(Path0),

Offset = string:chr(Path, $/),
Bucket = string:substr(Path, 1, Offset-1),
SplitKey = string:substr(Path, Offset+1),

case SplitKey of
[] ->
{error, invalid_x_copy_from_path};
_ ->
{iolist_to_binary(Bucket),
iolist_to_binary(SplitKey)}
Length = length(Path),
case string:chr(Path, $/) of
0 ->
{error, bad_request};
Length ->
{error, bad_request};
Offset ->
Bucket = string:substr(Path, 1, Offset-1),
SplitKey = string:substr(Path, Offset+1),
case SplitKey of
[] ->
{error, bad_request};
_ ->
{iolist_to_binary(Bucket),
iolist_to_binary(SplitKey)}
end
end.

%% @doc runs copy
Expand Down
17 changes: 0 additions & 17 deletions src/riak_cs_mp_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
list_parts/5, list_parts/6,
make_content_types_accepted/2,
make_content_types_accepted/3,
make_special_error/1,
make_special_error/4,
upload_part/6, upload_part/7,
upload_part_1blob/2,
upload_part_finished/7, upload_part_finished/8,
Expand Down Expand Up @@ -168,21 +166,6 @@ make_content_types_accepted(CT, RD, Ctx=#context{local_context=LocalCtx0}, Callb
Ctx}
end.

make_special_error(Error) ->
make_special_error(Error, Error, "request-id", "host-id").

make_special_error(Code, Message, RequestId, HostId) ->
XmlDoc = {'Error',
[
{'Code', [Code]},
{'Message', [Message]},
{'RequestId', [RequestId]},
{'HostId', [HostId]}
]
},
riak_cs_xml:to_xml([XmlDoc]).


list_multipart_uploads(Bucket, Caller, Opts) ->
list_multipart_uploads(Bucket, Caller, Opts, nopid).

Expand Down
11 changes: 6 additions & 5 deletions src/riak_cs_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ duration_metrics() ->
[object_acl, get],
[object_acl, put],

[multipart, post], % Initiate
[multipart_upload, put], % Upload Part (Copy)
[multipart_upload, post], % Complete
[multipart_upload, delete], % Abort
[multipart_upload, get], % List Parts
[multipart, post], % Initiate
[multipart_upload, put], % Upload Part
[multipart_upload, put_copy], % Upload Part (Copy)
[multipart_upload, post], % Complete
[multipart_upload, delete], % Abort
[multipart_upload, get], % List Parts

[velvet, create_user],
[velvet, update_user],
Expand Down
161 changes: 78 additions & 83 deletions src/riak_cs_wm_object_upload_part.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,82 +203,83 @@ parse_body(Body0) ->
bad
end.

-spec accept_body(#wm_reqdata{}, #context{}) -> {{halt, integer()}, #wm_reqdata{}, #context{}}.
accept_body(RD, Ctx0=#context{local_context=LocalCtx0,
response_module=ResponseMod,
riak_client=RcPid}) ->
#key_context{bucket=DstBucket,
key=Key,
size=Size,
get_fsm_pid=GetFsmPid} = LocalCtx0,
catch riak_cs_get_fsm:stop(GetFsmPid),
BlockSize = riak_cs_lfs_utils:block_size(),
Caller = riak_cs_user:to_3tuple(Ctx0#context.user),
-spec accept_body(#wm_reqdata{}, #context{}) ->
{{halt, integer()}, #wm_reqdata{}, #context{}}.
accept_body(RD, #context{local_context=LocalCtx0} = Ctx0) ->
catch riak_cs_get_fsm:stop(LocalCtx0#key_context.get_fsm_pid),

DstKey = list_to_binary(Key),

{SrcManifest, ExactSize, ReadRcPid} =
%% checking existence of "x-amz-copy-source"
case riak_cs_copy_object:get_copy_source(RD) of
undefined ->
%% normal upload, use size claimed via HTTP request
{undefined, Size, undefined};
{SrcBucket0, SrcKey0} ->
%% case copy, use size from copy source manifest
{ok, ReadRcPid0} = riak_cs_riak_client:checkout(),
{ok, SrcManifest0} = riak_cs_manifest:fetch(ReadRcPid0, SrcBucket0, SrcKey0),
{Start,End} = riak_cs_copy_object:copy_range(RD, SrcManifest0),
{SrcManifest0, End - Start + 1, ReadRcPid0}
end,

case ExactSize =< riak_cs_lfs_utils:max_content_len() of
false ->
ResponseMod:api_error(entity_too_large, RD, Ctx0);
try
{t, {ok, UploadId}} =
{t, riak_cs_utils:safe_base64url_decode(
re:replace(wrq:path(RD), ".*/uploads/", "", [{return, binary}]))},
{t, {ok, PartNumber}} =
{t, riak_cs_utils:safe_list_to_integer(wrq:get_qs_value("partNumber", RD))},

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these {t, {ok, ... bindings for?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who knows 🙉

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have to be serious :)

{t,_} is matched in catch clause right below to respond 400. There might be other error patterns because there was {t3,_} at https://github.com/basho/riak_cs/pull/1214/files#diff-d617e0da6709d2bb3eabf098f392f035L277.
Anyway, these diff lines are just by refactoring and not essential for this PR/commit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense.

LocalCtx = LocalCtx0#key_context{upload_id=UploadId,
part_number=PartNumber},
Ctx = Ctx0#context{local_context=LocalCtx},
validate_copy_header(RD, Ctx)
catch
error:{badmatch, {t, _}} ->
{{halt, 400}, RD, Ctx0}
end.

true ->
validate_copy_header(RD, #context{response_module=ResponseMod,
local_context=LocalCtx} = Ctx) ->
case riak_cs_copy_object:get_copy_source(RD) of
{error, Reason} ->
riak_cs_s3_response:api_error(Reason, RD, Ctx);
undefined ->
validate_part_size(RD, Ctx, LocalCtx#key_context.size,
undefined, undefined);
{SrcBucket, SrcKey} ->
{ok, ReadRcPid} = riak_cs_riak_client:checkout(),
try
{t, {ok, UploadId}} =
{t, riak_cs_utils:safe_base64url_decode(re:replace(wrq:path(RD), ".*/uploads/", "", [{return, binary}]))},
{t, {ok, PartNumber}} =
{t, riak_cs_utils:safe_list_to_integer(wrq:get_qs_value("partNumber", RD))},

case riak_cs_mp_utils:upload_part(DstBucket, Key, UploadId, PartNumber,
ExactSize, Caller, RcPid) of
{upload_part_ready, PartUUID, PutPid} ->
LocalCtx = LocalCtx0#key_context{upload_id=UploadId,
part_number=PartNumber,
part_uuid=PartUUID},
Ctx = Ctx0#context{local_context=LocalCtx},

case riak_cs_copy_object:get_copy_source(RD) of
undefined ->
%% Normal upload part
accept_streambody(RD, Ctx, PutPid,
wrq:stream_req_body(RD, BlockSize));
{error, Reason} ->
riak_cs_s3_response:api_error(Reason, RD, Ctx);

{_SrcBucket, _SrcKey} -> %% they're already in SrcManifest
%% upload part by copy
try
maybe_copy_part(PutPid, DstBucket, DstKey, SrcManifest,
ReadRcPid, RD, Ctx)
after
riak_cs_riak_client:checkin(ReadRcPid)
end
end;
case riak_cs_manifest:fetch(ReadRcPid, SrcBucket, SrcKey) of
{error, notfound} ->
riak_cs_s3_response:no_such_upload_response(UploadId, RD, Ctx0);
{error, Reason} ->
riak_cs_s3_response:api_error(Reason, RD, Ctx0)
ResponseMod:api_error(no_copy_source_key, RD, Ctx);
{ok, SrcManifest} ->
{Start,End} = riak_cs_copy_object:copy_range(RD, SrcManifest),
validate_part_size(RD,
Ctx#context{stats_key=[multipart_upload, put_copy]},
End - Start + 1,
SrcManifest, ReadRcPid)
end
catch
error:{badmatch, {t, _}} ->
{{halt, 400}, RD, Ctx0};
error:{badmatch, {t3, _}} ->
XErrT3 = riak_cs_mp_utils:make_special_error("InvalidDigest"),
RDT3 = wrq:set_resp_body(XErrT3, RD),
{{halt, 400}, RDT3, Ctx0}
after
riak_cs_riak_client:checkin(ReadRcPid)
end
end.

validate_part_size(RD, #context{response_module=ResponseMod} = Ctx,
ExactSize, SrcManifest, ReadRcPid) ->
case ExactSize =< riak_cs_lfs_utils:max_content_len() of
false ->
ResponseMod:api_error(entity_too_large, RD, Ctx);
true ->
prepare_part_upload(RD, Ctx, ExactSize, SrcManifest, ReadRcPid)
end.

prepare_part_upload(RD, #context{riak_client=RcPid,
local_context=LocalCtx0} = Ctx0,
ExactSize, SrcManifest, ReadRcPid) ->
#key_context{bucket=DstBucket, key=Key,
upload_id=UploadId, part_number=PartNumber} = LocalCtx0,
Caller = riak_cs_user:to_3tuple(Ctx0#context.user),
case riak_cs_mp_utils:upload_part(DstBucket, Key, UploadId, PartNumber,
ExactSize, Caller, RcPid) of
{error, notfound} ->
riak_cs_s3_response:no_such_upload_response(UploadId, RD, Ctx0);
{error, Reason} ->
riak_cs_s3_response:api_error(Reason, RD, Ctx0);
{upload_part_ready, PartUUID, PutPid} ->
LocalCtx = LocalCtx0#key_context{part_uuid=PartUUID},
Ctx = Ctx0#context{local_context=LocalCtx},
case SrcManifest of
undefined ->
BlockSize = riak_cs_lfs_utils:block_size(),
accept_streambody(RD, Ctx, PutPid,
wrq:stream_req_body(RD, BlockSize));
_ ->
maybe_copy_part(PutPid, SrcManifest, ReadRcPid, RD, Ctx)
end
end.

Expand Down Expand Up @@ -385,38 +386,34 @@ finalize_request(RD, Ctx=#context{local_context=LocalCtx,
riak_cs_s3_response:api_error(Reason1, RD, Ctx)
end.

-spec maybe_copy_part(pid(), binary(), binary(), lfs_manifest(), riak_client(),
-spec maybe_copy_part(pid(), lfs_manifest(), riak_client(),
#wm_reqdata{}, #context{}) ->
{{halt, integer()}, #wm_reqdata{}, #context{}}.
maybe_copy_part(PutPid,
DstBucket, DstKey,
?MANIFEST{bkey={SrcBucket, SrcKey}} = SrcManifest,
ReadRcPid,
RD, #context{riak_client=RcPid,
local_context=LocalCtx,
user=User} = Ctx) ->

#key_context{upload_id=UploadId,
#key_context{bucket=DstBucket, key=Key,
upload_id=UploadId,
part_number=PartNumber,
part_uuid=PartUUID} = LocalCtx,
DstKey = list_to_binary(Key),
Caller = riak_cs_user:to_3tuple(User),

case riak_cs_copy_object:test_condition_and_permission(ReadRcPid, SrcManifest, RD, Ctx) of
{false, _, _} ->

%% start copying
_ = lager:debug("copying! > ~s ~s => ~s ~s via ~p",
_ = lager:debug("Start copying! > ~s ~s => ~s ~s via ~p",
[SrcBucket, SrcKey, DstBucket, DstKey, ReadRcPid]),

Range = riak_cs_copy_object:copy_range(RD, SrcManifest),

%% Prepare for connection loss or client close
FDWatcher = riak_cs_copy_object:connection_checker((RD#wm_reqdata.wm_state)#wm_reqstate.socket),

Range = riak_cs_copy_object:copy_range(RD, SrcManifest),
%% This ain't fail because all permission and 404
%% possibility has been already checked.
case riak_cs_copy_object:copy(PutPid, SrcManifest, ReadRcPid, FDWatcher, Range) of

{ok, DstManifest} ->
case riak_cs_mp_utils:upload_part_finished(
DstBucket, DstKey, UploadId, PartNumber, PartUUID,
Expand All @@ -429,7 +426,6 @@ maybe_copy_part(PutPid,
{error, Reason0} ->
riak_cs_s3_response:api_error(Reason0, RD, Ctx)
end;

{error, Reason} ->
riak_cs_s3_response:api_error(Reason, RD, Ctx)
end;
Expand All @@ -439,7 +435,6 @@ maybe_copy_part(PutPid,
%% TODO: check the return value
_ = lager:debug("access to source object denied (~s, ~s)", [SrcBucket, SrcKey]),
{{halt, 403}, RD, Ctx};

Error ->
_ = lager:debug("unknown error: ~p", [Error]),
%% ResponseMod:api_error(Error, RD, Ctx#context{local_context=LocalCtx})
Expand Down