import argparse
import json
import os
import re
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from bs4 import BeautifulSoup
from internetarchive import get_item, search_items
from warcio import ArchiveIterator

def list_warcs(collection_id):
    """Return the WARC files found in every item of a collection.

    Args:
        collection_id: Identifier used in the ``collection:<id>`` search
            query passed to ``search_items``.

    Returns:
        A list of ``(identifier, file_name)`` tuples, one per file whose
        name ends in ``.warc.gz`` or ``.warc.zst``.
    """
    warc_suffixes = ('.warc.gz', '.warc.zst')
    warc_files = []

    for result in search_items(f'collection:{collection_id}'):
        identifier = result['identifier']

        # Each search hit only carries metadata; the item has to be fetched
        # to enumerate its files.
        for ia_file in get_item(identifier).get_files():
            if ia_file.name.endswith(warc_suffixes):
                warc_files.append((identifier, ia_file.name))

    return warc_files

def download_warc(collection_id, identifier, file_name, output_dir):
    """Download one file of an item into ``output_dir`` unless already there.

    Args:
        collection_id: Unused here; kept so existing callers keep working.
        identifier: Identifier of the item that owns the file.
        file_name: Name of the file inside the item.
        output_dir: Directory the file is downloaded into.

    Returns:
        The local path ``output_dir/file_name``.
    """
    local_path = os.path.join(output_dir, file_name)

    if not os.path.exists(local_path):
        print(f"downloading {file_name}...")
        item = get_item(identifier)
        item.download(
            files=[file_name],
            destdir=output_dir,
            # Without this the library nests the file under
            # ``destdir/<identifier>/``, so ``local_path`` would never exist.
            no_directory=True,
            ignore_existing=True,
            on_the_fly=True,
        )

    return local_path

def extract_urls(warc_file):
    """Collect imgur.com URLs from the HTML responses stored in a WARC file.

    Args:
        warc_file: Path to the WARC file to read.

    Returns:
        A set of strings: every whitespace-delimited token containing
        ``imgur.com`` found in the re-serialised HTML of each response
        record whose content type mentions ``html``.
    """
    # \S (non-whitespace) so the match grows to cover the whole token around
    # the host name; \s would only ever match "imgur.com" plus blanks.
    url_pattern = re.compile(r'\S*imgur\.com\S*')
    urls = set()

    with open(warc_file, 'rb') as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type != 'response':
                continue

            # NOTE(review): http_headers appears to be None for response
            # records that are not HTTP (e.g. dns:) - skip them rather than
            # crash on attribute access.
            http_headers = record.http_headers
            if http_headers is None:
                continue

            content_type = http_headers.get_header('content-type', '') or ''
            if 'html' not in content_type.lower():
                continue

            content = record.content_stream().read()
            soup = BeautifulSoup(content, 'html.parser')
            urls.update(url_pattern.findall(str(soup)))

    return urls

def process_warcs(collection_id, output_dir, max_workers=10, min_disk_space_gb=30):
    """Download every WARC of a collection and append the extracted URLs to a file.

    State is kept in ``progress.json`` and results are appended to ``output``;
    both paths are relative to the current working directory, not to
    ``output_dir``. If ``progress.json`` exists, the run resumes from its
    ``remaining_files`` list instead of listing the collection again.

    A file is submitted only while at least ``min_disk_space_gb`` GiB are free
    in ``output_dir``; files skipped for lack of space are retried in a later
    pass. A file whose download or extraction raises is reported and left in
    ``remaining_files`` so a later run can retry it.

    Args:
        collection_id: Collection identifier to process.
        output_dir: Directory WARCs are downloaded into (and deleted from).
        max_workers: Number of worker threads.
        min_disk_space_gb: Minimum free space, in GiB, required to submit a file.
    """
    progress_file = "progress.json"
    retry_delay_seconds = 60

    if os.path.exists(progress_file):
        with open(progress_file, "r") as f:
            progress_data = json.load(f)
        processed_files = {tuple(x) for x in progress_data.get("processed_files", [])}
        warcs = [tuple(x) for x in progress_data.get("remaining_files", [])]
    else:
        processed_files = set()
        warcs = list_warcs(collection_id)

    # Every file still owed, whether queued, running, skipped or failed. This
    # is what gets persisted, so an interrupted run loses nothing on resume.
    outstanding = [w for w in warcs if w not in processed_files]

    def save_progress(processed, remaining):
        # Write to a sibling file and swap it in, so a crash mid-write cannot
        # leave a truncated progress file behind.
        tmp_file = progress_file + ".tmp"
        with open(tmp_file, "w") as f:
            json.dump({"processed_files": sorted(processed), "remaining_files": remaining}, f)
        os.replace(tmp_file, progress_file)

    def download_and_process(identifier, file_name, output_dir):
        warc_file = download_warc(collection_id, identifier, file_name, output_dir)
        try:
            return extract_urls(warc_file)
        finally:
            # Free the disk space even when extraction fails.
            if os.path.exists(warc_file):
                os.remove(warc_file)

    # Persist the work list up front so the collection does not have to be
    # listed again if the run dies before the first file completes.
    save_progress(processed_files, outstanding)

    warcs = outstanding
    with open('output', 'a') as output_file:
        while warcs:
            skipped_files = []

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                tasks = {}
                for identifier, file_name in warcs:
                    # check free disk space
                    free_gb = shutil.disk_usage(output_dir).free // (2**30)

                    # if there's enough free disk space, start the download and processing task
                    if free_gb >= min_disk_space_gb:
                        task = executor.submit(download_and_process, identifier, file_name, output_dir)
                        tasks[task] = (identifier, file_name)
                    else:
                        print(f"not enough disk space to download {file_name}. skipping...")
                        skipped_files.append((identifier, file_name))

                for task in as_completed(tasks):
                    identifier, file_name = tasks[task]
                    try:
                        urls = task.result()
                    except Exception as exc:
                        # One bad file must not discard the results of the
                        # others; it stays in `outstanding` for a later run.
                        print(f"failed to process {file_name}: {exc}")
                        continue

                    output_file.writelines(f"{url}\n" for url in urls)
                    # Make sure the URLs are on disk before the file is
                    # recorded as processed.
                    output_file.flush()

                    processed_files.add((identifier, file_name))
                    outstanding = [w for w in outstanding if w not in processed_files]
                    save_progress(processed_files, outstanding)

            if skipped_files and not tasks:
                # Nothing was started, so no worker will free any space;
                # wait before re-checking instead of spinning.
                time.sleep(retry_delay_seconds)

            warcs = skipped_files

if __name__ == "__main__":
    # Command-line entry point: parse the two positional arguments, make sure
    # the download directory exists, then process the collection.
    parser = argparse.ArgumentParser(description="download and extract urls from warcs in an internet archive collection.")
    parser.add_argument("collection_id", help="the collection identifier to process.")
    parser.add_argument("output_dir", help="the directory to download warcs and store the output file.")
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)
    process_warcs(args.collection_id, args.output_dir)