Skip to content

feat: Upgrade Redis 3 to 4 #8293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
648 changes: 346 additions & 302 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"pg-monitor": "1.5.0",
"pg-promise": "10.12.1",
"pluralize": "8.0.0",
"redis": "3.1.2",
"redis": "4.0.6",
"semver": "7.3.8",
"subscriptions-transport-ws": "0.11.0",
"tv4": "1.3.0",
Expand Down
29 changes: 29 additions & 0 deletions spec/DefinedSchemas.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,33 @@ describe('DefinedSchemas', () => {
expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true });
expect(logger.error).toHaveBeenCalledTimes(0);
});

it('should not affect cacheAdapter', async () => {
const server = await reconfigureServer();
const logger = require('../lib/logger').logger;
spyOn(logger, 'error').and.callThrough();
const migrationOptions = {
definitions: [
{
className: 'Test',
fields: { aField: { type: 'String' } },
indexes: { aField: { aField: 1 } },
classLevelPermissions: {
create: { requiresAuthentication: true },
},
},
],
};

const cacheAdapter = {
get: () => Promise.resolve(null),
put: () => {},
del: () => {},
clear: () => {},
connect: jasmine.createSpy('clear'),
};
server.config.cacheAdapter = cacheAdapter;
await new DefinedSchemas(migrationOptions, server.config).execute();
expect(cacheAdapter.connect).not.toHaveBeenCalled();
});
});
29 changes: 15 additions & 14 deletions spec/RedisCacheAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ describe_only(() => {

beforeEach(async () => {
cache = new RedisCacheAdapter(null, 100);
await cache.connect();
await cache.clear();
});

it('should get/set/clear', done => {
it('should get/set/clear', async () => {
const cacheNaN = new RedisCacheAdapter({
ttl: NaN,
});

cacheNaN
.put(KEY, VALUE)
.then(() => cacheNaN.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then(() => cacheNaN.clear())
.then(() => cacheNaN.get(KEY))
.then(value => expect(value).toEqual(null))
.then(() => cacheNaN.clear())
.then(done);
await cacheNaN.connect();
await cacheNaN.put(KEY, VALUE);
let value = await cacheNaN.get(KEY);
expect(value).toEqual(VALUE);
await cacheNaN.clear();
value = await cacheNaN.get(KEY);
expect(value).toEqual(null);
await cacheNaN.clear();
});

it('should expire after ttl', done => {
Expand Down Expand Up @@ -100,7 +99,7 @@ describe_only(() => {
it('handleShutdown, close connection', async () => {
await cache.handleShutdown();
setTimeout(() => {
expect(cache.client.connected).toBe(false);
expect(cache.client.isOpen).toBe(false);
}, 0);
});
});
Expand All @@ -122,8 +121,9 @@ describe_only(() => {
return Object.keys(cache.queue.queue).length;
}

it('it should clear completed operations from queue', done => {
it('it should clear completed operations from queue', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();

// execute a bunch of operations in sequence
let promise = Promise.resolve();
Expand All @@ -144,8 +144,9 @@ describe_only(() => {
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
});

it('it should count per key chained operations correctly', done => {
it('it should count per key chained operations correctly', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();

let key1Promise = Promise.resolve();
let key2Promise = Promise.resolve();
Expand Down
118 changes: 40 additions & 78 deletions src/Adapters/Cache/RedisCacheAdapter.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import redis from 'redis';
import { createClient } from 'redis';
import logger from '../../logger';
import { KeyPromiseQueue } from '../../KeyPromiseQueue';

Expand All @@ -15,114 +15,76 @@ const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
export class RedisCacheAdapter {
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
this.client = redis.createClient(redisCtx);
this.client = createClient(redisCtx);
this.queue = new KeyPromiseQueue();
}

handleShutdown() {
async connect() {
if (this.client.isOpen) {
return;
}
return this.client.connect();
}

async handleShutdown() {
if (!this.client) {
return Promise.resolve();
return;
}
try {
await this.client.quit();
} catch (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
return new Promise(resolve => {
this.client.quit(err => {
if (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
resolve();
});
});
}

get(key) {
async get(key) {
debug('get', { key });
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.get(key, function (err, res) {
debug('-> get', { key, res });
if (!res) {
return resolve(null);
}
resolve(JSON.parse(res));
});
})
);
try {
await this.queue.enqueue(key);
const res = await this.client.get(key);
if (!res) {
return null;
}
return JSON.parse(res);
} catch (err) {
logger.error('RedisCacheAdapter error on get', { error: err });
}
}

put(key, value, ttl = this.ttl) {
async put(key, value, ttl = this.ttl) {
value = JSON.stringify(value);
debug('put', { key, value, ttl });

await this.queue.enqueue(key);
if (ttl === 0) {
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
return this.queue.enqueue(key, () => Promise.resolve());
return;
}

if (ttl === Infinity) {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.set(key, value, function () {
resolve();
});
})
);
return this.client.set(key, value);
}

if (!isValidTTL(ttl)) {
ttl = this.ttl;
}

return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.psetex(key, ttl, value, function () {
resolve();
});
})
);
return this.client.set(key, value, { PX: ttl });
}

del(key) {
async del(key) {
debug('del', { key });
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.del(key, function () {
resolve();
});
})
);
await this.queue.enqueue(key);
return this.client.del(key);
}

clear() {
async clear() {
debug('clear');
return this.queue.enqueue(
FLUSH_DB_KEY,
() =>
new Promise(resolve => {
this.client.flushdb(function () {
resolve();
});
})
);
await this.queue.enqueue(FLUSH_DB_KEY);
return this.client.sendCommand(['FLUSHDB']);
}

// Used for testing
async getAllKeys() {
return new Promise((resolve, reject) => {
this.client.keys('*', (err, keys) => {
if (err) {
reject(err);
} else {
resolve(keys);
}
});
});
getAllKeys() {
return this.client.keys('*');
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import redis from 'redis';
import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions);
return createClient(redisURL, redisOptions);
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions);
return createClient(redisURL, redisOptions);
}

const RedisPubSub = {
Expand Down
11 changes: 10 additions & 1 deletion src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,18 @@ class ParseServer {
.performInitialization()
.then(() => hooksController.load())
.then(async () => {
const startupPromises = [];
if (schema) {
await new DefinedSchemas(schema, this.config).execute();
startupPromises.push(new DefinedSchemas(schema, this.config).execute());
}
if (
options.cacheAdapter &&
options.cacheAdapter.connect &&
typeof options.cacheAdapter.connect === 'function'
) {
startupPromises.push(options.cacheAdapter.connect());
}
await Promise.all(startupPromises);
if (serverStartComplete) {
serverStartComplete();
}
Expand Down