Skip to content

Commit 27c0ae8

Browse files
fix: RabbitMQ monitor to more properly handle all nodes failure (louislam#6646)
Co-authored-by: Frank Elsinga <frank@elsinga.de>
1 parent 5accda3 commit 27c0ae8

2 files changed

Lines changed: 267 additions & 38 deletions

File tree

server/monitor-types/rabbitmq.js

Lines changed: 75 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -17,45 +17,83 @@ class RabbitMqMonitorType extends MonitorType {
1717
throw new Error("Invalid RabbitMQ Nodes");
1818
}
1919

20-
for (let baseUrl of baseUrls) {
20+
if (baseUrls.length === 0) {
21+
throw new Error("No RabbitMQ nodes configured");
22+
}
23+
24+
const errors = [];
25+
26+
for (let i = 0; i < baseUrls.length; i++) {
27+
const baseUrl = baseUrls[i];
28+
const nodeIndex = i + 1;
29+
2130
try {
22-
// Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com
23-
if ( !baseUrl.endsWith("/") ) {
24-
baseUrl += "/";
25-
}
26-
const options = {
27-
// Do not start with slash, it will strip the trailing slash from baseUrl
28-
url: new URL("api/health/checks/alarms/", baseUrl).href,
29-
method: "get",
30-
timeout: monitor.timeout * 1000,
31-
headers: {
32-
"Accept": "application/json",
33-
"Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"),
34-
},
35-
signal: axiosAbortSignal((monitor.timeout + 10) * 1000),
36-
// Capture reason for 503 status
37-
validateStatus: (status) => status === 200 || status === 503,
38-
};
39-
log.debug("monitor", `[${monitor.name}] Axios Request: ${JSON.stringify(options)}`);
40-
const res = await axios.request(options);
41-
log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`);
42-
if (res.status === 200) {
43-
heartbeat.status = UP;
44-
heartbeat.msg = "OK";
45-
break;
46-
} else if (res.status === 503) {
47-
throw new Error(res.data.reason);
48-
} else {
49-
throw new Error(`${res.status} - ${res.statusText}`);
50-
}
31+
await this.checkSingleNode(monitor, baseUrl, `${nodeIndex}/${baseUrls.length}`);
32+
// If checkSingleNode succeeds (doesn't throw), set heartbeat to UP
33+
heartbeat.status = UP;
34+
heartbeat.msg = baseUrls.length === 1 ? "Node is reachable and there are no alerts in the cluster" : `One of the ${baseUrls.length} nodes is reachable and there are no alerts in the cluster`;
35+
return;
5136
} catch (error) {
52-
if (axios.isCancel(error)) {
53-
log.debug("monitor", `[${monitor.name}] Request timed out`);
54-
throw new Error("Request timed out");
55-
} else {
56-
log.debug("monitor", `[${monitor.name}] Axios Error: ${JSON.stringify(error.message)}`);
57-
throw new Error(error.message);
58-
}
37+
log.warn(this.name, `Node ${nodeIndex}: ${error.message}`);
38+
errors.push(`Node ${nodeIndex}: ${error.message}`);
39+
}
40+
}
41+
42+
// If we reach here, all nodes failed
43+
throw new Error(`All ${errors.length} nodes failed because ${errors.join("; ")}`);
44+
}
45+
46+
/**
47+
* Check a single RabbitMQ node
48+
* @param {object} monitor Monitor configuration
49+
* @param {string} baseUrl Base URL of the RabbitMQ node
50+
* @param {string} nodeInfo Node index info for logging (e.g., "1/3")
51+
* @returns {Promise<void>}
52+
* @throws {Error} If the node check fails
53+
*/
54+
async checkSingleNode(monitor, baseUrl, nodeInfo) {
55+
// Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com
56+
let normalizedUrl = baseUrl;
57+
if (!normalizedUrl.endsWith("/")) {
58+
normalizedUrl += "/";
59+
}
60+
61+
const options = {
62+
// Do not start with slash, it will strip the trailing slash from baseUrl
63+
url: new URL("api/health/checks/alarms/", normalizedUrl).href,
64+
method: "get",
65+
timeout: monitor.timeout * 1000,
66+
headers: {
67+
"Accept": "application/json",
68+
"Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"),
69+
},
70+
signal: axiosAbortSignal((monitor.timeout + 10) * 1000),
71+
// Capture reason for 503 status
72+
validateStatus: (status) => status === 200 || status === 503,
73+
};
74+
75+
log.debug("monitor", `[${monitor.name}] Checking node ${nodeInfo}: ${baseUrl}`);
76+
77+
try {
78+
const res = await axios.request(options);
79+
log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`);
80+
81+
if (res.status === 200) {
82+
log.debug("monitor", `[${monitor.name}] Node ${nodeInfo} is healthy`);
83+
// Success - return without error
84+
} else if (res.status === 503) {
85+
throw new Error(res.data.reason);
86+
} else {
87+
throw new Error(`${res.status} - ${res.statusText}`);
88+
}
89+
} catch (error) {
90+
if (axios.isCancel(error)) {
91+
throw new Error("Request timed out");
92+
} else if (error.response) {
93+
// Re-throw with the original error message if it's already formatted
94+
throw error;
95+
} else {
96+
throw new Error(error.message);
5997
}
6098
}
6199
}

test/backend-test/monitors/test-rabbitmq.js

Lines changed: 192 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ describe("RabbitMQ Single Node", {
1717
rabbitmqNodes: JSON.stringify([ connectionString ]),
1818
rabbitmqUsername: "guest",
1919
rabbitmqPassword: "guest",
20+
timeout: 10,
2021
};
2122

2223
const heartbeat = {
@@ -27,7 +28,7 @@ describe("RabbitMQ Single Node", {
2728
try {
2829
await rabbitMQMonitor.check(monitor, heartbeat, {});
2930
assert.strictEqual(heartbeat.status, UP);
30-
assert.strictEqual(heartbeat.msg, "OK");
31+
assert.strictEqual(heartbeat.msg, "Node is reachable and there are no alerts in the cluster");
3132
} finally {
3233
rabbitMQContainer.stop();
3334
}
@@ -39,6 +40,7 @@ describe("RabbitMQ Single Node", {
3940
rabbitmqNodes: JSON.stringify([ "http://localhost:15672" ]),
4041
rabbitmqUsername: "rabbitmqUser",
4142
rabbitmqPassword: "rabbitmqPass",
43+
timeout: 10,
4244
};
4345

4446
const heartbeat = {
@@ -55,4 +57,193 @@ describe("RabbitMQ Single Node", {
5557
);
5658
});
5759

60+
test("checkSingleNode() succeeds when node is healthy", async () => {
61+
const rabbitMQContainer = await new RabbitMQContainer().withStartupTimeout(60000).start();
62+
const rabbitMQMonitor = new RabbitMqMonitorType();
63+
const connectionString = `http://${rabbitMQContainer.getHost()}:${rabbitMQContainer.getMappedPort(15672)}`;
64+
65+
const monitor = {
66+
name: "Test Monitor",
67+
rabbitmqUsername: "guest",
68+
rabbitmqPassword: "guest",
69+
timeout: 10,
70+
};
71+
72+
try {
73+
// Should not throw - just validates the node is healthy
74+
await rabbitMQMonitor.checkSingleNode(monitor, connectionString, "1/1");
75+
} finally {
76+
rabbitMQContainer.stop();
77+
}
78+
});
79+
80+
test("checkSingleNode() throws error when node is unreachable", async () => {
81+
const rabbitMQMonitor = new RabbitMqMonitorType();
82+
const monitor = {
83+
name: "Test Monitor",
84+
rabbitmqUsername: "guest",
85+
rabbitmqPassword: "guest",
86+
timeout: 10,
87+
};
88+
89+
// Should reject with any error (connection refused, timeout, etc.)
90+
await assert.rejects(
91+
rabbitMQMonitor.checkSingleNode(monitor, "http://localhost:15672", "1/1"),
92+
Error
93+
);
94+
});
95+
});
96+
97+
describe("RabbitMQ Multi-Node (Mocked)", () => {
98+
test("check() succeeds when first node is healthy", async () => {
99+
const rabbitMQMonitor = new RabbitMqMonitorType();
100+
const monitor = {
101+
rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]),
102+
rabbitmqUsername: "guest",
103+
rabbitmqPassword: "guest",
104+
timeout: 10,
105+
};
106+
107+
const heartbeat = {
108+
msg: "",
109+
status: PENDING,
110+
};
111+
112+
// Mock checkSingleNode to succeed on first call (just don't throw)
113+
let callCount = 0;
114+
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
115+
callCount++;
116+
// Success - don't throw
117+
};
118+
119+
await rabbitMQMonitor.check(monitor, heartbeat, {});
120+
assert.strictEqual(heartbeat.status, UP);
121+
assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster");
122+
assert.strictEqual(callCount, 1, "Should only check first node");
123+
});
124+
125+
test("check() succeeds when second node is healthy after first fails", async () => {
126+
const rabbitMQMonitor = new RabbitMqMonitorType();
127+
const monitor = {
128+
rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]),
129+
rabbitmqUsername: "guest",
130+
rabbitmqPassword: "guest",
131+
timeout: 10,
132+
};
133+
134+
const heartbeat = {
135+
msg: "",
136+
status: PENDING,
137+
};
138+
139+
// Mock checkSingleNode to fail first, succeed second
140+
let callCount = 0;
141+
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
142+
callCount++;
143+
if (callCount === 1) {
144+
throw new Error("Node 1 connection failed");
145+
}
146+
// Second call succeeds - don't throw
147+
};
148+
149+
await rabbitMQMonitor.check(monitor, heartbeat, {});
150+
assert.strictEqual(heartbeat.status, UP);
151+
assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster");
152+
assert.strictEqual(callCount, 2, "Should check both nodes");
153+
});
154+
155+
test("check() fails with consolidated error when all nodes are down", async () => {
156+
const rabbitMQMonitor = new RabbitMqMonitorType();
157+
const monitor = {
158+
rabbitmqNodes: JSON.stringify([
159+
"http://node1:15672",
160+
"http://node2:15672",
161+
"http://node3:15672"
162+
]),
163+
rabbitmqUsername: "guest",
164+
rabbitmqPassword: "guest",
165+
timeout: 10,
166+
};
167+
168+
const heartbeat = {
169+
msg: "",
170+
status: PENDING,
171+
};
172+
173+
// Mock checkSingleNode to always fail
174+
let callCount = 0;
175+
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
176+
callCount++;
177+
throw new Error(`Connection failed to node ${callCount}`);
178+
};
179+
180+
await assert.rejects(
181+
rabbitMQMonitor.check(monitor, heartbeat, {}),
182+
(error) => {
183+
assert.match(error.message, /All 3 nodes failed/);
184+
assert.match(error.message, /Node 1:/);
185+
assert.match(error.message, /Node 2:/);
186+
assert.match(error.message, /Node 3:/);
187+
return true;
188+
}
189+
);
190+
assert.strictEqual(callCount, 3, "Should check all three nodes");
191+
});
192+
193+
test("check() fails when no nodes are configured", async () => {
194+
const rabbitMQMonitor = new RabbitMqMonitorType();
195+
const monitor = {
196+
rabbitmqNodes: JSON.stringify([]),
197+
rabbitmqUsername: "guest",
198+
rabbitmqPassword: "guest",
199+
timeout: 10,
200+
};
201+
202+
const heartbeat = {
203+
msg: "",
204+
status: PENDING,
205+
};
206+
207+
await assert.rejects(
208+
rabbitMQMonitor.check(monitor, heartbeat, {}),
209+
/No RabbitMQ nodes configured/
210+
);
211+
});
212+
213+
test("check() tries all nodes before failing", async () => {
214+
const rabbitMQMonitor = new RabbitMqMonitorType();
215+
const monitor = {
216+
rabbitmqNodes: JSON.stringify([
217+
"http://node1:15672",
218+
"http://node2:15672",
219+
"http://node3:15672",
220+
"http://node4:15672"
221+
]),
222+
rabbitmqUsername: "guest",
223+
rabbitmqPassword: "guest",
224+
timeout: 10,
225+
};
226+
227+
const heartbeat = {
228+
msg: "",
229+
status: PENDING,
230+
};
231+
232+
const checkedNodes = [];
233+
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
234+
checkedNodes.push(url);
235+
throw new Error(`Failed: ${url}`);
236+
};
237+
238+
await assert.rejects(
239+
rabbitMQMonitor.check(monitor, heartbeat, {}),
240+
/All 4 nodes failed/
241+
);
242+
243+
assert.strictEqual(checkedNodes.length, 4, "Should check all 4 nodes");
244+
assert.strictEqual(checkedNodes[0], "http://node1:15672");
245+
assert.strictEqual(checkedNodes[1], "http://node2:15672");
246+
assert.strictEqual(checkedNodes[2], "http://node3:15672");
247+
assert.strictEqual(checkedNodes[3], "http://node4:15672");
248+
});
58249
});

0 commit comments

Comments
 (0)