Skip to content

Commit 9f4d171

Browse files
committed
add new matric and fix repeated png
1 parent 812be47 commit 9f4d171

5 files changed

Lines changed: 233 additions & 33 deletions

File tree

autotest/config-npu.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ case:
5656
npu-qwen3-sft-ep8:
5757
-
5858
type: sft
59+
phase: first
5960
parameters:
6061
config: autotest/config/npu_qwen3_moe_30BA3_ep8.py
6162
output_path: /mnt/hwfile/llmrazor/qa-llm-cicd/test_output
@@ -80,6 +81,7 @@ case:
8081
timeout: 10800
8182
-
8283
type: sft
84+
phase: resume
8385
pre_action:
8486
command: 'python ./autotest/utils/update_meta.py /mnt/hwfile/llmrazor/qa-llm-cicd/test_output npu-qwen3-sft-ep8 sft'
8587
parameters:

autotest/config.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ case:
5454
qwen3-sft-ep8:
5555
-
5656
type: sft
57+
phase: first
5758
parameters:
5859
config: autotest/config/qwen3_moe_30BA3_ep8.py
5960
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
@@ -79,6 +80,7 @@ case:
7980
timeout: 1500
8081
-
8182
type: sft
83+
phase: resume
8284
pre_action:
8385
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-sft-ep8 sft'
8486
parameters:
@@ -475,6 +477,7 @@ case:
475477
qwen3-5-sft-sp4-resume:
476478
-
477479
type: sft
480+
phase: first
478481
parameters:
479482
config: autotest/config/qwen3_5_moe_30BA3_sp4.py
480483
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
@@ -499,6 +502,7 @@ case:
499502

500503
-
501504
type: sft
505+
phase: resume
502506
pre_action:
503507
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-sft-sp4-resume sft'
504508
parameters:
@@ -608,6 +612,7 @@ case:
608612
qwen3-5-sft-sp4-resume-vl:
609613
-
610614
type: sft
615+
phase: first
611616
parameters:
612617
config: autotest/config/qwen3_5_moe_30BA3_sp4_vl.py
613618
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
@@ -634,6 +639,7 @@ case:
634639

635640
-
636641
type: sft
642+
phase: resume
637643
pre_action:
638644
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-sft-sp4-resume-vl sft'
639645
parameters:
@@ -891,6 +897,7 @@ case:
891897
qwen3-5-rl-vl-lmdeploy-resume:
892898
-
893899
type: rl
900+
phase: first
894901
parameters:
895902
config: autotest/config/rl_qwen3p5_vl_35B_dapo_ep2_resume.py
896903
infer_backend: lmdeploy
@@ -935,6 +942,7 @@ case:
935942

936943
-
937944
type: rl
945+
phase: resume
938946
pre_action:
939947
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-rl-vl-lmdeploy-resume rl'
940948
parameters:

autotest/module/train.py

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
1+
import json
12
import os
3+
import shutil
4+
from typing import Any
25

36
from utils.check_metric import check_result, check_rl_result
47
from utils.run_cmd import run_cmd
58

69

10+
FIRST_RUN_TRACKER_SNAPSHOT = "_first_run_tracker.jsonl"
11+
12+
713
class Train:
814
def get_cmd(config):
915
print(config)
1016
config_path = config.get("parameters").get("config")
1117
train_type = config.get("type")
1218
nproc_per_node = config.get("resource", {}).get("gpus_per_task", 8)
13-
pip_package = config.get("resource", {}).get("pip_package", 'ls')
19+
pip_package = config.get("resource", {}).get("pip_package", "ls")
1420
if train_type in ["sft", "rl"]:
1521
model_config = config.get("parameters", {}).get("model", None)
1622
config_path = config.get("parameters", {}).get("config", None)
@@ -70,22 +76,28 @@ def get_cmd(config):
7076

7177
def validate(config):
7278
work_dir = config.get("work_dir", None)
73-
base_path = os.path.join(
74-
config.get("base_path").get("base_baseline_path"), config.get("assert_info", {}).get("base_metric", None)
75-
)
79+
base_metric = config.get("assert_info", {}).get("base_metric", None)
80+
base_path = os.path.join(config.get("base_path").get("base_baseline_path"), base_metric)
7681
train_type = config.get("type")
82+
case_name = config["case_name"]
83+
phase = config.get("phase")
84+
context = config.get("context", {})
85+
86+
cur_path = resolve_tracker_path(work_dir, train_type, phase, context=context)
87+
7788
if train_type == "sft":
78-
cur_path = os.path.join(get_latest_subdir(work_dir), "logs/exp_tracking/rank0/tracker.jsonl")
7989
check_metrics = config.get("assert_info", {}).get("check_metrics", {})
80-
return check_result(config["case_name"], base_path, cur_path, check_metrics)
90+
result = check_result(case_name, base_path, cur_path, check_metrics, phase=phase)
8191
elif train_type == "rl":
82-
cur_path = os.path.join(get_latest_subdir(work_dir), "logs/exp_tracking/tracker.jsonl")
8392
check_metrics = config.get("assert_info", {})
84-
return check_rl_result(config["case_name"], base_path, cur_path, check_metrics)
93+
result = check_rl_result(case_name, base_path, cur_path, check_metrics, phase=phase)
8594
else:
8695
print("Unknown type: {train_type}")
8796
return False
8897

98+
snapshot_first_run_tracker(work_dir, phase, cur_path, context=context)
99+
return result
100+
89101
def pre_action(config=None):
90102
action_info = config.get("pre_action", None)
91103
if action_info:
@@ -101,12 +113,92 @@ def post_action(config=None):
101113
run_cmd(action_cmd)
102114

103115

104-
def get_latest_subdir(work_dir):
105-
dirs = [
106-
d for d in os.listdir(work_dir) if os.path.isdir(os.path.join(work_dir, d)) and len(d) == 14 and d.isdigit()
107-
]
116+
def list_timestamp_subdirs(work_dir: str) -> list[str]:
117+
return sorted(
118+
name
119+
for name in os.listdir(work_dir)
120+
if os.path.isdir(os.path.join(work_dir, name)) and len(name) == 14 and name.isdigit()
121+
)
122+
123+
124+
def _tracker_relpath(train_type: str) -> str:
125+
if train_type == "sft":
126+
return "logs/exp_tracking/rank0/tracker.jsonl"
127+
return "logs/exp_tracking/tracker.jsonl"
128+
129+
130+
def _tracker_path(exp_dir: str | None, train_type: str) -> str:
131+
return os.path.join(exp_dir, _tracker_relpath(train_type))
132+
133+
134+
def _snapshot_path(work_dir: str) -> str:
135+
return os.path.join(work_dir, FIRST_RUN_TRACKER_SNAPSHOT)
136+
137+
138+
def _write_first_run_segment(src: str, dst: str) -> None:
139+
os.makedirs(os.path.dirname(dst), exist_ok=True)
140+
seen_steps: set[Any] = set()
141+
with open(src, encoding="utf-8") as fin, open(dst, "w", encoding="utf-8") as fout:
142+
for line in fin:
143+
if not line.strip():
144+
continue
145+
step = json.loads(line).get("step")
146+
if step in seen_steps:
147+
break
148+
seen_steps.add(step)
149+
fout.write(line if line.endswith("\n") else f"{line}\n")
150+
151+
152+
def _has_duplicate_steps(tracker_path: str) -> bool:
153+
steps: list[Any] = []
154+
with open(tracker_path, encoding="utf-8") as f:
155+
for line in f:
156+
if line.strip():
157+
steps.append(json.loads(line).get("step"))
158+
return len(steps) != len(set(steps))
159+
160+
161+
def resolve_tracker_path(
162+
work_dir: str,
163+
train_type: str,
164+
phase: str | None,
165+
context: dict[str, Any] | None = None,
166+
) -> str:
167+
context = context or {}
168+
snapshot = context.get("first_run_tracker") or _snapshot_path(work_dir)
169+
170+
if phase == "first":
171+
if os.path.isfile(snapshot):
172+
return snapshot
173+
174+
subdirs = list_timestamp_subdirs(work_dir)
175+
if len(subdirs) > 1:
176+
exp_dir = os.path.join(work_dir, subdirs[0])
177+
else:
178+
exp_dir = os.path.join(work_dir, subdirs[-1]) if subdirs else None
179+
live_tracker = _tracker_path(exp_dir, train_type)
180+
181+
if os.path.isfile(live_tracker) and _has_duplicate_steps(live_tracker):
182+
_write_first_run_segment(live_tracker, snapshot)
183+
if os.path.isfile(snapshot) and os.path.getsize(snapshot) > 0:
184+
return snapshot
185+
return live_tracker
186+
187+
subdirs = list_timestamp_subdirs(work_dir)
188+
exp_dir = os.path.join(work_dir, subdirs[-1]) if subdirs else None
189+
return _tracker_path(exp_dir, train_type)
190+
108191

109-
if not dirs:
110-
return None
111-
latest = max(dirs, key=lambda d: os.path.getmtime(os.path.join(work_dir, d)))
112-
return os.path.join(work_dir, latest)
192+
def snapshot_first_run_tracker(
193+
work_dir: str,
194+
phase: str | None,
195+
cur_path: str,
196+
context: dict[str, Any] | None = None,
197+
) -> None:
198+
if phase != "first" or not os.path.isfile(cur_path):
199+
return
200+
snapshot = _snapshot_path(work_dir)
201+
if cur_path != snapshot:
202+
shutil.copy2(cur_path, snapshot)
203+
if context is not None:
204+
context["first_run_tracker"] = snapshot

autotest/utils/check_metric.py

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@
1010
)
1111
logger = logging.getLogger(__name__)
1212

13+
MEMORY_GRADIENT_WARMUP_STEPS = 5
14+
MEMORY_GRADIENT_MIN_SEGMENT_LEN = 8
15+
MEMORY_GRADIENT_POSITIVE_RATIO = 0.65
16+
MEMORY_GRADIENT_MIN_SLOPE_GB = 1e-4
17+
MEMORY_GRADIENT_MIN_REL_DRIFT = 0.00015
18+
MEMORY_GRADIENT_RESUME_DROP_GB = 0.005
19+
1320

1421
def extract_value(file, metrics):
1522
metric_all = {metric: [] for metric in metrics}
@@ -25,7 +32,57 @@ def extract_value(file, metrics):
2532
return total_step, metric_all
2633

2734

28-
def check_result(case_name, base_path, cur_path, check_metric):
35+
def _split_memory_segments(values: np.ndarray) -> list[np.ndarray]:
36+
if len(values) < MEMORY_GRADIENT_MIN_SEGMENT_LEN:
37+
return [values]
38+
39+
segments: list[np.ndarray] = []
40+
start = 0
41+
for idx in range(1, len(values)):
42+
dropped = values[idx - 1] - values[idx]
43+
if dropped >= MEMORY_GRADIENT_RESUME_DROP_GB:
44+
if idx - start >= MEMORY_GRADIENT_MIN_SEGMENT_LEN:
45+
segments.append(values[start:idx])
46+
start = idx
47+
if len(values) - start >= MEMORY_GRADIENT_MIN_SEGMENT_LEN:
48+
segments.append(values[start:])
49+
return segments or [values]
50+
51+
52+
def detect_memory_upward_gradient(values: list[float]) -> tuple[bool, str]:
53+
"""Detect sustained upward memory drift (possible leak) in the current
54+
run."""
55+
if len(values) <= MEMORY_GRADIENT_WARMUP_STEPS + MEMORY_GRADIENT_MIN_SEGMENT_LEN:
56+
return False, ""
57+
58+
series = np.asarray(values[MEMORY_GRADIENT_WARMUP_STEPS:], dtype=float)
59+
60+
for seg_idx, segment in enumerate(_split_memory_segments(series)):
61+
if len(segment) < MEMORY_GRADIENT_MIN_SEGMENT_LEN:
62+
continue
63+
64+
deltas = np.diff(segment)
65+
positive_ratio = float(np.mean(deltas > 1e-4))
66+
x = np.arange(len(segment))
67+
slope, _ = np.polyfit(x, segment, 1)
68+
mean_val = float(np.mean(segment))
69+
if mean_val < 1e-10:
70+
continue
71+
72+
relative_drift = float(slope * (len(segment) - 1) / mean_val)
73+
slope_rising = slope > MEMORY_GRADIENT_MIN_SLOPE_GB
74+
mostly_increasing = positive_ratio >= MEMORY_GRADIENT_POSITIVE_RATIO
75+
drift_too_large = relative_drift > MEMORY_GRADIENT_MIN_REL_DRIFT
76+
77+
if slope_rising and mostly_increasing and drift_too_large:
78+
return True, (
79+
f"segment {seg_idx}: slope={slope:.6f} GB/step, "
80+
f"relative_drift={relative_drift:.4f}, positive_ratio={positive_ratio:.2f}"
81+
)
82+
return False, ""
83+
84+
85+
def check_result(case_name, base_path, cur_path, check_metric, phase=None):
2986
fail_metric = {}
3087
metric_list = list(check_metric.keys())
3188
base_steps, base_metrics = extract_value(base_path, metric_list)
@@ -34,28 +91,57 @@ def check_result(case_name, base_path, cur_path, check_metric):
3491
f"current steps is not equal to base steps, current steps: {cur_steps}, base steps: {base_steps}"
3592
)
3693

37-
publish_comparison_report(case_name, check_metric, base_metrics, cur_metrics, base_path, cur_path)
94+
publish_comparison_report(case_name, check_metric, base_metrics, cur_metrics, base_path, cur_path, phase=phase)
3895

3996
for metric, threshold in check_metric.items():
4097
max_error = 0.0
4198
max_error_idx = 0
4299
check_flag = True
43100
if metric == "runtime_info/tgs":
44101
if cur_steps > 10:
45-
relative_errors = abs(np.array(base_metrics[metric][10:-1]) - np.array(cur_metrics[metric][10:-1])) / (
46-
np.array(base_metrics[metric][10:-1])
102+
base_vals = np.array(base_metrics[metric][10:-1], dtype=float)
103+
cur_vals = np.array(cur_metrics[metric][10:-1], dtype=float)
104+
degradation = np.zeros_like(base_vals, dtype=float)
105+
valid_base = np.abs(base_vals) >= 1e-10
106+
degradation[valid_base] = np.maximum(
107+
(base_vals[valid_base] - cur_vals[valid_base]) / np.abs(base_vals[valid_base]),
108+
0.0,
47109
)
48-
max_error = np.percentile(relative_errors, 80)
110+
max_error = float(np.percentile(degradation, 80))
49111
if max_error > threshold:
50112
fail_metric[metric] = (
51-
f"{metric} relative error bigger than {threshold} after 10 step, baseline: {base_metrics[metric][10:-1]}, now: {cur_metrics[metric][10:-1]}, relative error: {relative_errors}"
113+
f"{metric} degradation bigger than {threshold} after step 10, "
114+
f"baseline: {base_metrics[metric][10:-1]}, now: {cur_metrics[metric][10:-1]}, "
115+
f"degradation: {degradation.tolist()}"
52116
)
53117
check_flag = False
54118
else:
55119
check_flag = True
56120
else:
57121
logger.warning("It's meaningless to compare tgs because of the small steps.")
58122
check_flag = False
123+
elif metric == "memory/max_memory_GB":
124+
for idx, (old, cur) in enumerate(zip(base_metrics[metric], cur_metrics[metric])):
125+
if abs(old) < 1e-10:
126+
relative_error = float("inf") if abs(cur) > 1e-10 else 0.0
127+
else:
128+
relative_error = round(abs(old - cur) / abs(old), 2)
129+
if relative_error > max_error:
130+
max_error = relative_error
131+
max_error_idx = idx
132+
if relative_error > threshold:
133+
fail_metric[metric] = (
134+
f"{metric} relative error bigger than {threshold} in {idx} steps, "
135+
f"baseline: {old:.6f}, now: {cur:.6f}, relative error: {relative_error}"
136+
)
137+
check_flag = False
138+
break
139+
140+
if check_flag:
141+
has_gradient, gradient_info = detect_memory_upward_gradient(cur_metrics[metric])
142+
if has_gradient:
143+
fail_metric[metric] = f"{metric} shows sustained upward gradient in current run, {gradient_info}"
144+
check_flag = False
59145
else:
60146
for idx, (old, cur) in enumerate(zip(base_metrics[metric], cur_metrics[metric])):
61147
if abs(old) < 1e-10:
@@ -82,7 +168,7 @@ def check_result(case_name, base_path, cur_path, check_metric):
82168
return result, f"Some metric check failed: {fail_metric}"
83169

84170

85-
def check_rl_result(case_name, base_path, cur_path, assert_info):
171+
def check_rl_result(case_name, base_path, cur_path, assert_info, phase=None):
86172
fail_metric = {}
87173
check_metrics_list = assert_info["check_metrics"]
88174

@@ -96,7 +182,9 @@ def check_rl_result(case_name, base_path, cur_path, assert_info):
96182
)
97183

98184
check_metric_dict = {item["metric"]: item["threshold"] for item in check_metrics_list}
99-
publish_comparison_report(case_name, check_metric_dict, base_metrics, cur_metrics, base_path, cur_path)
185+
publish_comparison_report(
186+
case_name, check_metric_dict, base_metrics, cur_metrics, base_path, cur_path, phase=phase
187+
)
100188

101189
for config in check_metrics_list:
102190
metric = config["metric"]

0 commit comments

Comments
 (0)