Implementing Runtime Governance Contracts

Status: Draft preview extension for the v1.0 alpha publication surface
Audience: Agent developers, steward implementers
Estimated time: 2-4 hours (incremental adoption)
Prerequisites: Familiarity with ACGP Core Concepts and Messages & Wire Protocol


Overview

This guide assumes you already understand the Runtime Governance Contracts concept and need the implementation path. For the conceptual explanation, see Runtime Governance Contracts. For the normative extension semantics, see the specification.

This guide provides a 6-stage implementation path for adding the Runtime Governance Contracts extension to existing ACGP 1.0 deployments. Each stage is independently deployable, allowing incremental adoption without breaking existing functionality.

In this guide, timeout handling refers to the preview Runtime Governance Contracts evaluation-timeout policy for negotiated latency budgets. It does not replace the core v1.0 profile-failure fallback used when the Steward/session path is unavailable.

Implementation Stages

  1. Capability Announcement - Declare support during version negotiation
  2. Risk Classification - Add risk_level to high-volume actions
  3. Performance Budgets - Set latency budgets and timeout behaviors
  4. Eval Tier Selection - Choose evaluation depth per action
  5. Timeout Handling - Implement evaluation-timeout policy
  6. Monitoring & Tuning - Track performance and optimize contracts
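
Because each stage is independently deployable, one way to roll the stages out is behind per-stage switches. The sketch below is an illustration only: StageFlags and contract_fields are hypothetical names, not part of ACGP, and the sketch assumes the contract fields introduced in Stages 2-4 can be added one at a time.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageFlags:
    """Hypothetical per-stage rollout switches (not defined by ACGP)."""
    risk_classification: bool = False   # Stage 2
    performance_budgets: bool = False   # Stage 3
    eval_tier_selection: bool = False   # Stage 4


def contract_fields(flags: StageFlags, risk_level: str, budget: dict, eval_tier: int) -> dict:
    """Return only the contract fields whose stage is switched on."""
    contract = {}
    if flags.risk_classification:
        contract["risk_level"] = risk_level
    if flags.performance_budgets:
        contract["performance_budget"] = dict(budget)
    if flags.eval_tier_selection:
        contract["eval_tier"] = eval_tier
    return contract
```

With all flags off the function returns an empty dict, so the agent can skip attaching a contract and behave exactly as it did before adoption.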

Stage 1: Capability Announcement

Goal: Enable capability negotiation without changing evaluation logic.

Agent-Side Changes

class ACGPAgent:
    def initialize_session(self, steward_url):
        """Negotiate preview extension support using canonical handshake messages."""
        version_negotiation = {
            "protocol": "acgp",
            "protocol_version": "1.0.0",
            "message_type": "VERSION_NEGOTIATION",
            "sender_id": self.agent_id,
            "receiver_id": steward_url,
            "payload": {
                "client_versions": ["1.0.0"],
                "capabilities": {
                    "governance_contracts": True,
                    "intervention_execution_modes": ["passive", "active"],
                },
            }
        }

        response = self.send_message(steward_url, version_negotiation)

        # Store steward's capabilities
        self.steward_capabilities = response["payload"].get("server_capabilities", {})
        self.contracts_enabled = (
            self.steward_capabilities.get("governance_contracts", False)
        )

Steward-Side Changes

class ACGPSteward:
    def handle_version_negotiation(self, envelope):
        """Return negotiated preview-extension capabilities."""
        agent_caps = envelope["payload"].get("capabilities", {})

        response = {
            "protocol": "acgp",
            "protocol_version": "1.0.0",
            "message_type": "VERSION_SELECTED",
            "sender_id": self.steward_id,
            "receiver_id": envelope["sender_id"],
            "payload": {
                "selected_version": "1.0.0",
                "server_capabilities": {
                    "governance_contracts": True,
                    "intervention_execution_modes": ["passive", "active"],
                },
            },
        }

        self.session_state[envelope["sender_id"]] = {
            "contracts_enabled": agent_caps.get("governance_contracts", False),
        }

        return response

Validation:

  • Both sides announce capabilities
  • Neither side errors if the other doesn't support contracts
  • Negotiation uses VERSION_NEGOTIATION / VERSION_SELECTED
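
The second validation point can be checked with a small helper that treats a missing capability as "not supported". This is a minimal sketch; the function names are hypothetical, and only the governance_contracts capability key is taken from the handshake above.

```python
def contracts_negotiated(local_caps: dict, peer_caps: dict) -> bool:
    """Contracts are used only when both sides announce support."""
    return bool(local_caps.get("governance_contracts", False)) and bool(
        peer_caps.get("governance_contracts", False)
    )


def attach_contract(eval_request: dict, contract: dict, enabled: bool) -> dict:
    """Return a copy of the request, adding the contract only when negotiated."""
    request = dict(eval_request)
    if enabled:
        request["governance_contract"] = contract
    return request
```

A peer that omits the capability entirely is handled the same way as one that sets it to False, so an older steward does not cause an error.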


Stage 2: Risk Classification

Goal: Classify actions by risk level without changing evaluation yet.

Risk Classification Logic

class ActionClassifier:
    """Classify agent actions by risk level."""

    def classify_action(self, action) -> str:
        """
        Returns: "low_risk", "elevated_risk", or "critical_risk"
        """
        # Critical risk: Irreversible, high-impact
        if self._is_critical(action):
            return "critical_risk"

        # Elevated risk: Sensitive operations
        if self._is_elevated(action):
            return "elevated_risk"

        # Low risk: Default for safe operations
        return "low_risk"

    def _is_critical(self, action) -> bool:
        """Check for high-stakes operations."""
        return (
            action.tool in ["transfer_funds", "delete_resource", "grant_access"]
            or (action.tool == "update_record" and action.args.get("value", 0) > 10000)
            or action.requires_approval
        )

    def _is_elevated(self, action) -> bool:
        """Check for sensitive operations."""
        return (
            action.tool in ["update_record", "send_email", "api_call"]
            or action.accesses_pii
            or action.modifies_state
        )

Integration into Agent

class ACGPAgent:
    def __init__(self):
        self.classifier = ActionClassifier()

    def submit_action(self, action):
        """Submit action with risk classification."""
        # Classify risk
        risk_level = self.classifier.classify_action(action)

        # Build evaluation request
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": action.to_dict(),
            "governance_tier": self.governance_tier,
            "protocol_version": "1.0.0"
        }

        # Add governance contract if enabled
        if self.contracts_enabled:
            eval_request["governance_contract"] = {
                "risk_level": risk_level  # NEW: Start with just risk level
            }

        return self.steward.evaluate(eval_request)

Validation:

  • Actions are classified (log risk_level)
  • Classification accuracy >90% (manual review sample)
  • No evaluation logic changed yet
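
One way to measure the >90% accuracy target is to compare logged risk levels against a manually reviewed sample. The sketch below assumes the sample is available as (predicted, reviewed) pairs; both helper names are hypothetical.

```python
from collections import Counter

RISK_ORDER = {"low_risk": 0, "elevated_risk": 1, "critical_risk": 2}


def classification_accuracy(samples) -> float:
    """samples: iterable of (predicted, reviewed) risk-level pairs."""
    samples = list(samples)
    if not samples:
        raise ValueError("need at least one reviewed sample")
    correct = sum(1 for predicted, reviewed in samples if predicted == reviewed)
    return correct / len(samples)


def under_classified(samples) -> Counter:
    """Count pairs where the classifier chose a lower risk than the reviewer."""
    return Counter(
        (predicted, reviewed)
        for predicted, reviewed in samples
        if RISK_ORDER[predicted] < RISK_ORDER[reviewed]
    )
```

Under-classification is counted separately because a critical action labelled low_risk is usually a more serious error than the reverse, even when overall accuracy looks acceptable.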


Stage 3: Performance Budgets

Goal: Set latency budgets and fallback behaviors.

Budget Assignment

class ContractBuilder:
    """Build Runtime Governance Contracts with budgets."""

    # Default budgets per risk level (from Runtime Governance Contracts)
    DEFAULT_BUDGETS = {
        "low_risk": {
            "latency_budget_ms": 100,
            "fallback_on_timeout": "deny"
        },
        "elevated_risk": {
            "latency_budget_ms": 300,
            "fallback_on_timeout": "allow_and_log"
        },
        "critical_risk": {
            "latency_budget_ms": 5000,
            "fallback_on_timeout": "escalate"
        }
    }

    def build_contract(self, risk_level: str, custom_budget=None):
        """Build contract with performance budget."""
        budget = custom_budget or self.DEFAULT_BUDGETS[risk_level]

        return {
            "risk_level": risk_level,
            "performance_budget": {
                "latency_budget_ms": budget["latency_budget_ms"],
                "fallback_on_timeout": budget["fallback_on_timeout"]
            }
        }

Fallback Behavior Selection

def select_fallback(self, action, risk_level):
    """Choose fallback behavior based on action characteristics."""

    # Safety-critical: Always deny on timeout
    if action.irreversible or risk_level == "critical_risk":
        return "deny"

    # Repetitive actions: Use cached decision
    if action.is_similar_to_recent():
        return "cached_decision"

    # High-volume reads: Allow and log for async audit
    if action.is_read_only and risk_level == "low_risk":
        return "allow_and_log"

    # Ambiguous cases: Escalate to human
    if action.requires_judgment:
        return "escalate"

    # Default: Conservative deny
    return "deny"

Validation:

  • Budgets are reasonable for action types
  • Fallback behaviors align with risk tolerance
  • Contracts included in EVAL_REQUEST messages
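
Before a contract goes into an EVAL_REQUEST, a basic sanity check can catch typos in risk levels or fallback names. This is a sketch, not the normative schema: the allowed values are the ones used in this guide, and validate_contract is a hypothetical helper.

```python
ALLOWED_FALLBACKS = {"deny", "allow_and_log", "cached_decision", "escalate"}
RISK_LEVELS = {"low_risk", "elevated_risk", "critical_risk"}


def validate_contract(contract: dict) -> list:
    """Return a list of problems; an empty list means the contract looks sane."""
    problems = []
    if contract.get("risk_level") not in RISK_LEVELS:
        problems.append("unknown risk_level")
    budget = contract.get("performance_budget")
    if budget is None:
        return problems  # Stage 2 contracts carry only risk_level
    latency = budget.get("latency_budget_ms")
    if isinstance(latency, bool) or not isinstance(latency, int) or latency <= 0:
        problems.append("latency_budget_ms must be a positive integer")
    if budget.get("fallback_on_timeout") not in ALLOWED_FALLBACKS:
        problems.append("unknown fallback_on_timeout")
    return problems
```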


Stage 4: Eval Tier Selection

Goal: Choose evaluation depth based on risk and performance needs.

Tier Selection Strategy

class TierSelector:
    """Select evaluation tier based on risk and constraints."""

    def select_tier(self, risk_level: str, steward_tiers: list) -> int:
        """
        Returns: 0, 1, 2, or 3

        Strategy:
        - low_risk: Prefer Tier 0 (in-memory)
        - elevated_risk: Prefer Tier 1 (DB/cache)
        - critical_risk: Prefer Tier 2+ (model/human)
        """
        if risk_level == "low_risk":
            return 0 if 0 in steward_tiers else min(steward_tiers)

        elif risk_level == "elevated_risk":
            return 1 if 1 in steward_tiers else min(steward_tiers)

        elif risk_level == "critical_risk":
            # Prefer Tier 2, escalate to Tier 3 if needed
            if 2 in steward_tiers:
                return 2
            elif 3 in steward_tiers:
                return 3
            else:
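                return max(steward_tiers)  # Use highest available

        # Unknown risk level: fail loudly instead of returning None
        raise ValueError(f"Unknown risk_level: {risk_level!r}")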

Complete Contract Builder

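class ContractBuilder:
    # Extends the Stage 3 ContractBuilder: DEFAULT_BUDGETS and select_fallback()
    # are assumed to be carried over from that class.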
    def __init__(self, steward_capabilities):
        self.available_tiers = steward_capabilities.get("available_eval_tiers", [0, 1])
        self.tier_selector = TierSelector()

    def build_contract(self, risk_level: str, action=None):
        """Build complete governance contract."""
        # Select eval tier
        eval_tier = self.tier_selector.select_tier(risk_level, self.available_tiers)

        # Get budget template
        budget_template = self.DEFAULT_BUDGETS[risk_level].copy()

        # Customize fallback if needed
        if action:
            budget_template["fallback_on_timeout"] = self.select_fallback(action, risk_level)

        return {
            "risk_level": risk_level,
            "eval_tier": eval_tier,  # NEW
            "performance_budget": budget_template
        }

Validation:

  • Tier selection matches risk level (low→0, elevated→1, critical→2+)
  • Agent respects steward's available_eval_tiers
  • Contracts are fully populated in messages
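
The first two validation points can be expressed as a check that labels each selected tier. The sketch below is an assumption-laden illustration: the "preferred", "degraded", and "invalid" labels and the check_tier name are hypothetical, while the preferred tiers per risk level follow the strategy described above.

```python
PREFERRED_TIERS = {
    "low_risk": [0],
    "elevated_risk": [1],
    "critical_risk": [2, 3],
}


def check_tier(risk_level: str, eval_tier: int, available_tiers: list) -> str:
    """Classify a selected tier as 'preferred', 'degraded', or 'invalid'."""
    if eval_tier not in available_tiers:
        return "invalid"    # the steward never advertised this tier
    if eval_tier in PREFERRED_TIERS[risk_level]:
        return "preferred"
    return "degraded"       # allowed, but not the preferred depth
```

Logging "degraded" selections makes it visible when, for example, critical actions are being evaluated at Tier 1 because the steward offers nothing deeper.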


Stage 5: Timeout Handling

Goal: Implement fallback logic when latency budget is exceeded.

Steward-Side Timeout Handling

import time


class GovernanceEvaluator:
    """Evaluate actions with timeout handling."""

    def evaluate_with_contract(self, action, contract):
        """Evaluate action, applying fallback on timeout."""
        budget_ms = contract["performance_budget"]["latency_budget_ms"]
        fallback = contract["performance_budget"]["fallback_on_timeout"]

        start_time = time.time()

        try:
            # Run evaluation with timeout
            decision = self.run_evaluation(
                action,
                eval_tier=contract["eval_tier"],
                timeout_ms=budget_ms
            )

            actual_latency_ms = (time.time() - start_time) * 1000

            return {
                "decision": decision,
                "governance_status": {
                    "contract_honored": actual_latency_ms <= budget_ms,
                    "actual_latency_ms": actual_latency_ms,
                    "budget_used_pct": (actual_latency_ms / budget_ms) * 100,
                    "eval_tier_used": contract["eval_tier"]
                }
            }

        except TimeoutError:
            # Apply fallback
            actual_latency_ms = (time.time() - start_time) * 1000

            return self.apply_fallback(
                action,
                fallback,
                actual_latency_ms,
                budget_ms,
                contract
            )

    def apply_fallback(self, action, fallback, actual_ms, budget_ms, contract):
        """Apply fallback behavior on timeout."""
        if fallback == "deny":
            decision = "deny"
            reason = f"Evaluation timeout ({actual_ms:.0f}ms > {budget_ms}ms), fallback=deny"

        elif fallback == "allow_and_log":
            decision = "allow"
            reason = "Timeout, fallback=allow_and_log, async audit queued"
            self.queue_async_audit(action, contract)

        elif fallback == "cached_decision":
            decision = self.get_cached_decision(action)
            reason = f"Timeout, using cached decision: {decision}"

        elif fallback == "escalate":
            decision = "escalate"
            reason = "Timeout, fallback=escalate, awaiting human review"
            self.queue_human_review(action, contract)

        else:
            # Unrecognized fallback: fail closed rather than leave decision unset
            decision = "deny"
            reason = f"Timeout, unrecognized fallback {fallback!r}, defaulting to deny"

        return {
            "decision": decision,
            "reason": reason,
            "governance_status": {
                "contract_honored": True,  # Fallback was honored
                "actual_latency_ms": actual_ms,
                "fallback_used": fallback,
                "timeout": True
            }
        }

Agent-Side Timeout Handling

class ACGPAgent:
    def handle_response(self, response):
        """Handle evaluation response with timeout awareness."""
        governance_status = response.get("governance_status", {})

        if governance_status.get("timeout"):
            # Log timeout event
            self.logger.warning(
                f"Governance timeout: {governance_status['actual_latency_ms']}ms "
                f"(budget: {governance_status.get('budget_ms')}ms), "
                f"fallback: {governance_status['fallback_used']}"
            )

            # Handle specific fallbacks
            if governance_status["fallback_used"] == "escalate":
                return self.wait_for_human_decision(response["trace_id"])

        # Normal decision handling
        return self.handle_decision(response["decision"])

Validation:

  • Timeouts trigger correct fallback behaviors
  • governance_status populated in all responses
  • Async audits queued for allow_and_log
  • Human review queue for escalate
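
The steward code above relies on run_evaluation raising TimeoutError when the budget is exceeded, but does not show how. One possible approach, sketched here with the standard-library concurrent.futures module, is to run the evaluation in a worker thread and bound the wait. The names run_with_budget and slow_evaluation are hypothetical. Note the limitation: a timed-out evaluation keeps running in its thread, so this bounds the caller's wait, not the work itself.

```python
import concurrent.futures
import time

# Shared pool; a timed-out evaluation keeps running in its worker thread.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_with_budget(evaluate, action, budget_ms: int):
    """Run evaluate(action); raise TimeoutError if it exceeds budget_ms."""
    future = _POOL.submit(evaluate, action)
    try:
        return future.result(timeout=budget_ms / 1000)
    except concurrent.futures.TimeoutError:
        future.cancel()  # only effective if the task has not started yet
        # Re-raise as the built-in TimeoutError so callers can catch one type
        # regardless of Python version.
        raise TimeoutError(f"evaluation exceeded {budget_ms}ms") from None


def slow_evaluation(action):
    """Stand-in for an evaluation that takes about 300ms."""
    time.sleep(0.3)
    return "allow"
```

Calling run_with_budget(slow_evaluation, {}, 50) raises TimeoutError after roughly 50ms, which is the point at which the fallback logic above would take over.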


Stage 6: Monitoring & Tuning

Goal: Measure effectiveness and optimize contracts.

Key Metrics to Track

class ContractMetrics:
    """Track governance contract performance."""

    def collect_metrics(self):
        """Collect metrics from Governance Store audit logs."""
        return {
            # Latency metrics
            "avg_latency_by_risk_level": self.query_avg_latency(),
            "p95_latency_by_tier": self.query_p95_latency(),
            "timeout_rate_by_fallback": self.query_timeout_rate(),

            # Contract effectiveness
            "budget_utilization_pct": self.query_budget_usage(),
            "fallback_trigger_rate": self.query_fallback_rate(),
            "contract_override_rate": self.query_override_rate(),

            # Cost metrics
            "tier_2_usage_rate": self.query_tier_usage(2),
            "tier_3_usage_rate": self.query_tier_usage(3),
            "estimated_monthly_cost": self.calculate_cost()
        }

Dashboard Queries (SQL)

-- Average latency by risk level
SELECT 
    governance_contract->>'risk_level' as risk_level,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY (governance_status->>'actual_latency_ms')::float) as p95_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY governance_contract->>'risk_level';

-- Timeout rate by fallback behavior
SELECT 
    governance_contract->'performance_budget'->>'fallback_on_timeout' as fallback,
    COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true') as timeout_count,
    COUNT(*) as total_count,
    (COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true')::float / COUNT(*)) * 100 as timeout_rate_pct
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY fallback;

-- Eval tier usage distribution
SELECT 
    governance_contract->>'eval_tier' as eval_tier,
    COUNT(*) as usage_count,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY eval_tier
ORDER BY eval_tier;
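
When the audit records are available in application code rather than SQL, the same two figures can be computed directly. This sketch assumes each record is a dict with governance_contract and governance_status keys shaped like the messages in this guide. The p95 helper uses the nearest-rank method, so it can differ slightly from PERCENTILE_CONT, which interpolates.

```python
from collections import defaultdict


def p95(values):
    """Nearest-rank 95th percentile of a non-empty sequence."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("p95 of an empty sequence is undefined")
    rank = (95 * len(ordered) + 99) // 100  # integer ceil(0.95 * n)
    return ordered[rank - 1]


def timeout_rate_by_fallback(records):
    """Fraction of evaluations that timed out, grouped by fallback behavior."""
    totals = defaultdict(int)
    timeouts = defaultdict(int)
    for record in records:
        budget = record["governance_contract"]["performance_budget"]
        fallback = budget["fallback_on_timeout"]
        totals[fallback] += 1
        if record["governance_status"].get("timeout", False):
            timeouts[fallback] += 1
    return {fallback: timeouts[fallback] / totals[fallback] for fallback in totals}
```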

Optimization Strategies

def optimize_contracts(self, metrics):
    """Adjust contracts based on production data.

    Expects per-risk, per-tier, and per-action breakdowns that collect_metrics()
    does not return as written; missing breakdowns are skipped.
    """

    # 1. Increase budgets if timeout rate >5%
    for risk_level, timeout_rate in metrics.get("timeout_rate_by_risk", {}).items():
        if timeout_rate > 0.05:
            current_budget = self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"]
            new_budget = int(current_budget * 1.2)  # +20%
            self.logger.info(f"Increasing {risk_level} budget: {current_budget}ms → {new_budget}ms")
            self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"] = new_budget

    # 2. Suggest a lower tier if budget utilization <50% (logs only, changes nothing)
    for tier, utilization in metrics.get("budget_utilization_by_tier", {}).items():
        if utilization < 0.5 and tier > 0:
            self.logger.info(f"Tier {tier} underutilized ({utilization*100:.1f}%), consider Tier {tier-1}")

    # 3. Switch to cached_decision if repetitive actions
    for action_type, repetition_rate in metrics.get("action_repetition_rate", {}).items():
        if repetition_rate > 0.7:  # 70% of actions are repeats
            self.logger.info(f"Action {action_type} is repetitive, using cached_decision fallback")
            self.custom_fallbacks[action_type] = "cached_decision"

Validation:

  • Metrics dashboard is live
  • Timeout rate <5% for all risk levels
  • Budget utilization 60-80% (not too tight, not too loose)
  • Cost reduction measured (compare to uniform Tier 2 baseline)
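
Repeated +20% increases compound (100ms, 120ms, 144ms, and so on), so it may be worth capping automatic growth. The sketch below applies the 5% timeout threshold and the 60-80% utilization band from this stage; the ceiling parameter and both function names are assumptions added for illustration.

```python
def next_budget_ms(current_ms: int, timeout_rate: float, ceiling_ms: int) -> int:
    """Raise the budget by 20% when timeouts exceed 5%, never past ceiling_ms."""
    if timeout_rate <= 0.05:
        return current_ms
    return min(current_ms * 6 // 5, ceiling_ms)  # integer form of +20%


def utilization_band(utilization: float) -> str:
    """Place budget utilization (0.0-1.0) relative to the 60-80% target."""
    if utilization < 0.60:
        return "loose"
    if utilization > 0.80:
        return "tight"
    return "within_target"
```

With a 150ms ceiling, a 100ms budget grows to 120ms and then stops at 150ms instead of reaching 144ms and beyond, which keeps a persistent steward slowdown from silently inflating budgets.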


Complete Example: Customer Service Bot

import time


class CustomerServiceBot(ACGPAgent):
    """Full implementation with Runtime Governance Contracts."""

    def __init__(self, steward_url):
        super().__init__()
        self.steward_url = steward_url
        self.classifier = ActionClassifier()

        # Negotiate capabilities with the steward (Stage 1)
        self.initialize_session(steward_url)

        # Initialize contract builder
        self.contract_builder = ContractBuilder(self.steward_capabilities)

    def handle_customer_request(self, request):
        """Process customer request with Runtime Governance Contracts."""

        # 1. Classify action risk
        risk_level = self.classifier.classify_action(request)

        # 2. Build governance contract
        contract = self.contract_builder.build_contract(risk_level, action=request)

        # 3. Submit for evaluation
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": request.to_dict(),
            "governance_tier": self.governance_tier,
            "governance_contract": contract
        }

        start_time = time.time()
        response = self.send_message(self.steward_url, eval_request)
        e2e_latency_ms = (time.time() - start_time) * 1000

        # 4. Handle response
        decision = response["decision"]
        governance_status = response.get("governance_status", {})

        # Log metrics
        self.metrics.record({
            "risk_level": risk_level,
            "eval_tier": contract["eval_tier"],
            "budget_ms": contract["performance_budget"]["latency_budget_ms"],
            "actual_latency_ms": e2e_latency_ms,
            "steward_latency_ms": governance_status.get("actual_latency_ms"),
            "timeout": governance_status.get("timeout", False),
            "decision": decision
        })

        # 5. Execute or block
        if decision == "allow":
            return request.execute()
        elif decision == "escalate":
            return self.queue_for_human_review(request)
        else:
            return f"Action blocked: {response.get('reason')}"

# Usage
bot = CustomerServiceBot("https://steward.example.com")

# Low-risk: <100ms
bot.handle_customer_request(GetBalanceRequest(account_id="12345"))

# Elevated-risk: <300ms
bot.handle_customer_request(UpdateAddressRequest(account_id="12345", new_address="..."))

# Critical-risk: <5s with human review
bot.handle_customer_request(RefundRequest(account_id="12345", amount=5000))

Troubleshooting

Issue: Timeouts >10%

Symptoms: More than 10% of actions exceed their latency budget

Diagnosis:

SELECT 
    governance_contract->>'eval_tier' as tier,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency,
    governance_contract->'performance_budget'->>'latency_budget_ms' as budget
FROM reflection_db
WHERE governance_status->>'timeout' = 'true'
GROUP BY tier, budget;

Solutions:

  1. Increase budgets (e.g., 100ms → 150ms for Tier 0)
  2. Downgrade eval tier (Tier 1 → Tier 0 for low-risk)
  3. Optimize steward evaluation code
  4. Add caching for repetitive actions
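
For the caching option, a small time-limited cache keyed by tool and arguments is one possible shape. This is a sketch under stated assumptions: DecisionCache is a hypothetical class, the 60-second default TTL is arbitrary, and arguments are assumed to be JSON-serializable.

```python
import hashlib
import json
import time


class DecisionCache:
    """Hypothetical TTL cache for recent decisions, keyed by tool and arguments."""

    def __init__(self, ttl_seconds: float = 60.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so expiry can be tested
        self._entries = {}

    @staticmethod
    def _key(tool: str, args: dict) -> str:
        # sort_keys makes the key independent of argument order
        payload = json.dumps({"tool": tool, "args": args}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def put(self, tool: str, args: dict, decision: str) -> None:
        self._entries[self._key(tool, args)] = (decision, self._clock() + self._ttl)

    def get(self, tool: str, args: dict):
        """Return the cached decision, or None if absent or expired."""
        key = self._key(tool, args)
        entry = self._entries.get(key)
        if entry is None:
            return None
        decision, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return decision
```

A miss returns None, so the caller still needs a conservative default (for example deny) when no recent decision exists for the action.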


Issue: High Tier 2/3 Usage

Symptoms: Costs higher than expected

Diagnosis:

SELECT 
    governance_contract->>'risk_level' as risk,
    governance_contract->>'eval_tier' as tier,
    COUNT(*) as usage_count
FROM reflection_db
GROUP BY risk, tier
ORDER BY usage_count DESC;

Solutions:

  1. Reclassify actions (critical → elevated where safe)
  2. Use Hybrid pattern (async Tier 2 instead of sync)
  3. Increase Tier 0/1 coverage with better rules


Issue: Capability Negotiation Fails

Symptoms: governance_contracts: false in VERSION_SELECTED.payload.server_capabilities

Solutions:

  1. Verify the steward advertises the preview extension in VERSION_SELECTED
  2. Check steward config: enable_governance_contracts: true
  3. Update steward to support Runtime Governance Contracts


Testing Checklist

Before deploying to production:

  • [ ] Unit tests for risk classification (>90% accuracy)
  • [ ] Unit tests for contract building (all risk levels)
  • [ ] Unit tests for fallback selection logic
  • [ ] Integration test: VERSION_NEGOTIATION + VERSION_SELECTED capability negotiation
  • [ ] Integration test: EVAL_REQUEST with governance_contract
  • [ ] Integration test: Timeout triggers correct fallback
  • [ ] Load test: Latency budgets met at p95 (1000 req/s)
  • [ ] Chaos test: Steward unavailable → graceful degradation
  • [ ] Monitoring: Metrics dashboard functional
  • [ ] Audit: Governance Store logs include governance_status

Performance Benchmarks

Expected results after full implementation:

Metric                  Before (ACGP 1.0)  After (ACGP 1.1)  Improvement
P95 latency (low-risk)  300ms              100ms             67% faster
P95 latency (elevated)  300ms              250ms             17% faster
P95 latency (critical)  300ms              2000ms            Slower (intentional)
Monthly cost            $10,000            $4,000            60% cheaper
Timeout rate            N/A                <5%               N/A
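
The Improvement column is the relative reduction from the Before value to the After value. A short sketch of that arithmetic, using a hypothetical pct_reduction helper, makes it easy to recompute the column from your own measurements:

```python
def pct_reduction(before: float, after: float) -> float:
    """Relative reduction in percent; negative means the value increased."""
    return (before - after) / before * 100


# Figures from the table above
low_risk = round(pct_reduction(300, 100))      # P95 latency, low-risk
elevated = round(pct_reduction(300, 250))      # P95 latency, elevated
critical = round(pct_reduction(300, 2000))     # P95 latency, critical
cost = round(pct_reduction(10_000, 4_000))     # monthly cost
```

The critical-risk row comes out negative (latency increases), which matches the "Slower (intentional)" entry: critical actions trade latency for deeper evaluation.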

Next Steps

  1. Start with Stage 1 (capability announcement) - no risk, enables future stages
  2. Pilot Stages 2-3 on low-risk actions only (e.g., read operations)
  3. Monitor Stages 4-5 for 1 week before expanding to elevated/critical
  4. Optimize Stage 6 based on production metrics
  5. Read the extension spec for advanced features: Runtime Governance Contracts

Additional Resources