import argparse
import json
import os
import re
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

from bs4 import BeautifulSoup
from internetarchive import get_item, search_items
from warcio.archiveiterator import ArchiveIterator


def list_warcs(collection_id):
    """Return (item identifier, file name) pairs for every WARC in the collection."""
    search = search_items(f'collection:{collection_id}')
    warc_files = []
    for item in search:
        identifier = item['identifier']
        ia_item = get_item(identifier)
        for file in ia_item.get_files():
            if file.name.endswith(('.warc.gz', '.warc.zst')):
                warc_files.append((identifier, file.name))
    return warc_files


def download_warc(identifier, file_name, output_dir):
    """Download a single WARC from an item unless it is already present locally."""
    item = get_item(identifier)
    local_path = os.path.join(output_dir, file_name)
    if not os.path.exists(local_path):
        print(f"Downloading {file_name}...")
        # no_directory=True places the file directly in output_dir instead of
        # the default destdir/<identifier>/ subdirectory, so local_path matches.
        item.download(files=[file_name], destdir=output_dir, no_directory=True,
                      ignore_existing=True, on_the_fly=True)
    return local_path


def extract_urls(warc_file):
    """Scan HTML response records in a WARC and collect every imgur.com URL found."""
    url_pattern = re.compile(r'\S*imgur\.com\S*')
    urls = set()
    with open(warc_file, 'rb') as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type != 'response':
                continue
            content_type = record.http_headers.get_header('Content-Type', '') if record.http_headers else ''
            if 'html' in content_type:
                content = record.content_stream().read()
                soup = BeautifulSoup(content, 'html.parser')
                urls.update(url_pattern.findall(str(soup)))
    return urls


def process_warcs(collection_id, output_dir, max_workers=10, min_disk_space_gb=30):
    progress_file = "progress.json"

    # Resume from a previous run if a progress file exists.
    if os.path.exists(progress_file):
        with open(progress_file, "r") as f:
            progress_data = json.load(f)
        processed_files = set(tuple(x) for x in progress_data.get("processed_files", []))
        warcs = [tuple(x) for x in progress_data.get("remaining_files", [])]
    else:
        processed_files = set()
        warcs = list_warcs(collection_id)

    def save_progress(processed, remaining):
        with open(progress_file, "w") as f:
            json.dump({"processed_files": list(processed),
                       "remaining_files": remaining}, f)

    def download_and_process(identifier, file_name):
        # Download, extract URLs, then delete the WARC to free disk space.
        warc_file = download_warc(identifier, file_name, output_dir)
        urls = extract_urls(warc_file)
        os.remove(warc_file)
        return urls

    output_path = os.path.join(output_dir, "output")

    all_processed = False
    while not all_processed:
        all_processed = True
        skipped_files = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            tasks = {}
            for identifier, file_name in warcs:
                # Check free disk space before queueing another download.
                _, _, free = shutil.disk_usage(output_dir)
                free_gb = free // (2 ** 30)

                # Only start the download-and-process task if there is enough headroom.
                if free_gb >= min_disk_space_gb:
                    task = executor.submit(download_and_process, identifier, file_name)
                    tasks[task] = (identifier, file_name)
                else:
                    print(f"Not enough disk space to download {file_name}. Skipping...")
                    all_processed = False
                    skipped_files.append((identifier, file_name))

            with open(output_path, 'a') as output_file:
                for task in as_completed(tasks):
                    urls = task.result()
                    identifier, file_name = tasks[task]
                    for url in urls:
                        output_file.write(f"{url}\n")
                    processed_files.add((identifier, file_name))
                    save_progress(processed_files, skipped_files)

        # Retry anything that was skipped for lack of disk space.
        warcs = skipped_files


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Download and extract URLs from WARCs in an Internet Archive collection.")
    parser.add_argument("collection_id", help="The collection identifier to process.")
    parser.add_argument("output_dir", help="The directory to download WARCs and store the output file.")
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)
    process_warcs(args.collection_id, args.output_dir)