extern crate regex;
extern crate flate2;
extern crate zstd;
extern crate warc;
extern crate rayon;

use std::env;
use std::fs;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::time::Duration;

use flate2::read::GzDecoder;
use rayon::prelude::*;
use regex::Regex;
use warc::Record;
use zstd::stream::read::Decoder;

/// Reads one Zstandard *skippable frame* from the current position of `file`
/// and returns its payload.
///
/// A skippable frame consists of a 4-byte little-endian magic number in the
/// range `0x184D2A50..=0x184D2A5F`, a 4-byte little-endian payload size, and
/// the payload itself.
///
/// Returns `None` when the data at the current position is not a complete
/// skippable frame (wrong magic number, truncated header or truncated
/// payload). In that case the file position is restored to where it was on
/// entry (best effort), so the caller can still decode the file as a regular
/// stream. On success the position is left just past the frame.
fn read_skippable_frame(file: &mut fs::File) -> Option<Vec<u8>> {
    let start = file.stream_position().ok()?;
    let payload = read_skippable_payload(file);
    if payload.is_none() {
        // Undo the partial read. The result is `None` regardless of whether
        // the rewind succeeds, so a seek failure adds no information here.
        let _ = file.seek(SeekFrom::Start(start));
    }
    payload
}

/// Parses the skippable-frame header at the current position and reads the
/// payload; returns `None` on any mismatch or short read.
fn read_skippable_payload(file: &mut fs::File) -> Option<Vec<u8>> {
    // The low nibble of the magic number is free for application use, so it
    // is masked out before comparing.
    const SKIPPABLE_MAGIC_BASE: u32 = 0x184D_2A50;
    const SKIPPABLE_MAGIC_MASK: u32 = 0xFFFF_FFF0;

    let mut word = [0u8; 4];
    file.read_exact(&mut word).ok()?;
    if u32::from_le_bytes(word) & SKIPPABLE_MAGIC_MASK != SKIPPABLE_MAGIC_BASE {
        return None;
    }

    file.read_exact(&mut word).ok()?;
    let frame_size = u64::from(u32::from_le_bytes(word));

    // Read through `take` instead of pre-allocating `frame_size` bytes: a
    // corrupt header could otherwise request a multi-gigabyte buffer up front.
    let mut frame = Vec::new();
    file.by_ref().take(frame_size).read_to_end(&mut frame).ok()?;
    if frame.len() as u64 == frame_size {
        Some(frame)
    } else {
        None
    }
}

fn process_warc(file_path: &str, output_file_path: &str, pattern: &Regex) {
    println!("Processing file: {}", file_path);
    let mut counter = 0;
    let mut matches_buffer = Vec::new();

    let warc_stream = match Path::new(file_path).extension().and_then(|s| s.to_str()) {
        Some("gz") => Box::new(BufReader::new(GzDecoder::new(fs::File::open(file_path).unwrap()))) as Box<dyn BufRead>,
        Some("zst") => {
            let mut raw_file = fs::File::open(file_path).unwrap();
            let dict_data = read_skippable_frame(&mut raw_file).unwrap();
            let dctx = zstd::Decoder::with_dictionary(raw_file, &dict_data).unwrap();
            Box::new(BufReader::new(dctx)) as Box<dyn BufRead>
        },
        _ => Box::new(fs::File::open(file_path).unwrap()) as Box<dyn BufRead>,
    };

    let mut warc_reader = warc::Reader::new(warc_stream);
    while let Some(result) = warc_reader.next() {
        let record = result.unwrap();
        if record.header().record_type() == Some(Record::RESPONSE) {
            let content = String::from_utf8_lossy(record.content());
            let matches: Vec<String> = pattern.find_iter(&content).map(|m| m.as_str().to_string()).collect();
            matches_buffer.extend(matches);
            counter += matches.len();

            if counter % 100 == 0 {
                let mut output_file = fs::OpenOptions::new()
                    .append(true)
                    .create(true)
                    .open(output_file_path)
                    .unwrap();
                writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap();
                matches_buffer.clear();
            }
        }
    }
    if !matches_buffer.is_empty() {
        let mut output_file = fs::OpenOptions::new()
            .append(true)
            .create(true)
            .open(output_file_path)
            .unwrap();
        writeln!(output_file, "{}", matches_buffer.join("\n")).unwrap();
    }

    println!("Removing file: {}", file_path);
    fs::remove_file(file_path).unwrap();
}

fn process_warcs_in_directory(warc_directory: &str, output_file_path: &str, pattern: &Regex) {
    loop {
        let file_names: Vec<String> = fs::read_dir(warc_directory)
            .unwrap()
            .filter_map(|entry| {
                let entry = entry.unwrap();
                let file_name = entry.file_name().into_string().unwrap();
                if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") {
                    Some(file_name)
                } else {
                    None
                }
            })
            .collect();

        file_names.par_iter().for_each(|file_name| {
            let file_path = format!("{}/{}", warc_directory, file_name);
            process_warc(&file_path, output_file_path, pattern);
        });

        println!("Sleeping 60s");
        std::thread::sleep(Duration::from_secs(60));
    }
}
/// Entry point: `<program> <warc_directory> <output_file_path>`.
///
/// Compiles the hard-coded imgur pattern and starts the directory polling
/// loop. When arguments are missing, prints the usage line to stderr and
/// exits with status 1.
fn main() {
    let args: Vec<String> = env::args().collect();
    let (warc_directory, output_file_path) = match args.as_slice() {
        [_, directory, output, ..] => (directory, output),
        _ => {
            // The argument list may even lack the program name, so avoid
            // indexing into it.
            let program = args.first().map(String::as_str).unwrap_or("<program>");
            eprintln!("Usage: {} <warc_directory> <output_file_path>", program);
            std::process::exit(1);
        }
    };

    // Matches any whitespace-delimited token that contains "imgur".
    let regex_pattern = r"\S*imgur\S*";
    let pattern = Regex::new(regex_pattern).unwrap();

    process_warcs_in_directory(warc_directory, output_file_path, &pattern);
}