Skip to content

Commit

Permalink
Remove EventLoop version of waiters
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Jun 17, 2023
1 parent 1919b32 commit a86f424
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 85 deletions.
50 changes: 49 additions & 1 deletion Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,54 @@ extension AWSClient {
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
) async throws {
return try await self.waitUntil(input, waiter: waiter, maxWaitTime: maxWaitTime, logger: logger, on: eventLoop).get()
let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime
let deadline: NIODeadline = .now() + maxWaitTime
let eventLoop = eventLoop ?? eventLoopGroup.next()

var attempt = 0
while true {
attempt += 1
let result: Result<Output, Error>
do {
result = try .success(await waiter.command(input, logger, eventLoop).get())
} catch {
result = .failure(error)
}
var acceptorState: WaiterState?
for acceptor in waiter.acceptors {
if acceptor.matcher.match(result: result.map { $0 }) {
acceptorState = acceptor.state
break
}
}
// if state has not been set then set it based on return of API call
let waiterState: WaiterState
if let state = acceptorState {
waiterState = state
} else if case .failure = result {
waiterState = .failure

Check warning on line 62 in Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift

View check run for this annotation

Codecov / codecov/patch

Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift#L62

Added line #L62 was not covered by tests
} else {
waiterState = .retry
}
// based on state succeed, fail promise or retry
switch waiterState {
case .success:
return
case .failure:
if case .failure(let error) = result {
throw error
} else {
throw ClientError.waiterFailed

Check warning on line 74 in Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift

View check run for this annotation

Codecov / codecov/patch

Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift#L71-L74

Added lines #L71 - L74 were not covered by tests
}
case .retry:
let wait = waiter.calculateRetryWaitTime(attempt: attempt, remainingTime: deadline - .now())
if wait < .seconds(0) {
throw ClientError.waiterTimeout
} else {
logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms")
try await Task.sleep(nanoseconds: UInt64(wait.nanoseconds))
}
}
}
}
}
88 changes: 10 additions & 78 deletions Sources/SotoCore/Waiters/AWSClient+Waiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import NIOCore

extension AWSClient {
/// Waiter state
public enum WaiterState {
public enum WaiterState: Sendable {
case success
case retry
case failure
}

/// A waiter is a client side abstraction used to poll a resource until a desired state is reached
public struct Waiter<Input: Sendable, Output: Sendable> {
public struct Waiter<Input: Sendable, Output: Sendable>: Sendable {
/// An acceptor checks the result of a call and can change the waiter state based on that result
public struct Acceptor {
public struct Acceptor: Sendable {
public init(state: AWSClient.WaiterState, matcher: AWSWaiterMatcher) {
self.state = state
self.matcher = matcher
Expand All @@ -39,7 +39,7 @@ extension AWSClient {
let matcher: AWSWaiterMatcher
}

public typealias WaiterCommand = (Input, Logger, EventLoop?) -> EventLoopFuture<Output>
public typealias WaiterCommand = @Sendable (Input, Logger, EventLoop?) -> EventLoopFuture<Output>

/// Initialize an waiter
/// - Parameters:
Expand All @@ -66,7 +66,13 @@ extension AWSClient {

/// Calculate delay until next API call. This calculation comes from the AWS Smithy documentation
/// https://awslabs.github.io/smithy/1.0/spec/waiters.html#waiter-retries
///
/// - Parameters:
/// - attempt: Attempt number (assumes this starts at 1)
/// - remainingTime: Remaining time available
/// - Returns: Calculate retry time
func calculateRetryWaitTime(attempt: Int, remainingTime: TimeAmount) -> TimeAmount {
assert(attempt >= 1, "Attempt number cannot be less than 1")
let minDelay = Double(self.minDelayTime.nanoseconds) / 1_000_000_000
let maxDelay = Double(self.maxDelayTime.nanoseconds) / 1_000_000_000
let attemptCeiling = (log(maxDelay / minDelay) / log(2)) + 1
Expand All @@ -85,78 +91,4 @@ extension AWSClient {
return timeDelay
}
}

/// Returns an `EventLoopFuture` that will by fulfilled once waiter polling returns a success state
/// or returns an error if the polling returns an error or timesout
///
/// - Parameters:
/// - input: Input parameters
/// - waiter: Waiter to wait on
/// - maxWaitTime: Maximum amount of time to wait
/// - logger: Logger used to provide output
/// - eventLoop: EventLoop to run API calls on
/// - Returns: EventLoopFuture that will be fulfilled once waiter has completed
public func waitUntil<Input, Output>(
_ input: Input,
waiter: Waiter<Input, Output>,
maxWaitTime: TimeAmount? = nil,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
) -> EventLoopFuture<Void> {
let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime
let deadline: NIODeadline = .now() + maxWaitTime
let eventLoop = eventLoop ?? eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)

func attempt(number: Int) {
waiter.command(input, logger, eventLoop)
.whenComplete { result in
var acceptorState: WaiterState?
for acceptor in waiter.acceptors {
if acceptor.matcher.match(result: result.map { $0 }) {
acceptorState = acceptor.state
break
}
}
// if state has not been set then set it based on return of API call
let waiterState: WaiterState
if let state = acceptorState {
waiterState = state
} else if case .failure = result {
waiterState = .failure
} else {
waiterState = .retry
}
// based on state succeed, fail promise or retry
switch waiterState {
case .success:
promise.succeed(())
case .failure:
if case .failure(let error) = result {
promise.fail(error)
} else {
promise.fail(ClientError.waiterFailed)
}
case .retry:
let wait = waiter.calculateRetryWaitTime(attempt: number, remainingTime: deadline - .now())
if wait < .seconds(0) {
promise.fail(ClientError.waiterTimeout)
} else {
logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms")
eventLoop.scheduleTask(in: wait) { attempt(number: number + 1) }
}
}
}
}
attempt(number: 1)
return promise.futureResult
}
}

extension AWSClient.WaiterState: Sendable {}
extension AWSClient.Waiter.Acceptor: Sendable {}
// I could require the Waiter.command to be Sendable, but it just generates
// pain elsewhere where I have to mark all the API functions to be @Sendable
// which then requires multiple versions of those function if I am going to
// support backwards compatiblity
extension AWSClient.Waiter: @unchecked Sendable {}
12 changes: 6 additions & 6 deletions Tests/SotoCoreTests/WaiterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class WaiterTests: XCTestCase {
let i: Int
}

func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<Output> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<Output> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand All @@ -59,7 +59,7 @@ class WaiterTests: XCTestCase {
let array: [Element]
}

func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
@Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand All @@ -74,7 +74,7 @@ class WaiterTests: XCTestCase {
let array: [Element]?
}

func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<OptionalArrayOutput> {
@Sendable func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<OptionalArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand Down Expand Up @@ -102,7 +102,7 @@ class WaiterTests: XCTestCase {
struct StringOutput: AWSDecodableShape & Encodable {
let s: String
}
func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<StringOutput> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<StringOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}
let waiter = AWSClient.Waiter(
Expand Down Expand Up @@ -137,7 +137,7 @@ class WaiterTests: XCTestCase {
struct EnumOutput: AWSDecodableShape & Encodable {
let e: YesNo
}
func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<EnumOutput> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<EnumOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}
let waiter = AWSClient.Waiter(
Expand Down Expand Up @@ -222,7 +222,7 @@ class WaiterTests: XCTestCase {
defer { XCTAssertNoThrow(try awsServer.stop()) }
let config = createServiceConfig(serviceProtocol: .restxml, endpoint: awsServer.address)

func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
@Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: logger, on: eventLoop)
}

Expand Down

0 comments on commit a86f424

Please sign in to comment.