Skip to content

Commit 0ca2ccb

Browse files
Merge pull request #9116 from alphaprinz/notif_backports
[Backport into 5.18] pr 9111, dfs 2834
2 parents 73dfa4c + 63aa494 commit 0ca2ccb

File tree

4 files changed

+23
-11
lines changed

4 files changed

+23
-11
lines changed

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
"nan": "2.22.2",
108108
"ncp": "2.0.0",
109109
"node-addon-api": "8.3.1",
110-
"node-rdkafka": "3.4.0",
110+
"node-rdkafka": "3.4.1",
111111
"performance-now": "2.1.0",
112112
"pg": "8.16.0",
113113
"ping": "0.4.4",

src/test/system_tests/ceph_s3_tests/test_ceph_s3_deploy.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ commit_epoch=$(git show -s --format=%ci ${CEPH_TESTS_VERSION} | awk '{print $1}'
3838
commit_date=$(date -d ${commit_epoch} +%s)
3939
current_date=$(date +%s)
4040

41-
max_days="450"
41+
max_days="600"
4242
if [ $((current_date-commit_date)) -gt $((3600*24*${max_days})) ]
4343
then
4444
echo "ceph tests were not updated for ${max_days} days, Exiting"

src/util/notifications_util.js

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,11 @@ class Notificator {
103103
throw err;
104104
} finally {
105105
await log.close();
106-
this.notif_to_connect.clear();
107106
for (const conn of this.connect_str_to_connection.values()) {
108107
conn.destroy();
109108
}
109+
this.connect_str_to_connection.clear();
110+
this.notif_to_connect.clear();
110111
}
111112
}
112113
}
@@ -259,18 +260,25 @@ class KafkaNotificator {
259260

260261
async connect() {
261262
this.connection = new Kafka.HighLevelProducer(this.connect_obj.kafka_options_object);
263+
dbg.log2("Kafka producer connecting, connect =", this.connect_obj);
262264
await new Promise((res, rej) => {
263265
this.connection.on('ready', () => {
266+
dbg.log2("Kafka producer connected for connection =", this.connect_obj);
264267
res();
265268
});
266269
this.connection.on('connection.failure', err => {
270+
dbg.error("Kafka producer failed to connect. connect = ", this.connect_obj, ", err =", err);
267271
rej(err);
268272
});
269273
this.connection.on('event.log', arg => {
270-
dbg.log1("event log", arg);
274+
dbg.log2("event log", arg);
275+
});
276+
this.connection.on('event.error', arg => {
277+
dbg.error("event error =", arg);
271278
});
272279
this.connection.connect();
273280
});
281+
dbg.log2("Kafka producer's connect done, connect =", this.connect_obj);
274282
this.connection.setPollInterval(100);
275283
}
276284

@@ -283,10 +291,12 @@ class KafkaNotificator {
283291
Buffer.from(JSON.stringify(notif.notif)),
284292
null,
285293
Date.now(),
286-
(err, offset) => {
294+
err => {
287295
if (err) {
296+
dbg.error("Failed to notify. Connect =", connect_obj, ", notif =", notif);
288297
promise_failure_cb(notif, failure_ctxt, err).then(resolve);
289298
} else {
299+
dbg.log2("Kafka notify successful. Connect =", connect_obj, ", notif =", notif);
290300
resolve();
291301
}
292302
}
@@ -295,8 +305,10 @@ class KafkaNotificator {
295305
}
296306

297307
destroy() {
298-
this.connection.flush(10000);
299-
this.connection.disconnect();
308+
if (this.connection.isConnected()) {
309+
this.connection.flush(10000);
310+
this.connection.disconnect();
311+
}
300312
}
301313
}
302314

0 commit comments

Comments
 (0)