| extern crate regex; |
| extern crate flate2; |
| extern crate zstd; |
| extern crate warc; |
| extern crate rayon; |
| |
use std::env;
use std::fs;
use std::io::{BufRead, BufReader, Read, Write};
use std::path::Path;
use std::time::Duration;

use flate2::read::GzDecoder;
use rayon::prelude::*;
use regex::Regex;
use warc::Record;
use zstd::stream::read::Decoder;
| |
/// Reads one zstd *skippable frame* from the current position of `file` and
/// returns its payload.
///
/// A skippable frame is laid out as:
///
/// * a 4-byte little-endian magic number in `0x184D2A50..=0x184D2A5F`,
/// * a 4-byte little-endian payload length,
/// * the payload itself.
///
/// Returns `None` when the magic number does not identify a skippable frame,
/// or when the file ends before the header or the full payload could be read.
/// The file position is *not* rewound on failure; whatever bytes were read
/// stay consumed.
fn read_skippable_frame(file: &mut fs::File) -> Option<Vec<u8>> {
    // The low nibble of the magic number is free for application use, so it
    // is masked out before comparing.
    const SKIPPABLE_MAGIC_BASE: u32 = 0x184D_2A50;
    const SKIPPABLE_MAGIC_MASK: u32 = 0xFFFF_FFF0;

    let mut magic_number = [0u8; 4];
    file.read_exact(&mut magic_number).ok()?;
    if u32::from_le_bytes(magic_number) & SKIPPABLE_MAGIC_MASK != SKIPPABLE_MAGIC_BASE {
        return None;
    }

    let mut frame_size = [0u8; 4];
    file.read_exact(&mut frame_size).ok()?;
    let frame_size = u64::from(u32::from_le_bytes(frame_size));

    // Read through `take` instead of pre-allocating `frame_size` bytes: the
    // length field comes from the file and may be far larger than the data
    // that actually follows.
    let mut frame = Vec::new();
    file.by_ref().take(frame_size).read_to_end(&mut frame).ok()?;

    if frame.len() as u64 == frame_size {
        Some(frame)
    } else {
        // The file ended before the announced payload length was reached.
        None
    }
}
| |
| fn process_warc(file_path: &str, output_file_path: &str, pattern: &Regex) { |
| println!("Processing file: {}", file_path); |
| let mut counter = 0; |
| let mut matches_buffer = Vec::new(); |
| |
| let warc_stream = match Path::new(file_path).extension().and_then(|s| s.to_str()) { |
| Some("gz") => Box::new(BufReader::new(GzDecoder::new(fs::File::open(file_path).unwrap()))) as Box<dyn BufRead>, |
| Some("zst") => { |
| let mut raw_file = fs::File::open(file_path).unwrap(); |
| let dict_data = read_skippable_frame(&mut raw_file).unwrap(); |
| let dctx = zstd::Decoder::with_dictionary(raw_file, &dict_data).unwrap(); |
| Box::new(BufReader::new(dctx)) as Box<dyn BufRead> |
| }, |
| _ => Box::new(fs::File::open(file_path).unwrap()) as Box<dyn BufRead>, |
| }; |
| |
| let mut warc_reader = warc::Reader::new(warc_stream); |
| while let Some(result) = warc_reader.next() { |
| let record = result.unwrap(); |
| if record.header().record_type() == Some(Record::RESPONSE) { |
| let content = String::from_utf8_lossy(record.content()); |
| let matches: Vec<String> = pattern.find_iter(&content).map(|m| m.as_str().to_string()).collect(); |
| matches_buffer.extend(matches); |
| counter += matches.len(); |
| |
| if counter % 100 == 0 { |
| let mut output_file = fs::OpenOptions::new() |
| .append(true) |
| .create(true) |
| .open(output_file_path) |
| .unwrap(); |
| writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap(); |
| matches_buffer.clear(); |
| } |
| } |
| } |
| if !matches_buffer.is_empty() { |
| let mut output_file = fs::OpenOptions::new() |
| .append(true) |
| .create(true) |
| .open(output_file_path) |
| .unwrap(); |
| writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap(); |
| } |
| |
| println!("Removing file: {}", file_path); |
| fs::remove_file(file_path).unwrap(); |
| } |
| |
| fn process_warcs_in_directory(warc_directory: &str, output_file_path: &str, pattern: &Regex) { |
| loop { |
| let file_names: Vec<String> = fs::read_dir(warc_directory) |
| .unwrap() |
| .filter_map(|entry| { |
| let entry = entry.unwrap(); |
| let file_name = entry.file_name().into_string().unwrap(); |
| if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") { |
| Some(file_name) |
| } else { |
| None |
| } |
| }) |
| .collect(); |
| |
| file_names.par_iter().for_each(|file_name| { |
| let file_path = format!("{}/{}", warc_directory, file_name); |
| process_warc(&file_path, output_file_path, pattern); |
| }); |
| |
| println!("Sleeping 60s"); |
| std::thread::sleep(Duration::from_secs(60)); |
| } |
| } |
| fn main() { |
| let args: Vec<String> = env::args().collect(); |
| if args.len() < 3 { |
| println!("Usage: {} <warc_directory> <output_file_path>", args[0]); |
| return; |
| } |
| |
| let warc_directory = &args[1]; |
| let output_file_path = &args[2]; |
| let regex_pattern = r"\S*imgur\S*"; |
| let pattern = Regex::new(regex_pattern).unwrap(); |
| |
| process_warcs_in_directory(warc_directory, output_file_path, &pattern); |
| } |