Introduction
Research often involves documents beyond web pages: PDFs, Word documents, academic papers, and more. A research agent needs to extract and process content from these various formats to conduct comprehensive research.
Key Insight: Document processing is about more than just extracting text. Understanding document structure, tables, figures, and citations provides richer context for research.
PDF Processing
PDFs are ubiquitous in research, from academic papers to technical documentation. Here's a robust PDF processing system:
1import io
2from dataclasses import dataclass, field
3from typing import List, Dict, Any, Optional
4from pathlib import Path
5import aiohttp
6import asyncio
7
8
@dataclass
class PDFPage:
    """Represents a single PDF page."""
    # 1-based page index within the source document (extractors assign
    # page_num + 1).
    page_number: int
    # Raw text extracted from the page.
    text: str
    # Table payloads; shape depends on the extraction backend —
    # pdfplumber fills {"data", "rows", "cols"} dicts, pypdf leaves [].
    tables: List[Dict[str, Any]] = field(default_factory=list)
    # Image metadata; not populated by any extractor in this module yet.
    images: List[Dict[str, Any]] = field(default_factory=list)
16
17
@dataclass
class PDFDocument:
    """Represents a processed PDF document."""
    source: str
    title: str
    pages: List[PDFPage]
    metadata: Dict[str, Any]
    total_pages: int

    @property
    def full_text(self) -> str:
        """Return the text of every page, joined by blank lines."""
        page_texts = [page.text for page in self.pages]
        return "\n\n".join(page_texts)

    def get_page(self, num: int) -> Optional[PDFPage]:
        """Return the page whose page_number equals `num`, or None."""
        matches = (page for page in self.pages if page.page_number == num)
        return next(matches, None)
38
39
class PDFProcessor:
    """
    Process PDF documents with multiple extraction strategies.

    Backends are tried in order of output quality: PyMuPDF (fitz),
    then pdfplumber (best for tables), then pypdf (most widely
    available). Only ImportError triggers the fallback — parse errors
    from an installed backend propagate to the caller.
    """

    def __init__(self, use_ocr: bool = False):
        # Stored but not yet consulted by the extractors below —
        # reserved for future scanned-PDF (OCR) support.
        self.use_ocr = use_ocr

    async def process_url(self, url: str) -> "PDFDocument":
        """Download a PDF from `url` and process it.

        Raises:
            Exception: if the HTTP status is not 200.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    # Bug fix: message used [response.status] literally
                    # instead of an f-string placeholder, so the status
                    # code never appeared in the error text.
                    raise Exception(f"Failed to download PDF: {response.status}")
                pdf_bytes = await response.read()

        return await self.process_bytes(pdf_bytes, source=url)

    async def process_file(self, path: Path) -> "PDFDocument":
        """Process a PDF from a local file."""
        with open(path, "rb") as f:
            pdf_bytes = f.read()
        return await self.process_bytes(pdf_bytes, source=str(path))

    async def process_bytes(
        self,
        pdf_bytes: bytes,
        source: str = "unknown"
    ) -> "PDFDocument":
        """Process PDF bytes, falling back across available libraries.

        Raises:
            ImportError: if none of pymupdf/pdfplumber/pypdf is installed.
        """
        try:
            return await self._extract_with_pymupdf(pdf_bytes, source)
        except ImportError:
            pass

        try:
            return await self._extract_with_pdfplumber(pdf_bytes, source)
        except ImportError:
            pass

        try:
            return await self._extract_with_pypdf(pdf_bytes, source)
        except ImportError:
            raise ImportError("No PDF library available. Install pymupdf, pdfplumber, or pypdf.")

    async def _extract_with_pymupdf(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> "PDFDocument":
        """Extract using PyMuPDF (fitz) - best quality."""
        import fitz

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            pages = []
            for page_num in range(len(doc)):
                page = doc[page_num]

                # Extract text
                text = page.get_text()

                # Extract tables (basic heuristic over text blocks)
                tables = self._extract_tables_basic(page.get_text("blocks"))

                pages.append(PDFPage(
                    page_number=page_num + 1,  # 1-based page numbers
                    text=text,
                    tables=tables
                ))

            # PyMuPDF uses lowercase metadata keys (e.g. "title").
            metadata = doc.metadata or {}
        finally:
            # Fix: release the document handle instead of leaking it.
            doc.close()

        return PDFDocument(
            source=source,
            title=metadata.get("title", ""),
            pages=pages,
            metadata=metadata,
            total_pages=len(pages)
        )

    async def _extract_with_pdfplumber(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> "PDFDocument":
        """Extract using pdfplumber - good for tables."""
        import pdfplumber

        # Fix: use the context manager so the PDF is closed on all paths.
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            pages = []
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text() or ""

                # Extract tables
                tables = []
                for table in page.extract_tables():
                    if table:
                        tables.append({
                            "data": table,
                            "rows": len(table),
                            "cols": len(table[0]) if table else 0
                        })

                pages.append(PDFPage(
                    page_number=page_num + 1,
                    text=text,
                    tables=tables
                ))

            # pdfplumber exposes PDF-style metadata keys (e.g. "/Title").
            metadata = pdf.metadata or {}

        return PDFDocument(
            source=source,
            title=metadata.get("/Title", ""),
            pages=pages,
            metadata=metadata,
            total_pages=len(pages)
        )

    async def _extract_with_pypdf(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> "PDFDocument":
        """Extract using pypdf - basic but widely available (no tables)."""
        from pypdf import PdfReader

        reader = PdfReader(io.BytesIO(pdf_bytes))

        pages = []
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text() or ""

            pages.append(PDFPage(
                page_number=page_num + 1,
                text=text,
                tables=[]
            ))

        metadata = reader.metadata or {}

        return PDFDocument(
            source=source,
            title=metadata.get("/Title", ""),
            pages=pages,
            metadata=dict(metadata),
            total_pages=len(pages)
        )

    def _extract_tables_basic(
        self,
        blocks: List
    ) -> List[Dict[str, Any]]:
        """Basic table extraction from text blocks.

        Simplified placeholder — for production use pdfplumber or
        Camelot; currently always yields an empty list.
        """
        tables = []
        # Placeholder for table detection logic
        return tables
Document Chunking
Large documents need to be chunked for processing by LLMs. Here's a smart chunking system:
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
4
5
@dataclass
class DocumentChunk:
    """A chunk of a document."""
    # Chunk text content.
    content: str
    # 1-based source page, or None for plain-text chunking.
    page_number: Optional[int]
    # Position of this chunk within the document's chunk sequence.
    chunk_index: int
    # Arbitrary caller-supplied metadata.
    metadata: Dict[str, Any]

    @property
    def token_count(self) -> int:
        """Estimate token count (~1.3 tokens per whitespace word).

        Bug fix: the declared return type is int, but the original
        returned a float (len * 1.3); truncate to match the contract.
        """
        return int(len(self.content.split()) * 1.3)
19
class DocumentChunker:
    """
    Split documents into chunks for LLM processing.

    Chunks are at most `chunk_size` characters; consecutive chunks share
    up to `chunk_overlap` trailing characters of context. When
    `respect_boundaries` is true, text is pre-split on paragraph
    boundaries so chunks tend to end at natural breaks.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        respect_boundaries: bool = True
    ):
        """Configure chunking.

        Raises:
            ValueError: if chunk_size is not positive, or chunk_overlap
                is not smaller than chunk_size.
        """
        # Bug fix: chunk_overlap >= chunk_size made the oversized-section
        # loop in _chunk_text drop zero characters per iteration and spin
        # forever; reject the configuration up front.
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.respect_boundaries = respect_boundaries

    def chunk_document(
        self,
        document: "PDFDocument"
    ) -> List["DocumentChunk"]:
        """Chunk a PDF document page by page."""
        chunks = []

        for page in document.pages:
            page_chunks = self._chunk_text(
                page.text,
                page_number=page.page_number
            )
            chunks.extend(page_chunks)

        # Per-page indices restart at 0; renumber into one
        # document-wide sequence.
        for i, chunk in enumerate(chunks):
            chunk.chunk_index = i

        return chunks

    def chunk_text(
        self,
        text: str,
        metadata: Dict[str, Any] = None
    ) -> List["DocumentChunk"]:
        """Chunk plain text (no page association)."""
        return self._chunk_text(text, metadata=metadata or {})

    def _chunk_text(
        self,
        text: str,
        page_number: int = None,
        metadata: Dict[str, Any] = None
    ) -> List["DocumentChunk"]:
        """Internal chunking logic shared by both public entry points."""
        if not text.strip():
            return []

        chunks = []
        meta = metadata or {}

        if self.respect_boundaries:
            # Try to split on natural boundaries (paragraphs)
            sections = self._split_on_boundaries(text)
        else:
            sections = [text]

        current_chunk = ""
        chunk_idx = 0

        for section in sections:
            if len(current_chunk) + len(section) <= self.chunk_size:
                current_chunk += section + "\n"
            else:
                # Save current chunk
                if current_chunk.strip():
                    chunks.append(DocumentChunk(
                        content=current_chunk.strip(),
                        page_number=page_number,
                        chunk_index=chunk_idx,
                        metadata=meta
                    ))
                    chunk_idx += 1

                # Start a new chunk, carrying trailing overlap from the
                # previous one for context continuity.
                if self.chunk_overlap > 0:
                    overlap_start = max(0, len(current_chunk) - self.chunk_overlap)
                    current_chunk = current_chunk[overlap_start:] + section + "\n"
                else:
                    current_chunk = section + "\n"

                # Sections larger than chunk_size: emit fixed-size slices,
                # advancing chunk_size - chunk_overlap characters per pass
                # (guaranteed > 0 by the constructor guard).
                while len(current_chunk) > self.chunk_size:
                    chunks.append(DocumentChunk(
                        content=current_chunk[:self.chunk_size].strip(),
                        page_number=page_number,
                        chunk_index=chunk_idx,
                        metadata=meta
                    ))
                    chunk_idx += 1

                    overlap_start = max(0, self.chunk_size - self.chunk_overlap)
                    current_chunk = current_chunk[overlap_start:]

        # Add remaining content
        if current_chunk.strip():
            chunks.append(DocumentChunk(
                content=current_chunk.strip(),
                page_number=page_number,
                chunk_index=chunk_idx,
                metadata=meta
            ))

        return chunks

    def _split_on_boundaries(self, text: str) -> List[str]:
        """Split text on natural boundaries (blank-line paragraphs)."""
        paragraphs = re.split(r"\n\s*\n", text)
        return [para.strip() for para in paragraphs if para.strip()]
142
143
class SemanticChunker:
    """
    Chunk documents based on semantic similarity.

    Groups consecutive sentences, starting a new chunk when either the
    size budget is exceeded or the embedding similarity between
    neighboring sentences drops below `similarity_threshold`.
    """

    def __init__(self, embedding_model, similarity_threshold: float = 0.7):
        # embedding_model: assumed to expose async embed_batch(sentences)
        # returning one vector per sentence — TODO confirm against caller.
        self.embedder = embedding_model
        self.threshold = similarity_threshold

    async def chunk(
        self,
        text: str,
        max_chunk_size: int = 1000
    ) -> List[DocumentChunk]:
        """Create semantically coherent chunks."""
        # Split into sentences
        sentences = self._split_sentences(text)

        # Get embeddings
        embeddings = await self.embedder.embed_batch(sentences)

        # Group similar sentences
        chunks = []
        current_chunk = []
        current_size = 0  # running character count of current_chunk

        for i, (sentence, embedding) in enumerate(zip(sentences, embeddings)):
            if current_size + len(sentence) > max_chunk_size:
                # Size budget exhausted: flush the current chunk and
                # start a fresh one with this sentence.
                if current_chunk:
                    chunks.append(DocumentChunk(
                        content=" ".join(current_chunk),
                        page_number=None,
                        chunk_index=len(chunks),
                        metadata={}
                    ))
                current_chunk = [sentence]
                current_size = len(sentence)
            else:
                # Check semantic similarity with previous
                if i > 0:
                    # NOTE: compares against the immediately preceding
                    # sentence only, not the whole current chunk.
                    similarity = self._cosine_similarity(
                        embeddings[i-1], embedding
                    )
                    if similarity < self.threshold and current_chunk:
                        # Low similarity - start new chunk
                        chunks.append(DocumentChunk(
                            content=" ".join(current_chunk),
                            page_number=None,
                            chunk_index=len(chunks),
                            metadata={}
                        ))
                        current_chunk = [sentence]
                        current_size = len(sentence)
                        continue

                # Similar enough (or first sentence): extend the chunk.
                current_chunk.append(sentence)
                current_size += len(sentence)

        # Flush the trailing chunk, if any.
        if current_chunk:
            chunks.append(DocumentChunk(
                content=" ".join(current_chunk),
                page_number=None,
                chunk_index=len(chunks),
                metadata={}
            ))

        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences on terminal punctuation (. ! ?)."""
        return re.split(r"(?<=[.!?])\s+", text)

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Calculate cosine similarity between two vectors."""
        # NOTE(review): divides by the product of norms — an all-zero
        # embedding would divide by zero; confirm that cannot occur.
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
Other Document Formats
Research may involve various document types. Here's how to handle common formats:
1from abc import ABC, abstractmethod
2from pathlib import Path
3
4
class DocumentProcessor(ABC):
    """Base class for document processors.

    Concrete subclasses declare which file extensions they handle and
    implement async `process`, returning a dict of extracted content.
    """

    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """File extensions (with leading dot) this processor accepts."""
        pass

    @abstractmethod
    async def process(self, source: str) -> Dict[str, Any]:
        """Extract content from `source` (local path or URL)."""
        pass
16
17
class WordProcessor(DocumentProcessor):
    """Process Word documents (.docx)."""

    @property
    def supported_extensions(self) -> List[str]:
        return [".docx", ".doc"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Read a Word document from a URL or local path and extract
        its paragraph text and tables."""
        from docx import Document

        if source.startswith("http"):
            # Remote document: fetch the raw bytes, then parse in memory.
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    payload = await response.read()
            doc = Document(io.BytesIO(payload))
        else:
            doc = Document(source)

        body_paragraphs = [
            para.text for para in doc.paragraphs if para.text.strip()
        ]

        # Each table becomes a row-major list of cell-text lists.
        extracted_tables = [
            [[cell.text for cell in row.cells] for row in table.rows]
            for table in doc.tables
        ]

        return {
            "text": "\n\n".join(body_paragraphs),
            "tables": extracted_tables,
            "source": source
        }
52
53
class MarkdownProcessor(DocumentProcessor):
    """Process Markdown files."""

    @property
    def supported_extensions(self) -> List[str]:
        return [".md", ".markdown"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Read markdown from a URL or local path and parse its sections."""
        if source.startswith("http"):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    text = await response.text()
        else:
            with open(source, "r") as f:
                text = f.read()

        # Parse structure
        sections = self._parse_sections(text)

        return {
            "text": text,
            "sections": sections,
            "source": source
        }

    def _parse_sections(self, text: str) -> List[Dict[str, str]]:
        """Parse markdown into sections by ATX (#) headers.

        Each section dict has "title", "level" (0 for preamble text
        before the first header), and "content".
        """
        sections = []
        # Bug fix: the preamble section previously lacked a "level" key,
        # giving consumers an inconsistent dict shape.
        current_section = {"title": "", "level": 0, "content": ""}

        for line in text.split("\n"):
            # NOTE: any line starting with '#' counts as a header, even
            # without a following space (looser than CommonMark).
            if line.startswith("#"):
                if current_section["content"]:
                    sections.append(current_section)
                # Bug fix: level was len(line.split()[0]), which
                # miscounts "##Title" (no space after the hashes).
                # Count the leading '#' characters instead.
                level = len(line) - len(line.lstrip("#"))
                title = line.lstrip("#").strip()
                current_section = {"title": title, "level": level, "content": ""}
            else:
                current_section["content"] += line + "\n"

        if current_section["content"]:
            sections.append(current_section)

        return sections
99
100
class HTMLProcessor(DocumentProcessor):
    """Process HTML files."""

    @property
    def supported_extensions(self) -> List[str]:
        return [".html", ".htm"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Extract the title and visible text from an HTML page or file."""
        # Bug fix: BeautifulSoup was used without ever being imported in
        # this module; import it locally, matching how the other
        # processors load their optional dependencies.
        from bs4 import BeautifulSoup

        if source.startswith("http"):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    html = await response.text()
        else:
            with open(source, "r") as f:
                html = f.read()

        soup = BeautifulSoup(html, "html.parser")

        # Remove scripts and styles so get_text() returns visible text only.
        for tag in soup.find_all(["script", "style"]):
            tag.decompose()

        text = soup.get_text(separator="\n")

        # Bug fix: soup.title.string is None for an empty <title>, which
        # leaked None where the contract promises a string.
        title = ""
        if soup.title and soup.title.string:
            title = soup.title.string

        return {
            "text": text,
            "title": title,
            "source": source
        }
130
131
class UniversalDocumentProcessor:
    """
    Process any supported document type.

    Routes a file path or URL to the processor registered for its
    extension.
    """

    def __init__(self):
        # Map file extension -> processor instance; extensions with
        # multiple spellings (.doc/.docx, .htm/.html) share a processor.
        self.processors = {
            ".pdf": PDFProcessor(),
            ".docx": WordProcessor(),
            ".doc": WordProcessor(),
            ".md": MarkdownProcessor(),
            ".markdown": MarkdownProcessor(),
            ".html": HTMLProcessor(),
            ".htm": HTMLProcessor(),
        }

    async def process(self, source: str) -> Dict[str, Any]:
        """Process a document from a file path or URL.

        Raises:
            ValueError: if no extension is found or it is unsupported.
        """
        # Bug fix: strip URL query/fragment BEFORE taking the extension.
        # The original took the last "."-segment first, so a URL like
        # "paper.pdf?v=1.2" produced the bogus extension ".2".
        path_part = source.split("?")[0].split("#")[0]
        if "." not in path_part:
            # Bug fix: f-string used [source] instead of {source}.
            raise ValueError(f"Cannot determine file type: {source}")
        ext = "." + path_part.rsplit(".", 1)[-1].lower()

        processor = self.processors.get(ext)
        if not processor:
            # Bug fix: f-string used [ext] instead of {ext}.
            raise ValueError(f"Unsupported file type: {ext}")

        return await processor.process(source)

    def supported_types(self) -> List[str]:
        """Get list of supported file types."""
        return list(self.processors.keys())
Document Analysis
Beyond extraction, documents need analysis to extract key information:
class DocumentAnalyzer:
    """
    Analyze documents to extract key information.

    All methods delegate to an LLM client exposing
    `async generate(prompt: str) -> str` and parse its plain-text reply.
    Bug fix applied throughout: the prompt templates used [..] where
    f-string {..} placeholders were intended, so document text and
    parameters were never actually inserted into the prompts.
    """

    def __init__(self, llm_client):
        # llm_client: assumed to expose async generate(prompt) -> str
        # — TODO confirm against the caller's client interface.
        self.llm = llm_client

    async def extract_key_points(
        self,
        text: str,
        max_points: int = 10
    ) -> List[str]:
        """Extract up to `max_points` key points from a document."""
        # Text is truncated to keep the prompt within a typical context
        # window.
        prompt = f"""Extract the {max_points} most important points from this text.
Return each point on a new line, prefixed with a dash.

TEXT:
{text[:8000]}

KEY POINTS:"""

        response = await self.llm.generate(prompt)

        points = []
        for line in response.split("\n"):
            line = line.strip()
            if line.startswith("-"):
                points.append(line[1:].strip())
            elif line and len(points) < max_points:
                # Tolerate replies that omit the dash prefix.
                points.append(line)

        return points[:max_points]

    async def extract_entities(
        self,
        text: str
    ) -> Dict[str, List[str]]:
        """Extract named entities from text, grouped by type."""
        prompt = f"""Extract named entities from this text.
Group them by type: PERSON, ORGANIZATION, LOCATION, DATE, CONCEPT.

TEXT:
{text[:4000]}

Return in this format:
PERSON: name1, name2
ORGANIZATION: org1, org2
etc."""

        response = await self.llm.generate(prompt)

        entities = {}
        for line in response.split("\n"):
            if ":" in line:
                entity_type, values = line.split(":", 1)
                entity_type = entity_type.strip().upper()
                entities[entity_type] = [
                    v.strip() for v in values.split(",") if v.strip()
                ]

        return entities

    async def generate_summary(
        self,
        text: str,
        length: str = "medium"
    ) -> str:
        """Generate a summary; `length` is "short", "medium", or "long"."""
        length_guides = {
            "short": "2-3 sentences",
            "medium": "1-2 paragraphs",
            "long": "comprehensive summary with key details"
        }

        prompt = f"""Write a {length_guides.get(length, 'medium')} summary of this text.

TEXT:
{text[:8000]}

SUMMARY:"""

        return await self.llm.generate(prompt)

    async def extract_citations(
        self,
        text: str
    ) -> List[Dict[str, str]]:
        """Extract citations and references from academic text."""
        # The [names]/[title]/[year] brackets in the template below are
        # literal placeholders for the LLM to fill in, not f-string fields.
        prompt = f"""Extract all citations and references from this text.
For each, provide the author(s), title if available, and year.

TEXT:
{text[:6000]}

Return in this format, one per line:
- Author(s): [names], Title: [title], Year: [year]"""

        response = await self.llm.generate(prompt)

        citations = []
        for line in response.split("\n"):
            if line.strip().startswith("-"):
                citation = {"raw": line.strip()[1:].strip()}
                # Parse structured data if possible
                if "Author" in line:
                    parts = line.split(",")
                    for part in parts:
                        if "Author" in part:
                            citation["authors"] = part.split(":")[-1].strip()
                        elif "Title" in part:
                            citation["title"] = part.split(":")[-1].strip()
                        elif "Year" in part:
                            citation["year"] = part.split(":")[-1].strip()
                citations.append(citation)

        return citations

    async def compare_documents(
        self,
        doc1: str,
        doc2: str
    ) -> Dict[str, Any]:
        """Compare two documents for similarities and differences."""
        prompt = f"""Compare these two texts and identify:
1. Main similarities
2. Key differences
3. Contradictions (if any)
4. Unique points in each

TEXT 1:
{doc1[:3000]}

TEXT 2:
{doc2[:3000]}

Provide a structured comparison."""

        response = await self.llm.generate(prompt)

        return {
            "comparison": response,
            "doc1_length": len(doc1),
            "doc2_length": len(doc2)
        }
Summary
In this section, we built comprehensive document processing capabilities:
- PDF Processing: Multi-library support with PyMuPDF, pdfplumber, and pypdf
- Document Chunking: Fixed-size and semantic chunking strategies for LLM processing
- Format Support: Universal processor for Word, Markdown, HTML, and other formats
- Document Analysis: LLM-powered extraction of key points, entities, summaries, and citations
In the next section, we'll learn how to synthesize information from multiple sources into coherent research findings.