Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## Latest

### Fixes

- `mapache verify` is now explicit about data corruption in the pack files.
  Previously, it logged an error when a corrupt pack was found, but the logical
  check (indexed refs) could still report `[OK]`, misleading the user into
  believing that the verification had passed.

## v0.1.6

### Changes
Expand Down
234 changes: 150 additions & 84 deletions core/src/commands/cmd_verify.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use anyhow::{Result, bail};
Expand All @@ -6,10 +7,10 @@ use colored::Colorize;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

use crate::mapache::defaults::SHORT_SNAPSHOT_ID_LEN;
use crate::{
backend::new_backend_with_prompt,
commands::{GlobalArgs, cleanup::CleanupHandler},
mapache::defaults::SHORT_SNAPSHOT_ID_LEN,
repository::{
repo::{RepoConfig, Repository},
snapshot::SnapshotStream,
Expand All @@ -22,19 +23,36 @@ use crate::{
#[derive(Args, Debug)]
#[clap(
about = "Verify the integrity of the data stored in the repository",
long_about = "Verify the integrity of the data stored in the repository, ensuring that all data\
associated to a any active snapshots are valid and reachable. This guarantees\
that any active snapshot can be restored."
long_about = "Verify the integrity of the data stored in the repository. \
By default, it checks logical consistency (snapshots point to known index entries). \
Use --read-packs to enforce a full physical verification (decryption + checksums)."
)]
pub struct CmdArgs {
/// Read all packs and discover unreferenced blobs
#[clap(long = "read-packs", value_parser, default_value_t = false)]
/// Read, decrypt, and hash ALL data in the repository (Slow but thorough)
#[clap(long = "read-packs", default_value_t = false)]
pub read_packs: bool,
}

/// Thread-safe counters accumulated during `mapache verify`.
///
/// The counters are updated concurrently from rayon worker threads during the
/// physical pack scan and are only read (for the progress bar message and the
/// final report) with `Ordering::Relaxed`, which is sufficient because each
/// counter is independent and no ordering between them is required.
#[derive(Default)]
struct VerifyStats {
    // Packs that were read and verified successfully (Ok branch of verify_pack).
    packs_processed: AtomicUsize,
    // Packs whose verification returned an error (logged as CORRUPT).
    packs_corrupt: AtomicUsize,
    // Total blobs verified across all successfully read packs.
    blobs_verified: AtomicUsize,
    // Blobs found in packs but not referenced by the index; reported with a
    // hint to run 'prune'.
    blobs_dangling: AtomicUsize,
}

impl VerifyStats {
    /// Create a new stats holder with every counter at zero.
    ///
    /// `AtomicUsize::default()` is zero, so deriving `Default` replaces the
    /// four hand-written `AtomicUsize::new(0)` initializers.
    fn new() -> Self {
        Self::default()
    }
}

pub fn run(global_args: &GlobalArgs, args: &CmdArgs) -> Result<()> {
let auth = utils::get_auth_from_file(&global_args.auth_file)?;

let backend_options = global_args.backend_options(false);
let backend_arc = new_backend_with_prompt(backend_options)?;

Expand All @@ -43,6 +61,8 @@ pub fn run(global_args: &GlobalArgs, args: &CmdArgs) -> Result<()> {
use_cache: !global_args.no_cache,
compression: global_args.compression_level,
};

// Open repository
let (repo_arc, secure_storage, lock_handle) = Repository::try_open_with_lock(
auth.as_ref(),
global_args.key.as_ref(),
Expand All @@ -52,125 +72,171 @@ pub fn run(global_args: &GlobalArgs, args: &CmdArgs) -> Result<()> {
global_args.retry_lock_duration,
)?;

// Ensure unlock on panic/drop
let lock_handle_clone = lock_handle.clone();
let _cleanup_handler = CleanupHandler::new(move || {
lock_handle_clone.write().unlock();
})?;

let start = Instant::now();
let stats = VerifyStats::new();

let snapshot_stream = SnapshotStream::new(repo_arc.clone())?;

// --------------------------------
// Physical Verification (Optional)
// --------------------------------
if args.read_packs {
ui::cli::log!("{}", "Verifying Pack Integrity (Physical)...".bold());
let packs = repo_arc.list_packs()?;

let style = ProgressStyle::default_bar()
.template(
"[{custom_elapsed}] [{bar:20.cyan/white}] Reading packs: {pos} / {len} [ETA: {custom_eta}]",
)
.template("[{custom_elapsed}] [{bar:25.cyan/white}] {pos}/{len} packs ({msg})")
.unwrap()
.progress_chars("=> ")
.with_key(
"custom_elapsed",
move |state: &ProgressState, w: &mut dyn std::fmt::Write| {
let elapsed = state.elapsed();
let custom_elapsed = utils::pretty_print_duration(elapsed);
let _ = w.write_str(&custom_elapsed);
},
)
.with_key(
"custom_eta",
move |state: &ProgressState, w: &mut dyn std::fmt::Write| {
let eta = state.eta();
let custom_eta = utils::pretty_print_duration(eta);
let _ = w.write_str(&custom_eta);
|state: &ProgressState, w: &mut dyn std::fmt::Write| {
write!(w, "{}", utils::pretty_print_duration(state.elapsed())).unwrap()
},
);

let bar = ProgressBar::new(packs.len() as u64);
bar.set_draw_target(default_bar_draw_target());
bar.set_style(style);

let repo_ref = repo_arc.clone();
let backend_ref = backend_arc.clone();
let secure_storage_ref = secure_storage.clone();

let num_dangling_blobs: usize = packs
.par_iter()
.map(|pack_id| {
let verify_res = verify_pack(
repo_ref.as_ref(),
backend_ref.as_ref(),
secure_storage_ref.as_ref(),
pack_id,
);

bar.inc(1);

verify_res.unwrap_or_else(|e| {
ui::cli::error!("Error verifying pack {}: {}", pack_id.to_short_hex(8), e);
0 // Return 0 dangling on error
})
})
.sum();

bar.finish_and_clear();
// References for the closure
let repo_ref = repo_arc.as_ref();
let backend_ref = backend_arc.as_ref();
let secure_ref = secure_storage.as_ref();

packs.par_iter().for_each(|pack_id| {
match verify_pack(repo_ref, backend_ref, secure_ref, pack_id) {
Ok(pack_stats) => {
stats.packs_processed.fetch_add(1, Ordering::Relaxed);
stats
.blobs_verified
.fetch_add(pack_stats.verified_blobs, Ordering::Relaxed);
stats
.blobs_dangling
.fetch_add(pack_stats.dangling, Ordering::Relaxed);
}
Err(e) => {
// Log IMMEDIATELY so user sees which pack failed
bar.suspend(|| {
ui::cli::error!("Pack {} CORRUPT: {}", pack_id, e);
});
stats.packs_corrupt.fetch_add(1, Ordering::Relaxed);
}
}

if num_dangling_blobs > 0 {
ui::cli::log!("Found {} unreferenced blobs", num_dangling_blobs);
// Update bar message with dynamic stats
let corrupt = stats.packs_corrupt.load(Ordering::Relaxed);
if corrupt > 0 {
bar.set_message(
utils::format_count(corrupt, "ERROR", "ERRORS")
.red()
.to_string(),
);
} else {
bar.set_message("OK".to_string());
}
bar.inc(1);
});

bar.finish();

if stats.packs_corrupt.load(Ordering::Relaxed) > 0 {
ui::cli::log!();
ui::cli::error!("Physical verification failed. The repository data is corrupt.");
} else {
ui::cli::log!(
"Physical verification passed. {} blobs verified.",
stats.blobs_verified.load(Ordering::Relaxed)
);
}

ui::cli::log!();
}

let num_snapshots = snapshot_stream.len();
let mut ok_counter = 0;
let mut error_counter = 0;
// ------------------------------------------
// Logical Verification (Snapshot References)
// ------------------------------------------
ui::cli::log!("{}", "Verifying Snapshots (Logical)...".bold());

for (i, (snapshot_id, _snapshot)) in snapshot_stream.enumerate() {
ui::cli::log!(
"Verifying snapshot {} ({} / {})",
snapshot_id
.to_short_hex(SHORT_SNAPSHOT_ID_LEN)
.bold()
.yellow(),
let snapshot_stream = SnapshotStream::new(repo_arc.clone())?;
let snapshots: Vec<_> = snapshot_stream.collect(); // Collect to know total count
let num_snapshots = snapshots.len();

let mut snapshots_corrupt = 0;

for (i, (snapshot_id, _)) in snapshots.iter().enumerate() {
let short_id = snapshot_id.to_short_hex(SHORT_SNAPSHOT_ID_LEN);

// Print progress
let msg = format!(
"Snapshot {} ({}/{})",
short_id.bold().yellow(),
i + 1,
num_snapshots
);

match verify_snapshot_refs(repo_arc.clone(), &snapshot_id) {
// Check refs
match verify_snapshot_refs(repo_arc.clone(), snapshot_id) {
Ok(_) => {
ui::cli::log!("{}", "[OK]".bold().green());
ok_counter += 1;
ui::cli::log!("{} {}", msg, "[OK]".bold().green());
}
Err(e) => {
ui::cli::log!("{} {}", "[ERROR]".bold().red(), e.to_string());
error_counter += 1
ui::cli::log!("{} {}", msg, "[ERROR]".bold().red());
ui::cli::error!("{e}");
snapshots_corrupt += 1;
}
}
}

// -------------
// FINAL REPORT
// -------------
ui::cli::log!();
ui::cli::log!(
"{} verified",
utils::format_count(num_snapshots, "snapshot", "snapshots"),
);
if ok_counter > 0 {
ui::cli::log!("{} {}", ok_counter, "[OK]".bold().green());

let packs_corrupt_count = stats.packs_corrupt.load(Ordering::Relaxed);
let dangling_count = stats.blobs_dangling.load(Ordering::Relaxed);

if packs_corrupt_count > 0 || snapshots_corrupt > 0 {
ui::cli::log!("{}", "VERIFICATION FAILED".bold().on_red());

if packs_corrupt_count > 0 {
ui::cli::log!(
"- {} corrupt/unreadable.",
utils::format_count(packs_corrupt_count, "pack", "packs")
);
}
if snapshots_corrupt > 0 {
ui::cli::log!(
"- {} with broken references.",
utils::format_count(snapshots_corrupt, "snapshot", "snapshots")
);
}

bail!("Repository integrity check failed.");
}
if error_counter > 0 {
ui::cli::log!("{} {}", error_counter, "[ERROR]".bold().red());
bail!(
"Verify failed after {}",
utils::pretty_print_duration(start.elapsed())
)
} else {
ui::cli::log!();

if dangling_count > 0 {
ui::cli::log!(
"Finished in {}",
utils::pretty_print_duration(start.elapsed())
"{} Found {} (run 'prune' to clean up).",
"[INFO]".yellow(),
utils::format_count(dangling_count, "unreferenced blob", "unreferenced blobs")
);

Ok(())
}

ui::cli::log!(
"{} Verified {} and {} in {}",
"[SUCCESS]".bold().green(),
utils::format_count(num_snapshots, "snapshot", "snapshots"),
utils::format_count(
stats.packs_processed.load(Ordering::Relaxed),
"pack",
"packs"
),
utils::pretty_print_duration(start.elapsed())
);

Ok(())
}
3 changes: 0 additions & 3 deletions core/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use anyhow::Result;
use colored::Colorize;

use mapache::{commands, ui};

fn main() -> Result<()> {
// Parse arguments and execute commands
if let Err(e) = commands::parse_and_run() {
ui::cli::error!("{}", e.to_string());
ui::cli::log!();
ui::cli::log!("Finished with {}", "Error".bold().red());
std::process::exit(1);
}

Expand Down
Loading