use std::env; use std::fs::{self, File}; use std::io::{self, BufReader, BufWriter, Read, Write}; use std::path::Path; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use regex::Regex; use warc::{Record, RecordType}; use flate2::read::GzDecoder; use zstd::stream::read::Decoder; use rayon::ThreadPoolBuilder; use rayon::prelude::*; fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); if args.len() < 3 { eprintln!("Usage: "); std::process::exit(1); } let warc_directory = &args[1]; let output_file_path = &args[2]; let regex_pattern = r"\S*imgur\S*"; let pattern = Regex::new(regex_pattern)?; loop { process_warcs_in_directory(warc_directory, output_file_path, &pattern)?; thread::sleep(Duration::from_secs(60)); } } use zstd::dict::DecoderDictionary; fn read_skippable_frame(file: &mut File) -> Result>, Box> { let mut magic_number = [0; 4]; file.read_exact(&mut magic_number)?; if magic_number != [0x28, 0xB5, 0x2F, 0xFD] { return Ok(None); } let mut frame_size_bytes = [0; 4]; file.read_exact(&mut frame_size_bytes)?; let frame_size = u32::from_le_bytes(frame_size_bytes) as usize; let mut frame_data = vec![0; frame_size]; file.read_exact(&mut frame_data)?; Ok(Some(frame_data)) fn process_warcs_in_directory( warc_directory: &str, output_file_path: &str, pattern: &Regex, ) -> Result<(), Box> { let output_file = Arc::new(Mutex::new(BufWriter::new(File::create(output_file_path)?))); let paths: Vec<_> = fs::read_dir(warc_directory)? .filter_map(|entry| { let entry = entry.ok()?; let path = entry.path(); let file_name = path.file_name()?.to_str()?.to_string(); if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") { Some(path) } else { None } }) .collect(); let thread_pool = ThreadPoolBuilder::new() .num_threads(12) .build() .unwrap(); thread_pool.install(|| { paths.par_iter().for_each(|path| { if let Err(e) = process_warc(path, &output_file, &pattern) { eprintln!("Error processing {}: {}", path.display(), e); } }); }); Ok(()) } fn process_warc( path: &Path, output_file: &Arc>>, pattern: &Regex, ) -> Result<(), Box> { let file = File::open(path)?; let mut warc_stream: Box = match path.extension().and_then(|os_str| os_str.to_str()) { Some("gz") => Box::new(BufReader::new(GzDecoder::new(file))), Some("zst") => { let mut file = BufReader::new(file); if let Some(dict_data) = read_skippable_frame(&mut file)? { let decoder = Decoder::with_dictionary(file, &DecoderDictionary::new(&dict_data)?)?; Box::new(BufReader::new(decoder)) } else { return Err("Failed to read zstd dictionary.".into()); } } _ => Box::new(BufReader::new(file)), }; println!("Processing file: {}", path.display()); let mut matches_buffer = Vec::new(); let mut record = Record::new(); while let Ok(_) = record.read_from(&mut warc_stream) { let current_position = warc_stream.stream_position()?; if current_position - last_update >= progress_update { let percentage = (current_position as f64 / total_length as f64) * 100.0; println!("Progress: {:.1}% ({}/{})", percentage, current_position, total_length); last_update = current_position; } if record.record_type() == Some(RecordType::Response) { let content = String::from_utf8_lossy(record.content()); for mat in pattern.find_iter(&content) { matches_buffer.push(mat.as_str().to_string()); if matches_buffer.len() >= 100 { let mut output_file = output_file.lock().unwrap(); for mat in &matches_buffer { writeln!(output_file, "{}", mat)?; } matches_buffer.clear(); } } } record.clear(); } if !matches_buffer.is_empty() { let mut output_file = output_file.lock().unwrap(); for mat in &matches_buffer { writeln!(output_file, "{}", mat)?; } } println!("Removing file: {}", path.display()); fs::remove_file(path)?; Ok(()) }