Skip to content

Commit 0adf557

Browse files
authored
Revert "Tidy up body code. (#181)"
This reverts commit d5f0b31.
1 parent d5f0b31 commit 0adf557

File tree

18 files changed

+286
-90
lines changed

18 files changed

+286
-90
lines changed

examples/upload/client.rb

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,10 @@
99

1010
require 'async'
1111
require 'protocol/http/body/file'
12+
require 'async/http/body/delayed'
1213
require 'async/http/client'
1314
require 'async/http/endpoint'
1415

15-
class Delayed < ::Protocol::HTTP::Body::Wrapper
16-
def initialize(body, delay = 0.01)
17-
super(body)
18-
19-
@delay = delay
20-
end
21-
22-
def ready?
23-
false
24-
end
25-
26-
def read
27-
sleep(@delay)
28-
29-
return super
30-
end
31-
end
32-
3316
Async do
3417
endpoint = Async::HTTP::Endpoint.parse("http://localhost:9222")
3518
client = Async::HTTP::Client.new(endpoint, protocol: Async::HTTP::Protocol::HTTP2)
@@ -38,7 +21,7 @@ def read
3821
['accept', 'text/plain'],
3922
]
4023

41-
body = Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32))
24+
body = Async::HTTP::Body::Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32))
4225

4326
response = client.post(endpoint.path, headers, body)
4427

fixtures/async/http/a_protocol.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ module HTTP
162162
request_received.wait
163163
headers.add('etag', 'abcd')
164164

165-
body.close_write
165+
body.close
166166
end
167167

168168
response = client.post("/", headers, body)
@@ -187,7 +187,7 @@ module HTTP
187187
response_received.wait
188188
headers.add('etag', 'abcd')
189189

190-
body.close_write
190+
body.close
191191
end
192192

193193
::Protocol::HTTP::Response[200, headers, body]
@@ -395,9 +395,9 @@ module HTTP
395395
let(:app) do
396396
::Protocol::HTTP::Middleware.for do |request|
397397
Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
398-
stream.write(content)
399-
stream.write(content)
400-
stream.close_write
398+
stream.write content
399+
stream.write content
400+
stream.close
401401
end
402402
end
403403
end
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2023, by Samuel Williams.
5+
6+
require 'protocol/http/body/deflate'
7+
8+
module Async
9+
module HTTP
10+
module Body
11+
AWritableBody = Sus::Shared("a writable body") do
12+
it "can write and read data" do
13+
3.times do |i|
14+
body.write("Hello World #{i}")
15+
expect(body.read).to be == "Hello World #{i}"
16+
end
17+
end
18+
19+
it "can buffer data in order" do
20+
3.times do |i|
21+
body.write("Hello World #{i}")
22+
end
23+
24+
3.times do |i|
25+
expect(body.read).to be == "Hello World #{i}"
26+
end
27+
end
28+
29+
with '#join' do
30+
it "can join chunks" do
31+
3.times do |i|
32+
body.write("#{i}")
33+
end
34+
35+
body.close
36+
37+
expect(body.join).to be == "012"
38+
end
39+
end
40+
41+
with '#each' do
42+
it "can read all data in order" do
43+
3.times do |i|
44+
body.write("Hello World #{i}")
45+
end
46+
47+
body.close
48+
49+
3.times do |i|
50+
chunk = body.read
51+
expect(chunk).to be == "Hello World #{i}"
52+
end
53+
end
54+
55+
it "can propagate failures" do
56+
reactor.async do
57+
expect do
58+
body.each do |chunk|
59+
raise RuntimeError.new("It was too big!")
60+
end
61+
end.to raise_exception(RuntimeError, message: be =~ /big/)
62+
end
63+
64+
expect{
65+
body.write("Beep boop") # This will cause a failure.
66+
::Async::Task.current.yield
67+
body.write("Beep boop") # This will fail.
68+
}.to raise_exception(RuntimeError, message: be =~ /big/)
69+
end
70+
71+
it "can propagate failures in nested bodies" do
72+
nested = ::Protocol::HTTP::Body::Deflate.for(body)
73+
74+
reactor.async do
75+
expect do
76+
nested.each do |chunk|
77+
raise RuntimeError.new("It was too big!")
78+
end
79+
end.to raise_exception(RuntimeError, message: be =~ /big/)
80+
end
81+
82+
expect{
83+
body.write("Beep boop") # This will cause a failure.
84+
::Async::Task.current.yield
85+
body.write("Beep boop") # This will fail.
86+
}.to raise_exception(RuntimeError, message: be =~ /big/)
87+
end
88+
89+
it "can consume chunks" do
90+
body.write("Hello World!")
91+
body.close
92+
93+
expect(body).not.to be(:empty?)
94+
95+
body.each do |chunk|
96+
expect(chunk).to be == "Hello World!"
97+
end
98+
99+
expect(body).to be(:empty?)
100+
end
101+
end
102+
end
103+
end
104+
end
105+
end

gems.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
# gem "protocol-http2", path: "../protocol-http2"
2121
# gem "protocol-hpack", path: "../protocol-hpack"
2222

23-
gem "protocol-http", git: "https://github.com/socketry/protocol-http.git"
24-
2523
group :maintenance, optional: true do
2624
gem "bake-modernize"
2725
gem "bake-gem"

lib/async/http/body/delayed.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2023, by Samuel Williams.
5+
# Copyright, 2020, by Bruno Sutic.
6+
# Copyright, 2023, by Thomas Morgan.
7+
8+
require 'protocol/http/body/wrapper'
9+
10+
module Async
11+
module HTTP
12+
module Body
13+
class Delayed < ::Protocol::HTTP::Body::Wrapper
14+
def initialize(body, delay = 0.01)
15+
super(body)
16+
17+
@delay = delay
18+
end
19+
20+
def ready?
21+
false
22+
end
23+
24+
def read
25+
Async::Task.current.sleep(@delay)
26+
27+
return super
28+
end
29+
end
30+
end
31+
end
32+
end

lib/async/http/body/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def stream?
3636
end
3737

3838
def call(stream)
39-
@block.call(stream)
39+
return @block.call(stream)
4040
end
4141

4242
attr :input

lib/async/http/body/pipe.rb

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def initialize(input, output = Writable.new, task: Task.current)
1717

1818
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
1919

20-
@head = ::IO::Stream(head)
20+
@head = ::IO::Stream::Buffered.new(head)
2121
@tail = tail
2222

2323
@reader = nil
@@ -52,10 +52,8 @@ def reader(task)
5252
end
5353

5454
@head.close_write
55-
rescue => error
56-
raise
5755
ensure
58-
@input.close(error)
56+
@input.close($!)
5957

6058
close_head if @writer&.finished?
6159
end
@@ -70,10 +68,8 @@ def writer(task)
7068
while chunk = @head.read_partial
7169
@output.write(chunk)
7270
end
73-
rescue => error
74-
raise
7571
ensure
76-
@output.close_write(error)
72+
@output.close($!)
7773

7874
close_head if @reader&.finished?
7975
end

lib/async/http/body/slowloris.rb

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2023, by Samuel Williams.
5+
6+
require_relative 'writable'
7+
8+
require 'async/clock'
9+
10+
module Async
11+
module HTTP
12+
module Body
13+
# A dynamic body which you can write to and read from.
14+
class Slowloris < Writable
15+
class ThroughputError < StandardError
16+
def initialize(throughput, minimum_throughput, time_since_last_write)
17+
super("Slow write: #{throughput.round(1)}bytes/s less than required #{minimum_throughput.round}bytes/s.")
18+
end
19+
end
20+
21+
# In order for this implementation to work correctly, you need to use a LimitedQueue.
22+
# @param minimum_throughput [Integer] the minimum bytes per second otherwise this body will be forcefully closed.
23+
def initialize(*arguments, minimum_throughput: 1024, **options)
24+
super(*arguments, **options)
25+
26+
@minimum_throughput = minimum_throughput
27+
28+
@last_write_at = nil
29+
@last_chunk_size = nil
30+
end
31+
32+
attr :minimum_throughput
33+
34+
# If #read is called regularly to maintain throughput, that is good. If #read is not called, that is a problem. Throughput is dependent on data being available, from #write, so it doesn't seem particularly problimatic to do this check in #write.
35+
def write(chunk)
36+
if @last_chunk_size
37+
time_since_last_write = Async::Clock.now - @last_write_at
38+
throughput = @last_chunk_size / time_since_last_write
39+
40+
if throughput < @minimum_throughput
41+
error = ThroughputError.new(throughput, @minimum_throughput, time_since_last_write)
42+
43+
self.close(error)
44+
end
45+
end
46+
47+
super.tap do
48+
@last_write_at = Async::Clock.now
49+
@last_chunk_size = chunk&.bytesize
50+
end
51+
end
52+
end
53+
end
54+
end
55+
end

lib/async/http/protocol/http1/server.rb

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,31 +68,19 @@ def each(task: Task.current)
6868
stream = write_upgrade_body(protocol)
6969

7070
# At this point, the request body is hijacked, so we don't want to call #finish below.
71-
request = nil
71+
request = nil unless request.body
7272
response = nil
7373

7474
# We must return here as no further request processing can be done:
7575
return body.call(stream)
76-
elsif response.status == 101
77-
# This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header.
78-
write_response(@version, response.status, response.headers)
79-
80-
stream = write_tunnel_body(request.version)
81-
82-
# Same as above:
83-
request = nil
84-
response = nil
85-
86-
# We must return here as no further request processing can be done:
87-
return body&.call(stream)
8876
else
8977
write_response(@version, response.status, response.headers)
9078

9179
if request.connect? and response.success?
9280
stream = write_tunnel_body(request.version)
9381

9482
# Same as above:
95-
request = nil
83+
request = nil unless request.body
9684
response = nil
9785

9886
# We must return here as no further request processing can be done:

lib/async/http/protocol/http2/input.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
# Released under the MIT License.
44
# Copyright, 2020-2023, by Samuel Williams.
55

6-
require 'protocol/http/body/writable'
6+
require_relative '../../body/writable'
77

88
module Async
99
module HTTP
1010
module Protocol
1111
module HTTP2
1212
# A writable body which requests window updates when data is read from it.
13-
class Input < ::Protocol::HTTP::Body::Writable
13+
class Input < Body::Writable
1414
def initialize(stream, length)
1515
super(length)
1616

0 commit comments

Comments
 (0)