Skip to content

Commit

Permalink
#3251 Fixed akka HTTP client POST request racing conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
t83714 committed Oct 20, 2021
1 parent 26560ed commit 4d3d223
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 11 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Next

- Upgraded to Open Policy Agent v0.33.0
- #3231 Upgraded to Open Policy Agent v0.33.0
- #3251 Fixed akka HTTP client POST request racing conditions

## 1.1.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,8 @@ abstract class BaseRecordsServiceAuthSpec extends ApiSpec {
.post(
_: String,
_: HttpEntity.Strict,
_: List[HttpHeader]
_: List[HttpHeader],
true
)(
_: ToEntityMarshaller[HttpEntity.Strict]
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class AuthApiClient(authHttpFetcher: HttpFetcher)(
.post(
s"/v0/opa/compile",
HttpEntity(ContentTypes.`application/json`, requestData),
headers
headers,
true
)
.flatMap(receiveOpaResponse[List[List[OpaQuery]]](_, policyId) { json =>
OpaParser.parseOpaResponse(json, policyId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ import akka.http.scaladsl.model.HttpHeader
trait HttpFetcher {
def get(path: String, headers: Seq[HttpHeader] = Seq()): Future[HttpResponse]

def post[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def post[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse]

def put[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def put[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse]
}
Expand Down Expand Up @@ -59,7 +69,12 @@ class HttpFetcherImpl(baseUrl: URL)(
}
}

def post[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def post[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse] = {
val url = s"${baseUrl.getPath}${path}"
Expand All @@ -72,12 +87,37 @@ class HttpFetcherImpl(baseUrl: URL)(
val request = RequestBuilding
.Post(url, payload)
.withHeaders(scala.collection.immutable.Seq.concat(headers))
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
val result =
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
}

if (!autoRetryConnection) {
result
} else {
result.recoverWith {
case error: Throwable
if error.getMessage.contains(
"The http server closed the connection unexpectedly"
) =>
Source
.single((request, 0))
.via(connectionFlow)
.runWith(Sink.head)
.map {
case (response, _) => response.get
}
}
}

}

def put[T](path: String, payload: T, headers: Seq[HttpHeader] = Seq())(
def put[T](
path: String,
payload: T,
headers: Seq[HttpHeader] = Seq(),
autoRetryConnection: Boolean = false
)(
implicit m: ToEntityMarshaller[T]
): Future[HttpResponse] = {
val url = s"${baseUrl.getPath}${path}"
Expand All @@ -90,8 +130,27 @@ class HttpFetcherImpl(baseUrl: URL)(
val request = RequestBuilding
.Put(url, payload)
.withHeaders(scala.collection.immutable.Seq.concat(headers))
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
val result =
Source.single((request, 0)).via(connectionFlow).runWith(Sink.head).map {
case (response, _) => response.get
}

if (!autoRetryConnection) {
result
} else {
result.recoverWith {
case error: Throwable
if error.getMessage.contains(
"The http server closed the connection unexpectedly"
) =>
Source
.single((request, 0))
.via(connectionFlow)
.runWith(Sink.head)
.map {
case (response, _) => response.get
}
}
}
}

Expand Down

0 comments on commit 4d3d223

Please sign in to comment.