Introduction
While input validation prevents malicious content from entering the system, output filtering ensures that the agent's responses are safe, appropriate, and don't leak sensitive information. This is crucial for agents that interact with users or external systems.
Section Overview: We'll cover content moderation systems, secret detection, PII protection, and building a comprehensive output filtering pipeline.
Content Moderation
Moderation Categories
| Category | Description | Action |
|---|---|---|
| Harmful | Violence, self-harm content | Block |
| Hateful | Discrimination, slurs | Block |
| Sexual | Explicit sexual content | Block/Filter |
| Deceptive | Misinformation, scams | Block |
| Illegal | Instructions for illegal activities | Block |
| Personal | Private information | Redact |
🐍python
1"""
2Content Moderation System
3
4Multi-category moderation for agent outputs.
5"""
6
7from dataclasses import dataclass
8from enum import Enum
9
10
11class ModerationCategory(Enum):
12 HARMFUL = "harmful"
13 HATEFUL = "hateful"
14 SEXUAL = "sexual"
15 DECEPTIVE = "deceptive"
16 ILLEGAL = "illegal"
17 PERSONAL = "personal"
18 SAFE = "safe"
19
20
21class ModerationAction(Enum):
22 ALLOW = "allow"
23 WARN = "warn"
24 REDACT = "redact"
25 BLOCK = "block"
26
27
28@dataclass
29class ModerationResult:
30 """Result of content moderation."""
31 category: ModerationCategory
32 action: ModerationAction
33 confidence: float
34 flagged_content: str | None
35 moderated_content: str
36
37
class ContentModerator:
    """Moderate agent output with pattern-based category checks.

    Pattern matching is a lightweight baseline; supplement it with an ML
    classifier (see MLModerator) in production.
    """

    def __init__(self):
        # Local import keeps the snippet self-contained; previously `re`
        # was imported inside moderate() on every call.
        import re

        # Category to action mapping.
        self.category_actions = {
            ModerationCategory.HARMFUL: ModerationAction.BLOCK,
            ModerationCategory.HATEFUL: ModerationAction.BLOCK,
            ModerationCategory.SEXUAL: ModerationAction.BLOCK,
            ModerationCategory.DECEPTIVE: ModerationAction.BLOCK,
            ModerationCategory.ILLEGAL: ModerationAction.BLOCK,
            ModerationCategory.PERSONAL: ModerationAction.REDACT,
            ModerationCategory.SAFE: ModerationAction.ALLOW,
        }

        # Pattern-based detection (supplement with ML in production).
        self.harmful_patterns = [
            r"how to (?:make|build|create) (?:a )?(?:bomb|weapon|explosive)",
            r"instructions for (?:suicide|self-harm)",
            r"ways to (?:hurt|harm|kill)",
        ]

        self.hateful_patterns = [
            # Patterns for hate speech detection
        ]

        self.illegal_patterns = [
            r"how to (?:hack|crack|break into)",
            r"bypass (?:security|authentication)",
            r"steal (?:data|information|credentials)",
        ]

        # Compile once at construction instead of on every moderate() call.
        self._compiled = [
            (category, [re.compile(p) for p in patterns])
            for category, patterns in [
                (ModerationCategory.HARMFUL, self.harmful_patterns),
                (ModerationCategory.ILLEGAL, self.illegal_patterns),
            ]
        ]

    def moderate(self, content: str) -> ModerationResult:
        """Classify content and return the moderated result.

        Patterns are searched against the lower-cased content, so matching
        is effectively case-insensitive.
        """
        lowered = content.lower()

        for category, patterns in self._compiled:
            for pattern in patterns:
                match = pattern.search(lowered)
                if match:
                    action = self.category_actions[category]
                    return ModerationResult(
                        category=category,
                        action=action,
                        confidence=0.9,  # fixed heuristic confidence
                        flagged_content=match.group(),
                        moderated_content=self._apply_action(content, action),
                    )

        # No pattern fired: treat the content as safe.
        return ModerationResult(
            category=ModerationCategory.SAFE,
            action=ModerationAction.ALLOW,
            confidence=0.95,
            flagged_content=None,
            moderated_content=content,
        )

    def _apply_action(self, content: str, action: ModerationAction) -> str:
        """Apply a moderation action and return the content to emit."""
        if action == ModerationAction.BLOCK:
            return "[Content blocked due to policy violation]"
        elif action == ModerationAction.REDACT:
            return self._redact_sensitive(content)
        elif action == ModerationAction.WARN:
            return f"[Warning: This content may be sensitive]\n{content}"
        return content

    def _redact_sensitive(self, content: str) -> str:
        """Redact sensitive portions of content.

        NOTE(review): currently a no-op placeholder — selective redaction
        is left unimplemented in this tutorial snippet.
        """
        # Implementation for selective redaction
        return content
114
115
116class MLModerator:
117 """ML-based content moderation (production)."""
118
119 def __init__(self, model_endpoint: str):
120 self.model_endpoint = model_endpoint
121 self.threshold = 0.7
122
123 def classify(self, content: str) -> dict[ModerationCategory, float]:
124 """Classify content across all categories."""
125 # In production, call ML model
126 # Returns probability for each category
127 return {
128 ModerationCategory.SAFE: 0.95,
129 ModerationCategory.HARMFUL: 0.02,
130 ModerationCategory.HATEFUL: 0.01,
131 # ... other categories
132 }
133
134 def get_highest_risk(
135 self,
136 scores: dict[ModerationCategory, float]
137 ) -> tuple[ModerationCategory, float]:
138 """Get highest risk category."""
139 # Exclude SAFE from risk calculation
140 risk_scores = {
141 k: v for k, v in scores.items()
142 if k != ModerationCategory.SAFE
143 }
144 if not risk_scores:
145 return ModerationCategory.SAFE, 1.0
146
147 highest = max(risk_scores.items(), key=lambda x: x[1])
148 return highest[0], highest[1]Secret Detection
Preventing Credential Leaks
🐍python
"""
Secret Detection System

Prevent agents from accidentally exposing:
- API keys
- Passwords
- Tokens
- Private keys
- Database credentials
"""

import re
from dataclasses import dataclass


@dataclass
class SecretMatch:
    """Detected secret in content."""
    # Broad category (e.g. "aws_credential", "password", "api_key").
    secret_type: str
    # Key in SecretDetector.patterns that produced the match.
    pattern_name: str
    # The matched text itself.
    match: str
    # [start, end) character offsets within the scanned content.
    start: int
    end: int


class SecretDetector:
    """Detect secrets in agent output."""

    def __init__(self):
        # Patterns for common secret types.
        # FIX: the credential patterns previously had broken escaping
        # (`s*` instead of `\s*`, and unescaped quotes inside r"...").
        self.patterns = {
            # API Keys
            "aws_access_key": r"AKIA[0-9A-Z]{16}",
            "aws_secret_key": r"[A-Za-z0-9/+=]{40}",
            "github_token": r"ghp_[A-Za-z0-9]{36}",
            "github_oauth": r"gho_[A-Za-z0-9]{36}",
            "openai_key": r"sk-[A-Za-z0-9]{48}",
            "stripe_key": r"sk_(?:live|test)_[A-Za-z0-9]{24,}",
            "google_api": r"AIza[0-9A-Za-z\-_]{35}",

            # Passwords and credentials
            "password_field": r"(?:password|passwd|pwd)\s*[:=]\s*['\"]?([^'\"\s]+)",
            "bearer_token": r"Bearer\s+[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+",
            "basic_auth": r"Basic\s+[A-Za-z0-9+/=]+",

            # Private keys
            "private_key": r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
            "ssh_key": r"ssh-(?:rsa|dss|ed25519)\s+[A-Za-z0-9+/=]+",

            # Database
            "connection_string": r"(?:mongodb|postgres|mysql|redis)://[^\s]+",
            "database_password": r"(?:db|database)_password\s*[:=]\s*['\"]?([^'\"\s]+)",

            # Generic patterns
            "generic_api_key": r"(?:api[_-]?key|apikey)\s*[:=]\s*['\"]?([A-Za-z0-9_-]{20,})",
            "generic_secret": r"(?:secret|token)\s*[:=]\s*['\"]?([A-Za-z0-9_-]{20,})",
        }

        # Compile once; scanning stays case-insensitive, matching the
        # original behavior of passing re.IGNORECASE on every scan.
        self._compiled = {
            name: re.compile(pattern, re.IGNORECASE)
            for name, pattern in self.patterns.items()
        }

    def scan(self, content: str) -> list[SecretMatch]:
        """Scan content and return all secret matches found."""
        matches = []

        for pattern_name, regex in self._compiled.items():
            for match in regex.finditer(content):
                matches.append(SecretMatch(
                    secret_type=self._categorize_secret(pattern_name),
                    pattern_name=pattern_name,
                    match=match.group(),
                    start=match.start(),
                    end=match.end(),
                ))

        return matches

    def redact(self, content: str, matches: list[SecretMatch]) -> str:
        """Redact detected secrets from content.

        Matches are applied right-to-left so earlier replacements do not
        shift the offsets of later ones.
        """
        if not matches:
            return content

        sorted_matches = sorted(matches, key=lambda m: m.start, reverse=True)

        result = content
        for match in sorted_matches:
            redacted = self._create_redaction(match.match, match.secret_type)
            result = result[:match.start] + redacted + result[match.end:]

        return result

    def _categorize_secret(self, pattern_name: str) -> str:
        """Map a pattern name to a broad secret category."""
        if "aws" in pattern_name:
            return "aws_credential"
        elif "github" in pattern_name:
            return "github_credential"
        elif "password" in pattern_name:
            return "password"
        elif "key" in pattern_name:
            return "api_key"
        elif "token" in pattern_name:
            return "token"
        return "secret"

    def _create_redaction(self, secret: str, secret_type: str) -> str:
        """Build a redaction placeholder, keeping 2 chars of context per side."""
        if len(secret) > 8:
            return f"{secret[:2]}***REDACTED_{secret_type.upper()}***{secret[-2:]}"
        return f"***REDACTED_{secret_type.upper()}***"
111
112# Usage example
113detector = SecretDetector()
114
115test_content = '''
116Here are the API configurations:
117- AWS Key: AKIAIOSFODNN7EXAMPLE
118- OpenAI: sk-1234567890abcdef1234567890abcdef12345678
119- Password: password=supersecret123
120'''
121
122matches = detector.scan(test_content)
123print(f"Found {len(matches)} secrets")
124
125redacted = detector.redact(test_content, matches)
126print(redacted)PII Protection
Personally Identifiable Information
🐍python
"""
PII Protection System

Detect and protect personally identifiable information:
- Names
- Email addresses
- Phone numbers
- Social Security Numbers
- Credit card numbers
- Addresses
- Medical information
"""

import re
from dataclasses import dataclass
from enum import Enum


class PIIType(Enum):
    """Kinds of personally identifiable information."""
    EMAIL = "email"
    PHONE = "phone"
    SSN = "ssn"
    CREDIT_CARD = "credit_card"
    ADDRESS = "address"
    NAME = "name"
    DATE_OF_BIRTH = "dob"
    IP_ADDRESS = "ip_address"


@dataclass
class PIIMatch:
    """Detected PII in content."""
    # Which kind of PII matched.
    pii_type: PIIType
    # The matched text.
    value: str
    # [start, end) offsets within the scanned content.
    start: int
    end: int
    # Heuristic confidence in [0, 1].
    confidence: float


class PIIDetector:
    """Detect PII in agent output via regular expressions."""

    def __init__(self):
        # NOTE: only a subset of PIIType has regex patterns here — names,
        # addresses and dates of birth need NLP-style detection.
        self.patterns = {
            PIIType.EMAIL: r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            PIIType.PHONE: r"(?:\+1)?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}",
            PIIType.SSN: r"\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b",
            PIIType.CREDIT_CARD: r"\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b",
            PIIType.IP_ADDRESS: r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
        }

    def detect(self, content: str) -> list[PIIMatch]:
        """Return every validated PII match found in content."""
        found: list[PIIMatch] = []

        for pii_type, pattern in self.patterns.items():
            for m in re.finditer(pattern, content):
                candidate = m.group()
                # Drop false positives (e.g. non-Luhn card numbers,
                # out-of-range IP octets).
                if not self._validate_match(pii_type, candidate):
                    continue
                found.append(PIIMatch(
                    pii_type=pii_type,
                    value=candidate,
                    start=m.start(),
                    end=m.end(),
                    confidence=self._calculate_confidence(pii_type, candidate),
                ))

        return found

    def _validate_match(self, pii_type: PIIType, value: str) -> bool:
        """Secondary validation applied on top of the regex match."""
        if pii_type is PIIType.CREDIT_CARD:
            return self._luhn_check(value.replace("-", "").replace(" ", ""))
        if pii_type is PIIType.IP_ADDRESS:
            return all(0 <= int(octet) <= 255 for octet in value.split("."))
        return True

    def _luhn_check(self, card_number: str) -> bool:
        """Luhn algorithm for credit card validation."""
        digits = [int(ch) for ch in card_number]
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]

        checksum = sum(odd_digits)
        for d in even_digits:
            # Doubling and summing the digits of the result is equivalent
            # to subtracting 9 when the doubled value is two digits.
            doubled = d * 2
            checksum += doubled if doubled < 10 else doubled - 9

        return checksum % 10 == 0

    def _calculate_confidence(self, pii_type: PIIType, value: str) -> float:
        """Heuristic confidence; a production system would score per type."""
        return 0.9


class PIIProtector:
    """Protect PII through type-specific masking strategies."""

    def __init__(self, detector: PIIDetector):
        self.detector = detector
        # One masking function per supported PII type.
        self.protection_strategies = {
            PIIType.EMAIL: self._mask_email,
            PIIType.PHONE: self._mask_phone,
            PIIType.SSN: self._mask_ssn,
            PIIType.CREDIT_CARD: self._mask_card,
            PIIType.IP_ADDRESS: self._mask_ip,
        }

    def protect(self, content: str) -> str:
        """Detect and mask all PII in content."""
        matches = self.detector.detect(content)
        if not matches:
            return content

        # Replace right-to-left so earlier replacements don't shift the
        # offsets of the matches still to be applied.
        result = content
        for match in sorted(matches, key=lambda m: m.start, reverse=True):
            strategy = self.protection_strategies.get(match.pii_type)
            if strategy is not None:
                result = result[:match.start] + strategy(match.value) + result[match.end:]

        return result

    def _mask_email(self, email: str) -> str:
        """Mask the local part, keeping its first and last character."""
        local, domain = email.split("@")
        masked_local = local[0] + "***" + local[-1] if len(local) > 2 else "***"
        return f"{masked_local}@{domain}"

    def _mask_phone(self, phone: str) -> str:
        """Keep only the last four digits of the phone number."""
        digits = re.sub(r"\D", "", phone)
        return f"***-***-{digits[-4:]}"

    def _mask_ssn(self, ssn: str) -> str:
        """Keep only the last four digits of the SSN."""
        return f"***-**-{ssn[-4:]}"

    def _mask_card(self, card: str) -> str:
        """Keep only the last four digits of the card number."""
        digits = re.sub(r"\D", "", card)
        return f"****-****-****-{digits[-4:]}"

    def _mask_ip(self, ip: str) -> str:
        """Keep only the final octet of the address."""
        parts = ip.split(".")
        return f"***.***.***.{parts[-1]}"
157
158# Usage
159detector = PIIDetector()
160protector = PIIProtector(detector)
161
162content = """
163Contact John at john.doe@example.com or call 555-123-4567.
164His SSN is 123-45-6789 and card number is 4111111111111111.
165"""
166
167protected = protector.protect(content)
168print(protected)Output Filtering Pipeline
Complete Output Processing
🐍python
"""
Complete Output Filtering Pipeline

Combines all output filtering mechanisms into
a single, comprehensive pipeline.
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class OutputStatus(Enum):
    """Final disposition of a processed output."""
    APPROVED = "approved"   # passed through untouched
    MODIFIED = "modified"   # one or more filters changed the text
    BLOCKED = "blocked"     # output suppressed entirely


@dataclass
class OutputResult:
    """Result of running one output through the filter pipeline."""
    status: OutputStatus
    # The agent output exactly as received.
    original_output: str
    # Output after all filtering steps ("" when blocked).
    processed_output: str
    # Human-readable description of each modification applied.
    modifications: list[str] = field(default_factory=list)
    # Reasons the output was blocked, if any.
    block_reasons: list[str] = field(default_factory=list)
    # Extra filter-specific details.
    metadata: dict = field(default_factory=dict)
29
class OutputFilterPipeline:
    """Complete output filtering pipeline.

    Runs content moderation, secret redaction, PII protection and format
    validation in sequence, collecting a description of every change.
    """

    def __init__(self, config: dict | None = None):
        self.config = config or {}

        # Initialize filters.
        self.content_moderator = ContentModerator()
        self.secret_detector = SecretDetector()
        self.pii_detector = PIIDetector()
        self.pii_protector = PIIProtector(self.pii_detector)

        # Feature toggles (all enabled by default).
        self.enable_moderation = self.config.get("enable_moderation", True)
        self.enable_secret_detection = self.config.get("enable_secret_detection", True)
        self.enable_pii_protection = self.config.get("enable_pii_protection", True)

    def process(self, output: str) -> OutputResult:
        """Process output through all filters and return the result.

        Blocking short-circuits: once moderation blocks, no further
        filters run and the processed output is empty.
        """
        modifications: list[str] = []
        block_reasons: list[str] = []
        processed = output

        # Step 1: Content moderation
        if self.enable_moderation:
            mod_result = self.content_moderator.moderate(processed)

            if mod_result.action == ModerationAction.BLOCK:
                return OutputResult(
                    status=OutputStatus.BLOCKED,
                    original_output=output,
                    processed_output="",
                    block_reasons=[
                        f"Content blocked: {mod_result.category.value}"
                    ],
                )
            elif mod_result.action in (ModerationAction.REDACT, ModerationAction.WARN):
                processed = mod_result.moderated_content
                modifications.append(f"Content moderated: {mod_result.category.value}")

        # Step 2: Secret detection and redaction
        if self.enable_secret_detection:
            secrets = self.secret_detector.scan(processed)

            if secrets:
                processed = self.secret_detector.redact(processed, secrets)
                # Sort for deterministic messages: bare set iteration order
                # varies across interpreter runs under hash randomization.
                secret_types = sorted({s.secret_type for s in secrets})
                modifications.append(
                    f"Secrets redacted: {', '.join(secret_types)}"
                )

        # Step 3: PII protection
        if self.enable_pii_protection:
            pii_matches = self.pii_detector.detect(processed)

            if pii_matches:
                processed = self.pii_protector.protect(processed)
                pii_types = sorted({p.pii_type.value for p in pii_matches})
                modifications.append(
                    f"PII protected: {', '.join(pii_types)}"
                )

        # Step 4: Format validation / repair
        format_result = self._validate_format(processed)
        if not format_result["valid"]:
            modifications.extend(format_result["fixes"])
            processed = format_result["fixed_output"]

        # Any modification downgrades APPROVED to MODIFIED.
        status = OutputStatus.MODIFIED if modifications else OutputStatus.APPROVED

        return OutputResult(
            status=status,
            original_output=output,
            processed_output=processed,
            modifications=modifications,
            block_reasons=block_reasons,
        )

    def _validate_format(self, output: str) -> dict:
        """Validate output format and fix issues.

        Returns a dict with keys "valid", "fixes" and "fixed_output".
        """
        issues: list[str] = []
        fixed = output

        # Check for unclosed code blocks (triple backtick).
        triple_tick = chr(96) * 3  # backtick character
        if fixed.count(triple_tick) % 2 != 0:
            fixed += "\n" + triple_tick
            issues.append("Closed unclosed code block")

        # Enforce a maximum output length.
        max_length = self.config.get("max_output_length", 50000)
        if len(fixed) > max_length:
            fixed = fixed[:max_length] + "\n[Output truncated due to length]"
            issues.append("Truncated excessive output")

        return {
            "valid": len(issues) == 0,
            "fixes": issues,
            "fixed_output": fixed,
        }
135
136# Example usage
137pipeline = OutputFilterPipeline({
138 "enable_moderation": True,
139 "enable_secret_detection": True,
140 "enable_pii_protection": True,
141 "max_output_length": 10000
142})
143
144# Process agent output
145agent_output = """
146Here's the configuration:
147- API Key: sk-1234567890abcdef1234567890abcdef12345678
148- Contact: user@example.com
149
150The system is ready.
151"""
152
153result = pipeline.process(agent_output)
154print(f"Status: {result.status}")
155print(f"Modifications: {result.modifications}")
156print(f"Processed:\n{result.processed_output}")Key Takeaways
- Content moderation catches harmful, inappropriate, or policy-violating outputs before they reach users.
- Secret detection prevents accidental exposure of API keys, passwords, and other credentials.
- PII protection masks personal information to comply with privacy regulations and protect users.
- Build a pipeline that combines all filters into a comprehensive, configurable system.
- Log modifications for audit purposes and to understand what filtering was applied.
Next Section Preview: We'll explore action boundaries and permissions - controlling what agents can actually do.