Skip to content

Commit ad509ef

Browse files
authored
Merge pull request #6 from bolteu/task/RNTL-25522-non-wildcard-retained-keys-optimization
RNTL-25522: optimize non-wildcard retained topics handling
2 parents 35c2d3a + 4c240ca commit ad509ef

File tree

1 file changed

+27
-3
lines changed

1 file changed

+27
-3
lines changed

persistence.js

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,24 @@ async function getRetainedKeys (db, hasClusters) {
8181
return await db.hkeys(RETAINEDKEY)
8282
}
8383

84-
async function getRetainedValue (db, topic, hasClusters) {
84+
async function getRetainedValueBuffer (db, topic, hasClusters) {
8585
if (hasClusters === true) {
86-
return msgpack.decode(await db.getBuffer(retainedKey(topic)))
86+
return db.getBuffer(retainedKey(topic))
87+
}
88+
return db.hgetBuffer(RETAINEDKEY, topic)
89+
}
90+
91+
async function getRetainedValue (db, topic, hasClusters) {
92+
return msgpack.decode(await getRetainedValueBuffer(db, topic, hasClusters))
93+
}
94+
95+
async function * getRetainedValuesStream (db, topics, hasClusters) {
96+
for (const topic of topics) {
97+
const buffer = await getRetainedValueBuffer(db, topic, hasClusters)
98+
if (buffer && buffer.length > 0) {
99+
yield msgpack.decode(buffer)
100+
}
87101
}
88-
return msgpack.decode(await db.hgetBuffer(RETAINEDKEY, topic))
89102
}
90103

91104
async function * createWillStream (db, brokers, maxWills) {
@@ -131,6 +144,9 @@ class RedisPersistence extends CachedPersistence {
131144

132145
this.logger = opts.logger || new NoopLogger()
133146

147+
// By default retained values query optimization is disabled (-1)
148+
this.retainedValuesQueryThreshold = opts.retained_values_query_threshold || -1
149+
134150
if (this.sharedCacheRefreshIntervalSec) {
135151
this.once('ready', () => {
136152
setInterval(() => {
@@ -184,7 +200,15 @@ class RedisPersistence extends CachedPersistence {
184200
}
185201
}
186202

203+
hasWildcard (patterns) {
204+
return patterns.some((pattern) => pattern.includes('+') || pattern.includes('#'))
205+
}
206+
187207
createRetainedStreamCombi (patterns) {
208+
if (patterns.length <= this.retainedValuesQueryThreshold && !this.hasWildcard(patterns)) {
209+
return Readable.from(getRetainedValuesStream(this._db, patterns, this.hasClusters))
210+
}
211+
188212
const qlobber = new QlobberTrue(qlobberOpts)
189213

190214
for (const pattern of patterns) {

0 commit comments

Comments
 (0)