Skip to content

Add &mut World as a system param and make .exclusive_system() optional #4166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/scheduling/run_criteria.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bevy_ecs::{
component::Component,
prelude::{ParallelSystemDescriptorCoercion, Res, Resource, RunCriteriaDescriptorCoercion},
prelude::{IntoSystemDescriptor, Res, Resource, RunCriteriaDescriptorCoercion},
schedule::{RunCriteriaLabel, ShouldRun, Stage, SystemStage},
system::Query,
world::World,
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_animation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bevy_ecs::{
entity::Entity,
prelude::Component,
reflect::ReflectComponent,
schedule::ParallelSystemDescriptorCoercion,
schedule::IntoSystemDescriptor,
system::{Query, Res},
};
use bevy_hierarchy::Children;
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use bevy_derive::AppLabel;
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
event::{Event, Events},
prelude::{FromWorld, IntoExclusiveSystem},
prelude::FromWorld,
schedule::{
IntoSystemDescriptor, Schedule, ShouldRun, Stage, StageLabel, State, StateData, SystemSet,
SystemStage,
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Default for App {

app.add_default_stages()
.add_event::<AppExit>()
.add_system_to_stage(CoreStage::Last, World::clear_trackers.exclusive_system());
.add_system_to_stage(CoreStage::Last, World::clear_trackers.at_end());

#[cfg(feature = "bevy_ci_testing")]
{
Expand Down
29 changes: 27 additions & 2 deletions crates/bevy_ecs/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,27 @@ pub fn impl_param_set(_input: TokenStream) -> TokenStream {
fn apply(&mut self, world: &mut World) {
self.0.apply(world)
}

fn world_access_level() -> WorldAccessLevel {
let mut shared = false;
#(
match #param_fetch::world_access_level() {
WorldAccessLevel::Exclusive => {
return WorldAccessLevel::Exclusive;
}
WorldAccessLevel::Shared => {
shared = true;
}
WorldAccessLevel::None => (),
}
)*

if shared {
WorldAccessLevel::Shared
} else {
WorldAccessLevel::None
}
}
}


Expand All @@ -239,7 +260,7 @@ pub fn impl_param_set(_input: TokenStream) -> TokenStream {
unsafe fn get_param(
state: &'s mut Self,
system_meta: &SystemMeta,
world: &'w World,
world: MaybeUnsafeCell<'w, World>,
change_tick: u32,
) -> Self::Item {
ParamSet {
Expand Down Expand Up @@ -377,14 +398,18 @@ pub fn derive_system_param(input: TokenStream) -> TokenStream {
fn apply(&mut self, world: &mut #path::world::World) {
self.state.apply(world)
}

fn world_access_level() -> #path::system::WorldAccessLevel {
TSystemParamState::world_access_level()
}
}

impl #impl_generics #path::system::SystemParamFetch<'w, 's> for FetchState <(#(<#field_types as #path::system::SystemParam>::Fetch,)*), #punctuated_generic_idents> #where_clause {
type Item = #struct_name #ty_generics;
unsafe fn get_param(
state: &'s mut Self,
system_meta: &#path::system::SystemMeta,
world: &'w #path::world::World,
world: #path::system::MaybeUnsafeCell<'w, #path::world::World>,
change_tick: u32,
) -> Self::Item {
#struct_name {
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/entity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type IdCursor = isize;
/// }
/// #
/// # bevy_ecs::system::assert_is_system(setup);
/// # bevy_ecs::system::IntoExclusiveSystem::exclusive_system(exclusive_system);
/// # bevy_ecs::system::assert_is_system(exclusive_system);
/// ```
///
/// It can be used to refer to a specific entity to apply [`EntityCommands`], or to call [`Query::get`] (or similar methods) to access its components.
Expand Down
11 changes: 5 additions & 6 deletions crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ pub mod prelude {
event::{EventReader, EventWriter, Events},
query::{Added, AnyOf, ChangeTrackers, Changed, Or, QueryState, With, Without},
schedule::{
ExclusiveSystemDescriptorCoercion, ParallelSystemDescriptorCoercion, RunCriteria,
RunCriteriaDescriptorCoercion, RunCriteriaLabel, Schedule, Stage, StageLabel, State,
SystemLabel, SystemSet, SystemStage,
IntoSystemDescriptor, RunCriteria, RunCriteriaDescriptorCoercion, RunCriteriaLabel,
Schedule, Stage, StageLabel, State, SystemLabel, SystemSet, SystemStage,
},
system::{
adapter as system_adapter, Commands, In, IntoChainSystem, IntoExclusiveSystem,
IntoSystem, Local, NonSend, NonSendMut, ParallelCommands, ParamSet, Query,
RemovedComponents, Res, ResMut, Resource, System, SystemParamFunction,
adapter as system_adapter, Commands, In, IntoChainSystem, IntoSystem, Local, NonSend,
NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut,
Resource, System, SystemParamFunction,
},
world::{FromWorld, Mut, World},
};
Expand Down
80 changes: 71 additions & 9 deletions crates/bevy_ecs/src/query/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ pub struct Access<T: SparseSetIndex> {
reads_and_writes: FixedBitSet,
/// The exclusively-accessed elements.
writes: FixedBitSet,
/// Is `true` if this has access to all elements in the collection?
/// Is `true` if this has access to all elements in the collection.
/// This field is a performance optimization for `&World` (also harder to mess up for soundness).
reads_all: bool,
/// Is `true` if this has exclusive access to all elements in the collection.
/// This field is a performance optimization for `&mut World` (also harder to mess up for soundness).
writes_all: bool,
marker: PhantomData<T>,
}

impl<T: SparseSetIndex> Default for Access<T> {
fn default() -> Self {
Self {
reads_all: false,
writes_all: false,
reads_and_writes: Default::default(),
writes: Default::default(),
marker: PhantomData,
Expand Down Expand Up @@ -64,7 +68,11 @@ impl<T: SparseSetIndex> Access<T> {

/// Returns `true` if this can exclusively access the element given by `index`.
pub fn has_write(&self, index: T) -> bool {
self.writes.contains(index.sparse_set_index())
if self.writes_all {
true
} else {
self.writes.contains(index.sparse_set_index())
}
}

/// Sets this as having access to all indexed elements (i.e. `&World`).
Expand All @@ -77,16 +85,29 @@ impl<T: SparseSetIndex> Access<T> {
self.reads_all
}

/// Sets this as having exclusive access to all indexed elements (i.e. `&mut World`).
pub fn write_all(&mut self) {
self.reads_all = true;
self.writes_all = true;
}

/// Returns `true` if this has exclusive access to all indexed elements (i.e. `&mut World`).
pub fn has_write_all(&self) -> bool {
self.writes_all
}

/// Removes all accesses.
pub fn clear(&mut self) {
self.reads_all = false;
self.writes_all = false;
self.reads_and_writes.clear();
self.writes.clear();
}

/// Adds all access from `other`.
pub fn extend(&mut self, other: &Access<T>) {
self.reads_all = self.reads_all || other.reads_all;
self.writes_all = self.writes_all || other.writes_all;
self.reads_and_writes.union_with(&other.reads_and_writes);
self.writes.union_with(&other.writes);
}
Expand All @@ -96,6 +117,12 @@ impl<T: SparseSetIndex> Access<T> {
/// `Access` instances are incompatible if one can write
/// an element that the other can read or write.
pub fn is_compatible(&self, other: &Access<T>) -> bool {
// All systems make a `&World` reference before running to update change detection info.
// Since exclusive systems produce a `&mut World`, we cannot let other systems run.
if self.writes_all || other.writes_all {
return false;
}

// Only systems that do not write data are compatible with systems that operate on `&World`.
if self.reads_all {
return other.writes.count_ones(..) == 0;
Expand All @@ -112,15 +139,31 @@ impl<T: SparseSetIndex> Access<T> {
/// Returns a vector of elements that the access and `other` cannot access at the same time.
pub fn get_conflicts(&self, other: &Access<T>) -> Vec<T> {
let mut conflicts = FixedBitSet::default();
if self.reads_all {
conflicts.extend(other.writes.ones());

if self.writes_all {
conflicts.extend(other.reads_and_writes.ones());
}

if other.reads_all {
conflicts.extend(self.writes.ones());
if other.writes_all {
conflicts.extend(self.reads_and_writes.ones());
}

if !(self.writes_all || other.writes_all) {
match (self.reads_all, other.reads_all) {
(false, false) => {
conflicts.extend(self.writes.intersection(&other.reads_and_writes));
conflicts.extend(self.reads_and_writes.intersection(&other.writes));
}
(false, true) => {
conflicts.extend(self.writes.ones());
}
(true, false) => {
conflicts.extend(other.writes.ones());
}
(true, true) => (),
}
}
conflicts.extend(self.writes.intersection(&other.reads_and_writes));
conflicts.extend(self.reads_and_writes.intersection(&other.writes));

conflicts
.ones()
.map(SparseSetIndex::get_sparse_set_index)
Expand Down Expand Up @@ -266,6 +309,11 @@ impl<T: SparseSetIndex> FilteredAccess<T> {
pub fn read_all(&mut self) {
self.access.read_all();
}

/// Sets the underlying unfiltered access as having exclusive access to all indexed elements.
pub fn write_all(&mut self) {
self.access.write_all();
}
}

/// A collection of [`FilteredAccess`] instances.
Expand Down Expand Up @@ -306,6 +354,20 @@ impl<T: SparseSetIndex> FilteredAccessSet<T> {
true
}

/// Returns `true` if this and `filtered_access` can be active at the same time.
pub fn is_compatible_single(&self, filtered_access: &FilteredAccess<T>) -> bool {
if self.combined_access.is_compatible(filtered_access.access()) {
return true;
}
for filtered in &self.filtered_accesses {
if !filtered.is_compatible(filtered_access) {
return false;
}
}

true
}

/// Returns a vector of elements that this set and `other` cannot access at the same time.
pub fn get_conflicts(&self, other: &FilteredAccessSet<T>) -> Vec<T> {
// if the unfiltered access is incompatible, must check each pair
Expand All @@ -320,7 +382,7 @@ impl<T: SparseSetIndex> FilteredAccessSet<T> {
conflicts.into_iter().collect()
}

/// Returns a vector of elements that this set and `other` cannot access at the same time.
/// Returns a vector of elements that this set and `filtered_access` cannot access at the same time.
pub fn get_conflicts_single(&self, filtered_access: &FilteredAccess<T>) -> Vec<T> {
// if the unfiltered access is incompatible, must check each pair
let mut conflicts = HashSet::new();
Expand Down
8 changes: 3 additions & 5 deletions crates/bevy_ecs/src/schedule/ambiguity_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl SystemOrderAmbiguity {
stage: &SystemStage,
world: &World,
) -> Self {
use crate::schedule::graph_utils::GraphNode;
use SystemStageSegment::*;

// TODO: blocked on https://github.com/bevyengine/bevy/pull/4166
Expand Down Expand Up @@ -220,7 +219,7 @@ impl SystemStage {
/// Returns vector containing all pairs of indices of systems with ambiguous execution order,
/// along with specific components that have triggered the warning.
/// Systems must be topologically sorted beforehand.
fn find_ambiguities(systems: &[impl SystemContainer]) -> Vec<(usize, usize, Vec<ComponentId>)> {
fn find_ambiguities(systems: &[SystemContainer]) -> Vec<(usize, usize, Vec<ComponentId>)> {
let mut all_dependencies = Vec::<FixedBitSet>::with_capacity(systems.len());
let mut all_dependants = Vec::<FixedBitSet>::with_capacity(systems.len());
for (index, container) in systems.iter().enumerate() {
Expand Down Expand Up @@ -266,9 +265,8 @@ fn find_ambiguities(systems: &[impl SystemContainer]) -> Vec<(usize, usize, Vec<
let a_access = systems[index_a].component_access();
let b_access = systems[index_b].component_access();
if let (Some(a), Some(b)) = (a_access, b_access) {
let conflicts = a.get_conflicts(b);
if !conflicts.is_empty() {
ambiguities.push((index_a, index_b, conflicts));
if !a.is_compatible(b) {
ambiguities.push((index_a, index_b, a.get_conflicts(b)));
}
} else {
ambiguities.push((index_a, index_b, Vec::new()));
Expand Down
10 changes: 5 additions & 5 deletions crates/bevy_ecs/src/schedule/executor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{schedule::ParallelSystemContainer, world::World};
use crate::{schedule::SystemContainer, world::World};
use downcast_rs::{impl_downcast, Downcast};

pub trait ParallelSystemExecutor: Downcast + Send + Sync {
/// Called by `SystemStage` whenever `systems` have been changed.
fn rebuild_cached_data(&mut self, systems: &[ParallelSystemContainer]);
fn rebuild_cached_data(&mut self, systems: &[SystemContainer]);

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World);
fn run_systems(&mut self, systems: &mut [SystemContainer], world: &mut World);
}

impl_downcast!(ParallelSystemExecutor);
Expand All @@ -14,9 +14,9 @@ impl_downcast!(ParallelSystemExecutor);
pub struct SingleThreadedExecutor;

impl ParallelSystemExecutor for SingleThreadedExecutor {
fn rebuild_cached_data(&mut self, _: &[ParallelSystemContainer]) {}
fn rebuild_cached_data(&mut self, _: &[SystemContainer]) {}

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World) {
fn run_systems(&mut self, systems: &mut [SystemContainer], world: &mut World) {
for system in systems {
if system.should_run() {
#[cfg(feature = "trace")]
Expand Down
15 changes: 10 additions & 5 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
archetype::ArchetypeComponentId,
query::Access,
schedule::{ParallelSystemContainer, ParallelSystemExecutor},
schedule::{ParallelSystemExecutor, SystemContainer},
system::MaybeUnsafeCell,
world::World,
};
use async_channel::{Receiver, Sender};
Expand Down Expand Up @@ -77,14 +78,18 @@ impl Default for ParallelExecutor {
}

impl ParallelSystemExecutor for ParallelExecutor {
fn rebuild_cached_data(&mut self, systems: &[ParallelSystemContainer]) {
fn rebuild_cached_data(&mut self, systems: &[SystemContainer]) {
self.system_metadata.clear();
self.queued.grow(systems.len());
self.running.grow(systems.len());
self.should_run.grow(systems.len());

// Construct scheduling data for systems.
for container in systems {
if container.system().is_exclusive() {
panic!("executor cannot run exclusive systems");
}

let dependencies_total = container.dependencies().len();
let system = container.system();
self.system_metadata.push(SystemSchedulingMetadata {
Expand All @@ -104,7 +109,7 @@ impl ParallelSystemExecutor for ParallelExecutor {
}
}

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World) {
fn run_systems(&mut self, systems: &mut [SystemContainer], world: &mut World) {
#[cfg(test)]
if self.events_sender.is_none() {
let (sender, receiver) = async_channel::unbounded::<SchedulingEvent>();
Expand Down Expand Up @@ -167,7 +172,7 @@ impl ParallelExecutor {
fn prepare_systems<'scope>(
&mut self,
scope: &mut Scope<'scope, ()>,
systems: &'scope mut [ParallelSystemContainer],
systems: &'scope mut [SystemContainer],
world: &'scope World,
) {
// These are used as a part of a unit test.
Expand Down Expand Up @@ -215,7 +220,7 @@ impl ParallelExecutor {
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
// SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
unsafe { system.run_unsafe((), world) };
unsafe { system.run_unsafe((), MaybeUnsafeCell::from_ref(world)) };
};

if can_start {
Expand Down
Loading