Introduction
Once our research agent discovers relevant sources through search, it needs to extract the actual content from those pages. Web scraping and content extraction transform raw HTML into clean, structured information that the agent can analyze and synthesize.
Key Insight: Effective content extraction is about finding the signal in the noise. A web page may contain navigation, ads, and boilerplate, but the research agent needs only the main content.
Scraping Fundamentals
Web scraping involves fetching web pages and parsing their content. Here's a robust scraping implementation:
1import aiohttp
2import asyncio
3from bs4 import BeautifulSoup
4from dataclasses import dataclass
5from typing import Optional, Dict, Any, List
6from urllib.parse import urljoin, urlparse
7import re
8
9
@dataclass
class ScrapedContent:
    """Result of scraping a single web page.

    Returned by every scraper in this module; when ``success`` is False
    the content fields are empty and ``error`` explains the failure.
    """
    # URL that was scraped (the input URL, as passed to the scraper).
    url: str
    # Best-effort page title; "" when no title could be extracted.
    title: str
    # Cleaned main body text with boilerplate removed.
    main_text: str
    # Outbound links as {"url": absolute_url, "text": anchor_text} dicts.
    links: List[Dict[str, str]]
    # Page metadata (description, author, published_date, keywords, ...).
    metadata: Dict[str, Any]
    # True when the page was fetched and parsed successfully.
    success: bool
    # Human-readable failure reason; None on success.
    error: Optional[str] = None
20
21
22class WebScraper:
23 """
24 Robust web scraper with retry logic and content extraction.
25 """
26
27 def __init__(
28 self,
29 timeout: int = 30,
30 max_retries: int = 3,
31 user_agent: str = None
32 ):
33 self.timeout = timeout
34 self.max_retries = max_retries
35 self.user_agent = user_agent or (
36 "Mozilla/5.0 (compatible; ResearchBot/1.0; +https://example.com/bot)"
37 )
38
39 async def fetch(self, url: str) -> Optional[str]:
40 """Fetch raw HTML from a URL."""
41 headers = {
42 "User-Agent": self.user_agent,
43 "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
44 "Accept-Language": "en-US,en;q=0.5",
45 }
46
47 for attempt in range(self.max_retries):
48 try:
49 async with aiohttp.ClientSession() as session:
50 async with session.get(
51 url,
52 headers=headers,
53 timeout=aiohttp.ClientTimeout(total=self.timeout),
54 allow_redirects=True
55 ) as response:
56 if response.status == 200:
57 return await response.text()
58 elif response.status == 429: # Rate limited
59 await asyncio.sleep(2 ** attempt)
60 elif response.status >= 400:
61 return None
62 except asyncio.TimeoutError:
63 await asyncio.sleep(1)
64 except Exception as e:
65 if attempt == self.max_retries - 1:
66 raise
67
68 return None
69
70 async def scrape(self, url: str) -> ScrapedContent:
71 """Scrape a URL and extract structured content."""
72 try:
73 html = await self.fetch(url)
74 if not html:
75 return ScrapedContent(
76 url=url,
77 title="",
78 main_text="",
79 links=[],
80 metadata={},
81 success=False,
82 error="Failed to fetch page"
83 )
84
85 soup = BeautifulSoup(html, "html.parser")
86
87 # Extract title
88 title = self._extract_title(soup)
89
90 # Extract main content
91 main_text = self._extract_main_content(soup)
92
93 # Extract links
94 links = self._extract_links(soup, url)
95
96 # Extract metadata
97 metadata = self._extract_metadata(soup)
98
99 return ScrapedContent(
100 url=url,
101 title=title,
102 main_text=main_text,
103 links=links,
104 metadata=metadata,
105 success=True
106 )
107
108 except Exception as e:
109 return ScrapedContent(
110 url=url,
111 title="",
112 main_text="",
113 links=[],
114 metadata={},
115 success=False,
116 error=str(e)
117 )
118
119 def _extract_title(self, soup: BeautifulSoup) -> str:
120 """Extract page title."""
121 # Try og:title first
122 og_title = soup.find("meta", property="og:title")
123 if og_title:
124 return og_title.get("content", "")
125
126 # Fall back to title tag
127 title_tag = soup.find("title")
128 if title_tag:
129 return title_tag.get_text().strip()
130
131 # Try h1
132 h1 = soup.find("h1")
133 if h1:
134 return h1.get_text().strip()
135
136 return ""
137
138 def _extract_main_content(self, soup: BeautifulSoup) -> str:
139 """Extract the main content from a page."""
140 # Remove unwanted elements
141 for element in soup.find_all([
142 "script", "style", "nav", "header", "footer",
143 "aside", "form", "noscript", "iframe"
144 ]):
145 element.decompose()
146
147 # Try common content containers
148 content_selectors = [
149 "article",
150 "[role='main']",
151 ".main-content",
152 ".post-content",
153 ".article-content",
154 ".content",
155 "main",
156 ]
157
158 for selector in content_selectors:
159 content = soup.select_one(selector)
160 if content:
161 text = self._clean_text(content.get_text())
162 if len(text) > 200: # Minimum content length
163 return text
164
165 # Fall back to body text
166 body = soup.find("body")
167 if body:
168 return self._clean_text(body.get_text())
169
170 return ""
171
172 def _clean_text(self, text: str) -> str:
173 """Clean extracted text."""
174 # Normalize whitespace
175 text = re.sub(r"\s+", " ", text)
176
177 # Remove excess newlines
178 lines = [line.strip() for line in text.split("\n")]
179 lines = [line for line in lines if line]
180
181 return "\n".join(lines)
182
183 def _extract_links(
184 self,
185 soup: BeautifulSoup,
186 base_url: str
187 ) -> List[Dict[str, str]]:
188 """Extract relevant links from the page."""
189 links = []
190 seen_urls = set()
191
192 for a in soup.find_all("a", href=True):
193 href = a.get("href", "")
194 text = a.get_text().strip()
195
196 # Skip empty or javascript links
197 if not href or href.startswith(("javascript:", "#", "mailto:")):
198 continue
199
200 # Make absolute URL
201 absolute_url = urljoin(base_url, href)
202
203 # Normalize and dedupe
204 if absolute_url in seen_urls:
205 continue
206 seen_urls.add(absolute_url)
207
208 links.append({
209 "url": absolute_url,
210 "text": text[:100] if text else ""
211 })
212
213 return links[:50] # Limit number of links
214
215 def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, Any]:
216 """Extract metadata from the page."""
217 metadata = {}
218
219 # Description
220 desc = soup.find("meta", attrs={"name": "description"})
221 if desc:
222 metadata["description"] = desc.get("content", "")
223
224 # Author
225 author = soup.find("meta", attrs={"name": "author"})
226 if author:
227 metadata["author"] = author.get("content", "")
228
229 # Published date
230 for prop in ["article:published_time", "datePublished"]:
231 date_meta = soup.find("meta", property=prop)
232 if date_meta:
233 metadata["published_date"] = date_meta.get("content", "")
234 break
235
236 # Keywords
237 keywords = soup.find("meta", attrs={"name": "keywords"})
238 if keywords:
239 metadata["keywords"] = keywords.get("content", "").split(",")
240
241 return metadataContent Extraction
Different types of pages require different extraction strategies. Here's a more sophisticated content extractor:
1from abc import ABC, abstractmethod
2import readability
3from newspaper import Article
4
5
class ContentExtractor(ABC):
    """Abstract interface for HTML content extractors.

    Concrete subclasses turn raw HTML into a dict of extracted fields;
    by convention the dict carries a boolean "success" key.
    """

    @abstractmethod
    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Extract structured content from *html* fetched from *url*."""
        pass
12
13
class ReadabilityExtractor(ContentExtractor):
    """
    Content extractor backed by Mozilla's Readability algorithm.

    Yields the cleaned article body plus title variants; the *url*
    argument is accepted for interface compatibility but unused.
    """

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Run Readability over *html* and report the result dict."""
        try:
            document = readability.Document(html)
            payload = {
                "title": document.title(),
                "content": document.summary(),
                "short_title": document.short_title(),
                "success": True,
            }
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return payload
30
31
class NewspaperExtractor(ContentExtractor):
    """
    Use newspaper3k for article extraction.
    Optimized for news articles and blog posts.
    """

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Parse *html* as a news article located at *url*."""
        try:
            story = Article(url)
            story.set_html(html)
            story.parse()

            published = str(story.publish_date) if story.publish_date else None
            summary = story.summary if hasattr(story, "summary") else None

            return {
                "title": story.title,
                "content": story.text,
                "authors": story.authors,
                "publish_date": published,
                "top_image": story.top_image,
                "keywords": story.keywords,
                "summary": summary,
                "success": True,
            }
        except Exception as exc:
            return {"success": False, "error": str(exc)}
56
57
class StructuredDataExtractor(ContentExtractor):
    """
    Extract structured data (JSON-LD, microdata) from pages.

    Currently only JSON-LD ``<script type="application/ld+json">``
    blocks are parsed; "success" is True when at least one was found.
    """

    async def extract(self, html: str, url: str) -> Dict[str, Any]:
        """Collect every parseable JSON-LD payload from *html*."""
        # Hoisted out of the loop (the original re-imported json on
        # every iteration); local import keeps this listing self-contained.
        import json

        soup = BeautifulSoup(html, "html.parser")
        structured_data = []

        # Extract JSON-LD
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                # script.string is None for empty tags -> TypeError;
                # malformed JSON -> json.JSONDecodeError (a ValueError).
                # Narrowed from a bare except that swallowed everything.
                data = json.loads(script.string)
            except (TypeError, ValueError):
                continue
            structured_data.append(data)

        return {
            "structured_data": structured_data,
            "success": len(structured_data) > 0
        }
80
81
82class SmartExtractor:
83 """
84 Intelligently choose the best extraction method based on content type.
85 """
86
87 def __init__(self):
88 self.extractors = {
89 "readability": ReadabilityExtractor(),
90 "newspaper": NewspaperExtractor(),
91 "structured": StructuredDataExtractor(),
92 }
93
94 async def extract(self, html: str, url: str) -> Dict[str, Any]:
95 """Extract content using the best available method."""
96 results = {}
97 best_content = ""
98
99 # Try all extractors
100 for name, extractor in self.extractors.items():
101 try:
102 result = await extractor.extract(html, url)
103 results[name] = result
104
105 # Track best content by length
106 content = result.get("content", "")
107 if len(content) > len(best_content):
108 best_content = content
109 except:
110 continue
111
112 # Combine results
113 combined = {
114 "url": url,
115 "title": self._get_best_title(results),
116 "content": best_content,
117 "metadata": self._merge_metadata(results),
118 "structured_data": results.get("structured", {}).get("structured_data", []),
119 }
120
121 return combined
122
123 def _get_best_title(self, results: Dict) -> str:
124 """Get the best title from all extractors."""
125 for key in ["newspaper", "readability"]:
126 if key in results and results[key].get("title"):
127 return results[key]["title"]
128 return ""
129
130 def _merge_metadata(self, results: Dict) -> Dict:
131 """Merge metadata from all extractors."""
132 metadata = {}
133
134 if "newspaper" in results:
135 nr = results["newspaper"]
136 if nr.get("authors"):
137 metadata["authors"] = nr["authors"]
138 if nr.get("publish_date"):
139 metadata["publish_date"] = nr["publish_date"]
140 if nr.get("keywords"):
141 metadata["keywords"] = nr["keywords"]
142
143 return metadataHandling Dynamic Content
Many modern websites load content dynamically with JavaScript. For these, we need browser automation:
1from playwright.async_api import async_playwright, Browser, Page
2
3
4class DynamicScraper:
5 """
6 Scraper for JavaScript-rendered content using Playwright.
7 """
8
9 def __init__(self, headless: bool = True):
10 self.headless = headless
11 self.browser: Optional[Browser] = None
12
13 async def start(self) -> None:
14 """Start the browser."""
15 playwright = await async_playwright().start()
16 self.browser = await playwright.chromium.launch(headless=self.headless)
17
18 async def stop(self) -> None:
19 """Stop the browser."""
20 if self.browser:
21 await self.browser.close()
22
23 async def scrape(
24 self,
25 url: str,
26 wait_for: str = None,
27 timeout: int = 30000
28 ) -> ScrapedContent:
29 """
30 Scrape a JavaScript-rendered page.
31
32 Args:
33 url: URL to scrape
34 wait_for: CSS selector to wait for before extracting
35 timeout: Maximum wait time in milliseconds
36 """
37 if not self.browser:
38 await self.start()
39
40 page = await self.browser.new_page()
41
42 try:
43 # Navigate to page
44 await page.goto(url, wait_until="networkidle", timeout=timeout)
45
46 # Wait for specific element if requested
47 if wait_for:
48 await page.wait_for_selector(wait_for, timeout=timeout)
49
50 # Additional wait for dynamic content
51 await page.wait_for_timeout(1000)
52
53 # Get rendered HTML
54 html = await page.content()
55
56 # Extract content using BeautifulSoup
57 soup = BeautifulSoup(html, "html.parser")
58
59 return ScrapedContent(
60 url=url,
61 title=await self._get_title(page, soup),
62 main_text=await self._get_content(page, soup),
63 links=self._extract_links(soup, url),
64 metadata=self._extract_metadata(soup),
65 success=True
66 )
67
68 except Exception as e:
69 return ScrapedContent(
70 url=url,
71 title="",
72 main_text="",
73 links=[],
74 metadata={},
75 success=False,
76 error=str(e)
77 )
78 finally:
79 await page.close()
80
81 async def _get_title(self, page: Page, soup: BeautifulSoup) -> str:
82 """Get page title."""
83 title = await page.title()
84 if title:
85 return title
86
87 h1 = soup.find("h1")
88 return h1.get_text().strip() if h1 else ""
89
90 async def _get_content(self, page: Page, soup: BeautifulSoup) -> str:
91 """Extract main content from rendered page."""
92 # Try to get text from main content area
93 content_selectors = [
94 "article",
95 "main",
96 ".content",
97 ".post-content",
98 "[role='main']"
99 ]
100
101 for selector in content_selectors:
102 elements = await page.query_selector_all(selector)
103 if elements:
104 texts = []
105 for el in elements:
106 text = await el.inner_text()
107 texts.append(text)
108 return "\n".join(texts)
109
110 # Fall back to body text
111 body = await page.query_selector("body")
112 if body:
113 return await body.inner_text()
114
115 return ""
116
117 async def scrape_with_interaction(
118 self,
119 url: str,
120 actions: List[Dict[str, Any]]
121 ) -> ScrapedContent:
122 """
123 Scrape a page after performing interactions.
124
125 Args:
126 url: URL to scrape
127 actions: List of actions like click, scroll, type
128 """
129 if not self.browser:
130 await self.start()
131
132 page = await self.browser.new_page()
133
134 try:
135 await page.goto(url, wait_until="networkidle")
136
137 # Perform actions
138 for action in actions:
139 action_type = action.get("type")
140
141 if action_type == "click":
142 await page.click(action["selector"])
143 elif action_type == "scroll":
144 await page.evaluate("window.scrollBy(0, window.innerHeight)")
145 elif action_type == "type":
146 await page.fill(action["selector"], action["text"])
147 elif action_type == "wait":
148 await page.wait_for_timeout(action.get("ms", 1000))
149
150 # Get final content
151 html = await page.content()
152 soup = BeautifulSoup(html, "html.parser")
153
154 return ScrapedContent(
155 url=url,
156 title=await self._get_title(page, soup),
157 main_text=await self._get_content(page, soup),
158 links=[],
159 metadata={},
160 success=True
161 )
162
163 finally:
164 await page.close()Rate Limiting and Ethics
Responsible scraping requires respecting websites and their owners:
1import asyncio
2from collections import defaultdict
3from time import time
4
5
class RateLimiter:
    """
    Async rate limiter enforcing both a global and a per-domain
    request rate.

    The timestamp of the most recent request is tracked per domain
    (keyed by netloc) and globally under the reserved "_global" key.
    """

    def __init__(
        self,
        requests_per_second: float = 1.0,
        per_domain_limit: float = 0.5
    ):
        self.global_limit = requests_per_second
        self.domain_limit = per_domain_limit
        # domain (or "_global") -> unix timestamp of the last request.
        self.last_request: Dict[str, float] = defaultdict(float)
        # One lock serializes acquires so waits are computed consistently.
        self.lock = asyncio.Lock()

    async def acquire(self, url: str) -> None:
        """Block until a request to *url* is allowed, then record it."""
        domain = urlparse(url).netloc

        async with self.lock:
            # Honor the global rate first...
            remaining = (1.0 / self.global_limit) - (time() - self.last_request["_global"])
            if remaining > 0:
                await asyncio.sleep(remaining)

            # ...then the per-domain rate, measured after any global wait.
            remaining = (1.0 / self.domain_limit) - (time() - self.last_request[domain])
            if remaining > 0:
                await asyncio.sleep(remaining)

            # Record this request in both buckets.
            self.last_request["_global"] = time()
            self.last_request[domain] = time()
43
class RobotsTxtChecker:
    """
    Check robots.txt before scraping.

    Parsed rules are cached per robots.txt URL. NOTE(review): the
    parser applies Disallow rules from *all* user-agent sections and
    ignores the ``user_agent`` argument — conservative, but stricter
    than the robots.txt standard requires.
    """

    def __init__(self):
        # robots.txt URL -> parsed rules ({} when unavailable).
        self.cache: Dict[str, Dict] = {}

    async def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        """Check if we're allowed to fetch this URL."""
        parsed = urlparse(url)
        # BUG FIX: the original used square brackets in the f-string,
        # producing the literal "[parsed.scheme]://[parsed.netloc]/..."
        # so the real robots.txt was never fetched or cached correctly.
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

        # Fetch and cache rules the first time we see this host.
        if robots_url not in self.cache:
            await self._fetch_robots(robots_url)

        rules = self.cache.get(robots_url, {})
        path = parsed.path

        # Disallowed if any Disallow prefix matches the path.
        for pattern in rules.get("disallow", []):
            if path.startswith(pattern):
                return False

        return True

    async def _fetch_robots(self, robots_url: str) -> None:
        """Fetch and parse robots.txt; cache {} on any failure."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    robots_url,
                    # ClientTimeout for consistency with WebScraper.fetch.
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status != 200:
                        self.cache[robots_url] = {}
                        return

                    text = await response.text()
                    self.cache[robots_url] = self._parse_robots(text)
        except Exception:
            # Unreachable/unparseable robots.txt means "no rules";
            # narrowed from a bare except so Ctrl-C still propagates.
            self.cache[robots_url] = {}

    def _parse_robots(self, text: str) -> Dict:
        """Simple robots.txt parser (Disallow and Crawl-delay only)."""
        rules: Dict = {"disallow": [], "crawl_delay": None}

        for raw in text.split("\n"):
            line = raw.strip()
            field, sep, value = line.partition(":")
            if not sep:
                continue
            # Lowercase ONLY the directive name: URL paths are
            # case-sensitive, so the original's whole-line lower()
            # silently corrupted Disallow rules like "/Admin".
            field = field.strip().lower()
            value = value.strip()
            if field == "disallow":
                if value:
                    rules["disallow"].append(value)
            elif field == "crawl-delay":
                try:
                    rules["crawl_delay"] = float(value)
                except ValueError:
                    pass

        return rules
103
104
class EthicalScraper:
    """
    Composes the basic scraper with rate limiting and robots.txt
    checking so every request follows polite-crawling guidelines.
    """

    def __init__(self):
        # Collaborators are plain attributes so callers can swap them out.
        self.scraper = WebScraper()
        self.rate_limiter = RateLimiter()
        self.robots_checker = RobotsTxtChecker()

    async def scrape(self, url: str) -> ScrapedContent:
        """Scrape *url*, honoring robots.txt and rate limits."""
        allowed = await self.robots_checker.can_fetch(url)
        if allowed:
            # Throttle, then delegate the actual fetch + extraction.
            await self.rate_limiter.acquire(url)
            return await self.scraper.scrape(url)

        return ScrapedContent(
            url=url,
            title="",
            main_text="",
            links=[],
            metadata={},
            success=False,
            error="Blocked by robots.txt"
        )

Summary
In this section, we built a comprehensive web scraping system:
- Robust Scraping: Retry logic, timeout handling, and error recovery
- Smart Extraction: Multiple algorithms (Readability, Newspaper) for optimal content extraction
- Dynamic Content: Playwright-based scraping for JavaScript-rendered pages
- Ethical Scraping: Rate limiting, robots.txt compliance, and responsible practices
In the next section, we'll cover document processing for PDFs and other file types that our research agent may encounter.