Skip to content

Commit cd3392e

Browse files
authored
feat: define default request headers for a client (#2018)
1 parent 6cc9309 commit cd3392e

File tree

10 files changed

+167
-14
lines changed

10 files changed

+167
-14
lines changed

codegen/src/main/twirl/templates/JavaClient/Client.scala.txt

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
4646
private final io.grpc.CallOptions options;
4747
private final Materializer mat;
4848
private final ExecutionContext ec;
49+
private final MetadataImpl defaultMetadata;
4950

5051
private Default@{service.name}Client(GrpcChannel channel, boolean isChannelOwned, ClassicActorSystemProvider sys) {
5152
this.channel = channel;
@@ -54,30 +55,41 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
5455
this.mat = SystemMaterializer.get(sys).materializer();
5556
this.ec = sys.classicSystem().dispatcher();
5657
this.options = NettyClientUtils.callOptions(settings);
58+
this.defaultMetadata = MetadataImpl.empty();
5759

5860
sys.classicSystem().getWhenTerminated().whenComplete((v, e) -> close());
5961
}
6062

63+
private Default@{service.name}Client(GrpcChannel channel, boolean isChannelOwned, GrpcClientSettings settings, io.grpc.CallOptions options, Materializer mat, ExecutionContext ec, MetadataImpl defaultMetadata) {
64+
this.channel = channel;
65+
this.isChannelOwned = isChannelOwned;
66+
this.settings = settings;
67+
this.options = options;
68+
this.mat = mat;
69+
this.ec = ec;
70+
this.defaultMetadata = defaultMetadata;
71+
}
72+
6173
@for(method <- service.methods) {
6274
@if(method.methodType == akka.grpc.gen.Unary) {
6375
private final SingleResponseRequestBuilder<@method.inputTypeUnboxed, @method.outputTypeUnboxed> @{method.name}RequestBuilder(akka.grpc.internal.InternalChannel channel){
64-
return new JavaUnaryRequestBuilder<>(@{method.name}Descriptor, channel, options, settings, ec);
76+
return new JavaUnaryRequestBuilder<>(@{method.name}Descriptor, channel, options, settings, defaultMetadata, ec);
6577
}
6678
} else {
6779
@if(method.methodType == akka.grpc.gen.ClientStreaming){
6880
private final SingleResponseRequestBuilder<akka.stream.javadsl.Source<@method.inputTypeUnboxed, akka.NotUsed>, @method.outputTypeUnboxed> @{method.name}RequestBuilder(akka.grpc.internal.InternalChannel channel){
6981
return new JavaClientStreamingRequestBuilder<>(
70-
@{method.name}Descriptor, channel, options, settings, mat, ec);
82+
@{method.name}Descriptor, channel, options, settings, defaultMetadata, mat, ec);
7183
}
7284
} else if(method.methodType == akka.grpc.gen.ServerStreaming){
7385
private final StreamResponseRequestBuilder<@method.inputTypeUnboxed, @method.outputTypeUnboxed> @{method.name}RequestBuilder(akka.grpc.internal.InternalChannel channel){
7486
return new JavaServerStreamingRequestBuilder<>(
75-
@{method.name}Descriptor, channel, options, settings, ec);
87+
@{method.name}Descriptor, channel, options, settings, defaultMetadata, ec);
7688
}
7789
} else if(method.methodType == akka.grpc.gen.BidiStreaming){
7890
private final StreamResponseRequestBuilder<akka.stream.javadsl.Source<@method.inputTypeUnboxed, akka.NotUsed>, @method.outputTypeUnboxed> @{method.name}RequestBuilder(akka.grpc.internal.InternalChannel channel){
7991
return new JavaBidirectionalStreamingRequestBuilder<>(
80-
@{method.name}Descriptor, channel, options, settings, ec);
92+
@{method.name}Descriptor, channel, options, settings, defaultMetadata, ec);
8193
}
8294
}
8395
}
@@ -139,8 +151,30 @@ public abstract class @{service.name}Client extends @{service.name}ClientPowerAp
139151
public java.util.concurrent.CompletionStage<akka.Done> closed() {
140152
return channel.closedCS();
141153
}
154+
155+
/**
156+
* The same client instance decorated to add the given key and value to the metadata of any request issued.
157+
*/
158+
public @{service.name}Client addRequestHeader(String key, String value) {
159+
return new Default@{service.name}Client(
160+
channel,
161+
isChannelOwned,
162+
settings,
163+
options,
164+
mat,
165+
ec,
166+
defaultMetadata.addEntry(key, value));
167+
}
142168
}
143169

170+
/**
171+
* The same client instance decorated to add the given key and value to the metadata of any request issued.
172+
*/
173+
public @{service.name}Client addRequestHeader(String key, String value) {
174+
// default implementation ignoring header for source compatibility
175+
return this;
176+
}
177+
144178
}
145179

146180

codegen/src/main/twirl/templates/ScalaClient/Client.scala.txt

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import akka.grpc.GrpcClientSettings
1818
import akka.grpc.scaladsl.AkkaGrpcClient
1919

2020
import akka.grpc.internal.NettyClientUtils
21+
import akka.grpc.internal.MetadataImpl
2122

2223
import akka.grpc.AkkaGrpcGenerated
2324

@@ -39,16 +40,23 @@ import akka.grpc.AkkaGrpcGenerated
3940

4041
// Not sealed so users can extend to write their stubs
4142
@@AkkaGrpcGenerated
42-
trait @{service.name}Client extends @{service.name} with @{service.name}ClientPowerApi with AkkaGrpcClient
43+
trait @{service.name}Client extends @{service.name} with @{service.name}ClientPowerApi with AkkaGrpcClient {
44+
/**
45+
* The same client instance decorated to add the given key and value to the metadata of any request issued.
46+
*/
47+
override def addRequestHeader(key: String, value: String): @{service.name}Client =
48+
// default implementation ignoring header for source compatibility
49+
this
50+
}
4351

4452
@@AkkaGrpcGenerated
4553
object @{service.name}Client {
4654
def apply(settings: GrpcClientSettings)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
47-
new Default@{service.name}Client(GrpcChannel(settings), isChannelOwned = true)
55+
new Default@{service.name}Client(GrpcChannel(settings), isChannelOwned = true, defaultMetadata = MetadataImpl.empty)
4856
def apply(channel: GrpcChannel)(implicit sys: ClassicActorSystemProvider): @{service.name}Client =
49-
new Default@{service.name}Client(channel, isChannelOwned = false)
57+
new Default@{service.name}Client(channel, isChannelOwned = false, defaultMetadata = MetadataImpl.empty)
5058

51-
private class Default@{service.name}Client(channel: GrpcChannel, isChannelOwned: Boolean)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
59+
private class Default@{service.name}Client(channel: GrpcChannel, isChannelOwned: Boolean, defaultMetadata: MetadataImpl)(implicit sys: ClassicActorSystemProvider) extends @{service.name}Client {
5260
import @{service.name}.MethodDescriptors._
5361

5462
private implicit val ex: ExecutionContext = sys.classicSystem.dispatcher
@@ -58,14 +66,14 @@ object @{service.name}Client {
5866
@for(method <- service.methods) {
5967
private def @{method.name}RequestBuilder(channel: akka.grpc.internal.InternalChannel) =
6068
@if(method.methodType == akka.grpc.gen.Unary) {
61-
new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings)
69+
new ScalaUnaryRequestBuilder(@{method.name}Descriptor, channel, options, settings, defaultMetadata)
6270
} else {
6371
@if(method.methodType == akka.grpc.gen.ServerStreaming) {
64-
new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
72+
new ScalaServerStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings, defaultMetadata)
6573
} else if(method.methodType == akka.grpc.gen.ClientStreaming) {
66-
new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
74+
new ScalaClientStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings, defaultMetadata)
6775
} else if (method.methodType == akka.grpc.gen.BidiStreaming) {
68-
new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings)
76+
new ScalaBidirectionalStreamingRequestBuilder(@{method.name}Descriptor, channel, options, settings, defaultMetadata)
6977
}
7078
}
7179
}
@@ -90,6 +98,13 @@ object @{service.name}Client {
9098
@{method.nameSafe}().invoke(in)
9199
}
92100

101+
/**
102+
* The same client instance decorated to add the given key and value to the metadata of any request issued.
103+
*/
104+
override def addRequestHeader(key: String, value: String): @{service.name}Client =
105+
new Default@{service.name}Client(channel, isChannelOwned, defaultMetadata.addEntry(key, value))
106+
107+
93108
override def close(): scala.concurrent.Future[akka.Done] =
94109
if (isChannelOwned) channel.close()
95110
else throw new GrpcClientCloseException()
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package example.myapp.helloworld;
6+
7+
import akka.actor.ActorSystem;
8+
import akka.grpc.GrpcClientSettings;
9+
import example.myapp.helloworld.grpc.GreeterServiceClient;
10+
import example.myapp.helloworld.grpc.HelloReply;
11+
import example.myapp.helloworld.grpc.HelloRequest;
12+
import io.grpc.StatusRuntimeException;
13+
14+
import java.util.concurrent.ExecutionException;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public class AuthenticatedGreeterClient {
18+
19+
public static void main(String[] args) throws Exception {
20+
21+
String serverHost = "127.0.0.1";
22+
int serverPort = 8082;
23+
24+
ActorSystem system = ActorSystem.create("HelloWorldClient");
25+
26+
// Configure the client by code:
27+
GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt(serverHost, serverPort, system).withTls(false);
28+
29+
GreeterServiceClient client = null;
30+
try {
31+
client = GreeterServiceClient.create(settings, system);
32+
33+
try {
34+
client.sayHello(HelloRequest.newBuilder().setName("Alice").build()).toCompletableFuture().get(5, TimeUnit.SECONDS);
35+
throw new RuntimeException("Call should have failed");
36+
} catch (ExecutionException ex) {
37+
system.log().warning(ex, "Call without authentication fails as expected");
38+
}
39+
40+
HelloReply replyWhenAuthenticated = client.sayHello()
41+
.addHeader("Token", "XYZ")
42+
.invoke(HelloRequest.newBuilder().setName("Alice").build()).toCompletableFuture().get(5, TimeUnit.SECONDS);
43+
system.log().info("Call with authentication succeeds: {}", replyWhenAuthenticated);
44+
45+
GreeterServiceClient clientWithMeta = client.addRequestHeader("Token", "XYZ");
46+
47+
HelloReply replyWhenInterceptAuthenticated = clientWithMeta.sayHello()
48+
.addHeader("Token", "XYZ")
49+
.invoke(HelloRequest.newBuilder().setName("Alice").build()).toCompletableFuture().get(5, TimeUnit.SECONDS);
50+
system.log().info("Call with authentication succeeds: {}", replyWhenInterceptAuthenticated);
51+
52+
} catch (StatusRuntimeException e) {
53+
System.out.println("Status: " + e.getStatus());
54+
} catch (Exception e) {
55+
System.out.println(e);
56+
} finally {
57+
if (client != null) client.close();
58+
system.terminate();
59+
}
60+
}
61+
}

plugin-tester-java/src/test/scala/example/myapp/helloworld/JGreeterServiceSpec.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ class JGreeterServiceSpec extends Matchers with AnyWordSpecLike with BeforeAndAf
5656
HelloReply.newBuilder.setMessage("Hello, Alice").setTimestamp(timestamp).build()
5757
reply.toCompletableFuture.get should ===(expectedResponse)
5858
}
59+
60+
"use default metadata" in {
61+
val clientWithHeader = clients.last.addRequestHeader("Authorization", "Bearer test")
62+
val reply = clientWithHeader.sayHello(HelloRequest.newBuilder().setName("Alice").build())
63+
reply.toCompletableFuture.get should ===(HelloReply.newBuilder.setMessage("Hello, Alice (authenticated)").build())
64+
}
5965
}
6066

6167
"GreeterServicePowerApi" should {

plugin-tester-scala/src/main/scala/example/myapp/helloworld/AuthenticatedGreeterClient.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,13 @@ object AuthenticatedGreeterClient {
2727
val replyWhenAuthenticated =
2828
Await.result(client.sayHello().addHeader("Token", "XYZ").invoke(HelloRequest("Alice")), 10.seconds)
2929
sys.log.warning(s"Call with authentication succeeds: $replyWhenAuthenticated")
30+
31+
val interceptedClient: GreeterServiceClient =
32+
client.addRequestHeader("Token", "XYZ")
33+
34+
val replyWhenInterceptAuthenticated =
35+
Await.result(interceptedClient.sayHello(HelloRequest("Alice")), 10.seconds)
36+
sys.log.warning(s"Call with intercepted authentication succeeds: $replyWhenInterceptAuthenticated")
37+
3038
}
3139
}

plugin-tester-scala/src/test/scala/example/myapp/helloworld/GreeterServiceSpec.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ class GreeterSpec extends Matchers with AnyWordSpecLike with BeforeAndAfterAll w
6969
val reply = eagerClient.sayHello(HelloRequest("Alice"))
7070
reply.futureValue should ===(HelloReply("Hello, Alice", Some(Timestamp.apply(123456, 123))))
7171
}
72+
73+
"use default metadata" in {
74+
val clientWithHeader = clients.last.addRequestHeader("Authorization", "Bearer test")
75+
val reply = clientWithHeader.sayHello(HelloRequest("Alice"))
76+
reply.futureValue should ===(HelloReply("Hello, Alice (authenticated)"))
77+
}
7278
}
7379

7480
"GreeterServicePowerApi" should {
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# ok because has default impl in generated user impls
2+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.AkkaGrpcClient.addRequestHeader")
3+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.AkkaGrpcClient.addRequestHeader")
4+
# new internal constructor in generated classes (and we generate reflection client)
5+
ProblemFilters.exclude[DirectMissingMethodProblem]("grpc.reflection.v1alpha.reflection.ServerReflectionClient#DefaultServerReflectionClient.this")

runtime/src/main/scala/akka/grpc/internal/MetadataImpl.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ import com.google.protobuf.any
1919
import com.google.rpc.Status
2020
import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }
2121

22-
@InternalApi private[akka] object MetadataImpl {
22+
/**
23+
* INTERNAL API
24+
*/
25+
// Note: empty value used by generated code, cannot be private
26+
@InternalApi object MetadataImpl {
2327
val BINARY_SUFFIX: String = io.grpc.Metadata.BINARY_HEADER_SUFFIX
2428

2529
val empty = new MetadataImpl(List.empty)
@@ -56,7 +60,11 @@ import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }
5660
}
5761
}
5862

59-
@InternalApi private[akka] final class MetadataImpl(val entries: List[(String, MetadataEntry)]) {
63+
/**
64+
* INTERNAL API
65+
*/
66+
// Note: type used by generated code, cannot be private
67+
@InternalApi final class MetadataImpl(val entries: List[(String, MetadataEntry)]) {
6068
def addEntry(key: String, value: String): MetadataImpl = {
6169
if (key.endsWith(MetadataImpl.BINARY_SUFFIX))
6270
throw new IllegalArgumentException(s"String header names must not end with '${MetadataImpl.BINARY_SUFFIX}'")

runtime/src/main/scala/akka/grpc/javadsl/AkkaGrpcClient.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ import akka.annotation.DoNotInherit
1313
@DoNotInherit
1414
trait AkkaGrpcClient {
1515

16+
/**
17+
* @return The same client decorated to add the given key and value to the metadata of any request issued.
18+
*/
19+
def addRequestHeader(key: String, value: String): AkkaGrpcClient
20+
1621
/**
1722
* Initiates a shutdown in which preexisting and new calls are cancelled.
1823
*

runtime/src/main/scala/akka/grpc/scaladsl/AkkaGrpcClient.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ trait AkkaGrpcClient {
2929
* after maxConnectionAttempts.
3030
*/
3131
def closed: Future[Done]
32+
33+
/**
34+
* The same client instance decorated to add the given key and value to the metadata of any request issued.
35+
*/
36+
def addRequestHeader(key: String, value: String): AkkaGrpcClient
3237
}

0 commit comments

Comments
 (0)