Skip to content

Commit d291694

Browse files
committed
Batch to total request message.
1 parent 17ead99 commit d291694

File tree

5 files changed

+99
-9
lines changed

5 files changed

+99
-9
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.2",
3+
"version": "1.1.3",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/sinks/sqs.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ import _ from 'highland';
22

33
import Connector from '../connectors/sqs';
44

5-
import { toBatchUow, unBatchUow } from '../utils/batch';
5+
import { batchWithSize, toBatchUow, unBatchUow } from '../utils/batch';
66
import { ratelimit } from '../utils/ratelimit';
77
import { rejectWithFault } from '../utils/faults';
88
import { debug as d } from '../utils/print';
9+
import { storeClaimcheck } from './claimcheck';
910

1011
export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
1112
id: pipelineId,
1213
debug = d('sqs'),
1314
queueUrl = process.env.QUEUE_URL,
1415
messageField = 'message',
1516
batchSize = Number(process.env.SQS_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10,
17+
maxPublishRequestSize = Number(process.env.PUBLISH_MAX_REQ_SIZE) || Number(process.env.MAX_REQ_SIZE) || 256 * 1024,
1618
parallel = Number(process.env.SQS_PARALLEL) || Number(process.env.PARALLEL) || 8,
1719
step = 'send',
1820
...opt
@@ -46,7 +48,15 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
4648
return (s) => s
4749
.through(ratelimit(opt))
4850

49-
.batch(batchSize)
51+
.consume(batchWithSize({
52+
...opt,
53+
batchSize,
54+
maxRequestSize: maxPublishRequestSize,
55+
requestEntryField: messageField,
56+
claimcheckEventField: 'MessageBody',
57+
debug,
58+
}))
59+
.through(storeClaimcheck(opt))
5060
.map(toBatchUow)
5161

5262
.map(toInputParams)

src/utils/batch.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ export const compact = (rule) => {
5151
export const batchWithSize = ({
5252
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
5353
putClaimcheckRequest = 'putClaimcheckRequest',
54+
// Detail handles EB, but others like SQS may need something else like 'MessageBody'
55+
claimcheckEventField = 'Detail',
5456
...opt
5557
}) => {
5658
let batched = [];
@@ -77,9 +79,9 @@ export const batchWithSize = ({
7779
logMetrics([x], [size], opt);
7880
if (claimCheckBucketName) {
7981
// setup claim check
80-
x[putClaimcheckRequest] = toPutClaimcheckRequest(JSON.parse(x[opt.requestEntryField].Detail), claimCheckBucketName);
81-
x[opt.requestEntryField].Detail = JSON.stringify(toClaimcheckEvent(
82-
JSON.parse(x[opt.requestEntryField].Detail),
82+
x[putClaimcheckRequest] = toPutClaimcheckRequest(JSON.parse(x[opt.requestEntryField][claimcheckEventField]), claimCheckBucketName);
83+
x[opt.requestEntryField][claimcheckEventField] = JSON.stringify(toClaimcheckEvent(
84+
JSON.parse(x[opt.requestEntryField][claimcheckEventField]),
8385
claimCheckBucketName,
8486
));
8587
size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));

test/unit/sinks/sqs.test.js

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ describe('sinks/sqs.js', () => {
1818
Id: '1',
1919
MessageBody: JSON.stringify({ f1: 'v1' }),
2020
},
21+
}, {
22+
message: {
23+
Id: '2',
24+
MessageBody: JSON.stringify({ f1: 'v2' }),
25+
},
2126
}];
2227

2328
_(uows)
@@ -26,7 +31,7 @@ describe('sinks/sqs.js', () => {
2631
.tap((collected) => {
2732
// console.log(JSON.stringify(collected, null, 2));
2833

29-
expect(collected.length).to.equal(1);
34+
expect(collected.length).to.equal(2);
3035
expect(collected[0]).to.deep.equal({
3136
message: {
3237
Id: '1',
@@ -36,6 +41,79 @@ describe('sinks/sqs.js', () => {
3641
Entries: [{
3742
Id: '1',
3843
MessageBody: JSON.stringify({ f1: 'v1' }),
44+
}, {
45+
Id: '2',
46+
MessageBody: JSON.stringify({ f1: 'v2' }),
47+
}],
48+
},
49+
sendMessageBatchResponse: {},
50+
});
51+
expect(collected[1]).to.deep.equal({
52+
message: {
53+
Id: '2',
54+
MessageBody: JSON.stringify({ f1: 'v2' }),
55+
},
56+
inputParams: {
57+
Entries: [{
58+
Id: '1',
59+
MessageBody: JSON.stringify({ f1: 'v1' }),
60+
}, {
61+
Id: '2',
62+
MessageBody: JSON.stringify({ f1: 'v2' }),
63+
}],
64+
},
65+
sendMessageBatchResponse: {},
66+
});
67+
})
68+
.done(done);
69+
});
70+
71+
it('should split a batch due to size constraints', (done) => {
72+
sinon.stub(Connector.prototype, 'sendMessageBatch').resolves({});
73+
74+
const uows = [{
75+
message: {
76+
Id: '1',
77+
MessageBody: JSON.stringify({ f1: 'v1' }),
78+
},
79+
}, {
80+
message: {
81+
Id: '2',
82+
MessageBody: JSON.stringify({ f1: 'v2' }),
83+
},
84+
}];
85+
86+
_(uows)
87+
.through(sendToSqs({
88+
maxPublishRequestSize: 50,
89+
}))
90+
.collect()
91+
.tap((collected) => {
92+
// console.log(JSON.stringify(collected, null, 2));
93+
94+
expect(collected.length).to.equal(2);
95+
expect(collected[0]).to.deep.equal({
96+
message: {
97+
Id: '1',
98+
MessageBody: JSON.stringify({ f1: 'v1' }),
99+
},
100+
inputParams: {
101+
Entries: [{
102+
Id: '1',
103+
MessageBody: JSON.stringify({ f1: 'v1' }),
104+
}],
105+
},
106+
sendMessageBatchResponse: {},
107+
});
108+
expect(collected[1]).to.deep.equal({
109+
message: {
110+
Id: '2',
111+
MessageBody: JSON.stringify({ f1: 'v2' }),
112+
},
113+
inputParams: {
114+
Entries: [{
115+
Id: '2',
116+
MessageBody: JSON.stringify({ f1: 'v2' }),
39117
}],
40118
},
41119
sendMessageBatchResponse: {},

0 commit comments

Comments
 (0)