"""
BibGuard - Citation Hallucination Detector

Validates bibliography entries against multiple academic data sources:
arXiv, CrossRef, DBLP, Semantic Scholar, OpenAlex, and Google Scholar.

Usage:
    python main.py --bib references.bib
    python main.py --bib references.bib --output report.md
"""
| import argparse |
| import sys |
| from pathlib import Path |
| from datetime import datetime |
| from dataclasses import dataclass, field |
| from typing import List, Optional |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| import threading |
| import copy |
|
|
| from src.parser import BibParser |
| from src.fetcher import ( |
| ArxivFetcher, CrossRefFetcher, DBLPFetcher, |
| SemanticScholarFetcher, OpenAlexFetcher, ScholarFetcher |
| ) |
| from src.comparator import MetadataComparator, EntryReport, resolve_year, CURRENT_YEAR |
| from src.sanitizer import BibSanitizer |
| from src.local_db import LocalConferenceDB |
| from src.ui import BibUI |
| from src.utils import ProgressDisplay, TextNormalizer |
@dataclass
class WorkflowStep:
    """A single lookup stage of the validation workflow."""

    # Identifier that the validation routine matches against.
    name: str
    # Steps with enabled=False are left out of the enabled-step list.
    enabled: bool = True
    # Human-readable label for the step.
    display_name: str = ""
    # Sort key used when ordering the enabled steps.
    priority: int = 0
|
|
@dataclass
class WorkflowConfig:
    """Container for the workflow steps of a validation run."""

    steps: List[WorkflowStep] = field(default_factory=list)

    def get_enabled_steps(self) -> List[WorkflowStep]:
        """Return the enabled steps ordered by ascending priority."""
        active = [step for step in self.steps if step.enabled]
        # list.sort is stable, so steps with equal priority keep their
        # relative order from ``self.steps``.
        active.sort(key=lambda step: step.priority)
        return active
|
|
def get_default_workflow() -> WorkflowConfig:
    """Build the default workflow: eight lookup steps, Google Scholar disabled."""
    # (name, enabled, display_name) rows; each row's position is its priority.
    step_table = (
        ("arxiv_id", True, "arXiv by ID"),
        ("crossref_doi", True, "CrossRef by DOI"),
        ("semantic_scholar", True, "Semantic Scholar"),
        ("dblp", True, "DBLP"),
        ("openalex", True, "OpenAlex"),
        ("arxiv_title", True, "arXiv by Title"),
        ("crossref_title", True, "CrossRef by Title"),
        ("google_scholar", False, "Google Scholar"),
    )
    default_steps = [
        WorkflowStep(name, enabled, label, position)
        for position, (name, enabled, label) in enumerate(step_table)
    ]
    return WorkflowConfig(steps=default_steps)
|
|
def main():
    """Command-line entry point: parse arguments and run the fix-and-verify flow.

    Exits with status 1 when ``--bib`` does not name an existing regular
    file, and with status 130 when the run is interrupted from the keyboard.
    """
    parser = argparse.ArgumentParser(
        description="BibGuard: Citation Fixer & Validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--bib", "-b", required=True, help="Path to .bib file")
    parser.add_argument("--output", "-o", help="Output report path (optional)")
    args = parser.parse_args()

    bib_path = Path(args.bib)
    # is_file() rather than exists(): a directory would pass an existence
    # check and only fail later, after the run has started.
    if not bib_path.is_file():
        print(f"Error: Bib file not found: {args.bib}", file=sys.stderr)
        sys.exit(1)

    if args.output:
        # The flow called below takes no report path, so say so instead of
        # silently ignoring the option.
        print(
            "Warning: --output is not supported yet; "
            "the report is printed to the console only.",
            file=sys.stderr,
        )

    workflow = get_default_workflow()

    try:
        run_fix_and_verify(bib_path, workflow)
    except KeyboardInterrupt:
        print("\nCancelled")
        sys.exit(130)
|
|
|
|
def _review_entry_interactively(entry, candidates):
    """Prompt until the user decides what to do with one entry under review.

    Returns:
        ``(action, changes)`` where ``action`` is ``'quit'``, ``'skip'``,
        ``'remove'`` or ``'apply'`` and ``changes`` is the list returned by
        ``apply_fix`` (empty for every action other than ``'apply'``).
    """
    while True:
        try:
            choice = input(
                f"\nSelect [1-{len(candidates)}], (s)kip, (r)emove, or (q)uit: "
            ).strip().lower()
        except EOFError:
            # stdin is closed (non-interactive run): behave like "quit" so the
            # caller can still save the fixes it has already applied.
            choice = 'q'

        if choice == 'q':
            print("Exiting manual review.")
            return 'quit', []
        if choice == 's':
            print("Skipped.")
            return 'skip', []
        if choice == 'r':
            print("Marked for removal.")
            return 'remove', []
        # isdecimal() only accepts characters that int() can parse;
        # isdigit() also accepts e.g. superscript digits, which int() rejects.
        if not choice.isdecimal():
            print("Invalid input.")
            continue

        idx = int(choice) - 1
        if not 0 <= idx < len(candidates):
            print("Invalid selection.")
            continue

        selected = candidates[idx]
        if not _candidate_exact_match(selected):
            print("Cannot apply: selected candidate is not an exact title/author/year match.")
            continue

        changes = apply_fix(entry, selected.fetched_data, allow_optional_updates=True)
        if changes:
            print(f"Applied: {', '.join(changes)}")
        else:
            print("No changes needed for selected source.")
        return 'apply', changes


def run_fix_and_verify(bib_path: Path, workflow):
    """Run validation, auto-fix issues, and verify.

    Phases, in order:
      1. Parse the file, apply the sanitizer's formatting fixes (saved
         immediately) and report duplicate titles.
      2. Validate every entry concurrently with ``validate_entry``.
      3. Apply fixes to matched entries, drop entries for which no candidate
         was found, and prompt the user for entries that need manual review.
      4. Save the file if metadata changed or entries were removed, then
         re-parse and re-validate it and show the final report.

    Args:
        bib_path: Path of the .bib file; it is rewritten in place.
        workflow: Workflow configuration passed through to ``validate_entry``.
    """
    progress = ProgressDisplay()
    bib_parser = BibParser()
    ui = BibUI()

    print("📚 BibGuard - Auto-Fix & Verify")
    print(f" Target: {bib_path}\n")

    entries = bib_parser.parse_file(str(bib_path))
    if not entries:
        print("No entries found")
        return

    print(f"Found {len(entries)} entries. Running validation and auto-fix...\n")

    fetchers = {
        'arxiv': ArxivFetcher(),
        'crossref': CrossRefFetcher(),
        'scholar': ScholarFetcher(),
        'semantic': SemanticScholarFetcher(),
        'openalex': OpenAlexFetcher(),
        'dblp': DBLPFetcher(),
    }
    comparator = MetadataComparator()
    sanitizer = BibSanitizer()

    fixed_count = 0           # one count per fixed entry, per phase
    metadata_changed = False  # True once a fix from validation was applied
    fixed_details = {}
    removed_details = []

    # ---- Phase 1: formatting fixes -------------------------------------
    print("🧹 Running formatting sanity checks...")
    sanitize_fixes = sanitizer.sanitize_all(entries)
    ui.show_sanitize_report(sanitize_fixes)

    if sanitize_fixes:
        bib_parser.save_entries(str(bib_path), entries)
        for key, fixes in sanitize_fixes.items():
            fixed_details.setdefault(key, []).extend(fix.description for fix in fixes)
            # Counted per entry, like the metadata fixes further down.
            fixed_count += 1

    dupes = sanitizer.find_duplicates(entries)
    if dupes:
        print(f"\n⚠ Found {len(dupes)} duplicate title(s):")
        for keys in dupes.values():
            print(f" {' / '.join(keys)}")
        print()

    # Local DB hits are only counted; every entry is still verified online.
    local_db = LocalConferenceDB()
    if local_db.load():
        local_matched_count = sum(1 for entry in entries if local_db.lookup(entry.title))
        if local_matched_count > 0:
            print(f" 📚 Local DB matched: {local_matched_count}; still verifying all entries online")

    # ---- Phase 2: concurrent validation --------------------------------
    # Results are stored by entry index so later processing (and the rewritten
    # file) follows the original order instead of thread completion order.
    analysis_by_index = {}
    failed_lookups = []

    with progress.progress_context(len(entries), "Analyzing Entries") as prog:
        with ThreadPoolExecutor(max_workers=max(1, min(10, len(entries)))) as executor:
            futures = {
                executor.submit(validate_entry, entry, workflow, fetchers, comparator): index
                for index, entry in enumerate(entries)
            }
            for future in as_completed(futures):
                index = futures[future]
                entry = entries[index]
                try:
                    best_result, candidates = future.result()
                except Exception as exc:
                    # Best effort: one failed lookup must not abort the run.
                    # The entry is kept untouched and reported below.
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)
                    analysis_by_index[index] = (entry, None, [])
                    failed_lookups.append((entry.key, exc))
                else:
                    analysis_by_index[index] = (entry, best_result, candidates)
                    prog.update(entry.key, "Analyzed", 1)

    for key, error in failed_lookups:
        print(f" ⚠ Could not validate {key} ({error}); entry left unchanged")

    to_fix = []
    to_review = []
    to_remove = []
    ok_entries = []

    for index in range(len(entries)):
        entry, best_result, candidates = analysis_by_index[index]
        if not best_result:
            ok_entries.append(entry)
        elif best_result.is_match and best_result.fetched_data:
            to_fix.append((entry, best_result, candidates))
        elif candidates:
            to_review.append((entry, best_result, candidates))
        else:
            to_remove.append(entry)

    ui.show_analysis_report(ok_entries, to_fix, to_review, to_remove)

    if not (to_fix or to_review or to_remove):
        return

    # ---- Phase 3: apply fixes ------------------------------------------
    print("\n🚀 Applying fixes...")

    for entry, best_result, candidates in to_fix:
        changes = apply_fix(
            entry,
            best_result.fetched_data,
            all_candidates=candidates,
            allow_optional_updates=True,
        )
        if changes:
            fixed_count += 1
            metadata_changed = True
            # extend, not assign: keep sanitizer descriptions for this key.
            fixed_details.setdefault(entry.key, []).extend(changes)

    for entry in to_remove:
        removed_details.append((entry, "No matching metadata found in any source"))

    # Everything except the entries without candidates is kept, in file order.
    dropped_ids = {id(entry) for entry in to_remove}
    updated_entries = [entry for entry in entries if id(entry) not in dropped_ids]

    if to_review:
        print(f"\n\n🔍 Manual Review Required for {len(to_review)} entries:")

        keys_to_remove = set()
        for entry, best_res, candidates in sorted(to_review, key=lambda item: item[0].key):
            ui.show_manual_review(entry, best_res, candidates, apply_fix)

            action, changes = _review_entry_interactively(entry, candidates)
            if action == 'quit':
                break
            if action == 'remove':
                keys_to_remove.add(entry.key)
                removed_details.append((entry, "Removed by user during manual review"))
            elif changes:
                fixed_count += 1
                metadata_changed = True
                fixed_details.setdefault(entry.key, []).extend(changes)

        if keys_to_remove:
            updated_entries = [e for e in updated_entries if e.key not in keys_to_remove]

    # Sanitizer fixes were already written in phase 1; save again only when
    # validation changed metadata or removed entries.
    if metadata_changed or removed_details:
        bib_parser.save_entries(str(bib_path), updated_entries)

    # ---- Phase 4: re-validation ----------------------------------------
    print("\n🔄 Double checking (Re-validation)...")

    entries = bib_parser.parse_file(str(bib_path)) or []
    reports_by_index = {}

    with progress.progress_context(len(entries), "Verifying") as prog:
        # max(1, ...): ThreadPoolExecutor raises ValueError for
        # max_workers <= 0, which happens when every entry was removed.
        with ThreadPoolExecutor(max_workers=max(1, min(10, len(entries)))) as executor:
            futures = {
                executor.submit(validate_entry, entry, workflow, fetchers, comparator): index
                for index, entry in enumerate(entries)
            }
            for future in as_completed(futures):
                index = futures[future]
                entry = entries[index]
                try:
                    best_result, _ = future.result()
                    report = EntryReport(entry=entry, comparison=best_result)
                    matched = bool(best_result.is_match)
                except Exception:
                    prog.mark_error()
                    prog.update(entry.key, "Failed", 1)
                    continue

                reports_by_index[index] = report
                if matched:
                    prog.mark_success()
                else:
                    prog.mark_error()
                prog.update(entry.key, "Verified", 1)

    reports = [reports_by_index[index] for index in sorted(reports_by_index)]

    total = len(entries)
    verified = sum(1 for r in reports if r.comparison and r.comparison.is_match)
    issues = sum(1 for r in reports if r.comparison and r.comparison.has_issues)
    not_found = sum(
        1 for r in reports
        if r.comparison and not r.comparison.is_match and not r.comparison.has_issues
    )

    ui.show_final_report(
        total, verified, issues, not_found, reports,
        fixed_count, fixed_details, removed_details,
    )
    print("")
|
|
def apply_local_fix(entry, official) -> list:
    """
    Apply non-core fixes from local conference DB.

    This never changes title, authors, or year; those fields define the
    reference identity and must be verified against live metadata.

    Args:
        entry: Bib entry to update in place. Reads and writes ``entry_type``,
            ``journal``, ``booktitle``, ``doi`` and the ``raw_entry`` mapping.
        official: Local DB record; ``booktitle`` and ``doi`` are read.

    Returns:
        Human-readable descriptions of the changes made (empty if none).
    """
    changes = []
    booktitle = official.booktitle
    # A missing entry type is treated as "no type" instead of crashing.
    old_type = entry.entry_type or ""

    # A record with a booktitle is a conference paper: promote @misc/@article.
    if booktitle and old_type.lower() in ('misc', 'article'):
        entry.entry_type = 'inproceedings'
        if 'ENTRYTYPE' in entry.raw_entry:
            entry.raw_entry['ENTRYTYPE'] = 'inproceedings'

        # Drop an arXiv placeholder journal from the promoted entry.
        if entry.journal and 'arxiv' in entry.journal.lower():
            entry.journal = ""
            entry.raw_entry.pop('journal', None)
        changes.append(f"Type: @{old_type} → @inproceedings [local_db]")

    if booktitle and not entry.booktitle:
        entry.booktitle = booktitle
        entry.raw_entry['booktitle'] = booktitle
        # Add the ellipsis only when the title was actually cut.
        shown = booktitle[:50] + "..." if len(booktitle) > 50 else booktitle
        changes.append(f"Booktitle: [Added] {shown} [local_db]")

    if official.doi and not entry.doi:
        entry.doi = official.doi
        entry.raw_entry['doi'] = official.doi
        changes.append(f"DOI: [Added] {official.doi} [local_db]")

    return changes
|
|
|
|
def _clean_text(value) -> str:
    """Return *value* as a stripped string; None and other falsy values become ""."""
    return str(value).strip() if value else ""


def _shorten(text: str, limit: int = 50) -> str:
    """Cut *text* to *limit* characters, adding "..." only when something was cut."""
    return text[:limit] + '...' if len(text) > limit else text


def _apply_title_update(entry, data, changes: list) -> None:
    """Replace the entry title when the fetched title differs (case-insensitively)."""
    new_title = _clean_text(getattr(data, 'title', ''))
    if new_title and new_title.lower() != (entry.title or "").lower():
        changes.append(f"Title: {entry.title} -> {new_title}")
        entry.title = new_title


def _apply_year_update(entry, data, all_candidates, changes: list) -> None:
    """Update the entry year, refusing years later than CURRENT_YEAR."""
    if all_candidates:
        best_year, year_src = resolve_year(all_candidates, bib_year=entry.year)
        # Normalised to text so an int result cannot compare unequal to an
        # identical string year and produce a spurious "change".
        new_year = _clean_text(best_year)
        skip_suffix, apply_suffix = f" from {year_src}", f" [{year_src}]"
    else:
        new_year = _clean_text(getattr(data, 'year', ''))
        skip_suffix = apply_suffix = ""

    if not new_year or new_year == _clean_text(entry.year):
        return

    # isdecimal() guards int(): a non-numeric year must not raise here.
    if new_year.isdecimal() and int(new_year) > CURRENT_YEAR:
        changes.append(f"⚠ Skip suspicious future year {new_year}{skip_suffix}")
    else:
        changes.append(f"Year: {entry.year} -> {new_year}{apply_suffix}")
        entry.year = new_year


def _apply_author_update(entry, data, all_candidates, changes: list) -> None:
    """Adopt the fetched author order, keeping the bib spelling of matching names."""
    if any(getattr(cand, 'author_initial_conflict', False) for cand in (all_candidates or [])):
        changes.append("⚠ Author initial conflict detected — preserving bib authors")
        return

    current_author = entry.author or ""
    current_raw = TextNormalizer.parse_author_list(current_author)
    current_norm = [TextNormalizer.normalize_author_name(a) for a in current_raw]

    fetched = getattr(data, 'authors', None) or []
    if isinstance(fetched, str):
        fetched = TextNormalizer.parse_author_list(fetched)
    fetched = [
        TextNormalizer.strip_dblp_disambiguation_id(str(name)).strip()
        for name in fetched
    ]
    fetched = [name for name in fetched if name]

    for raw_auth in current_raw:
        if TextNormalizer.has_dblp_disambiguation_id(raw_auth.strip()):
            changes.append(f"⚠ DBLP disambiguation ID detected in author: '{raw_auth.strip()}'")

    if not fetched:
        # The source carries no author list: keep the bib authors rather
        # than overwriting them with an empty string.
        return

    merged = []
    for name in fetched:
        name_norm = TextNormalizer.normalize_author_name(name)
        # The first bib author with the same normalised name keeps its
        # original spelling; otherwise the fetched spelling is used.
        kept = next(
            (raw.strip() for raw, old_norm in zip(current_raw, current_norm) if old_norm == name_norm),
            name,
        )
        merged.append(kept)

    new_author_str = " and ".join(merged)

    def squash(text):
        return text.lower().replace(" ", "").strip()

    if squash(new_author_str) != squash(current_author):
        changes.append(f"Author: {_shorten(current_author)} -> {_shorten(new_author_str)}")
        entry.author = new_author_str


def apply_fix(
    entry,
    data,
    all_candidates=None,
    *,
    allow_core_updates: bool = False,
    allow_optional_updates: bool = False,
) -> list:
    """Update only safe metadata by default.

    Core identity fields (title, author, year) are not overwritten unless
    allow_core_updates=True. RefCheck should validate references, not transform
    a nearby candidate into a different citation.

    Args:
        entry: Bib entry, modified in place.
        data: Fetched metadata; ``title``, ``year``, ``authors`` and ``doi``
            are read when present. ``None`` results in no changes.
        all_candidates: Optional comparison results; used to resolve the year
            and to detect author-initial conflicts.
        allow_core_updates: Permit changes to title, year and authors.
        allow_optional_updates: Permit adding a DOI when the entry has none.

    Returns:
        Descriptions of applied changes, plus "⚠" notes for skipped updates.
    """
    changes = []
    if data is None:
        return changes

    if allow_core_updates:
        _apply_title_update(entry, data, changes)
        _apply_year_update(entry, data, all_candidates, changes)
        _apply_author_update(entry, data, all_candidates, changes)

    doi = getattr(data, 'doi', None)
    if allow_optional_updates and doi and not entry.doi:
        changes.append(f"DOI: [Added] {doi}")
        entry.doi = doi

    return changes
|
|
|
|
| def _candidate_exact_match(candidate) -> bool: |
| return bool( |
| candidate |
| and getattr(candidate, "is_match", False) |
| and getattr(candidate, "title_match", False) |
| and getattr(candidate, "author_match", False) |
| and getattr(candidate, "year_match", False) |
| and not getattr(candidate, "author_initial_conflict", False) |
| ) |
|
|
|
|
def _closest_arxiv_title(entry_title, metas):
    """Return the arXiv record whose title is most similar to *entry_title*.

    Returns None when *metas* is empty or the best similarity is not above 0.5.
    """
    if not metas:
        return None
    target = TextNormalizer.normalize_for_comparison(entry_title)
    scored = [
        (
            TextNormalizer.similarity_ratio(
                target, TextNormalizer.normalize_for_comparison(meta.title)
            ),
            meta,
        )
        for meta in metas
    ]
    # max() returns the first of equally similar records.
    top_similarity, top_meta = max(scored, key=lambda pair: pair[0])
    return top_meta if top_meta and top_similarity > 0.5 else None


def _lookup_step(step_name, entry, fetchers):
    """Fetch metadata for one workflow step.

    Returns:
        ``(data, source)``: the fetched record (None when the step does not
        apply to this entry or nothing was found) and the source label handed
        to the comparator.
    """
    if step_name == "arxiv_id":
        if not entry.has_arxiv:
            return None, "arxiv"
        return fetchers['arxiv'].fetch_by_id(entry.arxiv_id), "arxiv"

    if step_name == "crossref_doi":
        if not entry.doi:
            return None, "crossref"
        data = fetchers['crossref'].search_by_doi(entry.doi)
        # A DOI that resolves to a different title is not used as evidence.
        if data and BibSanitizer().check_doi_title_match(entry, data):
            return None, "crossref"
        return data, "crossref"

    # Every remaining step searches by title.
    if not entry.title:
        return None, ""

    if step_name == "semantic_scholar":
        data = fetchers['semantic'].fetch_by_doi(entry.doi) if entry.doi else None
        return data or fetchers['semantic'].search_by_title(entry.title), "semantic_scholar"

    if step_name == "dblp":
        return fetchers['dblp'].search_by_title(entry.title), "dblp"

    if step_name == "openalex":
        data = fetchers['openalex'].fetch_by_doi(entry.doi) if entry.doi else None
        return data or fetchers['openalex'].search_by_title(entry.title), "openalex"

    if step_name == "arxiv_title":
        metas = fetchers['arxiv'].search_by_title(entry.title)
        return _closest_arxiv_title(entry.title, metas), "arxiv"

    if step_name == "crossref_title":
        return fetchers['crossref'].search_by_title(entry.title), "crossref"

    if step_name == "google_scholar":
        return fetchers['scholar'].search_by_title(entry.title), "scholar"

    return None, ""


def validate_entry(entry, workflow, fetchers, comparator):
    """Validate a single entry against configured data sources. Returns (best_result, all_results).

    Each enabled workflow step is tried in order. Every comparison result is
    tagged with ``evidence_step`` and ``evidence_url`` and collected. The
    result with the highest confidence is then passed through the cross-source
    conflict guard and the evidence guard. When no step produces a result, the
    comparator's "unable" result is returned with an empty list.
    """
    results = []

    for step in workflow.get_enabled_steps():
        data, source = _lookup_step(step.name, entry, fetchers)
        if not data:
            continue

        result = comparator.compare(entry, data, source)
        if result:
            result.evidence_step = step.name
            # ``data`` is the record that was compared, including the record
            # picked by the arXiv title search, so its URL is recorded too.
            result.evidence_url = getattr(data, "url", "")
            results.append(result)

    if results:
        best = max(results, key=lambda r: r.confidence)
        _apply_cross_source_conflict_guard(best, results)
        _apply_evidence_guard(best, results)
        return best, results

    return comparator.create_unable_result(entry, "Not found in any data source"), []
|
|
|
|
| def _apply_cross_source_conflict_guard(best, results) -> None: |
| """Reject candidates when exact-title sources disagree on core metadata.""" |
| if not best or not getattr(best, "fetched_title", ""): |
| return |
|
|
| conflicts = [] |
| for result in results: |
| if result is best: |
| continue |
| if getattr(result, "title_similarity", 0.0) < 0.95: |
| continue |
|
|
| best_year = str(getattr(best, "fetched_year", "") or "").strip() |
| other_year = str(getattr(result, "fetched_year", "") or "").strip() |
| if best_year and other_year and best_year != other_year: |
| conflicts.append(f"{result.source}={other_year}") |
|
|
| if not conflicts: |
| return |
|
|
| issue = ( |
| f"Cross-source year conflict: best {best.source}={best.fetched_year}, " |
| f"also found {'; '.join(dict.fromkeys(conflicts))}" |
| ) |
| if issue not in best.issues: |
| best.issues.append(issue) |
| best.is_match = False |
| best.confidence = min(best.confidence, 0.8) |
|
|
|
|
| def _apply_evidence_guard(best, results) -> None: |
| """Require primary evidence or at least two agreeing exact sources.""" |
| if not best or not getattr(best, "is_match", False): |
| return |
|
|
| evidence_step = getattr(best, "evidence_step", "") |
| if evidence_step in {"arxiv_id", "arxiv_title", "crossref_doi"}: |
| return |
|
|
| best_year = str(getattr(best, "fetched_year", "") or "").strip() |
| agreeing_sources = {getattr(best, "source", "")} |
| for result in results: |
| if result is best or not getattr(result, "is_match", False): |
| continue |
| if getattr(result, "title_similarity", 0.0) < 0.95: |
| continue |
| other_year = str(getattr(result, "fetched_year", "") or "").strip() |
| if best_year and other_year == best_year: |
| agreeing_sources.add(getattr(result, "source", "")) |
|
|
| if len(agreeing_sources) >= 2: |
| return |
|
|
| issue = ( |
| "Insufficient evidence: exact match found only in " |
| f"{best.source}; needs arXiv/DOI evidence or another agreeing source" |
| ) |
| if issue not in best.issues: |
| best.issues.append(issue) |
| best.is_match = False |
| best.confidence = min(best.confidence, 0.8) |
|
|
|
|
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|