From a86f424160647163b5cab516bc23de189a26ffa8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sat, 17 Jun 2023 10:45:00 +0100 Subject: [PATCH] Remove EventLoop version of waiters --- .../Waiters/AWSClient+Waiter+async.swift | 50 ++++++++++- .../SotoCore/Waiters/AWSClient+Waiter.swift | 88 +++---------------- Tests/SotoCoreTests/WaiterTests.swift | 12 +-- 3 files changed, 65 insertions(+), 85 deletions(-) diff --git a/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift b/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift index c1ddbc4d2..14c6085f1 100644 --- a/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift +++ b/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift @@ -34,6 +34,54 @@ extension AWSClient { logger: Logger = AWSClient.loggingDisabled, on eventLoop: EventLoop? = nil ) async throws { - return try await self.waitUntil(input, waiter: waiter, maxWaitTime: maxWaitTime, logger: logger, on: eventLoop).get() + let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime + let deadline: NIODeadline = .now() + maxWaitTime + let eventLoop = eventLoop ?? eventLoopGroup.next() + + var attempt = 0 + while true { + attempt += 1 + let result: Result + do { + result = try .success(await waiter.command(input, logger, eventLoop).get()) + } catch { + result = .failure(error) + } + var acceptorState: WaiterState? + for acceptor in waiter.acceptors { + if acceptor.matcher.match(result: result.map { $0 }) { + acceptorState = acceptor.state + break + } + } + // if state has not been set then set it based on return of API call + let waiterState: WaiterState + if let state = acceptorState { + waiterState = state + } else if case .failure = result { + waiterState = .failure + } else { + waiterState = .retry + } + // based on state succeed, fail promise or retry + switch waiterState { + case .success: + return + case .failure: + if case .failure(let error) = result { + throw error + } else { + throw ClientError.waiterFailed + } + case .retry: + let wait = waiter.calculateRetryWaitTime(attempt: attempt, remainingTime: deadline - .now()) + if wait < .seconds(0) { + throw ClientError.waiterTimeout + } else { + logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms") + try await Task.sleep(nanoseconds: UInt64(wait.nanoseconds)) + } + } + } } } diff --git a/Sources/SotoCore/Waiters/AWSClient+Waiter.swift b/Sources/SotoCore/Waiters/AWSClient+Waiter.swift index d849e4a3e..fe252dc61 100644 --- a/Sources/SotoCore/Waiters/AWSClient+Waiter.swift +++ b/Sources/SotoCore/Waiters/AWSClient+Waiter.swift @@ -20,16 +20,16 @@ import NIOCore extension AWSClient { /// Waiter state - public enum WaiterState { + public enum WaiterState: Sendable { case success case retry case failure } /// A waiter is a client side abstraction used to poll a resource until a desired state is reached - public struct Waiter { + public struct Waiter: Sendable { /// An acceptor checks the result of a call and can change the waiter state based on that result - public struct Acceptor { + public struct Acceptor: Sendable { public init(state: AWSClient.WaiterState, matcher: AWSWaiterMatcher) { self.state = state self.matcher = matcher @@ -39,7 +39,7 @@ extension AWSClient { let matcher: AWSWaiterMatcher } - public typealias WaiterCommand = (Input, Logger, EventLoop?) -> EventLoopFuture + public typealias WaiterCommand = @Sendable (Input, Logger, EventLoop?) -> EventLoopFuture /// Initialize an waiter /// - Parameters: @@ -66,7 +66,13 @@ extension AWSClient { /// Calculate delay until next API call. This calculation comes from the AWS Smithy documentation /// https://awslabs.github.io/smithy/1.0/spec/waiters.html#waiter-retries + /// + /// - Parameters: + /// - attempt: Attempt number (assumes this starts at 1) + /// - remainingTime: Remaining time available + /// - Returns: Calculate retry time func calculateRetryWaitTime(attempt: Int, remainingTime: TimeAmount) -> TimeAmount { + assert(attempt >= 1, "Attempt number cannot be less than 1") let minDelay = Double(self.minDelayTime.nanoseconds) / 1_000_000_000 let maxDelay = Double(self.maxDelayTime.nanoseconds) / 1_000_000_000 let attemptCeiling = (log(maxDelay / minDelay) / log(2)) + 1 @@ -85,78 +91,4 @@ extension AWSClient { return timeDelay } } - - /// Returns an `EventLoopFuture` that will by fulfilled once waiter polling returns a success state - /// or returns an error if the polling returns an error or timesout - /// - /// - Parameters: - /// - input: Input parameters - /// - waiter: Waiter to wait on - /// - maxWaitTime: Maximum amount of time to wait - /// - logger: Logger used to provide output - /// - eventLoop: EventLoop to run API calls on - /// - Returns: EventLoopFuture that will be fulfilled once waiter has completed - public func waitUntil( - _ input: Input, - waiter: Waiter, - maxWaitTime: TimeAmount? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime - let deadline: NIODeadline = .now() + maxWaitTime - let eventLoop = eventLoop ?? eventLoopGroup.next() - let promise = eventLoop.makePromise(of: Void.self) - - func attempt(number: Int) { - waiter.command(input, logger, eventLoop) - .whenComplete { result in - var acceptorState: WaiterState? - for acceptor in waiter.acceptors { - if acceptor.matcher.match(result: result.map { $0 }) { - acceptorState = acceptor.state - break - } - } - // if state has not been set then set it based on return of API call - let waiterState: WaiterState - if let state = acceptorState { - waiterState = state - } else if case .failure = result { - waiterState = .failure - } else { - waiterState = .retry - } - // based on state succeed, fail promise or retry - switch waiterState { - case .success: - promise.succeed(()) - case .failure: - if case .failure(let error) = result { - promise.fail(error) - } else { - promise.fail(ClientError.waiterFailed) - } - case .retry: - let wait = waiter.calculateRetryWaitTime(attempt: number, remainingTime: deadline - .now()) - if wait < .seconds(0) { - promise.fail(ClientError.waiterTimeout) - } else { - logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms") - eventLoop.scheduleTask(in: wait) { attempt(number: number + 1) } - } - } - } - } - attempt(number: 1) - return promise.futureResult - } } - -extension AWSClient.WaiterState: Sendable {} -extension AWSClient.Waiter.Acceptor: Sendable {} -// I could require the Waiter.command to be Sendable, but it just generates -// pain elsewhere where I have to mark all the API functions to be @Sendable -// which then requires multiple versions of those function if I am going to -// support backwards compatiblity -extension AWSClient.Waiter: @unchecked Sendable {} diff --git a/Tests/SotoCoreTests/WaiterTests.swift b/Tests/SotoCoreTests/WaiterTests.swift index 835ba74e6..c4a12f383 100644 --- a/Tests/SotoCoreTests/WaiterTests.swift +++ b/Tests/SotoCoreTests/WaiterTests.swift @@ -40,7 +40,7 @@ class WaiterTests: XCTestCase { let i: Int } - func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) } @@ -59,7 +59,7 @@ class WaiterTests: XCTestCase { let array: [Element] } - func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) } @@ -74,7 +74,7 @@ class WaiterTests: XCTestCase { let array: [Element]? } - func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) } @@ -102,7 +102,7 @@ class WaiterTests: XCTestCase { struct StringOutput: AWSDecodableShape & Encodable { let s: String } - func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) } let waiter = AWSClient.Waiter( @@ -137,7 +137,7 @@ class WaiterTests: XCTestCase { struct EnumOutput: AWSDecodableShape & Encodable { let e: YesNo } - func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) } let waiter = AWSClient.Waiter( @@ -222,7 +222,7 @@ class WaiterTests: XCTestCase { defer { XCTAssertNoThrow(try awsServer.stop()) } let config = createServiceConfig(serviceProtocol: .restxml, endpoint: awsServer.address) - func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { + @Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: logger, on: eventLoop) }