Chapter 18
15 min read
Section 112 of 175

Output Filtering and Moderation

Agent Safety and Guardrails

Introduction

While input validation prevents malicious content from entering the system, output filtering ensures that the agent's responses are safe, appropriate, and don't leak sensitive information. This is crucial for agents that interact with users or external systems.

Section Overview: We'll cover content moderation systems, secret detection, PII protection, and building a comprehensive output filtering pipeline.

Content Moderation

Moderation Categories

| Category  | Description                         | Action       |
|-----------|-------------------------------------|--------------|
| Harmful   | Violence, self-harm content         | Block        |
| Hateful   | Discrimination, slurs               | Block        |
| Sexual    | Explicit sexual content             | Block/Filter |
| Deceptive | Misinformation, scams               | Block        |
| Illegal   | Instructions for illegal activities | Block        |
| Personal  | Private information                 | Redact       |
🐍python
1"""
2Content Moderation System
3
4Multi-category moderation for agent outputs.
5"""
6
7from dataclasses import dataclass
8from enum import Enum
9
10
11class ModerationCategory(Enum):
12    HARMFUL = "harmful"
13    HATEFUL = "hateful"
14    SEXUAL = "sexual"
15    DECEPTIVE = "deceptive"
16    ILLEGAL = "illegal"
17    PERSONAL = "personal"
18    SAFE = "safe"
19
20
21class ModerationAction(Enum):
22    ALLOW = "allow"
23    WARN = "warn"
24    REDACT = "redact"
25    BLOCK = "block"
26
27
28@dataclass
29class ModerationResult:
30    """Result of content moderation."""
31    category: ModerationCategory
32    action: ModerationAction
33    confidence: float
34    flagged_content: str | None
35    moderated_content: str
36
37
38class ContentModerator:
39    """Moderate agent output content."""
40
41    def __init__(self):
42        # Category to action mapping
43        self.category_actions = {
44            ModerationCategory.HARMFUL: ModerationAction.BLOCK,
45            ModerationCategory.HATEFUL: ModerationAction.BLOCK,
46            ModerationCategory.SEXUAL: ModerationAction.BLOCK,
47            ModerationCategory.DECEPTIVE: ModerationAction.BLOCK,
48            ModerationCategory.ILLEGAL: ModerationAction.BLOCK,
49            ModerationCategory.PERSONAL: ModerationAction.REDACT,
50            ModerationCategory.SAFE: ModerationAction.ALLOW,
51        }
52
53        # Pattern-based detection (supplement with ML in production)
54        self.harmful_patterns = [
55            r"how to (?:make|build|create) (?:a )?(?:bomb|weapon|explosive)",
56            r"instructions for (?:suicide|self-harm)",
57            r"ways to (?:hurt|harm|kill)",
58        ]
59
60        self.hateful_patterns = [
61            # Patterns for hate speech detection
62        ]
63
64        self.illegal_patterns = [
65            r"how to (?:hack|crack|break into)",
66            r"bypass (?:security|authentication)",
67            r"steal (?:data|information|credentials)",
68        ]
69
70    def moderate(self, content: str) -> ModerationResult:
71        """Moderate content and determine action."""
72        import re
73
74        # Check each category
75        for category, patterns in [
76            (ModerationCategory.HARMFUL, self.harmful_patterns),
77            (ModerationCategory.ILLEGAL, self.illegal_patterns),
78        ]:
79            for pattern in patterns:
80                match = re.search(pattern, content.lower())
81                if match:
82                    action = self.category_actions[category]
83                    return ModerationResult(
84                        category=category,
85                        action=action,
86                        confidence=0.9,
87                        flagged_content=match.group(),
88                        moderated_content=self._apply_action(content, action)
89                    )
90
91        # Default: content is safe
92        return ModerationResult(
93            category=ModerationCategory.SAFE,
94            action=ModerationAction.ALLOW,
95            confidence=0.95,
96            flagged_content=None,
97            moderated_content=content
98        )
99
100    def _apply_action(self, content: str, action: ModerationAction) -> str:
101        """Apply moderation action to content."""
102        if action == ModerationAction.BLOCK:
103            return "[Content blocked due to policy violation]"
104        elif action == ModerationAction.REDACT:
105            return self._redact_sensitive(content)
106        elif action == ModerationAction.WARN:
107            return f"[Warning: This content may be sensitive]\n{content}"
108        return content
109
110    def _redact_sensitive(self, content: str) -> str:
111        """Redact sensitive portions of content."""
112        # Implementation for selective redaction
113        return content
114
115
class MLModerator:
    """ML-based content moderation (production)."""

    def __init__(self, model_endpoint: str):
        # Endpoint of the hosted classification model.
        self.model_endpoint = model_endpoint
        # Minimum probability for a category to count as a hit
        # (not referenced by the stub methods shown here).
        self.threshold = 0.7

    def classify(self, content: str) -> dict[ModerationCategory, float]:
        """Classify content across all categories."""
        # In production this would call the ML model at self.model_endpoint
        # and return the per-category probabilities it reports.
        stub_scores = {
            ModerationCategory.SAFE: 0.95,
            ModerationCategory.HARMFUL: 0.02,
            ModerationCategory.HATEFUL: 0.01,
            # ... other categories
        }
        return stub_scores

    def get_highest_risk(
        self,
        scores: dict[ModerationCategory, float]
    ) -> tuple[ModerationCategory, float]:
        """Get highest risk category."""
        # SAFE is excluded: we only want the riskiest *problem* category.
        non_safe = {
            category: score
            for category, score in scores.items()
            if category != ModerationCategory.SAFE
        }
        if not non_safe:
            return ModerationCategory.SAFE, 1.0

        top_category = max(non_safe, key=non_safe.get)
        return top_category, non_safe[top_category]

Secret Detection

Preventing Credential Leaks

🐍python
1"""
2Secret Detection System
3
4Prevent agents from accidentally exposing:
5- API keys
6- Passwords
7- Tokens
8- Private keys
9- Database credentials
10"""
11
12import re
13from dataclasses import dataclass
14
15
@dataclass
class SecretMatch:
    """Detected secret in content."""
    secret_type: str    # coarse category, e.g. "aws_credential", "password"
    pattern_name: str   # name of the pattern that fired
    match: str          # the matched text itself
    start: int          # start offset in the scanned content
    end: int            # end offset (exclusive)


class SecretDetector:
    """Detect and redact secrets in agent output."""

    def __init__(self):
        # Patterns for common secret types.  Whitespace must be matched
        # with \s* (a literal "s*" would require the letter s) and quote
        # characters inside r"..." strings must be escaped.
        self.patterns = {
            # API Keys
            "aws_access_key": r"AKIA[0-9A-Z]{16}",
            # NOTE(review): any 40-char base64-ish run matches; this is a
            # loose heuristic that can false-positive on ordinary text.
            "aws_secret_key": r"[A-Za-z0-9/+=]{40}",
            "github_token": r"ghp_[A-Za-z0-9]{36}",
            "github_oauth": r"gho_[A-Za-z0-9]{36}",
            "openai_key": r"sk-[A-Za-z0-9]{48}",
            "stripe_key": r"sk_(?:live|test)_[A-Za-z0-9]{24,}",
            "google_api": r"AIza[0-9A-Za-z\-_]{35}",

            # Passwords and credentials
            "password_field": r"(?:password|passwd|pwd)\s*[:=]\s*['\"]?([^'\"\s]+)",
            "bearer_token": r"Bearer\s+[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+",
            "basic_auth": r"Basic\s+[A-Za-z0-9+/=]+",

            # Private keys
            "private_key": r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
            "ssh_key": r"ssh-(?:rsa|dss|ed25519)\s+[A-Za-z0-9+/=]+",

            # Database
            "connection_string": r"(?:mongodb|postgres|mysql|redis)://[^\s]+",
            "database_password": r"(?:db|database)_password\s*[:=]\s*['\"]?([^'\"\s]+)",

            # Generic patterns
            "generic_api_key": r"(?:api[_-]?key|apikey)\s*[:=]\s*['\"]?([A-Za-z0-9_-]{20,})",
            "generic_secret": r"(?:secret|token)\s*[:=]\s*['\"]?([A-Za-z0-9_-]{20,})",
        }

    def scan(self, content: str) -> list[SecretMatch]:
        """Scan content and return every pattern match with its offsets."""
        matches = []

        for pattern_name, pattern in self.patterns.items():
            for match in re.finditer(pattern, content, re.IGNORECASE):
                matches.append(SecretMatch(
                    secret_type=self._categorize_secret(pattern_name),
                    pattern_name=pattern_name,
                    match=match.group(),
                    start=match.start(),
                    end=match.end()
                ))

        return matches

    def redact(self, content: str, matches: list[SecretMatch]) -> str:
        """Redact detected secrets from content.

        Overlapping matches (several patterns firing on the same span)
        are collapsed to the longest one first; otherwise the offset-based
        replacements below would corrupt the surrounding text.
        """
        if not matches:
            return content

        # Keep the longest match at each position; drop anything that
        # overlaps a span we already decided to redact.
        ordered = sorted(matches, key=lambda m: (m.start, m.start - m.end))
        kept: list[SecretMatch] = []
        covered_until = -1
        for m in ordered:
            if m.start >= covered_until:
                kept.append(m)
                covered_until = m.end

        # Replace from the end of the string so earlier offsets stay valid.
        result = content
        for m in reversed(kept):
            redacted = self._create_redaction(m.match, m.secret_type)
            result = result[:m.start] + redacted + result[m.end:]

        return result

    def _categorize_secret(self, pattern_name: str) -> str:
        """Map a pattern name to a coarse secret category."""
        if "aws" in pattern_name:
            return "aws_credential"
        elif "github" in pattern_name:
            return "github_credential"
        elif "password" in pattern_name:
            return "password"
        elif "key" in pattern_name:
            return "api_key"
        elif "token" in pattern_name:
            return "token"
        return "secret"

    def _create_redaction(self, secret: str, secret_type: str) -> str:
        """Create a redaction string, keeping two edge chars for context."""
        if len(secret) > 8:
            return f"{secret[:2]}***REDACTED_{secret_type.upper()}***{secret[-2:]}"
        return f"***REDACTED_{secret_type.upper()}***"
110
111
# Usage example: scan a sample config dump, report the count, then redact.
scanner = SecretDetector()

sample = '''
Here are the API configurations:
- AWS Key: AKIAIOSFODNN7EXAMPLE
- OpenAI: sk-1234567890abcdef1234567890abcdef12345678
- Password: password=supersecret123
'''

hits = scanner.scan(sample)
print(f"Found {len(hits)} secrets")

print(scanner.redact(sample, hits))

PII Protection

Personally Identifiable Information

🐍python
1"""
2PII Protection System
3
4Detect and protect personally identifiable information:
5- Names
6- Email addresses
7- Phone numbers
8- Social Security Numbers
9- Credit card numbers
10- Addresses
11- Medical information
12"""
13
14import re
15from dataclasses import dataclass
16from enum import Enum
17
18
19class PIIType(Enum):
20    EMAIL = "email"
21    PHONE = "phone"
22    SSN = "ssn"
23    CREDIT_CARD = "credit_card"
24    ADDRESS = "address"
25    NAME = "name"
26    DATE_OF_BIRTH = "dob"
27    IP_ADDRESS = "ip_address"
28
29
30@dataclass
31class PIIMatch:
32    """Detected PII in content."""
33    pii_type: PIIType
34    value: str
35    start: int
36    end: int
37    confidence: float
38
39
40class PIIDetector:
41    """Detect PII in agent output."""
42
43    def __init__(self):
44        self.patterns = {
45            PIIType.EMAIL: r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
46            PIIType.PHONE: r"(?:\+1)?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}",
47            PIIType.SSN: r"\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b",
48            PIIType.CREDIT_CARD: r"\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b",
49            PIIType.IP_ADDRESS: r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
50        }
51
52    def detect(self, content: str) -> list[PIIMatch]:
53        """Detect all PII in content."""
54        matches = []
55
56        for pii_type, pattern in self.patterns.items():
57            for match in re.finditer(pattern, content):
58                # Validate the match (e.g., check Luhn for credit cards)
59                if self._validate_match(pii_type, match.group()):
60                    matches.append(PIIMatch(
61                        pii_type=pii_type,
62                        value=match.group(),
63                        start=match.start(),
64                        end=match.end(),
65                        confidence=self._calculate_confidence(pii_type, match.group())
66                    ))
67
68        return matches
69
70    def _validate_match(self, pii_type: PIIType, value: str) -> bool:
71        """Validate that a match is actually PII."""
72        if pii_type == PIIType.CREDIT_CARD:
73            return self._luhn_check(value.replace("-", "").replace(" ", ""))
74        elif pii_type == PIIType.IP_ADDRESS:
75            parts = value.split(".")
76            return all(0 <= int(p) <= 255 for p in parts)
77        return True
78
79    def _luhn_check(self, card_number: str) -> bool:
80        """Luhn algorithm for credit card validation."""
81        def digits_of(n):
82            return [int(d) for d in str(n)]
83
84        digits = digits_of(card_number)
85        odd_digits = digits[-1::-2]
86        even_digits = digits[-2::-2]
87
88        checksum = sum(odd_digits)
89        for d in even_digits:
90            checksum += sum(digits_of(d * 2))
91
92        return checksum % 10 == 0
93
94    def _calculate_confidence(self, pii_type: PIIType, value: str) -> float:
95        """Calculate confidence that this is actual PII."""
96        # More sophisticated analysis in production
97        return 0.9
98
99
100class PIIProtector:
101    """Protect PII through various strategies."""
102
103    def __init__(self, detector: PIIDetector):
104        self.detector = detector
105        self.protection_strategies = {
106            PIIType.EMAIL: self._mask_email,
107            PIIType.PHONE: self._mask_phone,
108            PIIType.SSN: self._mask_ssn,
109            PIIType.CREDIT_CARD: self._mask_card,
110            PIIType.IP_ADDRESS: self._mask_ip,
111        }
112
113    def protect(self, content: str) -> str:
114        """Detect and protect all PII in content."""
115        matches = self.detector.detect(content)
116
117        if not matches:
118            return content
119
120        # Sort by position (reverse)
121        sorted_matches = sorted(matches, key=lambda m: m.start, reverse=True)
122
123        result = content
124        for match in sorted_matches:
125            strategy = self.protection_strategies.get(match.pii_type)
126            if strategy:
127                masked = strategy(match.value)
128                result = result[:match.start] + masked + result[match.end:]
129
130        return result
131
132    def _mask_email(self, email: str) -> str:
133        """Mask email address."""
134        local, domain = email.split("@")
135        masked_local = local[0] + "***" + local[-1] if len(local) > 2 else "***"
136        return f"{masked_local}@{domain}"
137
138    def _mask_phone(self, phone: str) -> str:
139        """Mask phone number."""
140        digits = re.sub(r"\D", "", phone)
141        return f"***-***-{digits[-4:]}"
142
143    def _mask_ssn(self, ssn: str) -> str:
144        """Mask Social Security Number."""
145        return f"***-**-{ssn[-4:]}"
146
147    def _mask_card(self, card: str) -> str:
148        """Mask credit card number."""
149        digits = re.sub(r"\D", "", card)
150        return f"****-****-****-{digits[-4:]}"
151
152    def _mask_ip(self, ip: str) -> str:
153        """Mask IP address."""
154        parts = ip.split(".")
155        return f"***.***.***.{parts[-1]}"
156
157
# Usage: run a sample paragraph through the PII protector end to end.
pii_detector = PIIDetector()
pii_protector = PIIProtector(pii_detector)

sample_text = """
Contact John at john.doe@example.com or call 555-123-4567.
His SSN is 123-45-6789 and card number is 4111111111111111.
"""

print(pii_protector.protect(sample_text))

Output Filtering Pipeline

Complete Output Processing

🐍python
1"""
2Complete Output Filtering Pipeline
3
4Combines all output filtering mechanisms into
5a single, comprehensive pipeline.
6"""
7
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Any
11
12
class OutputStatus(Enum):
    """Final disposition of a piece of agent output after filtering."""
    APPROVED = "approved"   # passed every filter untouched
    MODIFIED = "modified"   # released, but at least one filter changed it
    BLOCKED = "blocked"     # rejected outright; nothing is released
17
18
@dataclass
class OutputResult:
    """Result of output processing."""
    status: OutputStatus      # final disposition of the output
    original_output: str      # output exactly as the agent produced it
    processed_output: str     # output after all filters were applied
    modifications: list[str] = field(default_factory=list)   # audit trail of changes made
    block_reasons: list[str] = field(default_factory=list)   # why the output was blocked, if it was
    metadata: dict = field(default_factory=dict)             # extra filter-specific details
28
29
30class OutputFilterPipeline:
31    """Complete output filtering pipeline."""
32
33    def __init__(self, config: dict | None = None):
34        self.config = config or {}
35
36        # Initialize filters
37        self.content_moderator = ContentModerator()
38        self.secret_detector = SecretDetector()
39        self.pii_detector = PIIDetector()
40        self.pii_protector = PIIProtector(self.pii_detector)
41
42        # Configuration
43        self.enable_moderation = self.config.get("enable_moderation", True)
44        self.enable_secret_detection = self.config.get("enable_secret_detection", True)
45        self.enable_pii_protection = self.config.get("enable_pii_protection", True)
46
47    def process(self, output: str) -> OutputResult:
48        """Process output through all filters."""
49        modifications = []
50        block_reasons = []
51        processed = output
52
53        # Step 1: Content moderation
54        if self.enable_moderation:
55            mod_result = self.content_moderator.moderate(processed)
56
57            if mod_result.action == ModerationAction.BLOCK:
58                return OutputResult(
59                    status=OutputStatus.BLOCKED,
60                    original_output=output,
61                    processed_output="",
62                    block_reasons=[
63                        f"Content blocked: {mod_result.category.value}"
64                    ]
65                )
66            elif mod_result.action in [ModerationAction.REDACT, ModerationAction.WARN]:
67                processed = mod_result.moderated_content
68                modifications.append(f"Content moderated: {mod_result.category.value}")
69
70        # Step 2: Secret detection and redaction
71        if self.enable_secret_detection:
72            secrets = self.secret_detector.scan(processed)
73
74            if secrets:
75                processed = self.secret_detector.redact(processed, secrets)
76                secret_types = set(s.secret_type for s in secrets)
77                modifications.append(
78                    f"Secrets redacted: {', '.join(secret_types)}"
79                )
80
81        # Step 3: PII protection
82        if self.enable_pii_protection:
83            pii_matches = self.pii_detector.detect(processed)
84
85            if pii_matches:
86                processed = self.pii_protector.protect(processed)
87                pii_types = set(p.pii_type.value for p in pii_matches)
88                modifications.append(
89                    f"PII protected: {', '.join(pii_types)}"
90                )
91
92        # Step 4: Format validation
93        format_result = self._validate_format(processed)
94        if not format_result["valid"]:
95            modifications.extend(format_result["fixes"])
96            processed = format_result["fixed_output"]
97
98        # Determine final status
99        if modifications:
100            status = OutputStatus.MODIFIED
101        else:
102            status = OutputStatus.APPROVED
103
104        return OutputResult(
105            status=status,
106            original_output=output,
107            processed_output=processed,
108            modifications=modifications,
109            block_reasons=block_reasons
110        )
111
112    def _validate_format(self, output: str) -> dict:
113        """Validate output format and fix issues."""
114        issues = []
115        fixed = output
116
117        # Check for unclosed code blocks (triple backtick)
118        triple_tick = chr(96) * 3  # backtick character
119        if fixed.count(triple_tick) % 2 != 0:
120            fixed += "\n" + triple_tick
121            issues.append("Closed unclosed code block")
122
123        # Check for reasonable length
124        max_length = self.config.get("max_output_length", 50000)
125        if len(fixed) > max_length:
126            fixed = fixed[:max_length] + "\n[Output truncated due to length]"
127            issues.append("Truncated excessive output")
128
129        return {
130            "valid": len(issues) == 0,
131            "fixes": issues,
132            "fixed_output": fixed
133        }
134
135
# Example usage: configure the pipeline, then run an agent reply through it.
filter_config = {
    "enable_moderation": True,
    "enable_secret_detection": True,
    "enable_pii_protection": True,
    "max_output_length": 10000,
}
pipeline = OutputFilterPipeline(filter_config)

# Process agent output
agent_output = """
Here's the configuration:
- API Key: sk-1234567890abcdef1234567890abcdef12345678
- Contact: user@example.com

The system is ready.
"""

filtered = pipeline.process(agent_output)
print(f"Status: {filtered.status}")
print(f"Modifications: {filtered.modifications}")
print(f"Processed:\n{filtered.processed_output}")

Key Takeaways

  • Content moderation catches harmful, inappropriate, or policy-violating outputs before they reach users.
  • Secret detection prevents accidental exposure of API keys, passwords, and other credentials.
  • PII protection masks personal information to comply with privacy regulations and protect users.
  • Build a pipeline that combines all filters into a comprehensive, configurable system.
  • Log modifications for audit purposes and to understand what filtering was applied.
Next Section Preview: We'll explore action boundaries and permissions - controlling what agents can actually do.