diff --git a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala index 8275892..1e189e2 100644 --- a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala +++ b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala @@ -41,13 +41,13 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} -class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { +class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings, protocol: HttpProtocol)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { import AkkaHttpClient._ lazy val runner = new RequestRunner() override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = { - val akkaHttpRequest = toAkkaRequest(request.request(), request.requestContentPublisher()) + val akkaHttpRequest = toAkkaRequest(protocol, request.request(), request.requestContentPublisher()) runner.run( () => Http().singleRequest(akkaHttpRequest, settings = connectionSettings), request.responseHandler() @@ -65,7 +65,7 @@ object AkkaHttpClient { val logger = LoggerFactory.getLogger(this.getClass) - private[akkahttpspi] def toAkkaRequest(request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = { + private[akkahttpspi] def toAkkaRequest(protocol: HttpProtocol, request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = { val (contentTypeHeader, reqheaders) = convertHeaders(request.headers()) val method = convertMethod(request.method().name()) HttpRequest( @@ -73,7 +73,7 @@ object AkkaHttpClient { uri = Uri(request.getUri.toString), headers = reqheaders, entity = entityForMethodAndContentType(method, contentTypeHeaderToContentType(contentTypeHeader), contentPublisher), - protocol = HttpProtocols.`HTTP/1.1` + protocol = protocol ) } @@ -139,7 +139,9 @@ object AkkaHttpClient { case class AkkaHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None, private val executionContext: Option[ExecutionContext] = None, - private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { + private val connectionPoolSettings: Option[ConnectionPoolSettings] = None, + private val protocol: HttpProtocol = HttpProtocols.`HTTP/1.1` + ) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = { implicit val as = actorSystem.getOrElse(ActorSystem("aws-akka-http")) implicit val ec = executionContext.getOrElse(as.dispatcher) @@ -152,12 +154,13 @@ object AkkaHttpClient { } () } - new AkkaHttpClient(shutdownhandleF, cps)(as, ec, mat) + new AkkaHttpClient(shutdownhandleF, cps, protocol)(as, ec, mat) } def withActorSystem(actorSystem: ActorSystem): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem)) def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem)) def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext)) def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings)) + def withProtocol(protocol: HttpProtocol): AkkaHttpClientBuilder = copy(protocol = protocol) } lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible)) diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/dynamodb/TestDynamoDB.scala b/src/test/scala/com/github/matsluni/akkahttpspi/dynamodb/TestDynamoDB.scala index 93d3324..95d4a03 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/dynamodb/TestDynamoDB.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/dynamodb/TestDynamoDB.scala @@ -16,6 +16,8 @@ package com.github.matsluni.akkahttpspi.dynamodb +import akka.http.scaladsl.model.HttpProtocols +import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient @@ -25,7 +27,7 @@ import scala.jdk.CollectionConverters._ class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] { "DynamoDB" should { - "create a table" in withClient { implicit client => + "create a table" in withClient() { implicit client => val attributes = AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build() val keySchema = KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build() @@ -47,11 +49,15 @@ class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] { tableResult.tableNames().asScala should have size (1) } + "work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client => + val result = client.listTables().join() + result.tableNames() should not be null + } } - def withClient(testCode: DynamoDbAsyncClient => Any): Any = { + private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: DynamoDbAsyncClient => Any): Any = { - val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build() + val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build() val client = DynamoDbAsyncClient .builder() diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala b/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala index 26df999..86858d5 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala @@ -17,8 +17,11 @@ package com.github.matsluni.akkahttpspi.s3 import java.io.{File, FileWriter} +import java.util.concurrent.CompletionException +import akka.http.scaladsl.model.HttpProtocols import com.dimafeng.testcontainers.GenericContainer +import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder import com.github.matsluni.akkahttpspi.testcontainers.TimeoutWaitStrategy import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, BaseAwsClientTest} import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider @@ -34,14 +37,14 @@ import scala.util.Random class TestS3 extends BaseAwsClientTest[S3AsyncClient] { "Async S3 client" should { - "create bucket" in withClient { implicit client => + "create bucket" in withClient() { implicit client => val bucketName = createBucket() val buckets = client.listBuckets().join buckets.buckets() should have size (1) buckets.buckets().asScala.toList.head.name() should be(bucketName) } - "upload and download a file to a bucket" in withClient { implicit client => + "upload and download a file to a bucket" in withClient() { implicit client => val bucketName = createBucket() val fileContent = 0 to 1000 mkString @@ -55,7 +58,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] { result.response().contentLength() shouldEqual fileContent.getBytes().length } - "multipart upload" in withClient { implicit client => + "multipart upload" in withClient() { implicit client => val bucketName = createBucket() val randomFile = File.createTempFile("aws1", Random.alphanumeric.take(5).mkString) val fileContent = (0 to 1000000).mkString @@ -85,6 +88,14 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] { result.asUtf8String() should be(fileContent + fileContent) } + //adobe/s3mock does not support HTTP/2 + "work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client => + the[CompletionException] thrownBy { + val result = client.listBuckets().join() + result.buckets() should not be null + } should have message "software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 426, Request ID: null)" + } + } def createBucket()(implicit client: S3AsyncClient): String = { @@ -93,9 +104,9 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] { bucketName } - private def withClient(testCode: S3AsyncClient => Any): Any = { + private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: S3AsyncClient => Any): Any = { - val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build() + val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build() val client = S3AsyncClient .builder() @@ -126,7 +137,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] { private lazy val containerInstance = new GenericContainer( dockerImage = "adobe/s3mock:2.13.0", exposedPorts = Seq(exposedServicePort), - waitStrategy = Some(TimeoutWaitStrategy(10 seconds)) + waitStrategy = Some(TimeoutWaitStrategy(15 seconds)) ) override val container: GenericContainer = containerInstance } diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala b/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala index f45fb36..68d561b 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala @@ -16,6 +16,8 @@ package com.github.matsluni.akkahttpspi.sns +import akka.http.scaladsl.model.HttpProtocols +import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.services.sns.SnsAsyncClient @@ -24,17 +26,22 @@ import software.amazon.awssdk.services.sns.model.{CreateTopicRequest, PublishReq class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] { "Async SNS client" should { - "publish a message to a topic" in withClient { implicit client => + "publish a message to a topic" in withClient() { implicit client => val arn = client.createTopic(CreateTopicRequest.builder().name("topic-example").build()).join().topicArn() val result = client.publish(PublishRequest.builder().message("a message").topicArn(arn).build()).join() result.messageId() should not be null } + + "work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client => + val result = client.listTopics().join() + result.topics() should not be null + } } - def withClient(testCode: SnsAsyncClient => Any): Any = { + private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: SnsAsyncClient => Any): Any = { - val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build() + val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build() val client = SnsAsyncClient .builder() diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/sqs/TestSQS.scala b/src/test/scala/com/github/matsluni/akkahttpspi/sqs/TestSQS.scala index 8a01faa..da1e128 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/sqs/TestSQS.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/sqs/TestSQS.scala @@ -16,23 +16,27 @@ package com.github.matsluni.akkahttpspi.sqs +import akka.http.scaladsl.model.HttpProtocols +import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, ElasticMQSQSBaseAwsClientTest} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model._ +import java.util.concurrent.CompletionException + // switched to use ElasticMQ container instead of Localstack due to https://github.com/localstack/localstack/issues/8545 class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] { "Async SQS client" should { - "publish a message to a queue" in withClient { implicit client => + "publish a message to a queue" in withClient() { implicit client => client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join() client.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build()).join() val receivedMessage = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build()).join() receivedMessage.messages().get(0).body() should be("123") } - "delete a message" in withClient { implicit client => + "delete a message" in withClient() { implicit client => client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join() client.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build()).join() @@ -44,11 +48,19 @@ class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] { receivedMessage.messages() shouldBe java.util.Collections.EMPTY_LIST } + //softwaremill/elasticmq-native does not support HTTP/2 + "work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client => + the [CompletionException] thrownBy { + val result = client.listQueues().join() + result.queueUrls() should not be null + } should have message "software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 505, Request ID: null)" + } + } - def withClient(testCode: SqsAsyncClient => Any): Any = { + private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: SqsAsyncClient => Any): Any = { - val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build() + val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build() val client = SqsAsyncClient .builder()