Chapter 13
18 min read
Section 81 of 175

Document Processing

Building a Research Agent

Introduction

Research often involves documents beyond web pages: PDFs, Word documents, academic papers, and more. A research agent needs to extract and process content from these various formats to conduct comprehensive research.

Key Insight: Document processing is about more than just extracting text. Understanding document structure, tables, figures, and citations provides richer context for research.

PDF Processing

PDFs are ubiquitous in research, from academic papers to technical documentation. Here's a robust PDF processing system:

🐍python
1import io
2from dataclasses import dataclass, field
3from typing import List, Dict, Any, Optional
4from pathlib import Path
5import aiohttp
6import asyncio
7
8
@dataclass
class PDFPage:
    """Represents a single PDF page.

    Instances are produced by PDFProcessor; `tables` and `images` remain
    empty when the selected extraction backend cannot populate them.
    """
    page_number: int  # 1-based page index (extractors use page_num + 1)
    text: str  # raw text extracted from the page
    tables: List[Dict[str, Any]] = field(default_factory=list)  # backend-dependent table dicts
    images: List[Dict[str, Any]] = field(default_factory=list)  # image metadata; not filled by current extractors
16
17
@dataclass
class PDFDocument:
    """Represents a processed PDF document."""
    source: str
    title: str
    pages: List[PDFPage]
    metadata: Dict[str, Any]
    total_pages: int

    @property
    def full_text(self) -> str:
        """Concatenate every page's text, separated by blank lines."""
        page_texts = (page.text for page in self.pages)
        return "\n\n".join(page_texts)

    def get_page(self, num: int) -> Optional[PDFPage]:
        """Return the page whose 1-based number is *num*, or None if absent."""
        return next(
            (page for page in self.pages if page.page_number == num),
            None,
        )
38
39
class PDFProcessor:
    """
    Process PDF documents with multiple extraction strategies.

    Backends are tried in order of output quality: PyMuPDF (fitz), then
    pdfplumber, then pypdf; the first one installed wins.
    """

    def __init__(self, use_ocr: bool = False):
        # Stored for callers; none of the current extractors run OCR yet.
        self.use_ocr = use_ocr

    async def process_url(self, url: str) -> PDFDocument:
        """Download and process a PDF from a URL.

        Raises:
            Exception: if the HTTP response status is not 200.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    # BUG FIX: square brackets prevented f-string
                    # interpolation of the status code.
                    raise Exception(f"Failed to download PDF: {response.status}")
                pdf_bytes = await response.read()

        return await self.process_bytes(pdf_bytes, source=url)

    async def process_file(self, path: Path) -> PDFDocument:
        """Process a PDF from a local file."""
        with open(path, "rb") as f:
            pdf_bytes = f.read()
        return await self.process_bytes(pdf_bytes, source=str(path))

    async def process_bytes(
        self,
        pdf_bytes: bytes,
        source: str = "unknown"
    ) -> PDFDocument:
        """Process PDF bytes with the first available backend.

        Raises:
            ImportError: if none of the supported PDF libraries is installed.
        """
        try:
            return await self._extract_with_pymupdf(pdf_bytes, source)
        except ImportError:
            pass

        try:
            return await self._extract_with_pdfplumber(pdf_bytes, source)
        except ImportError:
            pass

        try:
            return await self._extract_with_pypdf(pdf_bytes, source)
        except ImportError:
            raise ImportError("No PDF library available. Install pymupdf, pdfplumber, or pypdf.")

    async def _extract_with_pymupdf(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> PDFDocument:
        """Extract using PyMuPDF (fitz) - best quality."""
        import fitz

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        # RESOURCE FIX: close the document even if extraction fails.
        try:
            pages = []
            for page_num in range(len(doc)):
                page = doc[page_num]

                text = page.get_text()

                # Block-level layout info feeds the (placeholder) table detector.
                tables = self._extract_tables_basic(page.get_text("blocks"))

                pages.append(PDFPage(
                    page_number=page_num + 1,
                    text=text,
                    tables=tables
                ))

            metadata = doc.metadata or {}
        finally:
            doc.close()

        return PDFDocument(
            source=source,
            # ROBUSTNESS FIX: metadata may map "title" to None; coerce to ""
            # so the str-typed field stays a str.
            title=metadata.get("title") or "",
            pages=pages,
            metadata=metadata,
            total_pages=len(pages)
        )

    async def _extract_with_pdfplumber(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> PDFDocument:
        """Extract using pdfplumber - good for tables."""
        import pdfplumber

        # RESOURCE FIX: context manager closes the underlying stream.
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            pages = []
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text() or ""

                # Collect each detected table as row-major data plus shape.
                tables = []
                for table in page.extract_tables():
                    if table:
                        tables.append({
                            "data": table,
                            "rows": len(table),
                            "cols": len(table[0]) if table else 0
                        })

                pages.append(PDFPage(
                    page_number=page_num + 1,
                    text=text,
                    tables=tables
                ))

            metadata = pdf.metadata or {}

        return PDFDocument(
            source=source,
            title=metadata.get("/Title") or "",
            pages=pages,
            metadata=metadata,
            total_pages=len(pages)
        )

    async def _extract_with_pypdf(
        self,
        pdf_bytes: bytes,
        source: str
    ) -> PDFDocument:
        """Extract using pypdf - basic but widely available."""
        from pypdf import PdfReader

        reader = PdfReader(io.BytesIO(pdf_bytes))

        pages = []
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text() or ""

            pages.append(PDFPage(
                page_number=page_num + 1,
                text=text,
                tables=[]  # pypdf has no table extraction
            ))

        metadata = reader.metadata or {}

        return PDFDocument(
            source=source,
            title=metadata.get("/Title") or "",
            pages=pages,
            metadata=dict(metadata),
            total_pages=len(pages)
        )

    def _extract_tables_basic(
        self,
        blocks: List
    ) -> List[Dict[str, Any]]:
        """Basic table extraction from text blocks.

        Placeholder: always returns an empty list.  For production-grade
        table detection use pdfplumber or Camelot.
        """
        return []

Document Chunking

Large documents need to be chunked for processing by LLMs. Here's a smart chunking system:

🐍python
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
4
5
@dataclass
class DocumentChunk:
    """A chunk of a document."""
    content: str
    page_number: Optional[int]  # source page number; None for plain text
    chunk_index: int  # position of this chunk within the document
    metadata: Dict[str, Any]

    @property
    def token_count(self) -> int:
        """Estimate the chunk's token count.

        Uses the rough heuristic of ~1.3 tokens per whitespace-separated
        word, truncated to an integer.
        """
        # BUG FIX: the estimate was returned as a float despite the
        # declared `int` return type.
        return int(len(self.content.split()) * 1.3)
18
19
class DocumentChunker:
    """
    Split documents into chunks for LLM processing.

    Text is accumulated section by section (paragraphs when
    `respect_boundaries` is True) into chunks of at most `chunk_size`
    characters, with `chunk_overlap` trailing characters carried over
    between consecutive chunks to preserve context.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        respect_boundaries: bool = True
    ):
        """
        Args:
            chunk_size: Maximum chunk length in characters.
            chunk_overlap: Characters of trailing context repeated at the
                start of the next chunk.
            respect_boundaries: Prefer paragraph boundaries when splitting.

        Raises:
            ValueError: if chunk_size is not positive, or chunk_overlap is
                outside [0, chunk_size).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= chunk_overlap < chunk_size:
            # ROBUSTNESS FIX: chunk_overlap >= chunk_size previously made
            # the oversized-section loop in _chunk_text spin forever.
            raise ValueError("chunk_overlap must be in [0, chunk_size)")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.respect_boundaries = respect_boundaries

    def chunk_document(
        self,
        document: PDFDocument
    ) -> List[DocumentChunk]:
        """Chunk a PDF document page by page."""
        chunks = []

        for page in document.pages:
            page_chunks = self._chunk_text(
                page.text,
                page_number=page.page_number
            )
            chunks.extend(page_chunks)

        # Per-page indices restart at 0, so renumber across the document.
        for i, chunk in enumerate(chunks):
            chunk.chunk_index = i

        return chunks

    def chunk_text(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Chunk plain text."""
        return self._chunk_text(text, metadata=metadata or {})

    def _chunk_text(
        self,
        text: str,
        page_number: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Greedy chunking shared by chunk_document and chunk_text."""
        if not text.strip():
            return []

        chunks = []
        meta = metadata or {}

        if self.respect_boundaries:
            # Try to split on natural boundaries (paragraphs).
            sections = self._split_on_boundaries(text)
        else:
            sections = [text]

        current_chunk = ""
        chunk_idx = 0

        for section in sections:
            if len(current_chunk) + len(section) <= self.chunk_size:
                current_chunk += section + "\n"
            else:
                # Flush the filled chunk before starting the next one.
                if current_chunk.strip():
                    chunks.append(DocumentChunk(
                        content=current_chunk.strip(),
                        page_number=page_number,
                        chunk_index=chunk_idx,
                        metadata=meta
                    ))
                    chunk_idx += 1

                # Seed the new chunk with the tail of the previous one.
                if self.chunk_overlap > 0:
                    overlap_start = max(0, len(current_chunk) - self.chunk_overlap)
                    current_chunk = current_chunk[overlap_start:] + section + "\n"
                else:
                    current_chunk = section + "\n"

                # A single section can exceed chunk_size: emit fixed-size
                # slices until the remainder fits.  Terminates because
                # __init__ guarantees chunk_overlap < chunk_size.
                while len(current_chunk) > self.chunk_size:
                    chunks.append(DocumentChunk(
                        content=current_chunk[:self.chunk_size].strip(),
                        page_number=page_number,
                        chunk_index=chunk_idx,
                        metadata=meta
                    ))
                    chunk_idx += 1

                    overlap_start = max(0, self.chunk_size - self.chunk_overlap)
                    current_chunk = current_chunk[overlap_start:]

        # Flush whatever remains after the loop.
        if current_chunk.strip():
            chunks.append(DocumentChunk(
                content=current_chunk.strip(),
                page_number=page_number,
                chunk_index=chunk_idx,
                metadata=meta
            ))

        return chunks

    def _split_on_boundaries(self, text: str) -> List[str]:
        """Split text on blank-line (paragraph) boundaries."""
        paragraphs = re.split(r"\n\s*\n", text)
        return [para.strip() for para in paragraphs if para.strip()]
142
143
class SemanticChunker:
    """
    Chunk documents based on semantic similarity.

    Groups consecutive sentences into chunks, starting a new chunk when
    either the size limit is reached or the cosine similarity between
    adjacent sentence embeddings drops below the threshold.
    """

    def __init__(self, embedding_model, similarity_threshold: float = 0.7):
        # NOTE(review): embedding_model is assumed to expose an async
        # `embed_batch(sentences)` returning one vector per sentence --
        # confirm against the project's embedder interface.
        self.embedder = embedding_model
        self.threshold = similarity_threshold

    async def chunk(
        self,
        text: str,
        max_chunk_size: int = 1000
    ) -> List[DocumentChunk]:
        """Create semantically coherent chunks.

        Args:
            text: Plain text to chunk.
            max_chunk_size: Upper bound on a chunk's character count
                (sum of its sentences' lengths).

        Returns:
            DocumentChunk list with sequential chunk_index values and
            page_number left as None.
        """
        # Split into sentences
        sentences = self._split_sentences(text)

        # Get embeddings
        embeddings = await self.embedder.embed_batch(sentences)

        # Group similar sentences
        chunks = []
        current_chunk = []  # sentences accumulated for the chunk in progress
        current_size = 0  # running character total of current_chunk

        for i, (sentence, embedding) in enumerate(zip(sentences, embeddings)):
            if current_size + len(sentence) > max_chunk_size:
                # Size limit reached: flush the accumulated sentences and
                # start a fresh chunk with this sentence.
                if current_chunk:
                    chunks.append(DocumentChunk(
                        content=" ".join(current_chunk),
                        page_number=None,
                        chunk_index=len(chunks),
                        metadata={}
                    ))
                current_chunk = [sentence]
                current_size = len(sentence)
            else:
                # Check semantic similarity with previous
                if i > 0:
                    similarity = self._cosine_similarity(
                        embeddings[i-1], embedding
                    )
                    if similarity < self.threshold and current_chunk:
                        # Low similarity - start new chunk
                        chunks.append(DocumentChunk(
                            content=" ".join(current_chunk),
                            page_number=None,
                            chunk_index=len(chunks),
                            metadata={}
                        ))
                        current_chunk = [sentence]
                        current_size = len(sentence)
                        continue

                current_chunk.append(sentence)
                current_size += len(sentence)

        # Flush whatever remains after the loop.
        if current_chunk:
            chunks.append(DocumentChunk(
                content=" ".join(current_chunk),
                page_number=None,
                chunk_index=len(chunks),
                metadata={}
            ))

        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences on terminal punctuation (., !, ?)."""
        return re.split(r"(?<=[.!?])\s+", text)

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Calculate cosine similarity between two vectors."""
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        # NOTE(review): an all-zero vector makes the denominator 0 here
        # (division by zero) -- confirm the embedder never emits one.
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
Semantic chunking preserves context better than fixed-size chunking, but requires an embedding model. Use fixed-size for simple cases and semantic chunking for complex research.

Other Document Formats

Research may involve various document types. Here's how to handle common formats:

🐍python
1from abc import ABC, abstractmethod
2from pathlib import Path
3
4
class DocumentProcessor(ABC):
    """Base class for document processors.

    Concrete subclasses declare which file extensions they handle and
    implement `process`, returning a dict that includes at least "text"
    and "source" keys (see the Word/Markdown/HTML processors below).
    """

    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        # Lowercase extensions including the leading dot, e.g. [".md"].
        pass

    @abstractmethod
    async def process(self, source: str) -> Dict[str, Any]:
        # `source` may be a local file path or an http(s) URL; subclasses
        # branch on source.startswith("http").
        pass
16
17
class WordProcessor(DocumentProcessor):
    """Process Word documents (.docx)."""

    @property
    def supported_extensions(self) -> List[str]:
        # NOTE(review): python-docx cannot parse legacy .doc files, so
        # .doc sources will fail at parse time -- confirm whether legacy
        # support is actually needed.
        return [".docx", ".doc"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Extract paragraphs and tables from a Word document.

        Args:
            source: Local file path or http(s) URL.

        Returns:
            Dict with "text" (non-empty paragraphs joined by blank
            lines), "tables" (row-major cell-text matrices) and "source".
        """
        from docx import Document

        if source.startswith("http"):
            # Download first
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    # ROBUSTNESS FIX: fail loudly on HTTP errors instead
                    # of handing an error page to the .docx parser
                    # (consistent with PDFProcessor.process_url).
                    response.raise_for_status()
                    content = await response.read()
            doc = Document(io.BytesIO(content))
        else:
            doc = Document(source)

        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]

        # Extract tables as row-major lists of cell text.
        tables = []
        for table in doc.tables:
            table_data = [[cell.text for cell in row.cells] for row in table.rows]
            tables.append(table_data)

        return {
            "text": "\n\n".join(paragraphs),
            "tables": tables,
            "source": source
        }
52
53
class MarkdownProcessor(DocumentProcessor):
    """Process Markdown files."""

    @property
    def supported_extensions(self) -> List[str]:
        return [".md", ".markdown"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Read a Markdown file or URL and parse its heading structure.

        Returns:
            Dict with the raw "text", parsed "sections" and "source".
        """
        if source.startswith("http"):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    # ROBUSTNESS FIX: raise on HTTP errors instead of
                    # parsing an error page (consistent with
                    # PDFProcessor.process_url).
                    response.raise_for_status()
                    text = await response.text()
        else:
            with open(source, "r") as f:
                text = f.read()

        # Parse structure
        sections = self._parse_sections(text)

        return {
            "text": text,
            "sections": sections,
            "source": source
        }

    def _parse_sections(self, text: str) -> List[Dict[str, Any]]:
        """Parse markdown into sections by ATX (#) headers.

        Every section dict has "title", "level" (0 for preamble text
        before the first header) and "content".
        """
        sections = []
        # BUG FIX: the initial section previously lacked the "level" key,
        # giving preamble text a different schema from every other section.
        current_section = {"title": "", "level": 0, "content": ""}

        for line in text.split("\n"):
            if line.startswith("#"):
                if current_section["content"]:
                    sections.append(current_section)
                # BUG FIX: count the run of leading '#' characters.  The
                # old len(line.split()[0]) miscounted headers written
                # without a space after the hashes ("##Title" -> level 7).
                level = len(line) - len(line.lstrip("#"))
                title = line.lstrip("#").strip()
                current_section = {"title": title, "level": level, "content": ""}
            else:
                current_section["content"] += line + "\n"

        if current_section["content"]:
            sections.append(current_section)

        return sections
99
100
class HTMLProcessor(DocumentProcessor):
    """Process HTML files.

    NOTE(review): relies on BeautifulSoup (`bs4`), which is not imported
    anywhere in this module -- add `from bs4 import BeautifulSoup` at the
    top of the file.
    """

    @property
    def supported_extensions(self) -> List[str]:
        return [".html", ".htm"]

    async def process(self, source: str) -> Dict[str, Any]:
        """Extract visible text and the page title from an HTML document.

        Returns:
            Dict with "text", "title" (always a str) and "source".
        """
        if source.startswith("http"):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    # ROBUSTNESS FIX: raise on HTTP errors instead of
                    # scraping an error page (consistent with
                    # PDFProcessor.process_url).
                    response.raise_for_status()
                    html = await response.text()
        else:
            with open(source, "r") as f:
                html = f.read()

        soup = BeautifulSoup(html, "html.parser")

        # Remove scripts and styles so get_text() returns visible prose only.
        for tag in soup.find_all(["script", "style"]):
            tag.decompose()

        text = soup.get_text(separator="\n")

        return {
            "text": text,
            # BUG FIX: an empty <title> element makes soup.title.string
            # None; coerce to "" so the field is always a str.
            "title": (soup.title.string or "") if soup.title else "",
            "source": source
        }
130
131
class UniversalDocumentProcessor:
    """
    Process any supported document type.

    Dispatches to a format-specific processor based on the source's
    file extension.
    """

    def __init__(self):
        # Extension -> processor registry; extend here to add formats.
        self.processors = {
            ".pdf": PDFProcessor(),
            ".docx": WordProcessor(),
            ".doc": WordProcessor(),
            ".md": MarkdownProcessor(),
            ".markdown": MarkdownProcessor(),
            ".html": HTMLProcessor(),
            ".htm": HTMLProcessor(),
        }

    async def process(self, source: str) -> Dict[str, Any]:
        """Process a document from file path or URL.

        Raises:
            ValueError: if the extension cannot be determined or is
                unsupported.
        """
        # BUG FIX: strip the query string / fragment BEFORE extracting
        # the extension; the old order broke on URLs such as
        # "report.pdf?v=1.2" (rsplit found the dot inside the query).
        path_part = source.split("?", 1)[0].split("#", 1)[0]
        # Look only at the final path component so dots in the host or
        # directory names are ignored.
        basename = path_part.rsplit("/", 1)[-1]
        if "." in basename:
            ext = "." + basename.rsplit(".", 1)[-1].lower()
        else:
            # BUG FIX: square brackets in these f-strings prevented
            # interpolation of the offending value.
            raise ValueError(f"Cannot determine file type: {source}")

        processor = self.processors.get(ext)
        if not processor:
            raise ValueError(f"Unsupported file type: {ext}")

        return await processor.process(source)

    def supported_types(self) -> List[str]:
        """Get list of supported file types (extensions with leading dot)."""
        return list(self.processors.keys())

Document Analysis

Beyond extraction, documents need analysis to extract key information:

🐍python
class DocumentAnalyzer:
    """
    Analyze documents to extract key information.

    Every method is a best-effort LLM call: build a prompt, await the
    injected client's async `generate(prompt)`, and parse its free-text
    response.

    BUG FIX (throughout): the prompts used square brackets where
    f-string braces were intended, so the document text and parameters
    were never actually interpolated into the prompts.
    """

    def __init__(self, llm_client):
        self.llm = llm_client

    async def extract_key_points(
        self,
        text: str,
        max_points: int = 10
    ) -> List[str]:
        """Extract up to `max_points` key points from a document."""
        prompt = f"""Extract the {max_points} most important points from this text.
Return each point on a new line, prefixed with a dash.

TEXT:
{text[:8000]}

KEY POINTS:"""

        response = await self.llm.generate(prompt)

        points = []
        for line in response.split("\n"):
            line = line.strip()
            if line.startswith("-"):
                points.append(line[1:].strip())
            elif line and len(points) < max_points:
                # Tolerate models that omit the dash prefix.
                points.append(line)

        return points[:max_points]

    async def extract_entities(
        self,
        text: str
    ) -> Dict[str, List[str]]:
        """Extract named entities from text, keyed by entity type."""
        prompt = f"""Extract named entities from this text.
Group them by type: PERSON, ORGANIZATION, LOCATION, DATE, CONCEPT.

TEXT:
{text[:4000]}

Return in this format:
PERSON: name1, name2
ORGANIZATION: org1, org2
etc."""

        response = await self.llm.generate(prompt)

        entities = {}
        for line in response.split("\n"):
            if ":" in line:
                entity_type, values = line.split(":", 1)
                entity_type = entity_type.strip().upper()
                entities[entity_type] = [
                    v.strip() for v in values.split(",") if v.strip()
                ]

        return entities

    async def generate_summary(
        self,
        text: str,
        length: str = "medium"
    ) -> str:
        """Generate a summary; `length` is "short", "medium" or "long"."""
        length_guides = {
            "short": "2-3 sentences",
            "medium": "1-2 paragraphs",
            "long": "comprehensive summary with key details"
        }

        # BUG FIX: an unknown `length` previously fell back to the
        # literal word 'medium' instead of the medium length description.
        guide = length_guides.get(length, length_guides["medium"])

        prompt = f"""Write a {guide} summary of this text.

TEXT:
{text[:8000]}

SUMMARY:"""

        return await self.llm.generate(prompt)

    async def extract_citations(
        self,
        text: str
    ) -> List[Dict[str, str]]:
        """Extract citations/references from academic text.

        Returns:
            One dict per citation; always has "raw", plus "authors",
            "title" and "year" when they can be parsed out.
        """
        # The [names]/[title]/[year] brackets below are literal template
        # placeholders shown to the model, not interpolations.
        prompt = f"""Extract all citations and references from this text.
For each, provide the author(s), title if available, and year.

TEXT:
{text[:6000]}

Return in this format, one per line:
- Author(s): [names], Title: [title], Year: [year]"""

        response = await self.llm.generate(prompt)

        citations = []
        for line in response.split("\n"):
            if line.strip().startswith("-"):
                citation = {"raw": line.strip()[1:].strip()}
                # Parse structured data if possible
                if "Author" in line:
                    parts = line.split(",")
                    for part in parts:
                        if "Author" in part:
                            citation["authors"] = part.split(":")[-1].strip()
                        elif "Title" in part:
                            citation["title"] = part.split(":")[-1].strip()
                        elif "Year" in part:
                            citation["year"] = part.split(":")[-1].strip()
                citations.append(citation)

        return citations

    async def compare_documents(
        self,
        doc1: str,
        doc2: str
    ) -> Dict[str, Any]:
        """Compare two documents for similarities and differences."""
        prompt = f"""Compare these two texts and identify:
1. Main similarities
2. Key differences
3. Contradictions (if any)
4. Unique points in each

TEXT 1:
{doc1[:3000]}

TEXT 2:
{doc2[:3000]}

Provide a structured comparison."""

        response = await self.llm.generate(prompt)

        return {
            "comparison": response,
            "doc1_length": len(doc1),
            "doc2_length": len(doc2)
        }
Document analysis with LLMs is powerful but can be slow and expensive for large documents. Consider chunking and parallel processing for efficiency.

Summary

In this section, we built comprehensive document processing capabilities:

  • PDF Processing: Multi-library support with PyMuPDF, pdfplumber, and pypdf
  • Document Chunking: Fixed-size and semantic chunking strategies for LLM processing
  • Format Support: Universal processor for Word, Markdown, HTML, and other formats
  • Document Analysis: LLM-powered extraction of key points, entities, summaries, and citations

In the next section, we'll learn how to synthesize information from multiple sources into coherent research findings.