diff --git a/Cargo.toml b/Cargo.toml
index ba4d450..f2f5c99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ path = "src/main.rs"
 ## core
 clap = { version = "4", features = ["derive"]}
 chrono = "0.4.40"
+csv = "1.3.0"
 kbo = "0.5.0"
 log = "0.4.20"
 needletail = { version = "0.6.0", default-features = false, features = ["flate2"] }
diff --git a/src/cli.rs b/src/cli.rs
index 63f0d68..01d3f5a 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -31,8 +31,11 @@ pub enum Commands {
         #[arg(group = "input", required = true)]
         seq_files: Vec<String>,
 
+        #[arg(short = 'l', long = "input-list", group = "input", required = true, help_heading = "Input")]
+        input_list: Option<String>,
+
         // Outputs
-        #[arg(short = 'o', long = "output-prefix", required = false, help_heading = "Output")]
+        #[arg(short = 'o', long = "output-prefix", required = true, help_heading = "Output")]
         output_prefix: Option<String>,
 
         // Build parameters
@@ -65,16 +68,20 @@ pub enum Commands {
 
     // Call variants in query relative to a reference
     Call{
-        // Input fasta or fastq query file(s)
+        // Inputs
+        // // Input fasta or fastq query file(s)
         #[arg(group = "input", required = true)]
        query_file: PathBuf,
-
-        // Reference fasta or fastq file
+        // // Reference fasta or fastq file
         #[arg(long = "reference", short = 'r', required = true, help_heading = "Input")]
         ref_file: PathBuf,
 
+        // Outputs
+        #[arg(short = 'o', long = "output", required = false, help_heading = "Output")]
+        output_file: Option<String>,
+
         // Upper bound for random match probability
-        #[arg(long = "max-error-prob", default_value_t = 0.0000001, help_heading = "Algorithm")]
+        #[arg(long = "max-error-prob", default_value_t = 0.00000001, help_heading = "Algorithm")]
         max_error_prob: f64,
 
         // Resources
@@ -99,9 +106,18 @@ pub enum Commands {
 
     // Find indexed k-mers in a query
     Find {
-        // Input fasta or fastq query file(s)
+        // Inputs
+        // // Input fasta or fastq query file(s)
         #[arg(group = "input", required = true)]
         query_files: Vec<String>,
+        // // Input list
+        #[arg(short = 'l', long = "input-list", group = "input", required = true, help_heading = "Input")]
+        input_list: Option<String>,
+
+        // Output options
+        // // Output file
+        #[arg(short = 'o', long = "output", required = false, help_heading = "Output")]
+        output_file: Option<String>,
 
         // Reference
         // // Sequence file
@@ -160,10 +176,18 @@ pub enum Commands {
         query_files: Vec<String>,
 
         // Input options
+        // // Input list
+        #[arg(short = 'l', long = "input-list", group = "input", required = true, help_heading = "Input")]
+        input_list: Option<String>,
         // // Reference file
         #[arg(short = 'r', long = "reference", required = true, help_heading = "Input")]
         ref_file: String,
 
+        // Output options
+        // // Output file
+        #[arg(short = 'o', long = "output", required = false, help_heading = "Output")]
+        output_file: Option<String>,
+
         // Parameters
         // // Upper bound for random match probability
         #[arg(long = "max-error-prob", default_value_t = 0.0000001, help_heading = "Algorithm")]
diff --git a/src/main.rs b/src/main.rs
index c0cacbe..330ad13 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,6 +12,7 @@
 // at your option.
 //
 use std::io::Write;
+use std::path::PathBuf;
 
 use clap::Parser;
 use log::info;
@@ -52,6 +53,33 @@ fn read_fastx_file(
     seq_data
 }
 
+fn read_input_list(
+    input_list_file: &String,
+    delimiter: u8
+) -> Vec<(String, PathBuf)> {
+    let fs = match std::fs::File::open(input_list_file) {
+        Ok(fs) => fs,
+        Err(e) => panic!(" Error in reading --input-list: {}", e),
+    };
+
+    let mut reader = csv::ReaderBuilder::new()
+        .delimiter(delimiter)
+        .has_headers(false)
+        .from_reader(fs);
+
+    reader.records().map(|line| {
+        if let Ok(record) = line {
+            if record.len() > 1 {
+                (record[0].to_string(), PathBuf::from(record[1].to_string()))
+            } else {
+                (record[0].to_string(), PathBuf::from(record[0].to_string()))
+            }
+        } else {
+            panic!(" Error in reading --input-list: {}", input_list_file);
+        }
+    }).collect::<Vec<(String, PathBuf)>>()
+}
+
 /// Initializes the logger with verbosity given in `log_max_level`.
 fn init_log(log_max_level: usize) {
     stderrlog::new()
@@ -86,43 +114,53 @@ fn main() {
     // Subcommands:
     match &cli.command {
         Some(cli::Commands::Build {
-            seq_files,
-            output_prefix,
+            seq_files,
+            input_list,
+            output_prefix,
             kmer_size,
-            prefix_precalc,
-            dedup_batches,
-            num_threads,
-            mem_gb,
-            temp_dir,
-            verbose,
+            prefix_precalc,
+            dedup_batches,
+            num_threads,
+            mem_gb,
+            temp_dir,
+            verbose,
         }) => {
-            init_log(if *verbose { 2 } else { 1 });
+            init_log(if *verbose { 2 } else { 1 });
             let mut sbwt_build_options = kbo::BuildOpts::default();
-            sbwt_build_options.k = *kmer_size;
-            sbwt_build_options.num_threads = *num_threads;
-            sbwt_build_options.prefix_precalc = *prefix_precalc;
-            sbwt_build_options.dedup_batches = *dedup_batches;
-            sbwt_build_options.mem_gb = *mem_gb;
-            sbwt_build_options.temp_dir = temp_dir.clone();
+            sbwt_build_options.k = *kmer_size;
+            sbwt_build_options.num_threads = *num_threads;
+            sbwt_build_options.prefix_precalc = *prefix_precalc;
+            sbwt_build_options.dedup_batches = *dedup_batches;
+            sbwt_build_options.mem_gb = *mem_gb;
+            sbwt_build_options.temp_dir = temp_dir.clone();
 
-            info!("Building SBWT index from {} files...", seq_files.len());
-            let mut seq_data: Vec<Vec<u8>> = Vec::new();
-            seq_files.iter().for_each(|file| {
-                seq_data.append(&mut read_fastx_file(file).into_iter().map(|(_, seq)| seq).collect::<Vec<Vec<u8>>>());
-            });
+            let mut in_files = seq_files.clone();
+            if let Some(list) = input_list {
+                let contents = read_input_list(list, b'\t');
+                let contents_iter = contents.iter().map(|(_, path)| path.to_str().unwrap().to_string());
+                in_files.extend(contents_iter);
+            }
+
+            info!("Building SBWT index from {} files...", in_files.len());
+            let mut seq_data: Vec<Vec<u8>> = Vec::new();
+            in_files.iter().for_each(|file| {
+                seq_data.append(&mut read_fastx_file(file).into_iter().map(|(_, seq)| seq).collect::<Vec<Vec<u8>>>());
+            });
 
-            let (sbwt, lcs) = kbo::build(&seq_data, sbwt_build_options);
+            let (sbwt, lcs) = kbo::build(&seq_data, sbwt_build_options);
 
-            info!("Serializing SBWT index to {}.sbwt ...", output_prefix.as_ref().unwrap());
-            info!("Serializing LCS array to {}.lcs ...", output_prefix.as_ref().unwrap());
-            kbo::index::serialize_sbwt(output_prefix.as_ref().unwrap(), &sbwt, &lcs);
+            info!("Serializing SBWT index to {}.sbwt ...", output_prefix.as_ref().unwrap());
+            info!("Serializing LCS array to {}.lcs ...", output_prefix.as_ref().unwrap());
+            kbo::index::serialize_sbwt(output_prefix.as_ref().unwrap(), &sbwt, &lcs);
 
-        },
+        },
         Some(cli::Commands::Find {
             query_files,
+            input_list,
             ref_file,
+            output_file,
             index_prefix,
             detailed,
             min_len,
@@ -187,12 +225,33 @@ fn main() {
                 .build()
                 .unwrap();
 
+
+            let mut in_files: Vec<(String, PathBuf)> = query_files.iter().map(|file| (file.clone(), PathBuf::from(file))).collect();
+            if let Some(list) = input_list {
+                let mut contents = read_input_list(list, b'\t');
+                in_files.append(&mut contents);
+            }
+
+            let mut ofs = if output_file.is_some() {
+                let ofs = match std::fs::File::create(output_file.as_ref().unwrap()) {
+                    Ok(file) => file,
+                    Err(e) => panic!(" Error in opening --output: {}", e),
+                };
+                Some(ofs)
+            } else {
+                None
+            };
+
             info!("Querying SBWT index...");
-            println!("query\tref\tq.start\tq.end\tstrand\tlength\tmismatches\tgap_bases\tgap_opens\tidentity\tcoverage\tquery.contig\tref.contig");
-            let stdout = std::io::stdout();
-            query_files.iter().for_each(|file| {
+            if let Some(ofs) = &mut ofs {
+                let _ = ofs.write(b"query\tref\tq.start\tq.end\tstrand\tlength\tmismatches\tgap_bases\tgap_opens\tidentity\tcoverage\tquery.contig\tref.contig");
+            } else {
+                let stdout = std::io::stdout();
+                let _ = stdout.lock().write(b"query\tref\tq.start\tq.end\tstrand\tlength\tmismatches\tgap_bases\tgap_opens\tidentity\tcoverage\tquery.contig\tref.contig");
+            }
+            in_files.iter().for_each(|(file, path)| {
                 let mut run_lengths: Vec<(kbo::format::RLE, char, String, String, usize, usize)> = indexes.par_iter().map(|((sbwt, lcs), ref_contig, ref_bases)| {
-                    let mut reader = needletail::parse_fastx_file(file).expect("valid path/file");
+                    let mut reader = needletail::parse_fastx_file(path).ok().unwrap();
                     let mut res: Vec<(kbo::format::RLE, char, String, String, usize, usize)> = Vec::new();
                     while let Some(seqrec) = read_from_fastx_parser(&mut *reader) {
                         let query_contig = std::str::from_utf8(seqrec.id()).expect("UTF-8");
@@ -227,26 +286,45 @@ fn main() {
                         let aln_end = if *strand == '+' { aln.end } else { query_bases - aln.start };
                         let coverage = (aln.matches as f64 + aln.mismatches as f64)/(*ref_bases as f64) * 100_f64;
                         let identity = (aln.matches as f64)/(aln_len as f64) * 100_f64;
-                        let _ = writeln!(&mut stdout.lock(),
-                                         "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{:.2}\t{:.2}\t{}\t{}",
-                                         file, ref_file.clone().unwrap(),
-                                         aln_start,
-                                         aln_end,
-                                         strand,
-                                         aln.end - aln.start,
-                                         aln.mismatches,
-                                         aln.gap_bases,
-                                         aln.gap_opens,
-                                         identity,
-                                         coverage,
-                                         query_contig,
-                                         ref_contig);
+                        if ofs.is_some() {
+                            let _ = writeln!(&mut ofs.as_ref().unwrap(),
+                                             "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{:.2}\t{:.2}\t{}\t{}",
+                                             file, ref_file.clone().unwrap(),
+                                             aln_start,
+                                             aln_end,
+                                             strand,
+                                             aln.end - aln.start,
+                                             aln.mismatches,
+                                             aln.gap_bases,
+                                             aln.gap_opens,
+                                             identity,
+                                             coverage,
+                                             query_contig,
+                                             ref_contig);
+                        } else {
+                            let stdout = std::io::stdout();
+                            let _ = writeln!(&mut stdout.lock(),
+                                             "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{:.2}\t{:.2}\t{}\t{}",
+                                             file, ref_file.clone().unwrap(),
+                                             aln_start,
+                                             aln_end,
+                                             strand,
+                                             aln.end - aln.start,
+                                             aln.mismatches,
+                                             aln.gap_bases,
+                                             aln.gap_opens,
+                                             identity,
+                                             coverage,
+                                             query_contig,
+                                             ref_contig);
+                        }
                     });
                 });
         },
         Some(cli::Commands::Call{
             query_file,
             ref_file,
+            output_file,
             max_error_prob,
             num_threads,
             kmer_size,
@@ -278,33 +356,61 @@ fn main() {
             let ref_data: Vec<(String, Vec<u8>)> = read_fastx_file(ref_file.to_str().unwrap());
 
-            let stdout = std::io::stdout();
-
             let query_data: Vec<Vec<u8>> = read_fastx_file(query_file.to_str().unwrap()).iter().map(|(_, seq)| seq.clone()).collect();
             let (sbwt_query, lcs_query) = kbo::index::build_sbwt_from_vecs(&query_data, &Some(sbwt_build_options.clone()));
 
             // Will map separately against each contig in the reference
-            let vcf_header = vcf_writer::write_vcf_header(&mut stdout.lock(), ref_file.to_str().unwrap(),
-                                                          &ref_data.iter().map(|(contig_name, contig_seq)| {
-                                                              (contig_name.clone(), contig_seq.len())
-                                                          }).collect::<Vec<(String, usize)>>())
-                .expect("Write header to .vcf file");
-
-            ref_data.iter().for_each(|(ref_contig_header, ref_seq)| {
-                let calls = kbo::call(&sbwt_query, &lcs_query, ref_seq, call_opts.clone());
-                vcf_writer::write_vcf_contents(
-                    &mut stdout.lock(),
-                    &vcf_header,
-                    &calls,
-                    ref_seq,
-                    ref_contig_header,
-                ).expect("Wrote .vcf record");
-            });
+            if output_file.is_some() {
+                // this is dumb, surely there is a way to pass stdout or the file?
+                let mut ofs = match std::fs::File::create(output_file.as_ref().unwrap()) {
+                    Ok(file) => file,
+                    Err(e) => panic!(" Error in opening --output: {}", e),
+                };
+
+                let vcf_header = vcf_writer::write_vcf_header(&mut ofs,
+                                                              ref_file.to_str().unwrap(),
+                                                              &ref_data.iter().map(|(contig_name, contig_seq)| {
+                                                                  (contig_name.clone(), contig_seq.len())
+                                                              }).collect::<Vec<(String, usize)>>())
+                    .expect("Write header to .vcf file");
+
+                ref_data.iter().for_each(|(ref_contig_header, ref_seq)| {
+                    let calls = kbo::call(&sbwt_query, &lcs_query, ref_seq, call_opts.clone());
+                    vcf_writer::write_vcf_contents(
+                        &mut ofs,
+                        &vcf_header,
+                        &calls,
+                        ref_seq,
+                        ref_contig_header,
+                    ).expect("Wrote .vcf record");
+                });
+            } else {
+                let stdout = std::io::stdout();
+                let vcf_header = vcf_writer::write_vcf_header(&mut stdout.lock(),
+                                                              ref_file.to_str().unwrap(),
+                                                              &ref_data.iter().map(|(contig_name, contig_seq)| {
+                                                                  (contig_name.clone(), contig_seq.len())
+                                                              }).collect::<Vec<(String, usize)>>())
+                    .expect("Write header to .vcf file");
+
+                ref_data.iter().for_each(|(ref_contig_header, ref_seq)| {
+                    let calls = kbo::call(&sbwt_query, &lcs_query, ref_seq, call_opts.clone());
+                    vcf_writer::write_vcf_contents(
+                        &mut stdout.lock(),
+                        &vcf_header,
+                        &calls,
+                        ref_seq,
+                        ref_contig_header,
+                    ).expect("Wrote .vcf record");
+                });
+            }
         }
         Some(cli::Commands::Map {
             query_files,
             ref_file,
+            input_list,
+            output_file,
             max_error_prob,
             num_threads,
             kmer_size,
@@ -337,26 +443,54 @@ fn main() {
                 .build()
                 .unwrap();
 
+            let mut in_files: Vec<(String, PathBuf)> = query_files.iter().map(|file| (file.clone(), PathBuf::from(file))).collect();
+            if let Some(list) = input_list {
+                let mut contents = read_input_list(list, b'\t');
+                in_files.append(&mut contents);
+            }
+
             let ref_data: Vec<(String, Vec<u8>)> = read_fastx_file(ref_file);
 
-            let stdout = std::io::stdout();
+            let ofs = if output_file.is_some() {
+                let ofs = match std::fs::File::create(output_file.as_ref().unwrap()) {
+                    Ok(file) => file,
+                    Err(e) => panic!(" Error in opening --output: {}", e),
+                };
+                Some(ofs)
+            } else {
+                None
+            };
 
             let mut first_write = true;
-            query_files.iter().for_each(|query_file| {
-                let contigs: Vec<Vec<u8>> = read_fastx_file(query_file).iter().map(|(_, seq)| seq.clone()).collect();
+            in_files.iter().for_each(|(file, path)| {
+                let contigs: Vec<Vec<u8>> = read_fastx_file(path.to_str().unwrap()).iter().map(|(_, seq)| seq.clone()).collect();
                 let (sbwt, lcs) = kbo::index::build_sbwt_from_vecs(&contigs, &Some(sbwt_build_options.clone()));
 
                 let mut res: Vec<u8> = Vec::new();
                 ref_data.iter().for_each(|(_, ref_seq)| {
                     res.append(&mut kbo::map(ref_seq, &sbwt, &lcs, map_opts.clone()));
                 });
-                if first_write {
+
+                if ofs.is_some() {
+
+                    if first_write {
+                        let _ = writeln!(&mut ofs.as_ref().unwrap(),
+                                         ">{}\n{}", ref_file, std::str::from_utf8(&ref_data.iter().flat_map(|x| x.1.clone()).collect::<Vec<u8>>()).expect("UTF-8"));
+                        first_write = false;
+                    }
+                    let _ = writeln!(&mut ofs.as_ref().unwrap(),
+                                     ">{}\n{}", file, std::str::from_utf8(&res).expect("UTF-8"));
+                } else {
+                    let stdout = std::io::stdout();
+
+                    if first_write {
+                        let _ = writeln!(&mut stdout.lock(),
+                                         ">{}\n{}", ref_file, std::str::from_utf8(&ref_data.iter().flat_map(|x| x.1.clone()).collect::<Vec<u8>>()).expect("UTF-8"));
+                        first_write = false;
+                    }
                     let _ = writeln!(&mut stdout.lock(),
-                                     ">{}\n{}", ref_file, std::str::from_utf8(&ref_data.iter().flat_map(|x| x.1.clone()).collect::<Vec<u8>>()).expect("UTF-8"));
-                    first_write = false;
+                                     ">{}\n{}", file, std::str::from_utf8(&res).expect("UTF-8"));
                 }
-                let _ = writeln!(&mut stdout.lock(),
-                                 ">{}\n{}", query_file, std::str::from_utf8(&res).expect("UTF-8"));
             });
         },
         None => {}