"""Download WARC files from an Internet Archive collection and extract zippyshare.com URLs."""
import argparse
import io
import os
import re
import struct
from concurrent.futures import ThreadPoolExecutor, as_completed

import internetarchive as ia
import zstandard as zstd
from warcio.archiveiterator import ArchiveIterator


def list_warcs(collection_id):
    """Return (identifier, file_name) pairs for every WARC file in the collection."""
    warcs = []
    for result in ia.search_items(f"collection:{collection_id}"):
        item = ia.get_item(result['identifier'])
        for file in item.files:
            file_name = file['name']
            if file_name.endswith(('.warc.gz', '.warc', '.warc.zst')):
                warcs.append((result['identifier'], file_name))
    return warcs


def download_warc(identifier, file_name, output_dir):
    """Download a single file from an Internet Archive item and return its local path."""
    output_path = os.path.join(output_dir, os.path.basename(file_name))
    ia.download(identifier, files=[file_name], destdir=output_dir, no_directory=True, retries=3)
    return output_path


def read_skippable_frame(stream):
    """Return the payload of a zstd skippable frame at the current position, or None.

    Skippable frames start with a 4-byte magic number in the range
    0x184D2A50-0x184D2A5F, followed by a 4-byte little-endian payload size.
    """
    magic_bytes = stream.read(4)
    if len(magic_bytes) != 4:
        return None
    magic = struct.unpack('<I', magic_bytes)[0]
    if 0x184D2A50 <= magic <= 0x184D2A5F:
        frame_size = struct.unpack('<I', stream.read(4))[0]
        return stream.read(frame_size)
    # Not a skippable frame: rewind so the decompressor sees the magic again.
    stream.seek(-4, io.SEEK_CUR)
    return None


def extract_urls(file_path):
    """Extract zippyshare.com URLs from the HTML responses in a WARC file."""
    urls = []
    # Regex pattern to match the desired URLs: host, optional port, then a path that
    # runs until whitespace or an HTML delimiter.
    url_pattern = re.compile(r'https?://[^/\s]*zippyshare\.com(?::[0-9]+)?/[^\s"\'<>]*')
    with open(file_path, 'rb') as stream:
        if file_path.endswith('.warc.zst'):
            # .warc.zst files may carry a custom zstd dictionary in a leading skippable frame.
            dictionary_data = read_skippable_frame(stream)
            if dictionary_data and dictionary_data[:4] == b'\x28\xb5\x2f\xfd':
                # The embedded dictionary may itself be zstd-compressed; unpack it first.
                dictionary_data = zstd.ZstdDecompressor().decompressobj().decompress(dictionary_data)
            if dictionary_data:
                dctx = zstd.ZstdDecompressor(dict_data=zstd.ZstdCompressionDict(dictionary_data))
            else:
                dctx = zstd.ZstdDecompressor()
            stream = dctx.stream_reader(stream, read_across_frames=True)
        for record in ArchiveIterator(stream):
            if record.rec_type == 'response':
                content_type = record.http_headers.get_header('Content-Type', '')
                if content_type.startswith('text/html'):
                    content = record.content_stream().read().decode('utf-8', errors='ignore')
                    # Find all matching URLs in the HTML content.
                    urls.extend(url_pattern.findall(content))
    return urls


def process_warcs(collection_id, output_dir, max_workers=4):
    """Download every WARC in the collection, extract URLs, and write them to a file named 'output'."""
    warcs = list_warcs(collection_id)

    def download_and_process(identifier, file_name, output_dir):
        warc_file = download_warc(identifier, file_name, output_dir)
        urls = extract_urls(warc_file)
        os.remove(warc_file)  # delete the WARC once its URLs have been extracted
        return urls

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = {
            executor.submit(download_and_process, identifier, file_name, output_dir): (identifier, file_name)
            for identifier, file_name in warcs
        }
        with open('output', 'w') as output_file:
            for task in as_completed(tasks):
                for url in task.result():
                    output_file.write(f"{url}\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Download and process WARC files from the Internet Archive')
    parser.add_argument('collection_id', help='the ID of the collection to process')
    parser.add_argument('--output_dir', default='warcs', help='the directory to store the downloaded WARC files')
    args = parser.parse_args()
    os.makedirs(args.output_dir, exist_ok=True)
    process_warcs(args.collection_id, args.output_dir)
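
# Example invocation (a sketch: the script filename and collection identifier below are
# illustrative placeholders, not values defined anywhere in this code):
#   python extract_warc_urls.py some_warc_collection --output_dir warcs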