Skip to content

Streaming tests #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "async-pool", "~> 0.7"
spec.add_dependency "io-endpoint", "~> 0.11"
spec.add_dependency "io-stream", "~> 0.4"
spec.add_dependency "protocol-http", "~> 0.34"
spec.add_dependency "protocol-http", "~> 0.35"
spec.add_dependency "protocol-http1", "~> 0.20"
spec.add_dependency "protocol-http2", "~> 0.18"
spec.add_dependency "traces", ">= 0.10"
Expand Down
39 changes: 39 additions & 0 deletions lib/async/http/body/finishable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/wrapper'
require 'async/variable'

module Async
module HTTP
module Body
class Finishable < ::Protocol::HTTP::Body::Wrapper
def initialize(body)
super(body)

@closed = Async::Variable.new
@error = nil
end

def close(error = nil)
unless @closed.resolved?
@error = error
@closed.value = true
end

super
end

def wait
@closed.wait
end

def inspect
"#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}"
end
end
end
end
end
3 changes: 3 additions & 0 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ def make_response(request, connection)

# The connection won't be released until the body is completely read/released.
::Protocol::HTTP::Body::Completable.wrap(response) do
# TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool.

# Release the connection back into the pool:
@pool.release(connection)
end

Expand Down
16 changes: 12 additions & 4 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Copyright, 2024, by Anton Zhuravsky.

require_relative 'connection'
require_relative '../../body/finishable'

require 'console/event/failure'

module Async
Expand Down Expand Up @@ -46,6 +48,11 @@ def each(task: Task.current)
task.annotate("Reading #{self.version} requests for #{self.class}.")

while request = next_request
if body = request.body
finishable = Body::Finishable.new(body)
request.body = finishable
end

response = yield(request, self)
version = request.version
body = response&.body
Expand Down Expand Up @@ -102,23 +109,24 @@ def each(task: Task.current)
head = request.head?

# Same as above:
request = nil unless request.body
request = nil
response = nil

write_body(version, body, head, trailer)
end
end

# We are done with the body, you shouldn't need to call close on it:
# We are done with the body:
body = nil
else
# If the request failed to generate a response, it was an internal server error:
write_response(@version, 500, {})
write_body(version, nil)

request&.finish
end

# Gracefully finish reading the request body if it was not already done so.
request&.each{}
finishable&.wait

# This ensures we yield at least once every iteration of the loop and allow other fibers to execute.
task.yield
Expand Down
2 changes: 2 additions & 0 deletions test/async/http/middleware/location_redirector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
with '301' do
let(:app) do
Protocol::HTTP::Middleware.for do |request|
request.finish # TODO: request.discard - or some default handling?

case request.path
when '/home'
Protocol::HTTP::Response[301, {'location' => '/'}, []]
Expand Down
142 changes: 142 additions & 0 deletions test/protocol/http/body/stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "async/http/protocol/http"
require "protocol/http/body/streamable"
require "sus/fixtures/async/http"

AnEchoServer = Sus::Shared("an echo server") do
let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the request body" do
chunks = ["Hello,", "World!"]
response_chunks = Queue.new

output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
Console.debug(self, "Writing chunk:", chunk: chunk)
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
response_chunks.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

AnEchoClient = Sus::Shared("an echo client") do
let(:chunks) {["Hello,", "World!"]}
let(:response_chunks) {Queue.new}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
output = ::Protocol::HTTP::Body::Writable.new

Async do
stream = ::Protocol::HTTP::Body::Stream.new(request.body, output)

Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, output]
end
end

it "should echo the response body" do
output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol|
describe protocol, unique: protocol.name do
include Sus::Fixtures::Async::HTTP::ServerContext

let(:protocol) {subject}

it_behaves_like AnEchoServer
it_behaves_like AnEchoClient
end
end
134 changes: 134 additions & 0 deletions test/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "async/http/protocol/http"
require "protocol/http/body/streamable"
require "sus/fixtures/async/http"

AnEchoServer = Sus::Shared("an echo server") do
let(:app) do
::Protocol::HTTP::Middleware.for do |request|
streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream|
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, streamable]
end
end

it "should echo the request body" do
chunks = ["Hello,", "World!"]
response_chunks = Queue.new

output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
Console.debug(self, "Writing chunk:", chunk: chunk)
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
response_chunks.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

AnEchoClient = Sus::Shared("an echo client") do
let(:chunks) {["Hello,", "World!"]}
let(:response_chunks) {Queue.new}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream|
Console.debug(self, "Echoing chunks...")
chunks.each do |chunk|
stream.write(chunk)
end

Console.debug(self, "Closing write.")
stream.close_write

Console.debug(self, "Reading chunks...")
while chunk = stream.readpartial(1024)
Console.debug(self, "Reading chunk:", chunk: chunk)
response_chunks << chunk
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

::Protocol::HTTP::Response[200, {}, streamable]
end
end

it "should echo the response body" do
output = ::Protocol::HTTP::Body::Writable.new
response = client.post("/", body: output)
stream = ::Protocol::HTTP::Body::Stream.new(response.body, output)

begin
Console.debug(self, "Echoing chunks...")
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
Console.debug(self, "EOF.")
# Ignore.
ensure
Console.debug(self, "Closing stream.")
stream.close
end

chunks.each do |chunk|
expect(response_chunks.pop).to be == chunk
end
end
end

[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol|
describe protocol, unique: protocol.name do
include Sus::Fixtures::Async::HTTP::ServerContext

let(:protocol) {subject}

it_behaves_like AnEchoServer
it_behaves_like AnEchoClient
end
end
Loading