import argparse
import datetime
import gzip
import io
import os
import re
import time
from concurrent.futures import ProcessPoolExecutor

import warcio
import zstandard as zstd
from warcio.archiveiterator import ArchiveIterator
def read_skippable_frame(file):
    """Read a Zstandard skippable frame from the current position of *file*.

    A skippable frame is laid out as a 4-byte little-endian magic number in
    the range 0x184D2A50-0x184D2A5F, a 4-byte little-endian payload size,
    and then the payload itself.

    Args:
        file: A seekable binary file object positioned where a frame may start.

    Returns:
        The frame payload as ``bytes``, or ``None`` when the data at the
        current position is not a complete skippable frame. In the ``None``
        case the file position is restored, so the caller can decode the
        stream from where it was.
    """
    start = file.tell()
    header = file.read(8)
    if len(header) == 8:
        magic = int.from_bytes(header[:4], 'little')
        # 0x184D2A5? is the skippable-frame magic; b'\x28\xb5\x2f\xfd' is the
        # magic of a *regular* compressed frame and must not be consumed here.
        if 0x184D2A50 <= magic <= 0x184D2A5F:
            frame_size = int.from_bytes(header[4:], 'little')
            payload = file.read(frame_size)
            if len(payload) == frame_size:
                return payload
    # Not a (complete) skippable frame: rewind so no bytes are lost.
    file.seek(start)
    return None
def _open_warc_stream(raw_file, file_path):
    """Return a readable stream of WARC bytes layered over *raw_file*."""
    if file_path.endswith('.warc.gz'):
        return gzip.GzipFile(fileobj=raw_file, mode='rb')
    if file_path.endswith('.warc.zst'):
        dict_bytes = read_skippable_frame(raw_file)
        if dict_bytes is None:
            # No dictionary frame: decode the file from its very beginning.
            raw_file.seek(0)
            dctx = zstd.ZstdDecompressor()
        else:
            # NOTE(review): the dictionary payload may itself be a zstd frame
            # (it then starts with the regular frame magic) - confirm against
            # the .warc.zst files this script is fed.
            if dict_bytes[:4] == b'\x28\xb5\x2f\xfd':
                dict_bytes = zstd.ZstdDecompressor().decompressobj().decompress(dict_bytes)
            # dict_data must be a ZstdCompressionDict, not raw bytes.
            dctx = zstd.ZstdDecompressor(dict_data=zstd.ZstdCompressionDict(dict_bytes))
        # Stream instead of decompressing the whole file into memory, and
        # keep reading past the end of the first frame.
        return dctx.stream_reader(raw_file, read_across_frames=True)
    return raw_file


def _append_matches(output_file_path, matches):
    """Append *matches* to the output file, one per line."""
    with open(output_file_path, 'a', encoding='utf-8') as output_file:
        output_file.write('\n'.join(matches) + '\n')


def process_warc(args):
    """Scan one WARC file for regex matches, append them to a file, then delete the WARC.

    Args:
        args: A ``(file_path, output_file_path, pattern)`` tuple, packed into
            one argument so the function can be submitted to an executor.
            ``pattern`` is a compiled regex; its ``findall`` results are
            written as text lines, so it is expected to yield strings.

    Side effects:
        Appends matches to ``output_file_path``, prints progress, and removes
        ``file_path`` once it has been processed without an exception.
    """
    file_path, output_file_path, pattern = args
    print(f"processing file: {file_path}")
    file_size = os.path.getsize(file_path)
    pending = []
    # Start below zero so the first record already triggers a progress line.
    last_printed_progress = -2

    with open(file_path, 'rb') as raw_file:
        warc_stream = _open_warc_stream(raw_file, file_path)
        try:
            for record in ArchiveIterator(warc_stream):
                if record.rec_type == 'response':
                    content = record.content_stream().read().decode(errors='replace')
                    pending.extend(pattern.findall(content))
                    # Flush in batches to bound memory use.
                    if len(pending) >= 100:
                        _append_matches(output_file_path, pending)
                        pending = []

                # Progress is measured on the on-disk (compressed) file so it
                # stays within 0-100%; an empty file counts as finished.
                progress = (raw_file.tell() / file_size) * 100 if file_size else 100.0
                if progress - last_printed_progress >= 2:
                    print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:progress on {file_path}: {progress:.2f}%")
                    last_printed_progress = progress
        finally:
            if warc_stream is not raw_file:
                warc_stream.close()

    # Write the remaining matches.
    if pending:
        _append_matches(output_file_path, pending)

    print(f"removing file: {file_path}")
    os.remove(file_path)  # Remove the WARC file after processing.
def process_warcs_in_directory(warc_directory, output_file_path, pattern,
                               max_workers=14, poll_interval=60):
    """Poll a directory forever and process every compressed WARC file found.

    Args:
        warc_directory: Directory scanned for ``.warc.gz`` / ``.warc.zst`` files.
        output_file_path: File that every worker appends its matches to.
        pattern: Compiled regex handed to each worker.
        max_workers: Size of the process pool used for each scan.
        poll_interval: Seconds to sleep between directory scans.

    This function never returns; it loops until the process is interrupted.
    """
    while True:
        # Leaving the ``with`` block waits for all submitted files, so a file
        # is never submitted twice concurrently.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            submitted = {}
            for file_name in os.listdir(warc_directory):
                if file_name.endswith(('.warc.gz', '.warc.zst')):
                    file_path = os.path.join(warc_directory, file_name)
                    future = executor.submit(process_warc, (file_path, output_file_path, pattern))
                    submitted[future] = file_path

            # Surface worker failures instead of silently discarding them; a
            # failed file is left on disk and picked up again on the next scan.
            for future, file_path in submitted.items():
                error = future.exception()
                if error is not None:
                    print(f"failed to process {file_path}: {error!r}")

        print(f"sleeping {poll_interval}s")
        time.sleep(poll_interval)  # Wait before checking the directory again.
def main(args):
    """Compile the search pattern and start the directory watcher.

    Args:
        args: Parsed command-line namespace providing ``warc_directory`` and
            ``output_file_path``.
    """
    # Capture the whole whitespace-delimited token containing "imgur".
    # NOTE(review): the source read r'\s*imgur\s*', which can only match
    # whitespace around the literal and can put newlines into the
    # one-match-per-line output; \S is assumed to be the intent - confirm.
    regex_pattern = r'\S*imgur\S*'
    pattern = re.compile(regex_pattern)
    # The watcher loops forever on its own, so no outer loop is needed.
    process_warcs_in_directory(args.warc_directory, args.output_file_path, pattern)
# Script entry point: parse the two positional paths and start processing.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='process warc files and find matches for a regex pattern.')
    parser.add_argument('warc_directory', help='path to the directory containing warc files.')
    parser.add_argument('output_file_path', help='path to the output file.')
    args = parser.parse_args()
    main(args)