import argparse
import json
import os
import re
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

from bs4 import BeautifulSoup
from internetarchive import get_item, search_items
from warcio import ArchiveIterator
def list_warcs(collection_id):
    """Return (identifier, file_name) pairs for every WARC file in the collection."""
    warc_files = []
    for item in search_items(f'collection:{collection_id}'):
        identifier = item['identifier']
        ia_item = get_item(identifier)
        for file in ia_item.get_files():
            if file.name.endswith(('.warc.gz', '.warc.zst')):
                warc_files.append((identifier, file.name))
    return warc_files
def download_warc(collection_id, identifier, file_name, output_dir):
    """Download a single WARC file into output_dir and return its local path."""
    item = get_item(identifier)
    local_path = os.path.join(output_dir, file_name)
    if not os.path.exists(local_path):
        print(f"Downloading {file_name}...")
        # no_directory=True places the file directly in output_dir rather than in an
        # <identifier>/ subdirectory, so it ends up where local_path expects it.
        item.download(files=[file_name], destdir=output_dir, no_directory=True,
                      ignore_existing=True, on_the_fly=True)
    return local_path
def extract_urls(warc_file):
    """Scan the HTML responses in a WARC file and return every imgur.com URL found."""
    url_pattern = re.compile(r'\S*imgur\.com\S*')
    urls = set()
    with open(warc_file, 'rb') as stream:
        for record in ArchiveIterator(stream):
            # Only HTTP response records with headers can carry HTML payloads.
            if record.rec_type != 'response' or record.http_headers is None:
                continue
            content_type = record.http_headers.get_header('Content-Type', '')
            if 'html' in content_type:
                content = record.content_stream().read()
                soup = BeautifulSoup(content, 'html.parser')
                urls.update(url_pattern.findall(str(soup)))
    return urls
def process_warcs(collection_id, output_dir, max_workers=10, min_disk_space_gb=30):
    progress_file = "progress.json"

    # Resume from a previous run if a progress file exists.
    if os.path.exists(progress_file):
        with open(progress_file, "r") as f:
            progress_data = json.load(f)
        processed_files = set(tuple(x) for x in progress_data.get("processed_files", []))
        warcs = [tuple(x) for x in progress_data.get("remaining_files", [])]
    else:
        processed_files = set()
        warcs = list_warcs(collection_id)

    def save_progress(processed, remaining):
        with open(progress_file, "w") as f:
            json.dump({"processed_files": list(processed), "remaining_files": remaining}, f)

    def download_and_process(identifier, file_name, output_dir):
        warc_file = download_warc(collection_id, identifier, file_name, output_dir)
        urls = extract_urls(warc_file)
        os.remove(warc_file)  # free disk space as soon as the WARC has been scanned
        return urls
    all_processed = False
    while not all_processed:
        all_processed = True
        skipped_files = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            tasks = {}
            for identifier, file_name in warcs:
                # Check free disk space before queueing another download.
                _, _, free = shutil.disk_usage(output_dir)
                free_gb = free // (2**30)
                if free_gb >= min_disk_space_gb:
                    task = executor.submit(download_and_process, identifier, file_name, output_dir)
                    tasks[task] = (identifier, file_name)
                else:
                    print(f"Not enough disk space to download {file_name}. Skipping...")
                    all_processed = False
                    skipped_files.append((identifier, file_name))

            with open(os.path.join(output_dir, 'output'), 'a') as output_file:
                for task in as_completed(tasks):
                    urls = task.result()
                    identifier, file_name = tasks[task]
                    for url in urls:
                        output_file.write(f"{url}\n")
                    processed_files.add((identifier, file_name))
                    # Everything not yet completed (still running or skipped) is remaining.
                    remaining = [w for w in warcs if w not in processed_files]
                    save_progress(processed_files, remaining)
        warcs = skipped_files
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Download and extract URLs from WARCs in an Internet Archive collection.")
    parser.add_argument("collection_id", help="The collection identifier to process.")
    parser.add_argument("output_dir", help="The directory to download WARCs and store the output file.")
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)
    process_warcs(args.collection_id, args.output_dir)
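
# Example invocation (the script filename and collection id below are hypothetical):
#   python extract_imgur_urls.py some-imgur-collection ./warcs
# Extracted imgur.com URLs are appended to <output_dir>/output, and progress.json
# (written to the current working directory) lets an interrupted run resume.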