Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions automated_security_helper/base/scanner_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from abc import abstractmethod

# Pattern for valid CLI flag keys: one or two leading dashes followed by
# a letter, then alphanumerics/underscores/hyphens. Optionally followed by
# '=' and a value (e.g., --exclude="path1,path2" or --skip-path=".venv/").
_VALID_FLAG_KEY_PATTERN = re.compile(r"^-{1,2}[A-Za-z][A-Za-z0-9_\-]*(=.*)?$")
from pathlib import Path


Expand Down
153 changes: 146 additions & 7 deletions automated_security_helper/cli/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,10 @@ async def get_scan_results(
if scanners or severities:
results = _apply_content_filters(results, scanners, severities)

# When actionable_only is requested, extract a flat findings list for easy consumption
if actionable_only:
results = _add_findings_list(results)

# Apply response size filter based on parameter
if filter_level == "full":
# Return full results for backward compatibility
Expand Down Expand Up @@ -773,18 +777,58 @@ def _filter_minimal(results: Dict[str, Any]) -> Dict[str, Any]:
}


def _get_finding_severity(result: Dict[str, Any]) -> str:
    """
    Determine the severity of a SARIF finding.

    An explicit ``issue_severity`` in the result's ``properties`` wins when it
    names one of the recognised severities (case-insensitive). Otherwise the
    SARIF ``level`` is mapped to a severity string; a missing, empty or
    unrecognised level yields ``LOW``.

    Args:
        result: A single SARIF result dict

    Returns:
        Uppercase severity string (CRITICAL, HIGH, MEDIUM, LOW, INFO)
    """
    # SARIF level -> severity mapping
    sarif_level_to_severity = {
        "error": "CRITICAL",
        "warning": "MEDIUM",
        "note": "LOW",
        "none": "INFO",
    }

    # Check explicit issue_severity in properties first. The property bag is
    # free-form, so only string values are trusted; anything else falls
    # through to the level mapping instead of raising.
    props = result.get("properties") or {}
    raw_severity = props.get("issue_severity") if isinstance(props, dict) else None
    if isinstance(raw_severity, str):
        issue_severity = raw_severity.upper()
        if issue_severity in ("CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"):
            return issue_severity

    # Fall back to SARIF level mapping; "note" is the default level here
    raw_level = result.get("level")
    level = raw_level.lower() if isinstance(raw_level, str) and raw_level else "note"
    return sarif_level_to_severity.get(level, "LOW")


def _is_finding_suppressed(result: Dict[str, Any]) -> bool:
    """Check if a SARIF finding is suppressed.

    A finding counts as suppressed when its ``suppressions`` entry is present
    and non-empty; a missing, ``None`` or empty value means not suppressed.

    Args:
        result: A single SARIF result dict

    Returns:
        True if the result carries at least one suppression, else False
    """
    return bool(result.get("suppressions"))


def _filter_actionable_only(results: Dict[str, Any]) -> Dict[str, Any]:
"""
Filter results to exclude suppressed findings.
Filter results to exclude suppressed findings and return only actionable findings.

This removes findings that have been marked as false positives or accepted risks,
returning only actionable findings that require attention.
returning only actionable findings that require attention. The SARIF results are
filtered to contain only non-suppressed findings.

Args:
results: The full scan results

Returns:
Filtered results with suppressed findings excluded
Filtered results with suppressed findings excluded and SARIF results
containing only actionable findings
"""
import copy

Expand All @@ -801,10 +845,7 @@ def _filter_actionable_only(results: Dict[str, Any]) -> Dict[str, Any]:
run["results"] = [
result
for result in run["results"]
if not (
result.get("suppressions")
and len(result.get("suppressions", [])) > 0
)
if not _is_finding_suppressed(result)
]

# Update summary stats to reflect only actionable findings
Expand Down Expand Up @@ -839,12 +880,76 @@ def _filter_actionable_only(results: Dict[str, Any]) -> Dict[str, Any]:
return filtered_results


def _add_findings_list(results: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract a flat list of findings from SARIF results and add it to the top
    level of the response for easy consumption.

    This spares consumers from navigating the nested SARIF structure. Every
    SARIF result present in ``results`` is listed; no filtering happens here,
    so callers are expected to pass already-filtered results.

    Each finding may carry ``rule_id``, ``severity``, ``message``, ``scanner``,
    ``file_path``, ``line_start`` and ``line_end``; keys whose value is None
    are omitted. Location data comes from the first location only.

    Args:
        results: The filtered scan results (after actionable_only and severity filters)

    Returns:
        The same ``results`` dict, modified in place, with added top-level
        'findings' and 'findings_count' entries
    """
    findings_list = []

    # SARIF producers may emit explicit nulls, which ``.get(key, {})`` does not
    # cover, so every nested lookup is normalised with ``or``.
    sarif = (results.get("raw_results") or {}).get("sarif") or {}
    for run in sarif.get("runs") or []:
        for result in run.get("results") or []:
            props = result.get("properties") or {}
            message = result.get("message") or {}
            locations = result.get("locations") or []

            # Extract location info from the first reported location
            phys_loc = (locations[0].get("physicalLocation") or {}) if locations else {}
            artifact_loc = phys_loc.get("artifactLocation") or {}
            region = phys_loc.get("region") or {}

            finding = {
                "rule_id": result.get("ruleId"),
                "severity": _get_finding_severity(result),
                "message": message.get("text") or message.get("markdown"),
                "scanner": props.get("scanner_name"),
                "file_path": artifact_loc.get("uri"),
                "line_start": region.get("startLine"),
                "line_end": region.get("endLine"),
            }

            # Only include non-None values
            findings_list.append({k: v for k, v in finding.items() if v is not None})

    results["findings"] = findings_list
    results["findings_count"] = len(findings_list)

    return results


def _apply_content_filters(
results: Dict[str, Any], scanners: str = None, severities: str = None
) -> Dict[str, Any]:
"""
Filter results by scanner names and/or severity levels.

This filters both the metadata (severity_counts, scanner_results) AND the actual
SARIF findings to only include results matching the specified criteria.

Args:
results: The full scan results
scanners: Comma-separated list of scanner names (e.g., "bandit,semgrep")
Expand All @@ -866,6 +971,30 @@ def _apply_content_filters(
# Create a deep copy to avoid modifying the original
filtered_results = copy.deepcopy(results)

# Filter SARIF findings by scanner and/or severity
if "raw_results" in filtered_results and "sarif" in filtered_results["raw_results"]:
sarif = filtered_results["raw_results"]["sarif"]
if "runs" in sarif:
for run in sarif["runs"]:
if "results" in run and run["results"]:
filtered_sarif_results = []
for result in run["results"]:
# Filter by scanner name if specified
if scanner_list:
props = result.get("properties", {}) or {}
result_scanner = (props.get("scanner_name") or "").lower()
if result_scanner and result_scanner not in scanner_list:
continue

# Filter by severity if specified
if severity_list:
finding_severity = _get_finding_severity(result).lower()
if finding_severity not in severity_list:
continue

filtered_sarif_results.append(result)
run["results"] = filtered_sarif_results

# Filter scanner_reports
if "scanner_reports" in filtered_results and scanner_list:
filtered_scanner_reports = {}
Expand Down Expand Up @@ -955,6 +1084,16 @@ def _apply_content_filters(
"suppressed",
]:
filtered_summary_stats[key] = 0
# Recalculate actionable based on filtered severity counts if it existed
if "actionable" in filtered_summary_stats:
severity_keys = ["critical", "high", "medium", "low", "info"]
new_actionable = sum(
filtered_summary_stats.get(k, 0) for k in severity_keys
)
filtered_summary_stats["actionable"] = new_actionable
filtered_summary_stats["total"] = (
new_actionable + filtered_summary_stats.get("suppressed", 0)
)
filtered_results["summary_stats"] = filtered_summary_stats

# Add filter metadata
Expand Down
5 changes: 4 additions & 1 deletion automated_security_helper/cli/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,10 @@ def run_ash_scan_cli_command(
if source_dir is None:
source_dir = Path.cwd().as_posix()
if output_dir is None:
output_dir = Path.cwd().joinpath(".ash", "ash_output").as_posix()
# Default output_dir is relative to source_dir, not CWD.
# This ensures that when --source-dir points to a different project,
# the output (and config resolution) happens in the correct location.
output_dir = Path(source_dir).joinpath(".ash", "ash_output").as_posix()

if Path(source_dir).absolute().as_posix() == Path(output_dir).absolute().as_posix():
output_dir = Path(output_dir).joinpath(".ash", "ash_output")
Expand Down
114 changes: 114 additions & 0 deletions automated_security_helper/config/config_linter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class LintCategory(str, Enum):
SUPPRESSION_LINE_RANGE = "suppression-line-range"
SUPPRESSION_EXPIRED = "suppression-expired"
SUPPRESSION_UNUSED = "suppression-unused"
IGNORE_PATH_ISSUE = "ignore-path-issue"
SYNTAX_ERROR = "syntax-error"


Expand Down Expand Up @@ -118,13 +119,16 @@ def lint(
config_path: Path,
output_dir: Optional[Path] = None,
check_unused: bool = False,
source_dir: Optional[Path] = None,
) -> LintResult:
"""Lint a configuration file and return all issues found.

Args:
config_path: Path to the configuration file
output_dir: Path to the ASH output directory (for unused suppressions check)
check_unused: Whether to check for unused suppressions
source_dir: Path to the source directory (for resolving ignore_path patterns).
If None, inferred from config_path location.

Returns:
LintResult with all issues found
Expand All @@ -150,6 +154,9 @@ def lint(
# Run suppression-specific lint checks
cls._check_suppression_issues(config_data, result)

# Run ignore_path and suppression path checks
cls._check_ignore_path_issues(config_path, config_data, result, source_dir)

# Check for unused suppressions if requested
if check_unused:
cls._check_unused_suppressions(config_path, config_data, output_dir, result)
Expand Down Expand Up @@ -625,6 +632,113 @@ def _check_suppression_issues(
)
)

@classmethod
def _check_ignore_path_issues(
    cls,
    config_path: Path,
    config_data: Dict[str, Any],
    result: "LintResult",
    source_dir: Optional[Path] = None,
) -> None:
    """Check ignore_paths and suppression paths for common issues.

    Walks ``global_settings.ignore_paths`` and ``global_settings.suppressions``
    (in that order) and hands every non-empty string ``path`` to
    ``_check_single_path_pattern``, which reports directory paths that lack
    a '**' glob suffix. Malformed sections or entries (non-list section,
    non-dict entry, missing or non-string path) are skipped silently.

    Args:
        config_path: Path to the configuration file being linted
        config_data: Parsed configuration contents
        result: Lint result that collects any issues found
        source_dir: Project root used to resolve relative patterns. If None,
            the config file's directory is used, or its parent when the
            config lives in a directory named '.ash'.
    """
    # Determine the project root for resolving relative paths
    if source_dir is not None:
        project_root = source_dir.resolve()
    else:
        config_parent = config_path.resolve().parent
        # If config is in .ash/, go up one level to project root
        project_root = (
            config_parent.parent if config_parent.name == ".ash" else config_parent
        )

    global_settings = config_data.get("global_settings", {})
    if not isinstance(global_settings, dict):
        return

    # Both sections share the same entry shape, so one loop covers them
    for section in ("ignore_paths", "suppressions"):
        entries = global_settings.get(section, [])
        if not isinstance(entries, list):
            continue
        for i, entry in enumerate(entries):
            if not isinstance(entry, dict):
                continue
            path_pattern = entry.get("path", "")
            # A malformed config can carry a non-string path (number, list);
            # skip it here rather than crash inside the pattern check.
            if not isinstance(path_pattern, str) or not path_pattern:
                continue
            cls._check_single_path_pattern(
                path_pattern=path_pattern,
                path_prefix=f"global_settings.{section}[{i}].path",
                project_root=project_root,
                result=result,
            )

@classmethod
def _check_single_path_pattern(
    cls,
    path_pattern: str,
    path_prefix: str,
    project_root: Path,
    result: "LintResult",
) -> None:
    """Check a single path pattern for directory-without-glob issues.

    A path like 'tests/test_data' or 'tests/test_data/' that points to an
    existing directory will silently fail to match any files inside it.
    The user likely intended 'tests/test_data/**'.

    Args:
        path_pattern: The pattern as written in the configuration
        path_prefix: Config location of the pattern, recorded on the issue
        project_root: Directory that relative patterns are resolved against
        result: Lint result; a warning is appended to ``result.issues`` when
            the pattern names an existing directory
    """
    # Non-string values cannot be path patterns; nothing to check
    if not isinstance(path_pattern, str):
        return

    # Patterns with ** already handle recursive matching
    if "**" in path_pattern:
        return

    # Strip trailing slash for directory check
    clean_path = path_pattern.rstrip("/")

    # Skip patterns with wildcards in the filename portion (e.g., "src/*.py")
    # These are valid file-matching patterns
    basename = Path(clean_path).name
    if any(ch in basename for ch in "*?["):
        return

    # Only warn when the literal path exists as a directory under the root
    candidate = project_root / clean_path
    if not candidate.is_dir():
        return

    # The path exists as a directory — warn that it needs /**
    result.issues.append(
        LintIssue(
            severity=LintSeverity.WARNING,
            category=LintCategory.IGNORE_PATH_ISSUE,
            message=(
                f"Path '{path_pattern}' points to a directory but lacks a '**' glob suffix. "
                f"This pattern will not match files inside the directory. "
                f"Use '{clean_path}/**' to ignore all files recursively."
            ),
            path=path_prefix,
        )
    )

@classmethod
def _check_unused_suppressions(
cls,
Expand Down
Loading
Loading