herolib_python/lib/downloader/downloader.py

import json
import logging
import mimetypes # Added
import os
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse

import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.linkextractors import LinkExtractor
from scrapy.utils.project import get_project_settings

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

STATE_FILE_NAME = ".download_state.json"


class GenericDownloaderSpider(scrapy.Spider):
    name = "generic_downloader"

    def __init__(
        self,
        start_url,
        dest_dir,
        allowed_domains,
        ignore_paths=None,
        depth_limit=0,
        follow_links=True,
        max_age_hours=0,
        state_data=None,
        *args,
        **kwargs,
    ):
        super(GenericDownloaderSpider, self).__init__(*args, **kwargs)
        self.start_urls = [start_url]
        self.dest_dir = dest_dir
        self.allowed_domains = allowed_domains
        self.ignore_paths = ignore_paths if ignore_paths else []
        self.depth_limit = int(depth_limit)
        self.follow_links = bool(follow_links)
        self.max_age_hours = int(max_age_hours)
        self.state_data = state_data if state_data else {}
        self.link_extractor = LinkExtractor(allow_domains=self.allowed_domains)
        os.makedirs(self.dest_dir, exist_ok=True)
        logger.info(f"Downloader initialized for {start_url}")
        logger.info(f"Destination directory: {self.dest_dir}")
        logger.info(f"Allowed domains: {self.allowed_domains}")
        logger.info(f"Ignore paths: {self.ignore_paths}")
        logger.info(f"Depth limit: {self.depth_limit}")
        logger.info(f"Follow links: {self.follow_links}")
        logger.info(f"Max age (hours): {self.max_age_hours}")

    def _should_ignore(self, url_path):
        for pattern in self.ignore_paths:
            if pattern in url_path:  # Simple substring match for now, can be regex
                return True
        return False
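
    # Illustrative example (hypothetical values): with ignore_paths=["/private/", "/login"],
    # URL paths such as "/private/report.html" or "/accounts/login" are skipped,
    # while "/docs/index.html" is not.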

    def _get_file_path(self, response):  # Changed signature to take response
        url = response.url
        parsed_url = urlparse(url)
        original_path = parsed_url.path  # e.g. /foo/bar.html or /foo/ or /
        # Determine base_name and current_ext from original_path
        if original_path.endswith("/"):
            base_name = "index"
            current_ext = ""
            # path_for_dirs is the path part that forms the directory structure
            path_for_dirs = original_path.lstrip("/")
        else:
            path_basename = os.path.basename(original_path)
            if (
                not path_basename and original_path == "/"
            ):  # Root path e.g. http://example.com
                base_name = "index"
                current_ext = ""
            else:  # e.g. /file.txt or /file_no_ext or /.config
                base_name, current_ext = os.path.splitext(path_basename)
                if not base_name and current_ext:  # Hidden file like /.bashrc
                    base_name = current_ext  # Treat .bashrc as base_name
                    current_ext = ""  # No further extension part
            path_for_dirs = os.path.dirname(original_path.lstrip("/"))
        # Try to get extension from Content-Type
        content_type = (
            response.headers.get("Content-Type", b"")
            .decode("utf-8")
            .split(";")[0]
            .strip()
        )
        mime_ext = mimetypes.guess_extension(content_type) if content_type else None
        final_ext = current_ext
        if mime_ext and not current_ext:  # No path extension, use MIME type's
            final_ext = mime_ext
        elif (
            mime_ext
            and current_ext.lower() in [".htm", ".html"]
            and mime_ext.lower() not in [".htm", ".html"]
        ):
            # Path had .html/.htm, but MIME type suggests something more specific
            final_ext = mime_ext
            logger.debug(
                f"URL {url}: Path ext {current_ext} overridden by Content-Type ext {mime_ext}."
            )
        elif not final_ext and (
            content_type.startswith("text/")
            or content_type
            in ["application/javascript", "application/json", "application/xml"]
        ):
            # Fallback for common text types if no extension determined yet and no path ext
            if not base_name.endswith(
                (".js", ".css", ".json", ".xml", ".txt")
            ):  # Avoid double .html.html
                final_ext = ".html"
        filename = base_name + final_ext
        # Create path components for the directory structure
        components = []
        if path_for_dirs:
            components.extend(comp for comp in path_for_dirs.split("/") if comp)
        components.append(filename)
        # Sanitize components
        sane_components = []
        for comp_idx, comp_val in enumerate(components):
            # Basic sanitization: replace invalid chars, limit length, avoid '..'
            # Allow '.' for filenames but not as a full component name if it's not the only char
            if comp_val == "..":
                continue  # Skip parent dir references in path construction
            sane_comp = "".join(
                c if c.isalnum() or c in ["-", "_", "."] else "_" for c in comp_val
            )
            sane_comp = sane_comp[:150]  # Limit component length
            if (
                not sane_comp and comp_idx == len(components) - 1
            ):  # last component (filename) became empty
                sane_comp = "downloaded_file" + final_ext  # fallback filename
            elif not sane_comp:
                sane_comp = "_"  # placeholder for empty dir name
            if sane_comp:  # Ensure component is not empty after sanitization
                sane_components.append(sane_comp)
        if not sane_components:  # If all components were sanitized away or skipped
            sane_components = [filename if filename else "unknown_file" + final_ext]
        file_path = os.path.join(self.dest_dir, *sane_components)
        return file_path
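
    # Illustrative mapping (hypothetical URLs, derived from the logic above): a
    # response for "https://example.com/docs/" served as "text/html" is written to
    # "<dest_dir>/docs/index.html" (trailing slash -> base_name "index", extension
    # from the MIME type), while "https://example.com/api/data" served as
    # "application/json" is written to "<dest_dir>/api/data.json".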

    def parse(self, response, depth=0):
        url = response.url
        logger.info(f"Processing URL (depth {depth}): {url}")
        parsed_url = urlparse(url)
        if self._should_ignore(parsed_url.path):
            logger.info(f"Ignoring URL (matches ignore_paths): {url}")
            return
        file_path = self._get_file_path(response)  # Pass response object
        # Check download state and max_age
        if url in self.state_data:
            url_state = self.state_data[url]
            last_download_time_str = url_state.get("timestamp")
            # Consider previous status; only skip if it was a success or another skip
            can_skip_based_on_history = url_state.get("status", "").startswith(
                "success"
            ) or url_state.get("status", "").startswith("skipped")
            if last_download_time_str and can_skip_based_on_history:
                last_download_time = datetime.fromisoformat(last_download_time_str)
                if self.max_age_hours > 0 and (
                    datetime.utcnow() - last_download_time
                ) < timedelta(hours=self.max_age_hours):
                    logger.info(
                        f"Skipping download for {url}, recently processed at {last_download_time_str} with status '{url_state.get('status')}'."
                    )
                    # Update state to reflect this skip check
                    self.state_data[url]["status"] = "skipped_max_age"
                    self.state_data[url]["skipped_timestamp"] = (
                        datetime.utcnow().isoformat()
                    )
                    # Still need to check for links if recursive
                    # Corrected depth condition:
                    # Follow if self.depth_limit is 0 (infinite) OR current depth is less than a positive limit.
                    if self.follow_links and (
                        self.depth_limit == 0 or depth < self.depth_limit
                    ):
                        for link in self.link_extractor.extract_links(response):
                            parsed_link_url = urlparse(link.url)
                            if not self._should_ignore(parsed_link_url.path):
                                # Propagate the crawl depth so the custom depth check stays accurate
                                yield response.follow(link, callback=self.parse, cb_kwargs={"depth": depth + 1})
                            else:
                                logger.info(
                                    f"Ignoring extracted link (matches ignore_paths): {link.url}"
                                )
                    return
        logger.info(f"Processing and saving {url} to {file_path}")
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        try:
            with open(file_path, "wb") as f:
                f.write(response.body)
            logger.info(f"Successfully saved {url} to {file_path}")
            self.state_data[url] = {
                "timestamp": datetime.utcnow().isoformat(),
                "status": "success",
                "path": file_path,
                "size": len(response.body),
            }
        except Exception as e:
            logger.error(f"Failed to save {url} to {file_path}: {e}")
            self.state_data[url] = {
                "timestamp": datetime.utcnow().isoformat(),
                "status": "failed",
                "error": str(e),
            }
            return  # Do not proceed further if save failed
        # Corrected depth condition for following links:
        # Follow if self.depth_limit is 0 (infinite) OR current depth is less than a positive limit.
        if self.follow_links and (self.depth_limit == 0 or depth < self.depth_limit):
            logger.info(
                f"Following links from {url} at custom depth {depth} (for next level {depth + 1})"
            )
            extracted_links = list(self.link_extractor.extract_links(response))
            if not extracted_links:
                logger.info(f" No links extracted from {url} by LinkExtractor.")
            else:
                logger.info(
                    f" LinkExtractor found {len(extracted_links)} links from {url}: {[l.url for l in extracted_links]}"
                )
            for link_idx, link in enumerate(extracted_links):
                logger.debug(
                    f" Considering link {link_idx + 1}/{len(extracted_links)}: Text='{link.text}', URL='{link.url}'"
                )
                parsed_link_url = urlparse(link.url)
                if self._should_ignore(parsed_link_url.path):
                    logger.info(
                        f" Ignoring extracted link (matches ignore_paths): {link.url}"
                    )
                else:
                    logger.info(
                        f" Yielding request for: {link.url} (to be processed at custom depth {depth + 1})"
                    )
                    # Propagate the crawl depth so the custom depth check stays accurate
                    yield response.follow(link, callback=self.parse, cb_kwargs={"depth": depth + 1})

    def closed(self, reason):
        logger.info(f"Spider closed: {reason}. Finalizing and saving state.")
        state_file_path = os.path.join(self.dest_dir, STATE_FILE_NAME)
        try:
            # Ensure the directory for the state file exists, though dest_dir should already.
            os.makedirs(os.path.dirname(state_file_path), exist_ok=True)
            with open(state_file_path, "w") as f:
                json.dump(self.state_data, f, indent=4)
            logger.info(
                f"Spider successfully saved state ({len(self.state_data)} items) to {state_file_path}"
            )
        except Exception as e:
            logger.error(
                f"Spider failed to save state to {state_file_path}: {e}", exc_info=True
            )


def download_site(
    start_url,
    dest_dir,
    recursive=True,
    ignore_paths=None,
    depth_limit=0,  # 0 means no limit if recursive is True
    follow_links=True,  # This is somewhat redundant if recursive is True, but good for clarity
    max_age_hours=24,  # Re-download if older than 24 hours
):
    """
    Downloads a website or a single page.

    :param start_url: The URL to start downloading from.
    :param dest_dir: The directory to save downloaded files.
    :param recursive: Whether to download recursively.
    :param ignore_paths: A list of path substrings or regex patterns to ignore.
    :param depth_limit: Maximum depth for recursive downloads (0 for no limit).
    :param follow_links: Whether to follow links on pages (primarily for recursive).
    :param max_age_hours: Max age of a file in hours. If a file was downloaded
                          more recently than this, it won't be re-downloaded.
                          0 means always re-download.
    :return: A dictionary summarizing the download process.
    """
    parsed_url = urlparse(start_url)
    if not parsed_url.scheme or not parsed_url.netloc:
        logger.error(
            f"Invalid start_url: {start_url}. Must be a full URL (e.g., http://example.com)"
        )
        return None
    allowed_domains = [parsed_url.hostname]  # Changed from netloc to hostname
    state_file_path = os.path.join(dest_dir, STATE_FILE_NAME)
    state_data = {}
    if os.path.exists(state_file_path):
        try:
            with open(state_file_path, "r") as f:
                state_data = json.load(f)
            logger.info(f"Loaded download state from {state_file_path}")
        except json.JSONDecodeError:
            logger.warning(
                f"Could not decode JSON from state file {state_file_path}. Starting fresh."
            )
        except Exception as e:
            logger.error(
                f"Error loading state file {state_file_path}: {e}. Starting fresh."
            )
    settings = get_project_settings()
    settings.set("ROBOTSTXT_OBEY", False)  # Explicitly disable robots.txt
    # settings.set('LOG_LEVEL', 'DEBUG')  # Optionally enable for more Scrapy internal logs
    effective_scrapy_depth = 0  # Default for non-recursive or depth_limit=0 with recursion (0 means infinite for Scrapy)
    if recursive and int(depth_limit) > 0:
        effective_scrapy_depth = int(depth_limit)
    # If not recursive, effective_scrapy_depth remains 0.
    # If recursive and depth_limit is 0, effective_scrapy_depth remains 0 (infinite).
    settings.set("DEPTH_LIMIT", effective_scrapy_depth)
    logger.info(f"Scrapy DEPTH_LIMIT set to: {effective_scrapy_depth}")
    # Scrapy's DEPTH_PRIORITY and SCHEDULER_DISK_QUEUE might be useful for large crawls.
    # For now, keeping it simple.
    process = CrawlerProcess(settings)
    # The spider needs to be instantiated with all its custom args.
    # Scrapy's process.crawl can take kwargs which are passed to the spider's __init__.
    process.crawl(
        GenericDownloaderSpider,
        start_url=start_url,
        dest_dir=dest_dir,
        allowed_domains=allowed_domains,
        ignore_paths=ignore_paths,
        depth_limit=int(depth_limit)
        if recursive
        else 0,  # Spider handles its own depth based on this
        follow_links=follow_links and recursive,
        max_age_hours=int(max_age_hours),
        state_data=state_data,
    )
    logger.info(f"Starting download process for {start_url}...")
    process.start()  # This will block until the crawl is finished
    # The spider's closed() method is now responsible for writing the final state.
    # Load this definitive state to build the summary.
    final_state_data_for_summary = {}
    if os.path.exists(state_file_path):
        try:
            with open(state_file_path, "r") as f:
                final_state_data_for_summary = json.load(f)
            logger.info(
                f"Loaded final state ({len(final_state_data_for_summary)} items) from {state_file_path} for summary construction."
            )
        except json.JSONDecodeError as e:
            logger.error(
                f"Error decoding JSON from final state file {state_file_path} for summary: {e}. Summary will be based on empty or incomplete state."
            )
        except Exception as e:
            logger.error(
                f"Error loading final state from {state_file_path} for summary: {e}. Summary will be based on empty or incomplete state."
            )
    else:
        logger.warning(
            f"State file {state_file_path} not found after spider closed. Summary will be based on empty state."
        )
    summary = {
        "start_url": start_url,
        "dest_dir": dest_dir,
        "total_processed_urls": len(final_state_data_for_summary),
        "successful_downloads": 0,
        "failed_downloads": 0,
        "skipped_max_age": 0,
        "total_bytes_downloaded": 0,
        "state_file_path": state_file_path,
        "errors": [],
    }
    # Populate summary from the final_state_data_for_summary loaded from the file
    for url_key, data_val in final_state_data_for_summary.items():
        status = data_val.get("status")
        if status == "success":
            summary["successful_downloads"] += 1
            summary["total_bytes_downloaded"] += data_val.get("size", 0)
        elif status == "failed":
            summary["failed_downloads"] += 1
            if "error" in data_val:
                summary["errors"].append(f"URL: {url_key}, Error: {data_val['error']}")
        elif status == "skipped_max_age":
            summary["skipped_max_age"] += 1
    # Any errors during state file loading for summary should also be noted if critical.
    # For now, the logs capture it. If final_state_data_for_summary is empty due to a load
    # error, the summary will reflect that.
    logger.info(f"Download process finished. Summary: {json.dumps(summary, indent=2)}")
    return summary
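

if __name__ == "__main__":
    # Minimal usage sketch. The URL, destination directory, and ignore pattern
    # below are placeholder values, not anything shipped with this module.
    result = download_site(
        start_url="https://example.com",
        dest_dir="downloads",
        recursive=True,
        ignore_paths=["/private/"],
        depth_limit=1,
        max_age_hours=12,
    )
    if result:
        print(json.dumps(result, indent=2))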