diff --git a/Cargo.toml b/Cargo.toml index 3e29eff47f2d4..2fbd4340638b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,7 +136,6 @@ serde = { version = "1", features = ["derive"] } bytemuck = "1.7" # Needed to poll Task examples futures-lite = "1.11.3" -crossbeam-channel = "0.5.0" [[example]] name = "hello_world" diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index c64bb610bf939..0bf5009b2d994 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -25,7 +25,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0" } # other serde = { version = "1", features = ["derive"] } -crossbeam-channel = "0.5.0" anyhow = "1.0.4" thiserror = "1.0" downcast-rs = "1.2.0" diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index 41c91fe7c7f4f..1e67e7816ca20 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -9,9 +9,11 @@ use bevy_ecs::system::{Res, ResMut, Resource}; use bevy_log::warn; use bevy_tasks::IoTaskPool; use bevy_utils::{Entry, HashMap, Uuid}; -use crossbeam_channel::TryRecvError; use parking_lot::{Mutex, RwLock}; -use std::{path::Path, sync::Arc}; +use std::{ + path::Path, + sync::{mpsc::TryRecvError, Arc}, +}; use thiserror::Error; /// Errors that occur while loading assets with an `AssetServer`. @@ -57,7 +59,7 @@ fn format_missing_asset_ext(exts: &[String]) -> String { #[derive(Default)] pub(crate) struct AssetRefCounter { - pub(crate) channel: Arc, + pub(crate) channel: Arc>, pub(crate) ref_counts: Arc>>, pub(crate) mark_unused_assets: Arc>>, } @@ -142,7 +144,7 @@ impl AssetServer { &*self.server.asset_io } - pub(crate) fn register_asset_type(&self) -> Assets { + pub(crate) fn register_asset_type(&mut self) -> Assets { if self .server .asset_lifecycles @@ -153,7 +155,15 @@ impl AssetServer { panic!("Error while registering new asset type: {:?} with UUID: {:?}. Another type with the same UUID is already registered. Can not register new asset type with the same UUID", std::any::type_name::(), T::TYPE_UUID); } - Assets::new(self.server.asset_ref_counter.channel.sender.clone()) + Assets::new( + self.server + .asset_ref_counter + .channel + .lock() + .sender + .get() + .clone(), + ) } /// Adds the provided asset loader to the server. @@ -177,13 +187,27 @@ impl AssetServer { /// Gets a strong handle for an asset with the provided id. pub fn get_handle>(&self, id: I) -> Handle { - let sender = self.server.asset_ref_counter.channel.sender.clone(); + let sender = self + .server + .asset_ref_counter + .channel + .lock() + .sender + .get() + .clone(); Handle::strong(id.into(), sender) } /// Gets an untyped strong handle for an asset with the provided id. pub fn get_handle_untyped>(&self, id: I) -> HandleUntyped { - let sender = self.server.asset_ref_counter.channel.sender.clone(); + let sender = self + .server + .asset_ref_counter + .channel + .lock() + .sender + .get() + .clone(); HandleUntyped::strong(id.into(), sender) } @@ -371,9 +395,10 @@ impl AssetServer { }; // load the asset source using the corresponding AssetLoader + let asset_ref_counter = self.server.asset_ref_counter.channel.lock(); let mut load_context = LoadContext::new( asset_path.path(), - &self.server.asset_ref_counter.channel, + &asset_ref_counter, self.asset_io(), version, ); @@ -532,11 +557,11 @@ impl AssetServer { /// Iterates through asset references and marks assets with no active handles as unused. pub fn mark_unused_assets(&self) { - let receiver = &self.server.asset_ref_counter.channel.receiver; + let receiver = &mut self.server.asset_ref_counter.channel.lock().receiver; let mut ref_counts = self.server.asset_ref_counter.ref_counts.write(); let mut potential_frees = None; loop { - let ref_change = match receiver.try_recv() { + let ref_change = match receiver.get().try_recv() { Ok(ref_change) => ref_change, Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => panic!("RefChange channel disconnected."), @@ -584,15 +609,15 @@ impl AssetServer { // Note: this takes a `ResMut>` to ensure change detection does not get // triggered unless the `Assets` collection is actually updated. pub(crate) fn update_asset_storage(&self, mut assets: ResMut>) { - let asset_lifecycles = self.server.asset_lifecycles.read(); - let asset_lifecycle = asset_lifecycles.get(&T::TYPE_UUID).unwrap(); + let mut asset_lifecycles = self.server.asset_lifecycles.write(); + let asset_lifecycle = asset_lifecycles.get_mut(&T::TYPE_UUID).unwrap(); let mut asset_sources_guard = None; let channel = asset_lifecycle - .downcast_ref::>() + .downcast_mut::>() .unwrap(); loop { - match channel.receiver.try_recv() { + match channel.receiver.get().try_recv() { Ok(AssetLifecycleEvent::Create(result)) => { // update SourceInfo if this asset was loaded from an AssetPath if let HandleId::AssetPathId(id) = result.id { diff --git a/crates/bevy_asset/src/assets.rs b/crates/bevy_asset/src/assets.rs index 1a3a300553990..1425851481869 100644 --- a/crates/bevy_asset/src/assets.rs +++ b/crates/bevy_asset/src/assets.rs @@ -9,9 +9,9 @@ use bevy_ecs::{ world::FromWorld, }; use bevy_reflect::{FromReflect, GetTypeRegistration, Reflect}; -use bevy_utils::HashMap; -use crossbeam_channel::Sender; +use bevy_utils::{synccell::SyncCell, HashMap}; use std::fmt::Debug; +use std::sync::mpsc::Sender; /// Events that involve assets of type `T`. /// @@ -67,11 +67,21 @@ impl Debug for AssetEvent { /// Remember, if there are no Strong handles for an asset (i.e. they have all been dropped), the /// asset will unload. Make sure you always have a Strong handle when you want to keep an asset /// loaded! -#[derive(Debug, Resource)] +#[derive(Resource)] pub struct Assets { assets: HashMap, events: Events>, - pub(crate) ref_change_sender: Sender, + pub(crate) ref_change_sender: SyncCell>, +} + +impl Debug for Assets { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Assets") + .field("assets", &self.assets) + .field("events", &self.events) + .field("ref_change_sender", &"Receiver { .. }") + .finish() + } } impl Assets { @@ -79,7 +89,7 @@ impl Assets { Assets { assets: HashMap::default(), events: Events::default(), - ref_change_sender, + ref_change_sender: SyncCell::new(ref_change_sender), } } @@ -158,8 +168,8 @@ impl Assets { } /// Gets a _Strong_ handle pointing to the same asset as the given one. - pub fn get_handle>(&self, handle: H) -> Handle { - Handle::strong(handle.into(), self.ref_change_sender.clone()) + pub fn get_handle>(&mut self, handle: H) -> Handle { + Handle::strong(handle.into(), self.ref_change_sender.get().clone()) } /// Gets mutable access to an asset for the given handle, inserting a new value if none exists. @@ -330,7 +340,7 @@ impl AddAsset for App { return self; } let assets = { - let asset_server = self.world.resource::(); + let mut asset_server = self.world.resource_mut::(); asset_server.register_asset_type::() }; diff --git a/crates/bevy_asset/src/filesystem_watcher.rs b/crates/bevy_asset/src/filesystem_watcher.rs index ace2500cd5b8f..caeff4b542aa4 100644 --- a/crates/bevy_asset/src/filesystem_watcher.rs +++ b/crates/bevy_asset/src/filesystem_watcher.rs @@ -1,6 +1,7 @@ -use crossbeam_channel::Receiver; +use bevy_utils::synccell::SyncCell; use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Result, Watcher}; use std::path::Path; +use std::sync::mpsc::Receiver; /// Watches for changes to files on the local filesystem. /// @@ -8,12 +9,12 @@ use std::path::Path; /// assets when their source files are modified. pub struct FilesystemWatcher { pub watcher: RecommendedWatcher, - pub receiver: Receiver>, + pub receiver: SyncCell>>, } impl Default for FilesystemWatcher { fn default() -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); + let (sender, receiver) = std::sync::mpsc::channel(); let watcher: RecommendedWatcher = RecommendedWatcher::new( move |res| { sender.send(res).expect("Watch event send failure."); @@ -21,7 +22,10 @@ impl Default for FilesystemWatcher { Config::default(), ) .expect("Failed to create filesystem watcher."); - FilesystemWatcher { watcher, receiver } + FilesystemWatcher { + watcher, + receiver: SyncCell::new(receiver), + } } } diff --git a/crates/bevy_asset/src/handle.rs b/crates/bevy_asset/src/handle.rs index dfa4bd2d35892..fd3491163c60f 100644 --- a/crates/bevy_asset/src/handle.rs +++ b/crates/bevy_asset/src/handle.rs @@ -13,9 +13,9 @@ use bevy_ecs::{component::Component, reflect::ReflectComponent}; use bevy_reflect::{ std_traits::ReflectDefault, FromReflect, Reflect, ReflectDeserialize, ReflectSerialize, }; -use bevy_utils::Uuid; -use crossbeam_channel::{Receiver, Sender}; +use bevy_utils::{synccell::SyncCell, Uuid}; use serde::{Deserialize, Serialize}; +use std::sync::mpsc::{Receiver, Sender}; /// A unique, stable asset id. #[derive( @@ -121,7 +121,7 @@ where enum HandleType { #[default] Weak, - Strong(Sender), + Strong(SyncCell>), } impl Debug for HandleType { @@ -138,7 +138,7 @@ impl Handle { ref_change_sender.send(RefChange::Increment(id)).unwrap(); Self { id, - handle_type: HandleType::Strong(ref_change_sender), + handle_type: HandleType::Strong(SyncCell::new(ref_change_sender)), marker: PhantomData, } } @@ -187,13 +187,13 @@ impl Handle { /// Makes this handle Strong if it wasn't already. /// /// This method requires the corresponding [`Assets`](crate::Assets) collection. - pub fn make_strong(&mut self, assets: &Assets) { + pub fn make_strong(&mut self, assets: &mut Assets) { if self.is_strong() { return; } - let sender = assets.ref_change_sender.clone(); + let sender = assets.ref_change_sender.get().clone(); sender.send(RefChange::Increment(self.id)).unwrap(); - self.handle_type = HandleType::Strong(sender); + self.handle_type = HandleType::Strong(SyncCell::new(sender)); } /// Creates a weak copy of this handle. @@ -204,9 +204,9 @@ impl Handle { } /// Creates an untyped copy of this handle. - pub fn clone_untyped(&self) -> HandleUntyped { - match &self.handle_type { - HandleType::Strong(sender) => HandleUntyped::strong(self.id, sender.clone()), + pub fn clone_untyped(&mut self) -> HandleUntyped { + match &mut self.handle_type { + HandleType::Strong(sender) => HandleUntyped::strong(self.id, sender.get().clone()), HandleType::Weak => HandleUntyped::weak(self.id), } } @@ -220,10 +220,10 @@ impl Handle { impl Drop for Handle { fn drop(&mut self) { match self.handle_type { - HandleType::Strong(ref sender) => { + HandleType::Strong(ref mut sender) => { // ignore send errors because this means the channel is shut down / the game has // stopped - let _ = sender.send(RefChange::Decrement(self.id)); + let _ = sender.get().send(RefChange::Decrement(self.id)); } HandleType::Weak => {} } @@ -308,7 +308,7 @@ impl Debug for Handle { impl Clone for Handle { fn clone(&self) -> Self { match self.handle_type { - HandleType::Strong(ref sender) => Handle::strong(self.id, sender.clone()), + HandleType::Strong(ref mut sender) => Handle::strong(self.id, sender.get().clone()), HandleType::Weak => Handle::weak(self.id), } } @@ -339,7 +339,7 @@ impl HandleUntyped { ref_change_sender.send(RefChange::Increment(id)).unwrap(); Self { id, - handle_type: HandleType::Strong(ref_change_sender), + handle_type: HandleType::Strong(SyncCell::new(ref_change_sender)), } } @@ -396,7 +396,7 @@ impl HandleUntyped { ); } let handle_type = match &self.handle_type { - HandleType::Strong(sender) => HandleType::Strong(sender.clone()), + HandleType::Strong(sender) => HandleType::Strong(SyncCell::new(sender.get().clone())), HandleType::Weak => HandleType::Weak, }; // ensure we don't send the RefChange event when "self" is dropped @@ -412,10 +412,10 @@ impl HandleUntyped { impl Drop for HandleUntyped { fn drop(&mut self) { match self.handle_type { - HandleType::Strong(ref sender) => { + HandleType::Strong(ref mut sender) => { // ignore send errors because this means the channel is shut down / the game has // stopped - let _ = sender.send(RefChange::Decrement(self.id)); + let _ = sender.get().send(RefChange::Decrement(self.id)); } HandleType::Weak => {} } @@ -455,7 +455,7 @@ impl Eq for HandleUntyped {} impl Clone for HandleUntyped { fn clone(&self) -> Self { match self.handle_type { - HandleType::Strong(ref sender) => HandleUntyped::strong(self.id, sender.clone()), + HandleType::Strong(ref sender) => HandleUntyped::strong(self.id, sender.get().clone()), HandleType::Weak => HandleUntyped::weak(self.id), } } @@ -466,15 +466,17 @@ pub(crate) enum RefChange { Decrement(HandleId), } -#[derive(Clone)] pub(crate) struct RefChangeChannel { - pub sender: Sender, - pub receiver: Receiver, + pub sender: SyncCell>, + pub receiver: SyncCell>, } impl Default for RefChangeChannel { fn default() -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); - RefChangeChannel { sender, receiver } + let (sender, receiver) = std::sync::mpsc::channel(); + RefChangeChannel { + sender: SyncCell::new(sender), + receiver: SyncCell::new(receiver), + } } } diff --git a/crates/bevy_asset/src/io/file_asset_io.rs b/crates/bevy_asset/src/io/file_asset_io.rs index e4f9a57bba0cd..beb3d2c587f73 100644 --- a/crates/bevy_asset/src/io/file_asset_io.rs +++ b/crates/bevy_asset/src/io/file_asset_io.rs @@ -3,16 +3,16 @@ use crate::{filesystem_watcher::FilesystemWatcher, AssetServer}; use crate::{AssetIo, AssetIoError, Metadata}; use anyhow::Result; #[cfg(feature = "filesystem_watcher")] -use bevy_ecs::system::Res; +use bevy_ecs::system::ResMut; use bevy_utils::BoxedFuture; #[cfg(feature = "filesystem_watcher")] use bevy_utils::HashSet; -#[cfg(feature = "filesystem_watcher")] -use crossbeam_channel::TryRecvError; use fs::File; #[cfg(feature = "filesystem_watcher")] use parking_lot::RwLock; #[cfg(feature = "filesystem_watcher")] +use std::sync::mpsc::TryRecvError; +#[cfg(feature = "filesystem_watcher")] use std::sync::Arc; use std::{ convert::TryFrom, @@ -168,18 +168,18 @@ impl AssetIo for FileAssetIo { feature = "filesystem_watcher", all(not(target_arch = "wasm32"), not(target_os = "android")) ))] -pub fn filesystem_watcher_system(asset_server: Res) { +pub fn filesystem_watcher_system(mut asset_server: ResMut) { let mut changed = HashSet::default(); let asset_io = - if let Some(asset_io) = asset_server.server.asset_io.downcast_ref::() { + if let Some(asset_io) = asset_server.server.asset_io.downcast_mut::() { asset_io } else { return; }; - let watcher = asset_io.filesystem_watcher.read(); - if let Some(ref watcher) = *watcher { + let watcher = asset_io.filesystem_watcher.write(); + if let Some(ref mut watcher) = *watcher { loop { - let event = match watcher.receiver.try_recv() { + let event = match watcher.receiver.get().try_recv() { Ok(result) => result.unwrap(), Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected."), diff --git a/crates/bevy_asset/src/loader.rs b/crates/bevy_asset/src/loader.rs index 931efacb4c42a..6f0f10f38d4a7 100644 --- a/crates/bevy_asset/src/loader.rs +++ b/crates/bevy_asset/src/loader.rs @@ -6,10 +6,10 @@ use anyhow::Error; use anyhow::Result; use bevy_ecs::system::{Res, ResMut}; use bevy_reflect::{TypeUuid, TypeUuidDynamic}; -use bevy_utils::{BoxedFuture, HashMap}; -use crossbeam_channel::{Receiver, Sender}; +use bevy_utils::{synccell::SyncCell, BoxedFuture, HashMap}; use downcast_rs::{impl_downcast, Downcast}; use std::path::Path; +use std::sync::mpsc::{Receiver, Sender}; /// A loader for an asset source. /// @@ -168,7 +168,7 @@ impl<'a> LoadContext<'a> { /// Gets a handle to an asset of type `T` from its id. pub fn get_handle, T: Asset>(&self, id: I) -> Handle { - Handle::strong(id.into(), self.ref_change_channel.sender.clone()) + Handle::strong(id.into(), self.ref_change_channel.sender.get().clone()) } /// Reads the contents of the file at the specified path through the [`AssetIo`] associated @@ -208,12 +208,11 @@ pub struct AssetResult { } /// An event channel used by asset server to update the asset storage of a `T` asset. -#[derive(Debug)] pub struct AssetLifecycleChannel { /// The sender endpoint of the channel. - pub sender: Sender>, + pub sender: SyncCell>>, /// The receiver endpoint of the channel. - pub receiver: Receiver>, + pub receiver: SyncCell>>, } /// Events for the [`AssetLifecycleChannel`]. @@ -237,6 +236,7 @@ impl AssetLifecycle for AssetLifecycleChannel { fn create_asset(&self, id: HandleId, asset: Box, version: usize) { if let Ok(asset) = asset.downcast::() { self.sender + .get() .send(AssetLifecycleEvent::Create(AssetResult { asset, id, @@ -252,14 +252,20 @@ impl AssetLifecycle for AssetLifecycleChannel { } fn free_asset(&self, id: HandleId) { - self.sender.send(AssetLifecycleEvent::Free(id)).unwrap(); + self.sender + .get() + .send(AssetLifecycleEvent::Free(id)) + .unwrap(); } } impl Default for AssetLifecycleChannel { fn default() -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); - AssetLifecycleChannel { sender, receiver } + let (sender, receiver) = std::sync::mpsc::channel(); + AssetLifecycleChannel { + sender: SyncCell::new(sender), + receiver: SyncCell::new(receiver), + } } } diff --git a/crates/bevy_core/Cargo.toml b/crates/bevy_core/Cargo.toml index 2e2c4cea863b2..0566f42613ba6 100644 --- a/crates/bevy_core/Cargo.toml +++ b/crates/bevy_core/Cargo.toml @@ -24,6 +24,3 @@ serde = { version = "1.0", optional = true } [features] serialize = ["dep:serde"] - -[dev-dependencies] -crossbeam-channel = "0.5.0" diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index a3b91ea2d9fc7..97e46107f9a73 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -146,21 +146,21 @@ mod tests { app.add_plugin(TaskPoolPlugin::default()); app.add_plugin(TypeRegistrationPlugin::default()); - let (async_tx, async_rx) = crossbeam_channel::unbounded(); + let (async_tx, async_rx) = std::sync::mpsc::channel(); AsyncComputeTaskPool::get() .spawn_local(async move { async_tx.send(()).unwrap(); }) .detach(); - let (compute_tx, compute_rx) = crossbeam_channel::unbounded(); + let (compute_tx, compute_rx) = std::sync::mpsc::channel(); ComputeTaskPool::get() .spawn_local(async move { compute_tx.send(()).unwrap(); }) .detach(); - let (io_tx, io_rx) = crossbeam_channel::unbounded(); + let (io_tx, io_rx) = std::sync::mpsc::channel(); IoTaskPool::get() .spawn_local(async move { io_tx.send(()).unwrap(); diff --git a/crates/bevy_time/Cargo.toml b/crates/bevy_time/Cargo.toml index a95210464e936..b754c2e6c8103 100644 --- a/crates/bevy_time/Cargo.toml +++ b/crates/bevy_time/Cargo.toml @@ -20,5 +20,4 @@ bevy_reflect = { path = "../bevy_reflect", version = "0.9.0", features = ["bevy" bevy_utils = { path = "../bevy_utils", version = "0.9.0" } # other -crossbeam-channel = "0.5.0" serde = { version = "1", features = ["derive"], optional = true } diff --git a/crates/bevy_time/src/lib.rs b/crates/bevy_time/src/lib.rs index 86646e3f1c2ce..632bbf3338672 100644 --- a/crates/bevy_time/src/lib.rs +++ b/crates/bevy_time/src/lib.rs @@ -10,8 +10,8 @@ pub use time::*; pub use timer::*; use bevy_ecs::system::{Res, ResMut}; -use bevy_utils::{tracing::warn, Duration, Instant}; -use crossbeam_channel::{Receiver, Sender}; +use bevy_utils::{synccell::SyncCell, tracing::warn, Duration, Instant}; +use std::sync::mpsc::{Receiver, SyncSender}; pub mod prelude { //! The Bevy Time Prelude. @@ -61,18 +61,18 @@ pub enum TimeUpdateStrategy { /// Channel resource used to receive time from render world #[derive(Resource)] -pub struct TimeReceiver(pub Receiver); +pub struct TimeReceiver(pub SyncCell>); /// Channel resource used to send time from render world #[derive(Resource)] -pub struct TimeSender(pub Sender); +pub struct TimeSender(pub SyncSender); /// Creates channels used for sending time between render world and app world pub fn create_time_channels() -> (TimeSender, TimeReceiver) { // bound the channel to 2 since when pipelined the render phase can finish before // the time system runs. - let (s, r) = crossbeam_channel::bounded::(2); - (TimeSender(s), TimeReceiver(r)) + let (s, r) = std::sync::mpsc::sync_channel::(2); + (TimeSender(s), TimeReceiver(SyncCell::new(r))) } /// The system used to update the [`Time`] used by app logic. If there is a render world the time is sent from @@ -80,12 +80,12 @@ pub fn create_time_channels() -> (TimeSender, TimeReceiver) { fn time_system( mut time: ResMut