Skip to content

Commit 2f3d86b

Browse files
committed
Last pull from internal before launch, hopefully. Stop implementing CoroutineScope in AbstractCoroutineServerImpl.
1 parent 5b6b64a commit 2f3d86b

File tree

11 files changed

+247
-296
lines changed

11 files changed

+247
-296
lines changed

compiler/src/main/java/io/grpc/kotlin/generator/GrpcCoroutineServerGenerator.kt

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,30 @@ package io.grpc.kotlin.generator
1919
import com.google.common.annotations.VisibleForTesting
2020
import com.google.protobuf.Descriptors.MethodDescriptor
2121
import com.google.protobuf.Descriptors.ServiceDescriptor
22-
import com.squareup.kotlinpoet.*
22+
import com.squareup.kotlinpoet.ClassName
23+
import com.squareup.kotlinpoet.CodeBlock
24+
import com.squareup.kotlinpoet.FunSpec
25+
import com.squareup.kotlinpoet.KModifier
26+
import com.squareup.kotlinpoet.MemberName
2327
import com.squareup.kotlinpoet.MemberName.Companion.member
28+
import com.squareup.kotlinpoet.ParameterSpec
2429
import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy
30+
import com.squareup.kotlinpoet.TypeSpec
31+
import com.squareup.kotlinpoet.asClassName
2532
import io.grpc.ServerServiceDefinition
2633
import io.grpc.Status
2734
import io.grpc.StatusException
2835
import io.grpc.kotlin.AbstractCoroutineServerImpl
2936
import io.grpc.kotlin.ServerCalls
30-
import io.grpc.kotlin.generator.protoc.*
37+
import io.grpc.kotlin.generator.protoc.Declarations
38+
import io.grpc.kotlin.generator.protoc.GeneratorConfig
39+
import io.grpc.kotlin.generator.protoc.MemberSimpleName
40+
import io.grpc.kotlin.generator.protoc.builder
41+
import io.grpc.kotlin.generator.protoc.classBuilder
42+
import io.grpc.kotlin.generator.protoc.declarations
43+
import io.grpc.kotlin.generator.protoc.methodName
44+
import io.grpc.kotlin.generator.protoc.of
45+
import io.grpc.kotlin.generator.protoc.serviceName
3146
import kotlinx.coroutines.CancellationException
3247
import kotlinx.coroutines.flow.Flow
3348
import kotlin.coroutines.CoroutineContext
@@ -78,7 +93,7 @@ class GrpcCoroutineServerGenerator(config: GeneratorConfig): ServiceCodeGenerato
7893
.addModifiers(KModifier.ABSTRACT)
7994
.addKdoc(
8095
"""
81-
Skeletal implementation of the %L service based on Kotlin coroutines.
96+
Skeletal implementation of the %L service based on Kotlin coroutines.
8297
""".trimIndent(),
8398
service.fullName
8499
)
@@ -167,7 +182,7 @@ class GrpcCoroutineServerGenerator(config: GeneratorConfig): ServiceCodeGenerato
167182
CodeBlock.of(
168183
"""
169184
%M(
170-
scope = this,
185+
context = this.context,
171186
descriptor = %L,
172187
implementation = ::%N
173188
)
@@ -204,7 +219,7 @@ class GrpcCoroutineServerGenerator(config: GeneratorConfig): ServiceCodeGenerato
204219
If creating or collecting the returned flow fails with a [%statusException:T], the RPC
205220
will fail with the corresponding [%status:T]. If it fails with a
206221
[%cancellationException:T], the RPC will fail with status `Status.CANCELLED`. If creating
207-
or collecting the returned flow fails for any other reason, the RPC will fail with
222+
or collecting the returned flow fails for any other reason, the RPC will fail with
208223
`Status.UNKNOWN` with the exception as a cause.
209224
""".trimIndent()
210225
)
@@ -241,4 +256,4 @@ class GrpcCoroutineServerGenerator(config: GeneratorConfig): ServiceCodeGenerato
241256
.addNamed(kDocSections.joinToString("\n\n"), kDocBindings)
242257
.build()
243258
}
244-
}
259+
}

compiler/src/test/java/io/grpc/kotlin/generator/CoroutineServerImplGeneratorTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class CoroutineServerImplGeneratorTest {
6464
""".trimIndent())
6565
assertThat(stub.serverMethodDef.toString()).isEqualTo("""
6666
io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition(
67-
scope = this,
67+
context = this.context,
6868
descriptor = io.grpc.examples.helloworld.GreeterGrpc.getSayHelloMethod(),
6969
implementation = ::sayHello
7070
)
@@ -91,7 +91,7 @@ class CoroutineServerImplGeneratorTest {
9191
""".trimIndent())
9292
assertThat(stub.serverMethodDef.toString()).isEqualTo("""
9393
io.grpc.kotlin.ServerCalls.clientStreamingServerMethodDefinition(
94-
scope = this,
94+
context = this.context,
9595
descriptor = io.grpc.examples.helloworld.GreeterGrpc.getClientStreamSayHelloMethod(),
9696
implementation = ::clientStreamSayHello
9797
)
@@ -117,7 +117,7 @@ class CoroutineServerImplGeneratorTest {
117117
""".trimIndent())
118118
assertThat(stub.serverMethodDef.toString()).isEqualTo("""
119119
io.grpc.kotlin.ServerCalls.serverStreamingServerMethodDefinition(
120-
scope = this,
120+
context = this.context,
121121
descriptor = io.grpc.examples.helloworld.GreeterGrpc.getServerStreamSayHelloMethod(),
122122
implementation = ::serverStreamSayHello
123123
)
@@ -145,7 +145,7 @@ class CoroutineServerImplGeneratorTest {
145145
""".trimIndent())
146146
assertThat(stub.serverMethodDef.toString()).isEqualTo("""
147147
io.grpc.kotlin.ServerCalls.bidiStreamingServerMethodDefinition(
148-
scope = this,
148+
context = this.context,
149149
descriptor = io.grpc.examples.helloworld.GreeterGrpc.getBidiStreamSayHelloMethod(),
150150
implementation = ::bidiStreamSayHello
151151
)

compiler/src/test/java/io/grpc/kotlin/generator/GreeterCoroutineImplBase.expected

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ import kotlinx.coroutines.flow.Flow
2020
/**
2121
* Skeletal implementation of the helloworld.Greeter service based on Kotlin coroutines.
2222
*/
23-
abstract class GreeterCoroutineImplBase(
24-
coroutineContext: CoroutineContext = EmptyCoroutineContext
25-
) : AbstractCoroutineServerImpl(coroutineContext) {
23+
abstract class GreeterCoroutineImplBase(coroutineContext: CoroutineContext = EmptyCoroutineContext)
24+
: AbstractCoroutineServerImpl(coroutineContext) {
2625
/**
2726
* Returns the response to an RPC for helloworld.Greeter.SayHello.
2827
*
@@ -34,8 +33,10 @@ abstract class GreeterCoroutineImplBase(
3433
*
3534
* @param request The request from the client.
3635
*/
37-
open suspend fun sayHello(request: HelloRequest): HelloReply = throw
38-
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.SayHello is unimplemented"))
36+
open suspend fun sayHello(request: HelloRequest): HelloReply {
37+
throw
38+
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.SayHello is unimplemented"))
39+
}
3940

4041
/**
4142
* Returns the response to an RPC for helloworld.Greeter.ClientStreamSayHello.
@@ -50,8 +51,10 @@ abstract class GreeterCoroutineImplBase(
5051
* collected only once and throws [java.lang.IllegalStateException] on attempts to collect
5152
* it more than once.
5253
*/
53-
open suspend fun clientStreamSayHello(requests: Flow<HelloRequest>): HelloReply = throw
54-
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.ClientStreamSayHello is unimplemented"))
54+
open suspend fun clientStreamSayHello(requests: Flow<HelloRequest>): HelloReply {
55+
throw
56+
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.ClientStreamSayHello is unimplemented"))
57+
}
5558

5659
/**
5760
* Returns a [Flow] of responses to an RPC for helloworld.Greeter.ServerStreamSayHello.
@@ -65,8 +68,10 @@ abstract class GreeterCoroutineImplBase(
6568
*
6669
* @param request The request from the client.
6770
*/
68-
open fun serverStreamSayHello(request: MultiHelloRequest): Flow<HelloReply> = throw
69-
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.ServerStreamSayHello is unimplemented"))
71+
open fun serverStreamSayHello(request: MultiHelloRequest): Flow<HelloReply> {
72+
throw
73+
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.ServerStreamSayHello is unimplemented"))
74+
}
7075

7176
/**
7277
* Returns a [Flow] of responses to an RPC for helloworld.Greeter.BidiStreamSayHello.
@@ -82,27 +87,29 @@ abstract class GreeterCoroutineImplBase(
8287
* collected only once and throws [java.lang.IllegalStateException] on attempts to collect
8388
* it more than once.
8489
*/
85-
open fun bidiStreamSayHello(requests: Flow<HelloRequest>): Flow<HelloReply> = throw
86-
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.BidiStreamSayHello is unimplemented"))
90+
open fun bidiStreamSayHello(requests: Flow<HelloRequest>): Flow<HelloReply> {
91+
throw
92+
StatusException(UNIMPLEMENTED.withDescription("Method helloworld.Greeter.BidiStreamSayHello is unimplemented"))
93+
}
8794

8895
final override fun bindService(): ServerServiceDefinition = builder(getServiceDescriptor())
8996
.addMethod(unaryServerMethodDefinition(
90-
scope = this,
97+
context = this.context,
9198
descriptor = GreeterGrpc.getSayHelloMethod(),
9299
implementation = ::sayHello
93100
))
94101
.addMethod(clientStreamingServerMethodDefinition(
95-
scope = this,
102+
context = this.context,
96103
descriptor = GreeterGrpc.getClientStreamSayHelloMethod(),
97104
implementation = ::clientStreamSayHello
98105
))
99106
.addMethod(serverStreamingServerMethodDefinition(
100-
scope = this,
107+
context = this.context,
101108
descriptor = GreeterGrpc.getServerStreamSayHelloMethod(),
102109
implementation = ::serverStreamSayHello
103110
))
104111
.addMethod(bidiStreamingServerMethodDefinition(
105-
scope = this,
112+
context = this.context,
106113
descriptor = GreeterGrpc.getBidiStreamSayHelloMethod(),
107114
implementation = ::bidiStreamSayHello
108115
)).build()

stub/src/main/java/io/grpc/kotlin/AbstractCoroutineServerImpl.kt

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,23 @@
1717
package io.grpc.kotlin
1818

1919
import io.grpc.BindableService
20-
import kotlinx.coroutines.CoroutineScope
21-
import kotlinx.coroutines.Job
22-
import kotlinx.coroutines.SupervisorJob
2320
import kotlin.coroutines.CoroutineContext
2421
import kotlin.coroutines.EmptyCoroutineContext
2522

2623
/**
2724
* Skeleton implementation of a coroutine-based gRPC server implementation. Intended to be
2825
* subclassed by generated code.
2926
*/
30-
abstract class AbstractCoroutineServerImpl private constructor(
31-
private val delegateScope: CoroutineScope
32-
) : CoroutineScope, BindableService {
33-
34-
constructor(coroutineContext: CoroutineContext = EmptyCoroutineContext) :
35-
this(CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job])))
36-
// We want a SupervisorJob so one failed RPC does not bring down the entire server.
37-
38-
final override val coroutineContext: CoroutineContext
39-
get() = delegateScope.coroutineContext
40-
}
27+
abstract class AbstractCoroutineServerImpl(
28+
/** The context in which to run server coroutines. */
29+
val context: CoroutineContext = EmptyCoroutineContext
30+
) : BindableService {
31+
/*
32+
* Each RPC is executed in its own coroutine scope built from [context]. We could have a parent
33+
* scope, but it doesn't really add anything: we don't want users to be able to launch tasks
34+
* in that scope easily, since almost all coroutines should be scoped to the RPC and cancelled
35+
* if the RPC is cancelled. Users who don't want that behavior should manage their own scope for
36+
* it. Additionally, gRPC server objects don't have their own notion of shutdown: shutting down
37+
* a server means cancelling the RPCs, not calling a teardown on the server object.
38+
*/
39+
}

stub/src/main/java/io/grpc/kotlin/ClientCalls.kt

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,13 @@ object ClientCalls {
109109
headers: suspend () -> GrpcMetadata = { GrpcMetadata() }
110110
): (RequestT) -> Flow<ResponseT> = {
111111
flow {
112-
emitAll(
113-
serverStreamingRpc(
114-
channel,
115-
method,
116-
it,
117-
callOptions,
118-
headers()
119-
)
120-
)
112+
serverStreamingRpc(
113+
channel,
114+
method,
115+
it,
116+
callOptions,
117+
headers()
118+
).collect { emit(it) }
121119
}
122120
}
123121

@@ -207,15 +205,13 @@ object ClientCalls {
207205
): (Flow<RequestT>) -> Flow<ResponseT> =
208206
{
209207
flow {
210-
emitAll(
211-
bidiStreamingRpc(
212-
channel,
213-
method,
214-
it,
215-
callOptions,
216-
headers()
217-
)
218-
)
208+
bidiStreamingRpc(
209+
channel,
210+
method,
211+
it,
212+
callOptions,
213+
headers()
214+
).collect { emit(it) }
219215
}
220216
}
221217

stub/src/main/java/io/grpc/kotlin/GrpcContextElement.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616

1717
package io.grpc.kotlin
1818

19-
import kotlinx.coroutines.CoroutineScope
20-
import io.grpc.Context as GrpcContext
21-
import kotlin.coroutines.CoroutineContext
2219
import kotlinx.coroutines.ThreadContextElement
23-
import kotlinx.coroutines.withContext
20+
import kotlin.coroutines.CoroutineContext
21+
import io.grpc.Context as GrpcContext
2422

2523
/**
2624
* A [CoroutineContext] that propagates an associated [io.grpc.Context] to coroutines run using

stub/src/main/java/io/grpc/kotlin/Helpers.kt

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ import kotlinx.coroutines.Dispatchers
2323
import kotlinx.coroutines.Job
2424
import kotlinx.coroutines.cancel
2525
import kotlinx.coroutines.flow.Flow
26-
import kotlinx.coroutines.flow.FlowCollector
2726
import kotlinx.coroutines.flow.collect
27+
import kotlinx.coroutines.flow.flow
28+
import kotlinx.coroutines.flow.single
2829
import kotlinx.coroutines.runBlocking
2930

3031
/**
3132
* Extracts the value of a [Deferred] known to be completed, or throws its exception if it was
3233
* not completed successfully. (Non-experimental variant of `getDone`.)
3334
*/
34-
internal val <T> Deferred<T>.doneValue: T
35+
val <T> Deferred<T>.doneValue: T
3536
get() {
3637
check(isCompleted) { "doneValue should only be called on completed Deferred values" }
3738
return runBlocking(Dispatchers.Unconfined) {
@@ -42,52 +43,44 @@ internal val <T> Deferred<T>.doneValue: T
4243
/**
4344
* Cancels a [Job] with a cause and suspends until the job completes/is finished cancelling.
4445
*/
45-
internal suspend fun Job.cancelAndJoin(message: String, cause: Exception? = null) {
46+
suspend fun Job.cancelAndJoin(message: String, cause: Exception? = null) {
4647
cancel(message, cause)
4748
join()
4849
}
4950

50-
internal suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
51-
flow.collect { emit(it) }
52-
}
53-
5451
/**
55-
* Extracts the one and only element of this flow, throwing an appropriate [StatusException] if
56-
* there is not exactly one element. (Otherwise this is fully equivalent to `Flow.single()`.)
52+
* Returns this flow, save that if there is not exactly one element, it throws a [StatusException].
53+
*
54+
* The purpose of this function is to enable the one element to get processed before we have
55+
* confirmation that the input flow is done.
5756
*/
58-
internal suspend fun <T> Flow<T>.singleOrStatus(
57+
fun <T> Flow<T>.singleOrStatusFlow(
5958
expected: String,
6059
descriptor: Any
61-
): T {
62-
// We could call Flow.single() and catch exceptions, but if the underlying flow throws
63-
// IllegalStateException or NoSuchElementException for its own reasons, we'd swallow those
64-
// and give misleading errors. Instead, we reimplement single() ourselves.
65-
var result: T? = null
60+
): Flow<T> = flow {
6661
var found = false
6762
collect {
6863
if (!found) {
6964
found = true
70-
result = it
65+
emit(it)
7166
} else {
7267
throw StatusException(
7368
Status.INTERNAL.withDescription("Expected one $expected for $descriptor but received two")
7469
)
7570
}
7671
}
77-
@Suppress("UNCHECKED_CAST")
7872
if (!found) {
7973
throw StatusException(
8074
Status.INTERNAL.withDescription("Expected one $expected for $descriptor but received none")
8175
)
82-
} else {
83-
return result as T
8476
}
8577
}
8678

87-
/** Runs [block] and returns any exception it throws, or `null` if it does not throw. */
88-
internal inline fun thrownOrNull(block: () -> Unit): Throwable? = try {
89-
block()
90-
null
91-
} catch (thrown: Throwable) {
92-
thrown
93-
}
79+
/**
80+
* Returns the one and only element of this flow, and throws a [StatusException] if there is not
81+
* exactly one element.
82+
*/
83+
suspend fun <T> Flow<T>.singleOrStatus(
84+
expected: String,
85+
descriptor: Any
86+
): T = singleOrStatusFlow(expected, descriptor).single()

0 commit comments

Comments
 (0)