import csv
import os
import re
import time
import errno
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup

BASE_URL = "https://www.lebensmittel-sonderposten.de/Suessigkeiten/?p={page}"


def parse_price(price_str):
    """Normalize a price string, keeping the decimal comma."""
    # Keep digits, comma and dot; drop currency symbols and whitespace
    cleaned = re.sub(r"[^\d,\.]", "", price_str or "")
    if "," in cleaned and "." in cleaned:
        cleaned = cleaned.replace(".", "")  # remove thousands separator
    try:
        float(cleaned.replace(",", "."))  # validate that it parses as a number
        return cleaned  # keep as string with decimal comma
    except ValueError:
        return "0,0"


# Selenium setup: headless Chrome with a desktop user agent
opts = Options()
opts.add_argument("--headless")
opts.add_argument("--disable-gpu")
opts.add_argument("--no-sandbox")
opts.add_argument("--disable-blink-features=AutomationControlled")
opts.add_argument("--start-maximized")
opts.add_argument(
    "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
driver = webdriver.Chrome(options=opts)
wait = WebDriverWait(driver, 10)


def get_products(page, retries=3):
    """Scrape one category page and return a list of product dicts."""
    url = BASE_URL.format(page=page)
    for attempt in range(retries):
        driver.get(url)
        try:
            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".product-box")))
            time.sleep(1)
            break
        except TimeoutException:
            print(f"⚠️ Timeout loading page {page}, retry {attempt + 1}/{retries}")
            if attempt == retries - 1:
                return []

    soup = BeautifulSoup(driver.page_source, "html.parser")
    products = []
    for product in soup.select(".product-box"):
        name_el = product.select_one("a.product-name")
        price_el = product.select_one(".product-price.with-list-price, .product-price")
        list_price_el = product.select_one(".list-price-price")
        saving_el = product.select_one(".list-price-percentage")
        base_price_el = product.select_one(".product-base-price")
        unit_el = product.select_one(".price-unit-content")

        if not name_el or not price_el:
            continue

        name = name_el.text.strip()
        product_url = name_el.get("href")

        # Extract the current price with a regex, then normalize it
        price = "0,0"
        text = price_el.get_text(" ", strip=True)
        match = re.search(r"[\d,.]+", text)
        if match:
            price = parse_price(match.group(0))

        # List price (strike-through price); fall back to the current price
        list_price = price
        if list_price_el:
            match = re.search(r"[\d,.]+", list_price_el.text)
            if match:
                list_price = parse_price(match.group(0))

        # Saving in percent
        saving_percent = "0,0"
        if saving_el:
            match = re.search(r"([\d,.]+)", saving_el.text)
            if match:
                saving_percent = match.group(1).replace(".", ",")

        # Base price (e.g. price per kg) and its unit
        base_price = None
        if base_price_el:
            match = re.search(r"[\d,.]+", base_price_el.text)
            if match:
                base_price = match.group(0).replace(".", ",")
        unit = unit_el.text.strip() if unit_el else None

        products.append({
            "name": name,
            "url": product_url,
            "price": price,
            "list_price": list_price,
            "saving": saving_percent,
            "price_per_kg": base_price,
            "unit": unit,
        })
    return products


def safe_write_csv(filename, data):
    """Write CSV safely, retrying under a new filename if permission is denied; values keep decimal commas."""
    max_tries = 100
    attempt = 0
    base, ext = os.path.splitext(filename)
    csv_file = None
    while attempt < max_tries:
        suffix = f"_{attempt}" if attempt > 0 else ""
        candidate = f"{base}{suffix}{ext}"
        try:
            with open(candidate, mode="w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(
                    f,
                    fieldnames=["name", "url", "price", "list_price",
                                "saving", "price_per_kg", "unit"],
delimiter=";" ) writer.writeheader() for p in data: writer.writerow(p) csv_file = candidate break except PermissionError: attempt += 1 continue except OSError as e: if e.errno in (errno.EACCES, errno.EPERM): attempt += 1 continue raise if csv_file: print(f"✅ Saved {len(data)} products to {csv_file}") else: print("❌ Failed to save CSV file after multiple attempts.") # scraping loop all_products = [] page = 1 current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M") csv_filename = f"sonderposten_{current_datetime}.csv" try: while True: print(f"Scraping page {page}...") products = get_products(page) soup = BeautifulSoup(driver.page_source, "html.parser") alert = soup.select_one("div.alert.alert-info[role='alert']") if alert and "Keine Produkte gefunden" in alert.text: print("✅ No more products found, stopping.") break if not products: print("⚠️ No products found on this page, stopping.") break all_products.extend(products) page += 1 except KeyboardInterrupt: print("\n⏹️ Scraping cancelled — saving current progress...") finally: driver.quit() # Always sort by saving descending all_products.sort(key=lambda x: float(x["saving"].replace(",", ".")), reverse=True) safe_write_csv(csv_filename, all_products)