Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ where
// We start a blockstore transaction that can be reverted
state.state_tree_mut().begin_transaction();
for item in local_finalized_blobs.iter() {
if is_blob_finalized(&mut state, item.subscriber, item.hash, item.id.clone())? {
tracing::debug!(hash = %item.hash, "blob already finalized on chain; removing from pool");
let (finalized, status) =
is_blob_finalized(&mut state, item.subscriber, item.hash, item.id.clone())?;
if finalized {
tracing::debug!(hash = %item.hash, "blob already finalized on chain (status={:?}); removing from pool", status);
atomically(|| chain_env.blob_pool.remove_task(item)).await;
// Remove the result from the pool
atomically(|| chain_env.blob_pool.remove_result(item)).await;
Expand Down Expand Up @@ -647,9 +649,11 @@ where
ChainMessage::Ipc(IpcMessage::BlobPending(blob)) => {
// Check that blobs that are being enqueued are still in "added" state in the actor
// Once we enqueue a blob, the actor will transition it to "pending" state.
if !is_blob_added(&mut state, blob.subscriber, blob.hash, blob.id)? {
tracing::warn!(hash = %blob.hash, "blob is not added onchain; rejecting proposal");
return Ok(false);
let (added, status) = with_state_transaction(&mut state, |state| {
is_blob_added(state, blob.subscriber, blob.hash, blob.id)
})?;
if !added {
tracing::warn!(hash = %blob.hash, "blob is not added onchain (status={:?})", status);
}

// Reject the proposal if the current processor is not keeping up with blob
Expand All @@ -664,12 +668,12 @@ where
// Ensure that the blob is ready to be included on-chain.
// We can accept the proposal if the blob has reached a global quorum and is
// not yet finalized.
let is_blob_finalized = with_state_transaction(&mut state, |state| {
is_blob_finalized(state, blob.subscriber, blob.hash, blob.id.clone())
})?;
let (is_blob_finalized, status) =
with_state_transaction(&mut state, |state| {
is_blob_finalized(state, blob.subscriber, blob.hash, blob.id.clone())
})?;
if is_blob_finalized {
tracing::warn!(hash = %blob.hash, "blob is already finalized on chain; rejecting proposal");
return Ok(false);
tracing::warn!(hash = %blob.hash, "blob is already finalized on chain (status={:?}))", status);
}

let (is_globally_finalized, succeeded) = atomically(|| {
Expand Down Expand Up @@ -717,10 +721,11 @@ where
}
ChainMessage::Ipc(IpcMessage::ReadRequestPending(read_request)) => {
// Check that the read request is still open
let status = get_read_request_status(&mut state, read_request.id)?;
let status = with_state_transaction(&mut state, |state| {
get_read_request_status(state, read_request.id)
})?;
if !matches!(status, Some(ReadRequestStatus::Open)) {
tracing::warn!(request_id = %read_request.id, "read request is not open; rejecting proposal");
return Ok(false);
tracing::warn!(request_id = %read_request.id, "read request is not open");
}

// Reject the proposal if the current processor is not keeping up with read
Expand All @@ -739,8 +744,7 @@ where
get_read_request_status(state, read_request.id)
})?;
if !matches!(status, Some(ReadRequestStatus::Pending)) {
tracing::warn!(hash = %read_request.id, "only pending read requests can be closed; rejecting proposal");
return Ok(false);
tracing::warn!(hash = %read_request.id, "only pending read requests can be closed");
}

let read_response = read_request.response.clone();
Expand Down Expand Up @@ -1398,23 +1402,23 @@ where
.map_err(|e| anyhow!("error parsing blob status: {e}"))
}

/// Check if a blob is in added state, by reading its on-chain state.
/// Check if a blob is in the added state, by reading its on-chain state.
fn is_blob_added<DB>(
state: &mut FvmExecState<ReadOnlyBlockstore<DB>>,
subscriber: Address,
hash: Hash,
id: SubscriptionId,
) -> anyhow::Result<bool>
) -> anyhow::Result<(bool, Option<BlobStatus>)>
where
DB: Blockstore + Clone + 'static + Send + Sync,
{
let status = get_blob_status(state, subscriber, hash, id)?;
let added = if let Some(status) = status {
let added = if let Some(status) = status.clone() {
matches!(status, BlobStatus::Added)
} else {
false
};
Ok(added)
Ok((added, status))
}

/// Check if a blob is finalized (if it is resolved or failed), by reading its on-chain state.
Expand All @@ -1423,17 +1427,17 @@ fn is_blob_finalized<DB>(
subscriber: Address,
hash: Hash,
id: SubscriptionId,
) -> anyhow::Result<bool>
) -> anyhow::Result<(bool, Option<BlobStatus>)>
where
DB: Blockstore + Clone + 'static + Send + Sync,
{
let status = get_blob_status(state, subscriber, hash, id)?;
let finalized = if let Some(status) = status {
let finalized = if let Some(status) = status.clone() {
matches!(status, BlobStatus::Resolved | BlobStatus::Failed)
} else {
false
};
Ok(finalized)
Ok((finalized, status))
}

/// Returns credit and blob stats from on-chain state.
Expand Down
Loading