use std::env;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use flate2::read::{GzDecoder, MultiGzDecoder};
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use regex::Regex;
use warc::{Record, RecordBuilder, RecordType, WarcHeader, WarcReader};
use zstd::stream::read::Decoder;
Fn main() -> result<(), box<dyn std::error::error>> {
Let args: vec<string> = env::args().collect();
If args.len() < 3 {
Eprintln!("usage: <warc_directory> <output_file_path>");
Std::process::exit(1);
}
Let warc_directory = &args[1];
Let output_file_path = &args[2];
Let regex_pattern = r"\s*imgur\s*";
Let pattern = regex::new(regex_pattern)?;
Loop {
Process_warcs_in_directory(warc_directory, output_file_path, &pattern)?;
Thread::sleep(duration::from_secs(60));
}
}
/// Reads the dictionary skippable frame that may prefix a `.warc.zst` file.
///
/// If the stream starts with the Zstandard skippable-frame magic number used
/// for WARC dictionaries (0x184D2A5D, stored little-endian), the 4-byte
/// little-endian frame size and that many payload bytes are consumed, and the
/// payload is returned as `Some(bytes)`.
///
/// Otherwise (a regular zstd frame, any other data, or a stream shorter than
/// four bytes), the reader is rewound to where it started and `Ok(None)` is
/// returned. The caller can then decode the stream from its first byte.
///
/// # Errors
/// Returns an error if seeking fails, or if a dictionary frame is announced
/// but its size field or payload is truncated.
fn read_skippable_frame<R: Read + Seek>(
    reader: &mut R,
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
    // Skippable frame 0x184D2A5D, as laid out on disk (little-endian).
    const DICTIONARY_FRAME_MAGIC: [u8; 4] = [0x5D, 0x2A, 0x4D, 0x18];

    let start = reader.stream_position()?;

    // `take` + `read_to_end` tolerates short streams (and retries on
    // `Interrupted`) instead of failing the way `read_exact` would.
    let mut magic_number = Vec::with_capacity(DICTIONARY_FRAME_MAGIC.len());
    reader
        .by_ref()
        .take(DICTIONARY_FRAME_MAGIC.len() as u64)
        .read_to_end(&mut magic_number)?;

    if magic_number != DICTIONARY_FRAME_MAGIC {
        // Not a dictionary frame: un-read the probed bytes so no data is lost.
        reader.seek(SeekFrom::Start(start))?;
        return Ok(None);
    }

    let mut frame_size_bytes = [0u8; 4];
    reader.read_exact(&mut frame_size_bytes)?;
    let frame_size = usize::try_from(u32::from_le_bytes(frame_size_bytes))?;

    let mut frame_data = vec![0u8; frame_size];
    reader.read_exact(&mut frame_data)?;
    Ok(Some(frame_data))
}
Fn process_warcs_in_directory(
Warc_directory: &str,
Output_file_path: &str,
Pattern: ®ex,
) -> result<(), box<dyn std::error::error>> {
Let output_file = arc::new(mutex::new(bufwriter::new(file::create(output_file_path)?)));
Let paths: vec<_> = fs::read_dir(warc_directory)?
.filter_map(|entry| {
Let entry = entry.ok()?;
Let path = entry.path();
Let file_name = path.file_name()?.to_str()?.to_string();
If file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") {
Some(path)
} else {
None
}
})
.collect();
Let thread_pool = threadpoolbuilder::new()
.num_threads(12)
.build()
.unwrap();
Thread_pool.install(|| {
Paths.par_iter().for_each(|path| {
If let err(e) = process_warc(path, &output_file, &pattern) {
Eprintln!("error processing {}: {}", path.display(), e);
}
});
});
Ok(())
}
Fn process_warc(
Path: &path,
Output_file: &arc<mutex<bufwriter<file>>>,
Pattern: ®ex,
) -> result<(), box<dyn std::error::error>> {
Let file = file::open(path)?;
Let mut warc_stream: box<dyn read + send> = if path.extension().unwrap() == "gz" {
Box::new(gzdecoder::new(file))
} else {
Let mut file = bufreader::new(file);
If let some(dict_data) = read_skippable_frame(&mut file)? {
Box::new(decoder::with_dictionary(file, &dict_data)?)
} else {
Box::new(file)
}
};
Let warc_reader = warc::warcreader::new(bufreader::new(warc_stream));
Let mut record_builder = recordbuilder::default();
For result in warc_reader.iter_records() {
Match result {
Ok((record_header, body)) => {
Let record = record_builder.build();
Let is_response = record.warc_type() == &warc::recordtype::from("response");
If is_response {
Let content = string::from_utf8_lossy(&raw_body);
If pattern.is_match(&content) {
Let mut output_file = output_file.lock().unwrap();
Writeln!(output_file, "{}", content)?;
}
}
}
Err(e) => eprintln!("error reading record: {}", e),
}
}
Ok(())
}