
MCP Security Considerations

Introduction

MCP servers expose powerful capabilities: file access, code execution, API calls, and database queries. When an AI agent can invoke these tools, security becomes critical. A compromised or poorly designed MCP server can lead to data breaches, unauthorized access, or system compromise.

Security Principle: Every MCP tool is an attack surface. Design with the assumption that the AI might be manipulated through prompt injection or that inputs could be malicious.
MCP servers run with the permissions of the user who started them. A file system server running as root can read or write any file. A database server with admin credentials can drop tables. Applying the principle of least privilege is essential.
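One way to act on least privilege is to refuse to start when the process holds more privilege than it needs. The sketch below assumes a POSIX system; `refuse_root` is a hypothetical helper name, not part of any MCP SDK.

```python
import os
from typing import Optional


def refuse_root(euid: Optional[int] = None) -> None:
    """Raise if the process would run as root (POSIX only).

    `euid` can be passed explicitly for testing; by default the
    effective user ID of the current process is used.
    """
    if euid is None:
        euid = os.geteuid()  # not available on Windows
    if euid == 0:
        raise RuntimeError(
            "Refusing to start as root; run the server as an unprivileged user"
        )
```

Calling `refuse_root()` as the first step of server startup turns a misconfigured deployment into an immediate, visible failure rather than a silent over-grant.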

MCP Threat Model

Understanding the threat landscape helps prioritize security measures:

Attack Vectors

Vector | Description | Example
Prompt injection | Malicious input manipulates AI behavior | User tricks AI into reading sensitive files
Parameter manipulation | Crafted tool arguments bypass validation | Path traversal: ../../../etc/passwd
Tool confusion | AI calls wrong tool or with wrong params | delete_file instead of read_file
Resource exhaustion | Overwhelming server with requests | Infinite loop in tool execution
Information disclosure | Tool reveals sensitive information | Database query returns passwords

Trust Boundaries

πŸ“trust_boundaries.txt
1MCP TRUST BOUNDARIES
2
3β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
4β”‚                    UNTRUSTED ZONE                           β”‚
5β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚
6β”‚  β”‚   User Input    β”‚ ◀── Prompt injection, malicious data   β”‚
7β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚
8β”‚           β”‚                                                 β”‚
9β”‚           β–Ό                                                 β”‚
10β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚
11β”‚  β”‚    LLM Agent    β”‚ ◀── May be manipulated, hallucinate    β”‚
12β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚
13β”‚           β”‚ Tool calls (untrusted)                          β”‚
14β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
15            β”‚
16β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
17β”‚              VALIDATION BOUNDARY                            β”‚
18β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚
19β”‚  β”‚  Input Validatorβ”‚ ◀── MUST validate ALL tool inputs      β”‚
20β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚
21β”‚           β”‚                                                 β”‚
22β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
23            β”‚
24β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
25β”‚                    TRUSTED ZONE                             β”‚
26β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚
27β”‚  β”‚   MCP Server    β”‚ ◀── Executes with server privileges    β”‚
28β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚
29β”‚           β”‚                                                 β”‚
30β”‚           β–Ό                                                 β”‚
31β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚
32β”‚  β”‚ System Resourcesβ”‚ ◀── Files, APIs, databases, etc.       β”‚
33β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚
34β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
35
36KEY INSIGHT: Never trust LLM-generated tool calls.
37Validate everything at the server boundary.
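The validation boundary in the diagram can be sketched as a single dispatch function that every tool call passes through. This is a minimal illustration, not an MCP SDK API: the `TOOLS` registry, `register`, `dispatch`, and the `echo` tool are all hypothetical names.

```python
from typing import Any, Callable

# Hypothetical registry: tool name -> (validator, handler).
# A validator returns cleaned arguments or raises ValueError.
Validator = Callable[[dict], dict]
Handler = Callable[[dict], Any]
TOOLS: dict[str, tuple[Validator, Handler]] = {}


def register(name: str, validator: Validator, handler: Handler) -> None:
    """Add a tool together with the validator that guards it."""
    TOOLS[name] = (validator, handler)


def dispatch(name: str, raw_args: Any) -> dict:
    """Validate a tool call at the boundary before running the handler."""
    if name not in TOOLS:
        return {"ok": False, "error": f"Unknown tool: {name!r}"}
    if not isinstance(raw_args, dict):
        return {"ok": False, "error": "Arguments must be an object"}
    validator, handler = TOOLS[name]
    try:
        clean = validator(raw_args)
    except ValueError as e:
        return {"ok": False, "error": f"Invalid arguments: {e}"}
    # Only validated arguments ever reach the handler
    return {"ok": True, "result": handler(clean)}


def validate_echo(args: dict) -> dict:
    text = args.get("text")
    if not isinstance(text, str) or len(text) > 100:
        raise ValueError("text must be a string of at most 100 characters")
    return {"text": text}  # unexpected keys are dropped


register("echo", validate_echo, lambda a: a["text"].upper())
```

Because handlers receive only what the validator returns, unknown tools, non-object arguments, and extra keys are rejected or discarded before any privileged code runs.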

Input Validation

Every tool input must be validated; treat all inputs as potentially malicious:

Schema Validation

schema_validation.py
import re
from pathlib import PurePosixPath
from typing import Optional

# Uses the Pydantic v2 validator API (field_validator)
from pydantic import BaseModel, Field, field_validator


class ReadFileInput(BaseModel):
    """Validated input for read_file tool."""

    path: str = Field(
        ...,
        min_length=1,
        max_length=4096,
        description="Path to file to read"
    )

    @field_validator("path")
    @classmethod
    def validate_path(cls, v: str) -> str:
        # Reject null bytes (injection attempt)
        if "\x00" in v:
            raise ValueError("Null bytes not allowed in path")

        # Reject path traversal; compare whole components so that a
        # legitimate name such as "notes..txt" is not rejected
        if ".." in PurePosixPath(v).parts:
            raise ValueError("Path traversal not allowed")

        # Must be absolute path
        if not v.startswith("/"):
            raise ValueError("Path must be absolute")

        # Reject shell metacharacters
        if re.search(r"[;&|$`]", v):
            raise ValueError("Shell metacharacters not allowed")

        return v


class SQLQueryInput(BaseModel):
    """Validated input for SQL query tool."""

    query: str = Field(
        ...,
        max_length=10000,
        description="SQL query to execute"
    )
    params: Optional[list] = Field(
        default_factory=list,
        description="Query parameters (for parameterized queries)"
    )

    @field_validator("query")
    @classmethod
    def validate_query(cls, v: str) -> str:
        # Only allow SELECT queries
        v_upper = v.strip().upper()
        if not v_upper.startswith("SELECT"):
            raise ValueError("Only SELECT queries allowed")

        # Block dangerous keywords; \b matches whole words only, so a
        # column such as "last_updated" does not trigger the UPDATE rule
        dangerous = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "TRUNCATE"]
        for keyword in dangerous:
            if re.search(rf"\b{keyword}\b", v_upper):
                raise ValueError(f"{keyword} operations not allowed")

        return v
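Keyword filtering on the query text is a coarse check, so it helps to back it with a connection that cannot write at all. The sketch below uses SQLite from the standard library purely as a stand-in database; `open_read_only`, `run_select`, and `demo` are hypothetical names, and other databases would use a read-only role or user instead of a URI flag.

```python
import os
import sqlite3
import tempfile


def open_read_only(db_path: str) -> sqlite3.Connection:
    """Open a SQLite database so that writes fail at the engine level."""
    # mode=ro is part of SQLite's URI filename syntax
    return sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)


def run_select(conn: sqlite3.Connection, query: str, params: tuple = ()) -> list:
    """Run a parameterized query; values are never spliced into the SQL text."""
    return conn.execute(query, params).fetchall()


def demo() -> tuple:
    """Return the rows read and whether a write attempt was blocked."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "demo.db")
        # Create sample data with a normal read-write connection
        setup = sqlite3.connect(db_path)
        setup.execute("CREATE TABLE users (id INTEGER, name TEXT)")
        setup.execute("INSERT INTO users VALUES (1, 'alice')")
        setup.commit()
        setup.close()

        conn = open_read_only(db_path)
        try:
            rows = run_select(conn, "SELECT name FROM users WHERE id = ?", (1,))
            try:
                conn.execute("DELETE FROM users")
                write_blocked = False
            except sqlite3.OperationalError:
                write_blocked = True
        finally:
            conn.close()
        return rows, write_blocked
```

With this arrangement, a query that slips past the validator still cannot modify data, because the restriction is enforced by the database rather than by string matching.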

Path Validation

path_validation.py
from pathlib import Path


class SecurePathValidator:
    """Validate and sanitize file paths."""

    def __init__(self, allowed_roots: list[Path]):
        self.allowed_roots = [root.resolve() for root in allowed_roots]

    def validate(self, path_str: str) -> Path:
        """Validate path is safe and within allowed roots."""

        # Convert to Path and resolve to absolute. resolve() also
        # collapses ".." and follows every symlink in the path.
        try:
            path = Path(path_str).resolve()
        except (OSError, RuntimeError, ValueError) as e:
            raise ValueError(f"Invalid path: {e}") from e

        # Check against allowed roots. Because the path is already fully
        # resolved, this also rejects symlinks that point outside the
        # allowed roots (symlink escape); no separate walk is needed.
        if not self._is_within_allowed_roots(path):
            raise PermissionError(
                f"Access denied: {path} is outside allowed directories"
            )

        return path

    def _is_within_allowed_roots(self, path: Path) -> bool:
        """Check if path is within any allowed root."""
        # Path.is_relative_to requires Python 3.9+
        return any(path.is_relative_to(root) for root in self.allowed_roots)


# Usage
validator = SecurePathValidator([
    Path.home() / "projects",
    Path("/tmp/sandbox")
])

try:
    safe_path = validator.validate("/home/user/projects/file.txt")
    # Safe to use
except (ValueError, PermissionError) as e:
    # Reject the request
    print(f"Path validation failed: {e}")
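To see why resolving before checking matters, the following self-contained sketch builds a throwaway sandbox containing a symlink that points outside it. It assumes a POSIX system where creating symlinks needs no special privileges; `is_within` and `demo` are hypothetical helper names.

```python
import os
import tempfile
from pathlib import Path


def is_within(root: Path, candidate: str) -> bool:
    """True if candidate, after resolving symlinks and '..', stays under root."""
    resolved = (root / candidate).resolve()
    return resolved.is_relative_to(root.resolve())


def demo() -> dict:
    """Check a plain file, a '..' traversal, and a symlink escape."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        root = base / "sandbox"
        outside = base / "outside"
        root.mkdir()
        outside.mkdir()
        (root / "notes.txt").write_text("ok")
        (outside / "secret.txt").write_text("secret")
        # A symlink inside the sandbox that points out of it
        os.symlink(outside, root / "link")

        return {
            "plain": is_within(root, "notes.txt"),
            "dotdot": is_within(root, "../outside/secret.txt"),
            "symlink": is_within(root, "link/secret.txt"),
        }
```

Only the plain file passes. The `link/secret.txt` case contains no `..` at all, which is why a string check for traversal sequences is not enough on its own.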

Access Control

Implement proper authorization for sensitive operations:

Permission-Based Access

access_control.py
from dataclasses import dataclass
from enum import Flag, auto
from functools import wraps
from pathlib import Path
from typing import Callable


class Permission(Flag):
    """Permissions for MCP operations."""
    READ_FILES = auto()
    WRITE_FILES = auto()
    EXECUTE_CODE = auto()
    NETWORK_ACCESS = auto()
    DATABASE_READ = auto()
    DATABASE_WRITE = auto()
    ADMIN = READ_FILES | WRITE_FILES | EXECUTE_CODE | NETWORK_ACCESS | DATABASE_READ | DATABASE_WRITE


@dataclass
class AccessContext:
    """Context for access control decisions."""
    user_id: str
    permissions: Permission
    allowed_paths: list[str]
    rate_limit: int


class ToolAccessControl:
    """Access control for MCP tools."""

    def __init__(self):
        self._tool_permissions: dict[str, Permission] = {}

    def require_permission(self, permission: Permission):
        """Decorator to require specific permission for a tool."""
        def decorator(func: Callable):
            self._tool_permissions[func.__name__] = permission

            @wraps(func)  # keep the tool's name and docstring
            async def wrapper(context: AccessContext, *args, **kwargs):
                # Require every bit of the requested permission, so a
                # combined requirement is not satisfied by a partial grant
                if (context.permissions & permission) != permission:
                    raise PermissionError(
                        f"Missing permission: {permission.name}"
                    )
                return await func(*args, **kwargs)

            return wrapper
        return decorator


# Usage example
access_control = ToolAccessControl()

@access_control.require_permission(Permission.READ_FILES)
async def read_file(path: str) -> str:
    """Read a file (requires READ_FILES permission)."""
    return Path(path).read_text()

@access_control.require_permission(Permission.WRITE_FILES)
async def write_file(path: str, content: str) -> str:
    """Write a file (requires WRITE_FILES permission)."""
    Path(path).write_text(content)
    return f"Written {len(content)} characters"

@access_control.require_permission(Permission.EXECUTE_CODE)
async def run_python(code: str) -> str:
    """Execute Python code (requires EXECUTE_CODE permission)."""
    # Dangerous! exec() runs inside the server process with no
    # isolation. See sandboxing section
    exec(code)
    return "Executed"
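`AccessContext` carries a `rate_limit` field that the listing above never enforces. One possible way to apply it is a sliding-window counter per user, sketched below. The class name, the 60-second default window, and the injectable `clock` parameter are assumptions made for illustration.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowRateLimiter:
    """Allow at most `limit` calls per user within a moving time window."""

    def __init__(
        self,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._calls: dict[str, deque] = defaultdict(deque)

    def allow(self, user_id: str, limit: int) -> bool:
        """Record the call and return True, or return False if over the limit."""
        now = self.clock()
        calls = self._calls[user_id]
        # Drop timestamps that have left the window
        while calls and now - calls[0] >= self.window:
            calls.popleft()
        if len(calls) >= limit:
            return False
        calls.append(now)
        return True
```

A permission wrapper could call `limiter.allow(context.user_id, context.rate_limit)` before running the tool and raise `PermissionError` when it returns `False`, which also addresses the resource exhaustion vector from the threat model.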

Human-in-the-Loop Approval

approval_flow.py
from dataclasses import dataclass
from typing import Callable, Awaitable
from enum import Enum

class ApprovalLevel(Enum):
    NONE = "none"           # No approval needed
    NOTIFY = "notify"       # Notify user but proceed
    CONFIRM = "confirm"     # Require explicit approval
    BLOCK = "block"         # Always block

@dataclass
class ApprovalRequest:
    tool_name: str
    arguments: dict
    risk_assessment: str
    level: ApprovalLevel

class ApprovalGateway:
    """Gate dangerous operations through human approval."""

    def __init__(
        self,
        notify_callback: Callable[[str], Awaitable[None]],
        confirm_callback: Callable[[ApprovalRequest], Awaitable[bool]]
    ):
        self.notify = notify_callback
        self.confirm = confirm_callback

        # Define approval requirements per tool
        self.tool_levels = {
            "read_file": ApprovalLevel.NONE,
            "write_file": ApprovalLevel.CONFIRM,
            "delete_file": ApprovalLevel.CONFIRM,
            "run_command": ApprovalLevel.CONFIRM,
            "send_email": ApprovalLevel.CONFIRM,
            "database_write": ApprovalLevel.CONFIRM,
        }

    async def check_approval(
        self,
        tool_name: str,
        arguments: dict
    ) -> bool:
        """Check if tool execution is approved."""

        # Unknown tools default to CONFIRM
        level = self.tool_levels.get(tool_name, ApprovalLevel.CONFIRM)

        if level == ApprovalLevel.NONE:
            return True

        if level == ApprovalLevel.BLOCK:
            return False

        request = ApprovalRequest(
            tool_name=tool_name,
            arguments=arguments,
            risk_assessment=self._assess_risk(tool_name, arguments),
            level=level
        )

        if level == ApprovalLevel.NOTIFY:
            await self.notify(
                f"Executing {tool_name} with {arguments}"
            )
            return True

        if level == ApprovalLevel.CONFIRM:
            return await self.confirm(request)

        return False

    def _assess_risk(self, tool_name: str, arguments: dict) -> str:
        """Assess risk level of the operation."""
        risks = []

        if tool_name in ["delete_file", "run_command"]:
            risks.append("This operation can cause data loss")

        if "password" in str(arguments).lower():
            risks.append("Arguments may contain sensitive data")

        return "; ".join(risks) if risks else "Low risk operation"
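A confirmation callback can hang indefinitely if nobody is watching the prompt. One way to handle this, sketched below, is to put a deadline on the approval and treat silence as a denial. `confirm_with_timeout` is a hypothetical helper, not part of the gateway above.

```python
import asyncio
from typing import Awaitable, Callable


async def confirm_with_timeout(
    confirm: Callable[[], Awaitable[bool]],
    timeout_seconds: float,
) -> bool:
    """Return the approver's decision, or False if none arrives in time."""
    try:
        return bool(await asyncio.wait_for(confirm(), timeout=timeout_seconds))
    except asyncio.TimeoutError:
        # Fail closed: no answer means the operation is not approved
        return False
```

Denying on timeout keeps the gate safe when the human is absent; the alternative of proceeding after a delay would turn the approval step into a formality.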

Sandboxing and Isolation

Code execution tools require strict sandboxing:

Docker-Based Sandbox

docker_sandbox.py
import asyncio
import os
import tempfile
from dataclasses import dataclass
from typing import Optional

import docker


@dataclass
class SandboxConfig:
    """Configuration for code sandbox."""
    memory_limit: str = "256m"
    cpu_period: int = 100000
    cpu_quota: int = 50000  # 50% of one CPU
    timeout: int = 30
    network_disabled: bool = True
    read_only: bool = True


class DockerSandbox:
    """Execute code in isolated Docker container."""

    def __init__(self, config: Optional[SandboxConfig] = None):
        self.config = config or SandboxConfig()
        self.client = docker.from_env()

    async def execute_python(self, code: str) -> dict:
        """Execute Python code in sandbox."""
        # The Docker SDK is blocking, so run it in a worker thread
        # instead of stalling the event loop
        return await asyncio.to_thread(self._run_blocking, code)

    def _run_blocking(self, code: str) -> dict:
        # Create temporary directory for code
        with tempfile.TemporaryDirectory() as tmpdir:
            code_file = os.path.join(tmpdir, "script.py")
            with open(code_file, "w") as f:
                f.write(code)

            container = None
            try:
                container = self.client.containers.run(
                    "python:3.11-slim",
                    command=["python", "/code/script.py"],
                    volumes={
                        tmpdir: {"bind": "/code", "mode": "ro"}
                    },
                    mem_limit=self.config.memory_limit,
                    cpu_period=self.config.cpu_period,
                    cpu_quota=self.config.cpu_quota,
                    network_disabled=self.config.network_disabled,
                    read_only=self.config.read_only,
                    # Detach to get a handle, so the timeout can be enforced
                    detach=True,
                    # Run as the host user (POSIX) rather than root; with all
                    # capabilities dropped, container root may be unable to
                    # read the host-owned temporary directory
                    user=f"{os.getuid()}:{os.getgid()}",
                    # Security options
                    security_opt=["no-new-privileges:true"],
                    cap_drop=["ALL"],
                )

                # wait() raises if the container outlives the timeout
                status = container.wait(timeout=self.config.timeout)
                output = container.logs(stdout=True, stderr=True).decode(
                    "utf-8", errors="replace"
                )
                exit_code = status.get("StatusCode", 1)

                return {
                    "success": exit_code == 0,
                    "output": output,
                    "error": None if exit_code == 0 else f"Exit code {exit_code}"
                }

            except Exception as e:
                # Covers Docker API errors and the wait() timeout
                return {
                    "success": False,
                    "output": None,
                    "error": str(e)
                }

            finally:
                if container is not None:
                    # force=True also kills a container that is still running
                    container.remove(force=True)

Resource Limits

resource_limits.py
import asyncio
import resource
import sys
from functools import wraps


class ResourceLimiter:
    """Limit resources for tool execution."""

    def __init__(
        self,
        max_memory_mb: int = 256,
        max_time_seconds: int = 30,
        max_file_size_mb: int = 10
    ):
        self.max_memory = max_memory_mb * 1024 * 1024
        self.max_time = max_time_seconds
        self.max_file_size = max_file_size_mb * 1024 * 1024

    def apply_in_child(self) -> None:
        """Set rlimits; pass as preexec_fn so only the child is limited.

        Calling setrlimit in the server process itself would cap the
        whole server for the rest of its lifetime (POSIX only).
        """
        # Set memory limit
        resource.setrlimit(
            resource.RLIMIT_AS,
            (self.max_memory, self.max_memory)
        )

        # Set file size limit
        resource.setrlimit(
            resource.RLIMIT_FSIZE,
            (self.max_file_size, self.max_file_size)
        )

    def limit(self, func):
        """Decorator to apply a time limit to an async tool."""

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Execute with timeout
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.max_time
                )
            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Tool execution exceeded {self.max_time}s limit"
                ) from None

        return wrapper


# Usage
limiter = ResourceLimiter(
    max_memory_mb=128,
    max_time_seconds=10
)

@limiter.limit
async def execute_untrusted_code(code: str) -> str:
    """Execute code with resource limits."""
    # Run the code in a child interpreter so the rlimits apply to it alone
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-I", "-c", code,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        preexec_fn=limiter.apply_in_child,
    )
    try:
        stdout, _ = await proc.communicate()
    finally:
        # On timeout the wrapper cancels this coroutine; kill the child too
        if proc.returncode is None:
            proc.kill()
    return stdout.decode("utf-8", errors="replace")

Secrets Management

MCP servers often need API keys and credentials:

Environment-Based Configuration

secrets_management.py
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerSecrets:
    """Secrets for MCP server."""
    github_token: Optional[str] = None
    database_url: Optional[str] = None
    api_key: Optional[str] = None

    @classmethod
    def from_environment(cls) -> "ServerSecrets":
        """Load secrets from environment variables."""
        return cls(
            github_token=os.environ.get("GITHUB_TOKEN"),
            database_url=os.environ.get("DATABASE_URL"),
            api_key=os.environ.get("API_KEY"),
        )

    def validate(self) -> list[str]:
        """Validate required secrets are present."""
        missing = []
        if not self.github_token:
            missing.append("GITHUB_TOKEN")
        if not self.database_url:
            missing.append("DATABASE_URL")
        return missing


# Never expose secrets in tool responses
def sanitize_response(response: str, secrets: ServerSecrets) -> str:
    """Remove any secrets that might appear in output."""
    result = response

    secret_values = [
        secrets.github_token,
        secrets.database_url,
        secrets.api_key,
    ]

    for secret in secret_values:
        if secret and secret in result:
            result = result.replace(secret, "[REDACTED]")

    return result
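A dataclass generates a `__repr__` that includes every field value, so logging or printing a `ServerSecrets` instance would write the secrets out in plain text. One possible mitigation is a small wrapper type that masks itself when printed. The `Secret` class below is a hypothetical sketch, not a standard-library type.

```python
class Secret:
    """Wrap a secret so accidental printing or logging shows a mask."""

    __slots__ = ("_value",)

    def __init__(self, value: str):
        self._value = value

    def reveal(self) -> str:
        """Return the real value; call this only where it is needed."""
        return self._value

    def __repr__(self) -> str:
        return "Secret('***')"

    # str() and f-strings use the same masked form
    __str__ = __repr__
```

Fields typed as `Secret` rather than `str` force call sites to write `.reveal()` explicitly, which makes every use of the raw value easy to find in review.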

Scoped Credentials

scoped_credentials.py
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class ScopedCredential:
    """Time-limited, scope-restricted credential."""
    token: str
    scopes: list[str]
    expires_at: datetime  # timezone-aware, UTC
    created_for: str  # Tool or operation this was created for

    def is_valid(self) -> bool:
        # Compare timezone-aware UTC timestamps
        return datetime.now(timezone.utc) < self.expires_at

    def has_scope(self, required_scope: str) -> bool:
        return required_scope in self.scopes or "*" in self.scopes


class CredentialManager:
    """Manage scoped, temporary credentials."""

    def __init__(self, master_token: str):
        self._master_token = master_token
        self._issued: list[ScopedCredential] = []

    def issue_scoped_token(
        self,
        scopes: list[str],
        duration: timedelta,
        purpose: str
    ) -> ScopedCredential:
        """Issue a limited credential for specific operation."""

        # In practice, create a real scoped token via API
        # This is pseudocode
        scoped_token = self._create_scoped_token(scopes)

        credential = ScopedCredential(
            token=scoped_token,
            scopes=scopes,
            expires_at=datetime.now(timezone.utc) + duration,
            created_for=purpose
        )

        self._issued.append(credential)
        return credential

    def revoke_expired(self) -> int:
        """Revoke all expired credentials."""
        expired = [c for c in self._issued if not c.is_valid()]

        for cred in expired:
            self._revoke_token(cred.token)
            self._issued.remove(cred)

        return len(expired)

    def _create_scoped_token(self, scopes: list[str]) -> str:
        # API-specific implementation
        raise NotImplementedError("API-specific implementation")

    def _revoke_token(self, token: str) -> None:
        # API-specific implementation
        raise NotImplementedError("API-specific implementation")
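The listing leaves token creation to the upstream API. Where no such API exists and the server has to mint its own short-lived tokens, one possible approach is an HMAC-signed payload carrying scopes and an expiry. The sketch below is an assumption-laden illustration using only the standard library; the token layout and both function names are invented for this example and do not follow any particular standard.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def create_scoped_token(
    master_key: bytes, scopes: list, ttl_seconds: int, now: Optional[float] = None
) -> str:
    """Return '<payload>.<signature>', both URL-safe base64 encoded."""
    issued = time.time() if now is None else now
    payload = json.dumps(
        {"scopes": sorted(scopes), "exp": issued + ttl_seconds},
        separators=(",", ":"),
    ).encode()
    sig = hmac.new(master_key, payload, hashlib.sha256).digest()
    return (
        base64.urlsafe_b64encode(payload).decode()
        + "."
        + base64.urlsafe_b64encode(sig).decode()
    )


def verify_scoped_token(
    master_key: bytes, token: str, required_scope: str, now: Optional[float] = None
) -> bool:
    """Check signature, expiry, and scope; any failure returns False."""
    current = time.time() if now is None else now
    try:
        payload_b64, sig_b64 = token.split(".")
        payload = base64.urlsafe_b64decode(payload_b64)
        sig = base64.urlsafe_b64decode(sig_b64)
    except ValueError:  # wrong shape or invalid base64
        return False
    expected = hmac.new(master_key, payload, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(sig, expected):
        return False
    claims = json.loads(payload)
    return current < claims["exp"] and required_scope in claims["scopes"]
```

The signature is verified before the payload is parsed, so claims from a forged or altered token are never trusted. Revocation before expiry would still need server-side state, such as the `_issued` list above.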

Audit Logging

Comprehensive logging is essential for security monitoring:

audit_logging.py
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class AuditEvent:
    """Structured audit log event."""
    timestamp: str
    event_type: str
    tool_name: str
    arguments: dict
    result_type: str  # success, error, denied
    user_id: Optional[str]
    session_id: str
    duration_ms: int
    error_message: Optional[str] = None

    def to_json(self) -> str:
        # default=str keeps logging from failing on non-JSON argument values
        return json.dumps(asdict(self), default=str)


class AuditLogger:
    """Security audit logging for MCP operations."""

    def __init__(self, log_file: str = "mcp_audit.log"):
        self.logger = logging.getLogger("mcp_audit")
        self.logger.setLevel(logging.INFO)

        # getLogger returns a shared logger; attach the handler only once
        # so creating several AuditLogger objects does not duplicate entries
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)

    def log_tool_call(
        self,
        tool_name: str,
        arguments: dict,
        result_type: str,
        duration_ms: int,
        session_id: str,
        user_id: Optional[str] = None,
        error_message: Optional[str] = None
    ):
        """Log a tool call event."""

        # Sanitize arguments (remove sensitive data)
        safe_args = self._sanitize_arguments(arguments)

        event = AuditEvent(
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            timestamp=datetime.now(timezone.utc).isoformat(),
            event_type="tool_call",
            tool_name=tool_name,
            arguments=safe_args,
            result_type=result_type,
            user_id=user_id,
            session_id=session_id,
            duration_ms=duration_ms,
            error_message=error_message
        )

        self.logger.info(event.to_json())

    def log_access_denied(
        self,
        tool_name: str,
        arguments: dict,
        reason: str,
        session_id: str,
        user_id: Optional[str] = None
    ):
        """Log an access denied event."""

        event = AuditEvent(
            timestamp=datetime.now(timezone.utc).isoformat(),
            event_type="access_denied",
            tool_name=tool_name,
            arguments=self._sanitize_arguments(arguments),
            result_type="denied",
            user_id=user_id,
            session_id=session_id,
            duration_ms=0,
            error_message=reason
        )

        self.logger.warning(event.to_json())

    def _sanitize_arguments(self, args: dict) -> dict:
        """Remove sensitive data from arguments."""
        sensitive_keys = ["password", "token", "secret", "key", "credential"]
        result = {}

        for key, value in args.items():
            # Substring match on the key name; only top-level keys are checked
            if any(s in key.lower() for s in sensitive_keys):
                result[key] = "[REDACTED]"
            elif isinstance(value, str) and len(value) > 1000:
                result[key] = f"[{len(value)} chars]"
            else:
                result[key] = value

        return result
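The `duration_ms` and `result_type` fields have to be measured somewhere. A decorator around each tool is one way to collect them, sketched below. The `audited` name and the in-memory `sink` list are assumptions chosen to keep the example self-contained; a real server would presumably forward the same values to `AuditLogger.log_tool_call`.

```python
import functools
import time


def audited(tool_name: str, sink: list):
    """Decorator that records outcome and duration of an async tool call."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result_type, error = "success", None
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                result_type, error = "error", str(e)
                raise  # the caller still sees the original exception
            finally:
                # Runs on both success and failure
                sink.append({
                    "tool_name": tool_name,
                    "result_type": result_type,
                    "duration_ms": int((time.perf_counter() - start) * 1000),
                    "error_message": error,
                })

        return wrapper

    return decorator
```

Recording in a `finally` block means failed calls are logged as reliably as successful ones, which matters most when the failure is the interesting event.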

Summary

Security best practices for MCP:

  1. Never trust LLM inputs: Validate all tool arguments at the server boundary
  2. Principle of least privilege: Grant only necessary permissions
  3. Path validation: Prevent traversal attacks, symlink escapes
  4. Sandbox code execution: Use containers with resource limits
  5. Human approval: Gate dangerous operations through confirmation
  6. Secrets management: Use environment variables, never expose in responses
  7. Audit logging: Log all operations for security monitoring
Security is not optional. A single vulnerability in an MCP server with file system or code execution access can compromise the entire system. Review and test security measures regularly.
Next: Let's put it all together by integrating MCP servers into a complete AI agent system.