
Commit dc0cd66

Fixing issues #6, #7, and #8. Adding concurrent uploads of parts.

1 parent ba31035 commit dc0cd66

4 files changed: +229 -69 lines changed

CHANGELOG.md

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+Changelog
+=========
+
+### 0.5.0 (2014-08-11)
+
+* Added client caching to reuse an existing S3 client rather than creating a new one for each upload. Fixes #6
+* Updated the maxPartSize to be a hard limit instead of a soft one so that generated ETags are consistent due to the reliable size of the uploaded parts. Fixes #7
+* Added this file. Fixes #8
+* New feature: concurrent part uploads. Now you can optionally enable concurrent part uploads if you wish to allow your application to drain the source stream more quickly and absorb some of the bottleneck when uploading to S3.
+
+### 0.4.0 (2014-06-23)
+
+* Now with better error handling. If an error occurs while uploading a part to S3, or while completing a multipart upload, then the in-progress multipart upload will be aborted (to delete the uploaded parts from S3) and a more descriptive error message will be emitted instead of the raw error response from S3.
+
+### 0.3.0 (2014-05-06)
+
+* Added tests using a stubbed-out version of the Amazon S3 client. These tests ensure that the upload stream behaves properly, calls S3 correctly, and emits the proper events.
+* Added Travis integration.
+* Also fixed a bug in the functionality to dynamically adjust the part size.
+
+### 0.2.0 (2014-04-25)
+
+* Fixed a race condition bug that occurred occasionally with streams very close to the 5 MB size threshold, where the multipart upload would be finalized on S3 prior to the last data buffer being flushed, resulting in the last part of the stream being cut off in the resulting S3 file. (Notice: if you are using an older version of this module I highly recommend upgrading to get this bugfix.)
+* Added a method for adjusting the part size dynamically.
+
+### 0.1.0 (2014-04-17)
+
+* Code cleanups and stylistic goodness.
+* Made the connection parameters optional for those who follow Amazon's best practice of letting the SDK get AWS credentials from environment variables or IAM roles.
+
+### 0.0.3 (2013-12-25)
+
+* Merged pull request #2 to fix an issue where the latest version of the AWS SDK required a strict type on the part number.
+
+### 0.0.2 (2013-08-01)
+
+* Improved the documentation.
+
+### 0.0.1 (2013-07-31)
+
+* Initial release.
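
The client caching mentioned in the 0.5.0 entry is exposed through a new `setClient` method on the module (see the lib/s3-upload-stream.js diff below). A minimal usage sketch, assuming the module is required as `s3-upload-stream` and that the two-argument `Uploader(destinationDetails, callback)` form is used so the cached client is picked up:

```js
var AWS = require('aws-sdk');
var s3Stream = require('s3-upload-stream'); // assumed require name

// Register a single S3 client once; Uploader instances created without
// explicit connection details will reuse it instead of building a new
// AWS.S3 object for every upload.
s3Stream.setClient(new AWS.S3({ apiVersion: 'latest' }));

var upload = new s3Stream.Uploader(
  { "Bucket": "example-bucket", "Key": "example-key" }, // hypothetical destination
  function (err, uploadStream) {
    if (err) return console.error(err);
    // Pipe a readable stream into uploadStream here, as in the README examples.
  }
);
```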

README.md

Lines changed: 31 additions & 5 deletions
@@ -6,13 +6,14 @@ A pipeable write stream which uploads to Amazon S3 using the multipart file uplo
 
 ### Changelog
 
-_June 23, 2014_ - Now with better error handling. If an error occurs while uploading a part to S3, or completing a multipart upload then the in progress multipart upload will be aborted (to delete the uploaded parts from S3) and a more descriptive error message will be emitted instead of the raw error response from S3.
+## 0.5.0 (2014-08-11)
 
-_May 6, 2014_ - Added tests using a stubbed out version of the Amazon S3 client. These tests will ensure that the upload stream behaves properly, calls S3 correctly, and emits the proper events. Also fixed bug with the functionality to dynamically adjust the part size.
+* Added client caching to reuse an existing S3 client rather than creating a new one for each upload. Fixes #6
+* Updated the maxPartSize to be a hard limit instead of a soft one so that generated ETags are consistent due to the reliable size of the uploaded parts. Fixes #7
+* Added a CHANGELOG.md file. Fixes #8
+* New feature: concurrent part uploads. Now you can optionally enable concurrent part uploads if you wish to allow your application to drain the source stream more quickly and absorb some of the backpressure from a fast incoming stream when uploading to S3.
 
-_April 25, 2014_ - Fixed a race condition bug that occured occasionally with streams very close to the 5 MB size threshold where the multipart upload would be finalized on S3 prior to the last data buffer being flushed, resulting in the last part of the stream being cut off in the resulting S3 file. Also added a method for adjusting the part size dynamically. (__Notice:__ If you are using an older version of this module I highly recommend upgrading to get this latest bugfix.)
-
-_April 17, 2014_ - Made the connection parameters optional for those who are following Amazon's best practices of allowing the SDK to get AWS credentials from environment variables or AMI roles.
+[Historical Changelogs](CHANGELOG.md)
 
 ### Why use this stream?
 
@@ -146,6 +147,31 @@ var UploadStreamObject = new Uploader(
 );
 ```
 
+### stream.concurrentParts(numberOfParts)
+
+Used to adjust the number of parts that are concurrently uploaded to S3. By default this is just one at a time, to keep memory usage low and allow the upstream to deal with backpressure. However, in some cases you may wish to drain the stream that you are piping in quickly, and then issue concurrent upload requests to upload multiple parts.
+
+Keep in mind that total memory usage will be at least `maxPartSize` * `concurrentParts`, as each concurrent part will be `maxPartSize` large, so it is not recommended that you set both `maxPartSize` and `concurrentParts` to high values, or your process will be buffering large amounts of data in its memory.
+
+```js
+var UploadStreamObject = new Uploader(
+  {
+    "Bucket": "your-bucket-name",
+    "Key": "uploaded-file-name " + new Date()
+  },
+  function (err, uploadStream)
+  {
+    uploadStream.concurrentParts(5)
+
+    uploadStream.on('uploaded', function (data) {
+      console.log('done');
+    });
+
+    read.pipe(uploadStream);
+  }
+);
+```
+
 ### Tuning configuration of the AWS SDK
 
 The following configuration tuning can help prevent errors when using less reliable internet connections (such as 3G data if you are using Node.js on the Tessel) by causing the AWS SDK to detect upload timeouts and retry.
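
The configuration block that sentence refers to falls outside this hunk, so it is not shown here. As a rough sketch of that kind of AWS SDK tuning (the option names `maxRetries` and `httpOptions.timeout` are standard aws-sdk settings, but the values below are illustrative assumptions, not taken from this commit):

```js
var AWS = require('aws-sdk');

// Illustrative values only: treat a stalled part upload as failed after
// two minutes and let the SDK retry it a few times instead of hanging.
AWS.config.update({
  maxRetries: 5,
  httpOptions: { timeout: 120000 }
});
```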

lib/s3-upload-stream.js

Lines changed: 156 additions & 63 deletions
@@ -1,23 +1,34 @@
 var Writable = require('stream').Writable,
+    util = require("util"),
+    EventEmitter = require("events").EventEmitter,
     AWS = require('aws-sdk');
 
+var cachedClient;
+
 module.exports = {
+  setClient: function (client) {
+    cachedClient = client;
+  },
+
   // Generate a writeable stream which uploads to a file on S3.
   Uploader: function (connection, destinationDetails, doneCreatingUploadStream) {
     var self = this;
 
-    if (arguments.length == 2){
+    if (arguments.length == 2) {
       // No connection passed in, assume that the connection details were already specified using
       // environment variables as documented at http://docs.aws.amazon.com/AWSJavaScriptSDK/guide/node-configuring.html
       doneCreatingUploadStream = destinationDetails;
       destinationDetails = connection;
-      self.s3Client = new AWS.S3();
+      if (cachedClient)
+        self.s3Client = cachedClient;
+      else
+        self.s3Client = new AWS.S3();
     }
     else {
       // The user already configured an S3 client that they want the stream to use.
       if (typeof connection.s3Client != 'undefined')
         self.s3Client = connection.s3Client;
-      else {
+      else if (connection.accessKeyId && connection.secretAccessKey) {
        // The user hardcodes their credentials into their app
        self.s3Client = new AWS.S3({
          apiVersion: 'latest',
@@ -26,19 +37,33 @@ module.exports = {
          region: connection.region
        });
       }
+      else if (cachedClient) {
+        self.s3Client = cachedClient;
+      }
+      else {
+        throw "Unable to find an interface for connecting to S3";
+      }
     }
 
     // Create the writeable stream interface.
     self.ws = Writable({
       highWaterMark: 4194304 // 4 MB
     });
 
+    // Data pertaining to the overall upload
     self.partNumber = 1;
-    self.parts = [];
+    self.partIds = [];
     self.receivedSize = 0;
     self.uploadedSize = 0;
-    self.currentPart = Buffer(0);
-    self.partSizeThreshold = 5242880;
+
+    // Parts which need to be uploaded to S3.
+    self.pendingParts = 0;
+    self.concurrentPartThreshold = 1;
+
+    // Data pertaining to buffers we have received
+    self.receivedBuffers = [];
+    self.receivedBuffersLength = 0;
+    self.partSizeThreshold = 6242880;
 
     // Set the maximum amount of data that we will keep in memory before flushing it to S3 as a part
     // of the multipart upload
@@ -49,31 +74,138 @@ module.exports = {
       self.partSizeThreshold = partSize;
     };
 
+    // Set the maximum number of parts that may be uploaded to S3
+    // concurrently as part of the multipart upload
+    self.concurrentParts = function (parts) {
+      if (parts < 1)
+        parts = 1;
+
+      self.concurrentPartThreshold = parts;
+    };
+
     // Handler to receive data and upload it to S3.
-    self.ws._write = function (Part, enc, next) {
-      self.currentPart = Buffer.concat([self.currentPart, Part]);
+    self.ws._write = function (incomingBuffer, enc, next) {
+      self.absorbBuffer(incomingBuffer);
 
-      // If the current Part buffer is getting too large, or the stream piped in has ended then flush
-      // the Part buffer downstream to S3 via the multipart upload API.
-      if (self.currentPart.length > self.partSizeThreshold)
-        self.flushPart(next);
-      else
+      if (self.receivedBuffersLength < self.partSizeThreshold)
+        return next(); // Ready to receive more data in _write.
+
+      // We need to upload some data
+      self.uploadHandler(next);
+    };
+
+    self.uploadHandler = function (next) {
+      if (self.pendingParts < self.concurrentPartThreshold) {
+        // We need to upload some of the data we've received
+        upload();
+      }
+      else {
+        // Block uploading (and receiving of more data) until we upload
+        // some of the pending parts
+        self.once('chunk', upload);
+      }
+
+      function upload() {
+        self.pendingParts++;
+        self.flushPart(function (partDetails) {
+          --self.pendingParts;
+          self.emit('chunk'); // Internal event
+          self.ws.emit('chunk', partDetails); // External event
+        });
         next();
+      }
+    };
+
+    // Absorb an incoming buffer from _write into a buffer queue
+    self.absorbBuffer = function (incomingBuffer) {
+      self.receivedBuffers.push(incomingBuffer);
+      self.receivedBuffersLength += incomingBuffer.length;
+    };
+
+    // Take a list of received buffers and return a combined buffer that is exactly
+    // self.partSizeThreshold in size.
+    self.preparePartBuffer = function () {
+      // Combine the buffers we've received and reset the list of buffers.
+      var combinedBuffer = Buffer.concat(self.receivedBuffers, self.receivedBuffersLength);
+      self.receivedBuffers.length = 0; // Trick to reset the array while keeping the original reference
+      self.receivedBuffersLength = 0;
+
+      if (combinedBuffer.length > self.partSizeThreshold) {
+        // The combined buffer is too big, so slice off the end and put it back in the array.
+        var remainder = new Buffer(combinedBuffer.length - self.partSizeThreshold);
+        combinedBuffer.copy(remainder, 0, self.partSizeThreshold);
+        self.receivedBuffers.push(remainder);
+        self.receivedBuffersLength = remainder.length;
+
+        // Return the original buffer.
+        return combinedBuffer.slice(0, self.partSizeThreshold);
+      }
+      else {
+        // It just happened to be perfectly sized, so return it.
+        return combinedBuffer;
+      }
+    };
+
+    // Flush a part out to S3.
+    self.flushPart = function (callback) {
+      var partBuffer = self.preparePartBuffer();
+
+      var localPartNumber = self.partNumber;
+      self.partNumber++;
+      self.receivedSize += partBuffer.length;
+      self.s3Client.uploadPart(
+        {
+          Body: partBuffer,
+          Bucket: destinationDetails.Bucket,
+          Key: destinationDetails.Key,
+          UploadId: self.multipartUploadID,
+          PartNumber: localPartNumber
+        },
+        function (err, result) {
+          if (err)
+            self.abortUpload('Failed to upload a part to S3: ' + JSON.stringify(err));
+          else {
+            self.uploadedSize += partBuffer.length;
+            self.partIds[localPartNumber - 1] = {
+              ETag: result.ETag,
+              PartNumber: localPartNumber
+            };
+
+            callback({
+              ETag: result.ETag,
+              PartNumber: localPartNumber,
+              receivedSize: self.receivedSize,
+              uploadedSize: self.uploadedSize
+            });
+          }
+        }
+      );
     };
 
     // Overwrite the end method so that we can hijack it to flush the last part and then complete
     // the multipart upload
     self.ws.originalEnd = self.ws.end;
     self.ws.end = function (Part, encoding, callback) {
      self.ws.originalEnd(Part, encoding, function afterDoneWithOriginalEnd() {
-        if (self.currentPart.length > 0) {
-          //Check to see if a last ending write might have added another part that we will need o flush.
-          self.flushPart(function () {
-            self.completeUpload();
-          });
-        }
-        else
+        if (Part)
+          self.absorbBuffer(Part);
+
+        // Upload any remaining data
+        var uploadRemainingData = function () {
+          if (self.receivedBuffersLength > 0) {
+            self.uploadHandler(uploadRemainingData);
+            return;
+          }
+
+          if (self.pendingParts > 0) {
+            setTimeout(uploadRemainingData, 50); // Wait 50 ms for the pending uploads to finish before trying again.
+            return;
+          }
+
           self.completeUpload();
+        };
+
+        uploadRemainingData();
 
         if (typeof callback == 'function')
           callback();
@@ -88,7 +220,7 @@ module.exports = {
           Key: destinationDetails.Key,
           UploadId: self.multipartUploadID,
           MultipartUpload: {
-            Parts: self.parts
+            Parts: self.partIds
           }
         },
         function (err, result) {
@@ -98,7 +230,7 @@ module.exports = {
          self.ws.emit('uploaded', result);
        }
       );
-    },
+    };
 
     // When a fatal error occurs abort the multipart upload
     self.abortUpload = function (rootError) {
@@ -115,47 +247,6 @@ module.exports = {
          self.ws.emit('error', rootError);
        }
       );
-    },
-
-    // Flush a single part down the line to S3.
-    self.flushPart = function (callback) {
-      var uploadingPart = Buffer(self.currentPart.length);
-      self.currentPart.copy(uploadingPart);
-
-      var localPartNumber = self.partNumber;
-      self.partNumber++;
-      self.receivedSize += uploadingPart.length;
-      self.s3Client.uploadPart(
-        {
-          Body: uploadingPart,
-          Bucket: destinationDetails.Bucket,
-          Key: destinationDetails.Key,
-          UploadId: self.multipartUploadID,
-          PartNumber: localPartNumber
-        },
-        function (err, result) {
-          if (err)
-            self.abortUpload('Failed to upload a part to S3: ' + JSON.stringify(err));
-          else {
-            self.uploadedSize += uploadingPart.length;
-            self.parts[localPartNumber - 1] = {
-              ETag: result.ETag,
-              PartNumber: localPartNumber
-            };
-
-            self.ws.emit('chunk', {
-              ETag: result.ETag,
-              PartNumber: localPartNumber,
-              receivedSize: self.receivedSize,
-              uploadedSize: self.uploadedSize
-            });
-          }
-
-          if (typeof callback == 'function')
-            callback();
-        }
-      );
-      self.currentPart = Buffer(0);
     };
 
     // Use the S3 client to initialize a multipart upload to S3.
@@ -174,3 +265,5 @@ module.exports = {
     );
   }
 };
+
+util.inherits(module.exports.Uploader, EventEmitter);
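
Pulling the pieces of this commit together, here is a minimal end-to-end sketch: the cached client from `setClient`, the new `concurrentParts` setting, and the per-part `chunk` events now emitted out of `flushPart`. The require name and input file path are assumptions for illustration:

```js
var fs = require('fs');
var AWS = require('aws-sdk');
var s3Stream = require('s3-upload-stream'); // assumed require name

// Reused by every Uploader that is not given explicit connection details.
s3Stream.setClient(new AWS.S3({ apiVersion: 'latest' }));

var read = fs.createReadStream('/tmp/example-input'); // hypothetical source file

var UploadStreamObject = new s3Stream.Uploader(
  {
    "Bucket": "your-bucket-name",
    "Key": "uploaded-file-name " + new Date()
  },
  function (err, uploadStream) {
    if (err) return console.error(err);

    // Allow up to 5 parts in flight at once; memory use will be at least
    // partSizeThreshold * 5 while the source stream is being drained.
    uploadStream.concurrentParts(5);

    uploadStream.on('chunk', function (partDetails) {
      // Emitted for every completed part with ETag, PartNumber,
      // receivedSize and uploadedSize.
      console.log('part', partDetails.PartNumber, 'uploaded');
    });

    uploadStream.on('uploaded', function (result) {
      console.log('done', result);
    });

    read.pipe(uploadStream);
  }
);
```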
