"""Find author email addresses for a list of DOIs.

Reads a CSV or XLSX file with a "DOI" column, queries Crossref, the DOI
landing page, and (optionally) a user-supplied Sci-Hub mirror, then writes
the rows back out with an extra "author_emails" column.
"""

import re
import io
import os
import sys
import csv
import signal
import tkinter as tk
from tkinter import filedialog
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from openpyxl import load_workbook, Workbook

HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; DOI-Email-Finder/1.0)"
}

# Loose pattern; good enough for scraping addresses out of HTML and PDF text.
EMAIL_REGEX = re.compile(
    r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
)

# Addresses on these domains are almost always journal staff, not authors.
PUBLISHER_EMAIL_DOMAINS = {
    "elsevier.com", "springer.com", "springernature.com", "nature.com",
    "wiley.com", "tandfonline.com", "oxfordjournals.org", "oup.com",
    "cambridge.org", "cambridgejournals.org", "ieee.org", "acm.org",
    "sciencedirect.com", "mdpi.com", "frontiersin.org", "hindawi.com",
    "plos.org",
}

# Role accounts that are never a specific author's address.
LOCALPART_BLACKLIST = {
    "editor", "editors", "support", "info", "contact",
    "office", "help", "admin", "journal",
}


def normalize_doi(raw):
    """Strip whitespace and any leading doi.org URL prefix."""
    if not raw:
        return ""
    raw = str(raw).strip()  # Excel cells may come back as non-string types
    return re.sub(r"^https?://(dx\.)?doi\.org/", "", raw, flags=re.I)


def filter_publisher_emails(emails_with_source):
    """Drop role accounts and addresses hosted on publisher domains."""
    filtered = set()
    for email, source in emails_with_source:
        try:
            local, domain = email.lower().split("@", 1)
        except ValueError:
            continue
        if local in LOCALPART_BLACKLIST:
            continue
        if any(domain.endswith(p) for p in PUBLISHER_EMAIL_DOMAINS):
            continue
        filtered.add((email, source))
    return filtered


def get_crossref_emails(doi):
    """Query the Crossref API; some records carry author email fields."""
    try:
        r = requests.get(
            f"https://api.crossref.org/works/{doi}",
            headers=HEADERS, timeout=10,
        )
        r.raise_for_status()
    except Exception:
        return set()
    emails = set()
    for author in r.json().get("message", {}).get("author", []):
        if "email" in author:
            emails.add((author["email"], "crossref"))
    return emails


def extract_emails_from_html(url):
    """Fetch a page and regex out any email addresses in its markup."""
    try:
        r = requests.get(url, headers=HEADERS, timeout=10)
        r.raise_for_status()
    except Exception:
        return set()
    return {(e, "website") for e in EMAIL_REGEX.findall(r.text)}


def get_scihub_pdf(doi, mirror_url=None):
    """Resolve a DOI to a direct PDF URL via a Sci-Hub mirror, if given."""
    if not mirror_url:
        return None
    try:
        url = mirror_url + doi
        r = requests.get(url, headers=HEADERS, timeout=10)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        # Mirrors embed the PDF either as an <object> or as a download link.
        obj_tag = soup.find("object", type="application/pdf")
        if obj_tag and obj_tag.get("data"):
            return urljoin(mirror_url, obj_tag["data"])
        dl_tag = soup.select_one("div.download a[href]")
        if dl_tag:
            return urljoin(mirror_url, dl_tag["href"])
    except Exception:
        return None
    return None


def find_pdf_url(soup, base_url):
    """Return the first link on the landing page that looks like a PDF."""
    if not soup:
        return None
    for a in soup.find_all("a", href=True):
        if ".pdf" in a["href"].lower():
            return urljoin(base_url, a["href"])
    return None


def extract_emails_from_pdf(pdf_url, source):
    """Download a PDF and pull email addresses out of its text layer."""
    try:
        r = requests.get(pdf_url, headers=HEADERS, timeout=15)
        r.raise_for_status()
    except Exception:
        return set()
    emails = set()
    output_string = io.StringIO()
    try:
        extract_text_to_fp(io.BytesIO(r.content), output_string,
                           laparams=LAParams(), output_type="text")
        for e in EMAIL_REGEX.findall(output_string.getvalue()):
            emails.add((e, source))
    except Exception:
        pass  # scanned or malformed PDFs simply yield nothing
    return emails


def find_author_emails(doi, scihub_mirror=None):
    """Combine all sources for one DOI and filter out non-author addresses."""
    emails = set()
    emails |= get_crossref_emails(doi)

    # DOI landing page (follow redirects to the publisher's site).
    try:
        r = requests.get(f"https://doi.org/{doi}", headers=HEADERS,
                         allow_redirects=True, timeout=10)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        emails |= extract_emails_from_html(r.url)
    except Exception:
        soup = None

    # Try Sci-Hub first; otherwise fall back to a PDF linked from the
    # landing page.
    scihub_pdf = get_scihub_pdf(doi, scihub_mirror)
    if scihub_pdf:
        emails |= extract_emails_from_pdf(scihub_pdf, "pdf (Sci-Hub)")
    else:
        pdf_url = find_pdf_url(soup, r.url) if soup else None
        if pdf_url:
            emails |= extract_emails_from_pdf(pdf_url, "pdf (landing page)")

    return filter_publisher_emails(emails)


def read_input_file(path):
    """Load rows as dicts from a .csv or .xlsx file; returns (rows, filetype)."""
    if path.lower().endswith(".csv"):
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            return list(reader), "csv"
    if path.lower().endswith(".xlsx"):
        wb = load_workbook(path)
        ws = wb.active
        headers = [c.value for c in ws[1]]
        rows = []
        for row in ws.iter_rows(min_row=2, values_only=True):
            rows.append(dict(zip(headers, row)))
        return rows, "xlsx"
    raise ValueError("Unsupported file type")


def get_safe_output_path(input_path, filetype):
    """Build an output path next to the input without overwriting anything."""
    base = input_path.rsplit(".", 1)[0] + "_with_emails"
    output_path = f"{base}.{filetype}"
    counter = 1
    while os.path.exists(output_path):
        output_path = f"{base}_{counter}.{filetype}"
        counter += 1
    return output_path


def write_output_file(rows, input_path, filetype):
    """Write rows out; on PermissionError (e.g., the file is open in Excel),
    retry under a numbered name instead of crashing."""
    if not rows:
        print("\nNothing to write.")
        return

    def _write(path):
        if filetype == "csv":
            with open(path, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=rows[0].keys())
                writer.writeheader()
                writer.writerows(rows)
        else:
            wb = Workbook()
            ws = wb.active
            ws.append(list(rows[0].keys()))
            for row in rows:
                ws.append(list(row.values()))
            wb.save(path)

    output_path = get_safe_output_path(input_path, filetype)
    base = output_path.rsplit(".", 1)[0]
    counter = 1
    while True:
        try:
            _write(output_path)
            print(f"\nDone.\nOutput written to: {output_path}")
            return
        except PermissionError:
            output_path = f"{base}_{counter}.{filetype}"
            counter += 1


def setup_ctrlc_handler(rows, input_path, filetype):
    """Save whatever has been collected so far if the user presses Ctrl+C."""
    def handler(signum, frame):
        print("\n[INFO] Ctrl+C detected. Saving progress...")
        write_output_file(rows, input_path, filetype)
        print("[INFO] Partial results saved. Exiting.")
        sys.exit(0)
    signal.signal(signal.SIGINT, handler)


def get_scihub_mirror_from_user():
    """Prompt for a Sci-Hub mirror and normalize it to https://.../ form."""
    mirror = input(
        "Enter Sci-Hub mirror URL (e.g., sci-hub.st or https://sci-hub.st/) "
        "or leave blank to skip: "
    ).strip()
    if not mirror:
        return None
    if not mirror.startswith("http"):
        mirror = "https://" + mirror
    if not mirror.endswith("/"):
        mirror += "/"
    return mirror


if __name__ == "__main__":
    # Hide the blank Tk root window; we only want the file-open dialog.
    root = tk.Tk()
    root.withdraw()
    file_path = filedialog.askopenfilename(
        title="Select CSV or XLSX file",
        filetypes=[("CSV / Excel", "*.csv *.xlsx")],
    )
    if not file_path:
        print("No file selected.")
        raise SystemExit

    scihub_mirror = get_scihub_mirror_from_user()
    rows, filetype = read_input_file(file_path)
    print(f"Processing {len(rows)} DOI(s)...\n")

    setup_ctrlc_handler(rows, file_path, filetype)

    for i, row in enumerate(rows, start=1):
        doi = normalize_doi(row.get("DOI", ""))
        if doi:
            print(f"[{i}/{len(rows)}] Processing DOI: {doi}")
            emails = sorted({e for e, _ in find_author_emails(doi, scihub_mirror)})
            print(f"  Found {len(emails)} email(s): "
                  f"{', '.join(emails) if emails else 'None'}")
            row["author_emails"] = "; ".join(emails)
        else:
            print(f"[{i}/{len(rows)}] DOI missing or empty")
            row["author_emails"] = ""

    write_output_file(rows, file_path, filetype)
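
# A minimal sketch of the expected input file: the main loop reads the
# column literally named "DOI" (row.get("DOI")), and normalize_doi() accepts
# either bare DOIs or doi.org URLs. The two sample DOIs below are purely
# illustrative:
#
#   DOI
#   10.1038/s41586-020-2649-2
#   https://doi.org/10.1103/PhysRev.47.777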