diff --git a/src/Kafka/Kafka.Client/Cfg/StatSettings.cs b/src/Kafka/Kafka.Client/Cfg/StatSettings.cs new file mode 100644 index 0000000..33cc38b --- /dev/null +++ b/src/Kafka/Kafka.Client/Cfg/StatSettings.cs @@ -0,0 +1,11 @@ +namespace Kafka.Client.Cfg +{ + public static class StatSettings + { + public static volatile bool ConsumerStatsEnabled = false; + + public static volatile bool ProducerStatsEnabled = false; + + public static volatile bool FetcherThreadStatsEnabled = false; + } +} diff --git a/src/Kafka/Kafka.Client/Consumers/SimpleConsumer.cs b/src/Kafka/Kafka.Client/Consumers/SimpleConsumer.cs index c35e42e..1f445ab 100644 --- a/src/Kafka/Kafka.Client/Consumers/SimpleConsumer.cs +++ b/src/Kafka/Kafka.Client/Consumers/SimpleConsumer.cs @@ -1,4 +1,6 @@ -namespace Kafka.Client.Consumers +using Kafka.Client.Cfg; + +namespace Kafka.Client.Consumers { using System; using System.Collections.Generic; @@ -137,9 +139,22 @@ public TopicMetadataResponse Send(TopicMetadataRequest request) internal FetchResponse Fetch(FetchRequest request) { Receive response = null; - var specificTimer = this.fetchRequestAndResponseStats.GetFetchRequestAndResponseStats(this.BrokerInfo).RequestTimer; - var aggregateTimer = this.fetchRequestAndResponseStats.GetFetchRequestAndResponseAllBrokersStats().RequestTimer; - aggregateTimer.Time(() => specificTimer.Time(() => { response = this.SendRequest(request); })); + if (StatSettings.ConsumerStatsEnabled) + { + var specificTimer = + this.fetchRequestAndResponseStats.GetFetchRequestAndResponseStats(this.BrokerInfo).RequestTimer; + var aggregateTimer = + this.fetchRequestAndResponseStats.GetFetchRequestAndResponseAllBrokersStats().RequestTimer; + aggregateTimer.Time(() => specificTimer.Time(() => + { + response = this.SendRequest(request); + + })); + } + else + { + response = this.SendRequest(request); + } var fetchResponse = FetchResponse.ReadFrom(response.Buffer); var fetchedSize = fetchResponse.SizeInBytes; diff --git a/src/Kafka/Kafka.Client/Kafka.Client.csproj b/src/Kafka/Kafka.Client/Kafka.Client.csproj index dc17eaa..6839ec7 100644 --- a/src/Kafka/Kafka.Client/Kafka.Client.csproj +++ b/src/Kafka/Kafka.Client/Kafka.Client.csproj @@ -142,6 +142,7 @@ + diff --git a/src/Kafka/Kafka.Client/Producers/SyncProducer.cs b/src/Kafka/Kafka.Client/Producers/SyncProducer.cs index 452b163..58402a6 100644 --- a/src/Kafka/Kafka.Client/Producers/SyncProducer.cs +++ b/src/Kafka/Kafka.Client/Producers/SyncProducer.cs @@ -1,4 +1,6 @@ -namespace Kafka.Client.Producers +using Kafka.Client.Cfg; + +namespace Kafka.Client.Producers { using System; using System.IO; @@ -92,18 +94,26 @@ public Receive DoSend(RequestOrResponse request, bool readResponse = true) public ProducerResponse Send(ProducerRequest producerRequest) { - var requestSize = producerRequest.SizeInBytes; - this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestSizeHist.Update(requestSize); - this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestSizeHist.Update(requestSize); - Receive response = null; - var specificTimer = this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestTimer; - var aggregateTimer = this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestTimer; - aggregateTimer.Time(() => specificTimer.Time(() => + if (StatSettings.ProducerStatsEnabled) + { + var requestSize = producerRequest.SizeInBytes; + this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestSizeHist.Update(requestSize); + this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestSizeHist.Update(requestSize); + + var specificTimer = this.producerRequestStats.GetProducerRequestStats(this.BrokerInfo).RequestTimer; + var aggregateTimer = this.producerRequestStats.GetProducerRequestAllBrokersStats().RequestTimer; + + aggregateTimer.Time(() => specificTimer.Time(() => { response = this.DoSend(producerRequest, producerRequest.RequiredAcks != 0); })); + } + else + { + response = this.DoSend(producerRequest, producerRequest.RequiredAcks != 0); + } if (producerRequest.RequiredAcks != 0) { diff --git a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs index b215733..80103c2 100644 --- a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs +++ b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs @@ -1,4 +1,6 @@ -namespace Kafka.Client.Server +using Kafka.Client.Cfg; + +namespace Kafka.Client.Server { using System; using System.Collections.Generic; @@ -201,7 +203,6 @@ public void ProcessFetchRequest(FetchRequest fetchRequest) try { var messages = (ByteBufferMessageSet)partitionData.Messages; - var validBytes = messages.ValidBytes; var messageAndOffset = messages.ShallowIterator().ToEnumerable().LastOrDefault(); var newOffset = messageAndOffset != null @@ -209,9 +210,16 @@ public void ProcessFetchRequest(FetchRequest fetchRequest) : currentOffset; this.partitionMap[topicAndPartition] = newOffset; - this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw - - newOffset; - this.FetcherStats.ByteRate.Mark(validBytes); + + if (StatSettings.FetcherThreadStatsEnabled) + { + var validBytes = messages.ValidBytes; + + this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw + - + newOffset; + this.FetcherStats.ByteRate.Mark(validBytes); + } // Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData);