1
1
use super :: Blob ;
2
2
use failure:: Error ;
3
- use futures_util:: {
4
- future:: FutureExt ,
5
- stream:: { FuturesUnordered , StreamExt } ,
6
- } ;
3
+ use futures:: Future ;
7
4
use log:: { error, warn} ;
8
5
use rusoto_core:: region:: Region ;
9
6
use rusoto_credential:: DefaultCredentialsProvider ;
10
7
use rusoto_s3:: { GetObjectRequest , PutObjectRequest , S3Client , S3 } ;
11
8
use std:: convert:: TryInto ;
12
9
use std:: io:: Read ;
13
10
use time:: Timespec ;
14
- use tokio:: runtime:: { Handle , Runtime } ;
11
+ use tokio:: runtime:: Runtime ;
15
12
16
13
#[ cfg( test) ]
17
14
mod test;
@@ -35,23 +32,15 @@ impl<'a> S3Backend<'a> {
35
32
}
36
33
}
37
34
38
- #[ cfg( test) ]
39
- pub ( crate ) fn with_runtime ( client : S3Client , bucket : & ' a str , runtime : Runtime ) -> Self {
40
- Self {
41
- client,
42
- bucket,
43
- runtime,
44
- }
45
- }
46
-
47
- pub ( super ) fn get ( & mut self , path : & str ) -> Result < Blob , Error > {
35
+ pub ( super ) fn get ( & self , path : & str ) -> Result < Blob , Error > {
48
36
let res = self
49
- . runtime
50
- . block_on ( self . client . get_object ( GetObjectRequest {
37
+ . client
38
+ . get_object ( GetObjectRequest {
51
39
bucket : self . bucket . to_string ( ) ,
52
40
key : path. into ( ) ,
53
41
..Default :: default ( )
54
- } ) ) ?;
42
+ } )
43
+ . sync ( ) ?;
55
44
56
45
let mut b = res. body . unwrap ( ) . into_blocking_read ( ) ;
57
46
let mut content = Vec :: with_capacity (
@@ -71,16 +60,14 @@ impl<'a> S3Backend<'a> {
71
60
} )
72
61
}
73
62
74
- pub ( super ) fn store_batch ( & mut self , mut uploads : Vec < Blob > ) -> Result < ( ) , Error > {
63
+ pub ( super ) fn store_batch ( & mut self , batch : & [ Blob ] ) -> Result < ( ) , Error > {
64
+ use futures:: stream:: FuturesUnordered ;
65
+ use futures:: stream:: Stream ;
75
66
let mut attempts = 0 ;
76
67
77
68
loop {
78
- // `FuturesUnordered` is used because the order of execution doesn't
79
- // matter, we just want things to execute as fast as possible
80
- let futures = FuturesUnordered :: new ( ) ;
81
-
82
- // Drain uploads, filling `futures` with upload requests
83
- for blob in uploads. drain ( ..) {
69
+ let mut futures = FuturesUnordered :: new ( ) ;
70
+ for blob in batch {
84
71
futures. push (
85
72
self . client
86
73
. put_object ( PutObjectRequest {
@@ -90,53 +77,27 @@ impl<'a> S3Backend<'a> {
90
77
content_type : Some ( blob. mime . clone ( ) ) ,
91
78
..Default :: default ( )
92
79
} )
93
- // Drop the value returned by `put_object` because we don't need it,
94
- // emit an error and replace the error values with the blob that failed
95
- // to upload so that we can retry failed uploads
96
- . map ( |resp| match resp {
97
- Ok ( ..) => {
98
- // Increment the total uploaded files when a file is uploaded
99
- crate :: web:: metrics:: UPLOADED_FILES_TOTAL . inc_by ( 1 ) ;
100
-
101
- Ok ( ( ) )
102
- }
103
- Err ( err) => {
104
- error ! ( "failed to upload file to s3: {:?}" , err) ;
105
- Err ( blob)
106
- }
80
+ . inspect ( |_| {
81
+ crate :: web:: metrics:: UPLOADED_FILES_TOTAL . inc_by ( 1 ) ;
107
82
} ) ,
108
83
) ;
109
84
}
110
85
attempts += 1 ;
111
86
112
- // Collect all the failed uploads so that we can retry them
113
- uploads = self . runtime . block_on (
114
- futures
115
- . filter_map ( |resp| async move { resp. err ( ) } )
116
- . collect ( ) ,
117
- ) ;
118
-
119
- // If there are no further uploads we were successful and can return
120
- if uploads. is_empty ( ) {
121
- break ;
122
-
123
- // If more than three attempts to upload fail, return an error
124
- } else if attempts >= 3 {
125
- error ! ( "failed to upload to s3, abandoning" ) ;
126
- failure:: bail!( "Failed to upload to s3 three times, abandoning" ) ;
87
+ match self . runtime . block_on ( futures. map ( drop) . collect ( ) ) {
88
+ // this batch was successful, start another batch if there are still more files
89
+ Ok ( _) => break ,
90
+ Err ( err) => {
91
+ error ! ( "failed to upload to s3: {:?}" , err) ;
92
+ // if a futures error occurs, retry the batch
93
+ if attempts > 2 {
94
+ panic ! ( "failed to upload 3 times, exiting" ) ;
95
+ }
96
+ }
127
97
}
128
98
}
129
-
130
99
Ok ( ( ) )
131
100
}
132
-
133
- pub fn runtime_handle ( & self ) -> Handle {
134
- self . runtime . handle ( ) . clone ( )
135
- }
136
-
137
- pub fn client ( & self ) -> & S3Client {
138
- & self . client
139
- }
140
101
}
141
102
142
103
fn parse_timespec ( raw : & str ) -> Result < Timespec , Error > {
@@ -150,15 +111,13 @@ pub(crate) fn s3_client() -> Option<S3Client> {
150
111
if std:: env:: var_os ( "AWS_ACCESS_KEY_ID" ) . is_none ( ) && std:: env:: var_os ( "FORCE_S3" ) . is_none ( ) {
151
112
return None ;
152
113
}
153
-
154
114
let creds = match DefaultCredentialsProvider :: new ( ) {
155
115
Ok ( creds) => creds,
156
116
Err ( err) => {
157
117
warn ! ( "failed to retrieve AWS credentials: {}" , err) ;
158
118
return None ;
159
119
}
160
120
} ;
161
-
162
121
Some ( S3Client :: new_with (
163
122
rusoto_core:: request:: HttpClient :: new ( ) . unwrap ( ) ,
164
123
creds,
@@ -176,6 +135,7 @@ pub(crate) fn s3_client() -> Option<S3Client> {
176
135
pub ( crate ) mod tests {
177
136
use super :: * ;
178
137
use crate :: test:: * ;
138
+ use std:: slice;
179
139
180
140
#[ test]
181
141
fn test_parse_timespec ( ) {
@@ -205,7 +165,7 @@ pub(crate) mod tests {
205
165
206
166
// Add a test file to the database
207
167
let s3 = env. s3 ( ) ;
208
- s3. upload ( vec ! [ blob . clone ( ) ] ) . unwrap ( ) ;
168
+ s3. upload ( slice :: from_ref ( & blob ) ) . unwrap ( ) ;
209
169
210
170
// Test that the proper file was returned
211
171
s3. assert_blob ( & blob, "dir/foo.txt" ) ;
@@ -227,10 +187,8 @@ pub(crate) mod tests {
227
187
"b" ,
228
188
"a_very_long_file_name_that_has_an.extension" ,
229
189
"parent/child" ,
230
- "h/i/
231
- g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s" ,
190
+ "h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s" ,
232
191
] ;
233
-
234
192
let blobs: Vec < _ > = names
235
193
. iter ( )
236
194
. map ( |& path| Blob {
@@ -240,12 +198,10 @@ pub(crate) mod tests {
240
198
content : "Hello world!" . into ( ) ,
241
199
} )
242
200
. collect ( ) ;
243
-
244
- s3. upload ( blobs. clone ( ) ) . unwrap ( ) ;
201
+ s3. upload ( & blobs) . unwrap ( ) ;
245
202
for blob in & blobs {
246
203
s3. assert_blob ( blob, & blob. path ) ;
247
204
}
248
-
249
205
Ok ( ( ) )
250
206
} )
251
207
}
0 commit comments