Skip to content

Cancel/Pause stream optimization #395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ struct aws_s3_meta_request {
/* True if the finish result has been set. */
uint32_t finish_result_set : 1;

/* To track the aws_s3_request that are active from HTTP level */
struct aws_linked_list ongoing_http_requests_list;

} synced_data;

/* Anything in this structure should only ever be accessed by the client on its process work event loop task. */
Expand Down Expand Up @@ -359,6 +362,9 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
* The meta-request's finish callback must not be invoked until this returns false. */
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Cancel the requests with ongoing HTTP activities for the meta request */
void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
9 changes: 9 additions & 0 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ struct aws_s3_request {
/* Linked list node used for queuing. */
struct aws_linked_list_node node;

/* Linked list node used for tracking the request is active from HTTP level. */
struct aws_linked_list_node ongoing_http_requests_list_node;

/* The meta request lock must be held to access the data */
struct {
/* The underlying http stream, only valid when the request is active from HTTP level */
struct aws_http_stream *http_stream;
} synced_data;

/* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
* and we can just transfer ownership.*/
struct aws_ref_count ref_count;
Expand Down
2 changes: 2 additions & 0 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,8 @@ static int s_s3_auto_ranged_put_pause(
*/
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED);

aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);

/* unlock */
aws_s3_meta_request_unlock_synced_data(meta_request);

Expand Down
47 changes: 43 additions & 4 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ int aws_s3_meta_request_init_base(
meta_request->type = options->type;
/* Set up reference count. */
aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy);
aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list);

if (part_size == SIZE_MAX) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
Expand Down Expand Up @@ -345,6 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
Expand Down Expand Up @@ -486,6 +488,8 @@ static void s_s3_meta_request_destroy(void *user_data) {
AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0);
aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array);

AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list));

aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result);

if (meta_request->vtable != NULL) {
Expand Down Expand Up @@ -1077,6 +1081,15 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,
goto error_finish;
}

{
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_linked_list_push_back(
&meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node);
request->synced_data.http_stream = stream;
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
return;

error_finish:
Expand Down Expand Up @@ -1366,9 +1379,20 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in

struct aws_s3_connection *connection = user_data;
AWS_PRECONDITION(connection);
if (connection->request->meta_request->checksum_config.validate_response_checksum) {
struct aws_s3_request *request = connection->request;
struct aws_s3_meta_request *meta_request = request->meta_request;

if (meta_request->checksum_config.validate_response_checksum) {
s_get_response_part_finish_checksum_helper(connection, error_code);
}
if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
AWS_ASSERT(request->synced_data.http_stream != NULL);
aws_linked_list_remove(&request->ongoing_http_requests_list_node);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
s_s3_meta_request_send_request_finish(connection, stream, error_code);
}

Expand Down Expand Up @@ -1644,6 +1668,21 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
meta_request->synced_data.event_delivery_active;
}

void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) {
struct aws_linked_list_node *request_node =
aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list);
struct aws_s3_request *request =
AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node);
if (!request->always_send) {
/* Cancel the ongoing http stream, unless it's always send. */
aws_http_stream_cancel(request->synced_data.http_stream, error_code);
}
request->synced_data.http_stream = NULL;
}
}

/* Deliver events in event_delivery_array.
* This task runs on the meta-request's io_event_loop thread. */
static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
Expand Down Expand Up @@ -1887,9 +1926,9 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
finish_result.error_code,
aws_error_str(finish_result.error_code));

/* As the meta request has been finished with any HTTP message, we can safely release the http message that hold. So
* that, the downstream high level language doesn't need to wait for shutdown to clean related resource (eg: input
* stream) */
/* As the meta request has been finished with any HTTP message, we can safely release the http message that
* hold. So that, the downstream high level language doesn't need to wait for shutdown to clean related resource
* (eg: input stream) */
meta_request->request_body_async_stream = aws_async_input_stream_release(meta_request->request_body_async_stream);
meta_request->request_body_parallel_stream =
aws_parallel_input_stream_release(meta_request->request_body_parallel_stream);
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed_async)
add_net_test_case(test_s3_cancel_mpu_all_parts_completed)
add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests)
add_net_test_case(test_s3_pause_mpu_ongoing_http_requests)
add_net_test_case(test_s3_cancel_mpd_nothing_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_completed)
Expand Down
126 changes: 99 additions & 27 deletions tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
#include <aws/testing/aws_test_harness.h>

enum s3_update_cancel_type {
S3_UPDATE_CANCEL_TYPE_NO_CANCEL,

S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT,
S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS,
S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES,

S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT,
Expand All @@ -31,6 +34,8 @@ enum s3_update_cancel_type {

struct s3_cancel_test_user_data {
enum s3_update_cancel_type type;
bool pause;
struct aws_s3_meta_request_resume_token *resume_token;
bool abort_successful;
};

Expand All @@ -48,74 +53,84 @@ static bool s_s3_meta_request_update_cancel_test(
struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
struct aws_s3_auto_ranged_get *auto_ranged_get = meta_request->impl;

bool call_cancel = false;
bool call_cancel_or_pause = false;
bool block_update = false;

aws_s3_meta_request_lock_synced_data(meta_request);

switch (cancel_test_user_data->type) {
case S3_UPDATE_CANCEL_TYPE_NO_CANCEL:
break;

case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT:
call_cancel = auto_ranged_put->synced_data.create_multipart_upload_sent != 0;
call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_sent != 0;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_COMPLETED:
call_cancel = auto_ranged_put->synced_data.create_multipart_upload_completed != 0;
call_cancel_or_pause = auto_ranged_put->synced_data.create_multipart_upload_completed != 0;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed == 1;
block_update = !call_cancel && auto_ranged_put->synced_data.num_parts_started == 1;
call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed == 1;
block_update = !call_cancel_or_pause && auto_ranged_put->synced_data.num_parts_started == 1;
break;
case S3_UPDATE_CANCEL_TYPE_MPU_ALL_PARTS_COMPLETED:
call_cancel = auto_ranged_put->synced_data.num_parts_completed ==
auto_ranged_put->total_num_parts_from_content_length;
call_cancel_or_pause = auto_ranged_put->synced_data.num_parts_completed ==
auto_ranged_put->total_num_parts_from_content_length;
break;

case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS:
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list);
break;

case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES:
AWS_ASSERT(false);
break;

case S3_UPDATE_CANCEL_TYPE_MPD_NOTHING_SENT:
call_cancel = auto_ranged_get->synced_data.num_parts_requested == 0;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_SENT:
call_cancel = auto_ranged_get->synced_data.head_object_sent != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.head_object_sent != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_HEAD_OBJECT_COMPLETED:
call_cancel = auto_ranged_get->synced_data.head_object_completed != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.head_object_completed != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_SENT:
call_cancel = auto_ranged_get->synced_data.get_without_range_sent != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_sent != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_GET_WITHOUT_RANGE_COMPLETED:
call_cancel = auto_ranged_get->synced_data.get_without_range_completed != 0;
call_cancel_or_pause = auto_ranged_get->synced_data.get_without_range_completed != 0;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT:
call_cancel = auto_ranged_get->synced_data.num_parts_requested == 1;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_requested == 1;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED:
call_cancel = auto_ranged_get->synced_data.num_parts_completed == 1;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 1;

/* Prevent other parts from being queued while we wait for this one to complete. */
block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 1;
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 1;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED:
call_cancel = auto_ranged_get->synced_data.num_parts_completed == 2;
call_cancel_or_pause = auto_ranged_get->synced_data.num_parts_completed == 2;

/* Prevent other parts from being queued while we wait for these two to complete. */
block_update = !call_cancel && auto_ranged_get->synced_data.num_parts_requested == 2;
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2;
break;
}

aws_s3_meta_request_unlock_synced_data(meta_request);

if (call_cancel) {
aws_s3_meta_request_cancel(meta_request);
if (call_cancel_or_pause) {
if (cancel_test_user_data->pause) {
aws_s3_meta_request_pause(meta_request, &cancel_test_user_data->resume_token);
} else {
aws_s3_meta_request_cancel(meta_request);
}
}

if (block_update) {
Expand Down Expand Up @@ -175,7 +190,8 @@ static struct aws_s3_meta_request *s_meta_request_factory_patch_update_cancel_te
static int s3_cancel_test_helper_ex(
struct aws_allocator *allocator,
enum s3_update_cancel_type cancel_type,
bool async_input_stream) {
bool async_input_stream,
bool pause) {

AWS_ASSERT(allocator);

Expand All @@ -184,6 +200,7 @@ static int s3_cancel_test_helper_ex(

struct s3_cancel_test_user_data test_user_data = {
.type = cancel_type,
.pause = pause,
};

tester.user_data = &test_user_data;
Expand Down Expand Up @@ -221,13 +238,49 @@ static int s3_cancel_test_helper_ex(
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &options, &meta_request_test_results));
ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, meta_request_test_results.finished_error_code);
int expected_error_code = pause ? AWS_ERROR_S3_PAUSED : AWS_ERROR_S3_CANCELED;
ASSERT_INT_EQUALS(expected_error_code, meta_request_test_results.finished_error_code);

if (cancel_type == S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS) {
/* Check the metric and see we have at least a request completed with AWS_ERROR_S3_CANCELED */
/* The meta request completed, we can access the synced data now. */
struct aws_array_list *metrics_list = &meta_request_test_results.synced_data.metrics;
bool cancelled_successfully = false;
for (size_t i = 0; i < aws_array_list_length(metrics_list); ++i) {
struct aws_s3_request_metrics *metrics = NULL;
aws_array_list_get_at(metrics_list, (void **)&metrics, i);
if (metrics->crt_info_metrics.error_code == expected_error_code) {
cancelled_successfully = true;
break;
}
}
ASSERT_TRUE(cancelled_successfully);
}

aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);

if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT) {
if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT && !pause) {
ASSERT_TRUE(test_user_data.abort_successful);
}
if (pause) {
/* Resume the paused request. */
ASSERT_NOT_NULL(test_user_data.resume_token);
test_user_data.type = S3_UPDATE_CANCEL_TYPE_NO_CANCEL;
struct aws_s3_tester_meta_request_options resume_options = {
.allocator = allocator,
.client = client,
.meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS,
.put_options =
{
.ensure_multipart = true,
.async_input_stream = async_input_stream,
.resume_token = test_user_data.resume_token,
},
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &resume_options, NULL));
aws_s3_meta_request_resume_token_release(test_user_data.resume_token);
}

/* TODO: perform additional verification with list-multipart-uploads */

Expand Down Expand Up @@ -284,7 +337,7 @@ static int s3_cancel_test_helper_ex(
}

static int s3_cancel_test_helper(struct aws_allocator *allocator, enum s3_update_cancel_type cancel_type) {
return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/);
return s3_cancel_test_helper_ex(allocator, cancel_type, false /*async_input_stream*/, false /*pause*/);
}

static int s3_cancel_test_helper_fc(
Expand Down Expand Up @@ -459,8 +512,8 @@ AWS_TEST_CASE(test_s3_cancel_mpu_one_part_completed_async, s_test_s3_cancel_mpu_
static int s_test_s3_cancel_mpu_one_part_completed_async(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(
s3_cancel_test_helper_ex(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/));
ASSERT_SUCCESS(s3_cancel_test_helper_ex(
allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONE_PART_COMPLETED, true /*async_input_stream*/, false /*pause*/));

return 0;
}
Expand All @@ -474,6 +527,25 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca
return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests)
static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS));

return 0;
}

AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests)
static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper_ex(
allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS, false /*async_input_stream*/, true /*pause*/));

return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpd_nothing_sent, s_test_s3_cancel_mpd_nothing_sent)
static int s_test_s3_cancel_mpd_nothing_sent(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down