Skip to content

Commit

Permalink
tweak ServerSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
naoh87 committed Feb 17, 2022
1 parent 9d3cac8 commit f0e2687
Showing 1 changed file with 72 additions and 66 deletions.
138 changes: 72 additions & 66 deletions runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,90 +23,97 @@ package fs2
package grpc
package server

import scala.concurrent.duration._
import cats.effect._
import cats.effect.std.Dispatcher
import cats.effect.testkit.TestContext
import cats.effect.testkit.TestControl
import io.grpc._
import scala.concurrent.duration._

class ServerSuite extends Fs2GrpcSuite {

private val compressionOps =
ServerOptions.default.configureCallOptions(_.withServerCompressor(Some(GzipCompressor)))

private def startCall[R](
sync: Fs2ServerCallHandler[IO] => ServerCallHandler[String, Int],
private def startCall(
implement: Fs2ServerCallHandler[IO] => ServerCallHandler[String, Int],
serverOptions: ServerOptions = ServerOptions.default
)(f: (ServerCall.Listener[String], DummyServerCall) => IO[R]): IO[R] =
TestControl.executeEmbed(
Dispatcher[IO]
.map(Fs2ServerCallHandler[IO](_, serverOptions))
.use(h =>
IO.defer {
val dummy = new DummyServerCall
f(sync(h).startCall(dummy, new Metadata()), dummy)
}
)(call: ServerCall[String, Int], thunk: ServerCall.Listener[String] => IO[Unit]): IO[Unit] =
for {
releaseRef <- IO.ref[IO[Unit]](IO.unit)
startBarrier <- Deferred[IO, Unit]
_ <- TestControl
.execute(
for {
allocated <- Dispatcher[IO].map(Fs2ServerCallHandler[IO](_, serverOptions)).allocated
(handler, release) = allocated
_ <- releaseRef.set(release)
listener <- IO(implement(handler).startCall(call, new Metadata()))
_ <- startBarrier.get
_ <- IO.defer(thunk(listener))
} yield ()
)
)
.flatMap { tc =>
for {
_ <- tc.tick
_ <- startBarrier.complete(())
_ <- tc.tickAll
} yield ()
}
_ <- releaseRef.get
} yield ()

private def syncCall(
fs: (ServerCall.Listener[String] => Unit)*
): ServerCall.Listener[String] => IO[Unit] =
listener => IO(fs.foreach(_.apply(listener)))

test("unaryToUnary with compression") {
startCall(_.unaryToUnaryCall((req, _) => IO(req.length)), compressionOps) { (_, dummy) =>
IO {
assertEquals(dummy.explicitCompressor, Some("gzip"))
}
}
testCompression(_.unaryToUnaryCall((req, _) => IO(req.length)))
}

test("unaryToStream with compression") {
startCall(_.unaryToStreamingCall((req, _) => Stream.emit(req.length)), compressionOps) { (_, dummy) =>
IO {
assertEquals(dummy.explicitCompressor, Some("gzip"))
}
}
testCompression(_.unaryToStreamingCall((req, _) => Stream.emit(req.length).repeatN(5)))
}

test("streamToUnary with compression") {
startCall(_.streamingToUnaryCall((req, _) => req.compile.count.map(_.toInt)), compressionOps) { (_, dummy) =>
IO {
assertEquals(dummy.explicitCompressor, Some("gzip"))
}
}
testCompression(_.streamingToUnaryCall((req, _) => req.compile.foldMonoid.map(_.length)))
}

test("streamToStream with compression") {
startCall(_.streamingToStreamingCall((req, _) => req.map(_.length)), compressionOps) { (_, dummy) =>
IO {
assertEquals(dummy.explicitCompressor, Some("gzip"))
}
test("streamToStream with compression")(
testCompression(_.streamingToStreamingCall((req, _) => req.map(_.length)))
)

private def testCompression(sync: Fs2ServerCallHandler[IO] => ServerCallHandler[String, Int]): IO[Unit] = {
val dummy = new DummyServerCall
startCall(sync, compressionOps)(dummy, _ => IO.unit) >> IO {
assertEquals(dummy.explicitCompressor, Some("gzip"))
}
}

test("single message to unaryToUnary") {
startCall(_.unaryToUnaryCall((req, _) => IO(req.length))) { (listener, dummy) =>
listener.onMessage("123")
listener.onHalfClose()
IO {
assertEquals(dummy.explicitCompressor, None)
assertEquals(dummy.messages.size, 1)
assertEquals(dummy.messages(0), 3)
assertEquals(dummy.currentStatus.isDefined, true)
assertEquals(dummy.currentStatus.get.isOk, true)
}.delayBy(1.seconds)
val dummy = new DummyServerCall
startCall(_.unaryToUnaryCall((req, _) => IO(req.length)))(
dummy,
syncCall(_.onMessage("123"), _.onHalfClose())
) >> IO {
assertEquals(dummy.explicitCompressor, None)
assertEquals(dummy.messages.size, 1)
assertEquals(dummy.messages(0), 3)
assertEquals(dummy.currentStatus.isDefined, true)
assertEquals(dummy.currentStatus.get.isOk, true)
}
}

runTest("cancellation for unaryToUnary") { (tc, d) =>
test("cancellation for unaryToUnary") {
val dummy = new DummyServerCall
val listener = Fs2ServerCallHandler[IO](d, ServerOptions.default)
.unaryToUnaryCall[String, Int]((req, _) => IO(req.length))
.startCall(dummy, new Metadata())

listener.onCancel()
tc.tick()

assertEquals(dummy.currentStatus, None)
assertEquals(dummy.messages.length, 0)
startCall(_.unaryToUnaryCall((req, _) => IO(req.length)))(
dummy,
syncCall(_.onCancel())
) >> IO {
assertEquals(dummy.currentStatus, None)
assertEquals(dummy.messages.length, 0)
}
}

runTest("cancellation on the fly for unaryToUnary") { (tc, d) =>
Expand Down Expand Up @@ -145,13 +152,12 @@ class ServerSuite extends Fs2GrpcSuite {
}

test("no messages to unaryToUnary") {
startCall(
_.unaryToUnaryCall((req, _) => IO(req.length))
) { (listener, dummy) =>
listener.onHalfClose()
IO {
assertEquals(dummy.currentStatus.map(_.getCode), Some(Status.Code.INTERNAL))
}.delayBy(1.seconds)
val dummy = new DummyServerCall
startCall(_.unaryToUnaryCall((req, _) => IO(req.length)))(
dummy,
syncCall(_.onHalfClose())
) >> IO {
assertEquals(dummy.currentStatus.map(_.getCode), Some(Status.Code.INTERNAL))
}
}

Expand Down Expand Up @@ -204,14 +210,14 @@ class ServerSuite extends Fs2GrpcSuite {
}

test("cancellation for streamingToStreaming") {
val dummy = new DummyServerCall
startCall(
_.streamingToStreamingCall((_, _) => Stream.emit(3).repeat.take(5).zipLeft(Stream.awakeDelay[IO](1.seconds)))
) { (listener, dummy) =>
IO {
listener.onCancel() // wait onCancel for stream is started
}.delayBy(2.seconds) >> IO {
assertEquals(dummy.currentStatus.map(_.getCode), Some(Status.Code.CANCELLED))
}.delayBy(1.seconds) // wait for stream cancellation is completed
)(
dummy,
syncCall(_.onCancel())
) >> IO {
assertEquals(dummy.currentStatus.map(_.getCode), Some(Status.Code.CANCELLED))
}
}

Expand Down

0 comments on commit f0e2687

Please sign in to comment.