Better "collect Strict" specifically for WebSockets Flow #65

Open
akka-ci opened this issue Sep 8, 2016 · 14 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted

Comments

@akka-ci

akka-ci commented Sep 8, 2016

Issue by ktoso
Monday Mar 21, 2016 at 09:59 GMT
Originally opened as akka/akka#20096


This is what people do (and I'd do so myself):

        val flow = Flow[Message].collect {
          case TextMessage.Strict(msg) =>
            msg.parseJson.convertTo[Anything]
        }

which "almost works" but can randomly break, because even the smallest message sometimes ends up as Streamed (I've seen this with a msg as small as "Hello world").

"Almost works" here means: a small message comes in but ends up being streamed, the collect drops it, and processing stalls until the subscription timeout triggers and kills the textStream of the streamed message.

We should provide a safe way to say "I know these will be strict, or force them to be".
This is somewhat related to the "auto drain" feature for HttpRequests too, however likely a different trick will have to be applied.

For reference, this is what I ended up with in a project I hacked on:

  // Skip (a marker value) and ConstantFun come from outside this snippet; their definitions are not shown here.
  implicit class flowTweaks[M](val wsInput: Source[Message, M]) {
    def forceTextStrict: Source[Strict, M] = wsInput
      .collect {
        case TextMessage.Strict(text)             => Future.successful(text)
        case TextMessage.Streamed(textStream)     => textStream.runFold("")(_ + _)
        case BinaryMessage.Strict(binary)         => Skip
        // Drain the streamed binary payload so it cannot stall the connection.
        case BinaryMessage.Streamed(binaryStream) => binaryStream.runWith(Sink.ignore); Skip
      }
      .filterNot(_ == Skip)
      .mapAsync(1)(ConstantFun.scalaIdentityFunction)
      .map(TextMessage.Strict)
  }
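The snippet above depends on `Skip` and `ConstantFun`, neither of which is shown. As a rough, self-contained sketch of the same idea (not the original project's code), the version below uses an `Option` instead of a marker value and needs no identity function. The names `StrictText` and `forceTextStrict` are illustrative only, and the fold is unbounded, so it applies no size or time limit.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.Future

// Hypothetical helper, not part of Akka HTTP.
object StrictText {
  def forceTextStrict(implicit mat: Materializer): Flow[Message, TextMessage.Strict, NotUsed] =
    Flow[Message]
      .mapAsync[Option[TextMessage.Strict]](1) {
        case tm: TextMessage =>
          // textStream covers both Strict and Streamed text messages.
          tm.textStream
            .runFold("")(_ + _)
            .map(text => Option(TextMessage.Strict(text)))(mat.executionContext)
        case bm: BinaryMessage =>
          // Drain binary payloads so a streamed one cannot stall the connection.
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }
      .collect { case Some(strict) => strict }
}
```

With parallelism 1, `mapAsync` keeps message order, so the output is one `TextMessage.Strict` per incoming text message.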
@akka-ci akka-ci added this to the 2.4.x milestone Sep 8, 2016
@akka-ci akka-ci added the t:http label Sep 8, 2016
@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by ktoso
Monday Mar 21, 2016 at 11:20 GMT


This was already brought up in a somewhat similar issue: akka/akka#19089

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by rkuhn
Monday Mar 21, 2016 at 13:55 GMT


I think it is a mistake to allow users to pattern-match on strict entities, this opens up all kinds of scenarios that “work fine” in tests but fail in production. The reason is that whether something is strict or not depends on random interactions of TCP flow control and buffer sizes, strictness is a low-level implementation detail and not dependable for user-space.

Of course I can already hear the screams of the benchmark crazies that care less about correctness and more about arcane optimizations and general geekiness.

Therefore the probable solution is that we provide a proper (i.e. working but also configurably bounded) implementation of how to convert incoming websocket messages into their strict representation.

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by ktoso
Monday Mar 21, 2016 at 14:13 GMT


> The reason is that whether something is strict or not depends on random interactions of TCP flow control and buffer sizes, strictness is a low-level implementation detail and not dependable for user-space

Yeah, very true - it's not predictable. One can't even assume it is Strict for a really small message ("Hello").

Also, then it seems we do not want to: akka/akka#20087 (currently it is impossible to do the draining even - since you can't match on Streamed), so most if not all impls are wrong - assuming Strict.

We could turn our thinking about it upside down a bit and say that one should never match on either Strict or Streamed, and instead provide a DSL or directives that do the right thing. As @jrudolph notes in the other thread, one usually works with small messages, so to-strict-ing incoming messages may not be as bad as in the general setting. We could then provide directives such as:

handleWebSocket[TextMessage.Strict](flow)

instead... WDYT?
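`handleWebSocket[TextMessage.Strict]` is only a proposal here, not an existing directive. As a sketch of how the same shape could be approximated with the existing `handleWebSocketMessages` directive: put a to-strict stage in front of the user's flow. `StrictTextDirectives`, `handleStrictText` and the `strictify` parameter are hypothetical names; how `strictify` is built is left open.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives.handleWebSocketMessages
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

// Hypothetical wrapper, not an Akka HTTP directive.
object StrictTextDirectives {
  def handleStrictText(
      strictify: Flow[Message, TextMessage.Strict, Any],
      handler: Flow[TextMessage.Strict, Message, Any]): Route =
    // The user-supplied handler only ever sees strict text messages.
    handleWebSocketMessages(strictify.via(handler))
}
```

The user code then never has to pattern-match on Strict versus Streamed; that decision lives in one place.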

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by rkuhn
Monday Mar 21, 2016 at 14:28 GMT


Yes, that’s good as well, but it should also have an upper bound on message size, and if deviations are wanted on a per-case basis then handling the full protocol should be possible (as you note, this requires users to be able to actually do that).
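One possible way to get such an upper bound, sketched with standard Akka Streams operators: `limitWeighted` fails the stream once the accumulated character count exceeds the limit, and `completionTimeout` fails it if the message does not complete in time. `BoundedStrict`, `boundedText`, `maxChars` and `timeout` are illustrative names, not an existing API.

```scala
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.Materializer

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper, not part of Akka HTTP.
object BoundedStrict {
  def boundedText(tm: TextMessage, maxChars: Long, timeout: FiniteDuration)(
      implicit mat: Materializer): Future[TextMessage.Strict] =
    tm.textStream
      .limitWeighted(maxChars)(_.length.toLong) // fail once more than maxChars characters arrived
      .completionTimeout(timeout)               // fail if the message does not complete in time
      .runFold(new StringBuilder)(_.append(_))
      .map(sb => TextMessage.Strict(sb.toString))(mat.executionContext)
}
```

A failed future here signals an oversized or slow message; whether to close the connection or skip the message is left to the caller.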

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by ktoso
Monday Mar 21, 2016 at 14:29 GMT


> Yes, that’s good as well, but it should also have an upper bound on message size

Yeah good point, a default from config though, otherwise a bit too clunky
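A default from config could look roughly like the sketch below, using the Typesafe Config library that Akka already depends on. The config path `my-app.websocket.to-strict`, its keys and the default values are all made up for illustration; no such settings exist in Akka HTTP as far as this thread shows.

```scala
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

// Hypothetical settings holder; the path, keys and values below are assumptions.
final case class StrictLimits(maxChars: Long, timeout: FiniteDuration)

object StrictLimits {
  private val defaults: Config = ConfigFactory.parseString(
    """my-app.websocket.to-strict {
      |  max-chars = 65536
      |  timeout = 3s
      |}""".stripMargin)

  def fromConfig(config: Config): StrictLimits = {
    // User config wins; the built-in defaults fill in anything missing.
    val c = config.withFallback(defaults).getConfig("my-app.websocket.to-strict")
    StrictLimits(c.getLong("max-chars"), c.getDuration("timeout").toMillis.millis)
  }
}
```

Callers that need a different bound for one endpoint can still construct `StrictLimits` directly instead of reading it from config.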

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by mmacfadden
Monday Apr 11, 2016 at 00:34 GMT


+1 on this issue. I think that, in addition to adding functionality, an example somewhere in the docs on handling streamed vs strict messages would also be great.

@akka-ci
Author

akka-ci commented Sep 8, 2016

Comment by antonkulaga
Wednesday Jul 06, 2016 at 22:12 GMT


Yes, the bug is very annoying, because the same messages are almost always Strict on localhost and in tests but often Streamed in production environments =(
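One way to surface this in tests rather than in production is to feed the handler the same text in both representations. The sketch below assumes Akka 2.6+ (where an implicit `ActorSystem` provides the materializer); `NaiveCollectDemo` and its members are illustrative names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical test helper showing what the Strict-only collect lets through.
object NaiveCollectDemo {
  // The pattern from the issue description: only Strict text is matched.
  val naive: Flow[Message, String, NotUsed] =
    Flow[Message].collect { case TextMessage.Strict(text) => text }

  // The same text, once as Strict and once split into chunks as Streamed.
  val sameTextTwice: List[Message] = List(
    TextMessage.Strict("Hello"),
    TextMessage.Streamed(Source(List("Hel", "lo"))))

  // Only the Strict copy survives; the Streamed copy is silently dropped.
  def survivors()(implicit system: ActorSystem): Seq[String] =
    Await.result(Source(sameTextTwice).via(naive).runWith(Sink.seq), 3.seconds)
}
```

In this synthetic setup the dropped message only disappears; on a real connection its unconsumed `textStream` is what causes the stall described at the top of the issue.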

@ktoso ktoso added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted and removed t:http labels Sep 8, 2016
@ktoso ktoso removed this from the 2.4.x milestone Sep 12, 2016
@ac2epsilon

+1. Hi ktoso. I tried to copy and paste your implicit flowTweaks and found that ConstantFun is not public any more, is it? What is the replacement? I'm also unsure what Skip is and where it lives. It looks like there is no actual (production) example of handling the Streamed case anywhere on the net, so it would be very kind of you or somebody else to link to a piece of code that is up and running on the latest Akka, as a kind of proof of concept. BTW, I have seen Windows 10 strongly favor Streamed over Strict even for reasonably small payloads; I suspect they reduced buffers to be more gadget-friendly (?)

@ktoso
Contributor

ktoso commented Oct 3, 2016

ConstantFun is just cached functions; you can simply use a normal function there (A => A).
Whether you get a Streamed or a Strict entity depends on many things; it's not strictly set by Akka AFAIR (something we could improve, sure).

@cleverdeveloper

Why not just stream flatMapConcat (_.asTextMessage.getStreamedText)?

@jrudolph
Contributor

@cleverdeveloper because it will mix data together from different WS messages.

@cleverdeveloper

cleverdeveloper commented Jul 20, 2017

@jrudolph You are right, I somehow missed it. The right code should probably be the following, which is not much shorter than the original snippet:

stream flatMapConcat { message =>
  message.asTextMessage.getStreamedText via Flow[String].fold("")(_ + _)
}

@atemerev
Contributor

atemerev commented May 30, 2018

Any news on this? I am also getting a weird error message when I try the getStreamedText..via trick (probably because getStreamedText is part of the Java API?)

Error:(44, 29) no type parameters for method flatMapConcat: (f: akka.http.scaladsl.model.ws.Message => akka.stream.Graph[akka.stream.SourceShape[T],M])Gdax.this.wsFlow.Repr[T] exist so that it can be applied to arguments (akka.http.scaladsl.model.ws.Message => akka.stream.javadsl.Source[String, _])
 --- because ---
argument expression's type is not compatible with formal parameter type;
 found   : akka.http.scaladsl.model.ws.Message => akka.stream.javadsl.Source[String, _]
 required: akka.http.scaladsl.model.ws.Message => akka.stream.Graph[akka.stream.SourceShape[?T],?M]
    val stringFlow = wsFlow.flatMapConcat(msg => msg.asTextMessage.getStreamedText via Flow[String].fold("")(_ + _))
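The error reports a `javadsl.Source` where a scaladsl `Graph` is required, which fits the guess that `getStreamedText` is the Java API accessor. A sketch that stays on the Scala API uses `textStream` instead and folds each message separately, so chunks from different messages are not mixed. `TextPerMessage` is an illustrative name; binary messages are drained inside the stream rather than passed to `asTextMessage`.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

// Hypothetical helper: one String per incoming text message.
object TextPerMessage {
  val flow: Flow[Message, String, NotUsed] =
    Flow[Message].flatMapConcat[String, Any] {
      case tm: TextMessage =>
        // Fold within a single message so that message boundaries are kept.
        (tm.textStream: Source[String, Any]).fold("")(_ + _)
      case bm: BinaryMessage =>
        // Consume the binary payload but emit nothing for it.
        (bm.dataStream: Source[ByteString, Any]).filter(_ => false).map(_.utf8String)
    }
}
```

No materializer is needed here because the inner sources run as part of the outer stream. As with the snippet above, there is no size bound on the fold.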

@raboof
Contributor

raboof commented May 31, 2018

Remember that asTextMessage will throw for binary messages.

Nowadays it might be a solution to use TextMessage::toStrict as introduced in #1980. Some convenient additional APIs have been proposed in #1979.

Given those 2 other tickets I think this one can now be closed, WDYT?
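A minimal sketch of the `toStrict` approach, assuming an Akka HTTP release that includes the method from #1980. It matches on the message type rather than calling `asTextMessage`, so binary messages do not throw; they are passed through unchanged. `AllTextStrict` is an illustrative name.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper built on TextMessage.toStrict.
object AllTextStrict {
  def flow(timeout: FiniteDuration)(implicit mat: Materializer): Flow[Message, Message, NotUsed] =
    Flow[Message].mapAsync[Message](1) {
      // Every text message is collected into a TextMessage.Strict within the timeout.
      case tm: TextMessage => tm.toStrict(timeout)
      // Binary messages are left as they are for downstream stages to handle or drain.
      case other => Future.successful(other)
    }
}
```

Downstream of this flow, matching on `TextMessage.Strict` is safe for text; a message that does not complete within `timeout` fails the stream instead of being silently dropped.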


7 participants