Make View maintain ViewData references, bug fixes #8

Merged 5 commits on Jul 31, 2020

17 changes: 9 additions & 8 deletions docs/examples/basic_meter/view.py
@@ -60,31 +60,32 @@
# dropped from the aggregation
counter_view1 = View(
requests_counter,
SumAggregator(),
SumAggregator,
label_keys=["environment"],
config=ViewConfig.LABEL_KEYS,
view_config=ViewConfig.LABEL_KEYS,
)
counter_view2 = View(
requests_counter,
MinMaxSumCountAggregator(),
MinMaxSumCountAggregator,
label_keys=["os_type"],
config=ViewConfig.LABEL_KEYS,
view_config=ViewConfig.LABEL_KEYS,
)
# This view has ViewConfig set to UNGROUPED, meaning all recorded metrics take
# the labels directly without any consideration for label_keys
counter_view3 = View(
requests_counter,
LastValueAggregator(),
LastValueAggregator,
label_keys=["environment"], # is not used due to ViewConfig.UNGROUPED
config=ViewConfig.UNGROUPED,
view_config=ViewConfig.UNGROUPED,
)
# This view uses the HistogramAggregator which accepts an optional config
# parameter to specify the bucket ranges
size_view = View(
requests_size,
HistogramAggregator(config=[20, 40, 60, 80, 100]),
HistogramAggregator,
label_keys=["environment"], # is not used due to ViewConfig.UNGROUPED
config=ViewConfig.UNGROUPED,
aggregator_config={"bounds": [20, 40, 60, 80, 100]},
view_config=ViewConfig.UNGROUPED,
)

# Register the views to the view manager to use the views. Views MUST be
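A minimal usage sketch may help show how the renamed parameters fit together. It is not part of the diff: it assumes meter, requests_counter, and requests_size are created earlier in the example file, and that meter.register_view() is the registration hook hinted at by the comment above.

    # Usage sketch (assumption: meter, requests_counter and requests_size exist
    # earlier in this example, and meter.register_view() is the registration hook).
    meter.register_view(counter_view1)
    meter.register_view(counter_view2)
    meter.register_view(counter_view3)
    meter.register_view(size_view)

    testing_labels = {"environment": "staging", "os_type": "linux"}

    # counter_view1 keeps only the "environment" label, counter_view2 keeps only
    # "os_type"; counter_view3 and size_view keep every label because they use
    # ViewConfig.UNGROUPED.
    requests_counter.add(100, testing_labels)
    requests_size.record(35, testing_labels)
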
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
@@ -53,7 +53,7 @@ def __init__(
):
self._labels = labels
self._metric = metric
self.view_datas = metric.meter.view_manager.generate_view_datas(
self.view_datas = metric.meter.view_manager.get_view_datas(
metric, labels
)
self._view_datas_lock = threading.Lock()
@@ -262,7 +262,7 @@ def observe(

if key not in self.aggregators:
# TODO: how to cleanup aggregators?
self.aggregators[key] = get_default_aggregator(self)
self.aggregators[key] = get_default_aggregator(self)()
aggregator = self.aggregators[key]
aggregator.update(value)

@@ -32,7 +32,10 @@ class Aggregator(abc.ABC):
def __init__(self, config=None):
self.current = None
self.checkpoint = None
self.config = config
if config:
self.config = config
else:
self.config = {}

@abc.abstractmethod
def update(self, value):
@@ -138,13 +141,16 @@ def __init__(self, config=None):
super().__init__(config=config)
self._lock = threading.Lock()
self.last_update_timestamp = None
boundaries = self.config
boundaries = self.config.get("bounds")
if boundaries and self._validate_boundaries(boundaries):
self._boundaries = boundaries
else:
self._boundaries = (10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
# no buckets except < 0 and >
self._boundaries = (0,)

self.current = OrderedDict([(bb, 0) for bb in self._boundaries])
self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries])

self.current[">"] = 0
self.checkpoint[">"] = 0

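Taken together, these changes make aggregator configuration a plain dict: the base Aggregator now defaults config to an empty dict, and HistogramAggregator reads its bucket boundaries from a "bounds" entry instead of treating the whole config value as the boundary list, falling back to a single 0 boundary when none are given. Below is a small sketch of the resulting behaviour; the import path is an assumption about where these classes live in the SDK.

    # Sketch of the dict-based aggregator config (import path assumed).
    from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator

    # No "bounds" entry: only a "< 0" bucket plus the ">" overflow bucket.
    default_histogram = HistogramAggregator()
    print(list(default_histogram.checkpoint))       # [0, '>']

    # Boundaries supplied through the config dict, as in the updated example above.
    sized_histogram = HistogramAggregator(config={"bounds": [20, 40, 60, 80, 100]})
    print(list(sized_histogram.checkpoint))         # [20, 40, 60, 80, 100, '>']
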
23 changes: 16 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py
@@ -15,6 +15,7 @@
from typing import Sequence

from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.util import get_dict_as_key


class Batcher:
@@ -42,7 +43,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
metric_records = []
# pylint: disable=W0612
for (
(instrument, aggregator_type, labels),
(instrument, aggregator_type, _, labels),
aggregator,
) in self._batch_map.items():
metric_records.append(MetricRecord(instrument, labels, aggregator))
@@ -60,17 +61,25 @@ def process(self, record) -> None:
"""Stores record information to be ready for exporting."""
# Checkpoints the current aggregator value to be collected for export
aggregator = record.aggregator
aggregator.take_checkpoint()

# The uniqueness of a batch record is defined by a specific metric
# using an aggregator type with a specific set of labels.
key = (record.instrument, aggregator.__class__, record.labels)
# If two aggregators are the same but with different configs, they are still two valid unique records
# (for example, two histogram views with different buckets)
key = (
record.instrument,
aggregator.__class__,
get_dict_as_key(aggregator.config),
record.labels,
)
batch_value = self._batch_map.get(key)
if batch_value:
# Update the stored checkpointed value if exists. The call to merge
# here combines only identical records (same key).
batch_value.merge(aggregator)
Review discussion on this block:

Owner: I think this case was for the race condition when multiple process() calls occur at the same time. I don't think you can just call take_checkpoint(), because the timestamps would not be correct.

Author (@cnnradams), Jul 31, 2020: From what I understand, the reason this existed is that if you have multiple bound instruments which both own ViewDatas with the same key, we need to merge the ViewDatas. However, now that there is guaranteed to be only one ViewData per key, we don't need to merge them anymore.

Owner: I think this logic existed even before views existed, so it must have been for something else.

Author (@cnnradams), Jul 31, 2020: Not sure then. If I add an assert statement, assert batch_value == aggregator, all tests pass, which makes me think that at least with our current amount of testing this has no other effect (edit: let me actually verify this... some tests are failing for other reasons).

Author (@cnnradams): fixed

return
if batch_value != aggregator:
aggregator.take_checkpoint()
batch_value.merge(aggregator)
else:
aggregator.take_checkpoint()

if self.stateful:
# if stateful batcher, create a copy of the aggregator and update
# it with the current checkpointed value for long-term storage
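The effect of adding get_dict_as_key(aggregator.config) to the batch key is that two records which differ only in aggregator configuration no longer collapse into one map entry. Here is a rough sketch, using a placeholder string for the instrument and an assumed import path for HistogramAggregator; get_dict_as_key is the helper from opentelemetry.sdk.util imported above.

    from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator  # path assumed
    from opentelemetry.sdk.util import get_dict_as_key

    instrument = "requests_size"        # placeholder for a real metric instrument
    labels = (("environment", "staging"),)

    small_buckets = HistogramAggregator(config={"bounds": [10, 20]})
    large_buckets = HistogramAggregator(config={"bounds": [100, 200]})

    key_small = (instrument, HistogramAggregator, get_dict_as_key(small_buckets.config), labels)
    key_large = (instrument, HistogramAggregator, get_dict_as_key(large_buckets.config), labels)

    # Same instrument, aggregator class and labels, but different bucket configs:
    # the batcher now keeps these as two separate _batch_map entries.
    assert key_small != key_large
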
92 changes: 58 additions & 34 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py
@@ -15,7 +15,7 @@
import logging
import threading
from collections import defaultdict
from typing import Sequence, Tuple
from typing import Optional, Sequence, Tuple

from opentelemetry.metrics import (
Counter,
@@ -68,36 +68,72 @@ class View:
def __init__(
self,
metric: InstrumentT,
aggregator: Aggregator,
label_keys: Sequence[str] = None,
config: ViewConfig = ViewConfig.UNGROUPED,
aggregator: type,
aggregator_config: Optional[dict] = None,
label_keys: Optional[Sequence[str]] = None,
view_config: ViewConfig = ViewConfig.UNGROUPED,
):
self.metric = metric
self.aggregator = aggregator
if aggregator_config is None:
aggregator_config = {}
self.aggregator_config = aggregator_config
if label_keys is None:
label_keys = []
self.label_keys = sorted(label_keys)
self.config = config
self.view_config = view_config
self.view_datas = set()

def get_view_data(self, labels):
"""Find an existing ViewData for this set of labels. If that ViewData
does not exist, create a new one to represent the labels
"""
active_labels = []
if self.view_config == ViewConfig.LABEL_KEYS:
# reduce the set of labels to only labels specified in label_keys
active_labels = {
(lk, lv) for lk, lv in labels if lk in set(self.label_keys)
}
active_labels = tuple(active_labels)
elif self.view_config == ViewConfig.UNGROUPED:
active_labels = labels

for view_data in self.view_datas:
if view_data.labels == active_labels:
return view_data
new_view_data = ViewData(
active_labels, self.aggregator(self.aggregator_config)
)
self.view_datas.add(new_view_data)
return new_view_data

# Uniqueness is based on metric, aggregator type, ordered label keys and ViewConfig
# Uniqueness is based on metric, aggregator type, aggregator config, ordered label keys and ViewConfig
def __hash__(self):
return hash(
(self.metric, self.aggregator, tuple(self.label_keys), self.config)
(
self.metric,
self.aggregator,
tuple(self.label_keys),
tuple(self.aggregator_config),
self.view_config,
)
)

def __eq__(self, other):
return (
self.metric == other.metric
and self.aggregator.__class__ == other.aggregator.__class__
and self.label_keys == other.label_keys
and self.config == other.config
and self.aggregator_config == other.aggregator_config
and self.view_config == other.view_config
)


class ViewManager:
def __init__(self):
self.views = defaultdict(set) # Map[Metric, Set]
self._view_lock = threading.Lock()
self.view_datas = set()

def register_view(self, view):
with self._view_lock:
@@ -114,31 +150,19 @@ def unregister_view(self, view):
elif view in self.views.get(view.metric):
self.views.get(view.metric).remove(view)

def generate_view_datas(self, metric, labels):
def get_view_datas(self, metric, labels):
view_datas = set()
views = self.views.get(metric)
# No views configured, use default aggregations
if views is None:
aggregator = get_default_aggregator(metric)
# Default config aggregates on all label keys
view_datas.add(ViewData(tuple(labels), aggregator))
else:
for view in views:
updated_labels = []
if view.config == ViewConfig.LABEL_KEYS:
label_key_set = set(view.label_keys)
for label in labels:
# Only keep labels that are in configured label_keys
if label[0] in label_key_set:
updated_labels.append(label)
updated_labels = tuple(updated_labels)
elif view.config == ViewConfig.UNGROUPED:
updated_labels = labels
# ViewData that is duplicate (same labels and aggregator) will be
# aggregated together as one
view_datas.add(
ViewData(tuple(updated_labels), view.aggregator)
)
# make a default view for the metric
default_view = View(metric, get_default_aggregator(metric))
self.register_view(default_view)
views = [default_view]

for view in views:
view_datas.add(view.get_view_data(labels))

return view_datas


@@ -150,12 +174,12 @@ def get_default_aggregator(instrument: InstrumentT) -> Aggregator:
# pylint:disable=R0201
instrument_type = instrument.__class__
if issubclass(instrument_type, (Counter, UpDownCounter)):
return SumAggregator()
return SumAggregator
if issubclass(instrument_type, (SumObserver, UpDownSumObserver)):
return LastValueAggregator()
return LastValueAggregator
if issubclass(instrument_type, ValueRecorder):
return MinMaxSumCountAggregator()
return MinMaxSumCountAggregator
if issubclass(instrument_type, ValueObserver):
return ValueObserverAggregator()
return ValueObserverAggregator
logger.warning("No default aggregator configured for: %s", instrument_type)
return SumAggregator()
return SumAggregator
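
The core behavioural change in this file is that a View now owns its ViewData objects: get_view_data() reduces the incoming labels according to the ViewConfig and hands back the same ViewData instance for the same reduced label set, so bound instruments end up sharing one aggregator per view and label combination. A short sketch follows, assuming the import paths shown and a requests_counter Counter created elsewhere, as in the docs example.

    from opentelemetry.sdk.metrics.export.aggregate import SumAggregator  # path assumed
    from opentelemetry.sdk.metrics.view import View, ViewConfig

    view = View(
        requests_counter,               # a Counter created earlier, as in the example
        SumAggregator,
        label_keys=["environment"],
        view_config=ViewConfig.LABEL_KEYS,
    )

    labels_linux = (("environment", "staging"), ("os_type", "linux"))
    labels_windows = (("environment", "staging"), ("os_type", "windows"))

    # Both label sets reduce to (("environment", "staging"),) under LABEL_KEYS,
    # so the view returns the ViewData it already created instead of a new one.
    data_a = view.get_view_data(labels_linux)
    data_b = view.get_view_data(labels_windows)
    assert data_a is data_b
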
12 changes: 6 additions & 6 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
@@ -72,7 +72,7 @@ def test_checkpoint_set(self):
aggregator.update(1.0)
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, labels)] = aggregator
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
records = batcher.checkpoint_set()
self.assertEqual(len(records), 1)
@@ -95,7 +95,7 @@ def test_finished_collection_stateless(self):
aggregator.update(1.0)
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, labels)] = aggregator
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 0)
@@ -110,7 +110,7 @@ def test_finished_collection_stateful(self):
aggregator.update(1.0)
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, labels)] = aggregator
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 1)
@@ -125,7 +125,7 @@ def test_batcher_process_exists(self):
)
labels = ()
_batch_map = {}
batch_key = (metric, SumAggregator, labels)
batch_key = (metric, SumAggregator, tuple(), labels)
_batch_map[batch_key] = aggregator
aggregator2.update(1.0)
batcher._batch_map = _batch_map
@@ -145,7 +145,7 @@ def test_batcher_process_not_exists(self):
)
labels = ()
_batch_map = {}
batch_key = (metric, SumAggregator, labels)
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)
@@ -164,7 +164,7 @@ def test_batcher_process_not_stateful(self):
)
labels = ()
_batch_map = {}
batch_key = (metric, SumAggregator, labels)
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)