Skip to content

Commit 4897c5c

Browse files
can-anyscale and gemini-code-assist[bot]
authored and committed
[core] fix get_metric_check_condition tests (ray-project#58598)
Fix `get_metric_check_condition` to use `fetch_prometheus_timeseries`, which is a non-flaky version of `fetch_prometheus`. Update all test usages accordingly.

Test:
- CI

---------

Signed-off-by: Cuong Nguyen <can@anyscale.com>
Signed-off-by: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent eac7ebe commit 4897c5c

File tree

4 files changed

+41
-23
lines changed

4 files changed

+41
-23
lines changed

python/ray/_private/test_utils.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -654,8 +654,29 @@ def matches(self, sample: Sample):
654654
return True
655655

656656

657+
@dataclass
class PrometheusTimeseries:
    """Accumulated Prometheus timeseries gathered from multiple addresses.

    A timeseries is a collection of samples that share a metric name and
    labels. The state is held in three dictionaries, each of which starts
    out empty on a newly created instance:

    - components_dict: maps an address to the Component labels
    - metric_descriptors: maps a metric name to the Metric object
    - metric_samples: holds the latest value of each label
    """

    components_dict: Dict[str, Set[str]] = field(default_factory=dict)
    metric_descriptors: Dict[str, Metric] = field(default_factory=dict)
    metric_samples: Dict[frozenset, Sample] = field(default_factory=dict)

    def flush(self):
        """Empty all three dictionaries in place, keeping the same dict objects."""
        for store in (
            self.components_dict,
            self.metric_descriptors,
            self.metric_samples,
        ):
            store.clear()
674+
675+
657676
def get_metric_check_condition(
658-
metrics_to_check: List[MetricSamplePattern], export_addr: Optional[str] = None
677+
metrics_to_check: List[MetricSamplePattern],
678+
timeseries: PrometheusTimeseries,
679+
export_addr: Optional[str] = None,
659680
) -> Callable[[], bool]:
660681
"""A condition to check if a prometheus metrics reach a certain value.
661682
@@ -665,6 +686,7 @@ def get_metric_check_condition(
665686
Args:
666687
metrics_to_check: A list of MetricSamplePattern. The fields that
667688
aren't `None` will be matched.
689+
timeseries: A PrometheusTimeseries object to store the metrics.
668690
export_addr: Optional address to export metrics to.
669691
670692
Returns:
@@ -677,7 +699,9 @@ def get_metric_check_condition(
677699

678700
def f():
679701
for metric_pattern in metrics_to_check:
680-
_, _, metric_samples = fetch_prometheus([prom_addr])
702+
metric_samples = fetch_prometheus_timeseries(
703+
[prom_addr], timeseries
704+
).metric_samples.values()
681705
for metric_sample in metric_samples:
682706
if metric_pattern.matches(metric_sample):
683707
break
@@ -993,25 +1017,6 @@ def fetch_prometheus(prom_addresses):
9931017
return components_dict, metric_descriptors, metric_samples
9941018

9951019

996-
@dataclass
997-
class PrometheusTimeseries:
998-
"""A collection of timeseries from multiple addresses. Each timeseries is a
999-
collection of samples with the same metric name and labels. Concretely:
1000-
- components_dict: a dictionary of addresses to the Component labels
1001-
- metric_descriptors: a dictionary of metric names to the Metric object
1002-
- metric_samples: the latest value of each label
1003-
"""
1004-
1005-
components_dict: Dict[str, Set[str]] = field(default_factory=defaultdict)
1006-
metric_descriptors: Dict[str, Metric] = field(default_factory=defaultdict)
1007-
metric_samples: Dict[frozenset, Sample] = field(default_factory=defaultdict)
1008-
1009-
def flush(self):
1010-
self.components_dict.clear()
1011-
self.metric_descriptors.clear()
1012-
self.metric_samples.clear()
1013-
1014-
10151020
def fetch_prometheus_timeseries(
10161021
prom_addreses: List[str],
10171022
result: PrometheusTimeseries,

python/ray/tests/test_autoscaler_e2e.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from ray._common.test_utils import SignalActor, wait_for_condition
99
from ray._private.test_utils import (
1010
MetricSamplePattern,
11+
PrometheusTimeseries,
1112
get_metric_check_condition,
1213
)
1314
from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT
@@ -174,6 +175,7 @@ class Foo:
174175
def ping(self):
175176
return True
176177

178+
timeseries = PrometheusTimeseries()
177179
zero_reported_condition = get_metric_check_condition(
178180
[
179181
MetricSamplePattern(
@@ -199,6 +201,7 @@ def ping(self):
199201
partial_label_match={"NodeType": "ray.head.default"},
200202
),
201203
],
204+
timeseries,
202205
export_addr=autoscaler_export_addr,
203206
)
204207
wait_for_condition(zero_reported_condition)
@@ -239,6 +242,7 @@ def ping(self):
239242
partial_label_match={"NodeType": "ray.head.default"},
240243
),
241244
],
245+
timeseries,
242246
export_addr=autoscaler_export_addr,
243247
)
244248
wait_for_condition(two_cpu_no_pending_condition)

python/ray/tests/test_scheduling.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from ray._private.internal_api import memory_summary
1717
from ray._private.test_utils import (
1818
MetricSamplePattern,
19+
PrometheusTimeseries,
1920
get_metric_check_condition,
2021
object_memory_usage,
2122
)
@@ -673,13 +674,18 @@ def start_infeasible(n):
673674
# longer timeout is necessary to pass on windows debug/asan builds.
674675
timeout = 180
675676

677+
timeseries = PrometheusTimeseries()
676678
wait_for_condition(
677-
get_metric_check_condition([MetricSamplePattern(name=metric_name, value=2)]),
679+
get_metric_check_condition(
680+
[MetricSamplePattern(name=metric_name, value=2)], timeseries
681+
),
678682
timeout=timeout,
679683
)
680684
start_infeasible.remote(2)
681685
wait_for_condition(
682-
get_metric_check_condition([MetricSamplePattern(name=metric_name, value=3)]),
686+
get_metric_check_condition(
687+
[MetricSamplePattern(name=metric_name, value=3)], timeseries
688+
),
683689
timeout=timeout,
684690
)
685691

python/ray/tests/test_scheduling_2.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ray._common.test_utils import SignalActor, wait_for_condition
1313
from ray._private.test_utils import (
1414
MetricSamplePattern,
15+
PrometheusTimeseries,
1516
get_metric_check_condition,
1617
make_global_state_accessor,
1718
)
@@ -784,6 +785,7 @@ def ready(self):
784785
pg = placement_group(bundles=[{"CPU": 1}], strategy="SPREAD")
785786
ray.get(pg.ready())
786787

788+
timeseries = PrometheusTimeseries()
787789
placement_metric_condition = get_metric_check_condition(
788790
[
789791
MetricSamplePattern(
@@ -802,6 +804,7 @@ def ready(self):
802804
partial_label_match={"WorkloadType": "PlacementGroup"},
803805
),
804806
],
807+
timeseries,
805808
)
806809
wait_for_condition(placement_metric_condition, timeout=60)
807810

0 commit comments

Comments
 (0)