use std::env;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use flate2::read::{GzDecoder, MultiGzDecoder};
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use regex::Regex;
use warc::{Record, RecordBuilder, RecordType, WarcHeader, WarcReader};
use zstd::stream::read::Decoder;

/// Entry point: repeatedly scans a directory of compressed WARC files for text
/// containing "imgur".
///
/// Expects two command-line arguments, `<warc_directory> <output_file_path>`;
/// with fewer, prints a usage line to stderr and exits with status 1. Each pass
/// is delegated to `process_warcs_in_directory`, followed by a 60-second pause.
/// The loop only ends when a pass returns an error, which is propagated.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let argv: Vec<String> = env::args().collect();

    // argv[0] is the program name; any arguments beyond the first two are ignored.
    let (warc_directory, output_file_path) = match argv.as_slice() {
        [_, dir, out, ..] => (dir, out),
        _ => {
            eprintln!("Usage: <warc_directory> <output_file_path>");
            std::process::exit(1);
        }
    };

    let pattern = Regex::new(r"\S*imgur\S*")?;
    let poll_interval = Duration::from_secs(60);

    loop {
        process_warcs_in_directory(warc_directory, output_file_path, &pattern)?;
        thread::sleep(poll_interval);
    }
}

/// Reads a zstd *skippable frame* starting at the current position of `reader`.
///
/// A skippable frame is a little-endian `u32` magic number in the range
/// `0x184D2A50..=0x184D2A5F`, then a little-endian `u32` payload length, then the
/// payload. `.warc.zst` files use one (magic `0x184D2A5D`) at the start of the
/// file to carry the compression dictionary.
///
/// Returns `Ok(Some(payload))` when a skippable frame is present, leaving the
/// reader positioned just past it. Returns `Ok(None)` when the data does not
/// start with a skippable frame, including input shorter than four bytes. In
/// that case the reader is rewound to where it started, so the caller can
/// decode the stream from the beginning.
///
/// # Errors
/// Propagates I/O errors. A frame whose payload is shorter than its declared
/// length yields an `UnexpectedEof` error.
fn read_skippable_frame<R: Read + Seek>(
    reader: &mut R,
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
    // Skippable frames share the top 28 bits of their magic number; the low
    // nibble is user-defined.
    const SKIPPABLE_MAGIC_MASK: u32 = 0xFFFF_FFF0;
    const SKIPPABLE_MAGIC_BASE: u32 = 0x184D_2A50;

    let start = reader.stream_position()?;

    let mut magic_bytes = [0u8; 4];
    let is_skippable = match reader.read_exact(&mut magic_bytes) {
        Ok(()) => (u32::from_le_bytes(magic_bytes) & SKIPPABLE_MAGIC_MASK) == SKIPPABLE_MAGIC_BASE,
        // Too short to hold a magic number, so it cannot be a skippable frame.
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => false,
        Err(e) => return Err(e.into()),
    };

    if !is_skippable {
        // Undo the probe so no bytes of a regular frame are lost.
        reader.seek(SeekFrom::Start(start))?;
        return Ok(None);
    }

    let mut frame_size_bytes = [0u8; 4];
    reader.read_exact(&mut frame_size_bytes)?;
    let frame_size = u64::from(u32::from_le_bytes(frame_size_bytes));

    // Grow the buffer as data arrives instead of trusting the (possibly corrupt)
    // size field for an up-front allocation of up to 4 GiB.
    let mut frame_data = Vec::new();
    reader.by_ref().take(frame_size).read_to_end(&mut frame_data)?;
    if frame_data.len() as u64 != frame_size {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "truncated zstd skippable frame",
        )
        .into());
    }

    Ok(Some(frame_data))
}

fn process_warcs_in_directory(
    warc_directory: &str,
    output_file_path: &str,
    pattern: &Regex,
) -> Result<(), Box<dyn std::error::Error>> {
    let output_file = Arc::new(Mutex::new(BufWriter::new(File::create(output_file_path)?)));

    let paths: Vec<_> = fs::read_dir(warc_directory)?
        .filter_map(|entry| {
            let entry = entry.ok()?;
            let path = entry.path();
            let file_name = path.file_name()?.to_str()?.to_string();
            if file_name.ends_with(".warc.gz") || file_name.ends_with(".warc.zst") {
                Some(path)
            } else {
                None
            }
        })
        .collect();

    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(12)
        .build()
        .unwrap();

    thread_pool.install(|| {
        paths.par_iter().for_each(|path| {
            if let Err(e) = process_warc(path, &output_file, &pattern) {
                eprintln!("Error processing {}: {}", path.display(), e);
            }
        });
    });

    Ok(())
}
fn process_warc(
    path: &Path,
    output_file: &Arc<Mutex<BufWriter<File>>>,
    pattern: &Regex,
) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let mut warc_stream: Box<dyn Read + Send> = if path.extension().unwrap() == "gz" {
        Box::new(GzDecoder::new(file))
    } else {
        let mut file = BufReader::new(file);
        if let Some(dict_data) = read_skippable_frame(&mut file)? {
            Box::new(Decoder::with_dictionary(file, &dict_data)?)
        } else {
            Box::new(file)
        }
    };

    let warc_reader = warc::WarcReader::new(BufReader::new(warc_stream));
	let mut record_builder = RecordBuilder::default();

	for result in warc_reader.iter_records() {
		match result {
			Ok((body)) => {
				let record = record_builder.clone().build();
				let is_response = record?.warc_type() == &warc::RecordType::from("response");

				if is_response {
					let content = String::from_utf8_lossy(&body.body());
					if pattern.is_match(&content) {
						let mut output_file = output_file.lock().unwrap();
						writeln!(output_file, "{}", content)?;
                }
            }
        }
        Err(e) => eprintln!("Error reading record: {}", e),
    }
}

    Ok(())
}