commit 791398168c
Simeon Wallrath, 2026-01-21 16:41:43 +01:00
3 changed files with 303 additions and 0 deletions

DOIScraper.py (new file)

@@ -0,0 +1,277 @@
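# DOIScraper: read DOIs from a CSV/XLSX file, look for author contact
# emails via Crossref, the DOI landing page, and an optional Sci-Hub
# mirror, then write the rows back out with an "author_emails" column.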
import re
import io
import os
import sys
import csv
import signal
import requests
import tkinter as tk
from tkinter import filedialog
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from openpyxl import load_workbook, Workbook
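# HTTP headers sent with every request; a named User-Agent is less likely
# to be rejected than the python-requests default.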
HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; DOI-Email-Finder/1.0)"
}
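# Loose email pattern: good enough for scraping, not strict RFC validation.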
EMAIL_REGEX = re.compile(
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
)
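# Addresses at these domains belong to publishers or journals, not authors.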
PUBLISHER_EMAIL_DOMAINS = {
"elsevier.com", "springer.com", "springernature.com", "nature.com",
"wiley.com", "tandfonline.com", "oxfordjournals.org", "oup.com",
"cambridge.org", "cambridgejournals.org", "ieee.org", "acm.org",
"sciencedirect.com", "mdpi.com", "frontiersin.org", "hindawi.com", "plos.org",
}
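# Generic role mailboxes that are almost never a corresponding author.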
LOCALPART_BLACKLIST = {
"editor", "editors", "support", "info", "contact",
"office", "help", "admin", "journal"
}
def normalize_doi(raw):
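    """Strip whitespace and any leading https://doi.org/ or dx.doi.org prefix."""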
if not raw:
return ""
    # xlsx cells may yield non-string values, so coerce before stripping
    raw = str(raw).strip()
return re.sub(r"^https?://(dx\.)?doi\.org/", "", raw, flags=re.I)
def filter_publisher_emails(emails_with_source):
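    """Keep likely author addresses; drop role accounts and publisher domains."""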
filtered = set()
for email, source in emails_with_source:
try:
local, domain = email.lower().split("@", 1)
except ValueError:
continue
if local in LOCALPART_BLACKLIST:
continue
        # match the domain itself or any subdomain, but not mere suffixes
        # (e.g. "notwiley.com" must not match "wiley.com")
        if any(domain == p or domain.endswith("." + p) for p in PUBLISHER_EMAIL_DOMAINS):
            continue
filtered.add((email, source))
return filtered
def get_crossref_emails(doi):
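    """Query the Crossref works API and collect any author emails in the metadata."""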
try:
r = requests.get(
f"https://api.crossref.org/works/{doi}",
headers=HEADERS,
timeout=10
)
        r.raise_for_status()
        data = r.json()  # parse inside the try: a non-JSON body should not crash the run
    except Exception:
        return set()
    emails = set()
    for author in data.get("message", {}).get("author", []):
if "email" in author:
emails.add((author["email"], "crossref"))
return emails
def extract_emails_from_html(html):
    """Extract email-like strings from already-fetched HTML text."""
    # The landing page is fetched once in find_author_emails; scanning that
    # text here avoids a redundant second request for the same URL.
    return {(e, "website") for e in EMAIL_REGEX.findall(html)}
def get_scihub_pdf(doi, mirror_url=None):
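    """Ask the given Sci-Hub mirror for the DOI's page and return a direct
    PDF URL if one can be found, else None."""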
if not mirror_url:
return None
try:
url = mirror_url + doi
r = requests.get(url, headers=HEADERS, timeout=10)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
obj_tag = soup.find("object", type="application/pdf")
if obj_tag and obj_tag.get("data"):
return urljoin(mirror_url, obj_tag["data"])
dl_tag = soup.select_one("div.download a[href]")
if dl_tag:
return urljoin(mirror_url, dl_tag["href"])
except Exception:
return None
return None
def find_pdf_url(soup, base_url):
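    """Return the first link on the page whose href contains ".pdf", if any."""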
if not soup:
return None
for a in soup.find_all("a", href=True):
if ".pdf" in a["href"].lower():
return urljoin(base_url, a["href"])
return None
def extract_emails_from_pdf(pdf_url, source):
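    """Download a PDF and scan its extracted text for email addresses."""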
try:
r = requests.get(pdf_url, headers=HEADERS, timeout=15)
r.raise_for_status()
except Exception:
return set()
emails = set()
output_string = io.StringIO()
try:
extract_text_to_fp(io.BytesIO(r.content), output_string, laparams=LAParams(), output_type='text')
text = output_string.getvalue()
for e in EMAIL_REGEX.findall(text):
emails.add((e, source))
except Exception:
pass
return emails
def find_author_emails(doi, scihub_mirror=None):
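    """Gather candidate emails for one DOI (Crossref, landing page, PDF),
    then filter out publisher and role-account addresses."""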
emails = set()
emails |= get_crossref_emails(doi)
    # DOI landing page: fetch once, reuse the HTML for both the email scan
    # and the PDF-link fallback below
    try:
        r = requests.get(f"https://doi.org/{doi}", headers=HEADERS, allow_redirects=True, timeout=10)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        emails |= extract_emails_from_html(r.text)
    except Exception:
        soup = None
    # Prefer a Sci-Hub PDF when a mirror was supplied; otherwise fall back
    # to any PDF linked from the landing page
scihub_pdf = get_scihub_pdf(doi, scihub_mirror)
if scihub_pdf:
emails |= extract_emails_from_pdf(scihub_pdf, "pdf (Sci-Hub)")
else:
pdf_url = find_pdf_url(soup, r.url) if soup else None
if pdf_url:
emails |= extract_emails_from_pdf(pdf_url, "pdf (landing page)")
return filter_publisher_emails(emails)
def read_input_file(path):
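    """Load rows as dicts from a .csv or .xlsx file; return (rows, filetype)."""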
if path.lower().endswith(".csv"):
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
return list(reader), "csv"
if path.lower().endswith(".xlsx"):
wb = load_workbook(path)
ws = wb.active
headers = [c.value for c in ws[1]]
rows = []
for row in ws.iter_rows(min_row=2, values_only=True):
rows.append(dict(zip(headers, row)))
return rows, "xlsx"
raise ValueError("Unsupported file type")
def get_safe_output_path(input_path, filetype):
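    """Build an output path next to the input, avoiding existing files."""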
base = input_path.rsplit(".", 1)[0] + "_with_emails"
output_path = f"{base}.{filetype}"
counter = 1
while os.path.exists(output_path):
output_path = f"{base}_{counter}.{filetype}"
counter += 1
return output_path
def write_output_file(rows, input_path, filetype):
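    """Write rows to a new CSV/XLSX next to the input, retrying with a
    numbered suffix if the target file is locked (e.g. open in Excel)."""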
    if not rows:
        print("\nNothing to write: no rows.")
        return
    output_path = get_safe_output_path(input_path, filetype)
    try:
if filetype == "csv":
with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
else:
wb = Workbook()
ws = wb.active
ws.append(list(rows[0].keys()))
for row in rows:
ws.append(list(row.values()))
wb.save(output_path)
print(f"\nDone.\nOutput written to: {output_path}")
except PermissionError:
counter = 1
while True:
try:
alt_path = output_path.rsplit(".", 1)[0] + f"_{counter}.{filetype}"
if filetype == "csv":
with open(alt_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
else:
wb.save(alt_path)
print(f"\nDone.\nOutput written to: {alt_path}")
break
except PermissionError:
counter += 1
def setup_ctrlc_handler(rows, input_path, filetype):
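    """Install a SIGINT handler that writes partial results before exiting."""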
def handler(signum, frame):
print("\n[INFO] Ctrl+C detected. Saving progress...")
write_output_file(rows, input_path, filetype)
print("[INFO] Partial results saved. Exiting.")
sys.exit(0)
signal.signal(signal.SIGINT, handler)
def get_scihub_mirror_from_user():
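    """Prompt for a Sci-Hub mirror URL; normalize it to an https://host/ form,
    or return None to skip Sci-Hub entirely."""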
mirror = input("Enter Sci-Hub mirror URL (e.g., sci-hub.st or https://sci-hub.st/) or leave blank to skip: ").strip()
if not mirror:
return None
    if not mirror.startswith(("http://", "https://")):
mirror = "https://" + mirror
if not mirror.endswith("/"):
mirror += "/"
return mirror
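# Entry point: pick an input file via a dialog, optionally ask for a
# Sci-Hub mirror, then look up emails for every DOI row.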
if __name__ == "__main__":
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title="Select CSV or XLSX file",
filetypes=[("CSV / Excel", "*.csv *.xlsx")]
)
if not file_path:
print("No file selected.")
raise SystemExit
scihub_mirror = get_scihub_mirror_from_user()
rows, filetype = read_input_file(file_path)
print(f"Processing {len(rows)} DOI(s)...\n")
setup_ctrlc_handler(rows, file_path, filetype)
for i, row in enumerate(rows, start=1):
doi = normalize_doi(row.get("DOI", ""))
if doi:
print(f"[{i}/{len(rows)}] Processing DOI: {doi}")
emails = sorted({e for e, _ in find_author_emails(doi, scihub_mirror)})
print(f" Found {len(emails)} email(s): {', '.join(emails) if emails else 'None'}")
row["author_emails"] = "; ".join(emails)
else:
print(f"[{i}/{len(rows)}] DOI missing or empty")
row["author_emails"] = ""
write_output_file(rows, file_path, filetype)