Skip to content

Commit 135fb10

Browse files
committed
Add decreaseCount to concurrency monitor
1 parent f20554e commit 135fb10

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

libs/concurrencyMonitor.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* ClusterODM - A reverse proxy, load balancer and task tracker for NodeODM
3+
* Copyright (C) 2018-present MasseranoLabs LLC
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as
7+
* published by the Free Software Foundation, either version 3 of the
8+
* License, or (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*/
18+
19+
let userTasks = {};
20+
const LIMIT_WINDOW = 60 * 1000; // 1 minute
21+
22+
module.exports = {
23+
checkCommitLimitReached: function(maxConcurrentTasks, userToken){
24+
if (maxConcurrentTasks === 0) return true;
25+
if (!maxConcurrentTasks) return false;
26+
27+
if (!userToken) userToken = "default";
28+
let now = new Date().getTime();
29+
30+
// Remove expired items
31+
for (let t in userTasks){
32+
userTasks[t] = userTasks[t].filter(tm => now - tm <= LIMIT_WINDOW);
33+
if (userTasks[t].length === 0) delete(userTasks[t]);
34+
}
35+
36+
if (userTasks[userToken] === undefined) userTasks[userToken] = [];
37+
38+
userTasks[userToken].push(now);
39+
return userTasks[userToken].length > maxConcurrentTasks;
40+
},
41+
42+
decreaseCount: function(userToken){
43+
if (!userToken) userToken = "default";
44+
if (userTasks[userToken] !== undefined && userTasks[userToken].length > 0){
45+
userTasks[userToken].shift();
46+
}
47+
},
48+
};

libs/proxy.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const async = require('async');
3636
const odmOptions = require('./odmOptions');
3737
const asrProvider = require('./asrProvider');
3838
const floodMonitor = require('./floodMonitor');
39+
const concurrencyMonitor = require('./concurrencyMonitor');
3940
const AWS = require('aws-sdk');
4041

4142
module.exports = {
@@ -209,13 +210,15 @@ module.exports = {
209210
const taskId = taskInfo.uuid;
210211

211212
asrProvider.onCommit(taskId, 10 * 1000);
212-
213+
213214
// Add reference to S3 path if necessary
214215
if (asrProvider.downloadsPath()){
215216
taskInfo.s3Path = asrProvider.downloadsPath();
216217
}
217-
218+
218219
const token = await routetable.lookupToken(taskId);
220+
concurrencyMonitor.decreaseCount(token);
221+
219222
try{
220223
cloudProvider.taskFinished(token, taskInfo);
221224
}catch(e){
@@ -370,11 +373,17 @@ module.exports = {
370373
asrProvider.cleanup(taskId);
371374
};
372375

376+
if (concurrencyMonitor.checkCommitLimitReached(limits.maxConcurrentTasks, query.token)){
377+
die(`Reached maximum number of concurrent tasks, please wait until other tasks have finished, then restart the task.`);
378+
return;
379+
}
380+
373381
if (await maxConcurrencyLimitReached(limits.maxConcurrentTasks, query.token)){
374-
die(`Reached maximum number of concurrent tasks: ${limits.maxConcurrentTasks}. Please wait until other tasks have finished, then restart the task.`);
382+
die(`Reached maximum number of concurrent tasks. Please wait until other tasks have finished, then restart the task.`);
375383
return;
376384
}
377385

386+
378387
floodMonitor.recordTaskCommit(query.token);
379388
utils.markTaskAsCommitted(taskId);
380389

@@ -456,6 +465,8 @@ module.exports = {
456465
});
457466
busboy.on('finish', async function() {
458467
if (taskId){
468+
concurrencyMonitor.decreaseCount(query.token);
469+
459470
let node = await routetable.lookupNode(taskId);
460471
if (node){
461472
overrideRequest(req, node, query, pathname);

0 commit comments

Comments
 (0)