Implementing Runtime Governance Contracts¶
Status: Draft preview extension for the v1.0 alpha publication surface
Audience: Agent developers, steward implementers
Estimated time: 2-4 hours (incremental adoption)
Prerequisites: Familiarity with ACGP Core Concepts and Messages & Wire Protocol
Overview¶
This guide assumes you already understand the Runtime Governance Contracts concept and need the implementation path. For the conceptual explanation, see Runtime Governance Contracts. For the normative extension semantics, see the specification.
This guide provides a 6-stage implementation path for adding the Runtime Governance Contracts extension to existing ACGP 1.0 deployments. Each stage is independently deployable, allowing incremental adoption without breaking existing functionality.
In this guide, timeout handling refers to the preview Runtime Governance Contracts evaluation-timeout policy for negotiated latency budgets. It does not replace the core v1.0 profile-failure fallback used when the Steward/session path is unavailable.
Implementation Stages¶
- Capability Announcement - Declare support during version negotiation
- Risk Classification - Add risk_level to high-volume actions
- Performance Budgets - Set latency budgets and timeout behaviors
- Eval Tier Selection - Choose evaluation depth per action
- Timeout Handling - Implement evaluation-timeout policy
- Monitoring & Tuning - Track performance and optimize contracts
Stage 1: Capability Announcement¶
Goal: Enable capability negotiation without changing evaluation logic.
Agent-Side Changes¶
class ACGPAgent:
    def initialize_session(self, steward_url):
        """Negotiate preview extension support using canonical handshake messages."""
        version_negotiation = {
            "protocol": "acgp",
            "protocol_version": "1.0.0",
            "message_type": "VERSION_NEGOTIATION",
            "sender_id": self.agent_id,
            "receiver_id": steward_url,
            "payload": {
                "client_versions": ["1.0.0"],
                "capabilities": {
                    "governance_contracts": True,
                    "intervention_execution_modes": ["passive", "active"],
                },
            }
        }
        response = self.send_message(steward_url, version_negotiation)
        # Store steward's capabilities
        self.steward_capabilities = response["payload"].get("server_capabilities", {})
        self.contracts_enabled = (
            self.steward_capabilities.get("governance_contracts", False)
        )
Steward-Side Changes¶
class ACGPSteward:
    def handle_version_negotiation(self, envelope):
        """Return negotiated preview-extension capabilities."""
        agent_caps = envelope["payload"].get("capabilities", {})
        response = {
            "protocol": "acgp",
            "protocol_version": "1.0.0",
            "message_type": "VERSION_SELECTED",
            "sender_id": self.steward_id,
            "receiver_id": envelope["sender_id"],
            "payload": {
                "selected_version": "1.0.0",
                "server_capabilities": {
                    "governance_contracts": True,
                    "intervention_execution_modes": ["passive", "active"],
                },
            },
        }
        # Remember per agent whether contracts were announced
        self.session_state[envelope["sender_id"]] = {
            "contracts_enabled": agent_caps.get("governance_contracts", False),
        }
        return response
Validation:
- Both sides announce capabilities
- Neither side errors if the other doesn't support contracts
- Negotiation uses VERSION_NEGOTIATION / VERSION_SELECTED
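The second validation point can be exercised with a small helper that derives the effective session setting from both capability maps. This is a minimal sketch, not part of the extension: `contracts_negotiated` is a hypothetical name, and the capability dictionaries are assumed to have the shape used in the handshake messages above.

```python
def contracts_negotiated(agent_caps: dict, server_caps: dict) -> bool:
    """Return True only when both peers announced governance_contracts.

    Hypothetical helper: a missing key is treated like False, so a peer
    that predates the preview extension never causes an error.
    """
    return bool(agent_caps.get("governance_contracts", False)) and bool(
        server_caps.get("governance_contracts", False)
    )


# Both sides support the extension.
both = contracts_negotiated(
    {"governance_contracts": True}, {"governance_contracts": True}
)
# The steward predates the extension and sends no capability entry.
legacy_steward = contracts_negotiated({"governance_contracts": True}, {})
# The agent opts out explicitly.
agent_opt_out = contracts_negotiated(
    {"governance_contracts": False}, {"governance_contracts": True}
)
```

Treating an absent capability as "not supported" keeps the handshake backward compatible: contracts are only attached when both sides opted in.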
Stage 2: Risk Classification¶
Goal: Classify actions by risk level without changing evaluation yet.
Risk Classification Logic¶
class ActionClassifier:
    """Classify agent actions by risk level."""

    def classify_action(self, action) -> str:
        """
        Returns: "low_risk", "elevated_risk", or "critical_risk"
        """
        # Critical risk: Irreversible, high-impact
        if self._is_critical(action):
            return "critical_risk"
        # Elevated risk: Sensitive operations
        if self._is_elevated(action):
            return "elevated_risk"
        # Low risk: Default for safe operations
        return "low_risk"

    def _is_critical(self, action) -> bool:
        """Check for high-stakes operations."""
        return (
            action.tool in ["transfer_funds", "delete_resource", "grant_access"]
            or (action.tool == "update_record" and action.args.get("value", 0) > 10000)
            or action.requires_approval
        )

    def _is_elevated(self, action) -> bool:
        """Check for sensitive operations."""
        return (
            action.tool in ["update_record", "send_email", "api_call"]
            or action.accesses_pii
            or action.modifies_state
        )
Integration into Agent¶
class ACGPAgent:
    def __init__(self):
        self.classifier = ActionClassifier()

    def submit_action(self, action):
        """Submit action with risk classification."""
        # Classify risk
        risk_level = self.classifier.classify_action(action)
        # Build evaluation request
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": action.to_dict(),
            "governance_tier": self.governance_tier,
            "protocol_version": "1.0.0"
        }
        # Add governance contract if enabled
        if self.contracts_enabled:
            eval_request["governance_contract"] = {
                "risk_level": risk_level  # NEW: Start with just risk level
            }
        return self.steward.evaluate(eval_request)
Validation:
- Actions are classified (log risk_level)
- Classification accuracy >90% (manual review sample)
- No evaluation logic changed yet
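The accuracy target can be measured by replaying a manually labeled sample through the classifier. The sketch below is an illustration under stated assumptions: `classification_accuracy` and `toy_classify` are hypothetical names, actions are plain dictionaries rather than the action objects used above, and the labeled sample is made up.

```python
from collections import Counter


def classification_accuracy(samples, classify):
    """Compare classifier output with reviewer labels.

    samples:  list of (action, expected_risk_level) pairs from manual review
    classify: callable mapping an action to a risk level string
    Returns (accuracy, mismatches), where mismatches counts
    (expected, predicted) pairs so systematic errors stand out.
    """
    if not samples:
        raise ValueError("need at least one labeled sample")
    mismatches = Counter()
    correct = 0
    for action, expected in samples:
        predicted = classify(action)
        if predicted == expected:
            correct += 1
        else:
            mismatches[(expected, predicted)] += 1
    return correct / len(samples), mismatches


def toy_classify(action: dict) -> str:
    """Stand-in for ActionClassifier.classify_action (tool name only)."""
    if action["tool"] in ("transfer_funds", "delete_resource", "grant_access"):
        return "critical_risk"
    if action["tool"] in ("update_record", "send_email", "api_call"):
        return "elevated_risk"
    return "low_risk"


labeled = [
    ({"tool": "get_balance"}, "low_risk"),
    ({"tool": "send_email"}, "elevated_risk"),
    ({"tool": "transfer_funds"}, "critical_risk"),
    ({"tool": "export_report"}, "elevated_risk"),  # reviewer disagrees with the rules
]
accuracy, mismatches = classification_accuracy(labeled, toy_classify)
print(f"accuracy={accuracy:.0%}, mismatches={dict(mismatches)}")
```

Counting mismatches by (expected, predicted) pair shows which direction the errors go; under-classification (a reviewer says elevated, the rules say low) is usually the one to fix first.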
Stage 3: Performance Budgets¶
Goal: Set latency budgets and fallback behaviors.
Budget Assignment¶
class ContractBuilder:
    """Build Runtime Governance Contracts with budgets."""

    # Default budgets per risk level (from Runtime Governance Contracts)
    DEFAULT_BUDGETS = {
        "low_risk": {
            "latency_budget_ms": 100,
            "fallback_on_timeout": "deny"
        },
        "elevated_risk": {
            "latency_budget_ms": 300,
            "fallback_on_timeout": "allow_and_log"
        },
        "critical_risk": {
            "latency_budget_ms": 5000,
            "fallback_on_timeout": "escalate"
        }
    }

    def build_contract(self, risk_level: str, custom_budget=None):
        """Build contract with performance budget."""
        budget = custom_budget or self.DEFAULT_BUDGETS[risk_level]
        return {
            "risk_level": risk_level,
            "performance_budget": {
                "latency_budget_ms": budget["latency_budget_ms"],
                "fallback_on_timeout": budget["fallback_on_timeout"]
            }
        }
Fallback Behavior Selection¶
def select_fallback(self, action, risk_level):
    """Choose fallback behavior based on action characteristics."""
    # Safety-critical: Always deny on timeout
    if action.irreversible or risk_level == "critical_risk":
        return "deny"
    # Repetitive actions: Use cached decision
    if action.is_similar_to_recent():
        return "cached_decision"
    # High-volume reads: Allow and log for async audit
    if action.is_read_only and risk_level == "low_risk":
        return "allow_and_log"
    # Ambiguous cases: Escalate to human
    if action.requires_judgment:
        return "escalate"
    # Default: Conservative deny
    return "deny"
Validation:
- Budgets are reasonable for action types
- Fallback behaviors align with risk tolerance
- Contracts included in EVAL_REQUEST messages
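Part of this validation can be automated with a structural check that runs before a contract is attached to an EVAL_REQUEST. This is a sketch under assumptions: `validate_contract` is a hypothetical helper, and the allowed values are simply the risk levels and fallback behaviors that appear in this guide, not an authoritative list from the specification.

```python
# Values taken from this guide; the specification may define others.
ALLOWED_RISK_LEVELS = {"low_risk", "elevated_risk", "critical_risk"}
ALLOWED_FALLBACKS = {"deny", "allow_and_log", "cached_decision", "escalate"}


def validate_contract(contract: dict) -> list:
    """Return a list of problems; an empty list means the contract passed."""
    problems = []
    if contract.get("risk_level") not in ALLOWED_RISK_LEVELS:
        problems.append(f"unknown risk_level: {contract.get('risk_level')!r}")

    budget = contract.get("performance_budget")
    if not isinstance(budget, dict):
        problems.append("missing performance_budget")
        return problems

    latency = budget.get("latency_budget_ms")
    is_number = isinstance(latency, (int, float)) and not isinstance(latency, bool)
    if not is_number or latency <= 0:
        problems.append(f"latency_budget_ms must be a positive number: {latency!r}")

    fallback = budget.get("fallback_on_timeout")
    if fallback not in ALLOWED_FALLBACKS:
        problems.append(f"unknown fallback_on_timeout: {fallback!r}")
    return problems


good = validate_contract({
    "risk_level": "low_risk",
    "performance_budget": {"latency_budget_ms": 100, "fallback_on_timeout": "deny"},
})
bad = validate_contract({
    "risk_level": "low_risk",
    "performance_budget": {"latency_budget_ms": 0, "fallback_on_timeout": "retry"},
})
```

Returning every problem at once, rather than raising on the first, makes the check convenient to run over a whole contract catalogue in a unit test.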
Stage 4: Eval Tier Selection¶
Goal: Choose evaluation depth based on risk and performance needs.
Tier Selection Strategy¶
class TierSelector:
    """Select evaluation tier based on risk and constraints."""

    def select_tier(self, risk_level: str, steward_tiers: list) -> int:
        """
        Returns: 0, 1, 2, or 3
        Strategy:
        - low_risk: Prefer Tier 0 (in-memory)
        - elevated_risk: Prefer Tier 1 (DB/cache)
        - critical_risk: Prefer Tier 2+ (model/human)
        """
        if risk_level == "low_risk":
            return 0 if 0 in steward_tiers else min(steward_tiers)
        elif risk_level == "elevated_risk":
            return 1 if 1 in steward_tiers else min(steward_tiers)
        elif risk_level == "critical_risk":
            # Prefer Tier 2, escalate to Tier 3 if needed
            if 2 in steward_tiers:
                return 2
            elif 3 in steward_tiers:
                return 3
            else:
                return max(steward_tiers)  # Use highest available
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unknown risk_level: {risk_level!r}")
Complete Contract Builder¶
# Extends the Stage 3 ContractBuilder: DEFAULT_BUDGETS and select_fallback
# are defined there and reused here.
class ContractBuilder:
    def __init__(self, steward_capabilities):
        self.available_tiers = steward_capabilities.get("available_eval_tiers", [0, 1])
        self.tier_selector = TierSelector()

    def build_contract(self, risk_level: str, action=None):
        """Build complete governance contract."""
        # Select eval tier
        eval_tier = self.tier_selector.select_tier(risk_level, self.available_tiers)
        # Get budget template (copy so the class-level defaults stay untouched)
        budget_template = self.DEFAULT_BUDGETS[risk_level].copy()
        # Customize fallback if needed
        if action:
            budget_template["fallback_on_timeout"] = self.select_fallback(action, risk_level)
        return {
            "risk_level": risk_level,
            "eval_tier": eval_tier,  # NEW
            "performance_budget": budget_template
        }
Validation:
- Tier selection matches risk level (low→0, elevated→1, critical→2+)
- Agent respects steward's available_eval_tiers
- Contracts are fully populated in messages
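The first two validation points lend themselves to an exhaustive check, because there are only fifteen non-empty subsets of tiers 0-3. The sketch below is illustrative: `select_tier` is a compact stand-in assumed to mirror the TierSelector strategy above, and `check_tier_selection` is a hypothetical test helper.

```python
from itertools import combinations

# Preferred tier per risk level, as described in the strategy above.
PREFERRED = {"low_risk": 0, "elevated_risk": 1, "critical_risk": 2}


def select_tier(risk_level, steward_tiers):
    """Compact stand-in mirroring the TierSelector strategy."""
    preferred = PREFERRED[risk_level]
    if preferred in steward_tiers:
        return preferred
    if risk_level == "critical_risk":
        return max(steward_tiers)  # highest available (Tier 3 if offered)
    return min(steward_tiers)


def check_tier_selection(select):
    """Try every non-empty subset of tiers 0-3 and collect violations."""
    violations = []
    all_tiers = [0, 1, 2, 3]
    for size in range(1, len(all_tiers) + 1):
        for available in combinations(all_tiers, size):
            for risk_level, preferred in PREFERRED.items():
                chosen = select(risk_level, list(available))
                if chosen not in available:
                    violations.append((risk_level, available, chosen, "not offered"))
                elif preferred in available and chosen != preferred:
                    violations.append((risk_level, available, chosen, "not preferred"))
    return violations


violations = check_tier_selection(select_tier)
```

Passing the selector in as a callable means the same check can be pointed at the real `TierSelector().select_tier` in a test suite.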
Stage 5: Timeout Handling¶
Goal: Implement fallback logic when latency budget is exceeded.
Steward-Side Timeout Handling¶
import time


class GovernanceEvaluator:
    """Evaluate actions with timeout handling."""

    def evaluate_with_contract(self, action, contract):
        """Evaluate action, applying fallback on timeout."""
        budget_ms = contract["performance_budget"]["latency_budget_ms"]
        fallback = contract["performance_budget"]["fallback_on_timeout"]
        # Monotonic clock: unaffected by wall-clock adjustments
        start_time = time.monotonic()
        try:
            # Run evaluation with timeout
            decision = self.run_evaluation(
                action,
                eval_tier=contract["eval_tier"],
                timeout_ms=budget_ms
            )
            actual_latency_ms = (time.monotonic() - start_time) * 1000
            return {
                "decision": decision,
                "governance_status": {
                    "contract_honored": actual_latency_ms <= budget_ms,
                    "actual_latency_ms": actual_latency_ms,
                    "budget_used_pct": (actual_latency_ms / budget_ms) * 100,
                    "eval_tier_used": contract["eval_tier"]
                }
            }
        except TimeoutError:
            # Apply fallback
            actual_latency_ms = (time.monotonic() - start_time) * 1000
            return self.apply_fallback(
                action,
                fallback,
                actual_latency_ms,
                budget_ms,
                contract
            )

    def apply_fallback(self, action, fallback, actual_ms, budget_ms, contract):
        """Apply fallback behavior on timeout."""
        if fallback == "deny":
            decision = "deny"
            reason = f"Evaluation timeout ({actual_ms:.0f}ms > {budget_ms}ms), fallback=deny"
        elif fallback == "allow_and_log":
            decision = "allow"
            reason = "Timeout, fallback=allow_and_log, async audit queued"
            self.queue_async_audit(action, contract)
        elif fallback == "cached_decision":
            decision = self.get_cached_decision(action)
            reason = f"Timeout, using cached decision: {decision}"
        elif fallback == "escalate":
            decision = "escalate"
            reason = "Timeout, fallback=escalate, awaiting human review"
            self.queue_human_review(action, contract)
        else:
            # Unknown fallback value: fail closed rather than leave decision unset
            decision = "deny"
            reason = f"Timeout, unknown fallback {fallback!r}, denying"
        return {
            "decision": decision,
            "reason": reason,
            "governance_status": {
                "contract_honored": True,  # Fallback was honored
                "actual_latency_ms": actual_ms,
                "budget_ms": budget_ms,  # Read by the agent-side timeout log
                "fallback_used": fallback,
                "timeout": True
            }
        }
Agent-Side Timeout Handling¶
class ACGPAgent:
    def handle_response(self, response):
        """Handle evaluation response with timeout awareness."""
        governance_status = response.get("governance_status", {})
        if governance_status.get("timeout"):
            # Log timeout event
            self.logger.warning(
                f"Governance timeout: {governance_status['actual_latency_ms']}ms "
                f"(budget: {governance_status.get('budget_ms')}ms), "
                f"fallback: {governance_status['fallback_used']}"
            )
            # Handle specific fallbacks
            if governance_status["fallback_used"] == "escalate":
                return self.wait_for_human_decision(response["trace_id"])
        # Normal decision handling
        return self.handle_decision(response["decision"])
Validation:
- Timeouts trigger correct fallback behaviors
- governance_status populated in all responses
- Async audits queued for allow_and_log
- Human review queue for escalate
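The steward code above relies on `run_evaluation` raising `TimeoutError` once the budget is spent, but this guide does not show how that happens. One possible approach, sketched here with the standard library only, is to run the evaluation in a worker thread and bound the wait. `run_with_budget`, `fast_eval`, and `slow_eval` are hypothetical names for illustration.

```python
import concurrent.futures
import time

# Shared pool so a timed-out evaluation does not block the caller.
_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_with_budget(evaluate, action, budget_ms):
    """Run evaluate(action); raise TimeoutError if it exceeds budget_ms."""
    future = _EXECUTOR.submit(evaluate, action)
    try:
        return future.result(timeout=budget_ms / 1000)
    except concurrent.futures.TimeoutError:
        # Best effort only: cancel() cannot interrupt a thread that is
        # already running, so the evaluation itself should also be bounded.
        future.cancel()
        raise TimeoutError(f"evaluation exceeded {budget_ms}ms budget") from None


def fast_eval(action):
    return "allow"


def slow_eval(action):
    time.sleep(0.3)  # simulates a slow model-backed evaluation
    return "allow"


fast_result = run_with_budget(fast_eval, {"tool": "get_balance"}, budget_ms=2000)
try:
    run_with_budget(slow_eval, {"tool": "get_balance"}, budget_ms=50)
    timed_out = False
except TimeoutError:
    timed_out = True
```

The caller regains control at the deadline, but the worker thread keeps running until the evaluation returns. Under sustained timeouts the pool can fill up, so an evaluation that supports its own deadline (for example, a timeout on the underlying database or model call) is the sturdier design.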
Stage 6: Monitoring & Tuning¶
Goal: Measure effectiveness and optimize contracts.
Key Metrics to Track¶
class ContractMetrics:
    """Track governance contract performance."""

    def collect_metrics(self):
        """Collect metrics from Governance Store audit logs."""
        return {
            # Latency metrics
            "avg_latency_by_risk_level": self.query_avg_latency(),
            "p95_latency_by_tier": self.query_p95_latency(),
            "timeout_rate_by_fallback": self.query_timeout_rate(),
            # Contract effectiveness
            "budget_utilization_pct": self.query_budget_usage(),
            "fallback_trigger_rate": self.query_fallback_rate(),
            "contract_override_rate": self.query_override_rate(),
            # Cost metrics
            "tier_2_usage_rate": self.query_tier_usage(2),
            "tier_3_usage_rate": self.query_tier_usage(3),
            "estimated_monthly_cost": self.calculate_cost()
        }
Dashboard Queries (SQL)¶
-- actual_latency_ms is extracted as JSON text and may be fractional,
-- so it is cast to float before aggregating.

-- Average latency by risk level
SELECT
    governance_contract->>'risk_level' as risk_level,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY (governance_status->>'actual_latency_ms')::float) as p95_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY governance_contract->>'risk_level';

-- Timeout rate by fallback behavior
SELECT
    governance_contract->'performance_budget'->>'fallback_on_timeout' as fallback,
    COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true') as timeout_count,
    COUNT(*) as total_count,
    (COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true')::float / COUNT(*)) * 100 as timeout_rate_pct
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY fallback;

-- Eval tier usage distribution
SELECT
    governance_contract->>'eval_tier' as eval_tier,
    COUNT(*) as usage_count,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY eval_tier
ORDER BY eval_tier;
Optimization Strategies¶
def optimize_contracts(self, metrics):
    """Adjust contracts based on production data.

    Assumes metrics also carries the per-dimension aggregates read below
    (timeout_rate_by_risk, budget_utilization_by_tier, action_repetition_rate),
    which collect_metrics() above does not produce as-is.
    """
    # 1. Increase budgets if timeout rate >5%
    for risk_level, timeout_rate in metrics["timeout_rate_by_risk"].items():
        if timeout_rate > 0.05:
            current_budget = self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"]
            new_budget = int(current_budget * 1.2)  # +20%
            self.logger.info(f"Increasing {risk_level} budget: {current_budget}ms → {new_budget}ms")
            self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"] = new_budget
    # 2. Downgrade tier if budget utilization <50%
    for tier, utilization in metrics["budget_utilization_by_tier"].items():
        if utilization < 0.5 and tier > 0:
            self.logger.info(f"Tier {tier} underutilized ({utilization*100:.1f}%), consider Tier {tier-1}")
    # 3. Switch to cached_decision if repetitive actions
    for action_type, repetition_rate in metrics["action_repetition_rate"].items():
        if repetition_rate > 0.7:  # 70% of actions are repeats
            self.logger.info(f"Action {action_type} is repetitive, using cached_decision fallback")
            self.custom_fallbacks[action_type] = "cached_decision"
Validation:
- Metrics dashboard is live
- Timeout rate <5% for all risk levels
- Budget utilization 60-80% (not too tight, not too loose)
- Cost reduction measured (compare to uniform Tier 2 baseline)
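The timeout-rate and utilization targets can also be checked offline against exported audit records. The sketch below is illustrative: `percentile` and `summarize` are hypothetical helpers, the records are synthetic, and budget utilization is assumed to mean average latency divided by the budget, in line with `budget_used_pct` from Stage 5.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile (pct is an integer from 1 to 100)."""
    if not values:
        raise ValueError("no values")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(records, budget_ms):
    """Aggregate governance_status records for one risk level."""
    latencies = [r["actual_latency_ms"] for r in records]
    timeouts = sum(1 for r in records if r.get("timeout"))
    return {
        "timeout_rate": timeouts / len(records),
        "p95_latency_ms": percentile(latencies, 95),
        # Assumption: utilization = mean latency relative to the budget
        "budget_utilization": (sum(latencies) / len(latencies)) / budget_ms,
    }


# Synthetic sample: 19 normal evaluations and one timeout, 100ms budget.
records = [{"actual_latency_ms": 70, "timeout": False}] * 19
records.append({"actual_latency_ms": 120, "timeout": True})
summary = summarize(records, budget_ms=100)
print(summary)
```

In this sample the utilization of about 72% sits inside the 60-80% band, while the timeout rate of exactly 5% does not satisfy the "<5%" target, which suggests a slightly larger budget.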
Complete Example: Customer Service Bot¶
import time


class CustomerServiceBot(ACGPAgent):
    """Full implementation with Runtime Governance Contracts."""

    def __init__(self, steward_url):
        super().__init__()
        self.steward_url = steward_url
        self.classifier = ActionClassifier()
        # Negotiate capabilities with the steward (Stage 1);
        # this populates self.steward_capabilities
        self.initialize_session(steward_url)
        # Initialize contract builder
        self.contract_builder = ContractBuilder(self.steward_capabilities)

    def handle_customer_request(self, request):
        """Process customer request with Runtime Governance Contracts."""
        # 1. Classify action risk
        risk_level = self.classifier.classify_action(request)
        # 2. Build governance contract
        contract = self.contract_builder.build_contract(risk_level, action=request)
        # 3. Submit for evaluation
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": request.to_dict(),
            "governance_tier": self.governance_tier,
            "governance_contract": contract
        }
        start_time = time.monotonic()
        response = self.send_message(self.steward_url, eval_request)
        e2e_latency_ms = (time.monotonic() - start_time) * 1000
        # 4. Handle response
        decision = response["decision"]
        governance_status = response.get("governance_status", {})
        # Log metrics
        self.metrics.record({
            "risk_level": risk_level,
            "eval_tier": contract["eval_tier"],
            "budget_ms": contract["performance_budget"]["latency_budget_ms"],
            "actual_latency_ms": e2e_latency_ms,
            "steward_latency_ms": governance_status.get("actual_latency_ms"),
            "timeout": governance_status.get("timeout", False),
            "decision": decision
        })
        # 5. Execute or block
        if decision == "allow":
            return request.execute()
        elif decision == "escalate":
            return self.queue_for_human_review(request)
        else:
            return f"Action blocked: {response.get('reason')}"


# Usage
bot = CustomerServiceBot("https://steward.example.com")
# Low-risk: <100ms
bot.handle_customer_request(GetBalanceRequest(account_id="12345"))
# Elevated-risk: <300ms
bot.handle_customer_request(UpdateAddressRequest(account_id="12345", new_address="..."))
# Critical-risk: <5s with human review
bot.handle_customer_request(RefundRequest(account_id="12345", amount=5000))
Troubleshooting¶
Issue: Timeouts >10%¶
Symptoms: More than 10% of actions exceed their latency budget
Diagnosis:
SELECT
    governance_contract->>'eval_tier' as tier,
    AVG((governance_status->>'actual_latency_ms')::float) as avg_latency,
    governance_contract->'performance_budget'->>'latency_budget_ms' as budget
FROM reflection_db
WHERE governance_status->>'timeout' = 'true'
GROUP BY tier, budget;
Solutions:
1. Increase budgets (e.g., 100ms → 150ms for Tier 0)
2. Downgrade eval tier (Tier 1 → Tier 0 for low-risk)
3. Optimize steward evaluation code
4. Add caching for repetitive actions
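For the caching option, a small time-bounded cache keyed on the action content is often enough to start with. The following is a minimal sketch, not the extension's own mechanism: `DecisionCache` is a hypothetical class, actions are assumed to be JSON-serializable dictionaries, and the TTL value is arbitrary.

```python
import json
import time


class DecisionCache:
    """Time-bounded cache of recent decisions for identical actions."""

    def __init__(self, ttl_seconds=60.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries = {}

    @staticmethod
    def _key(action: dict) -> str:
        # sort_keys makes the key independent of dictionary ordering
        return json.dumps(action, sort_keys=True, default=str)

    def put(self, action: dict, decision: str) -> None:
        self._entries[self._key(action)] = (decision, self._clock() + self._ttl)

    def get(self, action: dict):
        """Return the cached decision, or None on a miss or expiry."""
        key = self._key(action)
        entry = self._entries.get(key)
        if entry is None:
            return None
        decision, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return decision


# Demonstration with a fake clock instead of real waiting.
now = [0.0]
cache = DecisionCache(ttl_seconds=30, clock=lambda: now[0])
action = {"tool": "get_balance", "args": {"account_id": "12345"}}
cache.put(action, "allow")
hit = cache.get({"args": {"account_id": "12345"}, "tool": "get_balance"})
now[0] = 31.0
expired = cache.get(action)
```

A miss returns `None`, so the caller still has to pick a decision. Treating a miss as `deny` is one conservative option, and keeping the TTL short limits how long a stale `allow` can be replayed.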
Issue: High Tier 2/3 Usage¶
Symptoms: Costs higher than expected
Diagnosis:
SELECT
governance_contract->>'risk_level' as risk,
governance_contract->>'eval_tier' as tier,
COUNT(*) as usage_count
FROM reflection_db
GROUP BY risk, tier
ORDER BY usage_count DESC;
Solutions:
1. Reclassify actions (critical → elevated where safe)
2. Use Hybrid pattern (async Tier 2 instead of sync)
3. Increase Tier 0/1 coverage with better rules
Issue: Capability Negotiation Fails¶
Symptoms: governance_contracts: false in VERSION_SELECTED.payload.server_capabilities
Solutions:
1. Verify the steward advertises the preview extension in VERSION_SELECTED
2. Check steward config: enable_governance_contracts: true
3. Update steward to support Runtime Governance Contracts
Testing Checklist¶
Before deploying to production:
- [ ] Unit tests for risk classification (>90% accuracy)
- [ ] Unit tests for contract building (all risk levels)
- [ ] Unit tests for fallback selection logic
- [ ] Integration test: VERSION_NEGOTIATION + VERSION_SELECTED capability negotiation
- [ ] Integration test: EVAL_REQUEST with governance_contract
- [ ] Integration test: Timeout triggers correct fallback
- [ ] Load test: Latency budgets met at p95 (1000 req/s)
- [ ] Chaos test: Steward unavailable → graceful degradation
- [ ] Monitoring: Metrics dashboard functional
- [ ] Audit: Governance Store logs include governance_status
Performance Benchmarks¶
Expected results after full implementation:
| Metric | Before (ACGP 1.0) | After (ACGP 1.0 + Runtime Governance Contracts) | Improvement |
|---|---|---|---|
| P95 latency (low-risk) | 300ms | 100ms | 67% faster |
| P95 latency (elevated) | 300ms | 250ms | 17% faster |
| P95 latency (critical) | 300ms | 2000ms | Slower (intentional) |
| Monthly cost | $10,000 | $4,000 | 60% cheaper |
| Timeout rate | N/A | <5% | N/A |
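The Improvement column follows from the before and after values. As a quick arithmetic check (the figures are the expected values from the table, not measurements; `pct_improvement` is a hypothetical helper):

```python
def pct_improvement(before, after):
    """Relative reduction in percent; negative means the value grew."""
    return (before - after) * 100 / before


improvements = {
    "p95_low_risk": pct_improvement(300, 100),       # latency in ms
    "p95_elevated": pct_improvement(300, 250),       # latency in ms
    "p95_critical": pct_improvement(300, 2000),      # latency in ms
    "monthly_cost": pct_improvement(10_000, 4_000),  # dollars
}
for name, value in improvements.items():
    print(f"{name}: {value:+.0f}%")
```

The critical-risk row comes out strongly negative, which matches the table's note that the slowdown is intentional: those actions trade latency for deeper evaluation.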
Next Steps¶
- Start with Stage 1 (capability announcement) - no risk, enables future stages
- Pilot Stages 2-3 on low-risk actions only (e.g., read operations)
- Monitor Stages 4-5 for 1 week before expanding to elevated- and critical-risk actions
- Optimize Stage 6 based on production metrics
- Read the extension spec for advanced features: Runtime Governance Contracts
Additional Resources¶
- Concept guide: Runtime Governance Contracts
- Interactive calculator: Latency Budget Calculator
- Core architecture context: ACGP-1
- Conformance model: ACGP-6