use std::env;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use regex::Regex;
use warc::{Record, RecordType, WarcHeader, WarcReader, RecordBuilder};
use flate2::read::GzDecoder;
use zstd::stream::read::Decoder;
use rayon::ThreadPoolBuilder;
use rayon::prelude::*;
/// Program entry point.
///
/// Expects two positional arguments: the directory holding the WARC files and
/// the path of the output file. Prints a usage line to stderr and exits with
/// status 1 when either is missing. Otherwise the directory is re-scanned
/// forever, pausing 60 seconds between passes; the first error returned by a
/// pass ends the program.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cli: Vec<String> = env::args().collect();

    // Index 0 is the program name; the two operands follow it.
    let (warc_directory, output_file_path) = match cli.as_slice() {
        [_, directory, output, ..] => (directory, output),
        _ => {
            eprintln!("Usage: <warc_directory> <output_file_path>");
            std::process::exit(1);
        }
    };

    // Compiled once up front and shared by every pass.
    let pattern = Regex::new(r"\S*imgur\S*")?;

    loop {
        process_warcs_in_directory(warc_directory, output_file_path, &pattern)?;
        thread::sleep(Duration::from_secs(60));
    }
}
/// Reads one Zstandard *skippable frame* starting at the current position of
/// `reader`.
///
/// A skippable frame consists of a 4-byte little-endian magic number in the
/// range `0x184D2A50..=0x184D2A5F`, a 4-byte little-endian payload length, and
/// the payload bytes themselves.
///
/// # Returns
///
/// * `Ok(Some(payload))` when a skippable frame is present; the reader is left
///   positioned immediately after the frame.
/// * `Ok(None)` when the next four bytes are not a skippable-frame magic
///   number; the reader is rewound to where it was on entry so the caller can
///   decode the stream from its first byte.
///
/// # Errors
///
/// Returns any I/O error from reading or seeking, including
/// `UnexpectedEof` when fewer than four bytes are available for the magic
/// number or when the payload is shorter than its declared length.
fn read_skippable_frame<R: Read + Seek>(reader: &mut R) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
    // The low nibble of the magic number is free for application use, so it is
    // masked off before comparing.
    const SKIPPABLE_MAGIC_BASE: u32 = 0x184D_2A50;
    const SKIPPABLE_MAGIC_MASK: u32 = 0xFFFF_FFF0;

    let mut magic_bytes = [0u8; 4];
    reader.read_exact(&mut magic_bytes)?;
    if (u32::from_le_bytes(magic_bytes) & SKIPPABLE_MAGIC_MASK) != SKIPPABLE_MAGIC_BASE {
        // Not a skippable frame: give the four probed bytes back.
        reader.seek(std::io::SeekFrom::Current(-4))?;
        return Ok(None);
    }

    let mut size_bytes = [0u8; 4];
    reader.read_exact(&mut size_bytes)?;
    let frame_size = u64::from(u32::from_le_bytes(size_bytes));

    // Read through `take` instead of pre-allocating `frame_size` bytes, so a
    // corrupt length field cannot force a multi-gigabyte allocation.
    let mut frame_data = Vec::new();
    reader.by_ref().take(frame_size).read_to_end(&mut frame_data)?;
    if frame_data.len() as u64 != frame_size {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "skippable frame payload is shorter than its declared size",
        )
        .into());
    }

    Ok(Some(frame_data))
}
fn process_warcs_in_directory(
warc_directory: &str,
output_file_path: &str,
pattern: &Regex,
) -> Result<(), Box<dyn std::error::Error>> {
let output_file = Arc::new(Mutex::new(BufWriter::new(File::create(output_file_path)?)));
let paths: Vec<_> = fs::read_dir(warc_directory)?
.filter_map(|entry| {
let entry = entry.ok()?;
let path = entry.path();
let file_name = path.file_name()?.to_str()?.to_string();
if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") {
Some(path)
} else {
None
}
})
.collect();
let thread_pool = ThreadPoolBuilder::new()
.num_threads(12)
.build()
.unwrap();
thread_pool.install(|| {
paths.par_iter().for_each(|path| {
if let Err(e) = process_warc(path, &output_file, &pattern) {
eprintln!("Error processing {}: {}", path.display(), e);
}
});
});
Ok(())
}
fn process_warc(
path: &Path,
output_file: &Arc<Mutex<BufWriter<File>>>,
pattern: &Regex,
) -> Result<(), Box<dyn std::error::Error>> {
let file = File::open(path)?;
let mut warc_stream: Box<dyn Read + Send> = if path.extension().unwrap() == "gz" {
Box::new(GzDecoder::new(file))
} else {
let mut file = BufReader::new(file);
if let Some(dict_data) = read_skippable_frame(&mut file)? {
Box::new(Decoder::with_dictionary(file, &dict_data)?)
} else {
Box::new(file)
}
};
let warc_reader = warc::WarcReader::new(BufReader::new(warc_stream));
let mut record_builder = RecordBuilder::default();
for result in warc_reader.iter_raw_records() {
match result {
Ok((raw_record_header, raw_body)) => {
let record = record_builder.build_raw();
let is_response = record.warc_type() == &warc::RecordType::from("response");
if is_response {
let content = String::from_utf8_lossy(&raw_body);
if pattern.is_match(&content) {
let mut output_file = output_file.lock().unwrap();
writeln!(output_file, "{}", content)?;
}
}
}
Err(e) => eprintln!("Error reading record: {}", e),
}
}
Ok(())
}