112
112
try :
113
113
from gevent import get_hub as get_gevent_hub # type: ignore
114
114
from gevent .monkey import get_original , is_module_patched # type: ignore
115
+ from gevent .threadpool import ThreadPool # type: ignore
115
116
116
117
thread_sleep = get_original ("time" , "sleep" )
117
118
except ImportError :
@@ -127,6 +128,8 @@ def is_module_patched(*args, **kwargs):
127
128
# unable to import from gevent means no modules have been patched
128
129
return False
129
130
131
+ ThreadPool = None
132
+
130
133
131
134
def is_gevent ():
132
135
# type: () -> bool
@@ -177,10 +180,7 @@ def setup_profiler(options):
177
180
):
178
181
_scheduler = ThreadScheduler (frequency = frequency )
179
182
elif profiler_mode == GeventScheduler .mode :
180
- try :
181
- _scheduler = GeventScheduler (frequency = frequency )
182
- except ImportError :
183
- raise ValueError ("Profiler mode: {} is not available" .format (profiler_mode ))
183
+ _scheduler = GeventScheduler (frequency = frequency )
184
184
else :
185
185
raise ValueError ("Unknown profiler mode: {}" .format (profiler_mode ))
186
186
@@ -703,7 +703,8 @@ def __init__(self, frequency):
703
703
704
704
self .sampler = self .make_sampler ()
705
705
706
- self .new_profiles = deque () # type: Deque[Profile]
706
+ # cap the number of new profiles at any time so it does not grow infinitely
707
+ self .new_profiles = deque (maxlen = 128 ) # type: Deque[Profile]
707
708
self .active_profiles = set () # type: Set[Profile]
708
709
709
710
def __enter__ (self ):
@@ -723,8 +724,13 @@ def teardown(self):
723
724
# type: () -> None
724
725
raise NotImplementedError
725
726
727
+ def ensure_running (self ):
728
+ # type: () -> None
729
+ raise NotImplementedError
730
+
726
731
def start_profiling (self , profile ):
727
732
# type: (Profile) -> None
733
+ self .ensure_running ()
728
734
self .new_profiles .append (profile )
729
735
730
736
def stop_profiling (self , profile ):
@@ -827,21 +833,44 @@ def __init__(self, frequency):
827
833
828
834
# used to signal to the thread that it should stop
829
835
self .running = False
830
-
831
- # make sure the thread is a daemon here otherwise this
832
- # can keep the application running after other threads
833
- # have exited
834
- self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
836
+ self .thread = None # type: Optional[threading.Thread]
837
+ self .pid = None # type: Optional[int]
838
+ self .lock = threading .Lock ()
835
839
836
840
def setup (self ):
837
841
# type: () -> None
838
- self .running = True
839
- self .thread .start ()
842
+ pass
840
843
841
844
def teardown (self ):
842
845
# type: () -> None
843
- self .running = False
844
- self .thread .join ()
846
+ if self .running :
847
+ self .running = False
848
+ if self .thread is not None :
849
+ self .thread .join ()
850
+
851
+ def ensure_running (self ):
852
+ # type: () -> None
853
+ pid = os .getpid ()
854
+
855
+ # is running on the right process
856
+ if self .running and self .pid == pid :
857
+ return
858
+
859
+ with self .lock :
860
+ # another thread may have tried to acquire the lock
861
+ # at the same time so it may start another thread
862
+ # make sure to check again before proceeding
863
+ if self .running and self .pid == pid :
864
+ return
865
+
866
+ self .pid = pid
867
+ self .running = True
868
+
869
+ # make sure the thread is a daemon here otherwise this
870
+ # can keep the application running after other threads
871
+ # have exited
872
+ self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
873
+ self .thread .start ()
845
874
846
875
def run (self ):
847
876
# type: () -> None
@@ -882,28 +911,52 @@ class GeventScheduler(Scheduler):
882
911
def __init__ (self , frequency ):
883
912
# type: (int) -> None
884
913
885
- # This can throw an ImportError that must be caught if `gevent` is
886
- # not installed.
887
- from gevent .threadpool import ThreadPool # type: ignore
914
+ if ThreadPool is None :
915
+ raise ValueError ("Profiler mode: {} is not available" .format (self .mode ))
888
916
889
917
super (GeventScheduler , self ).__init__ (frequency = frequency )
890
918
891
919
# used to signal to the thread that it should stop
892
920
self .running = False
921
+ self .thread = None # type: Optional[ThreadPool]
922
+ self .pid = None # type: Optional[int]
893
923
894
- # Using gevent's ThreadPool allows us to bypass greenlets and spawn
895
- # native threads.
896
- self .pool = ThreadPool (1 )
924
+ # This intentionally uses the gevent patched threading.Lock.
925
+ # The lock will be required when first trying to start profiles
926
+ # as we need to spawn the profiler thread from the greenlets.
927
+ self .lock = threading .Lock ()
897
928
898
929
def setup (self ):
899
930
# type: () -> None
900
- self .running = True
901
- self .pool .spawn (self .run )
931
+ pass
902
932
903
933
def teardown (self ):
904
934
# type: () -> None
905
- self .running = False
906
- self .pool .join ()
935
+ if self .running :
936
+ self .running = False
937
+ if self .thread is not None :
938
+ self .thread .join ()
939
+
940
+ def ensure_running (self ):
941
+ # type: () -> None
942
+ pid = os .getpid ()
943
+
944
+ # is running on the right process
945
+ if self .running and self .pid == pid :
946
+ return
947
+
948
+ with self .lock :
949
+ # another thread may have tried to acquire the lock
950
+ # at the same time so it may start another thread
951
+ # make sure to check again before proceeding
952
+ if self .running and self .pid == pid :
953
+ return
954
+
955
+ self .pid = pid
956
+ self .running = True
957
+
958
+ self .thread = ThreadPool (1 )
959
+ self .thread .spawn (self .run )
907
960
908
961
def run (self ):
909
962
# type: () -> None
0 commit comments