@@ -2367,6 +2367,26 @@ class CustomListener(logging.handlers.QueueListener):
2367
2367
class CustomQueue (queue .Queue ):
2368
2368
pass
2369
2369
2370
+ class CustomQueueProtocol :
2371
+ def __init__ (self , maxsize = 0 ):
2372
+ self .queue = queue .Queue (maxsize )
2373
+
2374
+ def __getattr__ (self , attribute ):
2375
+ queue = object .__getattribute__ (self , 'queue' )
2376
+ return getattr (queue , attribute )
2377
+
2378
+ class CustomQueueFakeProtocol (CustomQueueProtocol ):
2379
+ # An object implementing the Queue API (incorrect signatures).
2380
+ # The object will be considered a valid queue class since we
2381
+ # do not check the signatures (only callability of methods)
2382
+ # but will NOT be usable in production since a TypeError will
2383
+ # be raised due to a missing argument.
2384
+ def empty (self , x ):
2385
+ pass
2386
+
2387
+ class CustomQueueWrongProtocol (CustomQueueProtocol ):
2388
+ empty = None
2389
+
2370
2390
def queueMaker ():
2371
2391
return queue .Queue ()
2372
2392
@@ -3900,18 +3920,16 @@ def do_queuehandler_configuration(self, qspec, lspec):
3900
3920
@threading_helper .requires_working_threading ()
3901
3921
@support .requires_subprocess ()
3902
3922
def test_config_queue_handler (self ):
3903
- q = CustomQueue ()
3904
- dq = {
3905
- '()' : __name__ + '.CustomQueue' ,
3906
- 'maxsize' : 10
3907
- }
3923
+ qs = [CustomQueue (), CustomQueueProtocol ()]
3924
+ dqs = [{'()' : f'{ __name__ } .{ cls } ' , 'maxsize' : 10 }
3925
+ for cls in ['CustomQueue' , 'CustomQueueProtocol' ]]
3908
3926
dl = {
3909
3927
'()' : __name__ + '.listenerMaker' ,
3910
3928
'arg1' : None ,
3911
3929
'arg2' : None ,
3912
3930
'respect_handler_level' : True
3913
3931
}
3914
- qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , dq , q )
3932
+ qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , * dqs , * qs )
3915
3933
lvalues = (None , __name__ + '.CustomListener' , dl , CustomListener )
3916
3934
for qspec , lspec in itertools .product (qvalues , lvalues ):
3917
3935
self .do_queuehandler_configuration (qspec , lspec )
@@ -3931,15 +3949,21 @@ def test_config_queue_handler(self):
3931
3949
@support .requires_subprocess ()
3932
3950
@patch ("multiprocessing.Manager" )
3933
3951
def test_config_queue_handler_does_not_create_multiprocessing_manager (self , manager ):
3934
- # gh-120868
3952
+ # gh-120868, gh-121723
3935
3953
3936
3954
from multiprocessing import Queue as MQ
3937
3955
3938
3956
q1 = {"()" : "queue.Queue" , "maxsize" : - 1 }
3939
3957
q2 = MQ ()
3940
3958
q3 = queue .Queue ()
3941
-
3942
- for qspec in (q1 , q2 , q3 ):
3959
+ # CustomQueueFakeProtocol passes the checks but will not be usable
3960
+ # since the signatures are incompatible. Checking the Queue API
3961
+ # without testing the type of the actual queue is a trade-off
3962
+ # between usability and the work we need to do in order to safely
3963
+ # check that the queue object correctly implements the API.
3964
+ q4 = CustomQueueFakeProtocol ()
3965
+
3966
+ for qspec in (q1 , q2 , q3 , q4 ):
3943
3967
self .apply_config (
3944
3968
{
3945
3969
"version" : 1 ,
@@ -3955,21 +3979,62 @@ def test_config_queue_handler_does_not_create_multiprocessing_manager(self, mana
3955
3979
3956
3980
@patch ("multiprocessing.Manager" )
3957
3981
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager (self , manager ):
3958
- # gh-120868
3982
+ # gh-120868, gh-121723
3959
3983
3960
- with self .assertRaises (ValueError ):
3961
- self .apply_config (
3962
- {
3963
- "version" : 1 ,
3964
- "handlers" : {
3965
- "queue_listener" : {
3966
- "class" : "logging.handlers.QueueHandler" ,
3967
- "queue" : object (),
3984
+ for qspec in [object (), CustomQueueWrongProtocol ()]:
3985
+ with self .assertRaises (ValueError ):
3986
+ self .apply_config (
3987
+ {
3988
+ "version" : 1 ,
3989
+ "handlers" : {
3990
+ "queue_listener" : {
3991
+ "class" : "logging.handlers.QueueHandler" ,
3992
+ "queue" : qspec ,
3993
+ },
3968
3994
},
3969
- },
3995
+ }
3996
+ )
3997
+ manager .assert_not_called ()
3998
+
3999
+ @skip_if_tsan_fork
4000
+ @support .requires_subprocess ()
4001
+ @unittest .skipUnless (support .Py_DEBUG , "requires a debug build for testing"
4002
+ "assertions in multiprocessing" )
4003
+ def test_config_queue_handler_multiprocessing_context (self ):
4004
+ # regression test for gh-121723
4005
+ if support .MS_WINDOWS :
4006
+ start_methods = ['spawn' ]
4007
+ else :
4008
+ start_methods = ['spawn' , 'fork' , 'forkserver' ]
4009
+ for start_method in start_methods :
4010
+ with self .subTest (start_method = start_method ):
4011
+ ctx = multiprocessing .get_context (start_method )
4012
+ with ctx .Manager () as manager :
4013
+ q = manager .Queue ()
4014
+ records = []
4015
+ # use 1 process and 1 task per child to put 1 record
4016
+ with ctx .Pool (1 , initializer = self ._mpinit_issue121723 ,
4017
+ initargs = (q , "text" ), maxtasksperchild = 1 ):
4018
+ records .append (q .get (timeout = 60 ))
4019
+ self .assertTrue (q .empty ())
4020
+ self .assertEqual (len (records ), 1 )
4021
+
4022
+ @staticmethod
4023
+ def _mpinit_issue121723 (qspec , message_to_log ):
4024
+ # static method for pickling support
4025
+ logging .config .dictConfig ({
4026
+ 'version' : 1 ,
4027
+ 'disable_existing_loggers' : True ,
4028
+ 'handlers' : {
4029
+ 'log_to_parent' : {
4030
+ 'class' : 'logging.handlers.QueueHandler' ,
4031
+ 'queue' : qspec
3970
4032
}
3971
- )
3972
- manager .assert_not_called ()
4033
+ },
4034
+ 'root' : {'handlers' : ['log_to_parent' ], 'level' : 'DEBUG' }
4035
+ })
4036
+ # log a message (this creates a record put in the queue)
4037
+ logging .getLogger ().info (message_to_log )
3973
4038
3974
4039
@skip_if_tsan_fork
3975
4040
@support .requires_subprocess ()
0 commit comments