Introduction
Credible research requires proper citation and verification. Every claim should be traceable to its source, and key facts should be verified across multiple sources. This section covers how to build robust citation and verification systems for your research agent.
Key Insight: Trust but verify. A research agent should never present information without attribution, and critical claims should be cross-referenced against multiple sources.
Citation Management
Every piece of information extracted should be traceable to its source:
🐍python
1from dataclasses import dataclass, field
2from typing import List, Dict, Any, Optional
3from datetime import datetime
4from enum import Enum
5import hashlib
6
7
class CitationStyle(Enum):
    """Citation formats supported by ``Citation.format``."""
    APA = "apa"
    MLA = "mla"
    CHICAGO = "chicago"
    IEEE = "ieee"
    INLINE = "inline"
15
16
@dataclass
class Citation:
    """A reference to one source, with enough metadata to format it.

    Attributes:
        source_url: URL of the cited source.
        source_title: Human-readable title of the source.
        author: Author name, if known.
        date_accessed: When the agent retrieved the source.
        date_published: Publication date, if known.
        page_or_section: Page number or section identifier, if applicable.
        quote: Exact text if directly quoted.
        citation_id: Short stable ID; derived from url+quote when omitted.
    """
    source_url: str
    source_title: str
    author: Optional[str]
    date_accessed: datetime
    date_published: Optional[datetime]
    page_or_section: Optional[str]
    quote: Optional[str]  # Exact text if directly quoted
    citation_id: str = ""

    def __post_init__(self):
        if not self.citation_id:
            # Deterministic 8-char ID from URL + quote, so the same
            # source/quote pair always maps to the same citation.
            # md5 is used only as a content hash here, not for security.
            content = f"{self.source_url}{self.quote or ''}"
            self.citation_id = hashlib.md5(content.encode()).hexdigest()[:8]

    def format(self, style: CitationStyle = CitationStyle.INLINE) -> str:
        """Format this citation in the given style.

        Styles without a dedicated formatter fall back to "title - url".
        """
        if style == CitationStyle.INLINE:
            return f"[{self.citation_id}]"
        if style == CitationStyle.APA:
            author = self.author or "Unknown"
            year = self.date_published.year if self.date_published else "n.d."
            return f"{author} ({year}). {self.source_title}. Retrieved from {self.source_url}"
        if style == CitationStyle.MLA:
            author = self.author or "Unknown"
            return f'{author}. "{self.source_title}." Web. {self.date_accessed.strftime("%d %b %Y")}.'
        return f"{self.source_title} - {self.source_url}"
48
49
class CitationManager:
    """Registry of citations and of which claims each citation backs."""

    def __init__(self):
        # citation_id -> Citation
        self.citations: Dict[str, Citation] = {}
        # claim_hash -> citation_ids supporting that claim
        self.claim_citations: Dict[str, List[str]] = {}

    @staticmethod
    def _hash_claim(claim: str) -> str:
        """Stable 12-char key for a claim (md5 as a hash, not for security)."""
        return hashlib.md5(claim.encode()).hexdigest()[:12]

    def add_citation(
        self,
        source_url: str,
        source_title: str,
        quote: Optional[str] = None,
        **kwargs,
    ) -> Citation:
        """Create, register, and return a new Citation (accessed "now")."""
        citation = Citation(
            source_url=source_url,
            source_title=source_title,
            quote=quote,
            date_accessed=datetime.now(),
            **kwargs,
        )
        self.citations[citation.citation_id] = citation
        return citation

    def cite_claim(self, claim: str, citation: Citation) -> str:
        """Associate a claim with a citation; return the claim with an inline marker."""
        cited = self.claim_citations.setdefault(self._hash_claim(claim), [])
        if citation.citation_id not in cited:
            cited.append(citation.citation_id)
        return f"{claim} [{citation.citation_id}]"

    def get_citations_for_claim(self, claim: str) -> List[Citation]:
        """Return every known citation recorded for a claim."""
        citation_ids = self.claim_citations.get(self._hash_claim(claim), [])
        return [self.citations[cid] for cid in citation_ids if cid in self.citations]

    def generate_bibliography(
        self,
        style: CitationStyle = CitationStyle.APA,
    ) -> str:
        """Render all citations sorted by title, separated by blank lines."""
        ordered = sorted(self.citations.values(), key=lambda c: c.source_title)
        return "\n\n".join(c.format(style) for c in ordered)

    def generate_footnotes(self) -> Dict[str, str]:
        """Map each citation_id to a short "title, url" footnote."""
        return {
            cid: f"{c.source_title}, {c.source_url}"
            for cid, c in self.citations.items()
        }

    def export_citations(self) -> List[Dict[str, Any]]:
        """Export all citations as JSON-serializable dicts."""
        return [self._citation_record(c) for c in self.citations.values()]

    @staticmethod
    def _citation_record(c: Citation) -> Dict[str, Any]:
        """Structured-export form of a single citation."""
        return {
            "id": c.citation_id,
            "url": c.source_url,
            "title": c.source_title,
            "author": c.author,
            "accessed": c.date_accessed.isoformat(),
            "published": c.date_published.isoformat() if c.date_published else None,
            "quote": c.quote,
        }
Fact Verification
Critical claims should be verified across multiple sources:
🐍python
@dataclass
class VerificationResult:
    """Outcome of verifying one claim against independent sources."""
    claim: str
    verified: bool  # True when confidence > 0.6 and nothing contradicts
    confidence: float  # 0.0-1.0: supporting share of the relevant sources
    supporting_sources: List[str]
    contradicting_sources: List[str]
    verification_notes: str  # human-readable evidence summary
10
11
class FactVerifier:
    """Verify claims by cross-checking them against independent sources."""

    def __init__(self, llm_client, search_manager):
        self.llm = llm_client
        self.search = search_manager

    async def verify_claim(
        self,
        claim: str,
        original_source: str,
        min_sources: int = 2,
    ) -> VerificationResult:
        """Verify ``claim`` using sources other than ``original_source``.

        Searches for related content, classifies up to five other sources as
        supporting/contradicting via the LLM, and reports a confidence equal
        to the supporting share of the relevant (non-neutral) sources.
        """
        search_results = await self.search.search(claim, num_results=10)

        # Corroboration must be independent: drop the source the claim
        # came from.
        other_sources = [r for r in search_results if original_source not in r.url]

        if len(other_sources) < min_sources:
            return VerificationResult(
                claim=claim,
                verified=False,
                confidence=0.3,
                supporting_sources=[],
                contradicting_sources=[],
                verification_notes="Insufficient sources for verification",
            )

        supporting: List[str] = []
        contradicting: List[str] = []
        for source in other_sources[:5]:
            verdict = await self._check_source(claim, source)
            if verdict == "supports":
                supporting.append(source.url)
            elif verdict == "contradicts":
                contradicting.append(source.url)

        total_relevant = len(supporting) + len(contradicting)
        # All-neutral results give weak (not zero) confidence.
        confidence = 0.4 if total_relevant == 0 else len(supporting) / total_relevant

        return VerificationResult(
            claim=claim,
            verified=confidence > 0.6 and len(contradicting) == 0,
            confidence=confidence,
            supporting_sources=supporting,
            contradicting_sources=contradicting,
            verification_notes=self._generate_notes(supporting, contradicting),
        )

    async def _check_source(
        self,
        claim: str,
        source,
    ) -> str:
        """Classify one source as 'supports', 'contradicts', or 'neutral' via the LLM."""
        prompt = f"""Determine if this source supports, contradicts, or is neutral about the claim.

CLAIM: {claim}

SOURCE: {source.title}
CONTENT: {source.snippet[:500]}

Answer with exactly one word: supports, contradicts, or neutral"""

        response = await self.llm.generate(prompt)
        response = response.strip().lower()

        # Substring checks tolerate verbose answers like "it supports the claim".
        if "support" in response:
            return "supports"
        if "contradict" in response:
            return "contradicts"
        return "neutral"

    def _generate_notes(
        self,
        supporting: List[str],
        contradicting: List[str],
    ) -> str:
        """Build a short human-readable summary of the verification evidence."""
        notes = []
        if supporting:
            notes.append(f"Supported by {len(supporting)} additional source(s)")
        if contradicting:
            notes.append(f"Contradicted by {len(contradicting)} source(s)")
        if not notes:
            notes.append("No corroborating or contradicting sources found")
        return "; ".join(notes)

    async def verify_multiple_claims(
        self,
        claims: List[str],
        original_source: str,
    ) -> List[VerificationResult]:
        """Verify several claims concurrently."""
        import asyncio

        tasks = [self.verify_claim(claim, original_source) for claim in claims]
        return await asyncio.gather(*tasks)
127
128
class CrossReferenceChecker:
    """Cross-reference claims against a set of already-gathered sources."""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def find_corroboration(
        self,
        claim: str,
        sources: List[Dict[str, str]],
    ) -> Dict[str, Any]:
        """Check every source against ``claim`` and summarize the evidence."""
        corroborating = []
        contradicting = []

        for source in sources:
            result = await self._check_corroboration(claim, source)
            record = {
                "url": source.get("url"),
                "title": source.get("title"),
                "relevant_text": result["relevant_text"],
            }
            if result["corroborates"]:
                corroborating.append(record)
            elif result["contradicts"]:
                contradicting.append(record)

        return {
            "claim": claim,
            "corroborating_sources": corroborating,
            "contradicting_sources": contradicting,
            "corroboration_count": len(corroborating),
            "verification_status": self._determine_status(corroborating, contradicting),
        }

    async def _check_corroboration(
        self,
        claim: str,
        source: Dict[str, str],
    ) -> Dict[str, Any]:
        """Ask the LLM whether one source corroborates or contradicts the claim."""
        prompt = f"""Does this source corroborate or contradict the following claim?

CLAIM: {claim}

SOURCE CONTENT:
{source.get("content", "")[:1500]}

Respond with:
STATUS: corroborates / contradicts / neutral
RELEVANT_TEXT: Quote the specific text that supports your answer (or "N/A" if neutral)"""

        response = await self.llm.generate(prompt)

        status = "neutral"
        relevant_text = ""
        # Parse the two expected "KEY: value" lines; other lines are ignored.
        for line in response.split("\n"):
            if line.startswith("STATUS:"):
                status = line[len("STATUS:"):].strip().lower()
            elif line.startswith("RELEVANT_TEXT:"):
                relevant_text = line[len("RELEVANT_TEXT:"):].strip()

        return {
            "corroborates": "corroborate" in status,
            "contradicts": "contradict" in status,
            "relevant_text": relevant_text,
        }

    def _determine_status(
        self,
        corroborating: List,
        contradicting: List,
    ) -> str:
        """Map evidence counts to an overall status label."""
        if len(corroborating) >= 2 and not contradicting:
            return "verified"
        if len(contradicting) > len(corroborating):
            return "disputed"
        return "partially_verified" if corroborating else "unverified"
Source Credibility
Not all sources are equally reliable. Here's how to assess source credibility:
🐍python
@dataclass
class CredibilityScore:
    """Credibility assessment for a single source (all scores in 0-1)."""
    url: str
    overall_score: float  # weighted blend of the four component scores
    domain_reputation: float
    content_quality: float
    author_credibility: float
    recency_score: float
    bias_assessment: str  # LLM label, e.g. "objective" ... "heavily_biased"
    factors: Dict[str, Any]  # raw inputs used in the assessment
12
13
class CredibilityAssessor:
    """Score source credibility from domain, content, author, and recency."""

    # Reputation priors for known domains (matched by substring).
    DOMAIN_SCORES = {
        # Academic and government
        ".edu": 0.85,
        ".gov": 0.9,
        "arxiv.org": 0.9,
        "nature.com": 0.95,
        "science.org": 0.95,
        "ncbi.nlm.nih.gov": 0.9,

        # Reference
        "wikipedia.org": 0.7,
        "britannica.com": 0.8,

        # News (quality)
        "reuters.com": 0.8,
        "apnews.com": 0.8,
        "bbc.com": 0.75,

        # Tech documentation
        "docs.python.org": 0.9,
        "developer.mozilla.org": 0.9,

        # Social / user generated
        "reddit.com": 0.4,
        "quora.com": 0.45,
        "medium.com": 0.5,
    }

    def __init__(self, llm_client=None):
        # The LLM is optional; without it bias assessment reports "unknown".
        self.llm = llm_client

    async def assess(
        self,
        url: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "CredibilityScore":
        """Assess the credibility of a source and return a CredibilityScore."""
        metadata = metadata or {}

        domain_score = self._assess_domain(url)
        content_score = await self._assess_content_quality(content)
        author_score = self._assess_author(metadata.get("author"))
        recency_score = self._assess_recency(metadata.get("published_date"))

        # Bias is the only LLM-dependent component.
        bias = await self._assess_bias(content) if self.llm else "unknown"

        # Fixed weights; bias is reported separately and not blended in.
        weights = {
            "domain": 0.3,
            "content": 0.3,
            "author": 0.2,
            "recency": 0.2,
        }
        overall = (
            domain_score * weights["domain"] +
            content_score * weights["content"] +
            author_score * weights["author"] +
            recency_score * weights["recency"]
        )

        return CredibilityScore(
            url=url,
            overall_score=overall,
            domain_reputation=domain_score,
            content_quality=content_score,
            author_credibility=author_score,
            recency_score=recency_score,
            bias_assessment=bias,
            factors={
                "domain": self._get_domain(url),
                "has_author": bool(metadata.get("author")),
                "has_date": bool(metadata.get("published_date")),
                "content_length": len(content),
            },
        )

    def _get_domain(self, url: str) -> str:
        """Extract the host from a URL, stripping a leading "www."."""
        from urllib.parse import urlparse
        return urlparse(url).netloc.replace("www.", "")

    def _assess_domain(self, url: str) -> float:
        """Score a URL's domain against the known-reputation table.

        NOTE(review): substring matching can over-match (e.g. ".edu"
        anywhere in the host); kept as-is to preserve behavior.
        """
        domain = self._get_domain(url)

        for known_domain, score in self.DOMAIN_SCORES.items():
            if known_domain in domain:
                return score

        # Fall back on TLD heuristics, then a neutral default.
        if ".edu" in domain or ".gov" in domain:
            return 0.85
        if ".org" in domain:
            return 0.6
        return 0.5

    async def _assess_content_quality(self, content: str) -> float:
        """Heuristic quality score from length, citations, and statistics."""
        score = 0.5

        # Longer articles earn up to +0.2.
        if len(content) > 1000:
            score += 0.1
        if len(content) > 3000:
            score += 0.1

        # Bracketed citations or a references section.
        if "[" in content or "reference" in content.lower():
            score += 0.1

        # Numeric data / statistics present.
        import re
        if re.search(r"\d+%|\d+\s*(percent|million|billion)", content):
            score += 0.1

        return min(1.0, score)

    def _assess_author(self, author: Optional[str]) -> float:
        """Score author credibility: unnamed 0.4, named 0.6, credentialed 0.8."""
        if not author:
            return 0.4

        score = 0.6
        credentials = ["Ph.D", "Dr.", "Professor", "MD", "PhD"]
        if any(cred in author for cred in credentials):
            score = 0.8
        return score

    def _assess_recency(self, date_str: Optional[str]) -> float:
        """Score recency: newer content scores higher; unparseable dates get 0.5."""
        if not date_str:
            return 0.5

        # Try a few common ISO-ish formats on the date prefix.
        pub_date = None
        for fmt in ("%Y-%m-%d", "%Y-%m", "%Y"):
            try:
                pub_date = datetime.strptime(date_str[:10], fmt)
                break
            except (ValueError, TypeError):
                continue
        if pub_date is None:
            return 0.5

        days_old = (datetime.now() - pub_date).days
        if days_old < 30:
            return 0.95
        if days_old < 365:
            return 0.85
        if days_old < 365 * 2:
            return 0.7
        if days_old < 365 * 5:
            return 0.5
        return 0.3

    async def _assess_bias(self, content: str) -> str:
        """Ask the LLM for an objectivity label; "unknown" when no LLM is set."""
        if not self.llm:
            return "unknown"

        prompt = f"""Assess the objectivity of this text. Is it:
- objective (presents facts without opinion)
- slightly_biased (mostly factual with some opinion)
- moderately_biased (clear perspective but acknowledges others)
- heavily_biased (strong advocacy or one-sided)

TEXT:
{content[:2000]}

Answer with one word: objective, slightly_biased, moderately_biased, or heavily_biased"""

        response = await self.llm.generate(prompt)
        return response.strip().lower()
Credibility assessment is not infallible. Always encourage users to verify critical information through their own research.
Audit Trail
Maintain a complete audit trail of how information was gathered:
🐍python
1from datetime import datetime
2from enum import Enum
3
4
class ActionType(Enum):
    """Kinds of research actions recorded in the audit trail."""
    SEARCH = "search"
    SCRAPE = "scrape"
    EXTRACT = "extract"
    SYNTHESIZE = "synthesize"
    VERIFY = "verify"
    CITE = "cite"
12
13
@dataclass
class AuditEntry:
    """A single entry in the audit trail."""
    timestamp: datetime  # when the action happened
    action_type: ActionType  # what kind of action this was
    description: str  # human-readable one-liner
    input_data: Dict[str, Any]  # parameters the action was invoked with
    output_summary: str  # short description of the result
    source_urls: List[str]  # URLs touched by the action
    success: bool
    error: Optional[str] = None  # error message, if any
25
26
class ResearchAuditTrail:
    """Complete chronological record of every action taken during research."""

    def __init__(self, research_id: str):
        self.research_id = research_id
        self.entries: List[AuditEntry] = []
        self.started_at = datetime.now()

    def log_search(
        self,
        query: str,
        results_count: int,
        urls: List[str],
    ) -> None:
        """Record a search action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SEARCH,
            description=f"Searched for: {query}",
            input_data={"query": query},
            output_summary=f"Found {results_count} results",
            source_urls=urls,
            success=True,
        ))

    def log_scrape(
        self,
        url: str,
        success: bool,
        content_length: int = 0,
        error: Optional[str] = None,
    ) -> None:
        """Record a scrape attempt, successful or not."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SCRAPE,
            description=f"Scraped: {url}",
            input_data={"url": url},
            output_summary=f"Extracted {content_length} characters" if success else "Failed",
            source_urls=[url],
            success=success,
            error=error,
        ))

    def log_extraction(
        self,
        source_url: str,
        extracted_items: List[str],
        extraction_type: str,
    ) -> None:
        """Record an extraction action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.EXTRACT,
            description=f"Extracted {extraction_type} from {source_url}",
            input_data={"url": source_url, "type": extraction_type},
            output_summary=f"Extracted {len(extracted_items)} items",
            source_urls=[source_url],
            success=True,
        ))

    def log_synthesis(
        self,
        source_urls: List[str],
        findings_count: int,
        strategy: str,
    ) -> None:
        """Record a synthesis action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SYNTHESIZE,
            description=f"Synthesized using {strategy} strategy",
            input_data={"source_count": len(source_urls), "strategy": strategy},
            output_summary=f"Generated {findings_count} findings",
            source_urls=source_urls,
            success=True,
        ))

    def log_verification(
        self,
        claim: str,
        verified: bool,
        supporting_sources: List[str],
    ) -> None:
        """Record a verification action (claim truncated for readability)."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.VERIFY,
            description=f"Verified claim: {claim[:50]}...",
            input_data={"claim": claim},
            output_summary="Verified" if verified else "Not verified",
            source_urls=supporting_sources,
            success=True,
        ))

    def generate_report(self) -> Dict[str, Any]:
        """Summarize the trail: action counts, unique sources, duration, errors."""
        action_counts: Dict[str, int] = {}
        for entry in self.entries:
            key = entry.action_type.value
            action_counts[key] = action_counts.get(key, 0) + 1

        unique_sources = set()
        for entry in self.entries:
            unique_sources.update(entry.source_urls)

        duration = datetime.now() - self.started_at

        return {
            "research_id": self.research_id,
            "started_at": self.started_at.isoformat(),
            "duration_seconds": duration.total_seconds(),
            "total_actions": len(self.entries),
            "action_breakdown": action_counts,
            "unique_sources_accessed": len(unique_sources),
            "sources_list": list(unique_sources),
            "errors": [
                {"action": e.action_type.value, "error": e.error}
                for e in self.entries if e.error
            ],
        }

    def export_timeline(self) -> List[Dict[str, Any]]:
        """Export a chronological, compact view of every action."""
        return [self._timeline_item(e) for e in self.entries]

    @staticmethod
    def _timeline_item(e: AuditEntry) -> Dict[str, Any]:
        """Compact dict form of one audit entry."""
        return {
            "time": e.timestamp.isoformat(),
            "action": e.action_type.value,
            "description": e.description,
            "success": e.success,
            "sources": e.source_urls[:3],  # cap for readability
        }
A complete audit trail enables reproducible research and helps users understand exactly how findings were derived.
Summary
In this section, we built systems for ensuring research integrity:
- Citation Management: Track and format citations for every claim
- Fact Verification: Cross-reference claims against multiple sources
- Credibility Assessment: Score source reliability based on multiple factors
- Audit Trail: Complete logging of all research actions for transparency
In the next section, we'll bring everything together into a complete, working research agent implementation.