Skip to content

Commit

Permalink
Merge pull request #3265 from magda-io/cherry-pick-3251
Browse files Browse the repository at this point in the history
Cherry-pick PR for issue #3251 from next branch into v1.1.0
  • Loading branch information
t83714 authored Nov 14, 2021
2 parents 13086bf + 7a3c4c8 commit 67b1052
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- #3263 Build Multi-Arch (`linux/amd64` & `linux/arm64`) Docker Images in CI (Except `magda-postgres` & `magda-elasticsearch`)
- Related to #3263, Build Multi-Arch (`linux/amd64` & `linux/arm64`) Docker Image for `magda-elasticsearch` as well.
- Related to #3263, adjusted helm chart for elasticsearch to make it run properly on linux/arm64 platform.
- #3251 Fixed akka HTTP client POST request racing conditions

## 1.0.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ class RegisterWebhookSpec extends BaseRegistryApiSpec with SprayJsonSupport {

// Expect the new hook to be posted
(param.authFetcher
.post(_: String, _: WebHook, _: Seq[HttpHeader])(
.post(_: String, _: WebHook, _: Seq[HttpHeader], _: Boolean)(
_: ToEntityMarshaller[WebHook]
))
.expects(s"/v0/hooks", *, *, *)
.expects(s"/v0/hooks", *, *, *, *)
.onCall(
(
url: String,
webhook: WebHook,
headers: Seq[HttpHeader],
autoRetry: Boolean,
marshaller: ToEntityMarshaller[WebHook]
) => {
// Forward the req to the registry api
Expand Down Expand Up @@ -113,15 +114,16 @@ class RegisterWebhookSpec extends BaseRegistryApiSpec with SprayJsonSupport {

// Expect the hook to update itself
(param.authFetcher
.put(_: String, _: WebHook, _: Seq[HttpHeader])(
.put(_: String, _: WebHook, _: Seq[HttpHeader], _: Boolean)(
_: ToEntityMarshaller[WebHook]
))
.expects(s"/v0/hooks/$webhookId", *, *, *)
.expects(s"/v0/hooks/$webhookId", *, *, *, *)
.onCall(
(
url: String,
webhook: WebHook,
headers: Seq[HttpHeader],
autoRetry: Boolean,
marshaller: ToEntityMarshaller[WebHook]
) => {
// Forward the req to the registry api
Expand All @@ -143,15 +145,21 @@ class RegisterWebhookSpec extends BaseRegistryApiSpec with SprayJsonSupport {

// Expect an ACK call once the indexer has determined that the webhook already exists
(param.authFetcher
.post(_: String, _: WebHookAcknowledgement, _: Seq[HttpHeader])(
.post(
_: String,
_: WebHookAcknowledgement,
_: Seq[HttpHeader],
_: Boolean
)(
_: ToEntityMarshaller[WebHookAcknowledgement]
))
.expects(s"/v0/hooks/$webhookId/ack", *, *, *)
.expects(s"/v0/hooks/$webhookId/ack", *, *, *, *)
.onCall(
(
url: String,
webhookAck: WebHookAcknowledgement,
headers: Seq[HttpHeader],
autoRetry: Boolean,
marshaller: ToEntityMarshaller[WebHookAcknowledgement]
) => {
// Don't forward this to the registry, just check the info passed is right.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,8 @@ abstract class BaseRecordsServiceAuthSpec extends ApiSpec {
.post(
_: String,
_: HttpEntity.Strict,
_: List[HttpHeader]
_: List[HttpHeader],
_: Boolean
)(
_: ToEntityMarshaller[HttpEntity.Strict]
))
Expand All @@ -2597,6 +2598,7 @@ abstract class BaseRecordsServiceAuthSpec extends ApiSpec {
|}""".stripMargin
),
*,
true,
*
)
.returning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class AuthApiClient(authHttpFetcher: HttpFetcher)(
.post(
s"/v0/opa/compile",
HttpEntity(ContentTypes.`application/json`, requestData),
headers
headers,
true
)
.flatMap(receiveOpaResponse[List[List[OpaQuery]]](_, policyId) { json =>
OpaParser.parseOpaResponse(json, policyId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ import akka.http.scaladsl.model.HttpHeader
trait HttpFetcher {
def get(path: String, headers: Seq[HttpHeader] = Seq()): Future[HttpResponse]

def post[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def post[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse]

def put[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def put[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse]
}
Expand Down Expand Up @@ -59,7 +69,12 @@ class HttpFetcherImpl(baseUrl: URL)(
}
}

def post[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def post[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse] = {
val url = s"${baseUrl.getPath}${path}"
Expand All @@ -72,12 +87,41 @@ class HttpFetcherImpl(baseUrl: URL)(
val request = RequestBuilding
.Post(url, payload)
.withHeaders(scala.collection.immutable.Seq.concat(headers))
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
val result =
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
}

if (!autoRetryConnection) {
result
} else {
// Retry the request for racing condition
// https://github.com/magda-io/magda/issues/3251
result.recoverWith {
case error: Throwable
// We have to do this as UnexpectedConnectionClosureException is a private class
// https://github.com/akka/akka-http/issues/768
if error.getMessage.contains(
"The http server closed the connection unexpectedly"
) =>
Source
.single((request, 0))
.via(connectionFlow)
.runWith(Sink.head)
.map {
case (response, _) => response.get
}
}
}

}

def put[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def put[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse] = {
val url = s"${baseUrl.getPath}${path}"
Expand All @@ -90,8 +134,31 @@ class HttpFetcherImpl(baseUrl: URL)(
val request = RequestBuilding
.Put(url, payload)
.withHeaders(scala.collection.immutable.Seq.concat(headers))
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
val result =
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
}

if (!autoRetryConnection) {
result
} else {
// Retry the request for racing condition
// https://github.com/magda-io/magda/issues/3251
result.recoverWith {
case error: Throwable
// We have to do this as UnexpectedConnectionClosureException is a private class
// https://github.com/akka/akka-http/issues/768
if error.getMessage.contains(
"The http server closed the connection unexpectedly"
) =>
Source
.single((request, 0))
.via(connectionFlow)
.runWith(Sink.head)
.map {
case (response, _) => response.get
}
}
}
}

Expand Down

0 comments on commit 67b1052

Please sign in to comment.