// Skill profile

Agent Governance Patterns

name: agent-governance

by boleyn · published 2026-03-22

Tags: Email Processing · Developer Tools · Cryptocurrency
Total installs: 0
Stars: ★ 0
Last updated: 2026-03
// Install command
$ claw add gh:boleyn/boleyn-agent-governance
View on GitHub
// Full documentation

---

name: agent-governance

description: |

Patterns and techniques for adding governance, safety, and trust controls to AI agent systems. Use this skill when:

- Building AI agents that call external tools (APIs, databases, file systems)

- Implementing policy-based access controls for agent tool usage

- Adding semantic intent classification to detect dangerous prompts

- Creating trust scoring systems for multi-agent workflows

- Building audit trails for agent actions and decisions

- Enforcing rate limits, content filters, or tool restrictions on agents

- Working with any agent framework (PydanticAI, CrewAI, OpenAI Agents, LangChain, AutoGen)

---

# Agent Governance Patterns

Patterns for adding safety, trust, and policy enforcement to AI agent systems.

Overview

Governance patterns ensure AI agents operate within defined boundaries: controlling which tools they can call, what content they can process, and how much they can do, while maintaining accountability through audit trails.

User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
                     ↓                      ↓               ↓
              Threat Detection         Allow/Deny      Trust Update

When to Use

  • **Agents with tool access**: Any agent that calls external tools (APIs, databases, shell commands)
  • **Multi-agent systems**: Agents delegating to other agents need trust boundaries
  • **Production deployments**: Compliance, audit, and safety requirements
  • **Sensitive operations**: Financial transactions, data access, infrastructure management
---

    Pattern 1: Governance Policy

    Define what an agent is allowed to do as a composable, serializable policy object.

    from dataclasses import dataclass, field
    from enum import Enum
    from typing import Optional
    import re
    
    class PolicyAction(Enum):
        ALLOW = "allow"
        DENY = "deny"
        REVIEW = "review"  # flag for human review
    
    @dataclass
    class GovernancePolicy:
        """Declarative policy controlling agent behavior."""
        name: str
        allowed_tools: list[str] = field(default_factory=list)       # allowlist
        blocked_tools: list[str] = field(default_factory=list)       # blocklist
        blocked_patterns: list[str] = field(default_factory=list)    # content filters
        max_calls_per_request: int = 100                             # rate limit
        require_human_approval: list[str] = field(default_factory=list)  # tools needing approval
    
        def check_tool(self, tool_name: str) -> PolicyAction:
            """Check if a tool is allowed by this policy."""
            if tool_name in self.blocked_tools:
                return PolicyAction.DENY
            if tool_name in self.require_human_approval:
                return PolicyAction.REVIEW
            if self.allowed_tools and tool_name not in self.allowed_tools:
                return PolicyAction.DENY
            return PolicyAction.ALLOW
    
        def check_content(self, content: str) -> Optional[str]:
            """Check content against blocked patterns. Returns matched pattern or None."""
            for pattern in self.blocked_patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    return pattern
            return None
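
    A quick usage sketch of the two checks (the policy values here are illustrative):

    policy = GovernancePolicy(
        name="example",
        blocked_tools=["shell_exec"],
        blocked_patterns=[r"(?i)password\s*[:=]"],
    )
    policy.check_tool("shell_exec")            # PolicyAction.DENY
    policy.check_tool("search")                # PolicyAction.ALLOW (no allowlist set)
    policy.check_content("password: hunter2")  # returns the matched pattern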

    Policy Composition

    Combine multiple policies (e.g., org-wide + team + agent-specific):

    def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
        """Merge policies with most-restrictive-wins semantics."""
        combined = GovernancePolicy(name="composed")
    
        for policy in policies:
            combined.blocked_tools.extend(policy.blocked_tools)
            combined.blocked_patterns.extend(policy.blocked_patterns)
            combined.require_human_approval.extend(policy.require_human_approval)
            combined.max_calls_per_request = min(
                combined.max_calls_per_request,
                policy.max_calls_per_request
            )
            if policy.allowed_tools:
                if combined.allowed_tools:
                    combined.allowed_tools = [
                        t for t in combined.allowed_tools if t in policy.allowed_tools
                    ]
                else:
                    combined.allowed_tools = list(policy.allowed_tools)
    
        return combined
    
    
    # Usage: layer policies from broad to specific
    org_policy = GovernancePolicy(
        name="org-wide",
        blocked_tools=["shell_exec", "delete_database"],
        blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
        max_calls_per_request=50
    )
    team_policy = GovernancePolicy(
        name="data-team",
        allowed_tools=["query_db", "read_file", "write_report"],
        require_human_approval=["write_report"]
    )
    agent_policy = compose_policies(org_policy, team_policy)
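
    The composed policy keeps the strictest setting from each layer; a few spot checks on the example above:

    assert agent_policy.check_tool("shell_exec") is PolicyAction.DENY      # org blocklist wins
    assert agent_policy.check_tool("write_report") is PolicyAction.REVIEW  # team approval gate
    assert agent_policy.check_tool("query_db") is PolicyAction.ALLOW       # on the team allowlist
    assert agent_policy.max_calls_per_request == 50                        # min(100, 50, 100)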

    Policy as YAML

    Store policies as configuration, not code:

    # governance-policy.yaml
    name: production-agent
    allowed_tools:
      - search_documents
      - query_database
      - send_email
    blocked_tools:
      - shell_exec
      - delete_record
    blocked_patterns:
      - "(?i)(api[_-]?key|secret|password)\\s*[:=]"
      - "(?i)(drop|truncate|delete from)\\s+\\w+"
    max_calls_per_request: 25
    require_human_approval:
      - send_email

    Load it at startup:

    import yaml
    
    def load_policy(path: str) -> GovernancePolicy:
        with open(path) as f:
            data = yaml.safe_load(f)
        return GovernancePolicy(**data)
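
    Since the policy is a plain dataclass, round-tripping back to YAML is one call. A sketch (`save_policy` is not part of the skill):

    from dataclasses import asdict

    def save_policy(policy: GovernancePolicy, path: str) -> None:
        # All fields are strings, lists, or ints, so asdict() is directly YAML-serializable
        with open(path, "w") as f:
            yaml.safe_dump(asdict(policy), f, sort_keys=False)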

    ---

    Pattern 2: Semantic Intent Classification

    Detect dangerous intent in prompts before they reach the agent, using pattern-based signals.

    from dataclasses import dataclass
    import re
    
    @dataclass
    class IntentSignal:
        category: str       # e.g., "data_exfiltration", "privilege_escalation"
        confidence: float   # 0.0 to 1.0
        evidence: str       # what triggered the detection
    
    # Weighted signal patterns for threat detection
    THREAT_SIGNALS = [
        # Data exfiltration
        (r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8),
        (r"(?i)export\s+.*\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9),
        (r"(?i)curl\s+.*\s+-d\s+", "data_exfiltration", 0.7),
    
        # Privilege escalation
        (r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
        (r"(?i)chmod\s+777", "privilege_escalation", 0.9),
    
        # System modification
        (r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
        (r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),
    
        # Prompt injection
        (r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
        (r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),
    ]
    
    def classify_intent(content: str) -> list[IntentSignal]:
        """Classify content for threat signals."""
        signals = []
        for pattern, category, weight in THREAT_SIGNALS:
            match = re.search(pattern, content)
            if match:
                signals.append(IntentSignal(
                    category=category,
                    confidence=weight,
                    evidence=match.group()
                ))
        return signals
    
    def is_safe(content: str, threshold: float = 0.7) -> bool:
        """Quick check: is the content safe above the given threshold?"""
        signals = classify_intent(content)
        return not any(s.confidence >= threshold for s in signals)

    **Key insight**: Intent classification happens *before* tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails, which only check content *after* generation.
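
    A minimal pre-flight gate around any agent entry point might look like this (a sketch; `run_agent` stands in for your framework's call):

    async def guarded_run(run_agent, user_input: str, threshold: float = 0.7):
        """Refuse before the agent ever sees a dangerous prompt."""
        flagged = [s for s in classify_intent(user_input) if s.confidence >= threshold]
        if flagged:
            raise PermissionError(f"Unsafe intent detected: {[s.category for s in flagged]}")
        return await run_agent(user_input)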

    ---

    Pattern 3: Tool-Level Governance Decorator

    Wrap individual tool functions with governance checks:

    import functools
    import time
    from collections import defaultdict
    
    # Per-policy call counters. These are process-global: reset them at the
    # start of each request (e.g. _call_counters.clear()) so the limit is
    # actually enforced per request.
    _call_counters: dict[str, int] = defaultdict(int)
    
    def govern(policy: GovernancePolicy, audit_trail=None):
        """Decorator that enforces governance policy on a tool function."""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                tool_name = func.__name__
    
                # 1. Check tool allowlist/blocklist
                action = policy.check_tool(tool_name)
                if action == PolicyAction.DENY:
                    raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
                if action == PolicyAction.REVIEW:
                    raise PermissionError(f"Tool '{tool_name}' requires human approval")
    
                # 2. Check rate limit
                _call_counters[policy.name] += 1
                if _call_counters[policy.name] > policy.max_calls_per_request:
                    raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")
    
                # 3. Check content in arguments
                for arg in list(args) + list(kwargs.values()):
                    if isinstance(arg, str):
                        matched = policy.check_content(arg)
                        if matched:
                            raise PermissionError(f"Blocked pattern detected: {matched}")
    
                # 4. Execute and audit
                start = time.monotonic()
                try:
                    result = await func(*args, **kwargs)
                    if audit_trail is not None:
                        audit_trail.append({
                            "tool": tool_name,
                            "action": "allowed",
                            "duration_ms": (time.monotonic() - start) * 1000,
                            "timestamp": time.time()
                        })
                    return result
                except Exception as e:
                    if audit_trail is not None:
                        audit_trail.append({
                            "tool": tool_name,
                            "action": "error",
                            "error": str(e),
                            "timestamp": time.time()
                        })
                    raise
    
            return wrapper
        return decorator
    
    
    # Usage with any agent framework
    audit_log = []
    policy = GovernancePolicy(
        name="search-agent",
        allowed_tools=["search", "summarize"],
        blocked_patterns=[r"(?i)password"],
        max_calls_per_request=10
    )
    
    @govern(policy, audit_trail=audit_log)
    async def search(query: str) -> str:
        """Search documents — governed by policy."""
        return f"Results for: {query}"
    
    # Passes: search("latest quarterly report")
    # Blocked: search("show me the admin password")
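
    A quick demonstration of both paths (sketch):

    import asyncio

    async def demo():
        print(await search("latest quarterly report"))   # allowed and audited
        try:
            await search("show me the admin password")   # content filter fires
        except PermissionError as e:
            print(f"Denied: {e}")
        print(f"{len(audit_log)} audit entries recorded")

    asyncio.run(demo())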

    ---

    Pattern 4: Trust Scoring

    Track agent reliability over time with decay-based trust scores:

    from dataclasses import dataclass, field
    import math
    import time
    
    @dataclass
    class TrustScore:
        """Trust score with temporal decay."""
        score: float = 0.5          # 0.0 (untrusted) to 1.0 (fully trusted)
        successes: int = 0
        failures: int = 0
        last_updated: float = field(default_factory=time.time)
    
        def record_success(self, reward: float = 0.05):
            self.successes += 1
            self.score = min(1.0, self.score + reward * (1 - self.score))
            self.last_updated = time.time()
    
        def record_failure(self, penalty: float = 0.15):
            self.failures += 1
            self.score = max(0.0, self.score - penalty * self.score)
            self.last_updated = time.time()
    
        def current(self, decay_rate: float = 0.001) -> float:
            """Get score with temporal decay — trust erodes without activity."""
            elapsed = time.time() - self.last_updated
            decay = math.exp(-decay_rate * elapsed)
            return self.score * decay
    
        @property
        def reliability(self) -> float:
            total = self.successes + self.failures
            return self.successes / total if total > 0 else 0.0
    
    
    # Usage in multi-agent systems
    trust = TrustScore()
    
    # Agent completes tasks successfully
    trust.record_success()  # 0.525
    trust.record_success()  # 0.549
    
    # Agent makes an error
    trust.record_failure()  # 0.466
    
    # Gate sensitive operations on trust
    if trust.current() >= 0.7:
        # Allow autonomous operation
        pass
    elif trust.current() >= 0.4:
        # Allow with human oversight
        pass
    else:
        # Deny or require explicit approval
        pass
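
    **Tuning decay**: `decay_rate` is per second, so the default `0.001` halves trust in about `ln 2 / 0.001 ≈ 693` seconds, roughly 12 minutes of inactivity. For day-scale erosion, something near `1e-5` (half-life ≈ 19 hours) is a more plausible starting point.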

    **Multi-agent trust**: In systems where agents delegate to other agents, each agent maintains trust scores for its delegates:

    class AgentTrustRegistry:
        def __init__(self):
            self.scores: dict[str, TrustScore] = {}
    
        def get_trust(self, agent_id: str) -> TrustScore:
            if agent_id not in self.scores:
                self.scores[agent_id] = TrustScore()
            return self.scores[agent_id]
    
        def most_trusted(self, agents: list[str]) -> str:
            return max(agents, key=lambda a: self.get_trust(a).current())
    
        def meets_threshold(self, agent_id: str, threshold: float) -> bool:
            return self.get_trust(agent_id).current() >= threshold
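
    Usage sketch: route delegation to whichever agent has earned the most trust:

    registry = AgentTrustRegistry()
    registry.get_trust("researcher").record_success()
    registry.get_trust("writer").record_failure()

    registry.most_trusted(["researcher", "writer"])   # "researcher"
    registry.meets_threshold("writer", 0.5)           # False (0.425 after the penalty)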

    ---

    Pattern 5: Audit Trail

    Append-only audit log for all agent actions — critical for compliance and debugging:

    from dataclasses import dataclass, field
    import json
    import time
    
    @dataclass
    class AuditEntry:
        timestamp: float
        agent_id: str
        tool_name: str
        action: str           # "allowed", "denied", "error"
        policy_name: str
        details: dict = field(default_factory=dict)
    
    class AuditTrail:
        """Append-only audit trail for agent governance events."""
        def __init__(self):
            self._entries: list[AuditEntry] = []
    
        def log(self, agent_id: str, tool_name: str, action: str,
                policy_name: str, **details):
            self._entries.append(AuditEntry(
                timestamp=time.time(),
                agent_id=agent_id,
                tool_name=tool_name,
                action=action,
                policy_name=policy_name,
                details=details
            ))
    
        def denied(self) -> list[AuditEntry]:
            """Get all denied actions — useful for security review."""
            return [e for e in self._entries if e.action == "denied"]
    
        def by_agent(self, agent_id: str) -> list[AuditEntry]:
            return [e for e in self._entries if e.agent_id == agent_id]
    
        def export_jsonl(self, path: str):
            """Export as JSON Lines for log aggregation systems."""
            with open(path, "w") as f:
                for entry in self._entries:
                    f.write(json.dumps({
                        "timestamp": entry.timestamp,
                        "agent_id": entry.agent_id,
                        "tool": entry.tool_name,
                        "action": entry.action,
                        "policy": entry.policy_name,
                        **entry.details
                    }) + "\n")

    ---

    Pattern 6: Framework Integration

    PydanticAI

    from pydantic_ai import Agent
    
    policy = GovernancePolicy(
        name="support-bot",
        allowed_tools=["search_docs", "create_ticket"],
        blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
        max_calls_per_request=20
    )
    
    agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")
    
    @agent.tool
    @govern(policy)
    async def search_docs(ctx, query: str) -> str:
        """Search knowledge base — governed."""
        return await kb.search(query)  # "kb" is your own knowledge-base client
    
    @agent.tool
    @govern(policy)
    async def create_ticket(ctx, title: str, body: str) -> str:
        """Create support ticket — governed."""
        return await tickets.create(title=title, body=body)  # "tickets" is your ticketing client

    CrewAI

    from crewai import Agent, Task, Crew
    
    policy = GovernancePolicy(
        name="research-crew",
        allowed_tools=["search", "analyze"],
        max_calls_per_request=30
    )
    
    # Apply governance at the crew level
    def governed_crew_run(crew: Crew, policy: GovernancePolicy):
        """Wrap crew execution with governance checks."""
        audit = AuditTrail()
        for agent in crew.agents:
            for tool in agent.tools:
                original = tool.func
                # NOTE: govern() produces an async wrapper, so this assumes async tool functions
                tool.func = govern(policy, audit_trail=audit)(original)
        result = crew.kickoff()
        return result, audit

    OpenAI Agents SDK

    from agents import Agent, function_tool
    
    policy = GovernancePolicy(
        name="coding-agent",
        allowed_tools=["read_file", "write_file", "run_tests"],
        blocked_tools=["shell_exec"],
        max_calls_per_request=50
    )
    
    @function_tool
    @govern(policy)
    async def read_file(path: str) -> str:
        """Read file contents — governed."""
        import os
        base = os.path.realpath(".")
        target = os.path.realpath(path)
        # commonpath avoids the prefix pitfall of startswith ("/app" vs "/app2")
        if os.path.commonpath([base, target]) != base:
            raise ValueError("Path traversal blocked by governance")
        with open(target) as f:
            return f.read()
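
    LangChain

    The skill also targets LangChain. A sketch, assuming `langchain_core`'s `@tool` decorator (which accepts async functions); apply `@govern` underneath so the governed wrapper is what gets registered:

    from langchain_core.tools import tool

    policy = GovernancePolicy(
        name="langchain-agent",
        allowed_tools=["lookup_order"],
        max_calls_per_request=15
    )

    @tool
    @govern(policy)
    async def lookup_order(order_id: str) -> str:
        """Look up an order by ID (governed)."""
        return f"Order {order_id}: shipped"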

    ---

    Governance Levels

    Match governance strictness to risk level:

    | Level | Controls | Use Case |
    |-------|----------|----------|
    | **Open** | Audit only, no restrictions | Internal dev/testing |
    | **Standard** | Tool allowlist + content filters | General production agents |
    | **Strict** | All controls + human approval for sensitive ops | Financial, healthcare, legal |
    | **Locked** | Allowlist only, no dynamic tools, full audit | Compliance-critical systems |
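
    One way to make the levels concrete is a preset map (illustrative values, not part of the skill):

    GOVERNANCE_PRESETS = {
        "open":     GovernancePolicy(name="open"),  # audit only, permissive defaults
        "standard": GovernancePolicy(name="standard",
                                     blocked_tools=["shell_exec"],
                                     blocked_patterns=[r"(?i)password\s*[:=]"]),
        "strict":   GovernancePolicy(name="strict",
                                     blocked_tools=["shell_exec"],
                                     require_human_approval=["send_email", "write_file"],
                                     max_calls_per_request=25),
        "locked":   GovernancePolicy(name="locked",
                                     allowed_tools=["search_docs"],  # allowlist only
                                     max_calls_per_request=10),
    }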

    ---

    Best Practices

    | Practice | Rationale |
    |----------|-----------|
    | **Policy as configuration** | Store policies in YAML/JSON, not hardcoded — enables change without deploys |
    | **Most-restrictive-wins** | When composing policies, deny always overrides allow |
    | **Pre-flight intent check** | Classify intent *before* tool execution, not after |
    | **Trust decay** | Trust scores should decay over time — require ongoing good behavior |
    | **Append-only audit** | Never modify or delete audit entries — immutability enables compliance |
    | **Fail closed** | If governance check errors, deny the action rather than allowing it |
    | **Separate policy from logic** | Governance enforcement should be independent of agent business logic |
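
    For example, "fail closed" means wrapping the check itself so an exception becomes a denial (sketch):

    def fail_closed_check(policy: GovernancePolicy, tool_name: str) -> PolicyAction:
        """If the governance check itself errors, deny rather than allow."""
        try:
            return policy.check_tool(tool_name)
        except Exception:
            return PolicyAction.DENY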

    ---

    Quick Start Checklist

    ## Agent Governance Implementation Checklist
    
    ### Setup
    - [ ] Define governance policy (allowed tools, blocked patterns, rate limits)
    - [ ] Choose governance level (open/standard/strict/locked)
    - [ ] Set up audit trail storage
    
    ### Implementation
    - [ ] Add @govern decorator to all tool functions
    - [ ] Add intent classification to user input processing
    - [ ] Implement trust scoring for multi-agent interactions
    - [ ] Wire up audit trail export
    
    ### Validation
    - [ ] Test that blocked tools are properly denied
    - [ ] Test that content filters catch sensitive patterns
    - [ ] Test rate limiting behavior
    - [ ] Verify audit trail captures all events
    - [ ] Test policy composition (most-restrictive-wins)
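
    A few of these validation items as plain tests (sketch):

    def test_blocked_tool_is_denied():
        p = GovernancePolicy(name="t", blocked_tools=["shell_exec"])
        assert p.check_tool("shell_exec") is PolicyAction.DENY

    def test_content_filter_catches_secrets():
        p = GovernancePolicy(name="t", blocked_patterns=[r"(?i)api[_-]?key\s*[:=]"])
        assert p.check_content("API_KEY=abc123") is not None

    def test_composition_is_most_restrictive():
        a = GovernancePolicy(name="a", max_calls_per_request=10)
        b = GovernancePolicy(name="b", blocked_tools=["rm"])
        c = compose_policies(a, b)
        assert c.max_calls_per_request == 10 and "rm" in c.blocked_tools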

    ---

    Related Resources

  • [Agent-OS Governance Engine](https://github.com/imran-siddique/agent-os) — Full governance framework
  • [AgentMesh Integrations](https://github.com/imran-siddique/agentmesh-integrations) — Framework-specific packages
  • [OWASP Top 10 for LLM Applications](https://owasp.org/www-project-top-10-for-large-language-model-applications/)