@@ -703,7 +703,8 @@ def __init__(self, frequency):
703
703
704
704
self .sampler = self .make_sampler ()
705
705
706
- self .new_profiles = deque () # type: Deque[Profile]
706
+ # cap the number of new profiles at any time so it does not grow infinitely
707
+ self .new_profiles = deque (maxlen = 128 ) # type: Deque[Profile]
707
708
self .active_profiles = set () # type: Set[Profile]
708
709
709
710
def __enter__ (self ):
@@ -723,8 +724,13 @@ def teardown(self):
723
724
# type: () -> None
724
725
raise NotImplementedError
725
726
727
+ def ensure_running (self ):
728
+ # type: () -> None
729
+ raise NotImplementedError
730
+
726
731
def start_profiling (self , profile ):
727
732
# type: (Profile) -> None
733
+ self .ensure_running ()
728
734
self .new_profiles .append (profile )
729
735
730
736
def stop_profiling (self , profile ):
@@ -827,21 +833,44 @@ def __init__(self, frequency):
827
833
828
834
# used to signal to the thread that it should stop
829
835
self .running = False
830
-
831
- # make sure the thread is a daemon here otherwise this
832
- # can keep the application running after other threads
833
- # have exited
834
- self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
836
+ self .thread = None # type: Optional[threading.Thread]
837
+ self .pid = None # type: Optional[int]
838
+ self .lock = threading .Lock ()
835
839
836
840
def setup (self ):
837
841
# type: () -> None
838
- self .running = True
839
- self .thread .start ()
842
+ pass
840
843
841
844
def teardown (self ):
842
845
# type: () -> None
843
- self .running = False
844
- self .thread .join ()
846
+ if self .running :
847
+ self .running = False
848
+ if self .thread is not None :
849
+ self .thread .join ()
850
+
851
+ def ensure_running (self ):
852
+ # type: () -> None
853
+ pid = os .getpid ()
854
+
855
+ # is running on the right process
856
+ if self .running and self .pid == pid :
857
+ return
858
+
859
+ with self .lock :
860
+ # another thread may have tried to acquire the lock
861
+ # at the same time so it may start another thread
862
+ # make sure to check again before proceeding
863
+ if self .running and self .pid == pid :
864
+ return
865
+
866
+ self .pid = pid
867
+ self .running = True
868
+
869
+ # make sure the thread is a daemon here otherwise this
870
+ # can keep the application running after other threads
871
+ # have exited
872
+ self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
873
+ self .thread .start ()
845
874
846
875
def run (self ):
847
876
# type: () -> None
@@ -888,22 +917,49 @@ def __init__(self, frequency):
888
917
889
918
super (GeventScheduler , self ).__init__ (frequency = frequency )
890
919
920
+ self .make_thread = lambda : ThreadPool (1 )
921
+
891
922
# used to signal to the thread that it should stop
892
923
self .running = False
924
+ self .thread = None # type: Optional[ThreadPool]
925
+ self .pid = None # type: Optional[int]
893
926
894
- # Using gevent's ThreadPool allows us to bypass greenlets and spawn
895
- # native threads.
896
- self .pool = ThreadPool (1 )
927
+ # This intentionally uses the gevent patched threading.Lock.
928
+ # The lock will be required when first trying to start profiles
929
+ # as we need to spawn the profiler thread from the greenlets.
930
+ self .lock = threading .Lock ()
897
931
898
932
def setup (self ):
899
933
# type: () -> None
900
- self .running = True
901
- self .pool .spawn (self .run )
934
+ pass
902
935
903
936
def teardown (self ):
904
937
# type: () -> None
905
- self .running = False
906
- self .pool .join ()
938
+ if self .running :
939
+ self .running = False
940
+ if self .thread is not None :
941
+ self .thread .join ()
942
+
943
+ def ensure_running (self ):
944
+ # type: () -> None
945
+ pid = os .getpid ()
946
+
947
+ # is running on the right process
948
+ if self .running and self .pid == pid :
949
+ return
950
+
951
+ with self .lock :
952
+ # another thread may have tried to acquire the lock
953
+ # at the same time so it may start another thread
954
+ # make sure to check again before proceeding
955
+ if self .running and self .pid == pid :
956
+ return
957
+
958
+ self .pid = pid
959
+ self .running = True
960
+
961
+ self .thread = self .make_thread ()
962
+ self .thread .spawn (self .run )
907
963
908
964
def run (self ):
909
965
# type: () -> None
0 commit comments