Chapter 18
15 min read
Section 114 of 175

Human-in-the-Loop Controls

Agent Safety and Guardrails

Introduction

Human oversight is the ultimate safety mechanism for AI agents. No matter how sophisticated automated guardrails become, certain decisions require human judgment. This section covers patterns for integrating human oversight into agent workflows effectively.

Section Overview: We'll explore approval workflows, escalation patterns, intervention mechanisms, and progressive autonomy systems for human-agent collaboration.

Approval Workflows

When to Require Approval

| Action Type | Approval Level | Rationale |
| --- | --- | --- |
| Read public data | None | Low risk, reversible |
| Write to workspace | None/Notify | Limited scope |
| External API calls | Review | External impact |
| Send communications | Required | Affects recipients |
| Financial actions | Required + 2FA | High stakes |
| System changes | Required + Admin | Critical infrastructure |
🐍python
1"""
2Approval Workflow System
3
4Implements multi-level approval for agent actions.
5"""
6
7from dataclasses import dataclass, field
8from datetime import datetime, timedelta
9from enum import Enum
10from typing import Callable
11import uuid
12
13
class ApprovalLevel(Enum):
    """Degree of human sign-off attached to an agent action type."""

    # No human involvement; requests at this level are auto-approved
    NONE = "none"
    # Humans are informed about the action
    NOTIFY = "notify"
    # A human reviews the action
    REVIEW = "review"
    # Explicit human approval is needed
    REQUIRED = "required"
    # Approval must be accompanied by a valid 2FA code
    REQUIRED_2FA = "required_2fa"
    # Only an admin approver may approve
    ADMIN_ONLY = "admin_only"
21
22
class ApprovalStatus(Enum):
    """Lifecycle state of an approval request."""

    # Waiting for a human decision
    PENDING = "pending"
    # A human approved the request
    APPROVED = "approved"
    # A human rejected the request
    DENIED = "denied"
    # The deadline passed before any decision was made
    EXPIRED = "expired"
    # Granted immediately because no approval was needed
    AUTO_APPROVED = "auto_approved"
29
30
@dataclass
class ApprovalRequest:
    """One agent action submitted for human sign-off.

    The first six fields are supplied when the request is created; the
    remaining ones start at their defaults and are filled in when the
    request is decided.
    """

    # --- set at creation ---
    id: str                # unique request identifier
    action_type: str       # e.g. "send_email"
    action_details: dict   # free-form payload describing the action
    level: ApprovalLevel   # approval level the action type maps to
    created_at: datetime
    expires_at: datetime   # after this moment the request can no longer be approved

    # --- set when decided ---
    status: ApprovalStatus = field(default=ApprovalStatus.PENDING)
    approver: str | None = field(default=None)            # who approved or denied
    approval_time: datetime | None = field(default=None)  # when approval was granted
    notes: str = field(default="")                        # e.g. the denial reason
44
45
class ApprovalWorkflow:
    """Manage approval workflows for agent actions.

    Every action type maps to an ApprovalLevel. NONE and NOTIFY actions are
    auto-approved (NOTIFY additionally informs the approvers). Every other
    level produces a PENDING request that a human must approve or deny
    before it expires. A request can be decided exactly once: approving or
    denying a request that is no longer pending is rejected.
    """

    def __init__(self):
        # Every tracked request keyed by id; decided requests stay in here
        # so their final status can still be queried.
        self.pending_requests: dict[str, ApprovalRequest] = {}
        self.approval_handlers: dict[ApprovalLevel, Callable] = {}

        # Configure action -> approval level mapping
        self.action_levels: dict[str, ApprovalLevel] = {
            "read_file": ApprovalLevel.NONE,
            "write_file": ApprovalLevel.NOTIFY,
            "api_call": ApprovalLevel.REVIEW,
            "send_email": ApprovalLevel.REQUIRED,
            "payment": ApprovalLevel.REQUIRED_2FA,
            "delete_data": ApprovalLevel.ADMIN_ONLY,
        }

        # Configure timeouts (seconds a request stays open)
        self.timeouts: dict[ApprovalLevel, int] = {
            ApprovalLevel.NOTIFY: 0,        # Immediate
            ApprovalLevel.REVIEW: 3600,     # 1 hour
            ApprovalLevel.REQUIRED: 86400,  # 24 hours
            ApprovalLevel.REQUIRED_2FA: 300,  # 5 minutes
            ApprovalLevel.ADMIN_ONLY: 86400,
        }

    def request_approval(
        self,
        action_type: str,
        action_details: dict
    ) -> ApprovalRequest:
        """Create an approval request for an action.

        Args:
            action_type: Key into ``action_levels``; unknown types are
                treated as REQUIRED so that they fail closed.
            action_details: Free-form description of the action.

        Returns:
            The new request. It is AUTO_APPROVED for NONE and NOTIFY
            levels and PENDING for every other level.
        """
        level = self.action_levels.get(action_type, ApprovalLevel.REQUIRED)
        now = datetime.now()  # single timestamp keeps created/expires consistent

        # NONE and NOTIFY need no human decision. Previously NOTIFY requests
        # were left PENDING with a zero timeout, which made them expire
        # before anyone could approve them.
        if level in (ApprovalLevel.NONE, ApprovalLevel.NOTIFY):
            request = ApprovalRequest(
                id=str(uuid.uuid4()),
                action_type=action_type,
                action_details=action_details,
                level=level,
                created_at=now,
                expires_at=now,
                status=ApprovalStatus.AUTO_APPROVED,
            )
            if level == ApprovalLevel.NOTIFY:
                # Keep NOTIFY requests queryable and tell the approvers
                self.pending_requests[request.id] = request
                self._notify_approvers(request)
            return request

        # Create pending request
        timeout = self.timeouts.get(level, 3600)
        request = ApprovalRequest(
            id=str(uuid.uuid4()),
            action_type=action_type,
            action_details=action_details,
            level=level,
            created_at=now,
            expires_at=now + timedelta(seconds=timeout),
        )
        self.pending_requests[request.id] = request

        # Send notification
        self._notify_approvers(request)

        return request

    def approve(
        self,
        request_id: str,
        approver: str,
        verification: str | None = None
    ) -> bool:
        """Approve a pending request.

        Args:
            request_id: Id returned by ``request_approval``.
            approver: Identity of the person approving.
            verification: 2FA code; only checked for REQUIRED_2FA requests.

        Returns:
            True if the request moved to APPROVED. False if the id is
            unknown, the request was already decided or has expired, or
            the 2FA / admin check failed.
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return False

        # Only an open request may be approved; this stops a denied,
        # expired or already-approved request from being (re-)approved.
        if not self._is_open(request):
            return False

        # Check 2FA if required
        if (request.level == ApprovalLevel.REQUIRED_2FA
                and not self._verify_2fa(approver, verification)):
            return False

        # Check admin level
        if (request.level == ApprovalLevel.ADMIN_ONLY
                and not self._is_admin(approver)):
            return False

        request.status = ApprovalStatus.APPROVED
        request.approver = approver
        request.approval_time = datetime.now()
        return True

    def deny(
        self,
        request_id: str,
        approver: str,
        reason: str = ""
    ) -> bool:
        """Deny a pending request.

        Args:
            request_id: Id returned by ``request_approval``.
            approver: Identity of the person denying.
            reason: Stored in the request's ``notes``.

        Returns:
            True if the request moved to DENIED. False if the id is
            unknown or the request is no longer pending (already decided
            or expired).
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return False

        # A decision that was already made must not be overwritten
        if not self._is_open(request):
            return False

        request.status = ApprovalStatus.DENIED
        request.approver = approver
        request.notes = reason
        return True

    def check_status(self, request_id: str) -> ApprovalStatus:
        """Return the current status of a request.

        Unknown ids are reported as EXPIRED. A PENDING request whose
        deadline has passed is switched to EXPIRED before returning.
        """
        request = self.pending_requests.get(request_id)
        if request is None:
            return ApprovalStatus.EXPIRED

        self._is_open(request)  # called for its expiry side effect
        return request.status

    def _is_open(self, request: ApprovalRequest) -> bool:
        """Return True if the request is PENDING and not past its deadline.

        A PENDING request found past its deadline is marked EXPIRED.
        """
        if request.status != ApprovalStatus.PENDING:
            return False
        if datetime.now() > request.expires_at:
            request.status = ApprovalStatus.EXPIRED
            return False
        return True

    def _notify_approvers(self, request: ApprovalRequest):
        """Send notification to approvers."""
        # Implementation depends on notification system
        pass

    def _verify_2fa(self, user: str, code: str | None) -> bool:
        """Verify 2FA code."""
        # SECURITY: placeholder only. A fixed code accepts anyone who knows
        # it; replace with a real 2FA backend before any production use.
        return code == "123456"  # Placeholder

    def _is_admin(self, user: str) -> bool:
        """Check if user is admin."""
        # SECURITY: placeholder only. A suffix check on a caller-supplied
        # string is not authentication; replace with the real auth system.
        return user.endswith("@admin.com")
190
191
# Usage: walk one email-sending action through the approval workflow
workflow = ApprovalWorkflow()

# Ask for approval to send an email
request = workflow.request_approval(
    "send_email",
    {"to": "user@example.com", "subject": "Hello"},
)

# Show what was created
print(f"Request ID: {request.id}")
print(f"Status: {request.status}")

# Simulate a human approving the request and report the outcome
print(
    "Email sending approved!"
    if workflow.approve(request.id, "approver@company.com")
    else "Approval failed"
)

Escalation Patterns

When to Escalate

🐍python
1"""
2Escalation Patterns
3
4Automatically escalate issues to humans when:
51. Agent is uncertain
62. Risk threshold exceeded
73. Anomaly detected
84. User requests human
95. Critical decision required
10"""
11
12from dataclasses import dataclass
13from datetime import datetime
14from enum import Enum
15from typing import Any
16
17
class EscalationReason(Enum):
    """Why an issue is being handed over from the agent to a human."""

    # Agent confidence is too low
    UNCERTAINTY = "uncertainty"
    # Risk score is too high
    HIGH_RISK = "high_risk"
    # Something unusual was detected
    ANOMALY = "anomaly"
    # The user asked for a human
    USER_REQUEST = "user_request"
    # The decision is critical
    CRITICAL_DECISION = "critical_decision"
    # The agent failed repeatedly
    REPEATED_FAILURES = "repeated_failures"
    # A policy was violated
    POLICY_VIOLATION = "policy_violation"
26
27
@dataclass
class EscalationTicket:
    """An issue that has been escalated to a human.

    The first five fields are supplied at creation; the rest start at
    their defaults and are updated as the ticket is worked on.
    """

    # --- set at creation ---
    id: str
    reason: EscalationReason
    context: dict          # free-form data describing the situation
    priority: int  # 1-5, 1 is highest
    created_at: datetime

    # --- updated later ---
    assigned_to: str | None = None
    resolved: bool = False
    resolution: str | None = None  # text describing how it was resolved
39
40
class EscalationManager:
    """Manage escalation of agent issues to humans.

    Decides whether a situation warrants escalation, creates tickets,
    routes them by priority and records their resolution.
    """

    def __init__(self):
        self.tickets: dict[str, EscalationTicket] = {}
        self.escalation_thresholds = {
            "uncertainty": 0.3,  # Escalate if confidence < 30%
            "risk": 0.7,        # Escalate if risk > 70%
            "failures": 3,      # Escalate after 3 failures
        }

        # Priority mapping (1 is highest)
        self.priority_rules = {
            EscalationReason.CRITICAL_DECISION: 1,
            EscalationReason.HIGH_RISK: 2,
            EscalationReason.POLICY_VIOLATION: 2,
            EscalationReason.USER_REQUEST: 3,
            EscalationReason.ANOMALY: 3,
            EscalationReason.UNCERTAINTY: 4,
            EscalationReason.REPEATED_FAILURES: 4,
        }

    def should_escalate(
        self,
        reason: EscalationReason,
        metrics: dict
    ) -> bool:
        """Determine if escalation is needed.

        Args:
            reason: Why escalation is being considered.
            metrics: Measurements for the threshold-based reasons:
                ``confidence`` (UNCERTAINTY), ``risk_score`` (HIGH_RISK)
                and ``failure_count`` (REPEATED_FAILURES). A missing key
                falls back to a value that does not trigger escalation.

        Returns:
            For the three threshold-based reasons, whether the metric
            crosses its configured threshold. True for every other reason.
        """
        thresholds = self.escalation_thresholds

        if reason == EscalationReason.UNCERTAINTY:
            return metrics.get("confidence", 1.0) < thresholds["uncertainty"]

        if reason == EscalationReason.HIGH_RISK:
            return metrics.get("risk_score", 0.0) > thresholds["risk"]

        if reason == EscalationReason.REPEATED_FAILURES:
            return metrics.get("failure_count", 0) >= thresholds["failures"]

        # USER_REQUEST, CRITICAL_DECISION, ANOMALY and POLICY_VIOLATION have
        # no threshold: the reason itself is the trigger. ANOMALY and
        # POLICY_VIOLATION used to fall through to False, silently dropping
        # events that priority_rules ranks as high priority. Anything
        # unrecognised also escalates, so the check fails closed.
        return True

    def escalate(
        self,
        reason: EscalationReason,
        context: dict
    ) -> EscalationTicket:
        """Create an escalation ticket, store it and route it.

        Args:
            reason: Why the issue is escalated; also selects the priority
                (reasons missing from ``priority_rules`` get priority 5).
            context: Free-form data attached to the ticket.

        Returns:
            The newly created ticket.
        """
        import uuid

        ticket = EscalationTicket(
            id=str(uuid.uuid4()),
            reason=reason,
            context=context,
            priority=self.priority_rules.get(reason, 5),
            created_at=datetime.now()
        )

        self.tickets[ticket.id] = ticket

        # Route to appropriate handler
        self._route_ticket(ticket)

        return ticket

    def _route_ticket(self, ticket: EscalationTicket):
        """Route ticket to appropriate handler."""
        if ticket.priority == 1:
            # Critical - page on-call
            self._page_oncall(ticket)
        elif ticket.priority <= 3:
            # High priority - immediate notification
            self._notify_team(ticket)
        else:
            # Normal - queue for review
            self._queue_for_review(ticket)

    def resolve(
        self,
        ticket_id: str,
        resolution: str,
        action: dict | None = None
    ) -> bool:
        """Resolve an escalation ticket.

        Args:
            ticket_id: Id of the ticket to resolve.
            resolution: Text describing the outcome.
            action: Optional action handed back to the agent.

        Returns:
            True if the ticket was resolved by this call. False if the id
            is unknown or the ticket had already been resolved.
        """
        ticket = self.tickets.get(ticket_id)
        if ticket is None:
            return False

        # Resolving twice would overwrite the recorded resolution and hand
        # the action to the agent a second time.
        if ticket.resolved:
            return False

        ticket.resolved = True
        ticket.resolution = resolution

        # If action provided, return it to agent
        if action:
            self._return_to_agent(ticket, action)

        return True

    def _page_oncall(self, ticket: EscalationTicket):
        """Page the on-call person."""
        pass

    def _notify_team(self, ticket: EscalationTicket):
        """Notify the team."""
        pass

    def _queue_for_review(self, ticket: EscalationTicket):
        """Add to review queue."""
        pass

    def _return_to_agent(self, ticket: EscalationTicket, action: dict):
        """Return resolution to agent."""
        pass
157
158
class UncertaintyEscalator:
    """Hand a decision to a human when the agent's confidence is too low."""

    def __init__(self, manager: EscalationManager):
        self.manager = manager

    def check_and_escalate(
        self,
        decision: str,
        confidence: float,
        context: dict
    ) -> EscalationTicket | None:
        """Escalate ``decision`` if the manager deems the confidence too low.

        Returns the created ticket, or None when no escalation was needed.
        """
        too_uncertain = self.manager.should_escalate(
            EscalationReason.UNCERTAINTY,
            {"confidence": confidence},
        )
        if not too_uncertain:
            return None

        # The agent's own decision doubles as its recommendation
        ticket_context = {
            "decision": decision,
            "confidence": confidence,
            "context": context,
            "agent_recommendation": decision,
        }
        return self.manager.escalate(
            EscalationReason.UNCERTAINTY,
            ticket_context,
        )

Intervention Mechanisms

Real-Time Human Intervention

🐍python
1"""
2Intervention Mechanisms
3
4Allow humans to:
51. Pause agent execution
62. Modify agent behavior
73. Override decisions
84. Kill runaway agents
9"""
10
11from dataclasses import dataclass
12from datetime import datetime
13from enum import Enum
14from typing import Callable
15import threading
16
17
class AgentState(Enum):
    """Execution state of an agent under human control."""

    # Executing normally
    RUNNING = "running"
    # Held by an operator
    PAUSED = "paused"
    # Stopped by an operator
    STOPPED = "stopped"
    # Waiting for input
    AWAITING_INPUT = "awaiting_input"
23
24
@dataclass
class InterventionEvent:
    """Audit-log entry describing one human intervention."""

    timestamp: datetime      # when the intervention happened
    intervention_type: str   # e.g. "pause", "resume", "stop", "inject"
    reason: str              # why the operator intervened
    operator: str            # who intervened
    details: dict            # extra data, e.g. an injected instruction
33
34
35class InterventionController:
36    """Control agent execution with human intervention."""
37
38    def __init__(self, agent_id: str):
39        self.agent_id = agent_id
40        self.state = AgentState.RUNNING
41        self.interventions: list[InterventionEvent] = []
42        self._pause_event = threading.Event()
43        self._pause_event.set()  # Not paused initially
44        self._stop_flag = False
45
46    def pause(self, operator: str, reason: str):
47        """Pause agent execution."""
48        self.state = AgentState.PAUSED
49        self._pause_event.clear()
50
51        self.interventions.append(InterventionEvent(
52            timestamp=datetime.now(),
53            intervention_type="pause",
54            reason=reason,
55            operator=operator,
56            details={}
57        ))
58
59    def resume(self, operator: str):
60        """Resume agent execution."""
61        if self.state == AgentState.PAUSED:
62            self.state = AgentState.RUNNING
63            self._pause_event.set()
64
65            self.interventions.append(InterventionEvent(
66                timestamp=datetime.now(),
67                intervention_type="resume",
68                reason="Manual resume",
69                operator=operator,
70                details={}
71            ))
72
73    def stop(self, operator: str, reason: str):
74        """Stop agent completely."""
75        self.state = AgentState.STOPPED
76        self._stop_flag = True
77        self._pause_event.set()  # Release any waiting
78
79        self.interventions.append(InterventionEvent(
80            timestamp=datetime.now(),
81            intervention_type="stop",
82            reason=reason,
83            operator=operator,
84            details={}
85        ))
86
87    def check_point(self) -> bool:
88        """Check if agent should continue (call before each action)."""
89        # Check for stop
90        if self._stop_flag:
91            return False
92
93        # Wait if paused
94        self._pause_event.wait()
95
96        return not self._stop_flag
97
98    def inject_instruction(
99        self,
100        instruction: str,
101        operator: str
102    ):
103        """Inject an instruction into the agent."""
104        self.interventions.append(InterventionEvent(
105            timestamp=datetime.now(),
106            intervention_type="inject",
107            reason="Operator instruction",
108            operator=operator,
109            details={"instruction": instruction}
110        ))
111
112        # Implementation would send to agent's message queue
113        pass
114
115
class AgentWithIntervention:
    """Agent whose main loop honours operator pause/stop requests."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.controller = InterventionController(agent_id)
        self.message_queue: list[str] = []

    def run(self, task: str):
        """Run the agent loop, consulting the controller at two checkpoints.

        Each iteration: checkpoint, handle at most one injected message,
        pick an action, checkpoint again, execute, and finish when the
        result is reported complete.
        """
        while True:
            # First checkpoint: announce and leave if the operator stopped us
            if not self.controller.check_point():
                print(f"Agent {self.agent_id} stopped")
                return

            # Handle one operator message per iteration, oldest first
            if self.message_queue:
                injected = self.message_queue.pop(0)
                self._process_injected_message(injected)

            next_action = self._decide_next_action()

            # Second checkpoint: the operator may have stepped in while the
            # action was being chosen
            if not self.controller.check_point():
                return

            outcome = self._execute_action(next_action)
            if self._is_complete(outcome):
                return

    def _process_injected_message(self, message: str):
        """Handle a message injected by the operator."""
        print(f"Processing injected: {message}")

    def _decide_next_action(self) -> dict:
        """Choose the next action (placeholder)."""
        pass

    def _execute_action(self, action: dict) -> dict:
        """Carry out an action (placeholder)."""
        pass

    def _is_complete(self, result: dict) -> bool:
        """Report whether the task is finished (placeholder)."""
        pass
165
166
167# Usage - Operator control panel
def operator_control(agent: AgentWithIntervention):
    """Demonstrate an operator session: pause, inspect, instruct, resume."""
    operator = "operator1"

    # Hold the agent while its behaviour is reviewed
    agent.controller.pause(operator, "Reviewing behavior")

    # Show the state the controller now reports
    print(f"Agent state: {agent.controller.state}")

    # Give the agent a new instruction
    agent.controller.inject_instruction(
        "Skip the current task and move to backup plan",
        operator,
    )

    # Let the agent continue
    agent.controller.resume(operator)

Progressive Autonomy

Earning Trust Over Time

🐍python
1"""
2Progressive Autonomy
3
4Start with high oversight, reduce as agent proves reliable.
5Trust is earned through successful actions and lost through failures.
6"""
7
8from dataclasses import dataclass, field
9from datetime import datetime, timedelta
10from typing import Callable
11
12
@dataclass
class AutonomyLevel:
    """One tier of agent autonomy and the trust needed to reach it."""

    name: str
    approval_required: list[str]  # Action types needing approval ("*" = all)
    allowed_actions: list[str]    # Action types permitted ("*" = all)
    resource_limits: dict         # e.g. {"api_calls": 10, "files": 0}
    min_trust_score: float        # lowest trust score that qualifies
21
22
23@dataclass
24class TrustRecord:
25    """Record of agent trustworthiness."""
26    agent_id: str
27    trust_score: float = 0.5
28    successful_actions: int = 0
29    failed_actions: int = 0
30    escalations: int = 0
31    last_failure: datetime | None = None
32    history: list[tuple[datetime, str, float]] = field(default_factory=list)
33
34
class ProgressiveAutonomyManager:
    """Manage progressive autonomy for agents.

    Each agent has a TrustRecord whose score rises with successes and
    falls with failures and escalations. The score selects an
    AutonomyLevel, which determines which actions are allowed and which
    need human approval.
    """

    def __init__(self):
        self.trust_records: dict[str, TrustRecord] = {}

        # Define autonomy levels
        self.levels = [
            AutonomyLevel(
                name="restricted",
                approval_required=["*"],  # Everything needs approval
                allowed_actions=["read", "search"],
                resource_limits={"api_calls": 10, "files": 0},
                min_trust_score=0.0
            ),
            AutonomyLevel(
                name="supervised",
                approval_required=["write", "api_call", "execute"],
                allowed_actions=["read", "search", "analyze"],
                resource_limits={"api_calls": 50, "files": 5},
                min_trust_score=0.3
            ),
            AutonomyLevel(
                name="standard",
                approval_required=["execute", "external_api"],
                allowed_actions=["read", "search", "analyze", "write"],
                resource_limits={"api_calls": 100, "files": 20},
                min_trust_score=0.6
            ),
            AutonomyLevel(
                name="trusted",
                approval_required=["delete", "system_modify"],
                allowed_actions=["read", "search", "analyze", "write", "execute"],
                resource_limits={"api_calls": 500, "files": 100},
                min_trust_score=0.8
            ),
            AutonomyLevel(
                name="autonomous",
                approval_required=["critical_system"],
                allowed_actions=["*"],
                resource_limits={"api_calls": 1000, "files": 500},
                min_trust_score=0.95
            ),
        ]

    def get_trust(self, agent_id: str) -> TrustRecord:
        """Get or create trust record for agent."""
        if agent_id not in self.trust_records:
            self.trust_records[agent_id] = TrustRecord(agent_id=agent_id)
        return self.trust_records[agent_id]

    def get_autonomy_level(self, agent_id: str) -> AutonomyLevel:
        """Return the highest autonomy level the agent's trust qualifies for.

        Falls back to the first configured level when the score is below
        every threshold.
        """
        trust = self.get_trust(agent_id)

        qualified = [
            level for level in self.levels
            if trust.trust_score >= level.min_trust_score
        ]
        if not qualified:
            return self.levels[0]

        # Select by threshold rather than list position so the result does
        # not depend on self.levels being kept in ascending order.
        return max(qualified, key=lambda level: level.min_trust_score)

    def _log(self, trust: TrustRecord, label: str):
        """Append (now, label, current score) to the record's history."""
        trust.history.append((datetime.now(), label, trust.trust_score))

    def record_success(self, agent_id: str, action_type: str):
        """Record a successful action and raise the agent's trust."""
        trust = self.get_trust(agent_id)
        trust.successful_actions += 1

        # Increase trust (diminishing returns as the score nears 1.0)
        increase = 0.02 * (1 - trust.trust_score)
        trust.trust_score = min(1.0, trust.trust_score + increase)

        self._log(trust, f"success:{action_type}")

    def record_failure(self, agent_id: str, action_type: str, severity: float):
        """Record a failed action and lower the agent's trust.

        Args:
            agent_id: Agent that failed.
            action_type: Action type that failed.
            severity: Scales the penalty (penalty is ``0.1 * severity``).
                Negative values are treated as 0 so that a failure can
                never raise trust.
        """
        trust = self.get_trust(agent_id)
        trust.failed_actions += 1
        trust.last_failure = datetime.now()

        # Decrease trust based on severity
        decrease = 0.1 * max(0.0, severity)
        trust.trust_score = max(0.0, trust.trust_score - decrease)

        self._log(trust, f"failure:{action_type}")

    def record_escalation(self, agent_id: str):
        """Record an escalation and apply a small trust penalty."""
        trust = self.get_trust(agent_id)
        trust.escalations += 1

        # Small penalty for escalations
        trust.trust_score = max(0.0, trust.trust_score - 0.01)

        # Logged like successes and failures so the history explains every
        # change to the score.
        self._log(trust, "escalation")

    def needs_approval(self, agent_id: str, action_type: str) -> bool:
        """Check if action needs approval given current autonomy.

        Returns True when the level lists the action type (or "*") under
        ``approval_required``, and also when the level does not list it
        under ``allowed_actions``: an action type the level does not
        recognise must not bypass human review.
        """
        level = self.get_autonomy_level(agent_id)

        required = level.approval_required
        if "*" in required or action_type in required:
            return True

        # Fail closed for action types that are not explicitly allowed
        allowed = level.allowed_actions
        return "*" not in allowed and action_type not in allowed

    def is_action_allowed(self, agent_id: str, action_type: str) -> bool:
        """Check if action is allowed at current level."""
        level = self.get_autonomy_level(agent_id)

        if "*" in level.allowed_actions:
            return True
        return action_type in level.allowed_actions
153
154
# Usage
manager = ProgressiveAutonomyManager()

agent_id = "research_agent_1"

# A new TrustRecord starts at trust_score 0.5, which already clears the
# 0.3 threshold, so a fresh agent begins at "supervised" (not "restricted")
level = manager.get_autonomy_level(agent_id)
print(f"Initial level: {level.name}")  # "supervised"

# Record successful actions
for _ in range(20):
    manager.record_success(agent_id, "search")

level = manager.get_autonomy_level(agent_id)
print(f"After successes: {level.name}")  # "standard" (trust is about 0.67)

# Record a failure
manager.record_failure(agent_id, "write", severity=0.5)
level = manager.get_autonomy_level(agent_id)
print(f"After failure: {level.name}")  # "standard" (trust is about 0.62)

Key Takeaways

  • Approval workflows ensure human sign-off on high-risk or irreversible actions.
  • Escalation patterns automatically involve humans when agents are uncertain or detect anomalies.
  • Intervention mechanisms allow real-time human control including pause, stop, and instruction injection.
  • Progressive autonomy builds trust over time, starting with high oversight and reducing as agents prove reliable.
  • Balance autonomy and control: too much oversight defeats the purpose of agents, while too little creates risk.
Next Section Preview: We'll explore monitoring and alerting systems for maintaining visibility into agent behavior.