Skip to content

Commit 9f4b18d

Browse files
authored
Release the asset infos lock before acquiring the file transaction lock. (#22017)
# Objective

- Fix the first flake related to #22001.
- We had a double lock problem. We would first lock `asset_infos`, then lock one of the assets within `asset_infos`, **and then drop the `asset_infos` lock**. This means, however, that if a process needs to access `asset_infos` (which is necessary for loading nested assets during processing), we'll have one thread trying to lock 1) `asset_infos`, 2) the per-asset lock, and we'll have another thread trying to lock 1) the per-asset lock (we've already dropped the `asset_infos` lock), 2) `asset_infos`. A classic deadlock!

## Solution

- Before locking the per-asset lock, we clone the `Arc<RwLock>` out of the `asset_infos`, drop the `asset_infos` lock, and only then lock the per-asset lock. This ensures that we never hang on the `asset_infos` lock when just trying to fetch a single asset.
- Make all the access to `asset_infos` "short" - they get what they need out of `asset_infos` and then drop the lock as soon as possible.
- I also used `?` to clean up some methods in `ProcessorAssetInfos`, where previously it was "remove the asset info, then have one big `if let` after it". Now we just return early if the value is `None`.
- I also happened to fix a weird case where the new path in a rename wasn't guarded by a transaction lock. Now it is!

## Testing

- Running without this fix I get "Ran out of loops" from the `only_reprocesses_wrong_hash_on_startup` test quite quickly (after a minute or so). With this fix, I now only get the assertion failure problem. If I also skip that assertion, the test hasn't flaked for a while! Yay, no more deadlock!
1 parent 84e1278 commit 9f4b18d

File tree

1 file changed

+131
-102
lines changed
  • crates/bevy_asset/src/processor

1 file changed

+131
-102
lines changed

crates/bevy_asset/src/processor/mod.rs

Lines changed: 131 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -642,15 +642,20 @@ impl AssetProcessor {
642642
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
643643
let asset_path = AssetPath::from(path).with_source(source.id());
644644
debug!("Removing processed {asset_path} because source was removed");
645-
let mut infos = self.data.processing_state.asset_infos.write().await;
646-
if let Some(info) = infos.get(&asset_path) {
647-
// we must wait for uncontested write access to the asset source to ensure existing readers / writers
648-
// can finish their operations
649-
let _write_lock = info.file_transaction_lock.write();
650-
self.remove_processed_asset_and_meta(source, asset_path.path())
651-
.await;
652-
}
653-
infos.remove(&asset_path).await;
645+
let lock = {
646+
// Scope the infos lock so we don't hold up other processing for too long.
647+
let mut infos = self.data.processing_state.asset_infos.write().await;
648+
infos.remove(&asset_path).await
649+
};
650+
let Some(lock) = lock else {
651+
return;
652+
};
653+
654+
// we must wait for uncontested write access to the asset source to ensure existing
655+
// readers/writers can finish their operations
656+
let _write_lock = lock.write();
657+
self.remove_processed_asset_and_meta(source, asset_path.path())
658+
.await;
654659
}
655660

656661
/// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
@@ -662,24 +667,29 @@ impl AssetProcessor {
662667
new: PathBuf,
663668
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
664669
) {
665-
let mut infos = self.data.processing_state.asset_infos.write().await;
666670
let old = AssetPath::from(old).with_source(source.id());
667671
let new = AssetPath::from(new).with_source(source.id());
668672
let processed_writer = source.processed_writer().unwrap();
669-
if let Some(info) = infos.get(&old) {
670-
// we must wait for uncontested write access to the asset source to ensure existing readers / writers
671-
// can finish their operations
672-
let _write_lock = info.file_transaction_lock.write();
673-
processed_writer
674-
.rename(old.path(), new.path())
675-
.await
676-
.unwrap();
677-
processed_writer
678-
.rename_meta(old.path(), new.path())
679-
.await
680-
.unwrap();
681-
}
682-
infos.rename(&old, &new, new_task_sender).await;
673+
let result = {
674+
// Scope the infos lock so we don't hold up other processing for too long.
675+
let mut infos = self.data.processing_state.asset_infos.write().await;
676+
infos.rename(&old, &new, new_task_sender).await
677+
};
678+
let Some((old_lock, new_lock)) = result else {
679+
return;
680+
};
681+
// we must wait for uncontested write access to both assets to ensure existing
682+
// readers/writers can finish their operations
683+
let _old_write_lock = old_lock.write();
684+
let _new_write_lock = new_lock.write();
685+
processed_writer
686+
.rename(old.path(), new.path())
687+
.await
688+
.unwrap();
689+
processed_writer
690+
.rename_meta(old.path(), new.path())
691+
.await
692+
.unwrap();
683693
}
684694

685695
async fn queue_processing_tasks_for_folder(
@@ -1069,9 +1079,15 @@ impl AssetProcessor {
10691079
// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
10701080
// See ProcessedAssetInfo::file_transaction_lock docs for more info
10711081
let _transaction_lock = {
1072-
let mut infos = self.data.processing_state.asset_infos.write().await;
1073-
let info = infos.get_or_insert(asset_path.clone());
1074-
info.file_transaction_lock.write_arc().await
1082+
let lock = {
1083+
let mut infos = self.data.processing_state.asset_infos.write().await;
1084+
let info = infos.get_or_insert(asset_path.clone());
1085+
// Clone out the transaction lock first and then lock after we've dropped the
1086+
asset_infos. Otherwise, trying to lock a single path can block all other paths too
1087+
// (leading to deadlocks).
1088+
info.file_transaction_lock.clone()
1089+
};
1090+
lock.write_arc().await
10751091
};
10761092

10771093
// NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
@@ -1317,11 +1333,17 @@ impl ProcessingState {
13171333
&self,
13181334
path: &AssetPath<'static>,
13191335
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
1320-
let infos = self.asset_infos.read().await;
1321-
let info = infos
1322-
.get(path)
1323-
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1324-
Ok(info.file_transaction_lock.read_arc().await)
1336+
let lock = {
1337+
let infos = self.asset_infos.read().await;
1338+
let info = infos
1339+
.get(path)
1340+
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1341+
// Clone out the transaction lock first and then lock after we've dropped the
1342+
asset_infos. Otherwise, trying to lock a single path can block all other paths too
1343+
// (leading to deadlocks).
1344+
info.file_transaction_lock.clone()
1345+
};
1346+
Ok(lock.read_arc().await)
13251347
}
13261348

13271349
/// Returns a future that will not finish until the path has been processed.
@@ -1603,95 +1625,102 @@ impl ProcessorAssetInfos {
16031625
}
16041626
}
16051627

1606-
/// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1607-
async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1608-
let info = self.infos.remove(asset_path);
1609-
if let Some(info) = info {
1610-
if let Some(processed_info) = info.processed_info {
1611-
self.clear_dependencies(asset_path, processed_info);
1612-
}
1613-
// Tell all listeners this asset does not exist
1614-
info.status_sender
1615-
.broadcast(ProcessStatus::NonExistent)
1616-
.await
1617-
.unwrap();
1618-
if !info.dependents.is_empty() {
1619-
error!(
1628+
/// Remove the info for the given path. This should only happen if an asset's source is
1629+
/// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset
1630+
/// path does not exist.
1631+
async fn remove(
1632+
&mut self,
1633+
asset_path: &AssetPath<'static>,
1634+
) -> Option<Arc<async_lock::RwLock<()>>> {
1635+
let info = self.infos.remove(asset_path)?;
1636+
if let Some(processed_info) = info.processed_info {
1637+
self.clear_dependencies(asset_path, processed_info);
1638+
}
1639+
// Tell all listeners this asset does not exist
1640+
info.status_sender
1641+
.broadcast(ProcessStatus::NonExistent)
1642+
.await
1643+
.unwrap();
1644+
if !info.dependents.is_empty() {
1645+
error!(
16201646
"The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
16211647
info.dependents
16221648
);
1623-
self.non_existent_dependents
1624-
.insert(asset_path.clone(), info.dependents);
1625-
}
1649+
self.non_existent_dependents
1650+
.insert(asset_path.clone(), info.dependents);
16261651
}
1652+
1653+
Some(info.file_transaction_lock)
16271654
}
16281655

1629-
/// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1656+
/// Remove the info for the old path, and move over its info to the new path. This should only
1657+
/// happen if an asset's source is removed/non-existent. Returns the transaction locks for the
1658+
/// old and new assets respectively, or [`None`] if the old path does not exist.
16301659
async fn rename(
16311660
&mut self,
16321661
old: &AssetPath<'static>,
16331662
new: &AssetPath<'static>,
16341663
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1635-
) {
1636-
let info = self.infos.remove(old);
1637-
if let Some(mut info) = info {
1638-
if !info.dependents.is_empty() {
1639-
// TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1640-
// doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1641-
// we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1642-
// If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1643-
// If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1644-
// TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1645-
// (see the remove impl).
1646-
error!(
1664+
) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
1665+
let mut info = self.infos.remove(old)?;
1666+
if !info.dependents.is_empty() {
1667+
// TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1668+
// doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1669+
// we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1670+
// If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1671+
// If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1672+
// TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1673+
// (see the remove impl).
1674+
error!(
16471675
"The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
16481676
info.dependents
16491677
);
1650-
self.non_existent_dependents
1651-
.insert(old.clone(), core::mem::take(&mut info.dependents));
1652-
}
1653-
if let Some(processed_info) = &info.processed_info {
1654-
// Update "dependent" lists for this asset's "process dependencies" to use new path.
1655-
for dep in &processed_info.process_dependencies {
1656-
if let Some(info) = self.infos.get_mut(&dep.path) {
1657-
info.dependents.remove(old);
1658-
info.dependents.insert(new.clone());
1659-
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
1660-
{
1661-
dependents.remove(old);
1662-
dependents.insert(new.clone());
1663-
}
1678+
self.non_existent_dependents
1679+
.insert(old.clone(), core::mem::take(&mut info.dependents));
1680+
}
1681+
if let Some(processed_info) = &info.processed_info {
1682+
// Update "dependent" lists for this asset's "process dependencies" to use new path.
1683+
for dep in &processed_info.process_dependencies {
1684+
if let Some(info) = self.infos.get_mut(&dep.path) {
1685+
info.dependents.remove(old);
1686+
info.dependents.insert(new.clone());
1687+
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
1688+
dependents.remove(old);
1689+
dependents.insert(new.clone());
16641690
}
16651691
}
1666-
// Tell all listeners this asset no longer exists
1667-
info.status_sender
1668-
.broadcast(ProcessStatus::NonExistent)
1669-
.await
1670-
.unwrap();
1671-
let dependents: Vec<AssetPath<'static>> = {
1672-
let new_info = self.get_or_insert(new.clone());
1673-
new_info.processed_info = info.processed_info;
1674-
new_info.status = info.status;
1675-
// Ensure things waiting on the new path are informed of the status of this asset
1676-
if let Some(status) = new_info.status {
1677-
new_info.status_sender.broadcast(status).await.unwrap();
1678-
}
1679-
new_info.dependents.iter().cloned().collect()
1680-
};
1681-
// Queue the asset for a reprocess check, in case it needs new meta.
1692+
}
1693+
// Tell all listeners this asset no longer exists
1694+
info.status_sender
1695+
.broadcast(ProcessStatus::NonExistent)
1696+
.await
1697+
.unwrap();
1698+
let new_info = self.get_or_insert(new.clone());
1699+
new_info.processed_info = info.processed_info;
1700+
new_info.status = info.status;
1701+
// Ensure things waiting on the new path are informed of the status of this asset
1702+
if let Some(status) = new_info.status {
1703+
new_info.status_sender.broadcast(status).await.unwrap();
1704+
}
1705+
let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
1706+
// Queue the asset for a reprocess check, in case it needs new meta.
1707+
let _ = new_task_sender
1708+
.send((new.source().clone_owned(), new.path().to_owned()))
1709+
.await;
1710+
for dependent in dependents {
1711+
// Queue dependents for reprocessing because they might have been waiting for this asset.
16821712
let _ = new_task_sender
1683-
.send((new.source().clone_owned(), new.path().to_owned()))
1713+
.send((
1714+
dependent.source().clone_owned(),
1715+
dependent.path().to_owned(),
1716+
))
16841717
.await;
1685-
for dependent in dependents {
1686-
// Queue dependents for reprocessing because they might have been waiting for this asset.
1687-
let _ = new_task_sender
1688-
.send((
1689-
dependent.source().clone_owned(),
1690-
dependent.path().to_owned(),
1691-
))
1692-
.await;
1693-
}
16941718
}
1719+
1720+
Some((
1721+
info.file_transaction_lock,
1722+
new_info.file_transaction_lock.clone(),
1723+
))
16951724
}
16961725

16971726
fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {

0 commit comments

Comments
 (0)