Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 58 additions & 48 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
/// Intermediate result of step 1 of task execution completion.
struct TaskExecutionCompletePrepareResult {
pub new_children: FxHashSet<TaskId>,
pub removed_data: Vec<CachedDataItem>,
pub is_now_immutable: bool,
#[cfg(feature = "verify_determinism")]
pub no_output_set: bool,
Expand Down Expand Up @@ -1837,11 +1836,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
stale = tracing::field::Empty,
)
.entered();

let is_error = result.is_err();

let mut ctx = self.execute_context(turbo_tasks);

let Some(TaskExecutionCompletePrepareResult {
new_children,
mut removed_data,
is_now_immutable,
#[cfg(feature = "verify_determinism")]
no_output_set,
Expand Down Expand Up @@ -1896,6 +1897,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return true;
}

let mut removed_data = Vec::new();
if self.task_execution_completed_finish(
&mut ctx,
task_id,
Expand All @@ -1911,9 +1913,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return true;
}

drop(removed_data);
self.task_execution_completed_cleanup(
&mut ctx,
task_id,
cell_counters,
is_error,
&mut removed_data,
);

self.task_execution_completed_cleanup(&mut ctx, task_id);
drop(removed_data);

false
}
Expand All @@ -1934,7 +1942,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if matches!(in_progress, InProgressState::Canceled) {
return Some(TaskExecutionCompletePrepareResult {
new_children: Default::default(),
removed_data: Default::default(),
is_now_immutable: false,
#[cfg(feature = "verify_determinism")]
no_output_set: false,
Expand Down Expand Up @@ -2026,7 +2033,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

let mut queue = AggregationUpdateQueue::new();

let mut removed_data = Vec::new();
let mut old_edges = Vec::new();

let has_children = !new_children.is_empty();
Expand Down Expand Up @@ -2065,27 +2071,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
}

// Remove no longer existing cells and
// find all outdated data items (removed cells, outdated edges)
// Note: For persistent tasks we only want to call extract_if when there are actual cells to
// remove to avoid tracking that as modification.
if task_id.is_transient() || iter_many!(task, CellData { cell }
if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
).count() > 0 {
removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
}
if task_id.is_transient() || iter_many!(task, TransientCellData { cell }
if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
).count() > 0 {
removed_data.extend(task.extract_if(CachedDataItemType::TransientCellData, |key, _| {
matches!(key, CachedDataItemKey::TransientCellData { cell } if cell_counters
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
}

old_edges.extend(
task.iter(CachedDataItemType::OutdatedCollectible)
.filter_map(|(key, value)| match (key, value) {
Expand All @@ -2100,25 +2085,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if self.should_track_dependencies() {
old_edges.extend(iter_many!(task, OutdatedCellDependency { target, key } => OutdatedEdge::CellDependency(target, key)));
old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target)));
old_edges.extend(
iter_many!(task, CellDependent { cell, task, key: _ } => (cell, task)).filter_map(
|(cell, task)| {
if cell_counters
.get(&cell.type_id)
.is_none_or(|start_index| cell.index >= *start_index)
&& let Some(old_counter) = old_counters.get(&cell.type_id)
&& cell.index < *old_counter
{
return Some(OutdatedEdge::RemovedCellDependent {
task_id: task,
#[cfg(feature = "trace_task_dirty")]
value_type_id: cell.type_id,
});
}
None
},
),
);
}

// Check if output need to be updated
Expand Down Expand Up @@ -2195,7 +2161,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

Some(TaskExecutionCompletePrepareResult {
new_children,
removed_data,
is_now_immutable,
#[cfg(feature = "verify_determinism")]
no_output_set,
Expand Down Expand Up @@ -2563,14 +2528,59 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
reschedule
}

fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
fn task_execution_completed_cleanup(
&self,
ctx: &mut impl ExecuteContext<'_>,
task_id: TaskId,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
is_error: bool,
removed_data: &mut Vec<CachedDataItem>,
) {
let mut task = ctx.task(task_id, TaskDataCategory::All);

// An error is potentially caused by a eventual consistency, so we avoid updating cells
// after an error as it is likely transient and we want to keep the dependent tasks
// clean to avoid re-executions.
if !is_error {
// Remove no longer existing cells and
// find all outdated data items (removed cells, outdated edges)
// Note: For persistent tasks we only want to call extract_if when there are actual
// cells to remove to avoid tracking that as modification.
// Note: We do not mark the tasks as dirty here, as these tasks are unused or stale
// anyway and we want to avoid needless re-executions. When the cells become
// used again, they are invalidated from the update cell operation.
let is_cell_unused = |cell: CellId| {
cell_counters
.get(&cell.type_id)
.is_none_or(|start_index| cell.index >= *start_index)
};
if task_id.is_transient()
|| iter_many!(task, CellData { cell } => cell).any(is_cell_unused)
{
removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
matches!(key, CachedDataItemKey::CellData { cell } if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
}
if task_id.is_transient()
|| iter_many!(task, TransientCellData { cell } => cell).any(is_cell_unused)
{
removed_data.extend(task.extract_if(
CachedDataItemType::TransientCellData,
|key, _| {
matches!(key, CachedDataItemKey::TransientCellData { cell } if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
},
));
}
}

// Shrink memory usage
task.shrink_to_fit(CachedDataItemType::CellData);
task.shrink_to_fit(CachedDataItemType::TransientCellData);
task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
task.shrink_to_fit(CachedDataItemType::CellDependency);
task.shrink_to_fit(CachedDataItemType::OutputDependency);
task.shrink_to_fit(CachedDataItemType::CollectiblesDependency);

drop(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use rustc_hash::FxHashSet;
use smallvec::SmallVec;
use turbo_tasks::TaskId;

#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::invalidate::TaskDirtyCause;
use crate::{
backend::{
TaskDataCategory, get, get_many,
Expand All @@ -16,7 +14,6 @@ use crate::{
AggregationUpdateJob, AggregationUpdateQueue, InnerOfUppersLostFollowersJob,
get_aggregation_number, get_uppers, is_aggregating_node,
},
invalidate::make_task_dirty,
},
storage::update_count,
},
Expand Down Expand Up @@ -45,11 +42,6 @@ pub enum OutdatedEdge {
CellDependency(CellRef, Option<u64>),
OutputDependency(TaskId),
CollectiblesDependency(CollectiblesRef),
RemovedCellDependent {
task_id: TaskId,
#[cfg(feature = "trace_task_dirty")]
value_type_id: turbo_tasks::ValueTypeId,
},
}

impl CleanupOldEdgesOperation {
Expand Down Expand Up @@ -229,21 +221,6 @@ impl Operation for CleanupOldEdgesOperation {
});
}
}
OutdatedEdge::RemovedCellDependent {
task_id,
#[cfg(feature = "trace_task_dirty")]
value_type_id,
} => {
make_task_dirty(
task_id,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::CellRemoved {
value_type: value_type_id,
},
queue,
ctx,
);
}
}
}

Expand Down
Loading