1
1
from __future__ import absolute_import
2
2
3
3
import sys
4
- import shutil
5
- import functools
6
- import tempfile
7
4
8
5
from sentry_sdk .consts import OP
9
6
from sentry_sdk ._compat import reraise
25
22
from typing import Any
26
23
from typing import Callable
27
24
from typing import Dict
28
- from typing import List
29
25
from typing import Optional
30
26
from typing import Tuple
31
27
from typing import TypeVar
40
36
from celery import VERSION as CELERY_VERSION
41
37
from celery import Task , Celery
42
38
from celery .app .trace import task_has_custom
43
- from celery .beat import Service # type: ignore
39
+ from celery .beat import Scheduler # type: ignore
44
40
from celery .exceptions import ( # type: ignore
45
41
Ignore ,
46
42
Reject ,
49
45
)
50
46
from celery .schedules import crontab , schedule # type: ignore
51
47
from celery .signals import ( # type: ignore
52
- beat_init ,
53
- task_prerun ,
54
48
task_failure ,
55
49
task_success ,
56
50
task_retry ,
@@ -68,9 +62,11 @@ class CeleryIntegration(Integration):
68
62
def __init__ (self , propagate_traces = True , monitor_beat_tasks = False ):
69
63
# type: (bool, bool) -> None
70
64
self .propagate_traces = propagate_traces
65
+ self .monitor_beat_tasks = monitor_beat_tasks
71
66
72
67
if monitor_beat_tasks :
73
- _patch_celery_beat_tasks ()
68
+ _patch_beat_apply_entry ()
69
+ _setup_celery_beat_signals ()
74
70
75
71
@staticmethod
76
72
def setup_once ():
@@ -131,6 +127,12 @@ def apply_async(*args, **kwargs):
131
127
) as span :
132
128
with capture_internal_exceptions ():
133
129
headers = dict (hub .iter_trace_propagation_headers (span ))
130
+ if integration .monitor_beat_tasks :
131
+ headers .update (
132
+ {
133
+ "sentry-monitor-start-timestamp-s" : "%.9f" % now (),
134
+ }
135
+ )
134
136
135
137
if headers :
136
138
# Note: kwargs can contain headers=None, so no setdefault!
@@ -320,12 +322,15 @@ def sentry_workloop(*args, **kwargs):
320
322
321
323
def _get_headers (task ):
322
324
# type: (Task) -> Dict[str, Any]
323
- headers = task .request .get ("headers" ) or {}
325
+ headers = task .request .get ("headers" , {})
324
326
327
+ # flatten nested headers
325
328
if "headers" in headers :
326
329
headers .update (headers ["headers" ])
327
330
del headers ["headers" ]
328
331
332
+ headers .update (task .request .get ("properties" , {}))
333
+
329
334
return headers
330
335
331
336
@@ -387,123 +392,47 @@ def _get_monitor_config(celery_schedule, app):
387
392
return monitor_config
388
393
389
394
390
- def _reinstall_patched_tasks (app , sender , add_updated_periodic_tasks ):
391
- # type: (Celery, Service, List[functools.partial[Any]]) -> None
392
-
393
- # Stop Celery Beat
394
- sender .stop ()
395
-
396
- # Update tasks to include Monitor information in headers
397
- for add_updated_periodic_task in add_updated_periodic_tasks :
398
- add_updated_periodic_task ()
399
-
400
- # Start Celery Beat (with new (cloned) schedule, because old one is still in use)
401
- cloned_schedule = tempfile .NamedTemporaryFile (suffix = "-patched-by-sentry-sdk" )
402
- with open (sender .schedule_filename , "rb" ) as original_schedule :
403
- shutil .copyfileobj (original_schedule , cloned_schedule )
395
+ def _patch_beat_apply_entry ():
396
+ # type: () -> None
397
+ original_apply_entry = Scheduler .apply_entry
398
+
399
+ def sentry_apply_entry (* args , ** kwargs ):
400
+ # type: (*Any, **Any) -> None
401
+ scheduler , schedule_entry = args
402
+ app = scheduler .app
403
+
404
+ celery_schedule = schedule_entry .schedule
405
+ monitor_config = _get_monitor_config (celery_schedule , app )
406
+ monitor_name = schedule_entry .name
407
+
408
+ headers = schedule_entry .options .pop ("headers" , {})
409
+ headers .update (
410
+ {
411
+ "sentry-monitor-slug" : monitor_name ,
412
+ "sentry-monitor-config" : monitor_config ,
413
+ }
414
+ )
404
415
405
- app .Beat (schedule = cloned_schedule .name ).run ()
416
+ check_in_id = capture_checkin (
417
+ monitor_slug = monitor_name ,
418
+ monitor_config = monitor_config ,
419
+ status = MonitorStatus .IN_PROGRESS ,
420
+ )
421
+ headers .update ({"sentry-monitor-check-in-id" : check_in_id })
406
422
423
+ schedule_entry .options .update (headers )
424
+ return original_apply_entry (* args , ** kwargs )
407
425
408
- # Nested functions do not work as Celery hook receiver,
409
- # so defining it here explicitly
410
- celery_beat_init = None
426
+ Scheduler .apply_entry = sentry_apply_entry
411
427
412
428
413
- def _patch_celery_beat_tasks ():
429
+ def _setup_celery_beat_signals ():
414
430
# type: () -> None
415
-
416
- global celery_beat_init
417
-
418
- def celery_beat_init (sender , ** kwargs ):
419
- # type: (Service, Dict[Any, Any]) -> None
420
-
421
- # Because we restart Celery Beat,
422
- # make sure that this will not be called infinitely
423
- beat_init .disconnect (celery_beat_init )
424
-
425
- app = sender .app
426
-
427
- add_updated_periodic_tasks = []
428
-
429
- for name in sender .scheduler .schedule .keys ():
430
- # Ignore Celery's internal tasks
431
- if name .startswith ("celery." ):
432
- continue
433
-
434
- monitor_name = name
435
-
436
- schedule_entry = sender .scheduler .schedule [name ]
437
- celery_schedule = schedule_entry .schedule
438
- monitor_config = _get_monitor_config (celery_schedule , app )
439
-
440
- if monitor_config is None :
441
- continue
442
-
443
- headers = schedule_entry .options .pop ("headers" , {})
444
- headers .update (
445
- {
446
- "headers" : {
447
- "sentry-monitor-slug" : monitor_name ,
448
- "sentry-monitor-config" : monitor_config ,
449
- },
450
- }
451
- )
452
-
453
- task_signature = app .tasks .get (schedule_entry .task ).s ()
454
- task_signature .set (headers = headers )
455
-
456
- logger .debug (
457
- "Set up Sentry Celery Beat monitoring for %s (%s)" ,
458
- task_signature ,
459
- monitor_name ,
460
- )
461
-
462
- add_updated_periodic_tasks .append (
463
- functools .partial (
464
- app .add_periodic_task ,
465
- celery_schedule ,
466
- task_signature ,
467
- args = schedule_entry .args ,
468
- kwargs = schedule_entry .kwargs ,
469
- name = schedule_entry .name ,
470
- ** (schedule_entry .options or {})
471
- )
472
- )
473
-
474
- _reinstall_patched_tasks (app , sender , add_updated_periodic_tasks )
475
-
476
- beat_init .connect (celery_beat_init )
477
- task_prerun .connect (crons_task_before_run )
478
431
task_success .connect (crons_task_success )
479
432
task_failure .connect (crons_task_failure )
480
433
task_retry .connect (crons_task_retry )
481
434
482
435
483
- def crons_task_before_run (sender , ** kwargs ):
484
- # type: (Task, Dict[Any, Any]) -> None
485
- logger .debug ("celery_task_before_run %s" , sender )
486
- headers = _get_headers (sender )
487
-
488
- if "sentry-monitor-slug" not in headers :
489
- return
490
-
491
- monitor_config = headers .get ("sentry-monitor-config" , {})
492
-
493
- start_timestamp_s = now ()
494
-
495
- check_in_id = capture_checkin (
496
- monitor_slug = headers ["sentry-monitor-slug" ],
497
- monitor_config = monitor_config ,
498
- status = MonitorStatus .IN_PROGRESS ,
499
- )
500
-
501
- headers .update ({"sentry-monitor-check-in-id" : check_in_id })
502
- headers .update ({"sentry-monitor-start-timestamp-s" : start_timestamp_s })
503
-
504
- sender .s ().set (headers = headers )
505
-
506
-
507
436
def crons_task_success (sender , ** kwargs ):
508
437
# type: (Task, Dict[Any, Any]) -> None
509
438
logger .debug ("celery_task_success %s" , sender )
@@ -514,7 +443,7 @@ def crons_task_success(sender, **kwargs):
514
443
515
444
monitor_config = headers .get ("sentry-monitor-config" , {})
516
445
517
- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
446
+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
518
447
519
448
capture_checkin (
520
449
monitor_slug = headers ["sentry-monitor-slug" ],
@@ -535,7 +464,7 @@ def crons_task_failure(sender, **kwargs):
535
464
536
465
monitor_config = headers .get ("sentry-monitor-config" , {})
537
466
538
- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
467
+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
539
468
540
469
capture_checkin (
541
470
monitor_slug = headers ["sentry-monitor-slug" ],
@@ -556,7 +485,7 @@ def crons_task_retry(sender, **kwargs):
556
485
557
486
monitor_config = headers .get ("sentry-monitor-config" , {})
558
487
559
- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
488
+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
560
489
561
490
capture_checkin (
562
491
monitor_slug = headers ["sentry-monitor-slug" ],
0 commit comments