import http.server
import json
import logging
import multiprocessing
import os
import queue  # For queue.Empty exception
import shutil  # For removing the temp dir if the TemporaryDirectory context manager is not used for the whole scope
import socketserver
import sys
import tempfile
import time

import requests  # For checking server readiness

# Adjust the Python path to include the parent directory (project root)
# so that 'lib.downloader' can be imported.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from lib.downloader import STATE_FILE_NAME, download_site

# Configure logging for the example script
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


# This function needs to be at the top level for multiprocessing to find it.
def run_download_test_process(test_name, downloader_kwargs, results_queue):
    """
    Wrapper to run download_site in a separate process and put its summary in a queue.
    """
    logger.info(f"--- Running Test in subprocess: {test_name} ---")
    summary = None
    try:
        summary = download_site(**downloader_kwargs)
        logger.info(f"Test {test_name} completed in subprocess.")
    except Exception as e:
        logger.error(f"Error in test {test_name} (subprocess): {e}", exc_info=True)
        # summary stays None (or incomplete) if the error happened after its creation
    finally:
        results_queue.put({"test_name": test_name, "summary": summary})


def create_temp_site_files(base_dir):
    """Creates the dummy HTML files in a 'test_site' subdirectory of base_dir."""
    site_dir = os.path.join(base_dir, "test_site")
    os.makedirs(os.path.join(site_dir, "sub"), exist_ok=True)
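    # The pages written below form a tiny crawlable site: index.html links to
    # page1.html, sub/page2.html, ignored_page.html, and a page that does not
    # exist (to exercise error handling); sub/page2.html links back to the index
    # and to an external host, which a same-host crawler would presumably not
    # follow. The exact targets of the "Non Existent", "Back", and "External"
    # links are only illustrative; the checks below never inspect them directly.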

    with open(os.path.join(site_dir, "index.html"), "w") as f:
        f.write(
            "<html><head><title>Index</title></head><body>"
            '<a href="page1.html">Page 1</a> '
            '<a href="sub/page2.html">Page 2</a> '
            '<a href="ignored_page.html">Ignored</a> '
            '<a href="nonexistent.html">Non Existent</a>'  # deliberately broken link
            "</body></html>"
        )

    with open(os.path.join(site_dir, "page1.html"), "w") as f:
        f.write(
            "<html><head><title>Page 1</title></head><body>"
            '<a href="index.html">Index</a>'
            "</body></html>"
        )

    with open(os.path.join(site_dir, "sub", "page2.html"), "w") as f:
        f.write(
            "<html><head><title>Page 2</title></head><body>"
            '<a href="../index.html">Index</a> '
            '<a href="../page1.html">Back</a> '
            '<a href="http://example.com/">External</a>'  # external target is a placeholder
            "</body></html>"
        )
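    # ignored_page.html is reachable from index.html, but Test 2 lists it in
    # ignore_paths, so that run is expected to leave it untouched.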

    with open(os.path.join(site_dir, "ignored_page.html"), "w") as f:
        f.write("<html><head><title>Ignored Page</title></head><body></body></html>")

    logger.info(f"Created dummy site files in {site_dir}")
    return site_dir


# Top-level target function for the HTTP server process
def _http_server_target_function(directory, host, port):
    import functools

    # Use functools.partial to set the 'directory' argument for SimpleHTTPRequestHandler.
    # This ensures the server serves files from the specified 'directory'.
    Handler = functools.partial(
        http.server.SimpleHTTPRequestHandler, directory=directory
    )
    try:
        with socketserver.TCPServer((host, port), Handler) as httpd:
            logger.info(
                f"HTTP server process (PID {os.getpid()}) started on {host}:{port}, serving {directory}"
            )
            httpd.serve_forever()
    except Exception as e:
        logger.error(
            f"HTTP server process (PID {os.getpid()}) failed: {e}", exc_info=True
        )
        raise


def start_http_server_process(directory, host, port):
    """Starts a simple HTTP server in a separate process."""
    server_process = multiprocessing.Process(
        target=_http_server_target_function,
        args=(directory, host, port),
        daemon=True,
    )
    server_process.start()
    logger.info(
        f"HTTP server process (PID: {server_process.pid}) initiated for {directory} on {host}:{port}"
    )
    return server_process


def find_free_port():
    """Finds an available port on the local machine."""
    with socketserver.TCPServer(
        ("localhost", 0), http.server.BaseHTTPRequestHandler
    ) as s:
        return s.server_address[1]


def check_server_ready(url, retries=10, delay=0.5):
    """Checks if the server is responding to requests."""
    for i in range(retries):
        try:
            response = requests.get(url, timeout=1)
            if response.status_code == 200:
                logger.info(f"Server is ready at {url}")
                return True
        except requests.ConnectionError:
            logger.debug(
                f"Server not ready yet at {url}, attempt {i + 1}/{retries}. Retrying in {delay}s..."
            )
        except requests.Timeout:
            logger.debug(
                f"Server timed out at {url}, attempt {i + 1}/{retries}. Retrying in {delay}s..."
            )
        time.sleep(delay)
    logger.error(f"Server failed to start at {url} after {retries} retries.")
    return False


def main():
    # Using TemporaryDirectory for automatic cleanup
    with tempfile.TemporaryDirectory(prefix="downloader_test_") as temp_base_dir:
        logger.info(f"Created temporary base directory: {temp_base_dir}")

        # 1. Create the dummy website files
        site_root_path = create_temp_site_files(temp_base_dir)  # i.e. <temp_base_dir>/test_site

        # 2. Start the HTTP server
        host = "localhost"
        port = find_free_port()
        server_process = start_http_server_process(site_root_path, host, port)
        # The server serves from site_root_path, so URLs are relative to that directory.
        test_url_base = f"http://{host}:{port}/"

        # 3. Check that the server is ready.
        # We request index.html, which sits at the root of what is being served.
        if not check_server_ready(test_url_base + "index.html"):
            logger.error("Test server failed to become ready. Aborting tests.")
            if server_process.is_alive():
                server_process.terminate()
                server_process.join(timeout=5)
            return
        # 4. Define test parameters.
        # The destination for downloaded content also lives inside temp_base_dir.
        download_destination_root = os.path.join(temp_base_dir, "downloaded_content")
        os.makedirs(download_destination_root, exist_ok=True)

        tests_params_config = [
            (
                "1: Basic recursive download (depth 2)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test1"),
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 2,
                    "max_age_hours": 0,
                },
            ),
            (
                "2: With ignore_paths and max_age (reuse test1 dir)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test1"),  # Same destination as test 1
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 2,
                    "ignore_paths": ["ignored_page.html"],
                    "max_age_hours": 1,  # Should skip files that test 1 downloaded recently
                },
            ),
            (
                "3: Non-recursive (single page)",
                {
                    "start_url": test_url_base + "page1.html",
                    "dest_dir": os.path.join(download_destination_root, "test3"),
                    "recursive": False,  # Effectively depth_limit 0 for the spider
                    "max_age_hours": 0,
                },
            ),
            (
                "4: Depth limit 0 (only start_url)",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test4_depth0"),
                    "recursive": True,  # The 'recursive' flag enables depth control
                    "follow_links": True,
                    "depth_limit": 0,  # Spider should only download index.html
                    "max_age_hours": 0,
                },
            ),
            (
                "5: Depth limit 1",
                {
                    "start_url": test_url_base + "index.html",
                    "dest_dir": os.path.join(download_destination_root, "test5_depth1"),
                    "recursive": True,
                    "follow_links": True,
                    "depth_limit": 1,  # index.html and its direct links
                    "max_age_hours": 0,
                },
            ),
        ]

        # 5. Run the tests using multiprocessing.
        # A queue to get results back from the subprocesses.
        results_queue = multiprocessing.Queue()
        processes = []
        for test_name, downloader_kwargs in tests_params_config:
            # Ensure dest_dir exists for each test before starting
            os.makedirs(downloader_kwargs["dest_dir"], exist_ok=True)
            p = multiprocessing.Process(
                target=run_download_test_process,
                args=(test_name, downloader_kwargs, results_queue),
            )
            processes.append(p)
            p.start()

        # Wait for all processes to complete
        for p in processes:
            p.join()

        # Collect and print results
        logger.info("\n--- All Test Processes Completed. Results: ---")
        all_tests_passed = True
        results_collected = 0
        failed_tests_details = []  # Store details of failed tests

        # ANSI escape codes for colors
        RED = "\033[91m"
        GREEN = "\033[92m"
        RESET = "\033[0m"

        while results_collected < len(tests_params_config):
            current_test_passed = True
            failure_reason = ""
            try:
                result = results_queue.get(timeout=10)  # Timeout to avoid hanging
                results_collected += 1
                test_name = result["test_name"]
                summary = result["summary"]

                print(f"\nResult for Test: {test_name}")
                if summary:
                    print(f"  Summary: {json.dumps(summary, indent=2)}")
                    # Basic check: an empty 'errors' list in the summary counts as a pass for now.
                    if summary.get("errors") and len(summary.get("errors")) > 0:
                        failure_reason = (
                            f"Reported errors in summary: {summary.get('errors')}"
                        )
                        logger.error(f"  Test '{test_name}' {failure_reason}")
                        current_test_passed = False
                    elif (
                        summary.get("successful_downloads", 0) == 0
                        # Test 4 may legitimately have 0 successes if only start_url was processed
                        and not (
                            test_name.startswith("4:")
                            and summary.get("total_processed_urls", 0) > 0
                        )
                        # Test 2 may have 0 successes if everything was skipped
                        and not test_name.startswith("2:")
                    ):
                        # This condition is a bit loose; the specific checks below matter more.
                        # For now, we don't mark the test as failed here unless other checks also fail.
                        pass
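                    # The checks below assume the state file is a JSON object keyed by
                    # URL, with each entry carrying a "status" of "success", "failed",
                    # or "skipped_max_age" (matching what download_site appears to write).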
                    # Specific checks for state and re-download
                    if test_name.startswith("1:"):  # After Test 1
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            expected_success_files = [
                                test_url_base + "index.html",
                                test_url_base + "page1.html",
                                test_url_base + "sub/page2.html",
                            ]
                            actual_success_count = 0
                            for url, data in state.items():
                                if (
                                    url in expected_success_files
                                    and data.get("status") == "success"
                                ):
                                    actual_success_count += 1
                            if actual_success_count >= 3:
                                logger.info(
                                    "  Test 1: State file check PASSED for key successful files."
                                )
                            else:
                                failure_reason = f"State file check FAILED. Expected ~3 successes, got {actual_success_count}. State: {state}"
                                logger.error(f"  Test 1: {failure_reason}")
                                current_test_passed = False
                        else:
                            failure_reason = "State file not found or summary incomplete."
                            logger.error(f"  Test 1: {failure_reason}")
                            current_test_passed = False

                    elif test_name.startswith("2:"):  # After Test 2 (re-run on the test1 dir)
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            skipped_count = 0
                            main_files_to_check_skip = [
                                test_url_base + "index.html",
                                test_url_base + "page1.html",
                                test_url_base + "sub/page2.html",
                            ]
                            for url_to_check in main_files_to_check_skip:
                                if (
                                    url_to_check in state
                                    and state[url_to_check].get("status")
                                    == "skipped_max_age"
                                ):
                                    skipped_count += 1
                            if skipped_count >= 3:
                                logger.info(
                                    "  Test 2: Re-download check (skipped_max_age) PASSED for key files."
                                )
                            else:
                                failure_reason = f"Re-download check FAILED. Expected ~3 skips, got {skipped_count}. State: {state}"
                                logger.error(f"  Test 2: {failure_reason}")
                                current_test_passed = False

                            if (
                                test_url_base + "ignored_page.html" in state
                                and state[test_url_base + "ignored_page.html"].get("status")
                                == "success"
                            ):
                                ignore_fail_reason = "ignored_page.html was downloaded, but should have been ignored."
                                logger.error(f"  Test 2: {ignore_fail_reason}")
                                if not failure_reason:
                                    failure_reason = ignore_fail_reason
                                else:
                                    failure_reason += f"; {ignore_fail_reason}"
                                current_test_passed = False
                        else:
                            failure_reason = "State file not found or summary incomplete."
                            logger.error(f"  Test 2: {failure_reason}")
                            current_test_passed = False

                    elif test_name.startswith("4:"):  # Depth 0
                        state_file = summary.get("state_file_path")
                        if state_file and os.path.exists(state_file):
                            with open(state_file, "r") as f:
                                state = json.load(f)
                            if (
                                len(state) == 1
                                and (test_url_base + "index.html") in state
                                and (
                                    state[test_url_base + "index.html"].get("status")
                                    == "success"
                                    # Allow "failed" here: the point of this test is that
                                    # nothing beyond the single start_url is crawled.
                                    or state[test_url_base + "index.html"].get("status")
                                    == "failed"
                                )
                            ):
                                logger.info(
                                    "  Test 4: Depth 0 check PASSED (1 item in state)."
                                )
                            else:
                                failure_reason = f"Depth 0 check FAILED. Expected 1 item processed, got {len(state)}. State: {state}"
                                logger.error(f"  Test 4: {failure_reason}")
                                current_test_passed = False
                        else:
                            failure_reason = "State file not found or summary incomplete."
                            logger.error(f"  Test 4: {failure_reason}")
                            current_test_passed = False
                else:
                    failure_reason = (
                        "Did not return a summary (likely failed hard in subprocess)."
                    )
                    logger.error(f"  Test '{test_name}' {failure_reason}")
                    current_test_passed = False

            except queue.Empty:
                test_name = f"Unknown Test (result {results_collected + 1} of {len(tests_params_config)})"
                failure_reason = "Queue was empty after waiting; a subprocess may have died without putting a result."
                logger.error(failure_reason)
                current_test_passed = False
                # Do not break here; keep trying to collect any remaining results.
                # Count this attempt so the loop terminates even if a result never arrives.
                results_collected += 1

            if not current_test_passed:
                all_tests_passed = False
                failed_tests_details.append(
                    {"name": test_name, "reason": failure_reason}
                )
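        # The server process was started with daemon=True, so it would be killed when
        # this script exits anyway; terminating it explicitly just makes shutdown
        # deterministic and keeps the logs tidy.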
        # 6. Terminate the server
        logger.info("Terminating HTTP server process...")
        if server_process.is_alive():
            server_process.terminate()
            server_process.join(timeout=5)  # Wait for it to terminate
            if server_process.is_alive():
                logger.warning(
                    "Server process did not terminate gracefully, attempting to kill."
                )
                server_process.kill()  # Force kill if terminate didn't work
                server_process.join(timeout=5)
                if server_process.is_alive():
                    logger.error("SERVER PROCESS COULD NOT BE STOPPED.")
            else:
                logger.info("HTTP server process stopped.")

        if failed_tests_details:
            logger.error(f"\n--- {RED}Summary of Failed Tests{RESET} ---")
            for failed_test in failed_tests_details:
                logger.error(f"{RED}  Test: {failed_test['name']}{RESET}")
                logger.error(f"{RED}  Reason: {failed_test['reason']}{RESET}")
            logger.error(f"\n{RED}Some downloader tests FAILED.{RESET}")
            sys.exit(1)  # Exit with an error code if any test failed
        else:
            logger.info(
                f"\n{GREEN}All downloader tests PASSED (based on the implemented checks).{RESET}"
            )

    # temp_base_dir and its contents are removed automatically here, when the
    # TemporaryDirectory context manager exits.
    logger.info(
        f"Temporary base directory {temp_base_dir} and its contents (should be) removed."
    )


if __name__ == "__main__":
    # Protect the entry point for multiprocessing, especially on Windows;
    # 'spawn' (the default start method on macOS for 3.8+) is generally safer.
    multiprocessing.freeze_support()  # Good habit for PyInstaller compatibility
    main()
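# Running this script directly stands up a throwaway local HTTP server on a free port,
# runs the five download scenarios above in parallel subprocesses, and exits with a
# non-zero status if any check fails. The only third-party dependency imported here is
# 'requests' (plus whatever lib.downloader itself requires).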