Skip to content

Commit 2505ca7

Browse files
committed
Improved usage of updated body interface.
1 parent 2aca920 commit 2505ca7

File tree

12 files changed

+62
-35
lines changed

12 files changed

+62
-35
lines changed

fixtures/async/http/a_protocol.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,9 @@ module HTTP
395395
let(:app) do
396396
::Protocol::HTTP::Middleware.for do |request|
397397
Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
398-
stream.write content
399-
stream.write content
400-
stream.close
398+
stream.write(content)
399+
stream.write(content)
400+
stream.close_write
401401
end
402402
end
403403
end

gems.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# gem "traces", path: "../traces"
1616
# gem "sus-fixtures-async-http", path: "../sus-fixtures-async-http"
1717

18-
# gem "protocol-http", path: "../protocol-http"
18+
gem "protocol-http", path: "../protocol-http"
1919
# gem "protocol-http1", path: "../protocol-http1"
2020
# gem "protocol-http2", path: "../protocol-http2"
2121
# gem "protocol-hpack", path: "../protocol-hpack"

lib/async/http/body/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def stream?
3636
end
3737

3838
def call(stream)
39-
return @block.call(stream)
39+
@block.call(stream)
4040
end
4141

4242
attr :input

lib/async/http/body/pipe.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def initialize(input, output = Writable.new, task: Task.current)
1717

1818
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
1919

20-
@head = ::IO::Stream::Buffered.new(head)
20+
@head = ::IO::Stream(head)
2121
@tail = tail
2222

2323
@reader = nil
@@ -52,8 +52,10 @@ def reader(task)
5252
end
5353

5454
@head.close_write
55+
rescue => error
56+
raise
5557
ensure
56-
@input.close($!)
58+
@input.close(error)
5759

5860
close_head if @writer&.finished?
5961
end
@@ -68,8 +70,10 @@ def writer(task)
6870
while chunk = @head.read_partial
6971
@output.write(chunk)
7072
end
73+
rescue => error
74+
raise
7175
ensure
72-
@output.close($!)
76+
@output.close_write(error)
7377

7478
close_head if @reader&.finished?
7579
end

lib/async/http/protocol/http1/server.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,31 @@ def each(task: Task.current)
6868
stream = write_upgrade_body(protocol)
6969

7070
# At this point, the request body is hijacked, so we don't want to call #finish below.
71-
request = nil unless request.body
71+
request = nil
7272
response = nil
7373

7474
# We must return here as no further request processing can be done:
7575
return body.call(stream)
76+
elsif response.status == 101
77+
# This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header.
78+
write_response(@version, response.status, response.headers)
79+
80+
stream = write_tunnel_body(request.version)
81+
82+
# Same as above:
83+
request = nil
84+
response = nil
85+
86+
# We must return here as no further request processing can be done:
87+
return body&.call(stream)
7688
else
7789
write_response(@version, response.status, response.headers)
7890

7991
if request.connect? and response.success?
8092
stream = write_tunnel_body(request.version)
8193

8294
# Same as above:
83-
request = nil unless request.body
95+
request = nil
8496
response = nil
8597

8698
# We must return here as no further request processing can be done:

lib/async/http/protocol/http2/input.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
# Released under the MIT License.
44
# Copyright, 2020-2023, by Samuel Williams.
55

6-
require_relative '../../body/writable'
6+
require 'protocol/http/body/writable'
77

88
module Async
99
module HTTP
1010
module Protocol
1111
module HTTP2
1212
# A writable body which requests window updates when data is read from it.
13-
class Input < Body::Writable
13+
class Input < ::Protocol::HTTP::Body::Writable
1414
def initialize(stream, length)
1515
super(length)
1616

lib/async/http/protocol/http2/output.rb

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,26 @@ def write(chunk)
5050
end
5151
end
5252

53+
def close_write(error = nil)
54+
close(error)
55+
end
56+
5357
# This method should only be called from within the context of the output task.
5458
def close(error = nil)
55-
if @stream
56-
@stream.finish_output(error)
59+
stop(error)
60+
61+
if stream = @stream
5762
@stream = nil
63+
stream.finish_output(error)
5864
end
5965
end
6066

6167
# This method should only be called from within the context of the HTTP/2 stream.
6268
def stop(error)
63-
@task&.stop
64-
@task = nil
69+
if task = @task
70+
@task = nil
71+
task.stop(error)
72+
end
6573
end
6674

6775
private
@@ -70,10 +78,11 @@ def stream(task)
7078
task.annotate("Streaming #{@body} to #{@stream}.")
7179

7280
input = @stream.wait_for_input
81+
stream = ::Protocol::HTTP::Body::Stream.new(input, self)
7382

74-
@body.call(::Protocol::HTTP::Body::Stream.new(input, self))
75-
rescue Async::Stop
76-
# Ignore.
83+
@body.call(stream)
84+
rescue => error
85+
self.close(error)
7786
end
7887

7988
# Reads chunks from the given body and writes them to the stream as fast as possible.
@@ -86,11 +95,15 @@ def passthrough(task)
8695
# chunk.clear unless chunk.frozen?
8796
# GC.start
8897
end
89-
90-
self.close
98+
rescue => error
99+
raise
91100
ensure
92-
@body&.close($!)
93-
@body = nil
101+
if @body
102+
@body.close(error)
103+
@body = nil
104+
end
105+
106+
self.close(error)
94107
end
95108

96109
# Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.

lib/async/http/protocol/http2/stream.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def process_data(frame)
9898
end
9999

100100
if frame.end_stream?
101-
@input.close
101+
@input.close_write
102102
@input = nil
103103
end
104104
end

test/async/http/body.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
output.write(chunk.reverse)
2424
end
2525

26-
output.close
26+
output.close_write
2727
end
2828

2929
Protocol::HTTP::Response[200, [], output]
@@ -35,7 +35,7 @@
3535

3636
reactor.async do |task|
3737
output.write("Hello World!")
38-
output.close
38+
output.close_write
3939
end
4040

4141
response = client.post("/", {}, output)
@@ -58,7 +58,7 @@
5858
notification.wait
5959
end
6060

61-
body.close
61+
body.close_write
6262
end
6363

6464
Protocol::HTTP::Response[200, {}, body]

test/async/http/body/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
3.times do
1616
stream.write(content)
1717
end
18-
stream.close
18+
stream.close_write
1919
end
2020
end
2121

test/async/http/body/pipe.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
include Sus::Fixtures::Async::ReactorContext
2121

2222
let(:input_write_duration) {0}
23-
let(:io) { pipe.to_io }
23+
let(:io) {pipe.to_io}
2424

2525
def before
2626
super
@@ -31,14 +31,12 @@ def before
3131
input.write("#{first} ")
3232
sleep(input_write_duration) if input_write_duration > 0
3333
input.write(second)
34-
input.close
34+
input.close_write
3535
end
3636
end
3737

38-
def aftrer
38+
after do
3939
io.close
40-
41-
super
4240
end
4341

4442
it "returns an io socket" do

test/async/http/proxy.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
expect(response).to be(:success?)
5858

5959
input.write(data)
60-
input.close
60+
input.close_write
6161

6262
expect(response.read).to be == data
6363
end
@@ -74,7 +74,7 @@
7474
stream.flush
7575
end
7676

77-
stream.close
77+
stream.close_write
7878
end
7979
end
8080
end

0 commit comments

Comments
 (0)