Skip to content

Commit 7a1d103

Browse files
Vraj2725Vraj Patel
andauthored
Adding logs to debug unexpected response in celery sqs broker for remaining versions and syncing unit tests for all versions (#338)
Adding logs to debug unexpected response in celery sqs broker for remaining versions and syncing unit tests for all versions *Issue #, if available:* #315 *Description of changes:* Adding a check to make sure Attributes field is present and log the response if no to helping debugging the issue By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. Co-authored-by: Vraj Patel <[email protected]>
1 parent 1a1c9c4 commit 7a1d103

File tree

6 files changed

+168
-21
lines changed

6 files changed

+168
-21
lines changed

images/airflow/2.10.1/python/mwaa/celery/sqs_broker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,11 @@ def _size(self, queue):
11581158
resp = c.get_queue_attributes(
11591159
QueueUrl=url, AttributeNames=["ApproximateNumberOfMessages"]
11601160
)
1161-
return int(resp["Attributes"]["ApproximateNumberOfMessages"])
1161+
try:
1162+
return int(resp["Attributes"]["ApproximateNumberOfMessages"])
1163+
except Exception:
1164+
logger.error("Unexpected response from SQS get_queue_attributes: %s", resp)
1165+
raise
11621166

11631167
def _purge(self, queue):
11641168
"""Delete all current messages in a queue."""

images/airflow/2.9.2/python/mwaa/celery/sqs_broker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,11 @@ def _size(self, queue):
11581158
resp = c.get_queue_attributes(
11591159
QueueUrl=url, AttributeNames=["ApproximateNumberOfMessages"]
11601160
)
1161-
return int(resp["Attributes"]["ApproximateNumberOfMessages"])
1161+
try:
1162+
return int(resp["Attributes"]["ApproximateNumberOfMessages"])
1163+
except Exception:
1164+
logger.error("Unexpected response from SQS get_queue_attributes: %s", resp)
1165+
raise
11621166

11631167
def _purge(self, queue):
11641168
"""Delete all current messages in a queue."""

tests/images/airflow/2.10.1/celery/test_sqs_broker_2_10_1.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,36 @@ def test_worker_heartbeat_conditional_metrics(self, mock_stats, mock_channel, nu
184184
else:
185185
assert not any(call[0][0] == "mwaa.celery.sqs.consumption_paused"
186186
for call in mock_stats.gauge.call_args_list)
187+
188+
def test_size_with_attributes(self, mock_channel):
189+
"""Test _size returns message count when Attributes exist."""
190+
queue = 'test-queue'
191+
mock_sqs = MagicMock()
192+
mock_sqs.get_queue_attributes.return_value = {
193+
'Attributes': {'ApproximateNumberOfMessages': '5'}
194+
}
195+
196+
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
197+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
198+
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
199+
result = mock_channel._size(queue)
200+
201+
assert result == 5
202+
mock_sqs.get_queue_attributes.assert_called_once_with(
203+
QueueUrl='queue-url', AttributeNames=['ApproximateNumberOfMessages']
204+
)
205+
206+
def test_size_without_attributes(self, mock_channel):
207+
"""Test _size raises exception when Attributes missing."""
208+
queue = 'test-queue'
209+
mock_sqs = MagicMock()
210+
mock_sqs.get_queue_attributes.return_value = {}
211+
212+
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
213+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
214+
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
215+
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
216+
with pytest.raises(KeyError):
217+
mock_channel._size(queue)
218+
219+
mock_logger.error.assert_called_once()

tests/images/airflow/2.10.3/python/mwaa/celery/test_sqs_broker_2_10_3.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,8 @@ def test_size_with_attributes(self, mock_channel):
119119
}
120120

121121
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
122-
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
123-
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
124-
122+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
123+
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
125124
result = mock_channel._size(queue)
126125

127126
assert result == 5
@@ -136,10 +135,9 @@ def test_size_without_attributes(self, mock_channel):
136135
mock_sqs.get_queue_attributes.return_value = {}
137136

138137
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
139-
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
140-
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
141-
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
142-
138+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
139+
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
140+
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
143141
with pytest.raises(KeyError):
144142
mock_channel._size(queue)
145143

tests/images/airflow/2.11.0/python/mwaa/celery/test_sqs_broker_2_11_0.py

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,12 @@ def test_size_with_attributes(self, mock_channel):
100100
mock_sqs.get_queue_attributes.return_value = {
101101
'Attributes': {'ApproximateNumberOfMessages': '5'}
102102
}
103-
103+
104104
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
105-
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
106-
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
107-
105+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
106+
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
108107
result = mock_channel._size(queue)
109-
108+
110109
assert result == 5
111110
mock_sqs.get_queue_attributes.assert_called_once_with(
112111
QueueUrl='queue-url', AttributeNames=['ApproximateNumberOfMessages']
@@ -117,13 +116,89 @@ def test_size_without_attributes(self, mock_channel):
117116
queue = 'test-queue'
118117
mock_sqs = MagicMock()
119118
mock_sqs.get_queue_attributes.return_value = {}
120-
119+
121120
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
122-
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
123-
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
124-
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
125-
121+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
122+
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
123+
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
126124
with pytest.raises(KeyError):
127125
mock_channel._size(queue)
128-
129-
mock_logger.error.assert_called_once()
126+
127+
mock_logger.error.assert_called_once()
128+
129+
def test_celery_worker_task_limit_constant_parsing(self):
130+
"""Test CELERY_WORKER_TASK_LIMIT correctly parses environment variable."""
131+
with patch('mwaa.celery.sqs_broker.os.environ.get') as mock_env_get:
132+
mock_env_get.return_value = '20,10'
133+
134+
# Re-import to trigger constant re-evaluation
135+
import importlib
136+
import mwaa.celery.sqs_broker
137+
importlib.reload(mwaa.celery.sqs_broker)
138+
139+
from mwaa.celery.sqs_broker import CELERY_WORKER_TASK_LIMIT
140+
assert CELERY_WORKER_TASK_LIMIT == 20
141+
142+
@pytest.mark.parametrize("monitoring_enabled,expected_tasks", [
143+
(True, 15), # When monitoring enabled, use actual task count
144+
(False, 20), # When monitoring disabled, use CELERY_WORKER_TASK_LIMIT
145+
])
146+
@patch('mwaa.celery.sqs_broker.Stats')
147+
def test_worker_heartbeat_active_tasks_calculation(self, mock_stats, mock_channel, monitoring_enabled,
148+
expected_tasks):
149+
"""Test num_active_tasks calculation logic."""
150+
message = {'test': 'message'}
151+
exchange = 'celeryev'
152+
routing_key = 'worker.heartbeat'
153+
154+
mock_channel._get_tasks_from_state = MagicMock(return_value=[{'task': 'test'}] * 15)
155+
mock_channel._is_task_consumption_paused = MagicMock(return_value=False)
156+
mock_channel.idle_worker_monitoring_enabled = monitoring_enabled
157+
158+
with patch('mwaa.celery.sqs_broker.os.environ.get') as mock_env_get:
159+
mock_env_get.return_value = 'true'
160+
161+
mock_channel.basic_publish(message, exchange, routing_key)
162+
163+
if expected_tasks >= 20: # CELERY_WORKER_TASK_LIMIT
164+
mock_stats.gauge.assert_any_call("mwaa.celery.at_max_concurrency", expected_tasks)
165+
else:
166+
assert not any(call[0][0] == "mwaa.celery.at_max_concurrency"
167+
for call in mock_stats.gauge.call_args_list)
168+
169+
@pytest.mark.parametrize("num_tasks,is_paused,expect_max_concurrency,expect_consumption_paused", [
170+
(25, False, True, False), # Above limit, not paused
171+
(15, True, False, True), # Below limit, paused
172+
(20, False, True, False), # At limit, not paused
173+
(10, False, False, False), # Below limit, not paused
174+
])
175+
@patch('mwaa.celery.sqs_broker.Stats')
176+
def test_worker_heartbeat_conditional_metrics(self, mock_stats, mock_channel, num_tasks, is_paused,
177+
expect_max_concurrency, expect_consumption_paused):
178+
"""Test conditional metric emission logic."""
179+
message = {'test': 'message'}
180+
exchange = 'celeryev'
181+
routing_key = 'worker.heartbeat'
182+
183+
mock_channel._get_tasks_from_state = MagicMock(return_value=[{'task': 'test'}] * num_tasks)
184+
mock_channel._is_task_consumption_paused = MagicMock(return_value=is_paused)
185+
mock_channel.idle_worker_monitoring_enabled = True
186+
187+
with patch('mwaa.celery.sqs_broker.os.environ.get') as mock_env_get:
188+
mock_env_get.return_value = 'true'
189+
190+
mock_channel.basic_publish(message, exchange, routing_key)
191+
192+
mock_stats.gauge.assert_any_call("mwaa.celery.process.heartbeat", 1)
193+
194+
if expect_max_concurrency:
195+
mock_stats.gauge.assert_any_call("mwaa.celery.at_max_concurrency", num_tasks)
196+
else:
197+
assert not any(call[0][0] == "mwaa.celery.at_max_concurrency"
198+
for call in mock_stats.gauge.call_args_list)
199+
200+
if expect_consumption_paused:
201+
mock_stats.gauge.assert_any_call("mwaa.celery.sqs.consumption_paused", 1)
202+
else:
203+
assert not any(call[0][0] == "mwaa.celery.sqs.consumption_paused"
204+
for call in mock_stats.gauge.call_args_list)

tests/images/airflow/2.9.2/python/mwaa/celery/test_sqs_broker_2_9_2.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,36 @@ def test_worker_heartbeat_conditional_metrics(self, mock_stats, mock_channel, nu
184184
else:
185185
assert not any(call[0][0] == "mwaa.celery.sqs.consumption_paused"
186186
for call in mock_stats.gauge.call_args_list)
187+
188+
def test_size_with_attributes(self, mock_channel):
189+
"""Test _size returns message count when Attributes exist."""
190+
queue = 'test-queue'
191+
mock_sqs = MagicMock()
192+
mock_sqs.get_queue_attributes.return_value = {
193+
'Attributes': {'ApproximateNumberOfMessages': '5'}
194+
}
195+
196+
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
197+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
198+
patch.object(mock_channel, 'sqs', return_value=mock_sqs):
199+
result = mock_channel._size(queue)
200+
201+
assert result == 5
202+
mock_sqs.get_queue_attributes.assert_called_once_with(
203+
QueueUrl='queue-url', AttributeNames=['ApproximateNumberOfMessages']
204+
)
205+
206+
def test_size_without_attributes(self, mock_channel):
207+
"""Test _size raises exception when Attributes missing."""
208+
queue = 'test-queue'
209+
mock_sqs = MagicMock()
210+
mock_sqs.get_queue_attributes.return_value = {}
211+
212+
with patch.object(mock_channel, '_new_queue', return_value='queue-url'), \
213+
patch.object(mock_channel, 'canonical_queue_name', return_value=queue), \
214+
patch.object(mock_channel, 'sqs', return_value=mock_sqs), \
215+
patch('mwaa.celery.sqs_broker.logger') as mock_logger:
216+
with pytest.raises(KeyError):
217+
mock_channel._size(queue)
218+
219+
mock_logger.error.assert_called_once()

0 commit comments

Comments
 (0)