Introduction
MCP servers expose powerful capabilities: file access, code execution, API calls, database queries. When an AI agent can invoke these tools, security becomes critical. A compromised or poorly designed MCP server can lead to data breaches, unauthorized access, or system compromise.
Security Principle: Every MCP tool is an attack surface. Design with the assumption that the AI might be manipulated through prompt injection or that inputs could be malicious.
MCP servers run with the permissions of the user who started them. A file system server running as root can read/write any file. A database server with admin credentials can drop tables. Principle of least privilege is essential.
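One concrete way to apply least privilege is to refuse to start with elevated privileges at all. A minimal, POSIX-only sketch (the helper names here are illustrative, not part of any MCP SDK):

```python
import os

def running_as_root() -> bool:
    """True when the effective UID is 0 (POSIX only; False on Windows)."""
    return hasattr(os, "geteuid") and os.geteuid() == 0

def check_privileges() -> None:
    """Call at server startup, before registering any tools."""
    if running_as_root():
        raise SystemExit(
            "Refusing to start as root: run under a dedicated low-privilege user."
        )

print(f"running as root: {running_as_root()}")
```

Pair this with a dedicated service account that can only reach the directories and credentials the server actually needs.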
MCP Threat Model
Understanding the threat landscape helps prioritize security measures:
Attack Vectors
| Vector | Description | Example |
|---|---|---|
| Prompt injection | Malicious input manipulates AI behavior | User tricks AI into reading sensitive files |
| Parameter manipulation | Crafted tool arguments bypass validation | Path traversal: `../../../etc/passwd` |
| Tool confusion | AI calls wrong tool or with wrong params | `delete_file` instead of `read_file` |
| Resource exhaustion | Overwhelming server with requests | Infinite loop in tool execution |
| Information disclosure | Tool reveals sensitive information | Database query returns passwords |
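The path traversal row is worth seeing concretely: naively joining a client-supplied path onto a root directory escapes it as soon as the `..` components are normalized (the paths here are hypothetical):

```python
import os.path

root = "/srv/mcp-files"            # directory the server intends to expose
requested = "../../../etc/passwd"  # attacker-controlled tool argument

# A naive join lets the ".." components walk straight out of the root
joined = os.path.join(root, requested)
print(os.path.normpath(joined))    # /etc/passwd
```

Every vector in the table reduces to the same rule: the server, not the model, is the last line of defense.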
Trust Boundaries
`trust_boundaries.txt`

```text
MCP TRUST BOUNDARIES

┌───────────────────────────────────────────────────────────────┐
│ UNTRUSTED ZONE                                                │
│   ┌───────────────────┐                                       │
│   │    User Input     │ ◄── Prompt injection, malicious data  │
│   └─────────┬─────────┘                                       │
│             ▼                                                 │
│   ┌───────────────────┐                                       │
│   │     LLM Agent     │ ◄── May be manipulated, hallucinate   │
│   └─────────┬─────────┘                                       │
│             │  Tool calls (untrusted)                         │
└─────────────┼─────────────────────────────────────────────────┘
              │
┌─────────────┼─────────────────────────────────────────────────┐
│ VALIDATION BOUNDARY                                           │
│             ▼                                                 │
│   ┌───────────────────┐                                       │
│   │  Input Validator  │ ◄── MUST validate ALL tool inputs     │
│   └─────────┬─────────┘                                       │
└─────────────┼─────────────────────────────────────────────────┘
              │
┌─────────────┼─────────────────────────────────────────────────┐
│ TRUSTED ZONE                                                  │
│             ▼                                                 │
│   ┌───────────────────┐                                       │
│   │    MCP Server     │ ◄── Executes with server privileges   │
│   └─────────┬─────────┘                                       │
│             ▼                                                 │
│   ┌─────────────────────┐                                     │
│   │  System Resources   │ ◄── Files, APIs, databases, etc.    │
│   └─────────────────────┘                                     │
└───────────────────────────────────────────────────────────────┘

KEY INSIGHT: Never trust LLM-generated tool calls.
Validate everything at the server boundary.
```

Input Validation
Every tool input must be validated. Treat all inputs as potentially malicious:
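The `SQLQueryInput` model in this section accepts a `params` list for a reason: with parameterized queries the driver treats user values strictly as data, so classic injection strings fail. A self-contained sqlite3 illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, role TEXT)")
conn.execute("INSERT INTO users VALUES ('alice', 'admin')")

malicious = "alice' OR '1'='1"

# String formatting would let the payload rewrite the query;
# a bound parameter is matched literally and finds no such user.
rows = conn.execute(
    "SELECT role FROM users WHERE name = ?", (malicious,)
).fetchall()
print(rows)  # [] -- the injection attempt matched nothing
```

Keyword blocklists (shown below) are a backstop; parameterization is the primary defense.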
Schema Validation
`schema_validation.py`

```python
import re
from typing import Optional

from pydantic import BaseModel, Field, field_validator  # pydantic v2

class ReadFileInput(BaseModel):
    """Validated input for read_file tool."""

    path: str = Field(
        ...,
        min_length=1,
        max_length=4096,
        description="Path to file to read"
    )

    @field_validator("path")
    @classmethod
    def validate_path(cls, v: str) -> str:
        # Reject null bytes (injection attempt)
        if "\x00" in v:
            raise ValueError("Null bytes not allowed in path")

        # Reject path traversal
        if ".." in v:
            raise ValueError("Path traversal not allowed")

        # Must be absolute path
        if not v.startswith("/"):
            raise ValueError("Path must be absolute")

        # Reject shell metacharacters
        if re.search(r"[;&|$`]", v):
            raise ValueError("Shell metacharacters not allowed")

        return v

class SQLQueryInput(BaseModel):
    """Validated input for SQL query tool."""

    query: str = Field(
        ...,
        max_length=10000,
        description="SQL query to execute"
    )
    params: Optional[list] = Field(
        default_factory=list,
        description="Query parameters (for parameterized queries)"
    )

    @field_validator("query")
    @classmethod
    def validate_query(cls, v: str) -> str:
        # Only allow SELECT queries
        v_upper = v.strip().upper()
        if not v_upper.startswith("SELECT"):
            raise ValueError("Only SELECT queries allowed")

        # Block dangerous keywords (word-boundary match avoids false
        # positives on identifiers like "updated_at")
        dangerous = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "TRUNCATE"]
        for keyword in dangerous:
            if re.search(rf"\b{keyword}\b", v_upper):
                raise ValueError(f"{keyword} operations not allowed")

        return v
```

Path Validation
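The validator below leans on `Path.resolve()`, which normalizes `..` components and follows symlinks before any containment check runs. A small standalone illustration (`/srv/sandbox` is a hypothetical root):

```python
from pathlib import Path

root = Path("/srv/sandbox")
# Attacker-supplied path with ".." smuggled into the middle
candidate = root / "../../etc/shadow"

resolved = candidate.resolve()  # normalizes ".." and follows symlinks
try:
    resolved.relative_to(root)
    print("inside sandbox")
except ValueError:
    print("escapes sandbox")  # this branch runs
```

Checking containment on the resolved path, never the raw string, is what defeats both `..` tricks and symlinked escapes.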
`path_validation.py`

```python
from pathlib import Path

class SecurePathValidator:
    """Validate and sanitize file paths."""

    def __init__(self, allowed_roots: list[Path]):
        self.allowed_roots = [root.resolve() for root in allowed_roots]

    def validate(self, path_str: str) -> Path:
        """Validate path is safe and within allowed roots."""

        raw = Path(path_str)

        # Resolve to an absolute path; resolve() also follows symlinks,
        # so a symlinked escape lands outside the allowed roots below
        try:
            path = raw.resolve()
        except (OSError, RuntimeError) as e:
            raise ValueError(f"Invalid path: {e}") from e

        # Check against allowed roots
        if not self._is_within_allowed_roots(path):
            raise PermissionError(
                f"Access denied: {path} is outside allowed directories"
            )

        # Belt and braces: also walk the *unresolved* path, since the
        # resolved path no longer contains any symlinks to inspect
        if self._has_symlink_escape(raw):
            raise PermissionError("Symlink escape detected")

        return path

    def _is_within_allowed_roots(self, path: Path) -> bool:
        """Check if path is within any allowed root."""
        for root in self.allowed_roots:
            try:
                path.relative_to(root)
                return True
            except ValueError:
                continue
        return False

    def _has_symlink_escape(self, raw: Path) -> bool:
        """Check if any component of the unresolved path is a symlink
        whose target falls outside the allowed roots."""
        current = raw.absolute()
        while current != current.parent:
            if current.is_symlink():
                if not self._is_within_allowed_roots(current.resolve()):
                    return True
            current = current.parent
        return False


# Usage
validator = SecurePathValidator([
    Path.home() / "projects",
    Path("/tmp/sandbox")
])

try:
    safe_path = validator.validate("/home/user/projects/file.txt")
    # Safe to use
except (ValueError, PermissionError) as e:
    # Reject the request
    print(f"Path validation failed: {e}")
```

Access Control
Implement proper authorization for sensitive operations:
Permission-Based Access
`access_control.py`

```python
from dataclasses import dataclass
from enum import Flag, auto
from functools import wraps
from pathlib import Path
from typing import Callable

class Permission(Flag):
    """Permissions for MCP operations."""
    READ_FILES = auto()
    WRITE_FILES = auto()
    EXECUTE_CODE = auto()
    NETWORK_ACCESS = auto()
    DATABASE_READ = auto()
    DATABASE_WRITE = auto()
    ADMIN = (READ_FILES | WRITE_FILES | EXECUTE_CODE
             | NETWORK_ACCESS | DATABASE_READ | DATABASE_WRITE)

@dataclass
class AccessContext:
    """Context for access control decisions."""
    user_id: str
    permissions: Permission
    allowed_paths: list[str]
    rate_limit: int

class ToolAccessControl:
    """Access control for MCP tools."""

    def __init__(self):
        self._tool_permissions: dict[str, Permission] = {}

    def require_permission(self, permission: Permission):
        """Decorator to require a specific permission for a tool.
        Decorated tools take an AccessContext as their first argument."""
        def decorator(func: Callable):
            self._tool_permissions[func.__name__] = permission

            @wraps(func)
            async def wrapper(context: AccessContext, *args, **kwargs):
                if not (context.permissions & permission):
                    raise PermissionError(
                        f"Missing permission: {permission.name}"
                    )
                return await func(*args, **kwargs)

            return wrapper
        return decorator


# Usage example
access_control = ToolAccessControl()

@access_control.require_permission(Permission.READ_FILES)
async def read_file(path: str) -> str:
    """Read a file (requires READ_FILES permission)."""
    return Path(path).read_text()

@access_control.require_permission(Permission.WRITE_FILES)
async def write_file(path: str, content: str) -> str:
    """Write a file (requires WRITE_FILES permission)."""
    Path(path).write_text(content)
    return f"Written {len(content)} bytes"

@access_control.require_permission(Permission.EXECUTE_CODE)
async def run_python(code: str) -> str:
    """Execute Python code (requires EXECUTE_CODE permission)."""
    # Dangerous! See sandboxing section
    exec(code)
    return "Executed"
```

Human-in-the-Loop Approval
`approval_flow.py`

```python
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable

class ApprovalLevel(Enum):
    NONE = "none"        # No approval needed
    NOTIFY = "notify"    # Notify user but proceed
    CONFIRM = "confirm"  # Require explicit approval
    BLOCK = "block"      # Always block

@dataclass
class ApprovalRequest:
    tool_name: str
    arguments: dict
    risk_assessment: str
    level: ApprovalLevel

class ApprovalGateway:
    """Gate dangerous operations through human approval."""

    def __init__(
        self,
        notify_callback: Callable[[str], Awaitable[None]],
        confirm_callback: Callable[[ApprovalRequest], Awaitable[bool]]
    ):
        self.notify = notify_callback
        self.confirm = confirm_callback

        # Define approval requirements per tool
        self.tool_levels = {
            "read_file": ApprovalLevel.NONE,
            "write_file": ApprovalLevel.CONFIRM,
            "delete_file": ApprovalLevel.CONFIRM,
            "run_command": ApprovalLevel.CONFIRM,
            "send_email": ApprovalLevel.CONFIRM,
            "database_write": ApprovalLevel.CONFIRM,
        }

    async def check_approval(
        self,
        tool_name: str,
        arguments: dict
    ) -> bool:
        """Check if tool execution is approved."""

        # Unknown tools default to requiring confirmation
        level = self.tool_levels.get(tool_name, ApprovalLevel.CONFIRM)

        if level == ApprovalLevel.NONE:
            return True

        if level == ApprovalLevel.BLOCK:
            return False

        request = ApprovalRequest(
            tool_name=tool_name,
            arguments=arguments,
            risk_assessment=self._assess_risk(tool_name, arguments),
            level=level
        )

        if level == ApprovalLevel.NOTIFY:
            await self.notify(
                f"Executing {tool_name} with {arguments}"
            )
            return True

        if level == ApprovalLevel.CONFIRM:
            return await self.confirm(request)

        return False

    def _assess_risk(self, tool_name: str, arguments: dict) -> str:
        """Assess risk level of the operation."""
        risks = []

        if tool_name in ["delete_file", "run_command"]:
            risks.append("This operation can cause data loss")

        if "password" in str(arguments).lower():
            risks.append("Arguments may contain sensitive data")

        return "; ".join(risks) if risks else "Low risk operation"
```

Sandboxing and Isolation
Code execution tools require strict sandboxing:
Docker-Based Sandbox
`docker_sandbox.py`

```python
import asyncio
import os
import tempfile
from dataclasses import dataclass
from typing import Optional

import docker

@dataclass
class SandboxConfig:
    """Configuration for code sandbox."""
    memory_limit: str = "256m"
    cpu_period: int = 100000
    cpu_quota: int = 50000  # 50% of one CPU
    timeout: int = 30
    network_disabled: bool = True
    read_only: bool = True

class DockerSandbox:
    """Execute code in an isolated Docker container."""

    def __init__(self, config: Optional[SandboxConfig] = None):
        self.config = config or SandboxConfig()
        self.client = docker.from_env()

    async def execute_python(self, code: str) -> dict:
        """Execute Python code in the sandbox."""

        # Create a temporary directory for the code
        with tempfile.TemporaryDirectory() as tmpdir:
            code_file = os.path.join(tmpdir, "script.py")
            with open(code_file, "w") as f:
                f.write(code)

            try:
                # containers.run blocks, so run it in a thread and
                # enforce the configured wall-clock timeout
                result = await asyncio.wait_for(
                    asyncio.to_thread(self._run_container, tmpdir),
                    timeout=self.config.timeout,
                )
                return {
                    "success": True,
                    "output": result.decode("utf-8"),
                    "error": None
                }

            except asyncio.TimeoutError:
                return {
                    "success": False,
                    "output": None,
                    "error": f"Timed out after {self.config.timeout}s"
                }

            except docker.errors.ContainerError as e:
                return {
                    "success": False,
                    "output": e.container.logs().decode("utf-8"),
                    "error": str(e)
                }

            except Exception as e:
                return {
                    "success": False,
                    "output": None,
                    "error": str(e)
                }

    def _run_container(self, tmpdir: str) -> bytes:
        return self.client.containers.run(
            "python:3.11-slim",
            command=["python", "/code/script.py"],
            volumes={
                tmpdir: {"bind": "/code", "mode": "ro"}
            },
            mem_limit=self.config.memory_limit,
            cpu_period=self.config.cpu_period,
            cpu_quota=self.config.cpu_quota,
            network_disabled=self.config.network_disabled,
            read_only=self.config.read_only,
            remove=True,
            stdout=True,
            stderr=True,
            detach=False,
            # Security options
            security_opt=["no-new-privileges:true"],
            cap_drop=["ALL"],
        )
```

Resource Limits
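Containers are the strongest option, but even a bare subprocess gives a hard wall-clock cutoff. A minimal sketch (the `run_python_limited` helper is illustrative, not a standard API); note it limits time only, not file system or network access:

```python
import subprocess
import sys

def run_python_limited(code: str, timeout: int = 5) -> dict:
    """Run code in a fresh interpreter (-I = isolated mode) with a
    hard timeout; subprocess.run kills the child if it overruns."""
    try:
        proc = subprocess.run(
            [sys.executable, "-I", "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return {
            "success": proc.returncode == 0,
            "output": proc.stdout,
            "error": proc.stderr or None,
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "output": None,
                "error": f"Timed out after {timeout}s"}

print(run_python_limited("print(21 * 2)")["output"])
```

The resource limits below add memory and file-size ceilings on top of this kind of time cap.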
`resource_limits.py`

```python
import asyncio
import resource
from functools import wraps

class ResourceLimiter:
    """Limit resources for tool execution.

    Note: setrlimit applies to the whole server process, not just the
    wrapped coroutine; for per-task isolation, run the tool in a
    subprocess or container instead.
    """

    def __init__(
        self,
        max_memory_mb: int = 256,
        max_time_seconds: int = 30,
        max_file_size_mb: int = 10
    ):
        self.max_memory = max_memory_mb * 1024 * 1024
        self.max_time = max_time_seconds
        self.max_file_size = max_file_size_mb * 1024 * 1024

    def limit(self, func):
        """Decorator to apply resource limits."""

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Cap address space (memory)
            resource.setrlimit(
                resource.RLIMIT_AS,
                (self.max_memory, self.max_memory)
            )

            # Cap the size of files the process may create
            resource.setrlimit(
                resource.RLIMIT_FSIZE,
                (self.max_file_size, self.max_file_size)
            )

            # Execute with timeout
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.max_time
                )
            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Tool execution exceeded {self.max_time}s limit"
                )

        return wrapper


# Usage
limiter = ResourceLimiter(
    max_memory_mb=128,
    max_time_seconds=10
)

@limiter.limit
async def execute_untrusted_code(code: str) -> str:
    """Execute code with resource limits."""
    # Implementation
    pass
```

Secrets Management
MCP servers often need API keys and credentials:
Environment-Based Configuration
`secrets_management.py`

```python
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerSecrets:
    """Secrets for MCP server."""
    github_token: Optional[str] = None
    database_url: Optional[str] = None
    api_key: Optional[str] = None

    @classmethod
    def from_environment(cls) -> "ServerSecrets":
        """Load secrets from environment variables."""
        return cls(
            github_token=os.environ.get("GITHUB_TOKEN"),
            database_url=os.environ.get("DATABASE_URL"),
            api_key=os.environ.get("API_KEY"),
        )

    def validate(self) -> list[str]:
        """Validate required secrets are present."""
        missing = []
        if not self.github_token:
            missing.append("GITHUB_TOKEN")
        if not self.database_url:
            missing.append("DATABASE_URL")
        return missing


# Never expose secrets in tool responses
def sanitize_response(response: str, secrets: ServerSecrets) -> str:
    """Remove any secrets that might appear in output."""
    result = response

    secret_values = [
        secrets.github_token,
        secrets.database_url,
        secrets.api_key,
    ]

    for secret in secret_values:
        if secret and secret in result:
            result = result.replace(secret, "[REDACTED]")

    return result
```

Scoped Credentials
`scoped_credentials.py`

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class ScopedCredential:
    """Time-limited, scope-restricted credential."""
    token: str
    scopes: list[str]
    expires_at: datetime
    created_for: str  # Tool or operation this was created for

    def is_valid(self) -> bool:
        return datetime.now() < self.expires_at

    def has_scope(self, required_scope: str) -> bool:
        return required_scope in self.scopes or "*" in self.scopes


class CredentialManager:
    """Manage scoped, temporary credentials."""

    def __init__(self, master_token: str):
        self._master_token = master_token
        self._issued: list[ScopedCredential] = []

    def issue_scoped_token(
        self,
        scopes: list[str],
        duration: timedelta,
        purpose: str
    ) -> ScopedCredential:
        """Issue a limited credential for a specific operation."""

        # In practice, create a real scoped token via your provider's
        # API; this is pseudocode
        scoped_token = self._create_scoped_token(scopes)

        credential = ScopedCredential(
            token=scoped_token,
            scopes=scopes,
            expires_at=datetime.now() + duration,
            created_for=purpose
        )

        self._issued.append(credential)
        return credential

    def revoke_expired(self) -> int:
        """Revoke all expired credentials."""
        expired = [c for c in self._issued if not c.is_valid()]

        for cred in expired:
            self._revoke_token(cred.token)
            self._issued.remove(cred)

        return len(expired)

    def _create_scoped_token(self, scopes: list[str]) -> str:
        # API-specific implementation
        raise NotImplementedError

    def _revoke_token(self, token: str) -> None:
        # API-specific implementation
        raise NotImplementedError
```

Audit Logging
Comprehensive logging is essential for security monitoring:
`audit_logging.py`

```python
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class AuditEvent:
    """Structured audit log event."""
    timestamp: str
    event_type: str
    tool_name: str
    arguments: dict
    result_type: str  # success, error, denied
    user_id: Optional[str]
    session_id: str
    duration_ms: int
    error_message: Optional[str] = None

    def to_json(self) -> str:
        return json.dumps(asdict(self))


class AuditLogger:
    """Security audit logging for MCP operations."""

    def __init__(self, log_file: str = "mcp_audit.log"):
        self.logger = logging.getLogger("mcp_audit")
        self.logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)

    def log_tool_call(
        self,
        tool_name: str,
        arguments: dict,
        result_type: str,
        duration_ms: int,
        session_id: str,
        user_id: Optional[str] = None,
        error_message: Optional[str] = None
    ):
        """Log a tool call event."""

        # Sanitize arguments (remove sensitive data)
        safe_args = self._sanitize_arguments(arguments)

        event = AuditEvent(
            timestamp=datetime.now(timezone.utc).isoformat(),
            event_type="tool_call",
            tool_name=tool_name,
            arguments=safe_args,
            result_type=result_type,
            user_id=user_id,
            session_id=session_id,
            duration_ms=duration_ms,
            error_message=error_message
        )

        self.logger.info(event.to_json())

    def log_access_denied(
        self,
        tool_name: str,
        arguments: dict,
        reason: str,
        session_id: str
    ):
        """Log an access denied event."""

        event = AuditEvent(
            timestamp=datetime.now(timezone.utc).isoformat(),
            event_type="access_denied",
            tool_name=tool_name,
            arguments=self._sanitize_arguments(arguments),
            result_type="denied",
            user_id=None,
            session_id=session_id,
            duration_ms=0,
            error_message=reason
        )

        self.logger.warning(event.to_json())

    def _sanitize_arguments(self, args: dict) -> dict:
        """Remove sensitive data from arguments."""
        sensitive_keys = ["password", "token", "secret", "key", "credential"]
        result = {}

        for key, value in args.items():
            if any(s in key.lower() for s in sensitive_keys):
                result[key] = "[REDACTED]"
            elif isinstance(value, str) and len(value) > 1000:
                result[key] = f"[{len(value)} chars]"
            else:
                result[key] = value

        return result
```

Summary
Security best practices for MCP:
- Never trust LLM inputs: Validate all tool arguments at the server boundary
- Principle of least privilege: Grant only necessary permissions
- Path validation: Prevent traversal attacks, symlink escapes
- Sandbox code execution: Use containers with resource limits
- Human approval: Gate dangerous operations through confirmation
- Secrets management: Use environment variables, never expose in responses
- Audit logging: Log all operations for security monitoring
Security is not optional. A single vulnerability in an MCP server with file system or code execution access can compromise the entire system. Review and test security measures regularly.
Next, let's put it all together and integrate MCP servers into a complete AI agent system.