-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide an example for how to implement Http long polling #63
Comments
Isn't long polling simply like: val events = Source.repeat(pollRequest).via(Http().outgoingConnection(url)) I.e. continuously feed requests into the Http connection, returning a response once there is one. |
Somewhat, but plus reconnecting Hah, yeah in that sense yeah. I keep thinking of the more complicated cases somehow, will dump thoughts here in a bit (what we did in ConductR was more than just that) |
Then use |
The URI of each next request depends on the response to the previous request. An example API call:
Here, The docs strongly state that using the shared pool for long-polling requests is not encouraged. AFAIK |
A Spray implementation as a reference for what I'm trying to achieve using streams: package rfs.blaze.gateway.config
package consul
import scala.concurrent.duration._
import scala.util._
import akka.actor._
import akka.io.IO
import spray.can._
import spray.can.client._
import spray.http._
import spray.httpx.RequestBuilding._
import spray.json._
object Consul {
  /**
   * Spawns a [[ConsulEndpointWatcher]] actor for the given endpoint and tells it
   * to start watching, with replies routed to `sender`.
   *
   * The actor name is derived from the endpoint's base URI path, replacing every
   * "/" with "-" so the result is a valid actor name.
   *
   * @return the newly created watcher actor
   */
  def startWatching(endpoint: ConsulEndpoint, sender: ActorRef)(implicit actorRefFactory: ActorRefFactory): ActorRef = {
    val nameSuffix = endpoint.baseUri.path.toString.replaceAllLiterally("/", "-")
    val watcher = actorRefFactory.actorOf(Props[ConsulEndpointWatcher], s"consul-watcher$nameSuffix")
    watcher.tell(ConsulEndpointWatcher.StartWatching(endpoint), sender)
    watcher
  }
}
// Message protocol of ConsulEndpointWatcher.
object ConsulEndpointWatcher {
// Command: begin watching `endpoint`; change/error events go to the message sender.
case class StartWatching(endpoint: ConsulEndpoint)
// Internal self-message, scheduled to retry connecting after a failure or disconnect.
case object Reconnect
// Event: the endpoint produced a value different from the previously seen one.
case class ConsulValueChanged(value: AnyRef)
// Event: the response entity could not be parsed by the endpoint's reader.
case object InvalidConsulResponse
// Event: the TCP/HTTP connect attempt to the endpoint failed.
case class ConsulEndpointUnreachable(endpoint: ConsulEndpoint)
}
/**
 * Actor implementing a long-polling "watch" of a single Consul HTTP endpoint
 * using spray-can. It is a three-state machine: notConnected -> connecting ->
 * watching. While watching it repeatedly issues GET requests carrying the last
 * seen x-consul-index, notifies its controller of value changes, and falls back
 * to `connecting` (with a 1 second delay) on any connection-level event.
 */
class ConsulEndpointWatcher extends Actor with ActorLogging {
import ConsulEndpointWatcher._
import context.dispatcher
import Http._
// Initial state: wait for a StartWatching command.
def receive = notConnected
def notConnected: Receive = {
case StartWatching(endpoint) ⇒ {
// The original sender becomes the controller that receives all events.
context.become(connecting(sender(), endpoint, None, None))
connect(endpoint)
}
}
// Waiting for the spray-can connection actor to report Connected/CommandFailed.
def connecting(controller: ActorRef, endpoint: ConsulEndpoint, previousIndex: Option[Long], previousValue: Option[AnyRef]): Receive = {
case _: Connected ⇒ {
log.debug("Connected")
// sender() is the per-connection actor; watch it so Terminated triggers reconnect.
val connection = sender()
context.watch(connection)
context.become(watching(controller, connection, endpoint, previousIndex, previousValue))
watch(connection, endpoint, previousIndex)
}
case CommandFailed(Connect(address, _, _, _, _)) ⇒ {
log.error(s"Failed to connect using address ${address}")
controller ! ConsulEndpointUnreachable(endpoint)
// TODO: is reconnect always the correct strategy?
// TODO: implement a backoff strategy
// shouldn't we leave this to the supervision hierarchy?
context.system.scheduler.scheduleOnce(1 second, self, Reconnect)
}
case Reconnect ⇒ {
log.debug("Connecting...")
connect(endpoint)
}
case Terminated(actor) ⇒ log.debug(s"Received Terminated event in 'connecting' state. Deceased: $actor")
}
// Extracts the x-consul-index response header as a Long.
// NOTE(review): _.value.toLong throws NumberFormatException on a malformed header.
def newIndex(headers: Seq[HttpHeader]) =
headers.find(_.is("x-consul-index")).map(_.value.toLong)
// True when the response carries the same index we already processed (no change).
def consulIndexAlreadySeen(previousIndex: Option[Long], headers: Seq[HttpHeader]) =
previousIndex.isDefined && previousIndex == newIndex(headers)
// Active state: one request is in flight; handle its response and re-poll.
def watching(controller: ActorRef, connection: ActorRef, endpoint: ConsulEndpoint, previousIndex: Option[Long], previousValue: Option[AnyRef]): Receive = {
// Duplicate index: skip parsing and immediately poll again with the same index.
case HttpResponse(_, _, headers, _) if consulIndexAlreadySeen(previousIndex, headers) ⇒
log.debug(s"x-consul-index ${newIndex(headers)} already seen")
watch(connection, endpoint, newIndex(headers))
// TODO: deal with other non-200 status codes?
// 5xx: keep the old index and retry after a 1 second backoff.
case HttpResponse(status: StatusCodes.ServerError, entity, _, _) ⇒ {
log.warning(s"Received status $status from consul: $entity")
watch(connection, endpoint, previousIndex, delay = 1 second)
}
case HttpResponse(status, entity, headers, _) ⇒ {
readResponseValue(entity, endpoint.reader) match {
case Success(newValue) ⇒ {
// Only notify the controller when the parsed value actually changed.
if (previousValue.forall(_ != newValue)) {
log.debug(s"Received new value $newValue on endpoint $endpoint")
controller ! ConsulValueChanged(newValue)
context.become(watching(controller, connection, endpoint, newIndex(headers), Some(newValue)))
} else {
log.debug(s"Received unchanged value $newValue on endpoint $endpoint")
}
// Re-poll immediately with the freshly received index.
watch(connection, endpoint, newIndex(headers))
}
case Failure(error) ⇒ {
log.error(s"Error while reading response value. Error: ${error.getMessage}, status code: ${status}")
controller ! InvalidConsulResponse
watch(connection, endpoint, newIndex(headers), delay = 1 second)
}
}
}
// Any connection-level failure/close event: go back to connecting and retry in 1s.
case event @ (SendFailed(_) | Timedout(_) | Aborted | Closed | PeerClosed | ErrorClosed(_) | Terminated(_)) ⇒ {
log.debug(s"Disconnected from $endpoint due to $event, reconnecting")
context.become(connecting(controller, endpoint, previousIndex, previousValue))
context.system.scheduler.scheduleOnce(1 second, self, Reconnect)
}
// A stale Reconnect arriving after we already reconnected is ignored.
case Reconnect ⇒ log.warning("Received a Reconnect event in 'watching' state.")
}
// ==========================================================================
// Implementation details
// ==========================================================================
/** Overridable for test purposes */
// Opens a spray-can connection with idle/request timeouts widened beyond the
// long-poll wait so the client does not abort a legitimately held request.
private[consul] def connect(endpoint: ConsulEndpoint): Unit = {
IO(Http)(context.system) ! Connect(
host = endpoint.host,
port = endpoint.port,
settings = Some(ClientConnectionSettings(context.system).copy(
idleTimeout = endpoint.watchTimeout + endpoint.clientTimeoutBuffer,
requestTimeout = endpoint.watchTimeout + endpoint.clientTimeoutBuffer
))
)
}
// Schedules the next poll request on the connection, optionally after `delay`.
private def watch(connection: ActorRef, endpoint: ConsulEndpoint, previousIndex: Option[Long], delay: FiniteDuration = 0 seconds): Unit = {
context.system.scheduler.scheduleOnce(delay, connection, requestFor(endpoint, previousIndex))
}
// Builds the GET request for the endpoint URI at the given index, keeping the
// connection alive across polls.
private def requestFor(endpoint: ConsulEndpoint, previousIndex: Option[Long]): HttpRequest = {
Get(endpoint.uri(previousIndex)) ~> addHeader(HttpHeaders.Connection("Keep-Alive"))
}
// Parses the response entity as JSON via the endpoint's reader; an empty
// entity is reported as a Failure rather than an exception.
private def readResponseValue(entity: HttpEntity, reader: ConsulResponseReader): Try[AnyRef] = {
entity.toOption.map { data ⇒
Try(reader.read(JsonParser(data.asString)))
}.getOrElse(Failure(new RuntimeException("Empty http entity!")))
}
} |
I guess the host based connection pool can be used then: Source.repeat(pollRequest).via(Http().cachedHostConnectionPool(...)) This will do reconnects AFAIK at the pool level, and since this is a dedicated pool it won't interfere with the rest of the system. The final solution might be a bit more complicated and need a Graph cycle if you need to limit the poll count bounded. For example (pseudocode): Graph.create {
Source.single(pollRequest) ~> merge ~> cachedConnectionPool ~> bcast ~> out
merge <~ map(_ => pollRequest) <~ bcast
} will keep the number of outstanding polls exactly one (as there is one poll circulating in the loop). Of course above needs error handling. |
Thanks. That looks like something that might work. I'll go figure it out and report back. That DSL sure takes some time to get used to 😄 |
Finally got some time to experiment with this again. An update on my progress so far:
Here's what I have so far. All feedback/comments/suggestions would be very welcome. object AkkaHttpLongPolling {
// Produces an endless stream of long-poll responses by cycling requests through
// a cached host connection pool: each response's x-consul-index header is used
// to build the next request, which is fed back into the pool via a graph cycle.
def longPollingSource(host: String, port: Int, uri: Uri, maxWait: Duration)(implicit s: ActorSystem, fm: Materializer): Source[HttpResponse, NotUsed] = {
import GraphDSL.Implicits._
import s.dispatcher
val connectionPoolSettings = ConnectionPoolSettings(s)
Source.fromGraph(GraphDSL.create() { implicit b ⇒
// Seed: the very first request (index = None, i.e. "0").
val init = Source.single(createRequest(uri, maxWait, None))
// Merges the seed request with requests coming back around the feedback loop.
val merge = b.add(Merge[HttpRequest](2))
val broadcast = b.add(Broadcast[(Try[HttpResponse], HttpRequest)](2))
// The pool flow needs a (request, context) pair; use the request as its own context.
val tupler = b.add(Flow[HttpRequest].map(r ⇒ (r, r)))
val http = b.add(Http().cachedHostConnectionPool[HttpRequest](host, port, connectionPoolSettings).mapMaterializedValue(_ ⇒ NotUsed))
// Output side: only successful responses, with entities forced to strict
// (bounded by a 5s timeout); toStrict failures are dropped by the resuming decider.
val outbound = b.add(
Flow[(Try[HttpResponse], HttpRequest)]
.collect { case (Success(response), _) ⇒ response }
.mapAsync(1)(response ⇒ response.entity.toStrict(5.seconds).map(strictEntity ⇒ response.copy(entity = strictEntity)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
)
// Feedback side: derive the next request from each result (success or failure),
// keeping exactly one request circulating in the cycle.
// NOTE(review): println used for logging here — prefer the ActorSystem log.
val feedback = b.add(
Flow[(Try[HttpResponse], HttpRequest)]
.map {
case (Success(response), _) ⇒ {
val index = response.headers.find(_.is("x-consul-index")).map(_.value.toLong)
println(s"Success. New index = ${index}.")
createRequest(uri, maxWait, index)
}
case (Failure(cause), _) ⇒ {
// On failure, restart from index None ("0").
println("Failure: " + cause.getMessage)
createRequest(uri, maxWait, None)
}
}
)
// format: OFF
init ~> merge ~> tupler ~> http ~> broadcast ~> outbound
merge <~ feedback <~ broadcast
// format: ON
SourceShape(outbound.out)
})
}
/**
 * Builds a Consul blocking-query request: a copy of `baseUri` whose raw query
 * string carries the `wait` (max hold time) and `index` (last seen
 * x-consul-index, "0" when absent) parameters.
 */
private def createRequest(baseUri: Uri, maxWait: Duration, index: Option[Long]): HttpRequest = {
  val indexParam = index.fold("0")(_.toString)
  HttpRequest(uri = baseUri.copy(rawQueryString = Some(s"wait=${maxWait.toSeconds}s&index=$indexParam")))
}
} The query parameters on the request are specific to the Consul API but I expect every long-polling endpoint has some form of those same two parameters. I plan to turn this into a blog post once I've figured some more things out and handled all the corner cases. Would this fit anywhere in the akka-http docs? Updated: implemented conversion to |
Some more progress. I tried using the connection-level flow in combination with object LongPollingHttpClientUsingSingleConnection {
// Connection-level variant: instead of a pool, each request materializes a fresh
// outgoing connection (via flatMapConcat), and responses are cycled back into
// new requests through a graph cycle, using the x-consul-index header.
def longPollingSource(host: String, port: Int, uri: Uri, maxWait: Duration)(implicit s: ActorSystem, fm: Materializer): Source[HttpResponse, NotUsed] = {
Source.fromGraph(GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
import s._
// Idle timeout is widened 20% beyond the long-poll wait so the client does
// not kill a connection that is legitimately holding the response.
val settings = ClientConnectionSettings(s).withIdleTimeout(maxWait * 1.2)
// Seed: the very first request (index = None, i.e. "0").
val initSource: Source[HttpRequest, NotUsed] = Source.single(createRequest(uri, maxWait, None)).log("outer 0", out ⇒ s"Sending request...")
// One fresh single-request connection per element.
val httpFlow: Flow[HttpRequest, HttpResponse, NotUsed] =
Flow[HttpRequest]
.flatMapConcat(request ⇒
Source.single(request)
.via(Http().outgoingConnection(host, port, settings = settings))
)
.mapMaterializedValue(_ ⇒ NotUsed)
// Output side: force entities to strict (5s bound); toStrict failures are
// skipped via the resuming decider.
val outboundResponsesFlow: Flow[HttpResponse, HttpResponse, NotUsed] =
Flow[HttpResponse] // TODO: add size limit
.mapAsync(1)(response ⇒ response.entity.toStrict(5.seconds).map(strictEntity ⇒ response.copy(entity = strictEntity)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) // TODO: turn into log-and-resume
// Feedback side: build the next request from each response's x-consul-index.
val feedbackResponsesFlow: Flow[HttpResponse, HttpRequest, NotUsed] =
Flow[HttpResponse]
.map { response ⇒
log.debug(s"Success. Response: ${response.copy(entity = HttpEntity.Empty)}.")
val index = response.headers.find(_.is("x-consul-index")).map(_.value.toLong)
log.debug(s"New index: ${index}.")
createRequest(uri, maxWait, index)
}
val init = b.add(initSource)
val http = b.add(httpFlow)
val merge = b.add(Merge[HttpRequest](2))
val broadcast = b.add(Broadcast[HttpResponse](2))
val outbound = b.add(outboundResponsesFlow)
val feedback = b.add(feedbackResponsesFlow)
// format: OFF
init ~> merge ~> http ~> broadcast ~> outbound
merge <~ feedback <~ broadcast
// format: ON
SourceShape(outbound.out)
})
}
/**
 * Constructs the long-poll request: a copy of `baseUri` with the raw query
 * string set to the Consul blocking-query parameters `wait` and `index`
 * (the latter defaulting to "0" when no index has been seen yet).
 */
private def createRequest(baseUri: Uri, maxWait: Duration, index: Option[Long]): HttpRequest = {
  val query = "wait=" + maxWait.toSeconds + "s&index=" + index.map(_.toString).getOrElse("0")
  HttpRequest(uri = baseUri.copy(rawQueryString = Some(query)))
}
} I think the docs could use some sections on stream resiliency and failure recovery in more complex cases such as this. I think I'll dive into custom graph stages next, since they are the only code I've seen so far that has access to upstream completion/failure events. |
I hope my efforts at slowly clawing my way towards enlightenment provide some insight into the struggles of people trying to figure out how this stream stuff works 😄 |
Awesome research here, thanks for sharing it and sorry I wasn't able to comment in depth yet, a bit overloaded last week. Hope to investigate here this week :) |
I have discovered |
Had some time now while wife was shopping ;) |
Hi @ktoso and the rest of the akka team. I've been diving into the implementation of http core and streams and I'm very curious about the design decisions that went into the current design and about your ideas about the direction things are going. In the current design, http connection management has been directly linked to stream materialization and there are no hooks at all in the API for dealing with events and state changes on the connection. The stream just stops and the surrounding code will have to detect this in some way, which is not entirely trivial since AFAIK there are no easy prebuilt stages for dealing with "normal" stream completion. This means that in order to build streams that deal with connection loss, transparently reconnecting, etc., you will need to break our of the akka-stream context and dive into the implementation layer and the underlying actors. This is exactly how the host connection pools in http core have been implemented, i.e. a Flow implemented by an actor that internally uses more actors to rematerialize nested connection Flows when they complete. Unfortunately a lot of that work is private and very deeply entwined with complex implementation details. I'm attempting to strip out and copy the parts I would need to implement a persistent, resilient long polling connection Source but a lot of those implementation details are still not entirely obvious to me. I'm wondering whether this topic, i.e. exposing connection management in http core as a higher level abstraction, has been discussed in the team and whether there are plans to provide such a thing in the future. |
updated:: removed outdated link to source code. Next step, strip out some of the |
After writing that last bit in the above comment, I tested an implementation using a custom, non-shared pool configured to only allow one single connection and that seems to work, including resilience across connection failures. It seems that as long as you make sure the pool is not used by anyone else, using a single-connection host pool is actually the best solution for long polling since it stops you from having to implement all the rematerialization stuff hidden behind the pool interface. Latest implementation now here: https://github.com/agemooij/stream-experiments/blob/master/src/main/scala/scalapenos/experiments/streams/LongPollingHttpClient.scala |
I figured it's about time I spend a considerable chunk of time on this, to appreciate your effort here – I'm diving in now, thanks for keeping at it! |
So I've read through but need to give it a spin to notice exact semantics, some things that are missing you pointed out. API style actually is something we could provide - exactly such an extension. I also noted:
the materialized value to control it is obviously a key feature to add. single-connection host pool likely makes sense for this scenario, I think I agree, this way we don't impact anyone else; but will try to verify with others. We'll meet with the team on Monday and I hope to have a look together at this and how it could move forward btw. Hope the comments make some sense and we'll be back here soon :) |
I'll assign myself here to keep it on my radar |
@jrudolph good idea. I think the main problem with using Some kind of way to make absolutely sure the pool is unique, like an optional name field in If isolation can be guaranteed, then the remaining problem comes down to configuring the maximum number of pool connections for a situation in which connections get hogged for very long periods and the number of needed connections is very variable. Just setting the number to be pretty high (i.e. just guessing) would be perfectly acceptable to most users I guess. |
Good idea. The default value should be def longPollingSource(host: String, port: Int,
initialRequest: HttpRequest,
connectionSettings: ClientConnectionSettings = ClientConnectionSettings(system))
(nextRequest: HttpResponse ⇒ HttpRequest = _ => initialRequest)
(implicit m: Materializer): Source[HttpResponse, NotUsed] = {
I'd log it on the feedback side, since the output side is only meant for reporting successfully received responses.
Is this a best practice I'm unaware of? What's the danger?
I'm not sure what you mean. Are you suggesting adding API for this on Having to always drain the response entities to prevent blocking the main flow is certainly inconvenient for building user-proof APIs, so I'd prefer some easier, safely bounded way to implement response transformation in cases such as this; i.e. you have a certain knowledge of the types of responses you will likely receive and you don't want to bubble up this responsibility to the API user. |
I think using a dedicated single-connection pool is the right solution. The responsibility of the "pool" is the connection state handling. The abstraction (not the functionality) which is currently missing is something which provides This is something quite generic. Think about the burger chain where in the backend you have a pipeline of (burger stacking) cooks, basically a In akka-http, we have some implementations of those components but not abstraction that you could easily plug together. |
@asarkar the solution I reported above worked for me. |
@agemooij thanks for your response. My problem is similar but different. I've a list of unknown size and want to run a long polling request for each one. In your code, you return a P.S. I'm using the Connection-Level API because of this warning from the docs:
|
@asarkar A few initial comments:
|
@asarkar AFAICS the reason why the above code only performs the first long poller is that you ended up sequencing all pollers by using |
@agemooij The infinite blocking is required, otherwise I get a future timeout. It makes sense to me because the stream never closes. As for the connection management, if I switch to a |
@asarkar Please read what I said above and look at my example code. I'm not using the The infinite blocking call will make your actor very resilient. Much better is to turn the long poller into a self-sustaining cycle with some options to break out of its loop for specific situations. This is done by the cyclic graph I use in my example. No need for infinite blocking. |
I'll try that, although, this is a pet project and I'm running out of time. You misunderstood though, I never said you are using |
How about a more generic import scala.concurrent.Future
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.{FromResponseUnmarshaller, Unmarshal}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source}
import akka.stream.{Materializer, SourceShape}
object ConsulThings {
/**
 * Like [[Source.unfold]], except the successor (state, element) pair is computed
 * by a [[Flow]] rather than a plain function. The stream completes as soon as
 * the flow emits `None`.
 */
def unfoldFlow[S, E](s: S)(f: Flow[S, Option[(S, E)], NotUsed]): Source[E, NotUsed] = {
  Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val seed = Source.single(s)
    // Stop on the first None, unwrap the rest.
    val unwrap = f.takeWhile(_.isDefined).collect { case Some(pair) => pair }
    val nextState = Flow[(S, E)].map { case (state, _) => state }
    val merge = b.add(Merge[S](2))
    val fanOut = b.add(Broadcast[(S, E)](2))
    val emitElems = b.add(Flow[(S, E)].map { case (_, elem) => elem })
    // Cycle: seed + fed-back state -> flow -> (emit element, feed state back).
    seed ~> merge ~> unwrap ~> fanOut ~> emitElems
            merge <~ nextState <~ fanOut
    SourceShape(emitElems.out)
  })
}
/** “Watches” a blocking Consul query endpoint.
 *
 * Uses [[unfoldFlow]] with the last seen x-consul-index (as an Option[String])
 * as the unfold state: each state is turned into a request carrying `wait` and
 * `index` query parameters, sent over a single connection, and the response's
 * index plus unmarshalled body become the next state and emitted element.
 * A non-success status fails the stream with a RuntimeException.
 */
def watch[A: FromResponseUnmarshaller](uri: Uri)(implicit system: ActorSystem, materializer: Materializer): Source[A, NotUsed] = {
import system.dispatcher
// Prepends the Consul blocking-query parameters; the first request
// (index = None) is sent against the bare uri.
def addIndex(index: Option[String]) =
index.map(i => uri.withQuery(uri.query().+:("index" -> i).+:("wait" -> "5s"))).getOrElse(uri)
unfoldFlow(Option.empty[String])(Flow[Option[String]]
.map(i => HttpRequest(uri = addIndex(i)))
// uses a single connection; the authority of the URI must not change between elements
.via(Http(system).outgoingConnection(host = uri.authority.host.address(), port = uri.authority.port))
.mapAsync(1) { response =>
if (response.status.isSuccess()) {
// Next state = the new index (kept as a raw String); element = unmarshalled body.
val index = response.headers.find(_ is "x-consul-index").map(_.value())
Unmarshal(response).to[A].map(a => Option((index, a)))
} else {
Future.failed(new RuntimeException(response.status.toString))
}
})
}
} |
Yeah, that looks pretty good. I would recommend you to use a connection pool like in my example instead of In long-polling you are going to be dealing with intermittent reconnection and without using a connection pool, that would be very hard to deal with. |
Alternatively, wrapping the watch in |
Tuesday Mar 22, 2016 at 16:33 GMT
Originally opened as akka/akka#20121
Quoted from a Gitter discussion:
The text was updated successfully, but these errors were encountered: