diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift index 360e91b89..2d5e3e2e0 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift @@ -95,7 +95,7 @@ extension RequestBodyLength { case .none: self = .known(0) case .byteBuffer(let buffer): - self = .known(buffer.readableBytes) + self = .known(Int64(buffer.readableBytes)) case .sequence(let length, _, _), .asyncSequence(let length, _): self = length } diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift index 4ed79e38c..ad81bfa32 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift @@ -125,7 +125,7 @@ extension HTTPClientRequest.Body { public static func bytes( _ bytes: Bytes ) -> Self where Bytes.Element == UInt8 { - self.bytes(bytes, length: .known(bytes.count)) + self.bytes(bytes, length: .known(Int64(bytes.count))) } /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes. @@ -140,7 +140,7 @@ extension HTTPClientRequest.Body { /// /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload - /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`. + /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`. /// /// - parameters: /// - bytes: The bytes of the request body. @@ -225,7 +225,7 @@ extension HTTPClientRequest.Body { /// /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload - /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`. + /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`. /// /// - parameters: /// - bytes: The bytes of the request body. @@ -265,7 +265,7 @@ extension HTTPClientRequest.Body { /// /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload - /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`. + /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`. /// /// - parameters: /// - sequenceOfBytes: The bytes of the request body. @@ -293,7 +293,7 @@ extension HTTPClientRequest.Body { /// /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload - /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`. + /// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`. /// /// - parameters: /// - bytes: The bytes of the request body. @@ -341,7 +341,13 @@ extension HTTPClientRequest.Body { public static let unknown: Self = .init(storage: .unknown) /// The size of the request body is known and exactly `count` bytes + @available(*, deprecated, message: "Use `known(_ count: Int64)` with an explicit Int64 argument instead") public static func known(_ count: Int) -> Self { + .init(storage: .known(Int64(count))) + } + + /// The size of the request body is known and exactly `count` bytes + public static func known(_ count: Int64) -> Self { .init(storage: .known(count)) } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index b575ae094..533062036 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -58,7 +58,7 @@ struct HTTPRequestStateMachine { /// The request is streaming its request body. `expectedBodyLength` has a value, if the request header contained /// a `"content-length"` header field. If the request header contained a `"transfer-encoding" = "chunked"` /// header field, the `expectedBodyLength` is `nil`. - case streaming(expectedBodyLength: Int?, sentBodyBytes: Int, producer: ProducerControlState) + case streaming(expectedBodyLength: Int64?, sentBodyBytes: Int64, producer: ProducerControlState) /// The request has sent its request body and end. case endSent } @@ -308,13 +308,13 @@ struct HTTPRequestStateMachine { // pause. The reason for this is as follows: There might be thread synchronization // situations in which the producer might not have received the plea to pause yet. - if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected { + if let expected = expectedBodyLength, sentBodyBytes + Int64(part.readableBytes) > expected { let error = HTTPClientError.bodyLengthMismatch self.state = .failed(error) return .failRequest(error, .close(promise)) } - sentBodyBytes += part.readableBytes + sentBodyBytes += Int64(part.readableBytes) let requestState: RequestState = .streaming( expectedBodyLength: expectedBodyLength, @@ -768,7 +768,7 @@ struct HTTPRequestStateMachine { } extension RequestFramingMetadata.Body { - var expectedLength: Int? { + var expectedLength: Int64? { switch self { case .fixedSize(let length): return length case .stream: return nil diff --git a/Sources/AsyncHTTPClient/ConnectionPool/RequestBodyLength.swift b/Sources/AsyncHTTPClient/ConnectionPool/RequestBodyLength.swift index 83f0e6edf..58ba694a7 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/RequestBodyLength.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/RequestBodyLength.swift @@ -20,5 +20,5 @@ internal enum RequestBodyLength: Hashable, Sendable { /// size of the request body is not known before starting the request case unknown /// size of the request body is fixed and exactly `count` bytes - case known(_ count: Int) + case known(_ count: Int64) } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/RequestFramingMetadata.swift b/Sources/AsyncHTTPClient/ConnectionPool/RequestFramingMetadata.swift index 98080e364..033060a99 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/RequestFramingMetadata.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/RequestFramingMetadata.swift @@ -15,7 +15,7 @@ struct RequestFramingMetadata: Hashable { enum Body: Hashable { case stream - case fixedSize(Int) + case fixedSize(Int64) } var connectionClose: Bool diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 98415a124..c8a485023 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -70,7 +70,19 @@ extension HTTPClient { /// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length` /// header is set with the given `length`. - public var length: Int? + @available(*, deprecated, renamed: "contentLength") + public var length: Int? { + get { + self.contentLength.flatMap { Int($0) } + } + set { + self.contentLength = newValue.flatMap { Int64($0) } + } + } + + /// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length` + /// header is set with the given `contentLength`. + public var contentLength: Int64? /// Body chunk provider. public var stream: @Sendable (StreamWriter) -> EventLoopFuture @@ -78,8 +90,8 @@ extension HTTPClient { @usableFromInline typealias StreamCallback = @Sendable (StreamWriter) -> EventLoopFuture @inlinable - init(length: Int?, stream: @escaping StreamCallback) { - self.length = length + init(contentLength: Int64?, stream: @escaping StreamCallback) { + self.contentLength = contentLength.flatMap { $0 } self.stream = stream } @@ -88,7 +100,7 @@ extension HTTPClient { /// - parameters: /// - buffer: Body `ByteBuffer` representation. public static func byteBuffer(_ buffer: ByteBuffer) -> Body { - return Body(length: buffer.readableBytes) { writer in + return Body(contentLength: Int64(buffer.readableBytes)) { writer in writer.write(.byteBuffer(buffer)) } } @@ -100,8 +112,19 @@ extension HTTPClient { /// header is set with the given `length`. /// - stream: Body chunk provider. @preconcurrency + @available(*, deprecated, renamed: "stream(contentLength:bodyStream:)") public static func stream(length: Int? = nil, _ stream: @Sendable @escaping (StreamWriter) -> EventLoopFuture) -> Body { - return Body(length: length, stream: stream) + return Body(contentLength: length.flatMap { Int64($0) }, stream: stream) + } + + /// Create and stream body using ``StreamWriter``. + /// + /// - parameters: + /// - contentLength: Body size. If nil, `Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length` + /// header is set with the given `contentLength`. + /// - bodyStream: Body chunk provider. + public static func stream(contentLength: Int64? = nil, bodyStream: @Sendable @escaping (StreamWriter) -> EventLoopFuture) -> Body { + return Body(contentLength: contentLength, stream: bodyStream) } /// Create and stream body using a collection of bytes. @@ -111,7 +134,7 @@ extension HTTPClient { @preconcurrency @inlinable public static func bytes(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 { - return Body(length: bytes.count) { writer in + return Body(contentLength: Int64(bytes.count)) { writer in if bytes.count <= bagOfBytesToByteBufferConversionChunkSize { return writer.write(.byteBuffer(ByteBuffer(bytes: bytes))) } else { @@ -125,7 +148,7 @@ extension HTTPClient { /// - parameters: /// - string: Body `String` representation. public static func string(_ string: String) -> Body { - return Body(length: string.utf8.count) { writer in + return Body(contentLength: Int64(string.utf8.count)) { writer in if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize { return writer.write(.byteBuffer(ByteBuffer(string: string))) } else { @@ -858,7 +881,7 @@ extension RequestBodyLength { self = .known(0) return } - guard let length = body.length else { + guard let length = body.contentLength else { self = .unknown return } diff --git a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift index a30a8cf91..626f5b4ae 100644 --- a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift +++ b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift @@ -184,6 +184,62 @@ final class AsyncAwaitEndToEndTests: XCTestCase { } } + struct AsyncSequenceByteBufferGenerator: AsyncSequence, Sendable, AsyncIteratorProtocol { + typealias Element = ByteBuffer + + let chunkSize: Int + let totalChunks: Int + let buffer: ByteBuffer + var chunksGenerated: Int = 0 + + init(chunkSize: Int, totalChunks: Int) { + self.chunkSize = chunkSize + self.totalChunks = totalChunks + self.buffer = ByteBuffer(repeating: 1, count: self.chunkSize) + } + + mutating func next() async throws -> ByteBuffer? { + guard self.chunksGenerated < self.totalChunks else { return nil } + + self.chunksGenerated += 1 + return self.buffer + } + + func makeAsyncIterator() -> AsyncSequenceByteBufferGenerator { + return self + } + } + + func testEchoStreamThatHas3GBInTotal() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let bin = HTTPBin(.http1_1()) { _ in HTTPEchoHandler() } + defer { XCTAssertNoThrow(try bin.shutdown()) } + + let client: HTTPClient = makeDefaultHTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + defer { XCTAssertNoThrow(try client.syncShutdown()) } + + let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:)) + + var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/") + request.method = .POST + + let sequence = AsyncSequenceByteBufferGenerator( + chunkSize: 4_194_304, // 4MB chunk + totalChunks: 768 // Total = 3GB + ) + request.body = .stream(sequence, length: .unknown) + + let response: HTTPClientResponse = try await client.execute(request, deadline: .now() + .seconds(30), logger: logger) + XCTAssertEqual(response.headers["content-length"], []) + + var receivedBytes: Int64 = 0 + for try await part in response.body { + receivedBytes += Int64(part.readableBytes) + } + XCTAssertEqual(receivedBytes, 3_221_225_472) // 3GB + } + func testPostWithAsyncSequenceOfByteBuffers() { XCTAsyncTest { let bin = HTTPBin(.http2(compress: false)) { _ in HTTPEchoHandler() } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index f6a2840d9..f4f2d67f8 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -113,7 +113,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in testWriter.start(writer: writer) })) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } @@ -345,7 +345,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in // Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout. embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2)) return testWriter.start(writer: writer) @@ -384,7 +384,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in embedded.isWritable = false embedded.pipeline.fireChannelWritabilityChanged() // This should not trigger any errors or timeouts, because the timer isn't running @@ -432,7 +432,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled]) })) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } @@ -595,7 +595,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in testWriter.start(writer: writer) })) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift index 3ff73de06..5ea8bb77c 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift @@ -116,8 +116,8 @@ class HTTP1ConnectionTests: XCTestCase { XCTAssertNoThrow(maybeRequest = try HTTPClient.Request( url: "http://localhost/hello/swift", method: .POST, - body: .stream(length: 4) { writer -> EventLoopFuture in - func recursive(count: UInt8, promise: EventLoopPromise) { + body: .stream(contentLength: 4) { writer -> EventLoopFuture in + @Sendable func recursive(count: UInt8, promise: EventLoopPromise) { guard count < 4 else { return promise.succeed(()) } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift index 545ba1e3c..2428199a4 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift @@ -115,7 +115,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 50) var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in testWriter.start(writer: writer) })) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } @@ -295,7 +295,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5) var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in // Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout. embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2)) return testWriter.start(writer: writer) @@ -335,7 +335,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5) var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in embedded.isWritable = false embedded.pipeline.fireChannelWritabilityChanged() // This should not trigger any errors or timeouts, because the timer isn't running @@ -385,7 +385,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5) var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled]) })) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift index 97f0385ea..889cd38b9 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift @@ -68,7 +68,7 @@ class HTTP2ClientTests: XCTestCase { let client = self.makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } var response: HTTPClient.Response? - let body = HTTPClient.Body.stream(length: nil) { writer in + let body = HTTPClient.Body.stream(contentLength: nil) { writer in writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap { writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))) } @@ -84,7 +84,7 @@ class HTTP2ClientTests: XCTestCase { defer { XCTAssertNoThrow(try bin.shutdown()) } let client = self.makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } - let body = HTTPClient.Body.stream(length: 12) { writer in + let body = HTTPClient.Body.stream(contentLength: 12) { writer in writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap { writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))) } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 6f412a30d..80446251c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -52,7 +52,7 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpBin.shutdown()) } - let body: HTTPClient.Body = .stream(length: 50) { writer in + let body: HTTPClient.Body = .stream(contentLength: 50) { writer in do { var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1") request.headers.add(name: "Accept", value: "text/event-stream") @@ -81,13 +81,13 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpBin.shutdown()) } - var body: HTTPClient.Body = .stream(length: 50) { _ in + var body: HTTPClient.Body = .stream(contentLength: 50) { _ in httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse) } XCTAssertThrowsError(try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait()) - body = .stream(length: 50) { _ in + body = .stream(contentLength: 50) { _ in do { var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1") request.headers.add(name: "Accept", value: "text/event-stream") @@ -223,7 +223,7 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) } - let body: HTTPClient.Body = .stream(length: 8) { writer in + let body: HTTPClient.Body = .stream(contentLength: 8) { writer in let buffer = ByteBuffer(string: "1234") return writer.write(.byteBuffer(buffer)).flatMap { let buffer = ByteBuffer(string: "4321") @@ -366,7 +366,7 @@ class HTTPClientInternalTests: XCTestCase { let el2 = group.next() XCTAssert(el1 !== el2) - let body: HTTPClient.Body = .stream(length: 8) { writer in + let body: HTTPClient.Body = .stream(contentLength: 8) { writer in XCTAssert(el1.inEventLoop) let buffer = ByteBuffer(string: "1234") return writer.write(.byteBuffer(buffer)).flatMap { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift index b0b1be1d8..c93ab4cb5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift @@ -312,7 +312,7 @@ class HTTPClientRequestTests: XCTestCase { request.method = .POST let sequence = AnySendableSequence(ByteBuffer(string: "post body").readableBytesView) - request.body = .bytes(sequence, length: .known(9)) + request.body = .bytes(sequence, length: .known(Int64(9))) var preparedRequest: PreparedRequest? XCTAssertNoThrow(preparedRequest = try PreparedRequest(request)) guard let preparedRequest = preparedRequest else { return } @@ -424,7 +424,7 @@ class HTTPClientRequestTests: XCTestCase { .async .map { ByteBuffer($0) } - request.body = .stream(asyncSequence, length: .known(9)) + request.body = .stream(asyncSequence, length: .known(Int64(9))) var preparedRequest: PreparedRequest? XCTAssertNoThrow(preparedRequest = try PreparedRequest(request)) guard let preparedRequest = preparedRequest else { return } @@ -476,7 +476,7 @@ class HTTPClientRequestTests: XCTestCase { String(repeating: "1", count: bagOfBytesToByteBufferConversionChunkSize) + String(repeating: "2", count: bagOfBytesToByteBufferConversionChunkSize) ).utf8, - length: .known(bagOfBytesToByteBufferConversionChunkSize * 3) + length: .known(Int64(bagOfBytesToByteBufferConversionChunkSize * 3)) ).collect() let expectedChunks = [ @@ -495,7 +495,7 @@ class HTTPClientRequestTests: XCTestCase { Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) ), - length: .known(bagOfBytesToByteBufferConversionChunkSize * 3), + length: .known(Int64(bagOfBytesToByteBufferConversionChunkSize * 3)), bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize, byteBufferMaxSize: byteBufferMaxSize ).collect() @@ -516,7 +516,7 @@ class HTTPClientRequestTests: XCTestCase { } let body = try await HTTPClientRequest.Body.bytes( makeBytes(), - length: .known(bagOfBytesToByteBufferConversionChunkSize * 3) + length: .known(Int64(bagOfBytesToByteBufferConversionChunkSize * 3)) ).collect() var firstChunk = ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) @@ -539,7 +539,7 @@ class HTTPClientRequestTests: XCTestCase { } let body = try await HTTPClientRequest.Body._bytes( makeBytes(), - length: .known(bagOfBytesToByteBufferConversionChunkSize * 3), + length: .known(Int64(bagOfBytesToByteBufferConversionChunkSize * 3)), bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize, byteBufferMaxSize: byteBufferMaxSize ).collect() @@ -614,8 +614,8 @@ extension HTTPClient.Body { } private struct LengthMismatch: Error { - var announcedLength: Int - var actualLength: Int + var announcedLength: Int64 + var actualLength: Int64 } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -631,8 +631,8 @@ extension Optional where Wrapped == HTTPClientRequest.Prepared.Body { case .sequence(let announcedLength, _, let generate): let buffer = generate(ByteBufferAllocator()) if case .known(let announcedLength) = announcedLength, - announcedLength != buffer.readableBytes { - throw LengthMismatch(announcedLength: announcedLength, actualLength: buffer.readableBytes) + announcedLength != Int64(buffer.readableBytes) { + throw LengthMismatch(announcedLength: announcedLength, actualLength: Int64(buffer.readableBytes)) } return buffer case .asyncSequence(length: let announcedLength, let generate): @@ -641,8 +641,8 @@ extension Optional where Wrapped == HTTPClientRequest.Prepared.Body { accumulatedBuffer.writeBuffer(&buffer) } if case .known(let announcedLength) = announcedLength, - announcedLength != accumulatedBuffer.readableBytes { - throw LengthMismatch(announcedLength: announcedLength, actualLength: accumulatedBuffer.readableBytes) + announcedLength != Int64(accumulatedBuffer.readableBytes) { + throw LengthMismatch(announcedLength: announcedLength, actualLength: Int64(accumulatedBuffer.readableBytes)) } return accumulatedBuffer } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 1bfca1d30..51bc1f005 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -621,7 +621,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "post", method: .POST, headers: ["transfer-encoding": "chunked"], - body: .stream { streamWriter in + body: .stream(bodyStream: { streamWriter in _ = streamWriter.write(.byteBuffer(.init())) let promise = self.clientGroup.next().makePromise(of: Void.self) @@ -630,7 +630,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return promise.futureResult - }) + })) XCTAssertThrowsError(try localClient.execute(request: request).wait()) { XCTAssertEqual($0 as? HTTPClientError, .writeTimeout) @@ -802,7 +802,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testUploadStreaming() throws { - let body: HTTPClient.Body = .stream(length: 8) { writer in + let body: HTTPClient.Body = .stream(contentLength: 8) { writer in let buffer = ByteBuffer(string: "1234") return writer.write(.byteBuffer(buffer)).flatMap { let buffer = ByteBuffer(string: "4321") @@ -1953,9 +1953,9 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testValidationErrorsAreSurfaced() throws { - let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .TRACE, body: .stream { _ in + let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .TRACE, body: .stream(bodyStream: { _ in self.defaultClient.eventLoopGroup.next().makeSucceededFuture(()) - }) + })) let runningRequest = self.defaultClient.execute(request: request) XCTAssertThrowsError(try runningRequest.wait()) { error in XCTAssertEqual(HTTPClientError.traceRequestWithBody, error as? HTTPClientError) @@ -2048,10 +2048,10 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { return try? HTTPClient.Request(url: "http://\(localAddress.ipAddress!):\(localAddress.port!)", method: .POST, headers: ["transfer-encoding": "chunked"], - body: .stream { streamWriter in + body: .stream(bodyStream: { streamWriter in streamWriterPromise.succeed(streamWriter) return sentOffAllBodyPartsPromise.futureResult - }) + })) } guard let server = makeServer(), let request = makeRequest(server: server) else { @@ -2083,7 +2083,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testUploadStreamingCallinToleratedFromOtsideEL() throws { - let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream(length: 4) { writer in + let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream(contentLength: 4) { writer in let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) // We have to toleare callins from any thread DispatchQueue(label: "upload-streaming").async { @@ -2602,9 +2602,9 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } var request = try HTTPClient.Request(url: "http://localhost:\(server.serverPort)/") - request.body = .stream { writer in + request.body = .stream(bodyStream: { writer in writer.write(.byteBuffer(ByteBuffer(string: "1234"))) - } + }) let future = client.execute(request: request) @@ -2703,7 +2703,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { XCTAssertThrowsError( try self.defaultClient.execute(request: Request(url: url, - body: .stream(length: 10) { streamWriter in + body: .stream(contentLength: 10) { streamWriter in let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) DispatchQueue(label: "content-length-test").async { streamWriter.write(.byteBuffer(ByteBuffer(string: "1"))).cascade(to: promise) @@ -2733,7 +2733,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { XCTAssertThrowsError( try self.defaultClient.execute(request: Request(url: url, - body: .stream(length: 1) { streamWriter in + body: .stream(contentLength: 1) { streamWriter in streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong))) })).wait()) { error in XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch) @@ -2756,7 +2756,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { func testBodyUploadAfterEndFails() { let url = self.defaultHTTPBinURLPrefix + "post" - func uploader(_ streamWriter: HTTPClient.Body.StreamWriter) -> EventLoopFuture { + let uploader = { @Sendable (_ streamWriter: HTTPClient.Body.StreamWriter) -> EventLoopFuture in let done = streamWriter.write(.byteBuffer(ByteBuffer(string: "X"))) done.recover { error in XCTFail("unexpected error \(error)") @@ -2777,7 +2777,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } var request: HTTPClient.Request? - XCTAssertNoThrow(request = try Request(url: url, body: .stream(length: 1, uploader))) + XCTAssertNoThrow(request = try Request(url: url, body: .stream(contentLength: 1, bodyStream: uploader))) XCTAssertThrowsError(try self.defaultClient.execute(request: XCTUnwrap(request)).wait()) { XCTAssertEqual($0 as? HTTPClientError, .writeAfterRequestSent) } @@ -2793,7 +2793,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { _ = self.defaultClient.get(url: "http://localhost:\(self.defaultHTTPBin.port)/events/10/1") var request = try HTTPClient.Request(url: "http://localhost:\(self.defaultHTTPBin.port)/wait", method: .POST) - request.body = .stream { writer in + request.body = .stream(bodyStream: { writer in // Start writing chunks so tha we will try to write after read timeout is thrown for _ in 1...10 { _ = writer.write(.byteBuffer(ByteBuffer(string: "1234"))) @@ -2805,7 +2805,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return promise.futureResult - } + }) // We specify a deadline of 2 ms co that request will be timed out before all chunks are writtent, // we need to verify that second error on write after timeout does not lead to double-release. @@ -2968,10 +2968,10 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let delegate = ResponseStreamDelegate(eventLoop: delegateEL) - let body: HTTPClient.Body = .stream { writer in + let body: HTTPClient.Body = .stream(bodyStream: { writer in let finalPromise = writeEL.makePromise(of: Void.self) - func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + @Sendable func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { // always invoke from the wrong el to test thread safety writeEL.preconditionInEventLoop() @@ -3004,7 +3004,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return finalPromise.futureResult - } + }) let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL)) @@ -3068,9 +3068,9 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let body = ByteBuffer(bytes: 0..<11) var request = try Request(url: httpBin.baseURL) - request.body = .stream { writer in + request.body = .stream(bodyStream: { writer in writer.write(.byteBuffer(body)) - } + }) XCTAssertThrowsError(try self.defaultClient.execute( request: request, delegate: ResponseAccumulator(request: request, maxBodySize: 10) @@ -3086,9 +3086,9 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let body = ByteBuffer(bytes: 0..<10) var request = try Request(url: httpBin.baseURL) - request.body = .stream { writer in + request.body = .stream(bodyStream: { writer in writer.write(.byteBuffer(body)) - } + }) let response = try self.defaultClient.execute( request: request, delegate: ResponseAccumulator(request: request, maxBodySize: 10) @@ -3113,10 +3113,10 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let delegate = ResponseStreamDelegate(eventLoop: delegateEL) - let body: HTTPClient.Body = .stream { writer in + let body: HTTPClient.Body = .stream(bodyStream: { writer in let finalPromise = writeEL.makePromise(of: Void.self) - func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + @Sendable func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { // always invoke from the wrong el to test thread safety writeEL.preconditionInEventLoop() @@ -3143,7 +3143,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return finalPromise.futureResult - } + }) let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL)) @@ -3164,10 +3164,10 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } - let body: HTTPClient.Body = .stream { writer in + let body: HTTPClient.Body = .stream(bodyStream: { writer in let finalPromise = writeEL.makePromise(of: Void.self) - func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + @Sendable func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { // always invoke from the wrong el to test thread safety writeEL.preconditionInEventLoop() @@ -3194,7 +3194,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return finalPromise.futureResult - } + }) let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) let future = httpClient.execute(request: request) @@ -3220,10 +3220,10 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } - let body: HTTPClient.Body = .stream { writer in + let body: HTTPClient.Body = .stream(bodyStream: { writer in let finalPromise = writeEL.makePromise(of: Void.self) - func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + @Sendable func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { // always invoke from the wrong el to test thread safety writeEL.preconditionInEventLoop() @@ -3250,7 +3250,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } return finalPromise.futureResult - } + }) let headers = HTTPHeaders([("Connection", "close")]) let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", headers: headers, body: body) diff --git a/Tests/AsyncHTTPClientTests/NoBytesSentOverBodyLimitTests.swift b/Tests/AsyncHTTPClientTests/NoBytesSentOverBodyLimitTests.swift index 41285d5c5..756facb3f 100644 --- a/Tests/AsyncHTTPClientTests/NoBytesSentOverBodyLimitTests.swift +++ b/Tests/AsyncHTTPClientTests/NoBytesSentOverBodyLimitTests.swift @@ -40,7 +40,7 @@ final class NoBytesSentOverBodyLimitTests: XCTestCaseHTTPClientTestsBaseClass { let request = try Request( url: "http://localhost:\(server.serverPort)", - body: .stream(length: 1) { streamWriter in + body: .stream(contentLength: 1) { streamWriter in streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong))) } ) diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index 610e429f5..fa094c1af 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -14,6 +14,7 @@ @testable import AsyncHTTPClient import Logging +import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import NIOHTTP1 @@ -26,24 +27,36 @@ final class RequestBagTests: XCTestCase { defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } let logger = Logger(label: "test") - var writtenBytes = 0 - var writes = 0 + struct TestState { + var writtenBytes: Int = 0 + var writes: Int = 0 + var streamIsAllowedToWrite: Bool = false + } + + let testState = NIOLockedValueBox(TestState()) + let bytesToSent = (3000...10000).randomElement()! let expectedWrites = bytesToSent / 100 + ((bytesToSent % 100 > 0) ? 1 : 0) - var streamIsAllowedToWrite = false let writeDonePromise = embeddedEventLoop.makePromise(of: Void.self) - let requestBody: HTTPClient.Body = .stream(length: bytesToSent) { writer -> EventLoopFuture in - func write(donePromise: EventLoopPromise) { - XCTAssertTrue(streamIsAllowedToWrite) - guard writtenBytes < bytesToSent else { - return donePromise.succeed(()) + let requestBody: HTTPClient.Body = .stream(contentLength: Int64(bytesToSent)) { writer -> EventLoopFuture in + @Sendable func write(donePromise: EventLoopPromise) { + let futureWrite: EventLoopFuture? = testState.withLockedValue { state in + XCTAssertTrue(state.streamIsAllowedToWrite) + guard state.writtenBytes < bytesToSent else { + donePromise.succeed(()) + return nil + } + let byteCount = min(bytesToSent - state.writtenBytes, 100) + let buffer = ByteBuffer(bytes: [UInt8](repeating: 1, count: byteCount)) + state.writes += 1 + return writer.write(.byteBuffer(buffer)) } - let byteCount = min(bytesToSent - writtenBytes, 100) - let buffer = ByteBuffer(bytes: [UInt8](repeating: 1, count: byteCount)) - writes += 1 - writer.write(.byteBuffer(buffer)).whenSuccess { _ in - writtenBytes += 100 + + futureWrite?.whenSuccess { _ in + testState.withLockedValue { state in + state.writtenBytes += 100 + } write(donePromise: donePromise) } } @@ -81,9 +94,9 @@ final class RequestBagTests: XCTestCase { executor.runRequest(bag) XCTAssertEqual(delegate.hitDidSendRequestHead, 1) - streamIsAllowedToWrite = true + testState.withLockedValue { $0.streamIsAllowedToWrite = true } bag.resumeRequestBodyStream() - streamIsAllowedToWrite = false + testState.withLockedValue { $0.streamIsAllowedToWrite = false } // after starting the body stream we should have received two writes var receivedBytes = 0 @@ -91,14 +104,14 @@ final class RequestBagTests: XCTestCase { XCTAssertNoThrow(try executor.receiveRequestBody { receivedBytes += $0.readableBytes }) - XCTAssertEqual(delegate.hitDidSendRequestPart, writes) + XCTAssertEqual(delegate.hitDidSendRequestPart, testState.withLockedValue { $0.writes }) if i % 2 == 1 { - streamIsAllowedToWrite = true + testState.withLockedValue { $0.streamIsAllowedToWrite = true } executor.resumeRequestBodyStream() - streamIsAllowedToWrite = false + testState.withLockedValue { $0.streamIsAllowedToWrite = false } XCTAssertLessThanOrEqual(executor.requestBodyPartsCount, 2) - XCTAssertEqual(delegate.hitDidSendRequestPart, writes) + XCTAssertEqual(delegate.hitDidSendRequestPart, testState.withLockedValue { $0.writes }) } } @@ -153,7 +166,7 @@ final class RequestBagTests: XCTestCase { defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } let logger = Logger(label: "test") - let requestBody: HTTPClient.Body = .stream(length: 12) { writer -> EventLoopFuture in + let requestBody: HTTPClient.Body = .stream(contentLength: 12) { writer -> EventLoopFuture in writer.write(.byteBuffer(ByteBuffer(bytes: 0...3))).flatMap { _ -> EventLoopFuture in embeddedEventLoop.makeFailedFuture(TestError()) @@ -530,21 +543,21 @@ final class RequestBagTests: XCTestCase { var maybeRequest: HTTPClient.Request? let writeSecondPartPromise = embeddedEventLoop.makePromise(of: Void.self) + let firstWriteSuccess: NIOLockedValueBox = .init(false) XCTAssertNoThrow(maybeRequest = try HTTPClient.Request( url: "https://swift.org", method: .POST, headers: ["content-length": "12"], - body: .stream(length: 12) { writer -> EventLoopFuture in - var firstWriteSuccess = false + body: .stream(contentLength: 12) { writer -> EventLoopFuture in return writer.write(.byteBuffer(.init(bytes: 0...3))).flatMap { _ in - firstWriteSuccess = true + firstWriteSuccess.withLockedValue { $0 = true } return writeSecondPartPromise.futureResult }.flatMap { return writer.write(.byteBuffer(.init(bytes: 4...7))) }.always { result in - XCTAssertTrue(firstWriteSuccess) + XCTAssertTrue(firstWriteSuccess.withLockedValue { $0 }) guard case .failure(let error) = result else { return XCTFail("Expected the second write to fail") @@ -859,11 +872,11 @@ final class RequestBagTests: XCTestCase { let writerPromise = group.any().makePromise(of: HTTPClient.Body.StreamWriter.self) let donePromise = group.any().makePromise(of: Void.self) - request.body = .stream { [leakDetector] writer in + request.body = .stream(bodyStream: { [leakDetector] writer in _ = leakDetector writerPromise.succeed(writer) return donePromise.futureResult - } + }) let resultFuture = httpClient.execute(request: request) request.body = nil diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index a8a2bb30e..40f71d010 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -517,7 +517,7 @@ final class TransactionTests: XCTestCase { var request = HTTPClientRequest(url: "https://localhost:\(httpBin.port)/") request.method = .POST request.headers = ["host": "localhost:\(httpBin.port)"] - request.body = .stream(streamWriter, length: .known(800)) + request.body = .stream(streamWriter, length: .known(Int64(800))) var maybePreparedRequest: PreparedRequest? XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request))