RefCheck / main.py
voidful's picture
Require primary evidence for verified references
a3caaa3 verified
#!/usr/bin/env python3
"""
BibGuard - Citation Hallucination Detector
Validates bibliography entries against multiple academic data sources:
arXiv, CrossRef, DBLP, Semantic Scholar, OpenAlex, and Google Scholar
Usage:
python main.py --bib references.bib
python main.py --bib references.bib --output report.md
"""
import argparse
import sys
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import copy
from src.parser import BibParser
from src.fetcher import (
ArxivFetcher, CrossRefFetcher, DBLPFetcher,
SemanticScholarFetcher, OpenAlexFetcher, ScholarFetcher
)
from src.comparator import MetadataComparator, EntryReport, resolve_year, CURRENT_YEAR
from src.sanitizer import BibSanitizer
from src.local_db import LocalConferenceDB
from src.ui import BibUI
from src.utils import ProgressDisplay, TextNormalizer
@dataclass
class WorkflowStep:
    """One lookup stage of the validation workflow."""

    # Identifier that validate_entry dispatches on (e.g. "arxiv_id", "dblp").
    name: str
    # Disabled steps are dropped by WorkflowConfig.get_enabled_steps().
    enabled: bool = True
    # Human-readable label for the step.
    display_name: str = ""
    # Sort key used by WorkflowConfig.get_enabled_steps(); lower runs first.
    priority: int = 0
@dataclass
class WorkflowConfig:
    """Ordered collection of workflow steps."""

    # All configured steps, enabled or not, in declaration order.
    steps: List[WorkflowStep] = field(default_factory=list)

    def get_enabled_steps(self) -> List[WorkflowStep]:
        """Return the enabled steps ordered by ascending priority.

        The sort is stable, so steps sharing a priority keep their
        declaration order.
        """
        active = (step for step in self.steps if step.enabled)
        return sorted(active, key=lambda step: step.priority)
def get_default_workflow() -> WorkflowConfig:
    """Build the default lookup workflow.

    Each step's priority equals its position in the table below, so the
    table order is the execution order. Google Scholar is the only step
    that is disabled by default.
    """
    # (step name, enabled, display name)
    catalogue = (
        ("arxiv_id", True, "arXiv by ID"),
        ("crossref_doi", True, "CrossRef by DOI"),
        ("semantic_scholar", True, "Semantic Scholar"),
        ("dblp", True, "DBLP"),
        ("openalex", True, "OpenAlex"),
        ("arxiv_title", True, "arXiv by Title"),
        ("crossref_title", True, "CrossRef by Title"),
        ("google_scholar", False, "Google Scholar"),
    )
    return WorkflowConfig(
        steps=[
            WorkflowStep(step_name, is_enabled, label, rank)
            for rank, (step_name, is_enabled, label) in enumerate(catalogue)
        ]
    )
def main():
    """Parse command-line arguments and run the fix-and-verify workflow.

    Exits with status 1 when --bib does not point at a regular file, and
    with status 130 when the run is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="BibGuard: Citation Fixer & Validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--bib", "-b", required=True, help="Path to .bib file")
    parser.add_argument("--output", "-o", help="Output report path (optional)")
    args = parser.parse_args()

    bib_path = Path(args.bib)
    # is_file() rather than exists(): a directory "exists" but cannot be
    # parsed as a .bib file, so reject it here with a clear message.
    if not bib_path.is_file():
        if bib_path.exists():
            print(f"Error: Bib path is not a file: {args.bib}", file=sys.stderr)
        else:
            print(f"Error: Bib file not found: {args.bib}", file=sys.stderr)
        sys.exit(1)

    workflow = get_default_workflow()
    try:
        run_fix_and_verify(bib_path, workflow)
    except KeyboardInterrupt:
        print("\nCancelled")
        sys.exit(130)
def run_fix_and_verify(bib_path: Path, workflow):
    """Validate a .bib file, apply safe fixes in place, then re-validate it.

    Phases:
        0.   Offline sanitization (saved immediately when it changes
             anything) and duplicate-title reporting.
        0.5  Local conference DB lookup, used only to print a match count;
             every entry is still validated online.
        1.   Concurrent online validation of every entry.
        2.   Categorisation into OK / fix / manual review / remove.
        3.   Automatic fixes, removals and interactive manual review,
             followed by saving the updated entries.
        Finally the file is re-parsed and every entry is validated again
        to produce the final report.

    The function returns early (without a final report) when the file has
    no entries or when phase 2 finds nothing to fix, review or remove.

    Args:
        bib_path: Path of the .bib file; it is overwritten when changes occur.
        workflow: Object exposing ``get_enabled_steps()``; forwarded to
            ``validate_entry``.
    """
    progress = ProgressDisplay()
    bib_parser = BibParser()
    ui = BibUI()
    print("📚 BibGuard - Auto-Fix & Verify")
    print(f" Target: {bib_path}\n")

    # --- Pass 1: Validate & Fix ---
    entries = bib_parser.parse_file(str(bib_path))
    if not entries:
        print("No entries found")
        return
    print(f"Found {len(entries)} entries. Running validation and auto-fix...\n")

    # Initialize components
    fetchers = {
        'arxiv': ArxivFetcher(),
        'crossref': CrossRefFetcher(),
        'scholar': ScholarFetcher(),
        'semantic': SemanticScholarFetcher(),
        'openalex': OpenAlexFetcher(),
        'dblp': DBLPFetcher(),
    }
    comparator = MetadataComparator()
    sanitizer = BibSanitizer()
    fixed_count = 0
    fixed_details = {}  # Key: entry_key, Value: list of change descriptions
    removed_details = []  # List of (entry, reason)
    manual_review_queue = []  # List of (entry, best_result, candidates)
    # True once any online fix is applied; drives the final save decision.
    phase1_changed = False

    # --- Phase 0: Sanitize (Offline Checks) ---
    print("🧹 Running formatting sanity checks...")
    sanitize_fixes = sanitizer.sanitize_all(entries)
    ui.show_sanitize_report(sanitize_fixes)
    # If sanitization made changes, save immediately so Phase 1 works on clean data
    if sanitize_fixes:
        bib_parser.save_entries(str(bib_path), entries)
        # Merge sanitize fixes into fixed_details for the final report.
        # fixed_count is counted once per sanitized entry.
        for key, fixes in sanitize_fixes.items():
            fixed_details.setdefault(key, []).extend(fix.description for fix in fixes)
            fixed_count += 1

    # Duplicate detection
    dupes = sanitizer.find_duplicates(entries)
    if dupes:
        print(f"\n⚠ Found {len(dupes)} duplicate title(s):")
        for keys in dupes.values():
            print(f" {' / '.join(keys)}")
        print()

    # --- Phase 0.5: Local DB Lookup ---
    local_db = LocalConferenceDB()
    local_db_loaded = local_db.load()
    api_needed_entries = entries  # Always verify against live/network sources.
    if local_db_loaded:
        local_matched_count = sum(1 for entry in entries if local_db.lookup(entry.title))
        if local_matched_count > 0:
            print(f" 📚 Local DB matched: {local_matched_count}; still verifying all entries online")

    # --- Phase 1: Analysis (API Fetch) ---
    analysis_results = []
    with progress.progress_context(len(api_needed_entries), "Analyzing Entries") as prog:
        with ThreadPoolExecutor(max_workers=min(10, max(1, len(api_needed_entries)))) as executor:
            futures = {
                executor.submit(validate_entry, e, workflow, fetchers, comparator): e
                for e in api_needed_entries
            }
            for future in as_completed(futures):
                entry = futures[future]
                try:
                    best_result, candidates = future.result()
                    analysis_results.append((entry, best_result, candidates))
                    prog.update(entry.key, "Analyzed", 1)
                except Exception:
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)
                    # Keep the entry even if the fetch failed: a network error
                    # must never cause a reference to be dropped.
                    analysis_results.append((entry, None, []))

    # --- Phase 2: Meaningful Report ---
    # Categorize results
    to_fix = []
    to_review = []
    to_remove = []
    ok_entries = []
    for entry, best_result, candidates in analysis_results:
        if not best_result:
            ok_entries.append(entry)
            continue
        if best_result.is_match and best_result.fetched_data:
            to_fix.append((entry, best_result, candidates))
        elif candidates:
            to_review.append((entry, best_result, candidates))
        else:
            to_remove.append(entry)

    # Visualize Analysis Report
    ui.show_analysis_report(ok_entries, to_fix, to_review, to_remove)
    if not (to_fix or to_review or to_remove):
        return

    # --- Phase 3: Apply Fixes ---
    print("\n🚀 Applying fixes...")
    # OK entries first; fixed and review entries follow.
    updated_entries = list(ok_entries)

    # Process Fixes
    for entry, best_result, candidates in to_fix:
        changes = apply_fix(
            entry,
            best_result.fetched_data,
            all_candidates=candidates,
            allow_optional_updates=True,
        )
        if changes:
            fixed_count += 1
            phase1_changed = True
            # Extend rather than assign so Phase 0 sanitize descriptions for
            # the same key are not lost from the final report.
            fixed_details.setdefault(entry.key, []).extend(changes)
        updated_entries.append(entry)

    # Process Removals (these entries are deliberately not added to updated_entries)
    for entry in to_remove:
        removed_details.append((entry, "No matching metadata found in any source"))

    # Process Reviews (Add to queue)
    for item in to_review:
        manual_review_queue.append(item)
        updated_entries.append(item[0])  # Add tentatively, filter later if removed

    # --- Interactive Manual Review ---
    if manual_review_queue:
        print(f"\n\n🔍 Manual Review Required for {len(manual_review_queue)} entries:")
        # Sort by key for consistent order
        manual_review_queue.sort(key=lambda x: x[0].key)
        entries_to_remove = set()
        for entry, best_res, candidates in manual_review_queue:
            ui.show_manual_review(entry, best_res, candidates, apply_fix)
            while True:
                try:
                    choice = input(
                        f"\nSelect [1-{len(candidates)}], (s)kip, (r)emove, or (q)uit: "
                    ).strip().lower()
                except EOFError:
                    # Non-interactive stdin: treat as quit so work done so far
                    # is still saved and verified.
                    choice = 'q'
                if choice == 'q':
                    print("Exiting manual review.")
                    # Keep remaining in queue as is (already in updated_entries)
                    break
                elif choice == 's':
                    print("Skipped.")
                    break
                elif choice == 'r':
                    print("Marked for removal.")
                    entries_to_remove.add(entry.key)
                    removed_details.append((entry, "Removed by user during manual review"))
                    break
                elif choice.isdecimal():
                    # isdecimal() only accepts characters int() can parse.
                    idx = int(choice) - 1
                    if 0 <= idx < len(candidates):
                        selected = candidates[idx]
                        if not _candidate_exact_match(selected):
                            print("Cannot apply: selected candidate is not an exact title/author/year match.")
                            continue
                        changes = apply_fix(entry, selected.fetched_data, allow_optional_updates=True)
                        if changes:
                            fixed_count += 1
                            phase1_changed = True
                            fixed_details.setdefault(entry.key, []).extend(changes)
                            print(f"Applied: {', '.join(changes)}")
                        else:
                            print("No changes needed for selected source.")
                        break
                    else:
                        print("Invalid selection.")
                else:
                    print("Invalid input.")
            if choice == 'q':
                break
        # Filter out removed entries
        if entries_to_remove:
            updated_entries = [e for e in updated_entries if e.key not in entries_to_remove]

    # Overwrite file if changes were made beyond Phase 0 sanitization
    # (Phase 0 already saved its own changes).
    if phase1_changed or removed_details:
        bib_parser.save_entries(str(bib_path), updated_entries)

    # --- Pass 2: Double Check ---
    print("\n🔄 Double checking (Re-validation)...")
    entries = bib_parser.parse_file(str(bib_path)) or []
    reports = []
    with progress.progress_context(len(entries), "Verifying") as prog:
        # max(1, ...) because ThreadPoolExecutor rejects max_workers=0, which
        # would otherwise happen when every entry was removed above.
        with ThreadPoolExecutor(max_workers=max(1, min(10, len(entries)))) as executor:
            futures = {
                executor.submit(validate_entry, e, workflow, fetchers, comparator): e
                for e in entries
            }
            for future in as_completed(futures):
                entry = futures[future]
                try:
                    best_result, _ = future.result()  # Ignore candidates in verify pass
                    reports.append(EntryReport(entry=entry, comparison=best_result))
                    if best_result.is_match:
                        prog.mark_success()
                    else:
                        prog.mark_error()
                    prog.update(entry.key, "Verified", 1)
                except Exception:
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)

    # Summary
    total = len(entries)
    verified = sum(1 for r in reports if r.comparison and r.comparison.is_match)
    issues = sum(1 for r in reports if r.comparison and r.comparison.has_issues)
    not_found = sum(
        1
        for r in reports
        if r.comparison and not r.comparison.is_match and not r.comparison.has_issues
    )

    # Visual Final Status
    ui.show_final_report(
        total, verified, issues, not_found, reports, fixed_count, fixed_details, removed_details
    )
    print("")
def apply_local_fix(entry, official) -> list:
    """Apply non-core fixes from the local conference DB to ``entry``.

    This never changes title, authors, or year; those fields define the
    reference identity and must be verified against live metadata.

    Args:
        entry: Bib entry to mutate. Reads/writes ``entry_type``, ``journal``,
            ``booktitle``, ``doi`` and the ``raw_entry`` mapping.
        official: Local DB record; ``booktitle`` and ``doi`` are read.

    Returns:
        Human-readable descriptions of the changes that were applied
        (empty when nothing changed).
    """
    changes = []

    # Entry type upgrade: misc/article → inproceedings if booktitle exists
    if official.booktitle and entry.entry_type.lower() in ('misc', 'article'):
        old_type = entry.entry_type
        entry.entry_type = 'inproceedings'
        if 'ENTRYTYPE' in entry.raw_entry:
            entry.raw_entry['ENTRYTYPE'] = 'inproceedings'
        # Clear journal if it was arXiv
        if entry.journal and 'arxiv' in entry.journal.lower():
            entry.journal = ""
            entry.raw_entry.pop('journal', None)
        changes.append(f"Type: @{old_type} → @inproceedings [local_db]")

    # Booktitle: adopt from DB only when the entry has none
    if official.booktitle and not entry.booktitle:
        entry.booktitle = official.booktitle
        entry.raw_entry['booktitle'] = official.booktitle
        # Only add an ellipsis when the title was actually truncated.
        shown = official.booktitle
        if len(shown) > 50:
            shown = shown[:50] + '...'
        changes.append(f"Booktitle: [Added] {shown} [local_db]")

    # DOI: adopt if missing
    if official.doi and not entry.doi:
        entry.doi = official.doi
        entry.raw_entry['doi'] = official.doi
        changes.append(f"DOI: [Added] {official.doi} [local_db]")

    return changes
def apply_fix(
    entry,
    data,
    all_candidates=None,
    *,
    allow_core_updates: bool = False,
    allow_optional_updates: bool = False,
) -> list:
    """Update only safe metadata by default.

    Core identity fields (title, author, year) are not overwritten unless
    allow_core_updates=True. RefCheck should validate references, not transform
    a nearby candidate into a different citation.

    Args:
        entry: Bib entry to mutate in place (``title``, ``year``, ``author``,
            ``doi`` attributes are read and possibly written).
        data: Fetched metadata record; ``None`` results in no changes.
        all_candidates: Optional comparison results for the entry. When given
            with allow_core_updates=True they are passed to ``resolve_year``
            and scanned for ``author_initial_conflict``.
        allow_core_updates: Permit overwriting title, year and authors.
        allow_optional_updates: Permit adding a DOI when the entry has none.

    Returns:
        Human-readable change descriptions. Entries prefixed with "⚠" are
        warnings that did not modify the entry.
    """
    changes = []
    if data is None:
        # Nothing to compare against; leave the entry untouched.
        return changes

    # Helper to clean string
    def clean(s):
        return str(s).strip() if s else ""

    def is_future_year(year_text):
        # isdecimal() only accepts characters int() can parse.
        return year_text.isdecimal() and int(year_text) > CURRENT_YEAR

    # Title
    new_title = clean(getattr(data, 'title', ''))
    if new_title and new_title.lower() != (entry.title or "").lower():
        if allow_core_updates:
            changes.append(f"Title: {entry.title} -> {new_title}")
            entry.title = new_title

    if allow_core_updates:
        # Year: Use resolve_year() if we have multiple candidates
        if all_candidates:
            best_year, year_src = resolve_year(all_candidates, bib_year=entry.year)
            if best_year and best_year != entry.year:
                # Guarded like the single-candidate branch so a non-numeric
                # year cannot raise ValueError.
                if is_future_year(clean(best_year)):
                    changes.append(f"⚠ Skip suspicious future year {best_year} from {year_src}")
                else:
                    changes.append(f"Year: {entry.year} -> {best_year} [{year_src}]")
                    entry.year = best_year
        else:
            # Single candidate fallback
            new_year = clean(getattr(data, 'year', ''))
            if new_year and new_year != entry.year:
                if is_future_year(new_year):
                    changes.append(f"⚠ Skip suspicious future year {new_year}")
                else:
                    changes.append(f"Year: {entry.year} -> {new_year}")
                    entry.year = new_year

        # Author: Smart Merge Strategy
        # Check for author initial conflict first
        has_initial_conflict = any(
            getattr(cand, 'author_initial_conflict', False)
            for cand in (all_candidates or [])
        )
        if has_initial_conflict:
            # Don't overwrite authors when initials conflict
            changes.append("⚠ Author initial conflict detected — preserving bib authors")
        else:
            # Normal author merge logic
            current_authors_raw = TextNormalizer.parse_author_list(entry.author)
            current_authors_norm = [
                TextNormalizer.normalize_author_name(a) for a in current_authors_raw
            ]
            new_authors_list = getattr(data, 'authors', None) or []
            if isinstance(new_authors_list, str):
                new_authors_list = TextNormalizer.parse_author_list(new_authors_list)
            # Strip DBLP disambiguation IDs from new authors
            new_authors_list = [
                TextNormalizer.strip_dblp_disambiguation_id(str(a)) for a in new_authors_list
            ]
            # Also check if the EXISTING bib authors have DBLP disambiguation IDs baked in
            for raw_auth in current_authors_raw:
                if TextNormalizer.has_dblp_disambiguation_id(raw_auth.strip()):
                    changes.append(
                        f"⚠ DBLP disambiguation ID detected in author: '{raw_auth.strip()}'"
                    )

            # Only merge when the source actually supplied authors; otherwise
            # the merge would blank out the bib entry's author field.
            if new_authors_list:
                final_authors = []
                for new_auth in new_authors_list:
                    new_auth_str = str(new_auth).strip()
                    new_auth_norm = TextNormalizer.normalize_author_name(new_auth_str)
                    # Prefer the bib's own spelling when the normalized names match.
                    for i, old_norm in enumerate(current_authors_norm):
                        if old_norm == new_auth_norm:
                            final_authors.append(current_authors_raw[i].strip())
                            break
                    else:
                        # New author, use the new string
                        final_authors.append(new_auth_str)

                # Reconstruct the string
                new_author_str = " and ".join(final_authors)
                current_author_text = entry.author or ""

                # Check if the result is effectively different from the original full string
                def simple_norm(s):
                    return s.lower().replace(" ", "").strip()

                if simple_norm(new_author_str) != simple_norm(current_author_text):
                    old_auth = (
                        (current_author_text[:50] + '...')
                        if len(current_author_text) > 50
                        else current_author_text
                    )
                    new_auth_disp = (
                        (new_author_str[:50] + '...')
                        if len(new_author_str) > 50
                        else new_author_str
                    )
                    changes.append(f"Author: {old_auth} -> {new_auth_disp}")
                    entry.author = new_author_str

    # Optional fields: only a missing DOI is filled in
    if allow_optional_updates and getattr(data, 'doi', None) and not entry.doi:
        changes.append(f"DOI: [Added] {data.doi}")
        entry.doi = data.doi

    return changes
def _candidate_exact_match(candidate) -> bool:
return bool(
candidate
and getattr(candidate, "is_match", False)
and getattr(candidate, "title_match", False)
and getattr(candidate, "author_match", False)
and getattr(candidate, "year_match", False)
and not getattr(candidate, "author_initial_conflict", False)
)
def validate_entry(entry, workflow, fetchers, comparator):
    """Validate a single entry against configured data sources.

    Every enabled workflow step that applies to the entry is queried in
    priority order. Each comparison result is tagged with ``evidence_step``
    (the step name) and ``evidence_url`` (the fetched record's ``url``
    attribute, or "" when unavailable). The highest-confidence result is then
    passed through the cross-source conflict guard and the evidence guard.

    Args:
        entry: Bib entry; ``has_arxiv``, ``arxiv_id``, ``doi`` and ``title``
            are read. A DOI that resolves to a different work may be cleared
            by the sanitizer during the ``crossref_doi`` step.
        workflow: Object exposing ``get_enabled_steps()``.
        fetchers: Mapping with the keys 'arxiv', 'crossref', 'semantic',
            'dblp', 'openalex' and 'scholar'.
        comparator: Object exposing ``compare`` and ``create_unable_result``.

    Returns:
        ``(best_result, all_results)``. When no source produced a result the
        tuple is ``(comparator.create_unable_result(...), [])``.
    """
    results = []
    for step in workflow.get_enabled_steps():
        result = None
        data = None

        if step.name == "arxiv_id" and entry.has_arxiv:
            data = fetchers['arxiv'].fetch_by_id(entry.arxiv_id)
            if data:
                result = comparator.compare(entry, data, "arxiv")

        elif step.name == "crossref_doi" and entry.doi:
            data = fetchers['crossref'].search_by_doi(entry.doi)
            if data:
                # DOI cross-validation: check if the DOI actually resolves to this paper
                doi_fixes = BibSanitizer().check_doi_title_match(entry, data)
                if doi_fixes:
                    # DOI points to a different work — skip this result
                    # The fixes have already cleared the bad DOI from the entry
                    result = None
                else:
                    result = comparator.compare(entry, data, "crossref")

        elif step.name == "semantic_scholar" and entry.title:
            data = fetchers['semantic'].fetch_by_doi(entry.doi) if entry.doi else None
            if not data:
                data = fetchers['semantic'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "semantic_scholar")

        elif step.name == "dblp" and entry.title:
            data = fetchers['dblp'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "dblp")

        elif step.name == "openalex" and entry.title:
            data = fetchers['openalex'].fetch_by_doi(entry.doi) if entry.doi else None
            if not data:
                data = fetchers['openalex'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "openalex")

        elif step.name == "arxiv_title" and entry.title:
            metas = fetchers['arxiv'].search_by_title(entry.title)
            if metas:
                # Pick the search hit whose normalized title is closest to ours.
                norm1 = TextNormalizer.normalize_for_comparison(entry.title)
                closest, closest_sim = None, 0
                for m in metas:
                    sim = TextNormalizer.similarity_ratio(
                        norm1, TextNormalizer.normalize_for_comparison(m.title)
                    )
                    if sim > closest_sim:
                        closest, closest_sim = m, sim
                if closest and closest_sim > 0.5:
                    # Record the chosen hit as `data` so its URL is kept as
                    # evidence, like every other step.
                    data = closest
                    result = comparator.compare(entry, closest, "arxiv")

        elif step.name == "crossref_title" and entry.title:
            data = fetchers['crossref'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "crossref")

        elif step.name == "google_scholar" and entry.title:
            data = fetchers['scholar'].search_by_title(entry.title)
            if data:
                result = comparator.compare(entry, data, "scholar")

        if result:
            result.evidence_step = step.name
            result.evidence_url = getattr(data, "url", "") if data else ""
            results.append(result)

    if results:
        best = max(results, key=lambda r: r.confidence)
        _apply_cross_source_conflict_guard(best, results)
        _apply_evidence_guard(best, results)
        return best, results

    # No results
    return comparator.create_unable_result(entry, "Not found in any data source"), []
def _apply_cross_source_conflict_guard(best, results) -> None:
"""Reject candidates when exact-title sources disagree on core metadata."""
if not best or not getattr(best, "fetched_title", ""):
return
conflicts = []
for result in results:
if result is best:
continue
if getattr(result, "title_similarity", 0.0) < 0.95:
continue
best_year = str(getattr(best, "fetched_year", "") or "").strip()
other_year = str(getattr(result, "fetched_year", "") or "").strip()
if best_year and other_year and best_year != other_year:
conflicts.append(f"{result.source}={other_year}")
if not conflicts:
return
issue = (
f"Cross-source year conflict: best {best.source}={best.fetched_year}, "
f"also found {'; '.join(dict.fromkeys(conflicts))}"
)
if issue not in best.issues:
best.issues.append(issue)
best.is_match = False
best.confidence = min(best.confidence, 0.8)
def _apply_evidence_guard(best, results) -> None:
"""Require primary evidence or at least two agreeing exact sources."""
if not best or not getattr(best, "is_match", False):
return
evidence_step = getattr(best, "evidence_step", "")
if evidence_step in {"arxiv_id", "arxiv_title", "crossref_doi"}:
return
best_year = str(getattr(best, "fetched_year", "") or "").strip()
agreeing_sources = {getattr(best, "source", "")}
for result in results:
if result is best or not getattr(result, "is_match", False):
continue
if getattr(result, "title_similarity", 0.0) < 0.95:
continue
other_year = str(getattr(result, "fetched_year", "") or "").strip()
if best_year and other_year == best_year:
agreeing_sources.add(getattr(result, "source", ""))
if len(agreeing_sources) >= 2:
return
issue = (
"Insufficient evidence: exact match found only in "
f"{best.source}; needs arXiv/DOI evidence or another agreeing source"
)
if issue not in best.issues:
best.issues.append(issue)
best.is_match = False
best.confidence = min(best.confidence, 0.8)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()