Skip to content

Commit 32fc45d

Browse files
authored
Add KeyPromiseQueue to Push and Job StatusHandlers (#7267)
* Add KeyPromiseQueue to Push and Job StatusHandlers * Update CHANGELOG.md * Update CHANGELOG.md
1 parent f7d2e09 commit 32fc45d

File tree

4 files changed

+17
-28
lines changed

4 files changed

+17
-28
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ ___
116116
- Test Parse Server continuously against all relevant Postgres versions (minor versions), added Postgres compatibility table to Parse Server docs (Corey Baker) [#7176](https://github.com/parse-community/parse-server/pull/7176)
117117
- Randomize test suite (Diamond Lewis) [#7265](https://github.com/parse-community/parse-server/pull/7265)
118118
- LDAP: Properly unbind client on group search error (Diamond Lewis) [#7265](https://github.com/parse-community/parse-server/pull/7265)
119+
- Improve data consistency in Push and Job Status update (Diamond Lewis) [#7267](https://github.com/parse-community/parse-server/pull/7267)
119120
___
120121
## 4.5.0
121122
[Full Changelog](https://github.com/parse-community/parse-server/compare/4.4.0...4.5.0)

src/Adapters/Cache/RedisCacheAdapter/index.js renamed to src/Adapters/Cache/RedisCacheAdapter.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import redis from 'redis';
2-
import logger from '../../../logger';
3-
import { KeyPromiseQueue } from './KeyPromiseQueue';
2+
import logger from '../../logger';
3+
import { KeyPromiseQueue } from '../../KeyPromiseQueue';
44

55
const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds
66
const FLUSH_DB_KEY = '__flush_db__';

src/StatusHandler.js

+14-26
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import { md5Hash, newObjectId } from './cryptoUtils';
2+
import { KeyPromiseQueue } from './KeyPromiseQueue';
23
import { logger } from './logger';
34
import rest from './rest';
45
import Auth from './Auth';
56

67
const PUSH_STATUS_COLLECTION = '_PushStatus';
78
const JOB_STATUS_COLLECTION = '_JobStatus';
89

10+
const pushPromiseQueue = new KeyPromiseQueue();
11+
const jobPromiseQueue = new KeyPromiseQueue();
12+
913
const incrementOp = function (object = {}, key, amount = 1) {
1014
if (!object[key]) {
1115
object[key] = { __op: 'Increment', amount: amount };
@@ -28,22 +32,14 @@ export function flatten(array) {
2832
}
2933

3034
function statusHandler(className, database) {
31-
let lastPromise = Promise.resolve();
32-
3335
function create(object) {
34-
lastPromise = lastPromise.then(() => {
35-
return database.create(className, object).then(() => {
36-
return Promise.resolve(object);
37-
});
36+
return database.create(className, object).then(() => {
37+
return Promise.resolve(object);
3838
});
39-
return lastPromise;
4039
}
4140

4241
function update(where, object) {
43-
lastPromise = lastPromise.then(() => {
44-
return database.update(className, where, object);
45-
});
46-
return lastPromise;
42+
return jobPromiseQueue.enqueue(where.objectId, () => database.update(className, where, object));
4743
}
4844

4945
return Object.freeze({
@@ -53,29 +49,21 @@ function statusHandler(className, database) {
5349
}
5450

5551
function restStatusHandler(className, config) {
56-
let lastPromise = Promise.resolve();
5752
const auth = Auth.master(config);
5853
function create(object) {
59-
lastPromise = lastPromise.then(() => {
60-
return rest.create(config, auth, className, object).then(({ response }) => {
61-
// merge the objects
62-
return Promise.resolve(Object.assign({}, object, response));
63-
});
54+
return rest.create(config, auth, className, object).then(({ response }) => {
55+
return { ...object, ...response };
6456
});
65-
return lastPromise;
6657
}
6758

6859
function update(where, object) {
69-
// TODO: when we have updateWhere, use that for proper interfacing
70-
lastPromise = lastPromise.then(() => {
71-
return rest
60+
return pushPromiseQueue.enqueue(where.objectId, () =>
61+
rest
7262
.update(config, auth, className, { objectId: where.objectId }, object)
7363
.then(({ response }) => {
74-
// merge the objects
75-
return Promise.resolve(Object.assign({}, object, response));
76-
});
77-
});
78-
return lastPromise;
64+
return { ...object, ...response };
65+
})
66+
);
7967
}
8068

8169
return Object.freeze({

0 commit comments

Comments
 (0)