[FEATURE][POLICY]: Policy impact analysis and what-if simulation #2240

@crivetimihai

Description

Summary

Implement a policy impact analysis system that simulates the effects of policy changes before deployment, showing exactly which users, tools, and resources will be affected by proposed modifications.

Parent Epic

Related Issues

Problem Statement

Policy administrators face significant challenges:

  • Blind Changes: Cannot predict impact of policy modifications
  • Regression Risk: Changes may inadvertently revoke critical access
  • Compliance Gaps: Hard to verify policies meet requirements before deployment
  • Stakeholder Communication: Difficult to explain policy changes to affected teams
  • Rollout Anxiety: Fear of breaking production access prevents necessary updates

Organizations need "what-if" analysis to safely evolve their authorization model.

Proposed Solution

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                  Policy Impact Analyzer                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│   │   Current   │     │  Proposed   │     │   Access    │       │
│   │   Policies  │     │  Changes    │     │   Patterns  │       │
│   └──────┬──────┘     └──────┬──────┘     └──────┬──────┘       │
│          │                   │                   │              │
│          └───────────────────┼───────────────────┘              │
│                              ▼                                  │
│                     ┌─────────────────┐                         │
│                     │   Diff Engine   │                         │
│                     └────────┬────────┘                         │
│                              │                                  │
│          ┌───────────────────┼───────────────────┐              │
│          ▼                   ▼                   ▼              │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│   │  Access     │     │  Coverage   │     │  Risk       │       │
│   │  Delta      │     │  Analysis   │     │  Assessment │       │
│   └─────────────┘     └─────────────┘     └─────────────┘       │
│                              │                                  │
│                              ▼                                  │
│                     ┌─────────────────┐                         │
│                     │  Impact Report  │                         │
│                     └─────────────────┘                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Core Components

1. Policy Diff Engine

class PolicyDiffEngine:
    """Compute differences between policy versions."""
    
    async def compute_diff(
        self,
        current: PolicySet,
        proposed: PolicySet
    ) -> PolicyDiff:
        """Compute semantic diff between policy sets."""
        return PolicyDiff(
            added_policies=self._find_added(current, proposed),
            removed_policies=self._find_removed(current, proposed),
            modified_policies=self._find_modified(current, proposed),
            permission_changes=await self._compute_permission_changes(
                current, proposed
            )
        )
    
    async def _compute_permission_changes(
        self,
        current: PolicySet,
        proposed: PolicySet
    ) -> list[PermissionChange]:
        """Compute actual permission changes."""
        changes = []
        
        # Get all principal-action-resource combinations
        all_combinations = self._enumerate_combinations(current, proposed)
        
        for combo in all_combinations:
            current_decision = await self.pdp.evaluate(
                policies=current,
                principal=combo.principal,
                action=combo.action,
                resource=combo.resource
            )
            
            proposed_decision = await self.pdp.evaluate(
                policies=proposed,
                principal=combo.principal,
                action=combo.action,
                resource=combo.resource
            )
            
            if current_decision != proposed_decision:
                changes.append(PermissionChange(
                    principal=combo.principal,
                    action=combo.action,
                    resource=combo.resource,
                    current=current_decision,
                    proposed=proposed_decision,
                    change_type=self._classify_change(
                        current_decision, proposed_decision
                    )
                ))
        
        return changes
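
The `_classify_change` helper is referenced above but not shown. A minimal sketch of what it might look like, assuming the PDP returns a binary allow/deny decision. The `Decision` enum and the `GRANTED` and `UNCHANGED` members are hypothetical; only `ChangeType.REVOKED` appears elsewhere in this issue.

```python
from enum import Enum


class Decision(Enum):
    """Hypothetical binary PDP decision."""
    ALLOW = "allow"
    DENY = "deny"


class ChangeType(Enum):
    GRANTED = "granted"      # deny -> allow (assumed name)
    REVOKED = "revoked"      # allow -> deny (used by the impact analyzer)
    UNCHANGED = "unchanged"  # same decision under both policy sets (assumed name)


def classify_change(current: Decision, proposed: Decision) -> ChangeType:
    """Classify the transition between the current and proposed decision."""
    if current == proposed:
        return ChangeType.UNCHANGED
    if proposed is Decision.ALLOW:
        return ChangeType.GRANTED
    return ChangeType.REVOKED
```

If the PDP returns richer decisions (for example, allow with conditions), an additional change type for modified permissions would likely be needed.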

2. Impact Analyzer

class PolicyImpactAnalyzer:
    """Analyze real-world impact of policy changes."""
    
    async def analyze_impact(
        self,
        diff: PolicyDiff,
        historical_access: AccessLog
    ) -> ImpactReport:
        """Analyze impact using historical access patterns."""
        affected_users = set()
        affected_tools = set()
        breaking_changes = []
        
        for change in diff.permission_changes:
            # Find users who would be affected
            users = await self._find_affected_users(
                change, historical_access
            )
            affected_users.update(users)
            
            # Find tools that would be affected
            tools = await self._find_affected_tools(change)
            affected_tools.update(tools)
            
            # Check if this breaks existing workflows
            if change.change_type == ChangeType.REVOKED:
                recent_usage = await self._check_recent_usage(
                    change, historical_access
                )
                if recent_usage.count > 0:
                    breaking_changes.append(BreakingChange(
                        change=change,
                        affected_users=users,
                        recent_usage_count=recent_usage.count,
                        last_used=recent_usage.last_access
                    ))
        
        return ImpactReport(
            diff=diff,
            affected_users=list(affected_users),
            affected_tools=list(affected_tools),
            breaking_changes=breaking_changes,
            risk_score=self._calculate_risk_score(
                diff, breaking_changes
            ),
            recommendations=self._generate_recommendations(
                diff, breaking_changes
            )
        )
    
    async def _find_affected_users(
        self,
        change: PermissionChange,
        historical: AccessLog
    ) -> set[str]:
        """Find users who match the principal pattern."""
        if change.principal.startswith("Role::"):
            role = self._extract_role(change.principal)
            return await self.user_service.get_users_with_role(role)
        return {change.principal}
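
Breaking-change detection in `analyze_impact` depends on `_check_recent_usage`, which is not shown. The sketch below illustrates one possible approach, assuming the access log can be iterated as flat records. `AccessEvent`, `RecentUsage`, and the function signature are hypothetical; the 30-day default mirrors `historical_window_days` in the configuration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


@dataclass(frozen=True)
class AccessEvent:
    """One historical access record (hypothetical shape)."""
    principal: str
    action: str
    resource: str
    timestamp: datetime


@dataclass(frozen=True)
class RecentUsage:
    count: int
    last_access: Optional[datetime]


def check_recent_usage(
    events: Iterable[AccessEvent],
    principal: str,
    action: str,
    resource: str,
    now: datetime,
    window_days: int = 30,
) -> RecentUsage:
    """Count matching accesses within the last `window_days` days."""
    cutoff = now - timedelta(days=window_days)
    hits = [
        e.timestamp
        for e in events
        if (e.principal, e.action, e.resource) == (principal, action, resource)
        and cutoff <= e.timestamp <= now
    ]
    # max(..., default=None) yields None when nothing matched
    return RecentUsage(count=len(hits), last_access=max(hits, default=None))


now = datetime(2025, 1, 31, tzinfo=timezone.utc)
log = [
    AccessEvent("alice", "invoke", "tool:deploy", now - timedelta(days=2)),
    AccessEvent("alice", "invoke", "tool:deploy", now - timedelta(days=45)),
    AccessEvent("bob", "invoke", "tool:deploy", now - timedelta(days=1)),
]
usage = check_recent_usage(log, "alice", "invoke", "tool:deploy", now)
```

A revoked permission with `usage.count > 0` would then be reported as a breaking change, as in the loop above.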

3. What-If Simulator

class WhatIfSimulator:
    """Simulate specific access scenarios."""
    
    async def simulate(
        self,
        scenario: WhatIfScenario,
        policy_set: PolicySet
    ) -> SimulationBatchResult:
        """Simulate a specific access scenario."""
        results = []
        
        for request in scenario.requests:
            decision = await self.pdp.evaluate(
                policies=policy_set,
                principal=request.principal,
                action=request.action,
                resource=request.resource,
                context=request.context
            )
            
            results.append(SimulationResult(
                request=request,
                decision=decision,
                matching_policies=self._find_matching_policies(
                    request, policy_set, decision
                ),
                evaluation_trace=self._get_evaluation_trace()
            ))
        
        return SimulationBatchResult(
            scenario=scenario,
            results=results,
            summary=self._summarize_results(results)
        )
    
    async def compare_scenarios(
        self,
        scenario: WhatIfScenario,
        current_policies: PolicySet,
        proposed_policies: PolicySet
    ) -> ScenarioComparison:
        """Compare scenario results between policy versions."""
        current_results = await self.simulate(scenario, current_policies)
        proposed_results = await self.simulate(scenario, proposed_policies)
        
        differences = []
        for curr, prop in zip(current_results.results, proposed_results.results):
            if curr.decision != prop.decision:
                differences.append(ScenarioDifference(
                    request=curr.request,
                    current_decision=curr.decision,
                    proposed_decision=prop.decision,
                    current_policies=curr.matching_policies,
                    proposed_policies=prop.matching_policies
                ))
        
        return ScenarioComparison(
            scenario=scenario,
            current=current_results,
            proposed=proposed_results,
            differences=differences
        )
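
To make the comparison flow concrete, here is a small self-contained sketch that runs the same requests against two policy versions and collects the decisions that differ. It is an illustration under stated assumptions, not the proposed implementation: `ToyPDP`, `AccessRequest`, and the representation of a policy set as a set of allowed tuples are all hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class AccessRequest:
    """Hypothetical request shape: who does what to which resource."""
    principal: str
    action: str
    resource: str


class ToyPDP:
    """Stand-in PDP: a policy set is a set of allowed (principal, action, resource)."""

    async def evaluate(self, policies, principal, action, resource, context=None):
        return "allow" if (principal, action, resource) in policies else "deny"


async def compare(pdp, requests, current, proposed):
    """Return (request, current_decision, proposed_decision) for each change."""
    differences = []
    for req in requests:
        before = await pdp.evaluate(
            policies=current, principal=req.principal,
            action=req.action, resource=req.resource,
        )
        after = await pdp.evaluate(
            policies=proposed, principal=req.principal,
            action=req.action, resource=req.resource,
        )
        if before != after:
            differences.append((req, before, after))
    return differences


current = {("alice", "invoke", "tool:search"), ("bob", "invoke", "tool:deploy")}
proposed = {("alice", "invoke", "tool:search")}  # bob loses tool:deploy
requests = [
    AccessRequest("alice", "invoke", "tool:search"),
    AccessRequest("bob", "invoke", "tool:deploy"),
]
differences = asyncio.run(compare(ToyPDP(), requests, current, proposed))
```

Evaluating each request against both versions in the same loop keeps the pairing explicit, rather than relying on two result lists staying in the same order.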

4. Coverage Analyzer

class PolicyCoverageAnalyzer:
    """Analyze policy coverage across the system."""
    
    async def analyze_coverage(
        self,
        policies: PolicySet,
        inventory: SystemInventory
    ) -> CoverageReport:
        """Analyze policy coverage against system inventory."""
        uncovered = []
        partially_covered = []
        
        for tool in inventory.tools:
            coverage = await self._check_tool_coverage(tool, policies)
            
            if coverage.level == CoverageLevel.NONE:
                uncovered.append(UncoveredResource(
                    resource=tool,
                    type="tool",
                    recommendation=f"Add policy for {tool.name}"
                ))
            elif coverage.level == CoverageLevel.PARTIAL:
                partially_covered.append(PartiallyCoveredResource(
                    resource=tool,
                    covered_actions=coverage.covered_actions,
                    uncovered_actions=coverage.uncovered_actions
                ))
        
        return CoverageReport(
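            # Only tools are analyzed above, so the totals count tools only;
            # include inventory.servers here once servers are checked as well.
            total_resources=len(inventory.tools),
            fully_covered=len(inventory.tools) - len(uncovered) - len(partially_covered),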
            partially_covered=len(partially_covered),
            uncovered=len(uncovered),
            details=CoverageDetails(
                uncovered_resources=uncovered,
                partially_covered_resources=partially_covered
            ),
            coverage_percentage=self._calculate_coverage_percentage(
                inventory, uncovered, partially_covered
            )
        )
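
The `_calculate_coverage_percentage` helper is left undefined above. One possible definition, sketched under the assumption that a partially covered resource counts for half of a fully covered one. The `partial_weight` parameter and the choice to report 100% for an empty inventory are illustrative assumptions, not part of this proposal.

```python
def coverage_percentage(
    total: int,
    uncovered: int,
    partially_covered: int,
    partial_weight: float = 0.5,
) -> float:
    """Return coverage as a percentage in the range 0-100."""
    if total < 0 or uncovered < 0 or partially_covered < 0:
        raise ValueError("counts must be non-negative")
    if uncovered + partially_covered > total:
        raise ValueError("uncovered + partially_covered exceeds total")
    if total == 0:
        # Assumption: an empty inventory has nothing left to cover
        return 100.0
    fully_covered = total - uncovered - partially_covered
    covered = fully_covered + partial_weight * partially_covered
    return 100.0 * covered / total
```

For example, 10 tools with 2 uncovered and 2 partially covered give 6 fully covered plus 2 at half weight, which is 70% coverage.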

Impact Report Schema

from dataclasses import dataclass

@dataclass
class ImpactReport:
    """Comprehensive policy change impact report."""
    
    # Change summary
    policies_added: int
    policies_removed: int
    policies_modified: int
    
    # Permission changes
    permissions_granted: list[PermissionChange]
    permissions_revoked: list[PermissionChange]
    permissions_modified: list[PermissionChange]
    
    # Affected entities
    affected_users: list[AffectedUser]
    affected_roles: list[str]
    affected_tools: list[str]
    affected_servers: list[str]
    
    # Risk assessment
    risk_score: float  # 0-100
    risk_level: RiskLevel  # LOW, MEDIUM, HIGH, CRITICAL
    breaking_changes: list[BreakingChange]
    
    # Recommendations
    recommendations: list[Recommendation]
    rollback_plan: RollbackPlan
    
    def to_slack_message(self) -> dict:
        """Format as Slack message."""
        color = {
            RiskLevel.LOW: "good",
            RiskLevel.MEDIUM: "warning", 
            RiskLevel.HIGH: "danger",
            RiskLevel.CRITICAL: "danger"
        }[self.risk_level]
        
        return {
            "attachments": [{
                "color": color,
                "title": "Policy Change Impact Analysis",
                "fields": [
                    {"title": "Risk Score", "value": f"{self.risk_score}/100", "short": True},
                    {"title": "Affected Users", "value": str(len(self.affected_users)), "short": True},
                    {"title": "Breaking Changes", "value": str(len(self.breaking_changes)), "short": True},
                    {"title": "Permissions Revoked", "value": str(len(self.permissions_revoked)), "short": True}
                ]
            }]
        }
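
The schema stores both a `risk_score` (a float from 0 to 100) and a `risk_level`, and the configuration defines integer bands (0-25, 26-50, 51-75, 76-100) plus blocking rules. A sketch of how the two might be connected follows. Treating each band's upper value as an inclusive upper bound, so that a fractional score such as 25.5 falls in the next band up, is an assumption, as are the function names and the reading of `block_on_risk_level` as "this level or higher".

```python
from enum import Enum


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Inclusive upper bound of each band, mirroring the risk_thresholds config.
_UPPER_BOUNDS = [
    (25.0, RiskLevel.LOW),
    (50.0, RiskLevel.MEDIUM),
    (75.0, RiskLevel.HIGH),
    (100.0, RiskLevel.CRITICAL),
]
_ORDER = [level for _, level in _UPPER_BOUNDS]


def risk_level(score: float) -> RiskLevel:
    """Map a 0-100 risk score to a risk level."""
    if not 0.0 <= score <= 100.0:
        raise ValueError(f"risk score out of range: {score!r}")
    for upper, level in _UPPER_BOUNDS:
        if score <= upper:
            return level
    raise AssertionError("unreachable: score was range-checked")


def should_block(
    level: RiskLevel,
    breaking_change_count: int,
    block_on_risk_level: RiskLevel = RiskLevel.CRITICAL,
    block_on_breaking_changes: bool = True,
) -> bool:
    """Decide whether a deployment should be blocked (assumed semantics)."""
    if block_on_breaking_changes and breaking_change_count > 0:
        return True
    return _ORDER.index(level) >= _ORDER.index(block_on_risk_level)
```

With the defaults shown, a change with any breaking change is blocked regardless of score, and a change with none is blocked only at the critical level.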

Configuration

plugins:
  - name: "PolicyImpactAnalyzer"
    kind: "policy_impact.plugin.PolicyImpactAnalyzer"
    version: "0.1.0"
    hooks: ["policy_pre_deploy", "api_endpoint"]
    config:
      analysis:
        use_historical_access: true
        historical_window_days: 30
        enumerate_all_combinations: false  # Use sampling for large sets
        sample_size: 10000
        
      risk_thresholds:
        low: 0-25
        medium: 26-50
        high: 51-75
        critical: 76-100
        
      blocking:
        block_on_risk_level: critical
        block_on_breaking_changes: true
        require_approval_above: medium
        
      notifications:
        notify_affected_users: true
        slack_channel: "#policy-changes"
        email_on_breaking: true
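
With `enumerate_all_combinations: false`, the diff engine would evaluate a sample of at most `sample_size` principal-action-resource combinations instead of the full cross product. The sketch below shows one way to draw such a sample without materializing every combination. The function name, the uniform sampling strategy, and the `seed` parameter are assumptions; weighting the sample by historical access would be an equally reasonable choice.

```python
import itertools
import random
from typing import Optional, Sequence


def sample_combinations(
    principals: Sequence[str],
    actions: Sequence[str],
    resources: Sequence[str],
    sample_size: int = 10000,
    seed: Optional[int] = None,
) -> list:
    """Return all combinations if they fit in sample_size, else a uniform sample."""
    total = len(principals) * len(actions) * len(resources)
    if total <= sample_size:
        return list(itertools.product(principals, actions, resources))

    rng = random.Random(seed)
    per_principal = len(actions) * len(resources)
    combos = []
    # Sample distinct flat indices, then decode each into (principal, action, resource)
    for idx in rng.sample(range(total), sample_size):
        p, rest = divmod(idx, per_principal)
        a, r = divmod(rest, len(resources))
        combos.append((principals[p], actions[a], resources[r]))
    return combos
```

Sampling means a permission change outside the sample goes undetected, so an impact report built this way is a lower bound on the real change set.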

REST API

POST /api/v1/policies/impact           # Analyze policy change impact
POST /api/v1/policies/whatif           # Run what-if simulation
GET  /api/v1/policies/coverage         # Get coverage report
POST /api/v1/policies/compare          # Compare two policy versions
GET  /api/v1/policies/affected-users   # Get affected users for change

Acceptance Criteria

  • Computes permission diff between policy versions
  • Identifies users affected by proposed changes
  • Detects breaking changes based on historical access
  • Calculates risk score for policy changes
  • Supports what-if scenario simulation
  • Generates coverage reports
  • Provides rollback recommendations
  • Integrates with approval workflow (#2227: Just-in-time (JIT) access and temporary privilege elevation)
  • Notifies affected users via Slack/email

Implementation Phases

Phase 1: Diff Engine

  • Policy parsing and normalization
  • Permission change computation
  • Basic diff visualization

Phase 2: Impact Analysis

  • Historical access pattern analysis
  • Affected user identification
  • Breaking change detection

Phase 3: What-If Simulator

  • Scenario definition DSL
  • Batch simulation execution
  • Version comparison

Phase 4: Risk Assessment

  • Risk scoring algorithm
  • Approval workflow integration
  • Notification system

Security Considerations

  • Impact reports contain sensitive access patterns
  • RBAC required for analysis endpoints
  • Anonymize user data in shared reports
  • Rate limiting on simulation endpoints
  • Audit logging for all analysis requests
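
For the anonymization point above, one common approach is keyed pseudonymization: each user ID is replaced with a stable token derived from an HMAC, so shared reports remain internally consistent without exposing identities. This is a sketch of that technique, not a stated design decision; the function name, the token format, and the handling of the secret are assumptions.

```python
import hashlib
import hmac


def pseudonymize(user_id: str, secret: bytes, length: int = 12) -> str:
    """Return a stable, non-reversible token for a user ID.

    The same (user_id, secret) pair always yields the same token, so counts
    and joins within a report still work. The secret must stay private and
    should be rotated per report if tokens must not be linkable across reports.
    """
    digest = hmac.new(secret, user_id.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"user-{digest[:length]}"


secret = b"example-only-secret"  # hypothetical; load from a secret store in practice
token_alice = pseudonymize("alice@example.com", secret)
token_bob = pseudonymize("bob@example.com", secret)
```

A keyed HMAC is preferable to a plain hash here because user IDs are low-entropy and a plain SHA-256 of an email address can be reversed by guessing.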

Dependencies

References

Metadata

Assignees

No one assigned

    Labels

    • COULD (P3: Nice-to-have features with minimal impact if left out; included if time permits)
    • enhancement (New feature or request)
    • plugins
    • python (Python / backend development (FastAPI))
    • sweng-group-5 (Group 5 - Policy-as-Code Security & Compliance Automation)
    • tcd (SwEng Projects)
