Chapter 13
20 min read
Section 80 of 175

Web Scraping and Extraction

Building a Research Agent

Introduction

Once our research agent discovers relevant sources through search, it needs to extract the actual content from those pages. Web scraping and content extraction transform raw HTML into clean, structured information that the agent can analyze and synthesize.

Key Insight: Effective content extraction is about finding the signal in the noise. A web page may contain navigation, ads, and boilerplate, but the research agent needs only the main content.

Scraping Fundamentals

Web scraping involves fetching web pages and parsing their content. Here's a robust scraping implementation:

🐍python
1import aiohttp
2import asyncio
3from bs4 import BeautifulSoup
4from dataclasses import dataclass
5from typing import Optional, Dict, Any, List
6from urllib.parse import urljoin, urlparse
7import re
8
9
@dataclass
class ScrapedContent:
    """Structured result of scraping a single web page."""

    url: str                      # the page that was fetched
    title: str                    # best-effort page title ("" when unknown)
    main_text: str                # cleaned main body text
    links: List[Dict[str, str]]   # [{"url": ..., "text": ...}, ...]
    metadata: Dict[str, Any]      # description/author/date/keywords, etc.
    success: bool                 # True when the scrape completed
    error: Optional[str] = None   # failure description when success is False
20
21
class WebScraper:
    """Robust asynchronous web scraper with retries and content extraction.

    Fetches pages with aiohttp (retrying on timeouts, HTTP 429, and
    transient 5xx errors) and uses BeautifulSoup to pull out the title,
    main text, links, and basic metadata.
    """

    def __init__(
        self,
        timeout: int = 30,
        max_retries: int = 3,
        user_agent: Optional[str] = None,
    ):
        """
        Args:
            timeout: Total per-request timeout in seconds.
            max_retries: Number of fetch attempts before giving up.
            user_agent: Custom User-Agent header; defaults to a UA string
                that identifies this scraper as a bot.
        """
        self.timeout = timeout
        self.max_retries = max_retries
        self.user_agent = user_agent or (
            "Mozilla/5.0 (compatible; ResearchBot/1.0; +https://example.com/bot)"
        )

    async def fetch(self, url: str) -> Optional[str]:
        """Fetch raw HTML from *url*, or return None on failure.

        Backs off exponentially on HTTP 429 and 5xx responses (previously
        5xx aborted immediately, defeating the retry logic) and sleeps
        briefly after timeouts. Returns None on 4xx client errors, which
        will not succeed on retry. Re-raises the last unexpected exception.
        """
        headers = {
            "User-Agent": self.user_agent,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
        }

        for attempt in range(self.max_retries):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        url,
                        headers=headers,
                        timeout=aiohttp.ClientTimeout(total=self.timeout),
                        allow_redirects=True,
                    ) as response:
                        if response.status == 200:
                            return await response.text()
                        if response.status == 429 or response.status >= 500:
                            # Rate limited or transient server error:
                            # exponential backoff, then retry.
                            await asyncio.sleep(2 ** attempt)
                        elif response.status >= 400:
                            # Client error: retrying cannot help.
                            return None
            except asyncio.TimeoutError:
                await asyncio.sleep(1)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise

        return None

    async def scrape(self, url: str) -> ScrapedContent:
        """Scrape *url* and return structured content.

        Never raises: all failures are reported via ``success=False`` and
        the ``error`` field.
        """
        try:
            html = await self.fetch(url)
            if not html:
                return self._failed(url, "Failed to fetch page")

            soup = BeautifulSoup(html, "html.parser")
            return ScrapedContent(
                url=url,
                title=self._extract_title(soup),
                main_text=self._extract_main_content(soup),
                links=self._extract_links(soup, url),
                metadata=self._extract_metadata(soup),
                success=True,
            )
        except Exception as e:
            return self._failed(url, str(e))

    def _failed(self, url: str, error: str) -> ScrapedContent:
        """Build an empty ScrapedContent describing a failed scrape."""
        return ScrapedContent(
            url=url,
            title="",
            main_text="",
            links=[],
            metadata={},
            success=False,
            error=error,
        )

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract the page title: og:title, then <title>, then first <h1>."""
        og_title = soup.find("meta", property="og:title")
        # Fix: only use og:title when it actually carries content; otherwise
        # fall through to the <title>/<h1> fallbacks.
        if og_title and og_title.get("content"):
            return og_title.get("content", "")

        title_tag = soup.find("title")
        if title_tag:
            return title_tag.get_text().strip()

        h1 = soup.find("h1")
        if h1:
            return h1.get_text().strip()

        return ""

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract the main readable content from a page.

        NOTE: mutates *soup* by removing boilerplate elements.
        """
        # Strip navigation/boilerplate so it doesn't pollute the text.
        for element in soup.find_all([
            "script", "style", "nav", "header", "footer",
            "aside", "form", "noscript", "iframe",
        ]):
            element.decompose()

        # Try common content containers first, most specific to least.
        content_selectors = [
            "article",
            "[role='main']",
            ".main-content",
            ".post-content",
            ".article-content",
            ".content",
            "main",
        ]

        for selector in content_selectors:
            content = soup.select_one(selector)
            if content:
                text = self._clean_text(content.get_text())
                if len(text) > 200:  # Skip containers that are too short to be the article
                    return text

        # Fall back to the whole body text.
        body = soup.find("body")
        if body:
            return self._clean_text(body.get_text())

        return ""

    def _clean_text(self, text: str) -> str:
        """Normalize whitespace while preserving line structure.

        Fix: the original collapsed ALL whitespace (including newlines)
        into single spaces first, which made the subsequent per-line
        cleanup dead code and destroyed paragraph breaks. We now collapse
        only intra-line whitespace, then drop blank lines.
        """
        # Collapse runs of whitespace other than newlines into one space.
        text = re.sub(r"[^\S\n]+", " ", text)

        lines = (line.strip() for line in text.split("\n"))
        return "\n".join(line for line in lines if line)

    def _extract_links(
        self,
        soup: BeautifulSoup,
        base_url: str
    ) -> List[Dict[str, str]]:
        """Extract up to 50 deduplicated absolute links from the page."""
        links: List[Dict[str, str]] = []
        seen_urls = set()

        for a in soup.find_all("a", href=True):
            href = a.get("href", "")
            text = a.get_text().strip()

            # Skip empty, fragment, javascript, and mailto links.
            if not href or href.startswith(("javascript:", "#", "mailto:")):
                continue

            # Resolve relative URLs against the page URL.
            absolute_url = urljoin(base_url, href)

            if absolute_url in seen_urls:
                continue
            seen_urls.add(absolute_url)

            links.append({
                "url": absolute_url,
                "text": text[:100] if text else "",  # cap anchor text length
            })

        return links[:50]  # Limit number of links

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Extract description, author, publish date, and keywords meta tags."""
        metadata: Dict[str, Any] = {}

        desc = soup.find("meta", attrs={"name": "description"})
        if desc:
            metadata["description"] = desc.get("content", "")

        author = soup.find("meta", attrs={"name": "author"})
        if author:
            metadata["author"] = author.get("content", "")

        # Published date: first matching property wins.
        for prop in ["article:published_time", "datePublished"]:
            date_meta = soup.find("meta", property=prop)
            if date_meta:
                metadata["published_date"] = date_meta.get("content", "")
                break

        keywords = soup.find("meta", attrs={"name": "keywords"})
        if keywords:
            metadata["keywords"] = keywords.get("content", "").split(",")

        return metadata

Content Extraction

Different types of pages require different extraction strategies. Here's a more sophisticated content extractor:

🐍python
1from abc import ABC, abstractmethod
2import readability
3from newspaper import Article
4
5
class ContentExtractor(ABC):
    """Abstract interface for HTML content extractors.

    Subclasses implement :meth:`extract`, returning a result dict that
    includes a ``success`` flag.
    """

    @abstractmethod
    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Extract content from *html* fetched at *url*."""
        ...
12
13
class ReadabilityExtractor(ContentExtractor):
    """Extract clean article content via Mozilla's Readability algorithm."""

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Run Readability over *html*; *url* is unused by this extractor."""
        try:
            doc = readability.Document(html)
            result = {
                "title": doc.title(),
                "content": doc.summary(),
                "short_title": doc.short_title(),
                "success": True,
            }
        except Exception as e:
            # Report the failure to the caller rather than raising.
            result = {"success": False, "error": str(e)}
        return result
30
31
class NewspaperExtractor(ContentExtractor):
    """Extract article content with newspaper3k.

    Works best on news articles and blog posts.
    """

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Parse *html* as a news article located at *url*."""
        try:
            article = Article(url)
            article.set_html(html)
            article.parse()

            publish_date = (
                str(article.publish_date) if article.publish_date else None
            )
            # "summary" only exists after article.nlp() has been called.
            summary = article.summary if hasattr(article, "summary") else None

            return {
                "title": article.title,
                "content": article.text,
                "authors": article.authors,
                "publish_date": publish_date,
                "top_image": article.top_image,
                "keywords": article.keywords,
                "summary": summary,
                "success": True,
            }
        except Exception as e:
            # Report the failure to the caller rather than raising.
            return {"success": False, "error": str(e)}
56
57
class StructuredDataExtractor(ContentExtractor):
    """Extract structured data (JSON-LD) embedded in a page."""

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Collect all parseable JSON-LD blocks from *html*.

        Returns:
            Dict with ``structured_data`` (list of parsed JSON objects) and
            ``success`` (True when at least one block was found).
        """
        import json  # hoisted out of the loop; local keeps the snippet self-contained

        soup = BeautifulSoup(html, "html.parser")
        structured_data: List[Any] = []

        # JSON-LD lives in <script type="application/ld+json"> tags.
        for script in soup.find_all("script", type="application/ld+json"):
            raw = script.string
            if not raw:
                # Empty <script> tags have .string == None; the old bare
                # except silently hid the resulting TypeError.
                continue
            try:
                structured_data.append(json.loads(raw))
            except ValueError:
                # Malformed JSON-LD is common in the wild; skip it.
                continue

        return {
            "structured_data": structured_data,
            "success": len(structured_data) > 0,
        }
80
81
class SmartExtractor:
    """Run several extractors and combine their best results.

    Tries Readability, newspaper3k, and structured-data extraction, then
    merges titles, the longest extracted text, and metadata into one dict.
    """

    def __init__(self):
        self.extractors = {
            "readability": ReadabilityExtractor(),
            "newspaper": NewspaperExtractor(),
            "structured": StructuredDataExtractor(),
        }

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Extract content from *html* using every available method."""
        results: Dict[str, Dict[str, Any]] = {}
        best_content = ""

        for name, extractor in self.extractors.items():
            try:
                result = await extractor.extract(html, url)
            except Exception:
                # Narrowed from a bare except; one failing extractor must
                # not abort the others (extractors also self-report errors).
                continue
            results[name] = result

            # Prefer the longest extracted text as the main content.
            # .get(...) or "" also guards against a None "content" value.
            content = result.get("content") or ""
            if len(content) > len(best_content):
                best_content = content

        return {
            "url": url,
            "title": self._get_best_title(results),
            "content": best_content,
            "metadata": self._merge_metadata(results),
            "structured_data": results.get("structured", {}).get("structured_data", []),
        }

    def _get_best_title(self, results: Dict) -> str:
        """Return the first non-empty title, preferring newspaper's."""
        for key in ("newspaper", "readability"):
            title = results.get(key, {}).get("title")
            if title:
                return title
        return ""

    def _merge_metadata(self, results: Dict) -> Dict:
        """Collect metadata fields contributed by the newspaper extractor."""
        metadata: Dict[str, Any] = {}
        nr = results.get("newspaper", {})
        for field in ("authors", "publish_date", "keywords"):
            if nr.get(field):
                metadata[field] = nr[field]
        return metadata
Combine multiple extraction libraries for best results. Readability works well for articles, while structured data extraction captures machine-readable information.

Handling Dynamic Content

Many modern websites load content dynamically with JavaScript. For these, we need browser automation:

🐍python
1from playwright.async_api import async_playwright, Browser, Page
2
3
class DynamicScraper:
    """Scraper for JavaScript-rendered content using Playwright.

    Launches a headless Chromium browser, renders the page, and extracts
    content from the fully rendered DOM.
    """

    def __init__(self, headless: bool = True):
        self.headless = headless
        self.browser: Optional[Browser] = None
        # Keep the Playwright driver handle so stop() can shut it down too.
        self._playwright = None

    async def start(self) -> None:
        """Start the Playwright driver and launch a Chromium browser."""
        self._playwright = await async_playwright().start()
        self.browser = await self._playwright.chromium.launch(headless=self.headless)

    async def stop(self) -> None:
        """Close the browser and stop the Playwright driver.

        Fix: the driver handle was previously a local variable in start(),
        so the Playwright driver process leaked on stop().
        """
        if self.browser:
            await self.browser.close()
            self.browser = None
        if self._playwright:
            await self._playwright.stop()
            self._playwright = None

    async def scrape(
        self,
        url: str,
        wait_for: Optional[str] = None,
        timeout: int = 30000
    ) -> ScrapedContent:
        """
        Scrape a JavaScript-rendered page.

        Args:
            url: URL to scrape
            wait_for: CSS selector to wait for before extracting
            timeout: Maximum wait time in milliseconds

        Returns:
            ScrapedContent; failures are reported via success=False.
        """
        if not self.browser:
            await self.start()

        page = await self.browser.new_page()

        try:
            await page.goto(url, wait_until="networkidle", timeout=timeout)

            # Wait for a specific element if the caller asked for one.
            if wait_for:
                await page.wait_for_selector(wait_for, timeout=timeout)

            # Grace period for late-loading dynamic content.
            await page.wait_for_timeout(1000)

            html = await page.content()
            soup = BeautifulSoup(html, "html.parser")

            return ScrapedContent(
                url=url,
                title=await self._get_title(page, soup),
                main_text=await self._get_content(page, soup),
                links=self._extract_links(soup, url),
                metadata=self._extract_metadata(soup),
                success=True
            )

        except Exception as e:
            return ScrapedContent(
                url=url,
                title="",
                main_text="",
                links=[],
                metadata={},
                success=False,
                error=str(e)
            )
        finally:
            await page.close()

    async def _get_title(self, page: Page, soup: BeautifulSoup) -> str:
        """Get the page title from the browser, falling back to <h1>."""
        title = await page.title()
        if title:
            return title

        h1 = soup.find("h1")
        return h1.get_text().strip() if h1 else ""

    async def _get_content(self, page: Page, soup: BeautifulSoup) -> str:
        """Extract main text from the rendered page via common containers."""
        content_selectors = [
            "article",
            "main",
            ".content",
            ".post-content",
            "[role='main']"
        ]

        for selector in content_selectors:
            elements = await page.query_selector_all(selector)
            if elements:
                texts = []
                for el in elements:
                    texts.append(await el.inner_text())
                return "\n".join(texts)

        # Fall back to the whole body text.
        body = await page.query_selector("body")
        if body:
            return await body.inner_text()

        return ""

    def _extract_links(
        self,
        soup: BeautifulSoup,
        base_url: str
    ) -> List[Dict[str, str]]:
        """Extract up to 50 deduplicated absolute links from the page.

        Fix: this method (and _extract_metadata) was called by scrape() but
        never defined on this class, raising AttributeError at runtime.
        """
        links: List[Dict[str, str]] = []
        seen_urls = set()

        for a in soup.find_all("a", href=True):
            href = a.get("href", "")
            if not href or href.startswith(("javascript:", "#", "mailto:")):
                continue

            absolute_url = urljoin(base_url, href)
            if absolute_url in seen_urls:
                continue
            seen_urls.add(absolute_url)

            text = a.get_text().strip()
            links.append({
                "url": absolute_url,
                "text": text[:100] if text else ""
            })

        return links[:50]

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Extract description, author, publish date, and keywords meta tags."""
        metadata: Dict[str, Any] = {}

        desc = soup.find("meta", attrs={"name": "description"})
        if desc:
            metadata["description"] = desc.get("content", "")

        author = soup.find("meta", attrs={"name": "author"})
        if author:
            metadata["author"] = author.get("content", "")

        for prop in ("article:published_time", "datePublished"):
            date_meta = soup.find("meta", property=prop)
            if date_meta:
                metadata["published_date"] = date_meta.get("content", "")
                break

        keywords = soup.find("meta", attrs={"name": "keywords"})
        if keywords:
            metadata["keywords"] = keywords.get("content", "").split(",")

        return metadata

    async def scrape_with_interaction(
        self,
        url: str,
        actions: List[Dict[str, Any]]
    ) -> ScrapedContent:
        """
        Scrape a page after performing interactions.

        Args:
            url: URL to scrape
            actions: List of action dicts like
                {"type": "click", "selector": ...},
                {"type": "scroll"},
                {"type": "type", "selector": ..., "text": ...},
                {"type": "wait", "ms": ...}
        """
        if not self.browser:
            await self.start()

        page = await self.browser.new_page()

        try:
            await page.goto(url, wait_until="networkidle")

            # Replay the requested interactions in order.
            for action in actions:
                action_type = action.get("type")

                if action_type == "click":
                    await page.click(action["selector"])
                elif action_type == "scroll":
                    await page.evaluate("window.scrollBy(0, window.innerHeight)")
                elif action_type == "type":
                    await page.fill(action["selector"], action["text"])
                elif action_type == "wait":
                    await page.wait_for_timeout(action.get("ms", 1000))

            html = await page.content()
            soup = BeautifulSoup(html, "html.parser")

            return ScrapedContent(
                url=url,
                title=await self._get_title(page, soup),
                main_text=await self._get_content(page, soup),
                links=[],
                metadata={},
                success=True
            )

        except Exception as e:
            # Consistent with scrape(): report failures instead of raising.
            return ScrapedContent(
                url=url,
                title="",
                main_text="",
                links=[],
                metadata={},
                success=False,
                error=str(e)
            )
        finally:
            await page.close()
Browser-based scraping is resource-intensive. Use it only when necessary for JavaScript-rendered content. Always prefer lightweight HTTP-based scraping when possible.

Rate Limiting and Ethics

Responsible scraping requires respecting websites and their owners:

🐍python
1import asyncio
2from collections import defaultdict
3from time import time
4
5
class RateLimiter:
    """Async rate limiter enforcing a global and a per-domain request rate."""

    def __init__(
        self,
        requests_per_second: float = 1.0,
        per_domain_limit: float = 0.5
    ):
        """
        Args:
            requests_per_second: Maximum request rate across all domains.
            per_domain_limit: Maximum request rate per single domain.
        """
        self.global_limit = requests_per_second
        self.domain_limit = per_domain_limit
        # Maps domain (or the "_global" sentinel key) to the timestamp of
        # the last request; 0.0 means "never requested".
        self.last_request: Dict[str, float] = defaultdict(float)
        # NOTE: the lock is held while sleeping, so concurrent callers queue
        # behind each other — acceptable for a polite scraper.
        self.lock = asyncio.Lock()

    async def acquire(self, url: str) -> None:
        """Sleep as needed, then record a request to *url*'s domain."""
        # Monotonic clock: wall-clock time() can jump (NTP, DST) and produce
        # spurious waits or missed waits in interval arithmetic.
        from time import monotonic

        domain = urlparse(url).netloc

        async with self.lock:
            now = monotonic()

            # Global pacing across all domains (skip if never requested).
            last_global = self.last_request["_global"]
            if last_global:
                global_wait = (1.0 / self.global_limit) - (now - last_global)
                if global_wait > 0:
                    await asyncio.sleep(global_wait)
                    now = monotonic()

            # Per-domain pacing (skip if this domain was never requested).
            last_domain = self.last_request[domain]
            if last_domain:
                domain_wait = (1.0 / self.domain_limit) - (now - last_domain)
                if domain_wait > 0:
                    await asyncio.sleep(domain_wait)

            # Record fresh timestamps after any sleeping.
            now = monotonic()
            self.last_request["_global"] = now
            self.last_request[domain] = now
42
43
class RobotsTxtChecker:
    """Check robots.txt before scraping, caching parsed rules per site."""

    def __init__(self):
        # Maps robots.txt URL -> parsed rules dict ({} when unavailable).
        self.cache: Dict[str, Dict] = {}

    async def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        """Return True if robots.txt permits fetching this URL.

        NOTE: rules are not matched per user-agent group; all Disallow
        lines in the file are applied regardless of *user_agent*.
        """
        parsed = urlparse(url)
        # Fix: the original used square brackets in the f-string, producing
        # the literal text "[parsed.scheme]://[parsed.netloc]/robots.txt"
        # instead of the real robots.txt URL.
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

        # Fetch and cache on first sight of this site.
        if robots_url not in self.cache:
            await self._fetch_robots(robots_url)

        rules = self.cache.get(robots_url, {})
        path = parsed.path or "/"

        # Prefix match against disallowed paths.
        for pattern in rules.get("disallow", []):
            if path.startswith(pattern):
                return False

        return True

    async def _fetch_robots(self, robots_url: str) -> None:
        """Fetch and parse robots.txt; cache {} on any failure (fail open)."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    robots_url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status != 200:
                        self.cache[robots_url] = {}
                        return

                    text = await response.text()
                    self.cache[robots_url] = self._parse_robots(text)
        except Exception:
            # Narrowed from a bare except; network failure means no rules.
            self.cache[robots_url] = {}

    def _parse_robots(self, text: str) -> Dict:
        """Minimal robots.txt parser for Disallow and Crawl-delay.

        Fix: only the directive name is lowercased; the original lowercased
        the whole line, corrupting case-sensitive URL path patterns.
        """
        rules: Dict[str, Any] = {"disallow": [], "crawl_delay": None}

        for line in text.split("\n"):
            # Strip inline comments per the robots.txt convention.
            line = line.split("#", 1)[0].strip()
            if ":" not in line:
                continue

            directive, _, value = line.partition(":")
            directive = directive.strip().lower()
            value = value.strip()

            if directive == "disallow" and value:
                rules["disallow"].append(value)
            elif directive == "crawl-delay":
                try:
                    rules["crawl_delay"] = float(value)
                except ValueError:
                    pass

        return rules
103
104
class EthicalScraper:
    """Scraper that layers robots.txt checks and rate limiting over WebScraper."""

    def __init__(self):
        self.scraper = WebScraper()
        self.rate_limiter = RateLimiter()
        self.robots_checker = RobotsTxtChecker()

    async def scrape(self, url: str) -> ScrapedContent:
        """Scrape *url*, honoring robots.txt and per-domain rate limits."""
        allowed = await self.robots_checker.can_fetch(url)
        if not allowed:
            # Refuse politely: callers get a structured failure result.
            return ScrapedContent(
                url=url,
                title="",
                main_text="",
                links=[],
                metadata={},
                success=False,
                error="Blocked by robots.txt",
            )

        # Throttle, then delegate the actual fetch/parse work.
        await self.rate_limiter.acquire(url)
        return await self.scraper.scrape(url)
Always respect robots.txt, implement rate limiting, and consider the impact of your scraping on the target servers. Many sites have APIs that are preferred over scraping.

Summary

In this section, we built a comprehensive web scraping system:

  • Robust Scraping: Retry logic, timeout handling, and error recovery
  • Smart Extraction: Multiple algorithms (Readability, Newspaper) for optimal content extraction
  • Dynamic Content: Playwright-based scraping for JavaScript-rendered pages
  • Ethical Scraping: Rate limiting, robots.txt compliance, and responsible practices

In the next section, we'll cover document processing for PDFs and other file types that our research agent may encounter.