From af3e1c14ea71492d0729d0450d05838bd44d4542 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Sat, 4 Dec 2021 17:09:24 +0100 Subject: [PATCH 1/2] Add ControlPlaneRequestEncoder --- .../ControlPlaneRequestEncoder.swift | 126 ++++++++++++ .../ControlPlaneRequestEncoderTests.swift | 179 ++++++++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift create mode 100644 Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift new file mode 100644 index 00000000..4e33d746 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift @@ -0,0 +1,126 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +struct ControlPlaneRequestEncoder: _EmittingChannelHandler { + typealias OutboundOut = ByteBuffer + + private var host: String + private var byteBuffer: ByteBuffer! + + init(host: String) { + self.host = host + } + + mutating func writeRequest(_ request: ControlPlaneRequest, context: ChannelHandlerContext, promise: EventLoopPromise?) { + self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity) + + switch request { + case .next: + self.byteBuffer.writeString(.nextInvocationRequestLine) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.CRLF) // end of head + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() + + case .invocationResponse(let requestID, let payload): + let contentLength = payload?.readableBytes ?? 0 + self.byteBuffer.writeInvocationResultRequestLine(requestID) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeContentLengthHeader(length: contentLength) + self.byteBuffer.writeString(.CRLF) // end of head + if contentLength > 0 { + context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil) + context.write(self.wrapOutboundOut(payload!), promise: promise) + } else { + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + } + context.flush() + + case .invocationError(let requestID, let errorMessage): + let payload = errorMessage.toJSONBytes() + self.byteBuffer.writeInvocationErrorRequestLine(requestID) + self.byteBuffer.writeContentLengthHeader(length: payload.count) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.unhandledErrorHeader) + self.byteBuffer.writeString(.CRLF) // end of head + self.byteBuffer.writeBytes(payload) + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() + + case .initializationError(let errorMessage): + let payload = errorMessage.toJSONBytes() + self.byteBuffer.writeString(.runtimeInitErrorRequestLine) + self.byteBuffer.writeContentLengthHeader(length: payload.count) + self.byteBuffer.writeHostHeader(host: self.host) + self.byteBuffer.writeString(.userAgentHeader) + self.byteBuffer.writeString(.unhandledErrorHeader) + self.byteBuffer.writeString(.CRLF) // end of head + self.byteBuffer.writeBytes(payload) + context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) + context.flush() + } + } + + mutating func writerAdded(context: ChannelHandlerContext) { + self.byteBuffer = context.channel.allocator.buffer(capacity: 256) + } + + mutating func writerRemoved(context: ChannelHandlerContext) { + self.byteBuffer = nil + } +} + +extension String { + static let CRLF: String = "\r\n" + + static let userAgentHeader: String = "user-agent: Swift-Lambda/Unknown\r\n" + static let unhandledErrorHeader: String = "lambda-runtime-function-error-type: Unhandled\r\n" + + static let nextInvocationRequestLine: String = + "GET /2018-06-01/runtime/invocation/next HTTP/1.1\r\n" + + static let runtimeInitErrorRequestLine: String = + "POST /2018-06-01/runtime/init/error HTTP/1.1\r\n" +} + +extension ByteBuffer { + fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) { + self.writeString("POST /2018-06-01/runtime/invocation/") + self.writeString(requestID) + self.writeString("/response HTTP/1.1\r\n") + } + + fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) { + self.writeString("POST /2018-06-01/runtime/invocation/") + self.writeString(requestID) + self.writeString("/error HTTP/1.1\r\n") + } + + fileprivate mutating func writeHostHeader(host: String) { + self.writeString("host: ") + self.writeString(host) + self.writeString(.CRLF) + } + + fileprivate mutating func writeContentLengthHeader(length: Int) { + self.writeString("content-length: ") + self.writeString("\(length)") + self.writeString(.CRLF) + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift new file mode 100644 index 00000000..ac6c0838 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/ControlPlaneRequestEncoderTests.swift @@ -0,0 +1,179 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import XCTest + +final class ControlPlaneRequestEncoderTests: XCTestCase { + let host = "192.168.0.1" + + var client: EmbeddedChannel! + var server: EmbeddedChannel! + + override func setUp() { + self.client = EmbeddedChannel(handler: ControlPlaneRequestEncoderHandler(host: self.host)) + self.server = EmbeddedChannel(handlers: [ + ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .dropBytes)), + NIOHTTPServerRequestAggregator(maxContentLength: 1024 * 1024), + ]) + } + + override func tearDown() { + XCTAssertNoThrow(try self.client.finish(acceptAlreadyClosed: false)) + XCTAssertNoThrow(try self.server.finish(acceptAlreadyClosed: false)) + self.client = nil + self.server = nil + } + + func testNextRequest() { + var request: NIOHTTPServerRequestFull? + XCTAssertNoThrow(request = try self.sendRequest(.next)) + + XCTAssertEqual(request?.head.isKeepAlive, true) + XCTAssertEqual(request?.head.method, .GET) + XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/next") + XCTAssertEqual(request?.head.version, .http1_1) + XCTAssertEqual(request?.head.headers["host"], [self.host]) + XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + } + + func testPostInvocationSuccessWithoutBody() { + let requestID = UUID().uuidString + var request: NIOHTTPServerRequestFull? + XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, nil))) + + XCTAssertEqual(request?.head.isKeepAlive, true) + XCTAssertEqual(request?.head.method, .POST) + XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response") + XCTAssertEqual(request?.head.version, .http1_1) + XCTAssertEqual(request?.head.headers["host"], [self.host]) + XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + XCTAssertEqual(request?.head.headers["content-length"], ["0"]) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + } + + func testPostInvocationSuccessWithBody() { + let requestID = UUID().uuidString + let payload = ByteBuffer(string: "hello swift lambda!") + + var request: NIOHTTPServerRequestFull? + XCTAssertNoThrow(request = try self.sendRequest(.invocationResponse(requestID, payload))) + + XCTAssertEqual(request?.head.isKeepAlive, true) + XCTAssertEqual(request?.head.method, .POST) + XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response") + XCTAssertEqual(request?.head.version, .http1_1) + XCTAssertEqual(request?.head.headers["host"], [self.host]) + XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + XCTAssertEqual(request?.head.headers["content-length"], ["\(payload.readableBytes)"]) + XCTAssertEqual(request?.body, payload) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + } + + func testPostInvocationErrorWithBody() { + let requestID = UUID().uuidString + let error = ErrorResponse(errorType: "SomeError", errorMessage: "An error happened") + var request: NIOHTTPServerRequestFull? + XCTAssertNoThrow(request = try self.sendRequest(.invocationError(requestID, error))) + + XCTAssertEqual(request?.head.isKeepAlive, true) + XCTAssertEqual(request?.head.method, .POST) + XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/error") + XCTAssertEqual(request?.head.version, .http1_1) + XCTAssertEqual(request?.head.headers["host"], [self.host]) + XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"]) + let expectedBody = #"{"errorType":"SomeError","errorMessage":"An error happened"}"# + + XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"]) + XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)), + expectedBody) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + } + + func testPostStartupError() { + let error = ErrorResponse(errorType: "StartupError", errorMessage: "Urgh! Startup failed. 😨") + var request: NIOHTTPServerRequestFull? + XCTAssertNoThrow(request = try self.sendRequest(.initializationError(error))) + + XCTAssertEqual(request?.head.isKeepAlive, true) + XCTAssertEqual(request?.head.method, .POST) + XCTAssertEqual(request?.head.uri, "/2018-06-01/runtime/init/error") + XCTAssertEqual(request?.head.version, .http1_1) + XCTAssertEqual(request?.head.headers["host"], [self.host]) + XCTAssertEqual(request?.head.headers["user-agent"], ["Swift-Lambda/Unknown"]) + XCTAssertEqual(request?.head.headers["lambda-runtime-function-error-type"], ["Unhandled"]) + let expectedBody = #"{"errorType":"StartupError","errorMessage":"Urgh! Startup failed. 😨"}"# + XCTAssertEqual(request?.head.headers["content-length"], ["\(expectedBody.utf8.count)"]) + XCTAssertEqual(try request?.body?.getString(at: 0, length: XCTUnwrap(request?.body?.readableBytes)), + expectedBody) + + XCTAssertNil(try self.server.readInbound(as: NIOHTTPServerRequestFull.self)) + } + + func testMultipleNextAndResponseSuccessRequests() { + for _ in 0 ..< 1000 { + var nextRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(nextRequest = try self.sendRequest(.next)) + XCTAssertEqual(nextRequest?.head.method, .GET) + XCTAssertEqual(nextRequest?.head.uri, "/2018-06-01/runtime/invocation/next") + + let requestID = UUID().uuidString + let payload = ByteBuffer(string: "hello swift lambda!") + var successRequest: NIOHTTPServerRequestFull? + XCTAssertNoThrow(successRequest = try self.sendRequest(.invocationResponse(requestID, payload))) + XCTAssertEqual(successRequest?.head.method, .POST) + XCTAssertEqual(successRequest?.head.uri, "/2018-06-01/runtime/invocation/\(requestID)/response") + } + } + + func sendRequest(_ request: ControlPlaneRequest) throws -> NIOHTTPServerRequestFull? { + try self.client.writeOutbound(request) + while let part = try self.client.readOutbound(as: ByteBuffer.self) { + XCTAssertNoThrow(try self.server.writeInbound(part)) + } + return try self.server.readInbound(as: NIOHTTPServerRequestFull.self) + } +} + +private final class ControlPlaneRequestEncoderHandler: ChannelOutboundHandler { + typealias OutboundIn = ControlPlaneRequest + typealias OutboundOut = ByteBuffer + + private var encoder: ControlPlaneRequestEncoder + + init(host: String) { + self.encoder = ControlPlaneRequestEncoder(host: host) + } + + func handlerAdded(context: ChannelHandlerContext) { + self.encoder.writerAdded(context: context) + } + + func handlerRemoved(context: ChannelHandlerContext) { + self.encoder.writerRemoved(context: context) + } + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + self.encoder.writeRequest(self.unwrapOutboundIn(data), context: context, promise: promise) + } +} From 379ba887ff3df4805eb58508c0a56c080955b0b1 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Sat, 11 Dec 2021 12:34:19 +0100 Subject: [PATCH 2/2] Code review. --- Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift index 4e33d746..a91e1e44 100644 --- a/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift +++ b/Sources/AWSLambdaRuntimeCore/ControlPlaneRequestEncoder.swift @@ -43,9 +43,9 @@ struct ControlPlaneRequestEncoder: _EmittingChannelHandler { self.byteBuffer.writeString(.userAgentHeader) self.byteBuffer.writeContentLengthHeader(length: contentLength) self.byteBuffer.writeString(.CRLF) // end of head - if contentLength > 0 { + if let payload = payload, contentLength > 0 { context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil) - context.write(self.wrapOutboundOut(payload!), promise: promise) + context.write(self.wrapOutboundOut(payload), promise: promise) } else { context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise) }