extern crate flate2;
extern crate rayon;
extern crate regex;
extern crate warc;
extern crate zstd;

use std::fs;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Mutex;
use std::time::Duration;

use flate2::read::GzDecoder;
use rayon::prelude::*;
use regex::Regex;
use warc::Record;
use zstd::stream::read::Decoder;
/// Reads a zstd *skippable frame* at the current position of `file` and returns its payload.
///
/// A skippable frame is a little-endian magic number in `0x184D2A50..=0x184D2A5F`, followed
/// by a little-endian `u32` payload length, followed by the payload. WARC-zstd files use
/// such a frame at the start of the file to carry their compression dictionary.
///
/// Returns `None` when the stream does not start with a complete skippable frame (wrong
/// magic, end of input, or truncated payload). In that case the stream is rewound to where
/// it started, so the caller can hand it unchanged to a regular zstd decoder. On success
/// the stream is left just past the frame.
fn read_skippable_frame<R: Read + Seek>(file: &mut R) -> Option<Vec<u8>> {
    /// Reads magic, length and payload; `None` on a magic mismatch or any short read.
    fn read_frame<R: Read>(reader: &mut R) -> Option<Vec<u8>> {
        let mut word = [0u8; 4];
        reader.read_exact(&mut word).ok()?;
        // The regular zstd frame magic (0xFD2FB528) is NOT a skippable frame.
        if !(0x184D_2A50..=0x184D_2A5F).contains(&u32::from_le_bytes(word)) {
            return None;
        }
        reader.read_exact(&mut word).ok()?;
        // u32 -> usize is lossless on the 32/64-bit targets this tool runs on.
        let frame_size = u32::from_le_bytes(word) as usize;
        let mut frame = vec![0u8; frame_size];
        reader.read_exact(&mut frame).ok()?;
        Some(frame)
    }

    let start = file.stream_position().ok()?;
    let frame = read_frame(file);
    if frame.is_none() {
        // Undo any partial read so the decoder sees the stream from its original start.
        file.seek(SeekFrom::Start(start)).ok()?;
    }
    frame
}
fn process_warc(file_path: &str, output_file_path: &str, pattern: &Regex) {
println!("Processing file: {}", file_path);
let mut counter = 0;
let mut matches_buffer = Vec::new();
let warc_stream = match Path::new(file_path).extension().and_then(|s| s.to_str()) {
Some("gz") => Box::new(BufReader::new(GzDecoder::new(fs::File::open(file_path).unwrap()))) as Box<dyn BufRead>,
Some("zst") => {
let mut raw_file = fs::File::open(file_path).unwrap();
let dict_data = read_skippable_frame(&mut raw_file).unwrap();
let dctx = zstd::Decoder::with_dictionary(raw_file, &dict_data).unwrap();
Box::new(BufReader::new(dctx)) as Box<dyn BufRead>
},
_ => Box::new(fs::File::open(file_path).unwrap()) as Box<dyn BufRead>,
};
let mut warc_reader = warc::Reader::new(warc_stream);
while let Some(result) = warc_reader.next() {
let record = result.unwrap();
if record.header().record_type() == Some(Record::RESPONSE) {
let content = String::from_utf8_lossy(record.content());
let matches: Vec<String> = pattern.find_iter(&content).map(|m| m.as_str().to_string()).collect();
matches_buffer.extend(matches);
counter += matches.len();
if counter % 100 == 0 {
let mut output_file = fs::OpenOptions::new()
.append(true)
.create(true)
.open(output_file_path)
.unwrap();
writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap();
matches_buffer.clear();
}
}
}
if !matches_buffer.is_empty() {
let mut output_file = fs::OpenOptions::new()
.append(true)
.create(true)
.open(output_file_path)
.unwrap();
writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap();
}
println!("Removing file: {}", file_path);
fs::remove_file(file_path).unwrap();
}
fn process_warcs_in_directory(warc_directory: &str, output_file_path: &str, pattern: &Regex) {
loop {
let file_names: Vec<String> = fs::read_dir(warc_directory)
.unwrap()
.filter_map(|entry| {
let entry = entry.unwrap();
let file_name = entry.file_name().into_string().unwrap();
if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") {
Some(file_name)
} else {
None
}
})
.collect();
file_names.par_iter().for_each(|file_name| {
let file_path = format!("{}/{}", warc_directory, file_name);
process_warc(&file_path, output_file_path, pattern);
});
println!("Sleeping 60s");
std::thread::sleep(Duration::from_secs(60));
}
}
/// Entry point. Usage: `<program> <warc_directory> <output_file_path>`.
///
/// Watches `warc_directory` for WARC files. From each one it appends every run of
/// non-whitespace characters containing `imgur` to `output_file_path`, then deletes the WARC.
/// With too few arguments it prints a usage message to stderr and exits with status 2.
fn main() {
    const REGEX_PATTERN: &str = r"\S*imgur\S*";

    let args: Vec<String> = std::env::args().collect();
    let (warc_directory, output_file_path) = match args.as_slice() {
        [_, dir, out, ..] => (dir, out),
        _ => {
            // argv[0] can be absent, so don't index it blindly.
            let program = args.first().map_or("<program>", String::as_str);
            eprintln!("Usage: {} <warc_directory> <output_file_path>", program);
            std::process::exit(2);
        }
    };
    let pattern = Regex::new(REGEX_PATTERN).expect("REGEX_PATTERN is a valid regex");
    process_warcs_in_directory(warc_directory, output_file_path, &pattern);
}