Introduction
Human oversight is the ultimate safety mechanism for AI agents. No matter how sophisticated automated guardrails become, certain decisions require human judgment. This section covers patterns for integrating human oversight into agent workflows effectively.
Section Overview: We'll explore approval workflows, escalation patterns, intervention mechanisms, and progressive autonomy systems for human-agent collaboration.
Approval Workflows
When to Require Approval
| Action Type | Approval Level | Rationale |
|---|---|---|
| Read public data | None | Low risk, no side effects |
| Write to workspace | None/Notify | Limited scope |
| External API calls | Review | External impact |
| Send communications | Required | Affects recipients |
| Financial actions | Required + 2FA | High stakes |
| System changes | Required + Admin | Critical infrastructure |
🐍python
1"""
2Approval Workflow System
3
4Implements multi-level approval for agent actions.
5"""
6
7from dataclasses import dataclass, field
8from datetime import datetime, timedelta
9from enum import Enum
10from typing import Callable
11import uuid
12
13
class ApprovalLevel(Enum):
    """Kind of human sign-off an agent action requires."""

    NONE = "none"
    NOTIFY = "notify"
    REVIEW = "review"
    REQUIRED = "required"
    REQUIRED_2FA = "required_2fa"
    ADMIN_ONLY = "admin_only"
21
22
class ApprovalStatus(Enum):
    """Lifecycle state of an approval request."""

    PENDING = "pending"
    APPROVED = "approved"
    DENIED = "denied"
    EXPIRED = "expired"
    AUTO_APPROVED = "auto_approved"
29
30
@dataclass
class ApprovalRequest:
    """A request for human approval of a single agent action."""

    id: str  # unique identifier of the request
    action_type: str  # name of the action being requested
    action_details: dict  # caller-supplied payload describing the action
    level: ApprovalLevel
    created_at: datetime
    expires_at: datetime  # approvals arriving after this moment are rejected
    status: ApprovalStatus = ApprovalStatus.PENDING
    approver: str | None = None  # who approved or denied; None until decided
    approval_time: datetime | None = None  # set when the request is approved
    notes: str = ""  # free-form text; a denial stores its reason here
44
45
class ApprovalWorkflow:
    """Manage approval workflows for agent actions.

    Every action type maps to an ``ApprovalLevel``. ``NONE`` and ``NOTIFY``
    actions are approved automatically (``NOTIFY`` also alerts the approvers);
    every other level creates a pending request that a human must approve or
    deny before it expires.
    """

    def __init__(self):
        # Every request created so far, keyed by request id. Despite the name,
        # decided requests stay here so check_status() can report the outcome.
        self.pending_requests: dict[str, ApprovalRequest] = {}
        self.approval_handlers: dict[ApprovalLevel, Callable] = {}

        # Action type -> approval level. Action types missing from this table
        # fall back to REQUIRED in request_approval().
        self.action_levels: dict[str, ApprovalLevel] = {
            "read_file": ApprovalLevel.NONE,
            "write_file": ApprovalLevel.NOTIFY,
            "api_call": ApprovalLevel.REVIEW,
            "send_email": ApprovalLevel.REQUIRED,
            "payment": ApprovalLevel.REQUIRED_2FA,
            "delete_data": ApprovalLevel.ADMIN_ONLY,
        }

        # Seconds a pending request stays valid, per level.
        self.timeouts: dict[ApprovalLevel, int] = {
            ApprovalLevel.NOTIFY: 0,  # Immediate
            ApprovalLevel.REVIEW: 3600,  # 1 hour
            ApprovalLevel.REQUIRED: 86400,  # 24 hours
            ApprovalLevel.REQUIRED_2FA: 300,  # 5 minutes
            ApprovalLevel.ADMIN_ONLY: 86400,
        }

    def request_approval(
        self,
        action_type: str,
        action_details: dict,
    ) -> ApprovalRequest:
        """Create an approval request for an action.

        Args:
            action_type: Key into ``action_levels``; unknown types are treated
                as ``ApprovalLevel.REQUIRED``.
            action_details: Payload describing the action; stored on the
                request unchanged.

        Returns:
            The new request. Its status is ``AUTO_APPROVED`` for ``NONE`` and
            ``NOTIFY`` actions and ``PENDING`` for everything else.
        """
        level = self.action_levels.get(action_type, ApprovalLevel.REQUIRED)
        # Read the clock once so created_at and expires_at agree.
        now = datetime.now()

        if level in (ApprovalLevel.NONE, ApprovalLevel.NOTIFY):
            # NOTIFY has a zero timeout, so a pending NOTIFY request would
            # already be expired by the time anyone tried to approve it.
            # Notify-only actions are approved on the spot instead.
            request = ApprovalRequest(
                id=str(uuid.uuid4()),
                action_type=action_type,
                action_details=action_details,
                level=level,
                created_at=now,
                expires_at=now,
                status=ApprovalStatus.AUTO_APPROVED,
            )
        else:
            timeout = self.timeouts.get(level, 3600)
            request = ApprovalRequest(
                id=str(uuid.uuid4()),
                action_type=action_type,
                action_details=action_details,
                level=level,
                created_at=now,
                expires_at=now + timedelta(seconds=timeout),
            )

        # Store auto-approved requests too; otherwise check_status() would
        # report them as EXPIRED because it cannot find them.
        self.pending_requests[request.id] = request

        if level != ApprovalLevel.NONE:
            self._notify_approvers(request)

        return request

    def approve(
        self,
        request_id: str,
        approver: str,
        verification: str | None = None,
    ) -> bool:
        """Approve a pending request.

        Args:
            request_id: Id returned by ``request_approval``.
            approver: Identity of the person approving.
            verification: 2FA code; only consulted for ``REQUIRED_2FA``
                requests.

        Returns:
            True if the request was approved. False if the request is unknown,
            is no longer pending, has expired, or the approver failed the 2FA
            or admin check.
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return False

        # A request that was already approved, denied or expired must not be
        # flipped to APPROVED by a late call.
        if request.status != ApprovalStatus.PENDING:
            return False

        now = datetime.now()
        if now > request.expires_at:
            request.status = ApprovalStatus.EXPIRED
            return False

        if request.level == ApprovalLevel.REQUIRED_2FA:
            if not self._verify_2fa(approver, verification):
                return False

        if request.level == ApprovalLevel.ADMIN_ONLY:
            if not self._is_admin(approver):
                return False

        request.status = ApprovalStatus.APPROVED
        request.approver = approver
        request.approval_time = now

        return True

    def deny(
        self,
        request_id: str,
        approver: str,
        reason: str = "",
    ) -> bool:
        """Deny a pending request.

        Args:
            request_id: Id returned by ``request_approval``.
            approver: Identity of the person denying.
            reason: Explanation, stored in the request's ``notes``.

        Returns:
            True if the request was denied; False if it is unknown or has
            already been decided.
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return False

        # Do not overwrite a decision that has already been made.
        if request.status != ApprovalStatus.PENDING:
            return False

        request.status = ApprovalStatus.DENIED
        request.approver = approver
        request.notes = reason

        return True

    def check_status(self, request_id: str) -> ApprovalStatus:
        """Return the current status of a request.

        Unknown request ids are reported as ``EXPIRED``. A pending request
        whose deadline has passed is marked ``EXPIRED`` as a side effect.
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return ApprovalStatus.EXPIRED

        is_pending = request.status == ApprovalStatus.PENDING
        if is_pending and datetime.now() > request.expires_at:
            request.status = ApprovalStatus.EXPIRED

        return request.status

    def _notify_approvers(self, request: ApprovalRequest):
        """Send notification to approvers."""
        # Implementation depends on notification system
        pass

    def _verify_2fa(self, user: str, code: str | None) -> bool:
        """Verify a 2FA code for ``user``."""
        # PLACEHOLDER ONLY: accepts one fixed code for every user. Replace
        # with a real 2FA provider (and a constant-time comparison) before
        # relying on this for anything.
        return code == "123456"

    def _is_admin(self, user: str) -> bool:
        """Check whether ``user`` is an admin."""
        # PLACEHOLDER ONLY: trusts the caller-supplied string. Replace with a
        # lookup in the real auth system.
        return user.endswith("@admin.com")
190
191
# Usage
workflow = ApprovalWorkflow()

# Request approval for sending email
request = workflow.request_approval(
    "send_email",
    {"to": "user@example.com", "subject": "Hello"},
)

print(f"Request ID: {request.id}")
print(f"Status: {request.status}")

# Simulate approval
if workflow.approve(request.id, "approver@company.com"):
    print("Email sending approved!")
else:
    print("Approval failed")

# Escalation Patterns
When to Escalate
🐍python
1"""
2Escalation Patterns
3
4Automatically escalate issues to humans when:
51. Agent is uncertain
62. Risk threshold exceeded
73. Anomaly detected
84. User requests human
95. Critical decision required
10"""
11
12from dataclasses import dataclass
13from datetime import datetime
14from enum import Enum
15from typing import Any
16
17
class EscalationReason(Enum):
    """Why an agent issue is being handed to a human."""

    UNCERTAINTY = "uncertainty"
    HIGH_RISK = "high_risk"
    ANOMALY = "anomaly"
    USER_REQUEST = "user_request"
    CRITICAL_DECISION = "critical_decision"
    REPEATED_FAILURES = "repeated_failures"
    POLICY_VIOLATION = "policy_violation"
26
27
@dataclass
class EscalationTicket:
    """Ticket for an escalated issue."""

    id: str
    reason: EscalationReason
    context: dict  # caller-supplied details about the situation
    priority: int  # 1-5, 1 is highest
    created_at: datetime
    assigned_to: str | None = None
    resolved: bool = False
    resolution: str | None = None  # human-written outcome; None until resolved
39
40
class EscalationManager:
    """Manage escalation of agent issues to humans."""

    def __init__(self):
        # Every ticket created so far, keyed by ticket id.
        self.tickets: dict[str, EscalationTicket] = {}
        self.escalation_thresholds = {
            "uncertainty": 0.3,  # Escalate if confidence < 30%
            "risk": 0.7,  # Escalate if risk > 70%
            "failures": 3,  # Escalate after 3 failures
        }

        # Ticket priority per reason (1 is highest). Reasons missing from
        # this table get priority 5 in escalate().
        self.priority_rules = {
            EscalationReason.CRITICAL_DECISION: 1,
            EscalationReason.HIGH_RISK: 2,
            EscalationReason.POLICY_VIOLATION: 2,
            EscalationReason.USER_REQUEST: 3,
            EscalationReason.ANOMALY: 3,
            EscalationReason.UNCERTAINTY: 4,
            EscalationReason.REPEATED_FAILURES: 4,
        }

    def should_escalate(
        self,
        reason: EscalationReason,
        metrics: dict,
    ) -> bool:
        """Determine whether escalation is needed.

        Args:
            reason: The candidate reason for escalating.
            metrics: Measurements for threshold-based reasons. Keys read:
                ``confidence`` (default 1.0), ``risk_score`` (default 0.0)
                and ``failure_count`` (default 0).

        Returns:
            For UNCERTAINTY, HIGH_RISK and REPEATED_FAILURES, whether the
            matching metric crosses its threshold. True for USER_REQUEST,
            CRITICAL_DECISION, ANOMALY and POLICY_VIOLATION. False for any
            other value.
        """
        thresholds = self.escalation_thresholds

        if reason == EscalationReason.UNCERTAINTY:
            return metrics.get("confidence", 1.0) < thresholds["uncertainty"]

        if reason == EscalationReason.HIGH_RISK:
            return metrics.get("risk_score", 0.0) > thresholds["risk"]

        if reason == EscalationReason.REPEATED_FAILURES:
            return metrics.get("failure_count", 0) >= thresholds["failures"]

        # These reasons describe something that has already been requested or
        # detected, so there is no threshold to test. ANOMALY and
        # POLICY_VIOLATION previously fell through to False and were never
        # escalated.
        return reason in (
            EscalationReason.USER_REQUEST,
            EscalationReason.CRITICAL_DECISION,
            EscalationReason.ANOMALY,
            EscalationReason.POLICY_VIOLATION,
        )

    def escalate(
        self,
        reason: EscalationReason,
        context: dict,
    ) -> EscalationTicket:
        """Create an escalation ticket, store it and route it.

        Args:
            reason: Why the issue is being escalated; determines priority.
            context: Details stored on the ticket unchanged.

        Returns:
            The newly created ticket.
        """
        import uuid

        ticket = EscalationTicket(
            id=str(uuid.uuid4()),
            reason=reason,
            context=context,
            priority=self.priority_rules.get(reason, 5),
            created_at=datetime.now(),
        )

        self.tickets[ticket.id] = ticket
        self._route_ticket(ticket)

        return ticket

    def _route_ticket(self, ticket: EscalationTicket):
        """Route a ticket to the handler matching its priority."""
        if ticket.priority == 1:
            # Critical - page on-call
            self._page_oncall(ticket)
        elif ticket.priority <= 3:
            # High priority - immediate notification
            self._notify_team(ticket)
        else:
            # Normal - queue for review
            self._queue_for_review(ticket)

    def resolve(
        self,
        ticket_id: str,
        resolution: str,
        action: dict | None = None,
    ) -> bool:
        """Resolve an escalation ticket.

        Args:
            ticket_id: Id of the ticket to resolve.
            resolution: Description of the outcome, stored on the ticket.
            action: Optional action handed back to the agent.

        Returns:
            True if the ticket exists and was resolved, False otherwise.
        """
        ticket = self.tickets.get(ticket_id)
        if ticket is None:
            return False

        ticket.resolved = True
        ticket.resolution = resolution

        # Compare against None so an explicitly supplied empty dict is still
        # passed back to the agent.
        if action is not None:
            self._return_to_agent(ticket, action)

        return True

    def _page_oncall(self, ticket: EscalationTicket):
        """Page the on-call person."""
        pass

    def _notify_team(self, ticket: EscalationTicket):
        """Notify the team."""
        pass

    def _queue_for_review(self, ticket: EscalationTicket):
        """Add to review queue."""
        pass

    def _return_to_agent(self, ticket: EscalationTicket, action: dict):
        """Return resolution to agent."""
        pass
157
158
class UncertaintyEscalator:
    """Escalate based on agent uncertainty."""

    def __init__(self, manager: EscalationManager):
        self.manager = manager

    def check_and_escalate(
        self,
        decision: str,
        confidence: float,
        context: dict,
    ) -> EscalationTicket | None:
        """Escalate the decision when the manager judges confidence too low.

        Args:
            decision: The decision the agent intends to make; also recorded
                on the ticket as the agent's recommendation.
            confidence: The agent's confidence in that decision.
            context: Extra details stored on the ticket.

        Returns:
            The created ticket, or None when no escalation was needed.
        """
        needs_human = self.manager.should_escalate(
            EscalationReason.UNCERTAINTY,
            {"confidence": confidence},
        )
        if not needs_human:
            return None

        return self.manager.escalate(
            EscalationReason.UNCERTAINTY,
            {
                "decision": decision,
                "confidence": confidence,
                "context": context,
                "agent_recommendation": decision,
            },
        )

# Intervention Mechanisms
Real-Time Human Intervention
🐍python
1"""
2Intervention Mechanisms
3
4Allow humans to:
51. Pause agent execution
62. Modify agent behavior
73. Override decisions
84. Kill runaway agents
9"""
10
11from dataclasses import dataclass
12from datetime import datetime
13from enum import Enum
14from typing import Callable
15import threading
16
17
class AgentState(Enum):
    """Execution state of an agent under human control."""

    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"
    AWAITING_INPUT = "awaiting_input"
23
24
@dataclass
class InterventionEvent:
    """Record of a human intervention."""

    timestamp: datetime
    intervention_type: str  # e.g. "pause", "resume", "stop", "inject"
    reason: str
    operator: str  # who performed the intervention
    details: dict  # extra data specific to the intervention type
33
34
class InterventionController:
    """Control agent execution with human intervention.

    The agent calls ``check_point()`` before each action; operators call
    ``pause``, ``resume``, ``stop`` and ``inject_instruction``. Every operator
    call is appended to ``interventions``.
    """

    def __init__(
        self,
        agent_id: str,
        instruction_sink: Callable[[str], None] | None = None,
    ):
        """Create a controller for one agent.

        Args:
            agent_id: Identifier of the controlled agent.
            instruction_sink: Optional callable that receives each injected
                instruction (for example the ``append`` method of the agent's
                message queue). When omitted, injected instructions are only
                recorded in ``interventions``.
        """
        self.agent_id = agent_id
        self.state = AgentState.RUNNING
        self.interventions: list[InterventionEvent] = []
        self._instruction_sink = instruction_sink
        # Set means "free to run"; cleared means "paused".
        self._pause_event = threading.Event()
        self._pause_event.set()  # Not paused initially
        self._stop_flag = False

    def _record(
        self,
        intervention_type: str,
        reason: str,
        operator: str,
        details: dict | None = None,
    ):
        """Append one entry to the intervention log."""
        self.interventions.append(InterventionEvent(
            timestamp=datetime.now(),
            intervention_type=intervention_type,
            reason=reason,
            operator=operator,
            details={} if details is None else details,
        ))

    def pause(self, operator: str, reason: str):
        """Pause agent execution; the agent blocks at its next check point."""
        # A stopped agent stays stopped. Without this guard a late pause would
        # overwrite STOPPED with PAUSED, and a following resume() would then
        # report RUNNING for an agent that will never run again.
        if not self._stop_flag:
            self.state = AgentState.PAUSED
            self._pause_event.clear()

        self._record("pause", reason, operator)

    def resume(self, operator: str):
        """Resume a paused agent; has no effect on state otherwise."""
        if self.state == AgentState.PAUSED:
            self.state = AgentState.RUNNING
            self._pause_event.set()

        self._record("resume", "Manual resume", operator)

    def stop(self, operator: str, reason: str):
        """Stop the agent completely."""
        self.state = AgentState.STOPPED
        self._stop_flag = True
        # Wake an agent blocked in check_point() so it can see the stop flag.
        self._pause_event.set()

        self._record("stop", reason, operator)

    def check_point(self) -> bool:
        """Check if agent should continue (call before each action).

        Blocks while the agent is paused.

        Returns:
            True if the agent may proceed, False if it has been stopped.
        """
        if self._stop_flag:
            return False

        # Wait if paused
        self._pause_event.wait()

        # stop() may have been what released the wait, so check again.
        return not self._stop_flag

    def inject_instruction(
        self,
        instruction: str,
        operator: str,
    ):
        """Record an operator instruction and forward it to the sink, if any."""
        self._record(
            "inject",
            "Operator instruction",
            operator,
            {"instruction": instruction},
        )

        if self._instruction_sink is not None:
            self._instruction_sink(instruction)
114
115
class AgentWithIntervention:
    """Agent that supports human intervention.

    This is a base class: subclasses must implement ``_decide_next_action``,
    ``_execute_action`` and ``_is_complete``.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.controller = InterventionController(agent_id)
        # Operator messages waiting to be processed, oldest first.
        self.message_queue: list[str] = []

    def run(self, task: str):
        """Run the agent loop with intervention checkpoints.

        Each iteration passes a checkpoint, handles at most one injected
        message, decides on an action, passes a second checkpoint and then
        executes the action. The loop ends when the controller reports a stop
        or ``_is_complete`` returns a truthy value.

        Args:
            task: Description of the task. Not used by this base loop.
        """
        while True:
            if not self.controller.check_point():
                print(f"Agent {self.agent_id} stopped")
                break

            # Check for injected messages
            if self.message_queue:
                message = self.message_queue.pop(0)
                self._process_injected_message(message)

            action = self._decide_next_action()

            # Check again right before acting: the operator may have paused
            # or stopped the agent while it was deciding.
            if not self.controller.check_point():
                print(f"Agent {self.agent_id} stopped")
                break

            result = self._execute_action(action)

            if self._is_complete(result):
                break

    def _process_injected_message(self, message: str):
        """Process a message injected by operator."""
        print(f"Processing injected: {message}")

    # The three hooks below raise instead of silently returning None: with
    # None results, _is_complete() is never truthy and run() loops forever.

    def _decide_next_action(self) -> dict:
        """Decide next action. Must be overridden."""
        raise NotImplementedError("_decide_next_action must be overridden")

    def _execute_action(self, action: dict) -> dict:
        """Execute an action. Must be overridden."""
        raise NotImplementedError("_execute_action must be overridden")

    def _is_complete(self, result: dict) -> bool:
        """Check if task is complete. Must be overridden."""
        raise NotImplementedError("_is_complete must be overridden")
165
166
# Usage - Operator control panel
def operator_control(agent: AgentWithIntervention):
    """Simulated operator control interface.

    Pauses the agent, prints its state, injects an instruction and resumes it.
    """
    controller = agent.controller

    # Pause agent
    controller.pause("operator1", "Reviewing behavior")

    # Check status
    print(f"Agent state: {controller.state}")

    # Inject instruction
    controller.inject_instruction(
        "Skip the current task and move to backup plan",
        "operator1",
    )

    # Resume
    controller.resume("operator1")

# Progressive Autonomy
Earning Trust Over Time
🐍python
1"""
2Progressive Autonomy
3
4Start with high oversight, reduce as agent proves reliable.
5Trust is earned through successful actions and lost through failures.
6"""
7
8from dataclasses import dataclass, field
9from datetime import datetime, timedelta
10from typing import Callable
11
12
@dataclass
class AutonomyLevel:
    """Defines an autonomy level."""

    name: str
    approval_required: list[str]  # Action types needing approval; "*" means all
    allowed_actions: list[str]  # Action types permitted; "*" means all
    resource_limits: dict
    min_trust_score: float  # lowest trust score that qualifies for this level
21
22
23@dataclass
24class TrustRecord:
25 """Record of agent trustworthiness."""
26 agent_id: str
27 trust_score: float = 0.5
28 successful_actions: int = 0
29 failed_actions: int = 0
30 escalations: int = 0
31 last_failure: datetime | None = None
32 history: list[tuple[datetime, str, float]] = field(default_factory=list)
33
34
class ProgressiveAutonomyManager:
    """Manage progressive autonomy for agents.

    Each agent has a trust score in [0, 1] that rises with successes and falls
    with failures and escalations. The score selects one of ``levels``, which
    decides what the agent may do and what needs approval.
    """

    def __init__(self):
        self.trust_records: dict[str, TrustRecord] = {}

        # Define autonomy levels
        self.levels = [
            AutonomyLevel(
                name="restricted",
                approval_required=["*"],  # Everything needs approval
                allowed_actions=["read", "search"],
                resource_limits={"api_calls": 10, "files": 0},
                min_trust_score=0.0,
            ),
            AutonomyLevel(
                name="supervised",
                approval_required=["write", "api_call", "execute"],
                allowed_actions=["read", "search", "analyze"],
                resource_limits={"api_calls": 50, "files": 5},
                min_trust_score=0.3,
            ),
            AutonomyLevel(
                name="standard",
                approval_required=["execute", "external_api"],
                allowed_actions=["read", "search", "analyze", "write"],
                resource_limits={"api_calls": 100, "files": 20},
                min_trust_score=0.6,
            ),
            AutonomyLevel(
                name="trusted",
                approval_required=["delete", "system_modify"],
                allowed_actions=["read", "search", "analyze", "write", "execute"],
                resource_limits={"api_calls": 500, "files": 100},
                min_trust_score=0.8,
            ),
            AutonomyLevel(
                name="autonomous",
                approval_required=["critical_system"],
                allowed_actions=["*"],
                resource_limits={"api_calls": 1000, "files": 500},
                min_trust_score=0.95,
            ),
        ]

    def get_trust(self, agent_id: str) -> TrustRecord:
        """Get or create trust record for agent."""
        if agent_id not in self.trust_records:
            self.trust_records[agent_id] = TrustRecord(agent_id=agent_id)
        return self.trust_records[agent_id]

    def get_autonomy_level(self, agent_id: str) -> AutonomyLevel:
        """Return the highest autonomy level the agent qualifies for.

        Falls back to the first entry of ``levels`` when the agent's trust
        score is below every threshold.
        """
        score = self.get_trust(agent_id).trust_score

        qualified = [
            level for level in self.levels
            if score >= level.min_trust_score
        ]
        if not qualified:
            return self.levels[0]

        # Pick by threshold rather than list position so the result does not
        # depend on ``levels`` being sorted.
        return max(qualified, key=lambda level: level.min_trust_score)

    def record_success(self, agent_id: str, action_type: str):
        """Record a successful action and raise the agent's trust score."""
        trust = self.get_trust(agent_id)
        trust.successful_actions += 1

        # Increase trust (diminishing returns): the closer the score is to
        # 1.0, the smaller the step.
        increase = 0.02 * (1 - trust.trust_score)
        trust.trust_score = min(1.0, trust.trust_score + increase)

        trust.history.append((
            datetime.now(),
            f"success:{action_type}",
            trust.trust_score,
        ))

    def record_failure(self, agent_id: str, action_type: str, severity: float):
        """Record a failed action and lower the agent's trust score.

        Args:
            agent_id: Agent that failed.
            action_type: Action that failed; recorded in the history label.
            severity: Scales the penalty (penalty is ``0.1 * severity``).
                Negative values are treated as 0 so a failure can never
                raise trust.
        """
        trust = self.get_trust(agent_id)
        now = datetime.now()
        trust.failed_actions += 1
        trust.last_failure = now

        decrease = 0.1 * max(0.0, severity)
        trust.trust_score = max(0.0, trust.trust_score - decrease)

        trust.history.append((
            now,
            f"failure:{action_type}",
            trust.trust_score,
        ))

    def record_escalation(self, agent_id: str):
        """Record an escalation and apply a small trust penalty."""
        trust = self.get_trust(agent_id)
        trust.escalations += 1

        # Small penalty for escalations
        trust.trust_score = max(0.0, trust.trust_score - 0.01)

        # Log the change so history explains every movement of the score,
        # as it does for successes and failures.
        trust.history.append((
            datetime.now(),
            "escalation",
            trust.trust_score,
        ))

    def needs_approval(self, agent_id: str, action_type: str) -> bool:
        """Check if action needs approval given current autonomy.

        This says nothing about whether the action is permitted at all; use
        ``is_action_allowed`` for that.
        """
        required = self.get_autonomy_level(agent_id).approval_required
        return "*" in required or action_type in required

    def is_action_allowed(self, agent_id: str, action_type: str) -> bool:
        """Check if action is allowed at current level."""
        allowed = self.get_autonomy_level(agent_id).allowed_actions
        return "*" in allowed or action_type in allowed
153
154
# Usage
manager = ProgressiveAutonomyManager()

agent_id = "research_agent_1"

# A new agent starts with the default trust score of 0.5
level = manager.get_autonomy_level(agent_id)
print(f"Initial level: {level.name}")  # "supervised"

# Record successful actions
for _ in range(20):
    manager.record_success(agent_id, "search")

level = manager.get_autonomy_level(agent_id)
print(f"After successes: {level.name}")  # "standard" (score is about 0.67)

# Record a failure
manager.record_failure(agent_id, "write", severity=0.5)
level = manager.get_autonomy_level(agent_id)
print(f"After failure: {level.name}")  # "standard" (score is about 0.62)

# Key Takeaways
- Approval workflows ensure human sign-off on high-risk or irreversible actions.
- Escalation patterns automatically involve humans when agents are uncertain or detect anomalies.
- Intervention mechanisms allow real-time human control including pause, stop, and instruction injection.
- Progressive autonomy builds trust over time, starting with high oversight and reducing as agents prove reliable.
- Balance autonomy and control: too much oversight defeats the purpose of agents, while too little creates risk.
Next Section Preview: We'll explore monitoring and alerting systems for maintaining visibility into agent behavior.