Implementing Governance Contracts¶
Status: ACGP 1.1 (Optional Extension)
Audience: Agent developers, steward implementers
Estimated time: 2-4 hours (incremental adoption)
Prerequisites: Familiarity with ACGP Core and Message Formats
Overview¶
This guide provides a 6-stage implementation path for adding Governance Contracts (ACGP-1010) to existing ACGP 1.0 deployments. Each stage is independently deployable, allowing incremental adoption without breaking existing functionality.
Implementation Stages¶
- Capability Announcement - Declare support in SYNC handshake
- Risk Classification - Add risk_level to high-volume actions
- Performance Budgets - Set latency budgets and fallback behaviors
- Eval Tier Selection - Choose evaluation depth per action
- Timeout Handling - Implement fallback logic
- Monitoring & Tuning - Track performance and optimize contracts
Stage 1: Capability Announcement¶
Goal: Enable capability negotiation without changing evaluation logic.
Agent-Side Changes¶
class ACGPAgent:
    def sync_with_steward(self, steward_url):
        """Announce ACGP 1.1 support during SYNC."""
        sync_message = {
            "type": "SYNC",
            "protocol_version": "1.1.0",  # Bump from 1.0.x
            "agent_id": self.agent_id,
            "acl_tier": self.acl_tier,
            "capabilities": {
                "supports_governance_contracts": True,  # NEW
                "max_eval_tier": 2  # NEW: Agent can wait up to Tier 2
            }
        }
        response = self.send_message(steward_url, sync_message)
        # Store steward's capabilities
        self.steward_capabilities = response.get("capabilities", {})
        self.contracts_enabled = (
            self.steward_capabilities.get("supports_governance_contracts", False)
        )
Steward-Side Changes¶
class ACGPSteward:
    def __init__(self):
        # State that handle_sync relies on
        self.available_eval_tiers = [0, 1, 2, 3]  # All tiers available
        self.session_state = {}

    def handle_sync(self, sync_message):
        """Respond with steward capabilities."""
        agent_caps = sync_message.get("capabilities", {})
        sync_ack = {
            "type": "SYNC_ACK",
            "protocol_version": "1.1.0",
            "session_id": generate_session_id(),
            "capabilities": {
                "supports_governance_contracts": True,  # NEW
                "available_eval_tiers": self.available_eval_tiers  # NEW
            }
        }
        # Negotiate common ground
        self.session_state[sync_ack["session_id"]] = {
            "contracts_enabled": agent_caps.get("supports_governance_contracts", False),
            "max_eval_tier": min(
                agent_caps.get("max_eval_tier", 3),
                max(self.available_eval_tiers)
            )
        }
        return sync_ack
Validation:
- Both sides announce capabilities
- Neither side errors if the other doesn't support contracts
- Protocol version bumped to 1.1.0
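The negotiation rule can be isolated into a pure function so that both outcomes (contracts enabled, or a quiet fall back to ACGP 1.0 behavior) are easy to unit-test. The following is a minimal sketch, assuming capability dictionaries shaped like the ones in the SYNC and SYNC_ACK messages above; the `negotiate` helper and its return shape are hypothetical and not defined by ACGP-1010.

```python
def negotiate(agent_caps: dict, steward_caps: dict) -> dict:
    """Return session settings both sides can honor (hypothetical helper)."""
    enabled = bool(
        agent_caps.get("supports_governance_contracts", False)
        and steward_caps.get("supports_governance_contracts", False)
    )
    steward_tiers = steward_caps.get("available_eval_tiers", [])
    if not enabled or not steward_tiers:
        # A 1.0 peer omits the capability fields: run without contracts.
        return {"contracts_enabled": False, "max_eval_tier": None}

    # The agent may cap how deep an evaluation it is willing to wait for.
    agent_max = agent_caps.get("max_eval_tier", max(steward_tiers))
    usable = [tier for tier in steward_tiers if tier <= agent_max]
    return {
        "contracts_enabled": bool(usable),
        "max_eval_tier": max(usable) if usable else None,
    }


agent = {"supports_governance_contracts": True, "max_eval_tier": 2}
steward = {"supports_governance_contracts": True, "available_eval_tiers": [0, 1, 2, 3]}
print(negotiate(agent, steward))
print(negotiate({}, steward))  # legacy agent: contracts stay disabled
```

Keeping the rule free of I/O means the same function can back both the agent-side and steward-side checks in the validation list.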
Stage 2: Risk Classification¶
Goal: Classify actions by risk level without changing evaluation yet.
Risk Classification Logic¶
class ActionClassifier:
    """Classify agent actions by risk level."""

    def classify_action(self, action) -> str:
        """
        Returns: "low_risk", "elevated_risk", or "critical_risk"
        """
        # Critical risk: Irreversible, high-impact
        if self._is_critical(action):
            return "critical_risk"
        # Elevated risk: Sensitive operations
        if self._is_elevated(action):
            return "elevated_risk"
        # Low risk: Default for safe operations
        return "low_risk"

    def _is_critical(self, action) -> bool:
        """Check for high-stakes operations."""
        return (
            action.tool in ["transfer_funds", "delete_resource", "grant_access"]
            or (action.tool == "update_record" and action.args.get("value", 0) > 10000)
            or action.requires_approval
        )

    def _is_elevated(self, action) -> bool:
        """Check for sensitive operations."""
        return (
            action.tool in ["update_record", "send_email", "api_call"]
            or action.accesses_pii
            or action.modifies_state
        )
Integration into Agent¶
class ACGPAgent:
    def __init__(self):
        self.classifier = ActionClassifier()

    def submit_action(self, action):
        """Submit action with risk classification."""
        # Classify risk
        risk_level = self.classifier.classify_action(action)
        # Build evaluation request
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": action.to_dict(),
            "acl_tier": self.acl_tier,
            "protocol_version": "1.1.0"
        }
        # Add governance contract if enabled
        if self.contracts_enabled:
            eval_request["governance_contract"] = {
                "risk_level": risk_level  # NEW: Start with just risk level
            }
        return self.steward.evaluate(eval_request)
Validation:
- Actions are classified (log risk_level)
- Classification accuracy >90% (manual review sample)
- No evaluation logic changed yet
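The >90% accuracy target can be checked against a manually reviewed sample of logged classifications. The sketch below assumes the review produces pairs of (classifier output, reviewer label); the `classification_accuracy` helper and the sample data are illustrative only.

```python
from collections import Counter


def classification_accuracy(samples):
    """samples: iterable of (predicted_level, reviewer_level) pairs."""
    samples = list(samples)
    if not samples:
        raise ValueError("need at least one reviewed sample")
    correct = sum(1 for predicted, expected in samples if predicted == expected)
    # Count each kind of mistake so systematic under-classification stands out.
    confusions = Counter(
        (predicted, expected) for predicted, expected in samples if predicted != expected
    )
    return correct / len(samples), confusions


reviewed = [
    ("low_risk", "low_risk"),
    ("elevated_risk", "elevated_risk"),
    ("low_risk", "elevated_risk"),  # under-classified: the risky direction
    ("critical_risk", "critical_risk"),
]
accuracy, confusions = classification_accuracy(reviewed)
print(f"accuracy={accuracy:.0%}, confusions={dict(confusions)}")
```

Tracking the confusion pairs separately matters because classifying an elevated action as low_risk is more costly than the reverse, even though both count equally against the accuracy figure.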
Stage 3: Performance Budgets¶
Goal: Set latency budgets and fallback behaviors.
Budget Assignment¶
class ContractBuilder:
    """Build governance contracts with budgets."""

    # Default budgets per risk level (from ACGP-1010)
    DEFAULT_BUDGETS = {
        "low_risk": {
            "latency_budget_ms": 100,
            "fallback_behavior": "deny"
        },
        "elevated_risk": {
            "latency_budget_ms": 300,
            "fallback_behavior": "allow_and_log"
        },
        "critical_risk": {
            "latency_budget_ms": 5000,
            "fallback_behavior": "escalate"
        }
    }

    def build_contract(self, risk_level: str, custom_budget=None):
        """Build contract with performance budget."""
        budget = custom_budget or self.DEFAULT_BUDGETS[risk_level]
        return {
            "risk_level": risk_level,
            "performance_budget": {
                "latency_budget_ms": budget["latency_budget_ms"],
                "fallback_behavior": budget["fallback_behavior"]
            }
        }
Fallback Behavior Selection¶
def select_fallback(self, action, risk_level):
    """Choose fallback behavior based on action characteristics."""
    # Safety-critical: Always deny on timeout
    if action.irreversible or risk_level == "critical_risk":
        return "deny"
    # Repetitive actions: Use cached decision
    if action.is_similar_to_recent():
        return "cached_decision"
    # High-volume reads: Allow and log for async audit
    if action.is_read_only and risk_level == "low_risk":
        return "allow_and_log"
    # Ambiguous cases: Escalate to human
    if action.requires_judgment:
        return "escalate"
    # Default: Conservative deny
    return "deny"
Validation:
- Budgets are reasonable for action types
- Fallback behaviors align with risk tolerance
- Contracts included in EVAL_REQUEST messages
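Part of this validation can be automated by checking each contract before it is attached to an EVAL_REQUEST. This is a minimal sketch, assuming the contract shape produced by `build_contract` and the three risk levels and four fallback names used in this guide; `validate_contract` is a hypothetical helper, and ACGP-1010 may define additional fields or values.

```python
ALLOWED_RISK_LEVELS = {"low_risk", "elevated_risk", "critical_risk"}
ALLOWED_FALLBACKS = {"deny", "allow_and_log", "cached_decision", "escalate"}


def validate_contract(contract: dict) -> list:
    """Return a list of problems; an empty list means the contract looks sane."""
    problems = []
    risk_level = contract.get("risk_level")
    if risk_level not in ALLOWED_RISK_LEVELS:
        problems.append(f"unknown risk_level: {risk_level!r}")

    budget = contract.get("performance_budget", {})
    latency = budget.get("latency_budget_ms")
    is_number = isinstance(latency, (int, float)) and not isinstance(latency, bool)
    if not is_number or latency <= 0:
        problems.append(f"latency_budget_ms must be a positive number, got {latency!r}")

    fallback = budget.get("fallback_behavior")
    if fallback not in ALLOWED_FALLBACKS:
        problems.append(f"unknown fallback_behavior: {fallback!r}")
    return problems


contract = {
    "risk_level": "elevated_risk",
    "performance_budget": {"latency_budget_ms": 300, "fallback_behavior": "allow_and_log"},
}
print(validate_contract(contract))  # no problems expected for this contract
```

Returning a list rather than raising on the first error makes it straightforward to log every problem with a rejected contract in one pass.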
Stage 4: Eval Tier Selection¶
Goal: Choose evaluation depth based on risk and performance needs.
Tier Selection Strategy¶
class TierSelector:
    """Select evaluation tier based on risk and constraints."""

    def select_tier(self, risk_level: str, steward_tiers: list) -> int:
        """
        Returns: 0, 1, 2, or 3
        Strategy:
        - low_risk: Prefer Tier 0 (in-memory)
        - elevated_risk: Prefer Tier 1 (DB/cache)
        - critical_risk: Prefer Tier 2+ (model/human)
        """
        if risk_level == "low_risk":
            return 0 if 0 in steward_tiers else min(steward_tiers)
        elif risk_level == "elevated_risk":
            return 1 if 1 in steward_tiers else min(steward_tiers)
        elif risk_level == "critical_risk":
            # Prefer Tier 2, escalate to Tier 3 if needed
            if 2 in steward_tiers:
                return 2
            elif 3 in steward_tiers:
                return 3
            else:
                return max(steward_tiers)  # Use highest available
        # An unknown risk level would otherwise fall through and return None
        raise ValueError(f"Unknown risk_level: {risk_level!r}")
Complete Contract Builder¶
class ContractBuilder:
    # Extends the Stage 3 ContractBuilder: DEFAULT_BUDGETS and
    # select_fallback are the ones defined there.

    def __init__(self, steward_capabilities):
        self.available_tiers = steward_capabilities.get("available_eval_tiers", [0, 1])
        self.tier_selector = TierSelector()

    def build_contract(self, risk_level: str, action=None):
        """Build complete governance contract."""
        # Select eval tier
        eval_tier = self.tier_selector.select_tier(risk_level, self.available_tiers)
        # Get budget template
        budget_template = self.DEFAULT_BUDGETS[risk_level].copy()
        # Customize fallback if needed
        if action:
            budget_template["fallback_behavior"] = self.select_fallback(action, risk_level)
        return {
            "risk_level": risk_level,
            "eval_tier": eval_tier,  # NEW
            "performance_budget": budget_template
        }
Validation:
- Tier selection matches risk level (low→0, elevated→1, critical→2+)
- Agent respects steward's available_eval_tiers
- Contracts are fully populated in messages
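One case the strategy above leaves open is what to do when the preferred tier is unavailable: for elevated_risk the fallback is `min(steward_tiers)`, which can be a shallower tier even when a deeper one exists. A possible alternative, sketched here under the assumption that a deeper evaluation is always an acceptable substitute, picks the nearest tier at or above the preferred one and also honors the agent's announced `max_eval_tier`. The `nearest_tier` helper and `PREFERRED_TIER` table are hypothetical.

```python
PREFERRED_TIER = {"low_risk": 0, "elevated_risk": 1, "critical_risk": 2}


def nearest_tier(risk_level: str, steward_tiers: list, agent_max_tier: int = 3) -> int:
    """Pick the closest usable tier, preferring deeper over shallower."""
    preferred = PREFERRED_TIER[risk_level]  # KeyError on unknown risk levels
    usable = sorted(tier for tier in steward_tiers if tier <= agent_max_tier)
    if not usable:
        raise ValueError("no evaluation tier satisfies both agent and steward")
    at_or_above = [tier for tier in usable if tier >= preferred]
    # Fall back to the deepest shallower tier only when nothing deeper exists.
    return at_or_above[0] if at_or_above else usable[-1]


print(nearest_tier("elevated_risk", [0, 2, 3]))  # Tier 1 missing, a deeper tier is chosen
print(nearest_tier("critical_risk", [0, 1]))     # nothing deeper, deepest available is used
```

Whether a deeper tier is acceptable depends on the latency budget, so a real deployment would likely check the chosen tier against `latency_budget_ms` as well.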
Stage 5: Timeout Handling¶
Goal: Implement fallback logic when latency budget is exceeded.
Steward-Side Timeout Handling¶
import time


class GovernanceEvaluator:
    """Evaluate actions with timeout handling."""

    def evaluate_with_contract(self, action, contract):
        """Evaluate action, applying fallback on timeout."""
        budget_ms = contract["performance_budget"]["latency_budget_ms"]
        fallback = contract["performance_budget"]["fallback_behavior"]
        # monotonic() is unaffected by wall-clock adjustments
        start_time = time.monotonic()
        try:
            # Run evaluation with timeout
            decision = self.run_evaluation(
                action,
                eval_tier=contract["eval_tier"],
                timeout_ms=budget_ms
            )
            actual_latency_ms = (time.monotonic() - start_time) * 1000
            return {
                "decision": decision,
                "governance_status": {
                    "contract_honored": actual_latency_ms <= budget_ms,
                    "actual_latency_ms": actual_latency_ms,
                    "budget_used_pct": (actual_latency_ms / budget_ms) * 100,
                    "eval_tier_used": contract["eval_tier"]
                }
            }
        except TimeoutError:
            # Apply fallback
            actual_latency_ms = (time.monotonic() - start_time) * 1000
            return self.apply_fallback(
                action,
                fallback,
                actual_latency_ms,
                budget_ms,
                contract
            )

    def apply_fallback(self, action, fallback, actual_ms, budget_ms, contract):
        """Apply fallback behavior on timeout."""
        if fallback == "deny":
            decision = "deny"
            reason = f"Evaluation timeout ({actual_ms:.0f}ms > {budget_ms}ms), fallback=deny"
        elif fallback == "allow_and_log":
            decision = "allow"
            reason = "Timeout, fallback=allow_and_log, async audit queued"
            self.queue_async_audit(action, contract)
        elif fallback == "cached_decision":
            decision = self.get_cached_decision(action)
            reason = f"Timeout, using cached decision: {decision}"
        elif fallback == "escalate":
            decision = "escalate"
            reason = "Timeout, fallback=escalate, awaiting human review"
            self.queue_human_review(action, contract)
        else:
            # Unknown fallback value: fail closed instead of leaving
            # decision and reason unbound
            decision = "deny"
            reason = f"Timeout, unknown fallback {fallback!r}, defaulting to deny"
        return {
            "decision": decision,
            "reason": reason,
            "governance_status": {
                "contract_honored": True,  # Fallback was honored
                "actual_latency_ms": actual_ms,
                "budget_ms": budget_ms,  # Read by the agent-side timeout log
                "fallback_used": fallback,
                "timeout": True
            }
        }
Agent-Side Timeout Handling¶
class ACGPAgent:
    def handle_response(self, response):
        """Handle evaluation response with timeout awareness."""
        governance_status = response.get("governance_status", {})
        if governance_status.get("timeout"):
            # Log timeout event
            self.logger.warning(
                f"Governance timeout: {governance_status['actual_latency_ms']}ms "
                f"(budget: {governance_status.get('budget_ms')}ms), "
                f"fallback: {governance_status['fallback_used']}"
            )
            # Handle specific fallbacks
            if governance_status["fallback_used"] == "escalate":
                return self.wait_for_human_decision(response["trace_id"])
        # Normal decision handling
        return self.handle_decision(response["decision"])
Validation:
- Timeouts trigger correct fallback behaviors
- governance_status populated in all responses
- Async audits queued for allow_and_log
- Human review queue for escalate
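The steward-side code assumes that `run_evaluation` raises `TimeoutError` once the budget is exceeded, but does not show how. One way to get that behavior for a blocking evaluator is to run it in a worker thread and bound the wait. The sketch below uses only the standard library; `run_with_budget` and `slow_policy_check` are hypothetical names, and note that a thread which has already started cannot be interrupted, so the slow call keeps running in the background after the fallback is applied.

```python
import concurrent.futures
import time

# Shared pool so a timed-out call does not block the caller on shutdown.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_with_budget(evaluate, action, timeout_ms):
    """Run evaluate(action); raise the built-in TimeoutError past the budget."""
    future = _POOL.submit(evaluate, action)
    try:
        return future.result(timeout=timeout_ms / 1000)
    except concurrent.futures.TimeoutError:
        # Best effort: cancel() only succeeds if the call has not started yet.
        future.cancel()
        raise TimeoutError(f"evaluation exceeded {timeout_ms}ms") from None


def slow_policy_check(action):
    time.sleep(0.2)  # stands in for a slow Tier 2 model call
    return "allow"


try:
    run_with_budget(slow_policy_check, {"tool": "send_email"}, timeout_ms=50)
except TimeoutError as exc:
    print(f"fallback needed: {exc}")
```

For evaluators that are already asynchronous, an event-loop timeout would be a closer fit than a thread pool; the thread-based version is shown because it wraps existing synchronous code without changes.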
Stage 6: Monitoring & Tuning¶
Goal: Measure effectiveness and optimize contracts.
Key Metrics to Track¶
class ContractMetrics:
    """Track governance contract performance."""

    def collect_metrics(self):
        """Collect metrics from ReflectionDB audit logs."""
        return {
            # Latency metrics
            "avg_latency_by_risk_level": self.query_avg_latency(),
            "p95_latency_by_tier": self.query_p95_latency(),
            "timeout_rate_by_fallback": self.query_timeout_rate(),
            # Contract effectiveness
            "budget_utilization_pct": self.query_budget_usage(),
            "fallback_trigger_rate": self.query_fallback_rate(),
            "contract_override_rate": self.query_override_rate(),
            # Cost metrics
            "tier_2_usage_rate": self.query_tier_usage(2),
            "tier_3_usage_rate": self.query_tier_usage(3),
            "estimated_monthly_cost": self.calculate_cost()
        }
Dashboard Queries (SQL)¶
-- Average latency by risk level
-- (->> returns text, and the steward records fractional milliseconds,
-- so cast to float before aggregating)
SELECT
    governance_contract->>'risk_level' AS risk_level,
    AVG((governance_status->>'actual_latency_ms')::float) AS avg_latency_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (
        ORDER BY (governance_status->>'actual_latency_ms')::float
    ) AS p95_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY governance_contract->>'risk_level';

-- Timeout rate by fallback behavior
SELECT
    governance_contract->'performance_budget'->>'fallback_behavior' AS fallback,
    COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true') AS timeout_count,
    COUNT(*) AS total_count,
    (COUNT(*) FILTER (WHERE governance_status->>'timeout' = 'true')::float / COUNT(*)) * 100 AS timeout_rate_pct
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY fallback;

-- Eval tier usage distribution
SELECT
    governance_contract->>'eval_tier' AS eval_tier,
    COUNT(*) AS usage_count,
    AVG((governance_status->>'actual_latency_ms')::float) AS avg_latency_ms
FROM reflection_db
WHERE governance_contract IS NOT NULL
GROUP BY eval_tier
ORDER BY eval_tier;
Optimization Strategies¶
def optimize_contracts(self, metrics):
    """Adjust contracts based on production data.

    Expects three breakdowns that collect_metrics() does not return as
    shown and that must be collected in addition: timeout_rate_by_risk,
    budget_utilization_by_tier, and action_repetition_rate.
    """
    # 1. Increase budgets if timeout rate >5%
    for risk_level, timeout_rate in metrics.get("timeout_rate_by_risk", {}).items():
        if timeout_rate > 0.05:
            current_budget = self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"]
            new_budget = int(current_budget * 1.2)  # +20%
            self.logger.info(f"Increasing {risk_level} budget: {current_budget}ms → {new_budget}ms")
            self.DEFAULT_BUDGETS[risk_level]["latency_budget_ms"] = new_budget
    # 2. Downgrade tier if budget utilization <50%
    for tier, utilization in metrics.get("budget_utilization_by_tier", {}).items():
        if utilization < 0.5 and tier > 0:
            self.logger.info(f"Tier {tier} underutilized ({utilization*100:.1f}%), consider Tier {tier-1}")
    # 3. Switch to cached_decision if repetitive actions
    for action_type, repetition_rate in metrics.get("action_repetition_rate", {}).items():
        if repetition_rate > 0.7:  # 70% of actions are repeats
            self.logger.info(f"Action {action_type} is repetitive, using cached_decision fallback")
            self.custom_fallbacks[action_type] = "cached_decision"
Validation:
- Metrics dashboard is live
- Timeout rate <5% for all risk levels
- Budget utilization 60-80% (not too tight, not too loose)
- Cost reduction measured (compare to uniform Tier 2 baseline)
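The two numeric targets in this list (timeout rate under 5%, budget utilization between 60% and 80%) can be computed directly from per-request records before a dashboard exists. The sketch below assumes records carrying `risk_level`, `budget_ms`, `actual_latency_ms`, and `timeout` fields; the `summarize` helper and its `needs_tuning` flag are illustrative, not part of ACGP-1010.

```python
from collections import defaultdict


def summarize(records):
    """Group per-request records by risk level and compare against targets."""
    grouped = defaultdict(list)
    for record in records:
        grouped[record["risk_level"]].append(record)

    summary = {}
    for risk_level, rows in grouped.items():
        timeout_rate = sum(1 for row in rows if row["timeout"]) / len(rows)
        # Mean fraction of the latency budget actually consumed.
        utilization = sum(row["actual_latency_ms"] / row["budget_ms"] for row in rows) / len(rows)
        summary[risk_level] = {
            "timeout_rate": timeout_rate,
            "budget_utilization": utilization,
            "needs_tuning": timeout_rate > 0.05 or not 0.6 <= utilization <= 0.8,
        }
    return summary


records = [
    {"risk_level": "low_risk", "budget_ms": 100, "actual_latency_ms": 70, "timeout": False},
    {"risk_level": "low_risk", "budget_ms": 100, "actual_latency_ms": 72, "timeout": False},
    {"risk_level": "elevated_risk", "budget_ms": 300, "actual_latency_ms": 310, "timeout": True},
]
for level, stats in summarize(records).items():
    print(level, stats)
```

A mean is used for utilization to keep the sketch short; a percentile would be less sensitive to a few very fast cache hits.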
Complete Example: Customer Service Bot¶
import time


class CustomerServiceBot(ACGPAgent):
    """Full implementation with governance contracts."""

    def __init__(self, steward_url):
        super().__init__()
        self.steward_url = steward_url
        self.classifier = ActionClassifier()
        # Sync with steward
        self.sync_with_steward(steward_url)
        # Initialize contract builder
        self.contract_builder = ContractBuilder(self.steward_capabilities)

    def handle_customer_request(self, request):
        """Process customer request with governance contracts."""
        # 1. Classify action risk
        risk_level = self.classifier.classify_action(request)
        # 2. Build governance contract
        contract = self.contract_builder.build_contract(risk_level, action=request)
        # 3. Submit for evaluation
        eval_request = {
            "type": "EVAL_REQUEST",
            "trace_id": generate_trace_id(),
            "session_id": self.session_id,
            "action": request.to_dict(),
            "acl_tier": self.acl_tier,
            "protocol_version": "1.1.0",
            "governance_contract": contract
        }
        start_time = time.monotonic()
        response = self.send_message(self.steward_url, eval_request)
        e2e_latency_ms = (time.monotonic() - start_time) * 1000
        # 4. Handle response
        decision = response["decision"]
        governance_status = response.get("governance_status", {})
        # Log metrics (self.metrics is assumed to be a recorder configured elsewhere)
        self.metrics.record({
            "risk_level": risk_level,
            "eval_tier": contract["eval_tier"],
            "budget_ms": contract["performance_budget"]["latency_budget_ms"],
            "actual_latency_ms": e2e_latency_ms,
            "steward_latency_ms": governance_status.get("actual_latency_ms"),
            "timeout": governance_status.get("timeout", False),
            "decision": decision
        })
        # 5. Execute or block
        if decision == "allow":
            return request.execute()
        elif decision == "escalate":
            return self.queue_for_human_review(request)
        else:
            return f"Action blocked: {response.get('reason')}"


# Usage
bot = CustomerServiceBot("https://steward.example.com")
# Low-risk: <100ms
bot.handle_customer_request(GetBalanceRequest(account_id="12345"))
# Elevated-risk: <300ms
bot.handle_customer_request(UpdateAddressRequest(account_id="12345", new_address="..."))
# Critical-risk: <5s with human review
bot.handle_customer_request(RefundRequest(account_id="12345", amount=5000))
Troubleshooting¶
Issue: Timeouts >10%¶
Symptoms: More than 10% of actions exceed their latency budget
Diagnosis:
SELECT
    governance_contract->>'eval_tier' AS tier,
    AVG((governance_status->>'actual_latency_ms')::float) AS avg_latency,
    governance_contract->'performance_budget'->>'latency_budget_ms' AS budget
FROM reflection_db
WHERE governance_status->>'timeout' = 'true'
GROUP BY tier, budget;
Solutions:
1. Increase budgets (e.g., 100ms → 150ms for Tier 0)
2. Downgrade eval tier (Tier 1 → Tier 0 for low-risk)
3. Optimize steward evaluation code
4. Add caching for repetitive actions
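For the caching option, a small time-bounded cache keyed on the action's content is often enough to back a cached_decision fallback. The following is a sketch under stated assumptions: actions are JSON-serializable dictionaries, only final "allow" and "deny" decisions are safe to reuse, and a 60-second lifetime is acceptable. `DecisionCache` is a hypothetical helper, not an ACGP component.

```python
import hashlib
import json
import time


class DecisionCache:
    """Tiny TTL cache keyed by the action's canonical JSON form."""

    def __init__(self, ttl_seconds=60.0, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries = {}

    @staticmethod
    def _key(action: dict) -> str:
        # sort_keys makes the key independent of dictionary ordering.
        canonical = json.dumps(action, sort_keys=True, default=str)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def get(self, action: dict):
        key = self._key(action)
        entry = self._entries.get(key)
        if entry is None:
            return None
        decision, stored_at = entry
        if self.clock() - stored_at > self.ttl_seconds:
            del self._entries[key]  # expired: force a fresh evaluation
            return None
        return decision

    def put(self, action: dict, decision: str) -> None:
        # Escalations and other non-final outcomes are never reused.
        if decision in ("allow", "deny"):
            self._entries[self._key(action)] = (decision, self.clock())


cache = DecisionCache(ttl_seconds=60)
action = {"tool": "get_balance", "args": {"account_id": "12345"}}
cache.put(action, "allow")
print(cache.get(action))
```

A short lifetime limits how long a stale decision can outlive a policy change; entries should also be cleared whenever the steward's policies are updated.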
Issue: High Tier 2/3 Usage¶
Symptoms: Costs higher than expected
Diagnosis:
SELECT
    governance_contract->>'risk_level' AS risk,
    governance_contract->>'eval_tier' AS tier,
    COUNT(*) AS usage_count
FROM reflection_db
GROUP BY risk, tier
ORDER BY usage_count DESC;
Solutions:
1. Reclassify actions (critical → elevated where safe)
2. Use Hybrid pattern (async Tier 2 instead of sync)
3. Increase Tier 0/1 coverage with better rules
Issue: Capability Negotiation Fails¶
Symptoms: supports_governance_contracts: false in SYNC_ACK
Solutions:
1. Verify steward ACGP version ≥1.1.0
2. Check steward config: enable_governance_contracts: true
3. Update steward to support ACGP-1010
Testing Checklist¶
Before deploying to production:
- [ ] Unit tests for risk classification (>90% accuracy)
- [ ] Unit tests for contract building (all risk levels)
- [ ] Unit tests for fallback selection logic
- [ ] Integration test: SYNC capability negotiation
- [ ] Integration test: EVAL_REQUEST with governance_contract
- [ ] Integration test: Timeout triggers correct fallback
- [ ] Load test: Latency budgets met at p95 (1000 req/s)
- [ ] Chaos test: Steward unavailable → graceful degradation
- [ ] Monitoring: Metrics dashboard functional
- [ ] Audit: ReflectionDB logs include governance_status
Performance Benchmarks¶
Expected results after full implementation:
| Metric | Before (ACGP 1.0) | After (ACGP 1.1) | Improvement |
|---|---|---|---|
| P95 latency (low-risk) | 300ms | 100ms | 67% faster |
| P95 latency (elevated) | 300ms | 250ms | 17% faster |
| P95 latency (critical) | 300ms | 2000ms | Slower (intentional) |
| Monthly cost | $10,000 | $4,000 | 60% cheaper |
| Timeout rate | N/A | <5% | N/A |
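The Improvement column is the relative reduction between the Before and After columns. A short sketch of that arithmetic, useful for recomputing the column from your own measurements; the `improvement_pct` helper is illustrative, and the inputs below are the table's expected values rather than measured results.

```python
def improvement_pct(before: float, after: float) -> float:
    """Relative reduction from before to after, in percent."""
    return (before - after) / before * 100


rows = {
    "P95 latency (low-risk), ms": (300, 100),
    "P95 latency (elevated), ms": (300, 250),
    "Monthly cost, $": (10_000, 4_000),
}
for metric, (before, after) in rows.items():
    print(f"{metric}: {improvement_pct(before, after):.0f}% lower")
```

A negative result means the metric got worse, which is the intended outcome for critical-risk latency, where a deeper evaluation is traded for speed.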
Next Steps¶
- Start with Stage 1 (capability announcement) - no risk, enables future stages
- Pilot Stages 2-3 on low-risk actions only (e.g., read operations)
- Monitor Stages 4-5 for 1 week before expanding to elevated/critical
- Optimize Stage 6 based on production metrics
- Read the spec for advanced features: ACGP-1010
Additional Resources¶
- Concept guide: Governance Contracts
- Interactive calculator: Latency Budget Calculator
- Architecture patterns: ACGP-1002 8.6
- Conformance tests: ACGP-1009 5.4