Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow HTTP protocol selection #226

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}

class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings, protocol: HttpProtocol)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
import AkkaHttpClient._

lazy val runner = new RequestRunner()

override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
val akkaHttpRequest = toAkkaRequest(request.request(), request.requestContentPublisher())
val akkaHttpRequest = toAkkaRequest(protocol, request.request(), request.requestContentPublisher())
runner.run(
() => Http().singleRequest(akkaHttpRequest, settings = connectionSettings),
request.responseHandler()
Expand All @@ -65,15 +65,15 @@ object AkkaHttpClient {

val logger = LoggerFactory.getLogger(this.getClass)

private[akkahttpspi] def toAkkaRequest(request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = {
private[akkahttpspi] def toAkkaRequest(protocol: HttpProtocol, request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = {
val (contentTypeHeader, reqheaders) = convertHeaders(request.headers())
val method = convertMethod(request.method().name())
HttpRequest(
method = method,
uri = Uri(request.getUri.toString),
headers = reqheaders,
entity = entityForMethodAndContentType(method, contentTypeHeaderToContentType(contentTypeHeader), contentPublisher),
protocol = HttpProtocols.`HTTP/1.1`
protocol = protocol
)
}

Expand Down Expand Up @@ -139,7 +139,9 @@ object AkkaHttpClient {

case class AkkaHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None,
private val executionContext: Option[ExecutionContext] = None,
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] {
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None,
private val protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems with your latest refactoring here, the whole attributeMap code is accidently gone.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can also use a new withProtocol(HttpProtocol) builder method to set the protocol?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems with your latest refactoring here, the whole attributeMap code is accidently gone.

the removal of the whole attributeMap code was on purpose. let's explore that in #232

Maybe we can also use a new withProtocol(HttpProtocol) builder method to set the protocol?

yes I can do that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems with your latest refactoring here, the whole attributeMap code is accidently gone.

the removal of the whole attributeMap code was on purpose. let's explore that in #232

ok, but now I realize the README and tests don't make sense 🤦

def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
implicit val as = actorSystem.getOrElse(ActorSystem("aws-akka-http"))
implicit val ec = executionContext.getOrElse(as.dispatcher)
Expand All @@ -152,12 +154,13 @@ object AkkaHttpClient {
}
()
}
new AkkaHttpClient(shutdownhandleF, cps)(as, ec, mat)
new AkkaHttpClient(shutdownhandleF, cps, protocol)(as, ec, mat)
}
def withActorSystem(actorSystem: ActorSystem): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem))
def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem))
def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext))
def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings))
def withProtocol(protocol: HttpProtocol): AkkaHttpClientBuilder = copy(protocol = protocol)
}

lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.github.matsluni.akkahttpspi.dynamodb

import akka.http.scaladsl.model.HttpProtocols
import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder
import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
Expand All @@ -25,7 +27,7 @@ import scala.jdk.CollectionConverters._

class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] {
"DynamoDB" should {
"create a table" in withClient { implicit client =>
"create a table" in withClient() { implicit client =>
val attributes = AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build()
val keySchema = KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build()

Expand All @@ -47,11 +49,15 @@ class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] {
tableResult.tableNames().asScala should have size (1)
}

"work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client =>
val result = client.listTables().join()
result.tableNames() should not be null
}
}

def withClient(testCode: DynamoDbAsyncClient => Any): Any = {
private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: DynamoDbAsyncClient => Any): Any = {

val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build()
val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build()

val client = DynamoDbAsyncClient
.builder()
Expand Down
23 changes: 17 additions & 6 deletions src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package com.github.matsluni.akkahttpspi.s3

import java.io.{File, FileWriter}
import java.util.concurrent.CompletionException

import akka.http.scaladsl.model.HttpProtocols
import com.dimafeng.testcontainers.GenericContainer
import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder
import com.github.matsluni.akkahttpspi.testcontainers.TimeoutWaitStrategy
import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, BaseAwsClientTest}
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
Expand All @@ -34,14 +37,14 @@ import scala.util.Random
class TestS3 extends BaseAwsClientTest[S3AsyncClient] {

"Async S3 client" should {
"create bucket" in withClient { implicit client =>
"create bucket" in withClient() { implicit client =>
val bucketName = createBucket()
val buckets = client.listBuckets().join
buckets.buckets() should have size (1)
buckets.buckets().asScala.toList.head.name() should be(bucketName)
}

"upload and download a file to a bucket" in withClient { implicit client =>
"upload and download a file to a bucket" in withClient() { implicit client =>
val bucketName = createBucket()
val fileContent = 0 to 1000 mkString

Expand All @@ -55,7 +58,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
result.response().contentLength() shouldEqual fileContent.getBytes().length
}

"multipart upload" in withClient { implicit client =>
"multipart upload" in withClient() { implicit client =>
val bucketName = createBucket()
val randomFile = File.createTempFile("aws1", Random.alphanumeric.take(5).mkString)
val fileContent = (0 to 1000000).mkString
Expand Down Expand Up @@ -85,6 +88,14 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
result.asUtf8String() should be(fileContent + fileContent)
}

//adobe/s3mock does not support HTTP/2
"work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client =>
the[CompletionException] thrownBy {
val result = client.listBuckets().join()
result.buckets() should not be null
} should have message "software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 426, Request ID: null)"
}

}

def createBucket()(implicit client: S3AsyncClient): String = {
Expand All @@ -93,9 +104,9 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
bucketName
}

private def withClient(testCode: S3AsyncClient => Any): Any = {
private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: S3AsyncClient => Any): Any = {

val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build()
val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build()

val client = S3AsyncClient
.builder()
Expand Down Expand Up @@ -126,7 +137,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
private lazy val containerInstance = new GenericContainer(
dockerImage = "adobe/s3mock:2.13.0",
exposedPorts = Seq(exposedServicePort),
waitStrategy = Some(TimeoutWaitStrategy(10 seconds))
waitStrategy = Some(TimeoutWaitStrategy(15 seconds))
)
override val container: GenericContainer = containerInstance
}
13 changes: 10 additions & 3 deletions src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.github.matsluni.akkahttpspi.sns

import akka.http.scaladsl.model.HttpProtocols
import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder
import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.sns.SnsAsyncClient
Expand All @@ -24,17 +26,22 @@ import software.amazon.awssdk.services.sns.model.{CreateTopicRequest, PublishReq
class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {

"Async SNS client" should {
"publish a message to a topic" in withClient { implicit client =>
"publish a message to a topic" in withClient() { implicit client =>
val arn = client.createTopic(CreateTopicRequest.builder().name("topic-example").build()).join().topicArn()
val result = client.publish(PublishRequest.builder().message("a message").topicArn(arn).build()).join()

result.messageId() should not be null
}

"work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client =>
val result = client.listTopics().join()
result.topics() should not be null
}
}

def withClient(testCode: SnsAsyncClient => Any): Any = {
private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: SnsAsyncClient => Any): Any = {

val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build()
val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build()

val client = SnsAsyncClient
.builder()
Expand Down
20 changes: 16 additions & 4 deletions src/test/scala/com/github/matsluni/akkahttpspi/sqs/TestSQS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,27 @@

package com.github.matsluni.akkahttpspi.sqs

import akka.http.scaladsl.model.HttpProtocols
import com.github.matsluni.akkahttpspi.AkkaHttpClient.AkkaHttpClientBuilder
import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, ElasticMQSQSBaseAwsClientTest}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model._

import java.util.concurrent.CompletionException

// switched to use ElasticMQ container instead of Localstack due to https://github.com/localstack/localstack/issues/8545
class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] {
"Async SQS client" should {

"publish a message to a queue" in withClient { implicit client =>
"publish a message to a queue" in withClient() { implicit client =>
client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
client.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build()).join()
val receivedMessage = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build()).join()
receivedMessage.messages().get(0).body() should be("123")
}

"delete a message" in withClient { implicit client =>
"delete a message" in withClient() { implicit client =>
client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
client.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build()).join()

Expand All @@ -44,11 +48,19 @@ class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] {
receivedMessage.messages() shouldBe java.util.Collections.EMPTY_LIST
}

//softwaremill/elasticmq-native does not support HTTP/2
"work with HTTP/2" in withClient(_.withProtocol(HttpProtocols.`HTTP/2.0`)) { implicit client =>
the [CompletionException] thrownBy {
val result = client.listQueues().join()
result.queueUrls() should not be null
} should have message "software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 505, Request ID: null)"
}

}

def withClient(testCode: SqsAsyncClient => Any): Any = {
private def withClient(builderFn: AkkaHttpClientBuilder => AkkaHttpClientBuilder = identity)(testCode: SqsAsyncClient => Any): Any = {

val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build()
val akkaClient = builderFn(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()).build()

val client = SqsAsyncClient
.builder()
Expand Down