"""Scan WARC archives in a directory for regex matches.

Every ``.warc.gz`` / ``.warc.zst`` file found in the watched directory is
scanned in a worker process, matches found in ``response`` records are appended
to a shared output file, and the archive is deleted once it has been scanned.
"""
import argparse
import contextlib
import datetime
import gzip
import io
import os
import re
import time
from concurrent.futures import ProcessPoolExecutor

import warcio
import zstandard as zstd
from warcio.archiveiterator import ArchiveIterator

# Zstandard skippable frames use magic numbers 0x184D2A50..0x184D2A5F, stored
# little-endian: the first byte is 0x50..0x5F, followed by these three bytes.
_SKIPPABLE_MAGIC_SUFFIX = b'\x2A\x4D\x18'
# Magic number of a regular Zstandard frame (0xFD2FB528, little-endian).
_ZSTD_FRAME_MAGIC = b'\x28\xB5\x2F\xFD'
# Number of buffered matches that triggers a write to the output file.
_FLUSH_THRESHOLD = 100
# Minimum progress increase (in percent) between two progress messages.
_PROGRESS_STEP = 2


def read_skippable_frame(file):
    """Read a leading Zstandard skippable frame from *file*, if there is one.

    Args:
        file: Seekable binary file object positioned at the start of a frame.

    Returns:
        The payload of the skippable frame as ``bytes``, or ``None`` when the
        stream does not start with a skippable frame. In the ``None`` case the
        file position is restored so the caller can decode the stream from the
        same offset.
    """
    start = file.tell()
    magic_number = file.read(4)
    is_skippable = (
        len(magic_number) == 4
        and 0x50 <= magic_number[0] <= 0x5F
        and magic_number[1:] == _SKIPPABLE_MAGIC_SUFFIX
    )
    if not is_skippable:
        # Not a skippable frame: give the consumed bytes back to the caller.
        file.seek(start)
        return None
    frame_size = int.from_bytes(file.read(4), 'little')
    return file.read(frame_size)


def _load_zstd_dictionary(dict_bytes):
    """Turn the payload of a dictionary frame into a ``ZstdCompressionDict``.

    Returns ``None`` when there is no dictionary payload.
    """
    if not dict_bytes:
        return None
    if dict_bytes.startswith(_ZSTD_FRAME_MAGIC):
        # The embedded dictionary may itself be Zstandard-compressed.
        dict_bytes = zstd.ZstdDecompressor().decompressobj().decompress(dict_bytes)
    return zstd.ZstdCompressionDict(dict_bytes)


def _open_warc_stream(file_path, stack):
    """Open *file_path* and return ``(raw_file, warc_stream)``.

    ``raw_file`` is the on-disk file (its position measures progress against
    the on-disk size); ``warc_stream`` yields the decompressed WARC bytes.
    Both are registered on *stack* so they are closed when the stack exits.
    """
    raw_file = stack.enter_context(open(file_path, 'rb'))
    if file_path.endswith('.warc.gz'):
        return raw_file, stack.enter_context(gzip.GzipFile(fileobj=raw_file))
    if file_path.endswith('.warc.zst'):
        dictionary = _load_zstd_dictionary(read_skippable_frame(raw_file))
        dctx = zstd.ZstdDecompressor(dict_data=dictionary)
        # A .warc.zst file is a sequence of frames, so the reader must continue
        # past the end of each frame; streaming also avoids holding the whole
        # decompressed archive in memory.
        reader = dctx.stream_reader(raw_file, read_across_frames=True)
        return raw_file, stack.enter_context(reader)
    return raw_file, raw_file


def _append_matches(output_file_path, matches):
    """Append *matches* to the output file, one match per line."""
    with open(output_file_path, 'a') as output_file:
        output_file.write('\n'.join(matches) + '\n')


def process_warc(args):
    """Scan one WARC file for pattern matches, then delete the file.

    Args:
        args: Tuple ``(file_path, output_file_path, pattern)`` where *pattern*
            is a compiled regular expression. A single tuple is used so the
            function can be submitted to an executor with one argument.

    Matches found in ``response`` records are appended to *output_file_path*
    in batches. The WARC file is removed only after it was read to the end
    without an exception.
    """
    file_path, output_file_path, pattern = args
    print(f"Processing file: {file_path}")

    matches_buffer = []
    file_size = os.path.getsize(file_path)
    # Start one step below zero so the first record prints a progress line.
    last_printed_progress = -_PROGRESS_STEP

    with contextlib.ExitStack() as stack:
        raw_file, warc_stream = _open_warc_stream(file_path, stack)
        for record in ArchiveIterator(warc_stream):
            if record.rec_type == 'response':
                content = record.content_stream().read().decode(errors='replace')
                matches_buffer.extend(pattern.findall(content))
                if len(matches_buffer) >= _FLUSH_THRESHOLD:
                    _append_matches(output_file_path, matches_buffer)
                    matches_buffer = []

            # Measure progress on the raw (compressed) file so that it is
            # comparable with the on-disk size and stays within 0-100%.
            progress = (raw_file.tell() / file_size) * 100 if file_size else 100.0
            if progress - last_printed_progress >= _PROGRESS_STEP:
                print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:Progress on {file_path}: {progress:.2f}%")
                last_printed_progress = progress

    # Write the remaining matches
    if matches_buffer:
        _append_matches(output_file_path, matches_buffer)

    # Remove the WARC file only after all handles on it have been closed.
    print(f"Removing file: {file_path}")
    os.remove(file_path)


def process_warcs_in_directory(warc_directory, output_file_path, pattern,
                               max_workers=14, poll_interval=60):
    """Repeatedly scan *warc_directory* and process every WARC file found.

    This function loops forever: each round submits all ``.warc.gz`` and
    ``.warc.zst`` files to a process pool, waits for them to finish, reports
    failures, then sleeps before looking at the directory again.

    Args:
        warc_directory: Directory to watch for WARC files.
        output_file_path: File that matches are appended to.
        pattern: Compiled regular expression to search for.
        max_workers: Number of worker processes per round.
        poll_interval: Seconds to sleep between two directory scans.
    """
    while True:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            for file_name in os.listdir(warc_directory):
                if file_name.endswith(('.warc.gz', '.warc.zst')):
                    file_path = os.path.join(warc_directory, file_name)
                    future = executor.submit(
                        process_warc, (file_path, output_file_path, pattern)
                    )
                    futures[future] = file_path

            # Surface worker errors instead of dropping them with the future.
            for future, file_path in futures.items():
                error = future.exception()
                if error is not None:
                    print(f"Failed to process {file_path}: {error!r}")

        print(f"Sleeping {poll_interval}s")
        time.sleep(poll_interval)  # Wait before checking the directory again


def main(args):
    """Compile the search pattern and start watching the WARC directory."""
    regex_pattern = r'\S*imgur\S*'
    pattern = re.compile(regex_pattern)
    # process_warcs_in_directory never returns; it polls the directory forever.
    process_warcs_in_directory(args.warc_directory, args.output_file_path, pattern)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Process WARC files and find matches for a regex pattern.')
    parser.add_argument('warc_directory', help='Path to the directory containing WARC files.')
    parser.add_argument('output_file_path', help='Path to the output file.')
    args = parser.parse_args()
    main(args)