Implementing Governance Contracts

Status: ACGP 1.1 (Optional Extension)
Audience: Agent developers, steward implementers
Estimated time: 2-4 hours (incremental adoption)
Prerequisites: Familiarity with ACGP Core and Message Formats


Overview

This guide provides a 6-stage implementation path for adding Governance Contracts (ACGP-1010) to existing ACGP 1.0 deployments. Each stage is independently deployable, allowing incremental adoption without breaking existing functionality.

Implementation Stages

  1. Capability Announcement - Declare support in SYNC handshake
  2. Risk Classification - Add risk_level to high-volume actions
  3. Performance Budgets - Set latency budgets and fallback behaviors
  4. Eval Tier Selection - Choose evaluation depth per action
  5. Timeout Handling - Implement fallback logic
  6. Monitoring & Tuning - Track performance and optimize contracts

Stage 1: Capability Announcement

Goal: Enable capability negotiation without changing evaluation logic.

Agent-Side Changes

class ACGPAgent:
    def sync_with_steward(self, steward_url):
        """Announce ACGP 1.1 support during SYNC."""
        sync_message = {
            "type": "SYNC",
            "protocol_version": "1.1.0",  # Bump from 1.0.x
            "agent_id": self.agent_id,
            "acl_tier": self.acl_tier,
            "capabilities": {
                "supports_governance_contracts": True,  # NEW
                "max_eval_tier": 2  # NEW: Agent can wait up to Tier 2
            }
        }

        response = self.send_message(steward_url, sync_message)

        # Store the session ID and the steward's capabilities
        self.session_id = response.get("session_id")  # Used in later EVAL_REQUESTs
        self.steward_capabilities = response.get("capabilities", {})
        self.contracts_enabled = (
            self.steward_capabilities.get("supports_governance_contracts", False)
        )

Steward-Side Changes

class ACGPSteward:
    def handle_sync(self, sync_message):
        """Respond with steward capabilities."""
        agent_caps = sync_message.get("capabilities", {})

        sync_ack = {
            "type": "SYNC_ACK",
            "protocol_version": "1.1.0",
            "session_id": generate_session_id(),
            "capabilities": {
                "supports_governance_contracts": True,  # NEW
                # NEW: Tiers this steward offers, e.g. [0, 1, 2, 3]
                "available_eval_tiers": list(self.available_eval_tiers)
            }
        }

        # Negotiate common ground
        self.session_state[sync_ack["session_id"]] = {
            "contracts_enabled": agent_caps.get("supports_governance_contracts", False),
            "max_eval_tier": min(
                agent_caps.get("max_eval_tier", 3),
                max(self.available_eval_tiers)
            )
        }

        return sync_ack

Validation:

  • Both sides announce capabilities
  • Neither side errors if the other doesn't support contracts
  • Protocol version bumped to 1.1.0
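The negotiation rule used by the steward above can be checked in isolation. The following is a minimal sketch, not part of ACGP-1010: the helper name negotiate_session is hypothetical, and it assumes that a peer which sends no capabilities should be treated as an ACGP 1.0 peer rather than rejected.

```python
def negotiate_session(agent_caps: dict, steward_tiers: list) -> dict:
    """Hypothetical helper: derive session settings from both sides' capabilities."""
    contracts_enabled = bool(agent_caps.get("supports_governance_contracts", False))
    # If the agent announces no ceiling, the steward's highest tier is the limit.
    steward_max = max(steward_tiers)
    agent_max = agent_caps.get("max_eval_tier", steward_max)
    return {
        "contracts_enabled": contracts_enabled,
        "max_eval_tier": min(agent_max, steward_max),
    }


# An ACGP 1.1 agent that can wait for Tier 2 at most.
modern = negotiate_session(
    {"supports_governance_contracts": True, "max_eval_tier": 2}, [0, 1, 2, 3]
)
# An ACGP 1.0 agent sends no capabilities at all; negotiation must not raise.
legacy = negotiate_session({}, [0, 1, 2, 3])
```

Both calls return a usable session: the first enables contracts and caps evaluation at Tier 2, the second leaves contracts disabled.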


Stage 2: Risk Classification

Goal: Classify actions by risk level without changing evaluation yet.

Risk Classification Logic

class ActionClassifier:
    """Classify agent actions by risk level."""

    def classify_action(self, action) -> str:
        """
        Returns: "low_risk", "elevated_risk", or "critical_risk"
        """
        # Critical risk: Irreversible, high-impact
        if self._is_critical(action):
            return "critical_risk"

        # Elevated risk: Sensitive operations
        if self._is_elevated(action):
            return "elevated_risk"

        # Low risk: Default for safe operations
        return "low_risk"

    def _is_critical(self, action) -> bool:
        """Check for high-stakes operations."""
        return (
            action.tool in ["transfer_funds", "delete_resource", "grant_access"]
            or (action.tool == "update_record" and action.args.get("value", 0) > 10000)
            or action.requires_approval
        )

    def _is_elevated(self, action) -> bool:
        """Check for sensitive operations."""
        return (
            action.tool in ["update_record", "send_email", "api_call"]
            or action.accesses_pii
            or action.modifies_state
        )

Integration into Agent

class ACGPAgent:
    def __init__(self):
        self.classifier = ActionClassifier()
        self.contracts_enabled = False  # Set by sync_with_steward() (Stage 1)

    def submit_action(self, action):
        """Submit action with risk classification."""
        # Classify risk
        risk_level = self.classifier.classify_action(action)

        # Build evaluation request
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": action.to_dict(),
            "acl_tier": self.acl_tier,
            "protocol_version": "1.1.0"
        }

        # Add governance contract if enabled
        if self.contracts_enabled:
            eval_request["governance_contract"] = {
                "risk_level": risk_level  # NEW: Start with just risk level
            }

        return self.steward.evaluate(eval_request)

Validation:

  • Actions are classified (log risk_level)
  • Classification accuracy >90% (manual review sample)
  • No evaluation logic changed yet
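The 90% accuracy target can be measured from a manually reviewed sample of logged classifications. This is a sketch under assumptions: the sample format (pairs of classifier output and reviewer label) and the function names are hypothetical, and the data is invented purely for illustration.

```python
RISK_ORDER = {"low_risk": 0, "elevated_risk": 1, "critical_risk": 2}


def classification_accuracy(samples):
    """Fraction of (predicted, reviewed) label pairs that agree."""
    if not samples:
        raise ValueError("need at least one reviewed sample")
    correct = sum(1 for predicted, reviewed in samples if predicted == reviewed)
    return correct / len(samples)


def under_classified(samples):
    """Pairs where the classifier chose a lower risk level than the reviewer."""
    return [
        (predicted, reviewed)
        for predicted, reviewed in samples
        if RISK_ORDER[predicted] < RISK_ORDER[reviewed]
    ]


# Hypothetical review sample: (classifier output, reviewer's label).
reviewed_sample = [
    ("low_risk", "low_risk"),
    ("elevated_risk", "elevated_risk"),
    ("critical_risk", "critical_risk"),
    ("low_risk", "elevated_risk"),  # Under-classified: the riskier kind of miss
]
accuracy = classification_accuracy(reviewed_sample)
meets_target = accuracy > 0.90
misses = under_classified(reviewed_sample)
```

Tracking under-classified actions separately is a design choice: an action rated too low receives a shallower evaluation than it should, whereas one rated too high only costs latency.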


Stage 3: Performance Budgets

Goal: Set latency budgets and fallback behaviors.

Budget Assignment

class ContractBuilder:
    """Build governance contracts with budgets."""

    # Default budgets per risk level (from ACGP-1010)
    DEFAULT_BUDGETS = {
        "low_risk": {
            "latency_budget_ms": 100,
            "fallback_behavior": "deny"
        },
        "elevated_risk": {
            "latency_budget_ms": 300,
            "fallback_behavior": "allow_and_log"
        },
        "critical_risk": {
            "latency_budget_ms": 5000,
            "fallback_behavior": "escalate"
        }
    }

    def build_contract(self, risk_level: str, custom_budget=None):
        """Build contract with performance budget."""
        budget = custom_budget or self.DEFAULT_BUDGETS[risk_level]

        return {
            "risk_level": risk_level,
            "performance_budget": {
                "latency_budget_ms": budget["latency_budget_ms"],
                "fallback_behavior": budget["fallback_behavior"]
            }
        }

Fallback Behavior Selection

# Method of ContractBuilder, shown on its own for readability
def select_fallback(self, action, risk_level):
    """Choose fallback behavior based on action characteristics."""

    # Safety-critical: Always deny on timeout
    if action.irreversible or risk_level == "critical_risk":
        return "deny"

    # Repetitive actions: Use cached decision
    if action.is_similar_to_recent():
        return "cached_decision"

    # High-volume reads: Allow and log for async audit
    if action.is_read_only and risk_level == "low_risk":
        return "allow_and_log"

    # Ambiguous cases: Escalate to human
    if action.requires_judgment:
        return "escalate"

    # Default: Conservative deny
    return "deny"

Validation:

  • Budgets are reasonable for action types
  • Fallback behaviors align with risk tolerance
  • Contracts included in EVAL_REQUEST messages
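A small validator can catch malformed contracts before they are attached to an EVAL_REQUEST. This is a sketch, not the normative ACGP-1010 schema: the function name validate_contract is hypothetical, and the allowed values are only those that appear in this guide.

```python
# Values used in this guide; the specification may define others.
ALLOWED_RISK_LEVELS = {"low_risk", "elevated_risk", "critical_risk"}
ALLOWED_FALLBACKS = {"deny", "allow_and_log", "cached_decision", "escalate"}


def validate_contract(contract: dict) -> list:
    """Return a list of problems; an empty list means the contract looks sane."""
    problems = []
    if contract.get("risk_level") not in ALLOWED_RISK_LEVELS:
        problems.append(f"unknown risk_level: {contract.get('risk_level')!r}")

    budget = contract.get("performance_budget", {})
    latency = budget.get("latency_budget_ms")
    is_number = isinstance(latency, (int, float)) and not isinstance(latency, bool)
    if not is_number or latency <= 0:
        problems.append(f"latency_budget_ms must be a positive number, got {latency!r}")

    if budget.get("fallback_behavior") not in ALLOWED_FALLBACKS:
        problems.append(f"unknown fallback_behavior: {budget.get('fallback_behavior')!r}")
    return problems


good = {
    "risk_level": "low_risk",
    "performance_budget": {"latency_budget_ms": 100, "fallback_behavior": "deny"},
}
bad = {
    "risk_level": "low_risk",
    "performance_budget": {"latency_budget_ms": 0, "fallback_behavior": "retry"},
}
good_problems = validate_contract(good)
bad_problems = validate_contract(bad)
```

Returning a list of problems instead of raising on the first one makes it easier to log every defect in a contract at once.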


Stage 4: Eval Tier Selection

Goal: Choose evaluation depth based on risk and performance needs.

Tier Selection Strategy

class TierSelector:
    """Select evaluation tier based on risk and constraints."""

    def select_tier(self, risk_level: str, steward_tiers: list) -> int:
        """
        Returns: 0, 1, 2, or 3

        Strategy:
        - low_risk: Prefer Tier 0 (in-memory)
        - elevated_risk: Prefer Tier 1 (DB/cache)
        - critical_risk: Prefer Tier 2+ (model/human)
        """
        if risk_level == "low_risk":
            return 0 if 0 in steward_tiers else min(steward_tiers)

        elif risk_level == "elevated_risk":
            return 1 if 1 in steward_tiers else min(steward_tiers)

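        elif risk_level == "critical_risk":
            # Prefer Tier 2, escalate to Tier 3 if needed
            if 2 in steward_tiers:
                return 2
            elif 3 in steward_tiers:
                return 3
            else:
                return max(steward_tiers)  # Use highest available

        # Fail loudly instead of silently returning None
        raise ValueError(f"Unknown risk_level: {risk_level!r}")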

Complete Contract Builder

class ContractBuilder:
    # Extends the Stage 3 class: DEFAULT_BUDGETS and select_fallback() are unchanged

    def __init__(self, steward_capabilities):
        self.available_tiers = steward_capabilities.get("available_eval_tiers", [0, 1])
        self.tier_selector = TierSelector()

    def build_contract(self, risk_level: str, action=None):
        """Build complete governance contract."""
        # Select eval tier
        eval_tier = self.tier_selector.select_tier(risk_level, self.available_tiers)

        # Get budget template
        budget_template = self.DEFAULT_BUDGETS[risk_level].copy()

        # Customize fallback if needed
        if action:
            budget_template["fallback_behavior"] = self.select_fallback(action, risk_level)

        return {
            "risk_level": risk_level,
            "eval_tier": eval_tier,  # NEW
            "performance_budget": budget_template
        }

Validation:

  • Tier selection matches risk level (low→0, elevated→1, critical→2+)
  • Agent respects steward's available_eval_tiers
  • Contracts are fully populated in messages
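TierSelector looks only at the steward's tier list. One possible way to also respect the max_eval_tier negotiated in Stage 1 is to clamp the preferred tier afterwards. This is a sketch with a hypothetical helper name, clamp_tier; whether a critical-risk action should ever be downgraded this way is a policy decision the guide leaves open.

```python
def clamp_tier(preferred: int, available_tiers: list, max_eval_tier: int) -> int:
    """Pick the highest usable tier that does not exceed the preferred one.

    Falls back to the lowest usable tier when nothing at or below the
    preference is offered.
    """
    usable = sorted(t for t in available_tiers if t <= max_eval_tier)
    if not usable:
        raise ValueError("no evaluation tier satisfies the negotiated maximum")
    at_or_below = [t for t in usable if t <= preferred]
    return at_or_below[-1] if at_or_below else usable[0]


# Preferred Tier 3, but the session was negotiated down to Tier 2.
capped = clamp_tier(3, [0, 1, 2, 3], max_eval_tier=2)
# Preferred Tier 1 is not offered; the next lower tier is used.
stepped_down = clamp_tier(1, [0, 2, 3], max_eval_tier=3)
```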


Stage 5: Timeout Handling

Goal: Implement fallback logic when latency budget is exceeded.

Steward-Side Timeout Handling

import time


class GovernanceEvaluator:
    """Evaluate actions with timeout handling."""

    def evaluate_with_contract(self, action, contract):
        """Evaluate action, applying fallback on timeout."""
        budget_ms = contract["performance_budget"]["latency_budget_ms"]
        fallback = contract["performance_budget"]["fallback_behavior"]

        start_time = time.time()

        try:
            # Run evaluation with timeout
            decision = self.run_evaluation(
                action,
                eval_tier=contract["eval_tier"],
                timeout_ms=budget_ms
            )

            actual_latency_ms = (time.time() - start_time) * 1000

            return {
                "decision": decision,
                "governance_status": {
                    "contract_honored": actual_latency_ms <= budget_ms,
                    "actual_latency_ms": actual_latency_ms,
                    "budget_used_pct": (actual_latency_ms / budget_ms) * 100,
                    "eval_tier_used": contract["eval_tier"]
                }
            }

        except TimeoutError:
            # Apply fallback
            actual_latency_ms = (time.time() - start_time) * 1000

            return self.apply_fallback(
                action,
                fallback,
                actual_latency_ms,
                budget_ms,
                contract
            )

    def apply_fallback(self, action, fallback, actual_ms, budget_ms, contract):
        """Apply fallback behavior on timeout."""
        if fallback == "deny":
            decision = "deny"
            reason = f"Evaluation timeout ({actual_ms:.0f}ms > {budget_ms}ms), fallback=deny"

        elif fallback == "allow_and_log":
            decision = "allow"
            reason = "Timeout, fallback=allow_and_log, async audit queued"
            self.queue_async_audit(action, contract)

        elif fallback == "cached_decision":
            decision = self.get_cached_decision(action)
            reason = f"Timeout, using cached decision: {decision}"

        elif fallback == "escalate":
            decision = "escalate"
            reason = "Timeout, fallback=escalate, awaiting human review"
            self.queue_human_review(action, contract)

        else:
            # Unrecognized fallback: fail closed rather than leave decision unset
            decision = "deny"
            reason = f"Timeout, unknown fallback {fallback!r}, defaulting to deny"

        return {
            "decision": decision,
            "reason": reason,
            "governance_status": {
                "contract_honored": True,  # Fallback was honored
                "actual_latency_ms": actual_ms,
                "budget_ms": budget_ms,  # Read by the agent when logging timeouts
                "fallback_used": fallback,
                "timeout": True
            }
        }

Agent-Side Timeout Handling

class ACGPAgent:
    def handle_response(self, response):
        """Handle evaluation response with timeout awareness."""
        governance_status = response.get("governance_status", {})

        if governance_status.get("timeout"):
            # Log timeout event
            self.logger.warning(
                f"Governance timeout: {governance_status['actual_latency_ms']}ms "
                f"(budget: {governance_status.get('budget_ms')}ms), "
                f"fallback: {governance_status['fallback_used']}"
            )

            # Handle specific fallbacks
            if governance_status["fallback_used"] == "escalate":
                return self.wait_for_human_decision(response["trace_id"])

        # Normal decision handling
        return self.handle_decision(response["decision"])

Validation:

  • Timeouts trigger correct fallback behaviors
  • governance_status populated in all responses
  • Async audits queued for allow_and_log
  • Human review queue for escalate
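The steward code passes timeout_ms to run_evaluation but the guide does not show how the budget is enforced. One possible approach, sketched here with the standard library only, is to run the evaluation in a worker thread and stop waiting once the budget is spent. The names run_with_budget, fast_policy, and slow_policy are hypothetical.

```python
import concurrent.futures
import time


def run_with_budget(evaluate, action, budget_ms: int):
    """Run evaluate(action); raise the built-in TimeoutError past budget_ms."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(evaluate, action)
    try:
        return future.result(timeout=budget_ms / 1000)
    except concurrent.futures.TimeoutError:
        # Before Python 3.11 this is not the built-in TimeoutError, so convert it.
        raise TimeoutError(f"evaluation exceeded {budget_ms}ms") from None
    finally:
        # Do not block on the worker; a timed-out evaluation keeps running
        # in the background until it finishes on its own.
        executor.shutdown(wait=False)


def fast_policy(action):
    return "allow"


def slow_policy(action):
    time.sleep(0.2)  # Simulates a slow model-backed evaluation
    return "allow"


fast_result = run_with_budget(fast_policy, {"tool": "get_balance"}, budget_ms=100)

try:
    run_with_budget(slow_policy, {"tool": "get_balance"}, budget_ms=20)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Threads cannot be cancelled once started, so this approach bounds how long the caller waits, not how long the evaluation runs. A steward with expensive Tier 2 or Tier 3 evaluations may prefer an evaluator that supports cooperative cancellation.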


Stage 6: Monitoring & Tuning

Goal: Measure effectiveness and optimize contracts.

Key Metrics to Track

class ContractMetrics:
    """Track governance contract performance."""

    def collect_metrics(self):
        """Collect metrics from ReflectionDB audit logs."""
        return {
            # Latency metrics
            "avg_latency_by_risk_level": self.query_avg_latency(),
            "p95_latency_by_tier": self.query_p95_latency(),
            "timeout_rate_by_fallback": self.query_timeout_rate(),

            # Contract effectiveness
            "budget_utilization_pct": self.query_budget_usage(),
            "fallback_trigger_rate": self.query_fallback_rate(),
            "contract_override_rate": self.query_override_rate(),

            # Cost metrics
            "tier_2_usage_rate": self.query_tier_usage(2),
            "tier_3_usage_rate": self.query_tier_usage(3),
            "estimated_monthly_cost": self.calculate_cost()
        }

Dashboard Queries (SQL)

-- Average latency by risk level
SELECT 
    governance_contract->>'risk_level' as risk_level,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY (governance_status->>'actual_latency_ms')::float) as p95_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY governance_contract->>'risk_level';

-- Timeout rate by fallback behavior
SELECT 
    governance_contract->'performance_budget'->>'fallback_behavior' as fallback,
    COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true') as timeout_count,
    COUNT(*) as total_count,
    (COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true')::float / COUNT(*)) * 100 as timeout_rate_pct
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY fallback;

-- Eval tier usage distribution
SELECT 
    governance_contract->>'eval_tier' as eval_tier,
    COUNT(*) as usage_count,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY eval_tier
ORDER BY eval_tier;

Optimization Strategies

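def optimize_contracts(self, metrics):
    """Adjust contracts based on production data.

    Expects per-dimension breakdowns that collect_metrics() does not return
    as written: "timeout_rate_by_risk", "budget_utilization_by_tier", and
    "action_repetition_rate" (all rates as fractions, 0.0-1.0).
    """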

    # 1. Increase budgets if timeout rate >5%
    for risk_level, timeout_rate in metrics["timeout_rate_by_risk"].items():
        if timeout_rate > 0.05:
            current_budget = self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"]
            new_budget = int(current_budget * 1.2)  # +20%
            self.logger.info(f"Increasing {risk_level} budget: {current_budget}ms → {new_budget}ms")
            self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"] = new_budget

    # 2. Downgrade tier if budget utilization <50%
    for tier, utilization in metrics["budget_utilization_by_tier"].items():
        if utilization < 0.5 and tier > 0:
            self.logger.info(f"Tier {tier} underutilized ({utilization*100:.1f}%), consider Tier {tier-1}")

    # 3. Switch to cached_decision if repetitive actions
    for action_type, repetition_rate in metrics["action_repetition_rate"].items():
        if repetition_rate > 0.7:  # 70% of actions are repeats
            self.logger.info(f"Action {action_type} is repetitive, using cached_decision fallback")
            self.custom_fallbacks[action_type] = "cached_decision"

Validation:

  • Metrics dashboard is live
  • Timeout rate <5% for all risk levels
  • Budget utilization 60-80% (not too tight, not too loose)
  • Cost reduction measured (compare to uniform Tier 2 baseline)
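The timeout rate per risk level, which the optimization step consumes, can also be computed in application code from audit records. This sketch assumes each record is a dict with governance_contract and governance_status keys, mirroring the columns used in the dashboard queries; the function name and the sample records are hypothetical.

```python
from collections import defaultdict


def timeout_rate_by_risk(records):
    """Map each risk level to its share of evaluations that timed out (0.0-1.0)."""
    totals = defaultdict(int)
    timeouts = defaultdict(int)
    for record in records:
        risk = record["governance_contract"]["risk_level"]
        totals[risk] += 1
        if record["governance_status"].get("timeout", False):
            timeouts[risk] += 1
    return {risk: timeouts[risk] / totals[risk] for risk in totals}


def make_record(risk_level, timeout):
    """Build a minimal audit record for the example below."""
    return {
        "governance_contract": {"risk_level": risk_level},
        "governance_status": {"timeout": timeout},
    }


# Invented sample: one of four low-risk evaluations timed out.
records = [
    make_record("low_risk", False),
    make_record("low_risk", False),
    make_record("low_risk", True),
    make_record("low_risk", False),
    make_record("elevated_risk", False),
    make_record("elevated_risk", False),
]
rates = timeout_rate_by_risk(records)
over_threshold = [risk for risk, rate in rates.items() if rate > 0.05]
```

The result uses fractions, not percentages, so it can be compared directly against the 0.05 threshold in optimize_contracts.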


Complete Example: Customer Service Bot

import time


class CustomerServiceBot(ACGPAgent):
    """Full implementation with governance contracts."""

    def __init__(self, steward_url):
        super().__init__()
        self.steward_url = steward_url
        self.classifier = ActionClassifier()

        # Sync with steward
        self.sync_with_steward(steward_url)

        # Initialize contract builder
        self.contract_builder = ContractBuilder(self.steward_capabilities)

    def handle_customer_request(self, request):
        """Process customer request with governance contracts."""

        # 1. Classify action risk
        risk_level = self.classifier.classify_action(request)

        # 2. Build governance contract
        contract = self.contract_builder.build_contract(risk_level, action=request)

        # 3. Submit for evaluation
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": request.to_dict(),
            "acl_tier": self.acl_tier,
            "protocol_version": "1.1.0",
            "governance_contract": contract
        }

        start_time = time.time()
        response = self.send_message(self.steward_url, eval_request)
        e2e_latency_ms = (time.time() - start_time) * 1000

        # 4. Handle response
        decision = response["decision"]
        governance_status = response.get("governance_status", {})

        # Log metrics
        self.metrics.record({
            "risk_level": risk_level,
            "eval_tier": contract["eval_tier"],
            "budget_ms": contract["performance_budget"]["latency_budget_ms"],
            "actual_latency_ms": e2e_latency_ms,
            "steward_latency_ms": governance_status.get("actual_latency_ms"),
            "timeout": governance_status.get("timeout", False),
            "decision": decision
        })

        # 5. Execute or block
        if decision == "allow":
            return request.execute()
        elif decision == "escalate":
            return self.queue_for_human_review(request)
        else:
            return f"Action blocked: {response.get('reason')}"

# Usage
bot = CustomerServiceBot("https://steward.example.com")

# Low-risk: <100ms
bot.handle_customer_request(GetBalanceRequest(account_id="12345"))

# Elevated-risk: <300ms
bot.handle_customer_request(UpdateAddressRequest(account_id="12345", new_address="..."))

# Critical-risk: <5s with human review
bot.handle_customer_request(RefundRequest(account_id="12345", amount=5000))

Troubleshooting

Issue: Timeouts >10%

Symptoms: More than 10% of evaluations exceed their latency budget

Diagnosis:

SELECT 
    governance_contract->>'eval_tier' as tier,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency,
    governance_contract->'performance_budget'->>'latency_budget_ms' as budget
FROM reflection_db
WHERE governance_status->>'timeout' = 'true'
GROUP BY tier, budget;

Solutions:

  1. Increase budgets (e.g., 100ms → 150ms for Tier 0)
  2. Downgrade eval tier (Tier 1 → Tier 0 for low-risk)
  3. Optimize steward evaluation code
  4. Add caching for repetitive actions
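Caching for repetitive actions, which also backs the cached_decision fallback, could look like the following. This is a minimal in-memory sketch: the DecisionCache class is hypothetical, and it assumes that an action's dictionary form is JSON-serializable and that a miss should resolve to a conservative default.

```python
import hashlib
import json
import time


class DecisionCache:
    """Hypothetical in-memory cache of recent decisions with a time-to-live."""

    def __init__(self, ttl_seconds: float = 60.0, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # Injectable so expiry can be tested without sleeping
        self._entries = {}

    @staticmethod
    def fingerprint(action: dict) -> str:
        """Stable key: identical tool and arguments map to the same entry."""
        canonical = json.dumps(action, sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def put(self, action: dict, decision: str) -> None:
        self._entries[self.fingerprint(action)] = (decision, self._clock())

    def get(self, action: dict, default: str = "deny") -> str:
        """Return the cached decision, or the default on a miss or expired entry."""
        key = self.fingerprint(action)
        entry = self._entries.get(key)
        if entry is None:
            return default
        decision, stored_at = entry
        if self._clock() - stored_at > self.ttl_seconds:
            del self._entries[key]
            return default
        return decision


# A fake clock stands in for real time in this example.
now = [0.0]
cache = DecisionCache(ttl_seconds=60, clock=lambda: now[0])
cache.put({"tool": "get_balance", "args": {"account_id": "12345"}}, "allow")

hit = cache.get({"args": {"account_id": "12345"}, "tool": "get_balance"})
miss = cache.get({"tool": "transfer_funds", "args": {}})
now[0] = 61.0  # Advance past the TTL
expired = cache.get({"tool": "get_balance", "args": {"account_id": "12345"}})
```

A short time-to-live limits how long a stale "allow" can be replayed after a policy change; the right value depends on how quickly policies change in a given deployment.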


Issue: High Tier 2/3 Usage

Symptoms: Costs higher than expected

Diagnosis:

SELECT 
    governance_contract->>'risk_level' as risk,
    governance_contract->>'eval_tier' as tier,
    COUNT(*) as usage_count
FROM reflection_db
GROUP BY risk, tier
ORDER BY usage_count DESC;

Solutions:

  1. Reclassify actions (critical → elevated where safe)
  2. Use Hybrid pattern (async Tier 2 instead of sync)
  3. Increase Tier 0/1 coverage with better rules
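To see which tier drives cost, the usage counts from the diagnosis query can be multiplied by a per-evaluation cost. The unit costs below are placeholders invented for illustration, not figures from ACGP-1010 or from any measurement, and estimate_cost is a hypothetical helper.

```python
import math

# Placeholder per-evaluation costs in USD; replace with measured figures.
COST_PER_EVAL_USD = {0: 0.0, 1: 0.0001, 2: 0.01, 3: 0.50}


def estimate_cost(usage_by_tier: dict, unit_costs: dict = COST_PER_EVAL_USD) -> dict:
    """Return the cost per tier and the total for the given usage counts."""
    per_tier = {tier: count * unit_costs[tier] for tier, count in usage_by_tier.items()}
    return {"per_tier": per_tier, "total": sum(per_tier.values())}


# Invented usage counts for one month.
usage = {0: 900_000, 1: 90_000, 2: 9_900, 3: 100}
estimate = estimate_cost(usage)
most_expensive_tier = max(estimate["per_tier"], key=estimate["per_tier"].get)
```

With these placeholder numbers Tier 2 dominates the total even though it handles about 1% of the traffic, which is the pattern that reclassification is meant to address.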


Issue: Capability Negotiation Fails

Symptoms: supports_governance_contracts: false in SYNC_ACK

Solutions:

  1. Verify steward ACGP version ≥1.1.0
  2. Check steward config: enable_governance_contracts: true
  3. Update steward to support ACGP-1010


Testing Checklist

Before deploying to production:

  • [ ] Unit tests for risk classification (>90% accuracy)
  • [ ] Unit tests for contract building (all risk levels)
  • [ ] Unit tests for fallback selection logic
  • [ ] Integration test: SYNC capability negotiation
  • [ ] Integration test: EVAL_REQUEST with governance_contract
  • [ ] Integration test: Timeout triggers correct fallback
  • [ ] Load test: Latency budgets met at p95 (1000 req/s)
  • [ ] Chaos test: Steward unavailable → graceful degradation
  • [ ] Monitoring: Metrics dashboard functional
  • [ ] Audit: ReflectionDB logs include governance_status
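For the load-test item, the p95 check can be scripted against the latencies collected during the run. The sketch below uses the nearest-rank definition of a percentile, which is one of several common definitions; the function names are hypothetical.

```python
def p95(latencies_ms):
    """95th percentile by the nearest-rank method."""
    if not latencies_ms:
        raise ValueError("no latency samples")
    ordered = sorted(latencies_ms)
    # 1-based rank ceil(0.95 * n), computed with integers to avoid float rounding.
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def budget_met_at_p95(latencies_ms, budget_ms):
    """True when at least 95% of samples finished within the budget."""
    return p95(latencies_ms) <= budget_ms


# Invented samples: 2 slow requests out of 20 push p95 over a 100ms budget.
mostly_fast = [50] * 19 + [400]
too_many_slow = [50] * 18 + [400, 400]
passes = budget_met_at_p95(mostly_fast, budget_ms=100)
fails = budget_met_at_p95(too_many_slow, budget_ms=100)
```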

Performance Benchmarks

Expected results after full implementation:

Metric                   Before (ACGP 1.0)   After (ACGP 1.1)   Improvement
P95 latency (low-risk)   300ms               100ms              67% faster
P95 latency (elevated)   300ms               250ms              17% faster
P95 latency (critical)   300ms               2000ms             Slower (intentional)
Monthly cost             $10,000             $4,000             60% cheaper
Timeout rate             N/A                 <5%                N/A
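The Improvement column is the relative reduction from the "before" value to the "after" value. The same calculation can be applied to measured figures when comparing a deployment against its own baseline; improvement_pct is a hypothetical helper name.

```python
def improvement_pct(before: float, after: float) -> float:
    """Relative reduction from before to after, in percent (negative = increase)."""
    return (before - after) / before * 100


low_risk = round(improvement_pct(300, 100))       # P95 latency, low-risk
elevated = round(improvement_pct(300, 250))       # P95 latency, elevated
monthly_cost = round(improvement_pct(10_000, 4_000))
critical = improvement_pct(300, 2000)             # Negative: intentionally slower
```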

Next Steps

  1. Start with Stage 1 (capability announcement): no risk, and it enables the later stages
  2. Pilot Stages 2-3 on low-risk actions only (e.g., read operations)
  3. Monitor Stages 4-5 for one week before expanding to elevated/critical actions
  4. Use Stage 6 to optimize contracts based on production metrics
  5. Read the spec for advanced features: ACGP-1010

Additional Resources