diff --git a/api/src/main/java/io/grpc/InternalCensus.java b/api/src/main/java/io/grpc/InternalCensus.java new file mode 100644 index 00000000000..b129218beca --- /dev/null +++ b/api/src/main/java/io/grpc/InternalCensus.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * Internal accessor for configuring Census features. Do not use this. + */ +@Internal +public final class InternalCensus { + + /** + * Key to access the configuration if the default client side census features are disabled. + */ + @Internal + public static final CallOptions.Key DISABLE_CLIENT_DEFAULT_CENSUS = + CallOptions.Key.create("io.grpc.DISABLE_CLIENT_DEFAULT_CENSUS_STATS"); +} diff --git a/census/src/main/java/io/grpc/census/CensusClientInterceptor.java b/census/src/main/java/io/grpc/census/CensusClientInterceptor.java new file mode 100644 index 00000000000..f36435229d9 --- /dev/null +++ b/census/src/main/java/io/grpc/census/CensusClientInterceptor.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.census; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.InternalCensus; +import io.grpc.MethodDescriptor; +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link ClientInterceptor} for configuring client side OpenCensus features with + * custom settings. Note OpenCensus stats and tracing features are turned on by default + * if grpc-census artifact is in the runtime classpath. The gRPC core + * library does not provide public APIs for customized OpenCensus configurations. + * Use this interceptor to do so. Intended for advanced usages. + * + *

Applying this interceptor disables the channel's default stats and tracing + * features. The effectively OpenCensus features are determined by configurations in this + * interceptor. + * + *

For the current release, applying this interceptor may have the side effect that + * effectively disables retry. + */ +// TODO(chengyuanzhang): add ExperimentalApi annotation. +public final class CensusClientInterceptor implements ClientInterceptor { + + private final List interceptors = new ArrayList<>(); + + private CensusClientInterceptor( + boolean statsEnabled, + boolean recordStartedRpcs, + boolean recordFinishedRpcs, + boolean recordRealTimeMetrics, + boolean tracingEnabled) { + if (statsEnabled) { + CensusStatsModule censusStats = + new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); + interceptors.add(censusStats.getClientInterceptor()); + } + if (tracingEnabled) { + CensusTracingModule censusTracing = new CensusTracingModule(); + interceptors.add(censusTracing.getClientInterceptor()); + } + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { + callOptions = callOptions.withOption(InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS, true); + if (!interceptors.isEmpty()) { + next = + ClientInterceptors.intercept(next, interceptors.toArray(new ClientInterceptor[0])); + } + return next.newCall(method, callOptions); + } + + /** + * Creates a new builder for a {@link CensusClientInterceptor}. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for a {@link CensusClientInterceptor}. + */ + public static class Builder { + + private boolean statsEnabled; + private boolean recordStartedRpcs; + private boolean recordFinishedRpcs; + private boolean recordRealTimeMetrics; + private boolean tracingEnabled; + + /** + * Disable or enable stats features. Disabled by default. + */ + public Builder setStatsEnabled(boolean value) { + statsEnabled = value; + return this; + } + + /** + * Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} + * is set to true. Disabled by default. + */ + public Builder setRecordStartedRpcs(boolean value) { + recordStartedRpcs = value; + return this; + } + + /** + * Disable or enable stats recording for RPC completions. Effective only if {@link + * #setStatsEnabled} is set to true. Disabled by default. + */ + public Builder setRecordFinishedRpcs(boolean value) { + recordFinishedRpcs = value; + return this; + } + + /** + * Disable or enable stats recording for RPC upstarts. Effective only if {@link + * #setStatsEnabled} is set to true. Disabled by default. + */ + public Builder setRecordRealTimeMetrics(boolean value) { + recordRealTimeMetrics = value; + return this; + } + + /** + * Disable or enable tracing features. Disabled by default. + */ + public Builder setTracingEnabled(boolean value) { + tracingEnabled = value; + return this; + } + + /** + * Builds the {@link CensusClientInterceptor}. + */ + public CensusClientInterceptor build() { + return new CensusClientInterceptor( + statsEnabled, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics, + tracingEnabled); + } + } +} diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index d625a6f5c6f..0f37a152a13 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -71,6 +71,12 @@ final class CensusStatsModule { private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName()); private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); + private static final Supplier STOPWATCH_SUPPLIER = new Supplier() { + @Override + public Stopwatch get() { + return Stopwatch.createUnstarted(); + } + }; private final Tagger tagger; private final StatsRecorder statsRecorder; @@ -78,22 +84,24 @@ final class CensusStatsModule { @VisibleForTesting final Metadata.Key statsHeader; private final boolean propagateTags; - private final boolean recordStartedRpcs; - private final boolean recordFinishedRpcs; - private final boolean recordRealTimeMetrics; + @VisibleForTesting + final boolean recordStartedRpcs; + @VisibleForTesting + final boolean recordFinishedRpcs; + @VisibleForTesting + final boolean recordRealTimeMetrics; /** * Creates a {@link CensusStatsModule} with the default OpenCensus implementation. */ - CensusStatsModule(Supplier stopwatchSupplier, - boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs, - boolean recordRealTimeMetrics) { + CensusStatsModule( + boolean recordStartedRpcs, boolean recordFinishedRpcs, boolean recordRealTimeMetrics) { this( Tags.getTagger(), Tags.getTagPropagationComponent().getBinarySerializer(), Stats.getStatsRecorder(), - stopwatchSupplier, - propagateTags, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); + STOPWATCH_SUPPLIER, + true, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); } /** @@ -345,7 +353,8 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory { callEndedUpdater = tmpCallEndedUpdater; } - private final CensusStatsModule module; + @VisibleForTesting + final CensusStatsModule module; private final Stopwatch stopwatch; private volatile ClientTracer streamTracer; private volatile int callEnded; diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java index e9e12941ed4..f7a7b7c155a 100644 --- a/census/src/main/java/io/grpc/census/CensusTracingModule.java +++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java @@ -39,6 +39,7 @@ import io.opencensus.trace.SpanContext; import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.unsafe.ContextUtils; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -92,6 +93,10 @@ final class CensusTracingModule { private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor(); private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); + CensusTracingModule() { + this(Tracing.getTracer(), Tracing.getPropagationComponent().getBinaryFormat()); + } + CensusTracingModule( Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) { this.censusTracer = checkNotNull(censusTracer, "censusTracer"); diff --git a/census/src/main/java/io/grpc/census/InternalCensusStatsAccessor.java b/census/src/main/java/io/grpc/census/InternalCensusStatsAccessor.java index 96be3258dff..6f58f5f28eb 100644 --- a/census/src/main/java/io/grpc/census/InternalCensusStatsAccessor.java +++ b/census/src/main/java/io/grpc/census/InternalCensusStatsAccessor.java @@ -18,8 +18,13 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.Internal; +import io.grpc.InternalCensus; +import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.Tagger; @@ -32,13 +37,6 @@ @Internal public final class InternalCensusStatsAccessor { - private static final Supplier STOPWATCH_SUPPLIER = new Supplier() { - @Override - public Stopwatch get() { - return Stopwatch.createUnstarted(); - } - }; - // Prevent instantiation. private InternalCensusStatsAccessor() { } @@ -51,13 +49,8 @@ public static ClientInterceptor getClientInterceptor( boolean recordFinishedRpcs, boolean recordRealTimeMetrics) { CensusStatsModule censusStats = - new CensusStatsModule( - STOPWATCH_SUPPLIER, - true, /* propagateTags */ - recordStartedRpcs, - recordFinishedRpcs, - recordRealTimeMetrics); - return censusStats.getClientInterceptor(); + new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); + return getClientInterceptor(censusStats); } /** @@ -76,7 +69,22 @@ public static ClientInterceptor getClientInterceptor( new CensusStatsModule( tagger, tagCtxSerializer, statsRecorder, stopwatchSupplier, propagateTags, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); - return censusStats.getClientInterceptor(); + return getClientInterceptor(censusStats); + } + + private static ClientInterceptor getClientInterceptor(CensusStatsModule module) { + final ClientInterceptor interceptor = module.getClientInterceptor(); + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (callOptions.getOption( + InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS) != null) { + return next.newCall(method, callOptions); + } + return interceptor.interceptCall(method, callOptions, next); + } + }; } /** @@ -87,12 +95,7 @@ public static ServerStreamTracer.Factory getServerStreamTracerFactory( boolean recordFinishedRpcs, boolean recordRealTimeMetrics) { CensusStatsModule censusStats = - new CensusStatsModule( - STOPWATCH_SUPPLIER, - true, /* propagateTags */ - recordStartedRpcs, - recordFinishedRpcs, - recordRealTimeMetrics); + new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics); return censusStats.getServerTracerFactory(); } diff --git a/census/src/main/java/io/grpc/census/InternalCensusTracingAccessor.java b/census/src/main/java/io/grpc/census/InternalCensusTracingAccessor.java index 2df6c5fb4bd..9462ece487b 100644 --- a/census/src/main/java/io/grpc/census/InternalCensusTracingAccessor.java +++ b/census/src/main/java/io/grpc/census/InternalCensusTracingAccessor.java @@ -16,10 +16,14 @@ package io.grpc.census; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.Internal; +import io.grpc.InternalCensus; +import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; -import io.opencensus.trace.Tracing; /** * Accessor for getting {@link ClientInterceptor} or {@link ServerStreamTracer.Factory} with @@ -37,10 +41,19 @@ private InternalCensusTracingAccessor() { */ public static ClientInterceptor getClientInterceptor() { CensusTracingModule censusTracing = - new CensusTracingModule( - Tracing.getTracer(), - Tracing.getPropagationComponent().getBinaryFormat()); - return censusTracing.getClientInterceptor(); + new CensusTracingModule(); + final ClientInterceptor interceptor = censusTracing.getClientInterceptor(); + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (callOptions.getOption( + InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS) != null) { + return next.newCall(method, callOptions); + } + return interceptor.interceptCall(method, callOptions, next); + } + }; } /** @@ -48,9 +61,7 @@ public static ClientInterceptor getClientInterceptor() { */ public static ServerStreamTracer.Factory getServerStreamTracerFactory() { CensusTracingModule censusTracing = - new CensusTracingModule( - Tracing.getTracer(), - Tracing.getPropagationComponent().getBinaryFormat()); + new CensusTracingModule(); return censusTracing.getServerTracerFactory(); } } diff --git a/census/src/test/java/io/grpc/census/CensusClientInterceptorTest.java b/census/src/test/java/io/grpc/census/CensusClientInterceptorTest.java new file mode 100644 index 00000000000..6a11d285809 --- /dev/null +++ b/census/src/test/java/io/grpc/census/CensusClientInterceptorTest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.census; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.Iterables; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InternalInProcess; +import io.grpc.testing.GrpcCleanupRule; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test for {@link CensusClientInterceptor}. + */ +@RunWith(JUnit4.class) +public class CensusClientInterceptorTest { + + @Rule + public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + + private static final CallOptions.Key CUSTOM_OPTION = + CallOptions.Key.createWithDefault("option", "default"); + private static final CallOptions CALL_OPTIONS = + CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue"); + + private static class StringInputStream extends InputStream { + final String string; + + StringInputStream(String string) { + this.string = string; + } + + @Override + public int read() { + // InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is + // passed to the InProcess server and consumed by MARSHALLER.parse(). + throw new UnsupportedOperationException("Should not be called"); + } + } + + private static final MethodDescriptor.Marshaller MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public InputStream stream(String value) { + return new StringInputStream(value); + } + + @Override + public String parse(InputStream stream) { + return ((StringInputStream) stream).string; + } + }; + + private final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNKNOWN) + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setFullMethodName("package1.service2/method3") + .build(); + + private final AtomicReference callOptionsCaptor = new AtomicReference<>(); + + private ManagedChannel channel; + + + @SuppressWarnings("unchecked") + @Before + public void setUp() { + ClientInterceptor callOptionCaptureInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + callOptionsCaptor.set(callOptions); + return next.newCall(method, callOptions); + } + }; + InProcessChannelBuilder builder = + InProcessChannelBuilder.forName("non-existing server").directExecutor(); + InternalInProcess.setTestInterceptor(builder, callOptionCaptureInterceptor); + channel = grpcCleanupRule.register(builder.build()); + } + + @Test + public void usingCensusInterceptorDisablesDefaultCensus() { + ClientInterceptor interceptor = + CensusClientInterceptor.newBuilder().build(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + interceptedChannel.newCall(method, CALL_OPTIONS); + assertThat(callOptionsCaptor.get().getStreamTracerFactories()).isEmpty(); + } + + @Test + public void customStatsConfig() { + ClientInterceptor interceptor = + CensusClientInterceptor.newBuilder() + .setStatsEnabled(true) + .setRecordStartedRpcs(false) + .setRecordFinishedRpcs(false) + .setRecordRealTimeMetrics(true) + .build(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + interceptedChannel.newCall(method, CALL_OPTIONS); + CensusStatsModule.ClientCallTracer callTracer = + (CensusStatsModule.ClientCallTracer) Iterables.getOnlyElement( + callOptionsCaptor.get().getStreamTracerFactories()); + assertThat(callTracer.module.recordStartedRpcs).isEqualTo(false); + assertThat(callTracer.module.recordFinishedRpcs).isEqualTo(false); + assertThat(callTracer.module.recordRealTimeMetrics).isEqualTo(true); + } + + @Test + public void onlyEnableTracing() { + ClientInterceptor interceptor = + CensusClientInterceptor.newBuilder().setTracingEnabled(true).build(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + interceptedChannel.newCall(method, CALL_OPTIONS); + assertThat(Iterables.getOnlyElement(callOptionsCaptor.get().getStreamTracerFactories())) + .isInstanceOf(CensusTracingModule.ClientCallTracer.class); + } + + @Test + public void enableStatsAndTracing() { + ClientInterceptor interceptor = + CensusClientInterceptor.newBuilder().setStatsEnabled(true).setTracingEnabled(true).build(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + interceptedChannel.newCall(method, CALL_OPTIONS); + assertThat(callOptionsCaptor.get().getStreamTracerFactories()).hasSize(2); + assertThat(callOptionsCaptor.get().getStreamTracerFactories().get(0)) + .isInstanceOf(CensusTracingModule.ClientCallTracer.class); + assertThat(callOptionsCaptor.get().getStreamTracerFactories().get(1)) + .isInstanceOf(CensusStatsModule.ClientCallTracer.class); + } +} diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 3546e33357c..8f447929e43 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ChannelLogger; +import io.grpc.ClientInterceptor; import io.grpc.ExperimentalApi; import io.grpc.Internal; import io.grpc.internal.AbstractManagedChannelImplBuilder; @@ -184,6 +185,11 @@ protected ClientTransportFactory buildTransportFactory() { name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); } + @Override + protected InProcessChannelBuilder setTestInterceptor(ClientInterceptor interceptor) { + return super.setTestInterceptor(interceptor); + } + /** * Creates InProcess transports. Exposed for internal use, as it should be private. */ diff --git a/core/src/main/java/io/grpc/inprocess/InternalInProcess.java b/core/src/main/java/io/grpc/inprocess/InternalInProcess.java index 021b07a80bc..a13fbe0eec3 100644 --- a/core/src/main/java/io/grpc/inprocess/InternalInProcess.java +++ b/core/src/main/java/io/grpc/inprocess/InternalInProcess.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import io.grpc.Attributes; +import io.grpc.ClientInterceptor; import io.grpc.Internal; import io.grpc.ServerStreamTracer; import io.grpc.internal.ConnectionClientTransport; @@ -62,4 +63,14 @@ public static ConnectionClientTransport createInProcessTransport( serverStreamTracerFactories, serverListener); } + + /** + * Adds a {@link ClientInterceptor} that is closest to the network to test some internal + * features. + */ + @Internal + public static void setTestInterceptor( + InProcessChannelBuilder builder, ClientInterceptor interceptor) { + builder.setTestInterceptor(interceptor); + } } diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index cf47a4d9f92..c7ccf712e26 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -180,6 +181,8 @@ protected final int maxInboundMessageSize() { private boolean recordFinishedRpcs = true; private boolean recordRealTimeMetrics = false; private boolean tracingEnabled = true; + @Nullable + private ClientInterceptor testInterceptor; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -584,9 +587,21 @@ final List getEffectiveInterceptors() { effectiveInterceptors.add(0, tracingInterceptor); } } + if (testInterceptor != null) { + effectiveInterceptors.add(0, testInterceptor); + } return effectiveInterceptors; } + /** + * Adds a {@link ClientInterceptor} that is closest to the network to test some internal + * features. + */ + protected T setTestInterceptor(ClientInterceptor interceptor) { + testInterceptor = checkNotNull(interceptor, "interceptor"); + return thisT(); + } + /** * Subclasses should override this method to provide the {@link ClientTransportFactory} * appropriate for this channel. This method is meant for Transport implementors and should not diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 436b1f41e33..b119250c798 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -40,6 +40,7 @@ import io.grpc.Context.CancellationListener; import io.grpc.Deadline; import io.grpc.DecompressorRegistry; +import io.grpc.InternalCensus; import io.grpc.InternalDecompressorRegistry; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; @@ -121,7 +122,8 @@ final class ClientCallImpl extends ClientCall { this.callOptions = callOptions; this.clientTransportProvider = clientTransportProvider; this.deadlineCancellationExecutor = deadlineCancellationExecutor; - this.retryEnabled = retryEnabled; + this.retryEnabled = retryEnabled + && callOptions.getOption(InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS) == null; PerfMark.event("ClientCall.", tag); } diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index f7f79f0b5fe..97b121e5ebe 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -31,9 +31,12 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ClientStreamTracer; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; import io.grpc.NameResolver; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -45,6 +48,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -69,6 +73,30 @@ public ClientCall interceptCall( private Builder builder = new Builder("fake"); private Builder directAddressBuilder = new Builder(new SocketAddress(){}, "fake"); + private AtomicReference callOptionsCaptor = new AtomicReference<>(); + @SuppressWarnings("unchecked") + private MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNKNOWN) + .setFullMethodName("a.service/method") + .setRequestMarshaller(mock(Marshaller.class)) + .setResponseMarshaller(mock(Marshaller.class)) + .build(); + + @SuppressWarnings("unchecked") + private Channel fakeChannel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + callOptionsCaptor.set(callOptions); + return mock(ClientCall.class); + } + + @Override + public String authority() { + throw new UnsupportedOperationException("should not be called"); + } + }; @Test public void executor_default() { @@ -276,46 +304,81 @@ public void makeTargetStringForDirectAddress_scopedIpv6() throws Exception { } @Test - public void getEffectiveInterceptors_default() { + public void censusFeatures_default() { builder.intercept(DUMMY_USER_INTERCEPTOR); List effectiveInterceptors = builder.getEffectiveInterceptors(); assertEquals(3, effectiveInterceptors.size()); - assertThat(effectiveInterceptors.get(0).getClass().getName()) - .isEqualTo("io.grpc.census.CensusTracingModule$TracingClientInterceptor"); - assertThat(effectiveInterceptors.get(1).getClass().getName()) - .isEqualTo("io.grpc.census.CensusStatsModule$StatsClientInterceptor"); assertThat(effectiveInterceptors.get(2)).isSameInstanceAs(DUMMY_USER_INTERCEPTOR); + Channel channel = ClientInterceptors.intercept(fakeChannel, effectiveInterceptors); + channel.newCall(method, CallOptions.DEFAULT); + List callTracers = + callOptionsCaptor.get().getStreamTracerFactories(); + assertThat(callTracers).hasSize(2); + assertThat(callTracers.get(0).getClass().getName()) + .isEqualTo("io.grpc.census.CensusStatsModule$ClientCallTracer"); + assertThat(callTracers.get(1).getClass().getName()) + .isEqualTo("io.grpc.census.CensusTracingModule$ClientCallTracer"); } @Test - public void getEffectiveInterceptors_disableStats() { + public void censusFeatures_disableStats() { builder.intercept(DUMMY_USER_INTERCEPTOR); builder.setStatsEnabled(false); List effectiveInterceptors = builder.getEffectiveInterceptors(); assertEquals(2, effectiveInterceptors.size()); - assertThat(effectiveInterceptors.get(0).getClass().getName()) - .isEqualTo("io.grpc.census.CensusTracingModule$TracingClientInterceptor"); assertThat(effectiveInterceptors.get(1)).isSameInstanceAs(DUMMY_USER_INTERCEPTOR); + Channel channel = ClientInterceptors.intercept(fakeChannel, effectiveInterceptors); + channel.newCall(method, CallOptions.DEFAULT); + List callTracers = + callOptionsCaptor.get().getStreamTracerFactories(); + assertThat(callTracers).hasSize(1); + assertThat(callTracers.get(0).getClass().getName()) + .isEqualTo("io.grpc.census.CensusTracingModule$ClientCallTracer"); } @Test - public void getEffectiveInterceptors_disableTracing() { + public void censusFeatures_disableTracing() { builder.intercept(DUMMY_USER_INTERCEPTOR); builder.setTracingEnabled(false); List effectiveInterceptors = builder.getEffectiveInterceptors(); assertEquals(2, effectiveInterceptors.size()); - assertThat(effectiveInterceptors.get(0).getClass().getName()) - .isEqualTo("io.grpc.census.CensusStatsModule$StatsClientInterceptor"); assertThat(effectiveInterceptors.get(1)).isSameInstanceAs(DUMMY_USER_INTERCEPTOR); + Channel channel = ClientInterceptors.intercept(fakeChannel, effectiveInterceptors); + channel.newCall(method, CallOptions.DEFAULT); + List callTracers = + callOptionsCaptor.get().getStreamTracerFactories(); + assertThat(callTracers).hasSize(1); + assertThat(callTracers.get(0).getClass().getName()) + .isEqualTo("io.grpc.census.CensusStatsModule$ClientCallTracer"); + } @Test - public void getEffectiveInterceptors_disableBoth() { + public void censusFeatures_disableBoth() { builder.intercept(DUMMY_USER_INTERCEPTOR); builder.setStatsEnabled(false); builder.setTracingEnabled(false); List effectiveInterceptors = builder.getEffectiveInterceptors(); assertThat(effectiveInterceptors).containsExactly(DUMMY_USER_INTERCEPTOR); + Channel channel = ClientInterceptors.intercept(fakeChannel, effectiveInterceptors); + channel.newCall(method, CallOptions.DEFAULT); + assertThat(callOptionsCaptor.get().getStreamTracerFactories()).isEmpty(); + } + + @Test + public void getEffectiveInterceptors_testInterceptorAlwaysFirst() { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions); + } + }; + builder.intercept(DUMMY_USER_INTERCEPTOR); + builder.setTestInterceptor(interceptor); + List effectiveInterceptors = builder.getEffectiveInterceptors(); + assertThat(effectiveInterceptors).hasSize(4); + assertThat(effectiveInterceptors.get(0)).isSameInstanceAs(interceptor); } @Test