Skip to content

Commit

Permalink
wire Fs2ServerCallHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
naoh87 committed Feb 21, 2022
1 parent 792c065 commit e5e1b57
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions runtime/src/main/scala/fs2/grpc/server/Fs2ServerCallHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package server

import cats.effect._
import cats.effect.std.Dispatcher
import fs2.grpc.server.internal.Fs2StreamServerCallHandler
import fs2.grpc.server.internal.Fs2UnaryServerCallHandler
import io.grpc._

Expand All @@ -36,32 +37,22 @@ class Fs2ServerCallHandler[F[_]: Async] private (
def unaryToUnaryCall[Request, Response](
implementation: (Request, Metadata) => F[Response]
): ServerCallHandler[Request, Response] =
Fs2UnaryServerCallHandler.unary(implementation, options, dispatcher)
Fs2UnaryServerCallHandler.mkHandler(implementation, options)(_.unary(_, dispatcher))

def unaryToStreamingCall[Request, Response](
implementation: (Request, Metadata) => Stream[F, Response]
): ServerCallHandler[Request, Response] =
Fs2UnaryServerCallHandler.stream(implementation, options, dispatcher)
Fs2UnaryServerCallHandler.mkHandler(implementation, options)(_.stream(_, dispatcher))

def streamingToUnaryCall[Request, Response](
implementation: (Stream[F, Request], Metadata) => F[Response]
): ServerCallHandler[Request, Response] = new ServerCallHandler[Request, Response] {
def startCall(call: ServerCall[Request, Response], headers: Metadata): ServerCall.Listener[Request] = {
val listener = dispatcher.unsafeRunSync(Fs2StreamServerCallListener[F](call, dispatcher, options))
listener.unsafeUnaryResponse(new Metadata(), implementation(_, headers))
listener
}
}
): ServerCallHandler[Request, Response] =
Fs2StreamServerCallHandler.mkHandler(implementation, options)(_.unary(_, dispatcher))

def streamingToStreamingCall[Request, Response](
implementation: (Stream[F, Request], Metadata) => Stream[F, Response]
): ServerCallHandler[Request, Response] = new ServerCallHandler[Request, Response] {
def startCall(call: ServerCall[Request, Response], headers: Metadata): ServerCall.Listener[Request] = {
val listener = dispatcher.unsafeRunSync(Fs2StreamServerCallListener[F](call, dispatcher, options))
listener.unsafeStreamResponse(new Metadata(), implementation(_, headers))
listener
}
}
): ServerCallHandler[Request, Response] =
Fs2StreamServerCallHandler.mkHandler(implementation, options)(_.stream(_, dispatcher))
}

object Fs2ServerCallHandler {
Expand Down

0 comments on commit e5e1b57

Please sign in to comment.