Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 114 additions & 59 deletions src/dev_delegated_compute.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,24 @@ normalize(Msg1, _Msg2, Opts) ->
%% `POST /compute' and the body is the JSON-encoded message that we want to
%% evaluate.
compute(Msg1, Msg2, Opts) ->
RawProcessID = dev_process:process_id(Msg1, #{}, Opts),
OutputPrefix = dev_stack:prefix(Msg1, Msg2, Opts),
ProcessID =
case RawProcessID of
not_found -> hb_ao:get(<<"process-id">>, Msg2, Opts);
ProcID -> ProcID
end,
Res = do_compute(ProcessID, Msg2, Opts),
case Res of
{ok, JSONRes} ->
?event(
{compute_lite_res,
{process_id, ProcessID},
{slot, hb_ao:get(<<"slot">>, Msg2, Opts)},
{json_res, {string, JSONRes}},
{req, Msg2}
}
),
{ok, Msg} = dev_json_iface:json_to_message(JSONRes, Opts),
{ok,
hb_ao:set(
Msg1,
#{
<<OutputPrefix/binary, "/results">> => Msg,
<<OutputPrefix/binary, "/results/json">> =>
#{
<<"content-type">> => <<"application/json">>,
<<"body">> => JSONRes
}
},
Opts
)
};
{error, Error} ->
{error, Error}
end.
% Extract the process ID - this identifies which process to run compute
% against.
ProcessID = get_process_id(Msg1, Msg2, Opts),
% If request is an assignment, we will compute the result
% Otherwise, it is a dryrun
Type = hb_ao:get(<<"type">>, Msg2, not_found, Opts),
?event({doing_delegated_compute, {msg2, Msg2}, {type, Type}}),
% Execute the compute via external CU
case Type of
<<"assignment">> ->
Slot = hb_ao:get(<<"slot">>, Msg2, Opts),
Res = do_compute(ProcessID, Msg2, Opts);
_ ->
Slot = dryrun,
Res = do_dryrun(ProcessID, Msg2, Opts)
end,
handle_relay_response(Msg1, Msg2, Opts, Res, OutputPrefix, ProcessID, Slot).

%% @doc Execute computation on a remote machine via relay and the JSON-Iface.
do_compute(ProcID, Msg2, Opts) ->
Expand All @@ -72,43 +54,116 @@ do_compute(ProcID, Msg2, Opts) ->
false,
Opts
),
?event({do_compute_msg, {aos2, {string, Body}}}),
Res =
hb_ao:resolve(
#{
<<"device">> => <<"[email protected]">>,
<<"content-type">> => <<"application/json">>
},
AOS2#{
<<"path">> => <<"call">>,
<<"relay-method">> => <<"POST">>,
<<"relay-body">> => Body,
<<"relay-path">> =>
<<
"/result/",
(hb_util:bin(Slot))/binary,
"?process-id=",
ProcID/binary
>>,
<<"content-type">> => <<"application/json">>
},
?event({do_compute_body, {aos2, {string, Body}}}),
% Send to external CU via relay using /result endpoint
Response =
do_relay(
<<"POST">>,
<<"/result/", (hb_util:bin(Slot))/binary, "?process-id=", ProcID/binary>>,
Body,
AOS2,
Opts#{
hashpath => ignore,
cache_control => [<<"no-store">>, <<"no-cache">>]
}
),
case Res of
{ok, Response} ->
JSONRes = hb_ao:get(<<"body">>, Response, Opts),
extract_json_res(Response, Opts).

%% @doc Execute dry-run computation on a remote machine via relay and use
%% the JSON-Iface to decode the response.
do_dryrun(ProcID, Msg2, Opts) ->
?event({do_dryrun_msg, {req, Msg2}}),
% Remove commitments from the message before sending to the external CU
Body =
hb_json:encode(
dev_json_iface:message_to_json_struct(
hb_maps:without([<<"commitments">>], Msg2, Opts),
Opts
)
),
?event({do_dryrun_body, {string, Body}}),
% Send to external CU via relay using /dry-run endpoint
Response = do_relay(
<<"POST">>,
<<"/dry-run?process-id=", ProcID/binary>>,
Body,
#{},
Opts#{
hashpath => ignore,
cache_control => [<<"no-store">>, <<"no-cache">>]
}
),
extract_json_res(Response, Opts).

do_relay(Method, Path, Body, AOS2, Opts) ->
hb_ao:resolve(
#{
<<"device">> => <<"[email protected]">>,
<<"content-type">> => <<"application/json">>
},
AOS2#{
<<"path">> => <<"call">>,
<<"relay-method">> => Method,
<<"relay-body">> => Body,
<<"relay-path">> => Path,
<<"content-type">> => <<"application/json">>
},
Opts
).

%% @doc Extract the JSON response from the delegated compute response.
extract_json_res(Response, Opts) ->
case Response of
{ok, Res} ->
JSONRes = hb_ao:get(<<"body">>, Res, Opts),
?event({
delegated_compute_res_metadata,
{req, hb_maps:without([<<"body">>], Response, Opts)}
{req, hb_maps:without([<<"body">>], Res, Opts)}
Comment on lines +115 to +121
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry this is a nitpick Jack. I love the way you've refactored it, extracting the code into functions.
I would only keep the previous names/terminology (Res for Results and Response for the second element of the ok tuple).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am not sure I understand - which variable should be Res / Response? The second element of the ok tuple?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Res (for Result) having the whole tuple and Response for the element of the tuple 🙏

}),
{ok, JSONRes};
{Err, Error} when Err == error; Err == failure ->
{error, Error}
end.

get_process_id(Msg1, Msg2, Opts) ->
RawProcessID = dev_process:process_id(Msg1, #{}, Opts),
case RawProcessID of
not_found -> hb_ao:get(<<"process-id">>, Msg2, Opts);
ProcID -> ProcID
end.
Comment on lines +129 to +133
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about idioms but the maybe expression, which is enabled by default in OTP 27+, is very handy for handling a single case:

maybe not_found ?= dev_process:process_id(Msg1, #{}, Opts),
    hb_ao:get(<<"process-id">>, Msg2, Opts)
end.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will take a transition on erlang idiomatic expressions but the team can have multiple benefits while adopting it. Analysing for this case (and we can extrapolate to many other ones), it helps making clear right away somes points:

  • prevents giving two names for the same thing: RawProcessID and ProcID
  • that hb_ao:get does not use the output of dev_process:process_id
  • subsequent function calls will be made only if all previous conditions are met. This is especially helpful for multiple nested handling.
  • as a consequence of the previous point, engineers interested only the case when the process_id is found don't need to look at any other line of code.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twilson63 This might be out of scope for now, but it would be great if we could discuss adopting (or not) this idiom with the team and Sam at a later time, given the benefits mentioned above.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we use maybe in other places but I think we format the syntax differently - but maybe is something we have been using so if it fits here, that would be a good change, @jfrain99 I would look at other code examples of maybe to get the formatting correct.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not find any other cases of maybe being used in the codebase - since we are trying to get this across the line, I think now is not the best time to introduce it. It definitely seems like something that should be worked in in the future, though. Thanks!


%% @doc Handle the response from the delegated compute server. Assumes that the
%% response is in AOS2-style format, decoding with the JSON-Iface.
handle_relay_response(Msg1, Msg2, Opts, Response, OutputPrefix, ProcessID, Slot) ->
case Response of
{ok, JSONRes} ->
?event(
{compute_lite_res,
{process_id, ProcessID},
{slot, Slot},
{json_res, {string, JSONRes}},
{req, Msg2}
}
),
{ok, Msg} = dev_json_iface:json_to_message(JSONRes, Opts),
{ok,
hb_ao:set(
Msg1,
#{
<<OutputPrefix/binary, "/results">> => Msg,
<<OutputPrefix/binary, "/results/json">> =>
#{
<<"content-type">> => <<"application/json">>,
<<"body">> => JSONRes
}
},
Opts
)
};
{error, Error} ->
{error, Error}
end.

%% @doc Generate a snapshot of a running computation by calling the
%% `GET /snapshot' endpoint.
snapshot(Msg, Msg2, Opts) ->
Expand Down
Loading