@@ -4,80 +4,48 @@ module Yabeda
44 module Rack
55 module Queue
66 class Middleware
7- HEADER_KEYS = %w[ HTTP_X_REQUEST_START HTTP_X_QUEUE_START ] . freeze
8- REQUEST_BODY_WAIT_KEY = "puma.request_body_wait"
9-
107 class StderrLogger
11- def warn ( message )
12- Kernel . warn ( message )
13- end
8+ def warn ( message ) = Kernel . warn ( message )
149 end
1510
1611 class YabedaReporter
17- def observe ( value )
18- Yabeda . rack_queue . rack_queue_duration . measure ( { } , value )
19- end
12+ def observe ( value ) = Yabeda . rack_queue . rack_queue_duration . measure ( { } , value )
2013 end
2114
22- def initialize ( app , reporter : YabedaReporter . new , parser : HeaderTimestampParser . new , logger : nil , clock : nil )
15+ def initialize ( app , reporter : YabedaReporter . new , logger : nil , clock : nil )
2316 @app = app
2417 @reporter = reporter
25- @parser = parser
18+ @parser = HeaderTimestampParser . new
2619 @logger = logger || StderrLogger . new
2720 @clock = clock || -> { Process . clock_gettime ( Process ::CLOCK_REALTIME ) }
2821 end
2922
3023 def call ( env )
31- x_request_start = env [ HEADER_KEYS [ 0 ] ]
32- x_queue_start = env [ HEADER_KEYS [ 1 ] ]
33-
34- if x_request_start || x_queue_start
35- now = @clock . call
36- request_start = request_start_timestamp ( x_request_start , x_queue_start , now )
37- report_queue_time ( env , now , request_start ) if request_start
38- end
39-
24+ measure_queue_time ( env ) if env [ "HTTP_X_REQUEST_START" ] || env [ "HTTP_X_QUEUE_START" ]
4025 @app . call ( env )
4126 end
4227
4328 private
4429
45- def request_start_timestamp ( x_request_start , x_queue_start , now )
46- parsed = parse_header_timestamp ( x_request_start , now )
47- return parsed if parsed
48-
49- parse_header_timestamp ( x_queue_start , now )
50- end
51-
52- def parse_header_timestamp ( value , now )
53- return nil if value . nil?
54-
55- @parser . parse ( value , now : now )
30+ def measure_queue_time ( env )
31+ now = @clock . call
32+ start = @parser . parse ( env [ "HTTP_X_REQUEST_START" ] , now : now ) ||
33+ @parser . parse ( env [ "HTTP_X_QUEUE_START" ] , now : now )
34+ report_queue_time ( env , now , start ) if start
5635 end
5736
5837 def report_queue_time ( env , now , request_start )
5938 queue_time = now - request_start
60- if queue_time . negative?
61- @logger . warn ( "Negative rack queue duration (#{ queue_time } ) observed; dropping measurement" )
62- return
63- end
64-
65- body_wait = parse_request_body_wait ( env [ REQUEST_BODY_WAIT_KEY ] )
66- queue_time -= body_wait if body_wait
67- queue_time = 0.0 if queue_time . negative?
39+ return @logger . warn ( "Negative rack queue duration (#{ queue_time } ); dropping" ) if queue_time . negative?
6840
69- @reporter . observe ( queue_time )
41+ body_wait = parse_body_wait ( env [ "puma.request_body_wait" ] )
42+ @reporter . observe ( [ queue_time - ( body_wait || 0 ) , 0.0 ] . max )
7043 end
7144
72- def parse_request_body_wait ( value )
73- return nil if value . nil?
74-
75- milliseconds = Float ( value )
76- return nil if milliseconds . negative?
77-
78- milliseconds / 1_000.0
45+ def parse_body_wait ( value )
46+ ms = Float ( value )
47+ ms / 1_000.0 unless ms . negative?
7948 rescue ArgumentError , TypeError
80- nil
8149 end
8250 end
8351 end
0 commit comments