Chapter 13

Complete Research Agent Implementation

Building a Research Agent

Introduction

Now we bring together all the components we've built throughout this chapter into a complete, functional research agent. This implementation combines search, scraping, document processing, synthesis, and verification into a cohesive system.

Key Insight: A well-designed research agent is more than the sum of its parts. The orchestration of components and the flow of information between them determines the quality of research.

Complete Agent Class

Here's the complete research agent that integrates all our components:

```python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, AsyncGenerator
from datetime import datetime
from enum import Enum


class ResearchDepth(Enum):
    """How deep should the research go."""
    QUICK = "quick"                       # Fast, surface-level
    STANDARD = "standard"                 # Balanced depth
    COMPREHENSIVE = "comprehensive"       # Thorough investigation
    ACADEMIC = "academic"                 # Deep, citation-focused


@dataclass
class ResearchConfig:
    """Configuration for a research session."""
    depth: ResearchDepth = ResearchDepth.STANDARD
    max_sources: int = 10
    verify_facts: bool = True
    include_citations: bool = True
    output_format: str = "report"  # report, summary, raw
    timeout_seconds: int = 300


@dataclass
class ResearchResult:
    """Complete result of a research session."""
    question: str
    answer: str
    key_findings: List[str]
    sources: List[Dict[str, str]]
    citations: List[Dict[str, str]]
    verification_results: List[Dict[str, Any]]
    confidence_score: float
    audit_trail: Dict[str, Any]
    metadata: Dict[str, Any]


class ResearchAgent:
    """
    Complete research agent that searches, gathers, and synthesizes information.
    """

    def __init__(
        self,
        llm_client,
        search_providers: Optional[List[Any]] = None,
        config: Optional[ResearchConfig] = None
    ):
        self.llm = llm_client
        self.config = config or ResearchConfig()

        # Initialize components
        self.search_manager = SearchManager()
        if search_providers:
            for provider in search_providers:
                self.search_manager.add_provider(provider)

        self.scraper = EthicalScraper()
        self.doc_processor = UniversalDocumentProcessor()
        self.chunker = DocumentChunker()

        # Synthesis and verification
        self.synthesizer = HierarchicalSynthesizer(llm_client)
        self.verifier = FactVerifier(llm_client, self.search_manager)
        self.credibility = CredibilityAssessor(llm_client)

        # Citation and audit
        self.citation_manager = CitationManager()
        self.audit_trail = None

    async def research(
        self,
        question: str,
        config: Optional[ResearchConfig] = None
    ) -> ResearchResult:
        """
        Conduct comprehensive research on a question.
        """
        config = config or self.config
        self.audit_trail = ResearchAuditTrail(
            research_id=f"research_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )

        try:
            # Phase 1: Search for sources
            sources = await self._search_phase(question, config)

            # Phase 2: Gather content
            gathered_content = await self._gather_phase(sources, config)

            # Phase 3: Analyze and synthesize
            synthesis = await self._synthesis_phase(
                gathered_content,
                question,
                config
            )

            # Phase 4: Verify key findings
            verification_results = []
            if config.verify_facts and synthesis.key_findings:
                verification_results = await self._verification_phase(
                    synthesis.key_findings,
                    sources,
                    config
                )

            # Phase 5: Generate final output
            result = await self._generate_result(
                question,
                synthesis,
                gathered_content,
                verification_results,
                config
            )

            return result

        except Exception as e:
            # Return partial results on error
            return ResearchResult(
                question=question,
                answer=f"Research encountered an error: {e}",
                key_findings=[],
                sources=[],
                citations=[],
                verification_results=[],
                confidence_score=0.0,
                audit_trail=self.audit_trail.generate_report() if self.audit_trail else {},
                metadata={"error": str(e)}
            )

    async def _search_phase(
        self,
        question: str,
        config: ResearchConfig
    ) -> List[Dict[str, str]]:
        """Execute search phase."""
        # Generate search queries based on depth
        queries = await self._generate_search_queries(question, config.depth)

        all_results = []
        for query in queries:
            results = await self.search_manager.search(
                query,
                num_results=max(1, config.max_sources // len(queries))
            )

            for result in results:
                all_results.append({
                    "url": result.url,
                    "title": result.title,
                    "snippet": result.snippet
                })

            self.audit_trail.log_search(
                query,
                len(results),
                [r.url for r in results]
            )

        # Deduplicate by URL, preserving order
        seen_urls = set()
        unique_results = []
        for result in all_results:
            if result["url"] not in seen_urls:
                seen_urls.add(result["url"])
                unique_results.append(result)

        return unique_results[:config.max_sources]

    async def _gather_phase(
        self,
        sources: List[Dict[str, str]],
        config: ResearchConfig
    ) -> List[Dict[str, Any]]:
        """Gather content from sources."""
        gathered = []

        for source in sources:
            url = source["url"]

            try:
                # Check if it's a document
                if url.lower().endswith((".pdf", ".docx", ".doc")):
                    content = await self.doc_processor.process(url)
                    text = content.get("text", "")
                else:
                    # Scrape web page
                    scraped = await self.scraper.scrape(url)
                    text = scraped.main_text if scraped.success else ""

                if text:
                    # Assess credibility
                    cred_score = await self.credibility.assess(url, text)

                    gathered.append({
                        "url": url,
                        "title": source.get("title", ""),
                        "content": text,
                        "credibility": cred_score.overall_score,
                        "metadata": {"snippet": source.get("snippet", "")}
                    })

                    # Add citation
                    self.citation_manager.add_citation(
                        source_url=url,
                        source_title=source.get("title", ""),
                    )

                    self.audit_trail.log_scrape(
                        url,
                        success=True,
                        content_length=len(text)
                    )

            except Exception as e:
                self.audit_trail.log_scrape(url, success=False, error=str(e))
                continue

        # Sort by credibility, most credible first
        gathered.sort(key=lambda x: x["credibility"], reverse=True)

        return gathered

    async def _synthesis_phase(
        self,
        content: List[Dict[str, Any]],
        question: str,
        config: ResearchConfig
    ) -> SynthesisOutput:
        """Synthesize information from gathered content."""
        # Prepare sources for synthesis
        synthesis_sources = [
            {
                "url": c["url"],
                "title": c["title"],
                "content": c["content"][:3000]  # Limit content size
            }
            for c in content
        ]

        # Choose synthesis strategy based on depth
        if config.depth == ResearchDepth.QUICK:
            strategy = AggregativeSynthesis(self.llm)
            input_data = SynthesisInput(
                sources=synthesis_sources[:5],
                query=question
            )
            synthesis = await strategy.synthesize(input_data)
        else:
            synthesis = await self.synthesizer.synthesize(
                synthesis_sources,
                question
            )

        self.audit_trail.log_synthesis(
            [s["url"] for s in synthesis_sources],
            len(synthesis.key_findings),
            config.depth.value
        )

        return synthesis

    async def _verification_phase(
        self,
        findings: List[str],
        sources: List[Dict[str, str]],
        config: ResearchConfig
    ) -> List[Dict[str, Any]]:
        """Verify key findings."""
        verification_results = []

        # Verify every finding at academic depth, otherwise just the top five
        findings_to_verify = findings if config.depth == ResearchDepth.ACADEMIC else findings[:5]

        for finding in findings_to_verify:
            # Get original source
            original_source = sources[0]["url"] if sources else ""

            result = await self.verifier.verify_claim(
                finding,
                original_source,
                min_sources=2 if config.depth == ResearchDepth.QUICK else 3
            )

            verification_results.append({
                "claim": finding,
                "verified": result.verified,
                "confidence": result.confidence,
                "supporting_sources": result.supporting_sources,
                "contradicting_sources": result.contradicting_sources
            })

            self.audit_trail.log_verification(
                finding,
                result.verified,
                result.supporting_sources
            )

        return verification_results

    async def _generate_result(
        self,
        question: str,
        synthesis: SynthesisOutput,
        content: List[Dict[str, Any]],
        verification: List[Dict[str, Any]],
        config: ResearchConfig
    ) -> ResearchResult:
        """Generate final research result."""
        # Calculate confidence: verified findings add a boost of up to 0.2
        base_confidence = synthesis.confidence
        if verification:
            verified_count = sum(1 for v in verification if v["verified"])
            verification_boost = (verified_count / len(verification)) * 0.2
            confidence = min(1.0, base_confidence + verification_boost)
        else:
            confidence = base_confidence

        # Generate formatted answer
        if config.output_format == "report":
            generator = ReportGenerator(self.llm)
            report = await generator.generate_report(
                synthesis,
                [{"url": c["url"], "title": c["title"]} for c in content],
                question
            )
            answer = report.executive_summary
        else:
            answer = synthesis.summary

        return ResearchResult(
            question=question,
            answer=answer,
            key_findings=synthesis.key_findings,
            sources=[
                {"url": c["url"], "title": c["title"], "credibility": c["credibility"]}
                for c in content
            ],
            citations=self.citation_manager.export_citations(),
            verification_results=verification,
            confidence_score=confidence,
            audit_trail=self.audit_trail.generate_report(),
            metadata={
                "depth": config.depth.value,
                "sources_analyzed": len(content),
                "findings_verified": len(verification)
            }
        )

    async def _generate_search_queries(
        self,
        question: str,
        depth: ResearchDepth
    ) -> List[str]:
        """Generate search queries based on depth."""
        optimizer = QueryOptimizer(self.llm)

        if depth == ResearchDepth.QUICK:
            return [question]
        elif depth == ResearchDepth.STANDARD:
            return await optimizer.expand_query(question)
        elif depth == ResearchDepth.COMPREHENSIVE:
            base_queries = await optimizer.expand_query(question)
            # Add more specific queries
            additional = [
                f"{question} research",
                f"{question} expert analysis",
                f"{question} data statistics"
            ]
            return base_queries + additional
        else:  # ACADEMIC
            base_queries = await optimizer.expand_query(question)
            academic_queries = [
                f"{question} peer reviewed",
                f"{question} academic paper",
                f"{question} systematic review"
            ]
            return base_queries + academic_queries
```

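The confidence computation in _generate_result is worth isolating: synthesis confidence serves as a base score, the fraction of verified findings scales a boost of at most 0.2, and the total is capped at 1.0. Here is a minimal standalone sketch of that formula (the function name combine_confidence is ours, not part of the agent):

```python
from typing import Any, Dict, List


def combine_confidence(base: float, verification: List[Dict[str, Any]]) -> float:
    """Blend synthesis confidence with fact-verification results.

    Mirrors the logic in ResearchAgent._generate_result: the fraction of
    verified claims scales a boost of at most 0.2, capped at 1.0 overall.
    """
    if not verification:
        return base
    verified = sum(1 for v in verification if v["verified"])
    boost = (verified / len(verification)) * 0.2
    return min(1.0, base + boost)


# 2 of 4 claims verified: 0.7 + (2/4) * 0.2, i.e. approximately 0.8
score = combine_confidence(0.7, [{"verified": True}, {"verified": True},
                                 {"verified": False}, {"verified": False}])
```

Note that unverified findings never lower the score; they simply fail to contribute to the boost.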
Research Workflow

The research agent follows a structured workflow with streaming updates:

```python
class StreamingResearchAgent(ResearchAgent):
    """
    Research agent with streaming progress updates.
    """

    async def research_stream(
        self,
        question: str,
        config: Optional[ResearchConfig] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Conduct research with streaming progress updates.
        """
        config = config or self.config
        self.audit_trail = ResearchAuditTrail(
            research_id=f"research_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )

        # Phase 1: Search
        yield {
            "phase": "search",
            "status": "starting",
            "message": "Generating search queries..."
        }

        queries = await self._generate_search_queries(question, config.depth)
        yield {
            "phase": "search",
            "status": "progress",
            "message": f"Searching with {len(queries)} queries..."
        }

        sources = await self._search_phase(question, config)
        yield {
            "phase": "search",
            "status": "complete",
            "message": f"Found {len(sources)} sources",
            "data": {"source_count": len(sources)}
        }

        # Phase 2: Gather
        yield {
            "phase": "gather",
            "status": "starting",
            "message": "Gathering content from sources..."
        }

        gathered = []
        for i, source in enumerate(sources):
            yield {
                "phase": "gather",
                "status": "progress",
                "message": f"Processing source {i + 1}/{len(sources)}: {source['title'][:50]}..."
            }

            # Process source; skip any that fail to scrape
            try:
                scraped = await self.scraper.scrape(source["url"])
                if scraped.success:
                    cred = await self.credibility.assess(
                        source["url"],
                        scraped.main_text
                    )
                    gathered.append({
                        "url": source["url"],
                        "title": source["title"],
                        "content": scraped.main_text,
                        "credibility": cred.overall_score
                    })
            except Exception:
                continue

        yield {
            "phase": "gather",
            "status": "complete",
            "message": f"Successfully gathered {len(gathered)} sources",
            "data": {"gathered_count": len(gathered)}
        }

        # Phase 3: Synthesize
        yield {
            "phase": "synthesize",
            "status": "starting",
            "message": "Synthesizing information..."
        }

        synthesis = await self._synthesis_phase(gathered, question, config)

        yield {
            "phase": "synthesize",
            "status": "complete",
            "message": f"Generated {len(synthesis.key_findings)} key findings",
            "data": {
                "findings_count": len(synthesis.key_findings),
                "summary_preview": synthesis.summary[:200]
            }
        }

        # Phase 4: Verify
        verification_results = []
        if config.verify_facts and synthesis.key_findings:
            yield {
                "phase": "verify",
                "status": "starting",
                "message": "Verifying key claims..."
            }

            for i, finding in enumerate(synthesis.key_findings[:5]):
                yield {
                    "phase": "verify",
                    "status": "progress",
                    "message": f"Verifying finding {i + 1}/{min(5, len(synthesis.key_findings))}..."
                }

                result = await self.verifier.verify_claim(
                    finding,
                    sources[0]["url"] if sources else "",
                    min_sources=2
                )
                verification_results.append({
                    "claim": finding,
                    "verified": result.verified,
                    "confidence": result.confidence
                })

            verified_count = sum(1 for v in verification_results if v["verified"])
            yield {
                "phase": "verify",
                "status": "complete",
                "message": f"Verified {verified_count}/{len(verification_results)} claims",
                "data": {"verification_results": verification_results}
            }

        # Phase 5: Final result
        yield {
            "phase": "complete",
            "status": "complete",
            "message": "Research complete!",
            "data": {
                "question": question,
                "answer": synthesis.summary,
                "findings": synthesis.key_findings,
                "sources": [
                    {"url": g["url"], "title": g["title"]}
                    for g in gathered
                ],
                "verification": verification_results,
                "confidence": synthesis.confidence,
                "audit": self.audit_trail.generate_report()
            }
        }


class InteractiveResearchAgent(StreamingResearchAgent):
    """
    Research agent that can handle follow-up questions.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.research_history: List[ResearchResult] = []
        self.context_sources: List[Dict] = []

    async def ask_followup(
        self,
        question: str
    ) -> ResearchResult:
        """
        Answer a follow-up question using existing research context.
        """
        # Build context from previously gathered sources
        context = "\n".join([
            f"Source: {s['title']}\n{s.get('content', '')[:1000]}"
            for s in self.context_sources[:5]
        ])

        # Try to answer from context first
        prompt = f"""Based on the following research context, answer this follow-up question.
If the context doesn't contain enough information, respond with "NEED_MORE_RESEARCH".

Previous research context:
{context}

Follow-up question: {question}

Answer:"""

        response = await self.llm.generate(prompt)

        if "NEED_MORE_RESEARCH" in response:
            # Conduct new research
            result = await self.research(question)
            self.context_sources.extend(result.sources[:5])
            return result

        # Use existing sources
        return ResearchResult(
            question=question,
            answer=response,
            key_findings=[],
            sources=self.context_sources[:5],
            citations=self.citation_manager.export_citations(),
            verification_results=[],
            confidence_score=0.7,
            audit_trail={},
            metadata={"type": "followup", "used_existing_context": True}
        )

    async def research(self, question: str, config: Optional[ResearchConfig] = None) -> ResearchResult:
        """Override to track context across questions."""
        result = await super().research(question, config)
        self.research_history.append(result)
        self.context_sources.extend(result.sources)
        return result
```

CLI Interface

A user-friendly CLI for interacting with the research agent:

```python
import argparse
import asyncio
import os

from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.markdown import Markdown
from rich.table import Table


console = Console()


async def main():
    parser = argparse.ArgumentParser(description="AI Research Agent")
    parser.add_argument("question", nargs="?", help="Research question")
    parser.add_argument("--depth", choices=["quick", "standard", "comprehensive", "academic"],
                        default="standard", help="Research depth")
    parser.add_argument("--no-verify", action="store_true",
                        help="Skip fact verification")
    parser.add_argument("--output", choices=["report", "summary", "json"],
                        default="report", help="Output format")
    parser.add_argument("--interactive", action="store_true",
                        help="Start interactive session")

    args = parser.parse_args()

    # Initialize agent
    from anthropic import AsyncAnthropic
    llm_client = AsyncAnthropic()

    agent = InteractiveResearchAgent(
        llm_client=llm_client,
        search_providers=[
            BraveSearchProvider(api_key=os.getenv("BRAVE_API_KEY")),
        ]
    )

    if args.interactive:
        await interactive_session(agent)
    elif args.question:
        await single_research(agent, args)
    else:
        console.print("[red]Please provide a research question or use --interactive[/red]")


async def single_research(agent, args):
    """Conduct a single research session."""
    config = ResearchConfig(
        depth=ResearchDepth(args.depth),
        verify_facts=not args.no_verify,
        output_format=args.output
    )

    console.print(Panel(
        f"[bold blue]Researching:[/bold blue] {args.question}",
        title="AI Research Agent"
    ))

    result = None
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        task = progress.add_task("Starting research...", total=None)

        async for update in agent.research_stream(args.question, config):
            progress.update(task, description=update["message"])

            if update["status"] == "complete" and update["phase"] == "complete":
                result = update["data"]

    if result is None:
        console.print("[red]Research did not complete[/red]")
        return

    # Display results
    console.print()
    console.print(Panel(
        Markdown(result["answer"]),
        title="[bold green]Research Answer[/bold green]"
    ))

    # Key findings
    if result["findings"]:
        console.print("\n[bold]Key Findings:[/bold]")
        for i, finding in enumerate(result["findings"], 1):
            verified = any(
                v["claim"] == finding and v["verified"]
                for v in result.get("verification", [])
            )
            status = "[green]✓[/green]" if verified else "[yellow]?[/yellow]"
            console.print(f"  {status} {i}. {finding}")

    # Sources
    if result["sources"]:
        console.print("\n[bold]Sources:[/bold]")
        for source in result["sources"][:5]:
            console.print(f"  - {source['title'][:60]}...")
            console.print(f"    [dim]{source['url']}[/dim]")

    console.print(f"\n[dim]Confidence: {result['confidence']:.0%}[/dim]")


async def interactive_session(agent):
    """Run an interactive research session."""
    console.print(Panel(
        "[bold]AI Research Agent - Interactive Mode[/bold]\n\n"
        "Commands:\n"
        "  /depth <quick|standard|comprehensive|academic> - Set research depth\n"
        "  /sources - Show sources from last research\n"
        "  /export <filename> - Export results to file\n"
        "  /quit - Exit\n",
        title="Welcome"
    ))

    config = ResearchConfig()

    while True:
        try:
            question = console.input("\n[bold cyan]Research>[/bold cyan] ")

            if not question.strip():
                continue

            if question.startswith("/"):
                await handle_command(agent, question, config)
                continue

            # Conduct research
            console.print()
            with console.status("[bold green]Researching..."):
                result = await agent.research(question, config)

            # Display answer
            console.print(Panel(
                Markdown(result.answer),
                title="Answer"
            ))

            # Show key findings
            if result.key_findings:
                console.print("\n[bold]Key Findings:[/bold]")
                for finding in result.key_findings[:5]:
                    console.print(f"  - {finding}")

            console.print(f"\n[dim]Sources: {len(result.sources)} | "
                          f"Confidence: {result.confidence_score:.0%}[/dim]")

        except KeyboardInterrupt:
            console.print("\n[yellow]Interrupted[/yellow]")
            break
        except EOFError:
            break

    console.print("[green]Goodbye![/green]")


async def handle_command(agent, command: str, config: ResearchConfig):
    """Handle CLI commands."""
    parts = command[1:].split()
    cmd = parts[0].lower()

    if cmd == "quit":
        raise EOFError()

    elif cmd == "depth" and len(parts) > 1:
        try:
            config.depth = ResearchDepth(parts[1])
            console.print(f"[green]Depth set to {parts[1]}[/green]")
        except ValueError:
            console.print("[red]Invalid depth. Use: quick, standard, comprehensive, academic[/red]")

    elif cmd == "sources":
        if agent.context_sources:
            table = Table(title="Sources")
            table.add_column("Title")
            table.add_column("URL")
            for source in agent.context_sources[:10]:
                table.add_row(source["title"][:40], source["url"])
            console.print(table)
        else:
            console.print("[yellow]No sources available yet[/yellow]")

    elif cmd == "export" and len(parts) > 1:
        if agent.research_history:
            import json
            filename = parts[1]
            with open(filename, "w") as f:
                last_result = agent.research_history[-1]
                json.dump({
                    "question": last_result.question,
                    "answer": last_result.answer,
                    "findings": last_result.key_findings,
                    "sources": last_result.sources,
                    "citations": last_result.citations
                }, f, indent=2)
            console.print(f"[green]Exported to {filename}[/green]")
        else:
            console.print("[yellow]No research to export[/yellow]")

    else:
        console.print("[red]Unknown command[/red]")


if __name__ == "__main__":
    asyncio.run(main())
```
The CLI interface uses the rich library for beautiful terminal output. Install it with: pip install rich

Usage Examples

Here are examples of using the research agent:

```python
import os


# Example 1: Quick research
async def quick_research_example():
    agent = ResearchAgent(llm_client)

    result = await agent.research(
        "What is the current state of quantum computing?",
        config=ResearchConfig(depth=ResearchDepth.QUICK)
    )

    print(f"Answer: {result.answer}")
    print(f"Sources: {len(result.sources)}")


# Example 2: Comprehensive research with verification
async def comprehensive_research_example():
    agent = ResearchAgent(llm_client)

    result = await agent.research(
        "What are the environmental impacts of electric vehicles vs gasoline cars?",
        config=ResearchConfig(
            depth=ResearchDepth.COMPREHENSIVE,
            verify_facts=True,
            max_sources=15
        )
    )

    print(f"Answer: {result.answer[:500]}...")
    print("\nKey Findings:")
    for finding in result.key_findings:
        print(f"  - {finding}")

    print("\nVerification Results:")
    for v in result.verification_results:
        status = "Verified" if v["verified"] else "Unverified"
        print(f"  [{status}] {v['claim'][:50]}...")


# Example 3: Academic research
async def academic_research_example():
    agent = ResearchAgent(
        llm_client,
        search_providers=[
            SerpAPIProvider(api_key=os.getenv("SERPAPI_KEY"), engine="google_scholar")
        ]
    )

    result = await agent.research(
        "What are the latest advances in transformer architectures for NLP?",
        config=ResearchConfig(
            depth=ResearchDepth.ACADEMIC,
            include_citations=True
        )
    )

    print(f"Answer: {result.answer}")
    print("\nCitations:")
    for citation in result.citations[:10]:
        print(f"  [{citation['id']}] {citation['title']}")
        print(f"      {citation['url']}")


# Example 4: Streaming research with progress
async def streaming_research_example():
    agent = StreamingResearchAgent(llm_client)

    async for update in agent.research_stream(
        "What are the pros and cons of remote work?",
        config=ResearchConfig(depth=ResearchDepth.STANDARD)
    ):
        print(f"[{update['phase']}] {update['message']}")

        if update["phase"] == "complete":
            data = update["data"]
            print(f"\nFinal Answer: {data['answer'][:300]}...")


# Example 5: Interactive follow-up questions
async def interactive_example():
    agent = InteractiveResearchAgent(llm_client)

    # Initial research
    result = await agent.research(
        "What is machine learning?"
    )
    print(f"Initial: {result.answer[:200]}...")

    # Follow-up question uses existing context
    followup = await agent.ask_followup(
        "What are the main types of machine learning?"
    )
    print(f"\nFollow-up: {followup.answer[:200]}...")
```

Summary

In this chapter, we built a complete research agent with the following capabilities:

  • Architecture: Modular design with clear separation of concerns for search, scraping, processing, synthesis, and verification
  • Web Search: Multi-provider search with query optimization and result ranking
  • Content Extraction: Ethical web scraping with dynamic content support
  • Document Processing: PDF and multi-format document handling with intelligent chunking
  • Information Synthesis: Hierarchical synthesis with conflict resolution
  • Verification: Fact-checking against multiple sources with credibility scoring
  • Citation Management: Complete audit trail and proper attribution

This research agent can be extended and customized for specific domains like legal research, academic research, or market analysis. The modular architecture makes it easy to swap components or add new capabilities.
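Swapping in a domain-specific search provider, for instance, only requires matching the interface the SearchManager expects. The sketch below assumes providers are duck-typed by an async search method returning objects with url, title, and snippet attributes (the shape _search_phase reads); CaseLawSearchProvider and its API are hypothetical:

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class SearchResult:
    """Result shape _search_phase reads: url, title, snippet."""
    url: str
    title: str
    snippet: str


class CaseLawSearchProvider:
    """Hypothetical provider for legal research.

    A real implementation would call a case-law API; this stub just
    illustrates the interface a custom provider has to satisfy.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def search(self, query: str, num_results: int = 10) -> List[SearchResult]:
        # Stub results standing in for an API call
        return [SearchResult(
            url="https://example.com/cases/1",
            title=f"Case law on {query}",
            snippet="",
        )][:num_results]


results = asyncio.run(CaseLawSearchProvider(api_key="demo").search("fair use"))
print(results[0].title)  # → Case law on fair use
```

Registering it would then be the only change the agent needs, e.g. `ResearchAgent(llm_client, search_providers=[CaseLawSearchProvider(api_key="...")])`.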

In the next chapter, we'll explore multi-agent architectures where multiple specialized agents work together on complex tasks.