Chapter 13
15 min read
Section 83 of 175

Source Citation and Verification

Building a Research Agent

Introduction

Credible research requires proper citation and verification. Every claim should be traceable to its source, and key facts should be verified across multiple sources. This section covers how to build robust citation and verification systems for your research agent.

Key Insight: Trust but verify. A research agent should never present information without attribution, and critical claims should be cross-referenced against multiple sources.

Citation Management

Every piece of information extracted should be traceable to its source:

🐍python
1from dataclasses import dataclass, field
2from typing import List, Dict, Any, Optional
3from datetime import datetime
4from enum import Enum
5import hashlib
6
7
class CitationStyle(Enum):
    """Supported citation styles."""
    APA = "apa"
    MLA = "mla"
    CHICAGO = "chicago"
    IEEE = "ieee"
    INLINE = "inline"


@dataclass
class Citation:
    """Represents a citation to a source.

    The ``citation_id`` is deterministic: it is derived from the source URL
    plus the quoted text, so the same (url, quote) pair always yields the
    same short identifier.
    """
    source_url: str
    source_title: str
    # Optional bibliographic details default to None so callers (e.g.
    # CitationManager.add_citation) may omit what they don't know.
    author: Optional[str] = None
    date_accessed: datetime = field(default_factory=datetime.now)
    date_published: Optional[datetime] = None
    page_or_section: Optional[str] = None
    quote: Optional[str] = None  # Exact text if directly quoted
    citation_id: str = field(default_factory=lambda: "")

    def __post_init__(self):
        if not self.citation_id:
            # Generate a stable 8-char ID from URL + quote.
            content = f"{self.source_url}{self.quote or ''}"
            self.citation_id = hashlib.md5(content.encode()).hexdigest()[:8]

    def format(self, style: CitationStyle = CitationStyle.INLINE) -> str:
        """Format citation in the specified style.

        Styles without a dedicated branch (CHICAGO, IEEE) fall through to
        the generic "title - url" form.
        """
        if style == CitationStyle.INLINE:
            return f"[{self.citation_id}]"
        elif style == CitationStyle.APA:
            author = self.author or "Unknown"
            year = self.date_published.year if self.date_published else "n.d."
            return f"{author} ({year}). {self.source_title}. Retrieved from {self.source_url}"
        elif style == CitationStyle.MLA:
            author = self.author or "Unknown"
            return f'{author}. "{self.source_title}." Web. {self.date_accessed.strftime("%d %b %Y")}.'
        else:
            return f"{self.source_title} - {self.source_url}"
48
49
class CitationManager:
    """
    Manages citations throughout the research process.

    Citations are stored keyed by their stable ``citation_id``; claims are
    keyed by an md5 hash of their text, so the same claim string always
    maps to the same list of citation IDs.
    """

    def __init__(self):
        self.citations: Dict[str, Citation] = {}
        self.claim_citations: Dict[str, List[str]] = {}  # claim_hash -> citation_ids

    def add_citation(
        self,
        source_url: str,
        source_title: str,
        quote: Optional[str] = None,
        **kwargs
    ) -> Citation:
        """Create, register, and return a new citation (accessed now).

        Extra Citation fields (author, date_published, ...) may be passed
        via **kwargs.
        """
        citation = Citation(
            source_url=source_url,
            source_title=source_title,
            quote=quote,
            date_accessed=datetime.now(),
            **kwargs
        )
        self.citations[citation.citation_id] = citation
        return citation

    def cite_claim(
        self,
        claim: str,
        citation: Citation
    ) -> str:
        """Associate a claim with a citation; return the claim with an
        inline citation marker appended."""
        claim_hash = hashlib.md5(claim.encode()).hexdigest()[:12]

        if claim_hash not in self.claim_citations:
            self.claim_citations[claim_hash] = []

        # Avoid duplicate citation IDs for the same claim.
        if citation.citation_id not in self.claim_citations[claim_hash]:
            self.claim_citations[claim_hash].append(citation.citation_id)

        return f"{claim} [{citation.citation_id}]"

    def get_citations_for_claim(self, claim: str) -> List[Citation]:
        """Get all citations recorded for a claim (empty list if none)."""
        claim_hash = hashlib.md5(claim.encode()).hexdigest()[:12]
        citation_ids = self.claim_citations.get(claim_hash, [])
        # Skip IDs whose citation object is no longer registered.
        return [self.citations[cid] for cid in citation_ids if cid in self.citations]

    def generate_bibliography(
        self,
        style: CitationStyle = CitationStyle.APA
    ) -> str:
        """Generate a bibliography (sorted by title) in the specified style."""
        entries = []
        for citation in sorted(self.citations.values(), key=lambda c: c.source_title):
            entries.append(citation.format(style))
        return "\n\n".join(entries)

    def generate_footnotes(self) -> Dict[str, str]:
        """Map each citation ID to a short "title, url" footnote string."""
        return {
            cid: f"{c.source_title}, {c.source_url}"
            for cid, c in self.citations.items()
        }

    def export_citations(self) -> List[Dict[str, Any]]:
        """Export all citations as JSON-serializable structured data."""
        return [
            {
                "id": c.citation_id,
                "url": c.source_url,
                "title": c.source_title,
                "author": c.author,
                "accessed": c.date_accessed.isoformat(),
                "published": c.date_published.isoformat() if c.date_published else None,
                "quote": c.quote
            }
            for c in self.citations.values()
        ]

Fact Verification

Critical claims should be verified across multiple sources:

🐍python
@dataclass
class VerificationResult:
    """Result of fact verification for a single claim."""
    claim: str
    verified: bool          # True only when support is strong and uncontradicted
    confidence: float       # 0.0-1.0, share of relevant sources that support
    supporting_sources: List[str]     # URLs that corroborate the claim
    contradicting_sources: List[str]  # URLs that dispute the claim
    verification_notes: str           # Human-readable summary of the outcome
10
11
class FactVerifier:
    """
    Verify facts against multiple sources.

    Searches for related content, asks the LLM to classify each hit as
    supporting/contradicting/neutral, and aggregates the answers into a
    VerificationResult.
    """

    def __init__(self, llm_client, search_manager):
        self.llm = llm_client
        self.search = search_manager

    async def verify_claim(
        self,
        claim: str,
        original_source: str,
        min_sources: int = 2
    ) -> "VerificationResult":
        """
        Verify a claim by searching for corroborating sources.

        The original source is excluded so a claim is never "verified"
        against the page it came from.
        """
        # Search for related content
        search_results = await self.search.search(claim, num_results=10)

        # Filter out the original source
        other_sources = [
            r for r in search_results
            if original_source not in r.url
        ]

        if len(other_sources) < min_sources:
            # Low-but-nonzero confidence: lack of sources is not disproof.
            return VerificationResult(
                claim=claim,
                verified=False,
                confidence=0.3,
                supporting_sources=[],
                contradicting_sources=[],
                verification_notes="Insufficient sources for verification"
            )

        # Check each source (capped at 5 to bound LLM calls)
        supporting = []
        contradicting = []

        for source in other_sources[:5]:
            result = await self._check_source(claim, source)
            if result == "supports":
                supporting.append(source.url)
            elif result == "contradicts":
                contradicting.append(source.url)

        # Confidence = share of non-neutral sources that support the claim.
        total_relevant = len(supporting) + len(contradicting)
        if total_relevant == 0:
            confidence = 0.4  # all sources were neutral
        else:
            confidence = len(supporting) / total_relevant

        return VerificationResult(
            claim=claim,
            # Verified only with majority support AND zero contradictions.
            verified=confidence > 0.6 and len(contradicting) == 0,
            confidence=confidence,
            supporting_sources=supporting,
            contradicting_sources=contradicting,
            verification_notes=self._generate_notes(supporting, contradicting)
        )

    async def _check_source(
        self,
        claim: str,
        source
    ) -> str:
        """Check if a source supports, contradicts, or is neutral on a claim.

        Returns one of "supports", "contradicts", "neutral".
        """
        prompt = f"""Determine if this source supports, contradicts, or is neutral about the claim.

CLAIM: {claim}

SOURCE: {source.title}
CONTENT: {source.snippet[:500]}

Answer with exactly one word: supports, contradicts, or neutral"""

        response = await self.llm.generate(prompt)
        response = response.strip().lower()

        # "support" is checked first, so a confused answer containing both
        # words is treated as supporting.
        if "support" in response:
            return "supports"
        elif "contradict" in response:
            return "contradicts"
        return "neutral"

    def _generate_notes(
        self,
        supporting: List[str],
        contradicting: List[str]
    ) -> str:
        """Generate a short human-readable verification summary."""
        notes = []
        if supporting:
            notes.append(f"Supported by {len(supporting)} additional source(s)")
        if contradicting:
            notes.append(f"Contradicted by {len(contradicting)} source(s)")
        if not supporting and not contradicting:
            notes.append("No corroborating or contradicting sources found")
        return "; ".join(notes)

    async def verify_multiple_claims(
        self,
        claims: List[str],
        original_source: str
    ) -> List["VerificationResult"]:
        """Verify multiple claims concurrently; results match input order."""
        import asyncio
        tasks = [
            self.verify_claim(claim, original_source)
            for claim in claims
        ]
        return await asyncio.gather(*tasks)
127
128
class CrossReferenceChecker:
    """
    Cross-reference claims across gathered sources.

    Unlike FactVerifier, this works on already-collected source dicts
    instead of issuing new searches.
    """

    def __init__(self, llm_client):
        self.llm = llm_client

    async def find_corroboration(
        self,
        claim: str,
        sources: List[Dict[str, str]]
    ) -> Dict[str, Any]:
        """Find which of the given sources corroborate or contradict a claim."""
        corroborating = []
        contradicting = []

        for source in sources:
            result = await self._check_corroboration(claim, source)
            if result["corroborates"]:
                corroborating.append({
                    "url": source.get("url"),
                    "title": source.get("title"),
                    "relevant_text": result["relevant_text"]
                })
            elif result["contradicts"]:
                contradicting.append({
                    "url": source.get("url"),
                    "title": source.get("title"),
                    "relevant_text": result["relevant_text"]
                })

        return {
            "claim": claim,
            "corroborating_sources": corroborating,
            "contradicting_sources": contradicting,
            "corroboration_count": len(corroborating),
            "verification_status": self._determine_status(corroborating, contradicting)
        }

    async def _check_corroboration(
        self,
        claim: str,
        source: Dict[str, str]
    ) -> Dict[str, Any]:
        """Ask the LLM whether a single source corroborates a claim.

        Returns flags plus the quoted text the LLM cites as evidence.
        """
        prompt = f"""Does this source corroborate or contradict the following claim?

CLAIM: {claim}

SOURCE CONTENT:
{source.get("content", "")[:1500]}

Respond with:
STATUS: corroborates / contradicts / neutral
RELEVANT_TEXT: Quote the specific text that supports your answer (or "N/A" if neutral)"""

        response = await self.llm.generate(prompt)

        status = "neutral"
        relevant_text = ""

        # Parse the structured "KEY: value" lines from the response.
        for line in response.split("\n"):
            if line.startswith("STATUS:"):
                status = line[7:].strip().lower()
            elif line.startswith("RELEVANT_TEXT:"):
                relevant_text = line[14:].strip()

        return {
            "corroborates": "corroborate" in status,
            "contradicts": "contradict" in status,
            "relevant_text": relevant_text
        }

    def _determine_status(
        self,
        corroborating: List,
        contradicting: List
    ) -> str:
        """Determine overall verification status.

        "verified" requires at least two uncontradicted corroborations.
        """
        if len(corroborating) >= 2 and len(contradicting) == 0:
            return "verified"
        elif len(contradicting) > len(corroborating):
            return "disputed"
        elif len(corroborating) > 0:
            return "partially_verified"
        else:
            return "unverified"

Source Credibility

Not all sources are equally reliable. Here's how to assess source credibility:

🐍python
@dataclass
class CredibilityScore:
    """Credibility assessment for a source."""
    url: str
    overall_score: float        # 0-1, weighted blend of the factors below
    domain_reputation: float    # based on known-domain table / TLD
    content_quality: float      # length, citations, data indicators
    author_credibility: float   # named author / credentials
    recency_score: float        # newer content scores higher
    bias_assessment: str        # LLM label, e.g. "objective"; "unknown" if no LLM
    factors: Dict[str, Any]     # raw inputs used for the assessment
12
13
class CredibilityAssessor:
    """
    Assess the credibility of sources.

    Combines domain reputation, content-quality heuristics, author
    credentials, and recency into a weighted CredibilityScore. Bias is
    assessed via LLM only when a client was provided.
    """

    # Known domain reputations (0-1). Matching is by substring, so TLD
    # entries like ".edu" also match subdomains.
    DOMAIN_SCORES = {
        # Academic and government
        ".edu": 0.85,
        ".gov": 0.9,
        "arxiv.org": 0.9,
        "nature.com": 0.95,
        "science.org": 0.95,
        "ncbi.nlm.nih.gov": 0.9,

        # Reference
        "wikipedia.org": 0.7,
        "britannica.com": 0.8,

        # News (quality)
        "reuters.com": 0.8,
        "apnews.com": 0.8,
        "bbc.com": 0.75,

        # Tech documentation
        "docs.python.org": 0.9,
        "developer.mozilla.org": 0.9,

        # Social/User generated
        "reddit.com": 0.4,
        "quora.com": 0.45,
        "medium.com": 0.5,
    }

    def __init__(self, llm_client=None):
        self.llm = llm_client

    async def assess(
        self,
        url: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> "CredibilityScore":
        """Assess the credibility of a source.

        metadata may carry "author" and "published_date" keys; both are
        optional.
        """
        metadata = metadata or {}

        # Domain reputation
        domain_score = self._assess_domain(url)

        # Content quality
        content_score = await self._assess_content_quality(content)

        # Author credibility
        author_score = self._assess_author(metadata.get("author"))

        # Recency
        recency_score = self._assess_recency(metadata.get("published_date"))

        # Bias assessment (LLM-dependent; "unknown" without a client)
        bias = await self._assess_bias(content) if self.llm else "unknown"

        # Weighted overall score; weights sum to 1.0.
        weights = {
            "domain": 0.3,
            "content": 0.3,
            "author": 0.2,
            "recency": 0.2
        }

        overall = (
            domain_score * weights["domain"] +
            content_score * weights["content"] +
            author_score * weights["author"] +
            recency_score * weights["recency"]
        )

        return CredibilityScore(
            url=url,
            overall_score=overall,
            domain_reputation=domain_score,
            content_quality=content_score,
            author_credibility=author_score,
            recency_score=recency_score,
            bias_assessment=bias,
            factors={
                "domain": self._get_domain(url),
                "has_author": bool(metadata.get("author")),
                "has_date": bool(metadata.get("published_date")),
                "content_length": len(content)
            }
        )

    def _get_domain(self, url: str) -> str:
        """Extract the domain from a URL, stripping a leading "www."."""
        from urllib.parse import urlparse
        return urlparse(url).netloc.replace("www.", "")

    def _assess_domain(self, url: str) -> float:
        """Assess credibility based on domain (0.5 default for unknowns)."""
        domain = self._get_domain(url)

        # Check the known-domain table first (substring match, so the
        # first matching entry in insertion order wins).
        for known_domain, score in self.DOMAIN_SCORES.items():
            if known_domain in domain:
                return score

        # Fall back to TLD heuristics
        if ".edu" in domain or ".gov" in domain:
            return 0.85
        elif ".org" in domain:
            return 0.6

        return 0.5  # Default score

    async def _assess_content_quality(self, content: str) -> float:
        """Assess content quality via simple heuristics; returns 0.5-1.0."""
        score = 0.5

        # Longer content earns up to +0.2
        if len(content) > 1000:
            score += 0.1
        if len(content) > 3000:
            score += 0.1

        # Check for citations/references
        if "[" in content or "reference" in content.lower():
            score += 0.1

        # Check for data/statistics
        import re
        if re.search(r"\d+%|\d+\s*(percent|million|billion)", content):
            score += 0.1

        return min(1.0, score)

    def _assess_author(self, author: Optional[str]) -> float:
        """Assess author credibility (0.4 anonymous, 0.6 named, 0.8 credentialed)."""
        if not author:
            return 0.4

        # Having a named author is positive
        score = 0.6

        # Check for credentials in the author string
        credentials = ["Ph.D", "Dr.", "Professor", "MD", "PhD"]
        if any(cred in author for cred in credentials):
            score = 0.8

        return score

    def _assess_recency(self, date_str: Optional[str]) -> float:
        """Assess recency of content; unknown or unparseable dates score 0.5."""
        if not date_str:
            return 0.5

        try:
            from datetime import datetime
            # Try progressively coarser date formats.
            for fmt in ["%Y-%m-%d", "%Y-%m", "%Y"]:
                try:
                    pub_date = datetime.strptime(date_str[:10], fmt)
                    break
                except ValueError:
                    continue
            else:
                return 0.5  # no format matched

            days_old = (datetime.now() - pub_date).days

            if days_old < 30:
                return 0.95
            elif days_old < 365:
                return 0.85
            elif days_old < 365 * 2:
                return 0.7
            elif days_old < 365 * 5:
                return 0.5
            else:
                return 0.3
        except Exception:
            # Defensive: any unexpected parsing/arithmetic error -> neutral.
            return 0.5

    async def _assess_bias(self, content: str) -> str:
        """Assess potential bias in content via the LLM; "unknown" without one."""
        if not self.llm:
            return "unknown"

        prompt = f"""Assess the objectivity of this text. Is it:
- objective (presents facts without opinion)
- slightly_biased (mostly factual with some opinion)
- moderately_biased (clear perspective but acknowledges others)
- heavily_biased (strong advocacy or one-sided)

TEXT:
{content[:2000]}

Answer with one word: objective, slightly_biased, moderately_biased, or heavily_biased"""

        response = await self.llm.generate(prompt)
        return response.strip().lower()
Credibility assessment is not infallible. Always encourage users to verify critical information through their own research.

Audit Trail

Maintain a complete audit trail of how information was gathered:

🐍python
1from datetime import datetime
2from enum import Enum
3
4
class ActionType(Enum):
    """Kinds of actions recorded in the research audit trail."""
    SEARCH = "search"
    SCRAPE = "scrape"
    EXTRACT = "extract"
    SYNTHESIZE = "synthesize"
    VERIFY = "verify"
    CITE = "cite"
12
13
@dataclass
class AuditEntry:
    """A single entry in the audit trail."""
    timestamp: datetime
    action_type: "ActionType"       # which kind of research action this was
    description: str                # human-readable one-liner
    input_data: Dict[str, Any]      # the action's inputs (query, url, ...)
    output_summary: str             # short summary of what the action produced
    source_urls: List[str]          # URLs touched by this action
    success: bool
    error: Optional[str] = None     # populated only on failure
25
26
class ResearchAuditTrail:
    """
    Maintain complete audit trail for research transparency.

    Each log_* method appends a timestamped AuditEntry; generate_report()
    and export_timeline() summarize the collected entries.
    """

    def __init__(self, research_id: str):
        self.research_id = research_id
        self.entries: List[AuditEntry] = []
        self.started_at = datetime.now()

    def log_search(
        self,
        query: str,
        results_count: int,
        urls: List[str]
    ) -> None:
        """Log a search action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SEARCH,
            description=f"Searched for: {query}",
            input_data={"query": query},
            output_summary=f"Found {results_count} results",
            source_urls=urls,
            success=True
        ))

    def log_scrape(
        self,
        url: str,
        success: bool,
        content_length: int = 0,
        error: str = None
    ) -> None:
        """Log a scrape action (the only log method that can record failure)."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SCRAPE,
            description=f"Scraped: {url}",
            input_data={"url": url},
            output_summary=f"Extracted {content_length} characters" if success else "Failed",
            source_urls=[url],
            success=success,
            error=error
        ))

    def log_extraction(
        self,
        source_url: str,
        extracted_items: List[str],
        extraction_type: str
    ) -> None:
        """Log an extraction action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.EXTRACT,
            description=f"Extracted {extraction_type} from {source_url}",
            input_data={"url": source_url, "type": extraction_type},
            output_summary=f"Extracted {len(extracted_items)} items",
            source_urls=[source_url],
            success=True
        ))

    def log_synthesis(
        self,
        source_urls: List[str],
        findings_count: int,
        strategy: str
    ) -> None:
        """Log a synthesis action."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.SYNTHESIZE,
            description=f"Synthesized using {strategy} strategy",
            input_data={"source_count": len(source_urls), "strategy": strategy},
            output_summary=f"Generated {findings_count} findings",
            source_urls=source_urls,
            success=True
        ))

    def log_verification(
        self,
        claim: str,
        verified: bool,
        supporting_sources: List[str]
    ) -> None:
        """Log a verification action (claim truncated in the description)."""
        self.entries.append(AuditEntry(
            timestamp=datetime.now(),
            action_type=ActionType.VERIFY,
            description=f"Verified claim: {claim[:50]}...",
            input_data={"claim": claim},
            output_summary="Verified" if verified else "Not verified",
            source_urls=supporting_sources,
            success=True
        ))

    def generate_report(self) -> Dict[str, Any]:
        """Generate an aggregate audit report (counts, sources, errors)."""
        action_counts = {}
        for entry in self.entries:
            action_counts[entry.action_type.value] = action_counts.get(
                entry.action_type.value, 0
            ) + 1

        unique_sources = set()
        for entry in self.entries:
            unique_sources.update(entry.source_urls)

        duration = datetime.now() - self.started_at

        return {
            "research_id": self.research_id,
            "started_at": self.started_at.isoformat(),
            "duration_seconds": duration.total_seconds(),
            "total_actions": len(self.entries),
            "action_breakdown": action_counts,
            "unique_sources_accessed": len(unique_sources),
            "sources_list": list(unique_sources),
            "errors": [
                {"action": e.action_type.value, "error": e.error}
                for e in self.entries if e.error
            ]
        }

    def export_timeline(self) -> List[Dict[str, Any]]:
        """Export a chronological timeline of actions."""
        return [
            {
                "time": e.timestamp.isoformat(),
                "action": e.action_type.value,
                "description": e.description,
                "success": e.success,
                "sources": e.source_urls[:3]  # Limit for readability
            }
            for e in self.entries
        ]
A complete audit trail enables reproducible research and helps users understand exactly how findings were derived.

Summary

In this section, we built systems for ensuring research integrity:

  • Citation Management: Track and format citations for every claim
  • Fact Verification: Cross-reference claims against multiple sources
  • Credibility Assessment: Score source reliability based on multiple factors
  • Audit Trail: Complete logging of all research actions for transparency

In the next section, we'll bring everything together into a complete, working research agent implementation.