Skip to content

Commit ebe4e04

Browse files
authored
fix: SDK gRPC endpoint dispatchers (#2097)
1 parent 2cb978a commit ebe4e04

File tree

11 files changed

+339
-33
lines changed

11 files changed

+339
-33
lines changed

build.sbt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,21 @@ lazy val pluginTesterJava = Project(id = "akka-grpc-plugin-tester-java", base =
270270
ReflectiveCodeGen.codeGeneratorSettings ++= Seq("server_power_apis"))
271271
.pluginTestingSettings
272272

273+
lazy val pluginTesterSdkHandler =
274+
Project(id = "akka-grpc-plugin-tester-sdk-handler", base = file("plugin-tester-sdk-handler"))
275+
.disablePlugins(MimaPlugin, CiReleasePlugin)
276+
.settings(Dependencies.pluginTester)
277+
.settings(
278+
(publish / skip) := true,
279+
fork := true,
280+
PB.protocVersion := Dependencies.Versions.googleProtobuf,
281+
ReflectiveCodeGen.generatedLanguages := Seq("Java"),
282+
crossScalaVersions := Dependencies.Versions.CrossScalaForLib,
283+
scalaVersion := Dependencies.Versions.CrossScalaForLib.head,
284+
Test / parallelExecution := false,
285+
ReflectiveCodeGen.codeGeneratorSettings ++= Seq("generate_scala_handler_factory"))
286+
.pluginTestingSettings
287+
273288
lazy val root = Project(id = "akka-grpc", base = file("."))
274289
.disablePlugins(SitePlugin, MimaPlugin, CiReleasePlugin)
275290
.aggregate(
@@ -281,6 +296,7 @@ lazy val root = Project(id = "akka-grpc", base = file("."))
281296
interopTests,
282297
pluginTesterScala,
283298
pluginTesterJava,
299+
pluginTesterSdkHandler,
284300
benchmarks,
285301
docs)
286302
.settings(

codegen/src/main/twirl/templates/JavaServer/ScalaHandler.scala.txt

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,36 @@ public final class @{serviceName}ScalaHandlerFactory implements InstancePerReque
5858
private static final CompletionStage<akka.http.javadsl.model.HttpResponse> unsupportedMediaType = CompletableFuture.completedFuture(
5959
akka.http.javadsl.model.HttpResponse.create().withStatus(akka.http.javadsl.model.StatusCodes.UNSUPPORTED_MEDIA_TYPE));
6060

61-
62-
/**
61+
/**
6362
* Creates a `HttpRequest` to `HttpResponse` handler that can be used in for example
6463
* `Http.get(system).bindAndHandleAsync`. It ends with `StatusCodes.NotFound` if the request is not matching.
6564
*
6665
* Use {@@link akka.grpc.javadsl.ServiceHandler#concatOrNotFound} when combining several services.
6766
*/
68-
@@SuppressWarnings("All")
69-
@@Override
70-
public PartialFunction<akka.http.scaladsl.model.HttpRequest, Future<akka.http.scaladsl.model.HttpResponse>> partialInstancePerRequest(Function1<akka.http.scaladsl.model.HttpRequest, @{serviceName}> serviceFactory, String prefix, PartialFunction<Throwable, Trailers> eHandler, ClassicActorSystemProvider systemProvider) {
71-
InstancePerRequestPF.GrpcMethod[] methods = {
72-
new InstancePerRequestPF.GrpcMethod<@{serviceName}>(
73-
"@service.methods.head.grpcName",
74-
this::@{service.methods.head.name})
75-
@for(method <- service.methods.tail) {
76-
,
77-
new InstancePerRequestPF.GrpcMethod<@{serviceName}>(
78-
"@{method.grpcName}",
79-
this::@{method.name})
80-
}
81-
};
82-
83-
return new InstancePerRequestPF<@{serviceName}>(
84-
serviceFactory,
85-
prefix,
86-
methods,
87-
eHandler,
88-
systemProvider
89-
);
90-
}
67+
@@Override
68+
@@SuppressWarnings({"all","unchecked"})
69+
public PartialFunction<akka.http.scaladsl.model.HttpRequest, Future<akka.http.scaladsl.model.HttpResponse>> partialInstancePerRequest(Function1<akka.http.scaladsl.model.HttpRequest, @{serviceName}> serviceFactory, String prefix, PartialFunction<Throwable, Trailers> eHandler, ClassicActorSystemProvider systemProvider, Materializer materializer) {
70+
InstancePerRequestPF.GrpcMethod[] methods = {
71+
new InstancePerRequestPF.GrpcMethod<@{serviceName}>(
72+
"@service.methods.head.grpcName",
73+
this::@{service.methods.head.name})
74+
@for(method <- service.methods.tail) {
75+
,
76+
new InstancePerRequestPF.GrpcMethod<@{serviceName}>(
77+
"@{method.grpcName}",
78+
this::@{method.name})
79+
}
80+
};
81+
82+
return new InstancePerRequestPF<@{serviceName}>(
83+
serviceFactory,
84+
prefix,
85+
methods,
86+
eHandler,
87+
systemProvider,
88+
materializer
89+
);
90+
}
9191

9292
public String getServiceName() {
9393
return @{service.name}.name;

plugin-tester-sdk-handler/LICENSE

Lines changed: 88 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Test module for testing the SDK-specific server handler
2+
3+
Tests run through the main sbt project
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
2+
syntax = "proto3";
3+
4+
option java_multiple_files = true;
5+
option java_package = "example.myapp.helloworld.grpc";
6+
option java_outer_classname = "HelloWorldProto";
7+
8+
package helloworld;
9+
10+
service GreeterService {
11+
rpc SayHello (HelloRequest) returns (HelloReply) {}
12+
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
13+
}
14+
message HelloRequest {
15+
string name = 1;
16+
}
17+
18+
message HelloReply {
19+
string message = 1;
20+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package com.example
6+
7+
import akka.NotUsed
8+
import akka.actor.typed.ActorSystem
9+
import akka.actor.testkit.typed.scaladsl.ActorTestKit
10+
import akka.actor.typed.scaladsl.Behaviors
11+
import akka.grpc.GrpcClientSettings
12+
import akka.grpc.scaladsl.GrpcExceptionHandler
13+
import akka.http.scaladsl.Http
14+
import akka.stream.ActorAttributes
15+
import akka.stream.Materializer
16+
import akka.stream.javadsl.Source
17+
import akka.stream.scaladsl.Sink
18+
import com.typesafe.config.ConfigFactory
19+
import example.myapp.helloworld.grpc.GreeterService
20+
import example.myapp.helloworld.grpc.GreeterServiceClient
21+
import example.myapp.helloworld.grpc.GreeterServiceScalaHandlerFactory
22+
import example.myapp.helloworld.grpc.HelloReply
23+
import example.myapp.helloworld.grpc.HelloRequest
24+
import org.scalatest.BeforeAndAfterAll
25+
import org.scalatest.concurrent.ScalaFutures
26+
import org.scalatest.matchers.should.Matchers
27+
import org.scalatest.time.Span
28+
import org.scalatest.wordspec.AnyWordSpec
29+
30+
import java.util.concurrent.CompletableFuture
31+
import java.util.concurrent.CompletionStage
32+
import scala.concurrent.duration.DurationInt
33+
import scala.jdk.FutureConverters.CompletionStageOps
34+
35+
class ServiceExecutionContextSpec extends AnyWordSpec with Matchers with ScalaFutures with BeforeAndAfterAll {
36+
37+
implicit val system: ActorSystem[Nothing] = ActorSystem[Any](
38+
Behaviors.empty,
39+
"ServiceExecutionContextSpec",
40+
ConfigFactory.parseString("""
41+
akka.http.server.enable-http2 = on
42+
custom-dispatcher = {
43+
type = "Dispatcher"
44+
executor = "fork-join-executor"
45+
}
46+
"""))
47+
override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(100, org.scalatest.time.Millis))
48+
49+
private final class GreeterServiceImpl extends GreeterService {
50+
51+
private def replyWithCurrentThreadName() =
52+
HelloReply.newBuilder().setMessage(Thread.currentThread().getName).build()
53+
54+
override def sayHello(in: HelloRequest): CompletionStage[HelloReply] = {
55+
CompletableFuture.completedFuture(replyWithCurrentThreadName())
56+
}
57+
58+
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
59+
// invokin thread
60+
Source
61+
.single(replyWithCurrentThreadName())
62+
.concat(
63+
Source
64+
.single(in)
65+
// thread name in the running stream (materialized by akka grpc)
66+
.map(_ => replyWithCurrentThreadName()))
67+
}
68+
}
69+
70+
"The SDK instance per request handler" should {
71+
72+
val customMaterializer = Materializer(system, ActorAttributes.dispatcher("custom-dispatcher"))
73+
74+
val handler = new GreeterServiceScalaHandlerFactory()
75+
76+
val handlerPf = handler.partialInstancePerRequest(
77+
_ => new GreeterServiceImpl,
78+
"",
79+
GrpcExceptionHandler.defaultMapper(system.classicSystem),
80+
system,
81+
customMaterializer)
82+
83+
val bound = Http().newServerAt("127.0.0.1", 0).withMaterializer(customMaterializer).bind(handlerPf).futureValue
84+
val port = bound.localAddress.getPort
85+
86+
val client =
87+
GreeterServiceClient.create(GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false), system)
88+
89+
"run unary requests on the specified execution context" in {
90+
val reply = client.sayHello(HelloRequest.getDefaultInstance).asScala.futureValue
91+
92+
reply.getMessage should include("custom-dispatcher")
93+
}
94+
95+
"run streamed responses and their streams on specified execution context" in {
96+
val replies = client.itKeepsReplying(HelloRequest.getDefaultInstance).asScala.runWith(Sink.seq).futureValue
97+
98+
replies should have size 2
99+
replies(0).getMessage should include("custom-dispatcher")
100+
// Note: this is only true because we started http server with the materializer as well
101+
replies(1).getMessage should include("custom-dispatcher")
102+
}
103+
104+
}
105+
106+
override def afterAll(): Unit = {
107+
ActorTestKit.shutdown(system)
108+
}
109+
110+
}

project/Dependencies.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,13 @@ object Dependencies {
157157
Test.scalaTestPlusJunit,
158158
Test.akkaTestkitTyped,
159159
GrpcApi.googleApiProtos)
160+
161+
val sdkPluginTester = l ++= Seq(
162+
// usually automatically added by `suggestedDependencies`, which doesn't work with ReflectiveCodeGen
163+
Compile.grpcStub,
164+
Compile.akkaPki,
165+
Runtime.logback,
166+
Test.scalaTest,
167+
Test.scalaTestPlusJunit,
168+
Test.akkaTestkitTyped)
160169
}

runtime/src/main/scala/akka/grpc/internal/InstancePerRequestPF.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import akka.stream.Materializer
1616
import akka.stream.SystemMaterializer
1717

1818
import java.util.concurrent.CompletionStage
19-
import scala.concurrent.ExecutionContext
19+
2020
import scala.concurrent.Future
2121
import scala.jdk.FutureConverters.CompletionStageOps
2222

@@ -51,12 +51,20 @@ private[akka] final class InstancePerRequestPF[S](
5151
prefix: String,
5252
methods: Array[InstancePerRequestPF.GrpcMethod[S]],
5353
eHandler: PartialFunction[Throwable, Trailers],
54-
system: ClassicActorSystemProvider)
54+
system: ClassicActorSystemProvider,
55+
materializer: Materializer)
5556
extends PartialFunction[HttpRequest, Future[HttpResponse]] {
56-
// Note how the factory and partial function is Akka HTTP scaladsl. This is intentional.
5757

58-
private val materializer: Materializer = SystemMaterializer(system).materializer
59-
private implicit val ec: ExecutionContext = materializer.executionContext
58+
// for bin/source comp with existing generated code
59+
def this(
60+
factory: HttpRequest => S,
61+
prefix: String,
62+
methods: Array[InstancePerRequestPF.GrpcMethod[S]],
63+
eHandler: PartialFunction[Throwable, Trailers],
64+
system: ClassicActorSystemProvider) =
65+
this(factory, prefix, methods, eHandler, system, SystemMaterializer(system).materializer)
66+
67+
// Note how the factory and partial function is Akka HTTP scaladsl. This is intentional.
6068
private val spi = TelemetryExtension(system).spi
6169
private val javaEHandler: akka.japi.Function[ActorSystem, akka.japi.Function[Throwable, Trailers]] =
6270
(_: ActorSystem) => { eHandler.apply _ }

runtime/src/main/scala/akka/grpc/scaladsl/InstancePerRequestFactory.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import akka.annotation.DoNotInherit
1010
import akka.grpc.Trailers
1111
import akka.http.scaladsl.model.HttpRequest
1212
import akka.http.scaladsl.model.HttpResponse
13+
import akka.stream.Materializer
14+
import akka.stream.SystemMaterializer
1315

16+
import scala.annotation.nowarn
1417
import scala.concurrent.Future
1518

1619
/**
@@ -24,10 +27,32 @@ import scala.concurrent.Future
2427
@DoNotInherit
2528
@ApiMayChange
2629
trait InstancePerRequestFactory[S] {
30+
31+
// Note: this may be strange, one of these two methods is always overridden, either the old one, or the new
32+
// having this cycle avoids having to implement the old method in new generated code just for compatibility
33+
34+
@nowarn("msg=deprecated")
35+
def partialInstancePerRequest(
36+
serviceFactory: HttpRequest => S,
37+
prefix: String,
38+
eHandler: PartialFunction[Throwable, Trailers],
39+
systemProvider: ClassicActorSystemProvider,
40+
materializer: Materializer): PartialFunction[HttpRequest, Future[HttpResponse]] = {
41+
partialInstancePerRequest(serviceFactory, prefix, eHandler, systemProvider)
42+
}
43+
44+
// for compatibility with existing generated sources and SDK
45+
@deprecated(since = "2.5.9")
2746
def partialInstancePerRequest(
2847
serviceFactory: HttpRequest => S,
2948
prefix: String,
3049
eHandler: PartialFunction[Throwable, Trailers],
31-
systemProvider: ClassicActorSystemProvider): PartialFunction[HttpRequest, Future[HttpResponse]]
50+
systemProvider: ClassicActorSystemProvider): PartialFunction[HttpRequest, Future[HttpResponse]] =
51+
partialInstancePerRequest(
52+
serviceFactory,
53+
prefix,
54+
eHandler,
55+
systemProvider,
56+
SystemMaterializer(systemProvider).materializer)
3257

3358
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (C) 2024-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.grpc.internal;
6+
7+
import akka.actor.ClassicActorSystemProvider;
8+
import akka.grpc.Trailers;
9+
import akka.grpc.scaladsl.InstancePerRequestFactory;
10+
import akka.http.scaladsl.model.HttpRequest;
11+
import akka.http.scaladsl.model.HttpResponse;
12+
import akka.stream.Materializer;
13+
import scala.Function1;
14+
import scala.PartialFunction;
15+
import scala.concurrent.Future;
16+
17+
// in previous versions of the plugin this would be generated
18+
public class OldTestServiceScalaHandlerFactory implements InstancePerRequestFactory<TestService> {
19+
20+
@Override
21+
public PartialFunction<HttpRequest, Future<HttpResponse>> partialInstancePerRequest(Function1<HttpRequest, TestService> serviceFactory, String prefix, PartialFunction<Throwable, Trailers> eHandler, ClassicActorSystemProvider systemProvider) {
22+
return null;
23+
}
24+
25+
}

0 commit comments

Comments
 (0)