extern crate regex; extern crate flate2; extern crate zstd; extern crate warc; extern crate rayon; use std::fs; use std::io::{BufRead, Write}; use std::path::Path; use std::time::Duration; use regex::Regex; use flate2::read::GzDecoder; use zstd::stream::read::Decoder; use warc::Record; use rayon::prelude::*; fn read_skippable_frame(file: &mut fs::File) -> Option> { let mut magic_number = [0; 4]; if file.read_exact(&mut magic_number).is_err() || magic_number != [40, 181, 47, 253] { return None; } let mut frame_size = [0; 4]; file.read_exact(&mut frame_size).unwrap(); let frame_size = u32::from_le_bytes(frame_size) as usize; let mut frame = vec![0; frame_size]; file.read_exact(&mut frame).unwrap(); Some(frame) } fn process_warc(file_path: &str, output_file_path: &str, pattern: &Regex) { println!("Processing file: {}", file_path); let mut counter = 0; let mut matches_buffer = Vec::new(); let warc_stream = match Path::new(file_path).extension().and_then(|s| s.to_str()) { Some("gz") => Box::new(BufReader::new(GzDecoder::new(fs::File::open(file_path).unwrap()))) as Box, Some("zst") => { let mut raw_file = fs::File::open(file_path).unwrap(); let dict_data = read_skippable_frame(&mut raw_file).unwrap(); let dctx = zstd::Decoder::with_dictionary(raw_file, &dict_data).unwrap(); Box::new(BufReader::new(dctx)) as Box }, _ => Box::new(fs::File::open(file_path).unwrap()) as Box, }; let mut warc_reader = warc::Reader::new(warc_stream); while let Some(result) = warc_reader.next() { let record = result.unwrap(); if record.header().record_type() == Some(Record::RESPONSE) { let content = String::from_utf8_lossy(record.content()); let matches: Vec = pattern.find_iter(&content).map(|m| m.as_str().to_string()).collect(); matches_buffer.extend(matches); counter += matches.len(); if counter % 100 == 0 { let mut output_file = fs::OpenOptions::new() .append(true) .create(true) .open(output_file_path) .unwrap(); writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap(); matches_buffer.clear(); } } } if !matches_buffer.is_empty() { let mut output_file = fs::OpenOptions::new() .append(true) .create(true) .open(output_file_path) .unwrap(); writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap(); } println!("Removing file: {}", file_path); fs::remove_file(file_path).unwrap(); } fn process_warcs_in_directory(warc_directory: &str, output_file_path: &str, pattern: &Regex) { loop { let file_names: Vec = fs::read_dir(warc_directory) .unwrap() .filter_map(|entry| { let entry = entry.unwrap(); let file_name = entry.file_name().into_string().unwrap(); if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") { Some(file_name) } else { None } }) .collect(); file_names.par_iter().for_each(|file_name| { let file_path = format!("{}/{}", warc_directory, file_name); process_warc(&file_path, output_file_path, pattern); }); println!("Sleeping 60s"); std::thread::sleep(Duration::from_secs(60)); } } fn main() { let args: Vec = env::args().collect(); if args.len() < 3 { println!("Usage: {} ", args[0]); return; } let warc_directory = &args[1]; let output_file_path = &args[2]; let regex_pattern = r"\S*imgur\S*"; let pattern = Regex::new(regex_pattern).unwrap(); process_warcs_in_directory(warc_directory, output_file_path, &pattern); }