Skip to content

Replace crossbeam with std::sync::mpsc #7346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ serde = { version = "1", features = ["derive"] }
bytemuck = "1.7"
# Needed to poll Task examples
futures-lite = "1.11.3"
crossbeam-channel = "0.5.0"

[[example]]
name = "hello_world"
Expand Down
1 change: 0 additions & 1 deletion crates/bevy_asset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0" }

# other
serde = { version = "1", features = ["derive"] }
crossbeam-channel = "0.5.0"
anyhow = "1.0.4"
thiserror = "1.0"
downcast-rs = "1.2.0"
Expand Down
53 changes: 39 additions & 14 deletions crates/bevy_asset/src/asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use bevy_ecs::system::{Res, ResMut, Resource};
use bevy_log::warn;
use bevy_tasks::IoTaskPool;
use bevy_utils::{Entry, HashMap, Uuid};
use crossbeam_channel::TryRecvError;
use parking_lot::{Mutex, RwLock};
use std::{path::Path, sync::Arc};
use std::{
path::Path,
sync::{mpsc::TryRecvError, Arc},
};
use thiserror::Error;

/// Errors that occur while loading assets with an `AssetServer`.
Expand Down Expand Up @@ -57,7 +59,7 @@ fn format_missing_asset_ext(exts: &[String]) -> String {

#[derive(Default)]
pub(crate) struct AssetRefCounter {
pub(crate) channel: Arc<RefChangeChannel>,
pub(crate) channel: Arc<Mutex<RefChangeChannel>>,
pub(crate) ref_counts: Arc<RwLock<HashMap<HandleId, usize>>>,
pub(crate) mark_unused_assets: Arc<Mutex<Vec<HandleId>>>,
}
Expand Down Expand Up @@ -142,7 +144,7 @@ impl AssetServer {
&*self.server.asset_io
}

pub(crate) fn register_asset_type<T: Asset>(&self) -> Assets<T> {
pub(crate) fn register_asset_type<T: Asset>(&mut self) -> Assets<T> {
if self
.server
.asset_lifecycles
Expand All @@ -153,7 +155,15 @@ impl AssetServer {
panic!("Error while registering new asset type: {:?} with UUID: {:?}. Another type with the same UUID is already registered. Can not register new asset type with the same UUID",
std::any::type_name::<T>(), T::TYPE_UUID);
}
Assets::new(self.server.asset_ref_counter.channel.sender.clone())
Assets::new(
self.server
.asset_ref_counter
.channel
.lock()
.sender
.get()
.clone(),
)
}

/// Adds the provided asset loader to the server.
Expand All @@ -177,13 +187,27 @@ impl AssetServer {

/// Gets a strong handle for an asset with the provided id.
pub fn get_handle<T: Asset, I: Into<HandleId>>(&self, id: I) -> Handle<T> {
let sender = self.server.asset_ref_counter.channel.sender.clone();
let sender = self
.server
.asset_ref_counter
.channel
.lock()
.sender
.get()
.clone();
Handle::strong(id.into(), sender)
}

/// Gets an untyped strong handle for an asset with the provided id.
pub fn get_handle_untyped<I: Into<HandleId>>(&self, id: I) -> HandleUntyped {
let sender = self.server.asset_ref_counter.channel.sender.clone();
let sender = self
.server
.asset_ref_counter
.channel
.lock()
.sender
.get()
.clone();
HandleUntyped::strong(id.into(), sender)
}

Expand Down Expand Up @@ -371,9 +395,10 @@ impl AssetServer {
};

// load the asset source using the corresponding AssetLoader
let asset_ref_counter = self.server.asset_ref_counter.channel.lock();
let mut load_context = LoadContext::new(
asset_path.path(),
&self.server.asset_ref_counter.channel,
&asset_ref_counter,
self.asset_io(),
version,
);
Expand Down Expand Up @@ -532,11 +557,11 @@ impl AssetServer {

/// Iterates through asset references and marks assets with no active handles as unused.
pub fn mark_unused_assets(&self) {
let receiver = &self.server.asset_ref_counter.channel.receiver;
let receiver = &mut self.server.asset_ref_counter.channel.lock().receiver;
let mut ref_counts = self.server.asset_ref_counter.ref_counts.write();
let mut potential_frees = None;
loop {
let ref_change = match receiver.try_recv() {
let ref_change = match receiver.get().try_recv() {
Ok(ref_change) => ref_change,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("RefChange channel disconnected."),
Expand Down Expand Up @@ -584,15 +609,15 @@ impl AssetServer {
// Note: this takes a `ResMut<Assets<T>>` to ensure change detection does not get
// triggered unless the `Assets` collection is actually updated.
pub(crate) fn update_asset_storage<T: Asset>(&self, mut assets: ResMut<Assets<T>>) {
let asset_lifecycles = self.server.asset_lifecycles.read();
let asset_lifecycle = asset_lifecycles.get(&T::TYPE_UUID).unwrap();
let mut asset_lifecycles = self.server.asset_lifecycles.write();
let asset_lifecycle = asset_lifecycles.get_mut(&T::TYPE_UUID).unwrap();
let mut asset_sources_guard = None;
let channel = asset_lifecycle
.downcast_ref::<AssetLifecycleChannel<T>>()
.downcast_mut::<AssetLifecycleChannel<T>>()
.unwrap();

loop {
match channel.receiver.try_recv() {
match channel.receiver.get().try_recv() {
Ok(AssetLifecycleEvent::Create(result)) => {
// update SourceInfo if this asset was loaded from an AssetPath
if let HandleId::AssetPathId(id) = result.id {
Expand Down
26 changes: 18 additions & 8 deletions crates/bevy_asset/src/assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use bevy_ecs::{
world::FromWorld,
};
use bevy_reflect::{FromReflect, GetTypeRegistration, Reflect};
use bevy_utils::HashMap;
use crossbeam_channel::Sender;
use bevy_utils::{synccell::SyncCell, HashMap};
use std::fmt::Debug;
use std::sync::mpsc::Sender;

/// Events that involve assets of type `T`.
///
Expand Down Expand Up @@ -67,19 +67,29 @@ impl<T: Asset> Debug for AssetEvent<T> {
/// Remember, if there are no Strong handles for an asset (i.e. they have all been dropped), the
/// asset will unload. Make sure you always have a Strong handle when you want to keep an asset
/// loaded!
#[derive(Debug, Resource)]
#[derive(Resource)]
pub struct Assets<T: Asset> {
assets: HashMap<HandleId, T>,
events: Events<AssetEvent<T>>,
pub(crate) ref_change_sender: Sender<RefChange>,
pub(crate) ref_change_sender: SyncCell<Sender<RefChange>>,
}

impl<T: Asset + Debug> Debug for Assets<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Assets")
.field("assets", &self.assets)
.field("events", &self.events)
.field("ref_change_sender", &"Receiver { .. }")
.finish()
}
}

impl<T: Asset> Assets<T> {
pub(crate) fn new(ref_change_sender: Sender<RefChange>) -> Self {
Assets {
assets: HashMap::default(),
events: Events::default(),
ref_change_sender,
ref_change_sender: SyncCell::new(ref_change_sender),
}
}

Expand Down Expand Up @@ -158,8 +168,8 @@ impl<T: Asset> Assets<T> {
}

/// Gets a _Strong_ handle pointing to the same asset as the given one.
pub fn get_handle<H: Into<HandleId>>(&self, handle: H) -> Handle<T> {
Handle::strong(handle.into(), self.ref_change_sender.clone())
pub fn get_handle<H: Into<HandleId>>(&mut self, handle: H) -> Handle<T> {
Handle::strong(handle.into(), self.ref_change_sender.get().clone())
}

/// Gets mutable access to an asset for the given handle, inserting a new value if none exists.
Expand Down Expand Up @@ -330,7 +340,7 @@ impl AddAsset for App {
return self;
}
let assets = {
let asset_server = self.world.resource::<AssetServer>();
let mut asset_server = self.world.resource_mut::<AssetServer>();
asset_server.register_asset_type::<T>()
};

Expand Down
12 changes: 8 additions & 4 deletions crates/bevy_asset/src/filesystem_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
use crossbeam_channel::Receiver;
use bevy_utils::synccell::SyncCell;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Result, Watcher};
use std::path::Path;
use std::sync::mpsc::Receiver;

/// Watches for changes to files on the local filesystem.
///
/// When hot-reloading is enabled, the [`AssetServer`](crate::AssetServer) uses this to reload
/// assets when their source files are modified.
pub struct FilesystemWatcher {
pub watcher: RecommendedWatcher,
pub receiver: Receiver<Result<Event>>,
pub receiver: SyncCell<Receiver<Result<Event>>>,
}

impl Default for FilesystemWatcher {
fn default() -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
let (sender, receiver) = std::sync::mpsc::channel();
let watcher: RecommendedWatcher = RecommendedWatcher::new(
move |res| {
sender.send(res).expect("Watch event send failure.");
},
Config::default(),
)
.expect("Failed to create filesystem watcher.");
FilesystemWatcher { watcher, receiver }
FilesystemWatcher {
watcher,
receiver: SyncCell::new(receiver),
}
}
}

Expand Down
48 changes: 25 additions & 23 deletions crates/bevy_asset/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use bevy_ecs::{component::Component, reflect::ReflectComponent};
use bevy_reflect::{
std_traits::ReflectDefault, FromReflect, Reflect, ReflectDeserialize, ReflectSerialize,
};
use bevy_utils::Uuid;
use crossbeam_channel::{Receiver, Sender};
use bevy_utils::{synccell::SyncCell, Uuid};
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{Receiver, Sender};

/// A unique, stable asset id.
#[derive(
Expand Down Expand Up @@ -121,7 +121,7 @@ where
enum HandleType {
#[default]
Weak,
Strong(Sender<RefChange>),
Strong(SyncCell<Sender<RefChange>>),
}

impl Debug for HandleType {
Expand All @@ -138,7 +138,7 @@ impl<T: Asset> Handle<T> {
ref_change_sender.send(RefChange::Increment(id)).unwrap();
Self {
id,
handle_type: HandleType::Strong(ref_change_sender),
handle_type: HandleType::Strong(SyncCell::new(ref_change_sender)),
marker: PhantomData,
}
}
Expand Down Expand Up @@ -187,13 +187,13 @@ impl<T: Asset> Handle<T> {
/// Makes this handle Strong if it wasn't already.
///
/// This method requires the corresponding [`Assets`](crate::Assets) collection.
pub fn make_strong(&mut self, assets: &Assets<T>) {
pub fn make_strong(&mut self, assets: &mut Assets<T>) {
if self.is_strong() {
return;
}
let sender = assets.ref_change_sender.clone();
let sender = assets.ref_change_sender.get().clone();
sender.send(RefChange::Increment(self.id)).unwrap();
self.handle_type = HandleType::Strong(sender);
self.handle_type = HandleType::Strong(SyncCell::new(sender));
}

/// Creates a weak copy of this handle.
Expand All @@ -204,9 +204,9 @@ impl<T: Asset> Handle<T> {
}

/// Creates an untyped copy of this handle.
pub fn clone_untyped(&self) -> HandleUntyped {
match &self.handle_type {
HandleType::Strong(sender) => HandleUntyped::strong(self.id, sender.clone()),
pub fn clone_untyped(&mut self) -> HandleUntyped {
match &mut self.handle_type {
HandleType::Strong(sender) => HandleUntyped::strong(self.id, sender.get().clone()),
HandleType::Weak => HandleUntyped::weak(self.id),
}
}
Expand All @@ -220,10 +220,10 @@ impl<T: Asset> Handle<T> {
impl<T: Asset> Drop for Handle<T> {
fn drop(&mut self) {
match self.handle_type {
HandleType::Strong(ref sender) => {
HandleType::Strong(ref mut sender) => {
// ignore send errors because this means the channel is shut down / the game has
// stopped
let _ = sender.send(RefChange::Decrement(self.id));
let _ = sender.get().send(RefChange::Decrement(self.id));
}
HandleType::Weak => {}
}
Expand Down Expand Up @@ -308,7 +308,7 @@ impl<T: Asset> Debug for Handle<T> {
impl<T: Asset> Clone for Handle<T> {
fn clone(&self) -> Self {
match self.handle_type {
HandleType::Strong(ref sender) => Handle::strong(self.id, sender.clone()),
HandleType::Strong(ref mut sender) => Handle::strong(self.id, sender.get().clone()),
HandleType::Weak => Handle::weak(self.id),
}
}
Expand Down Expand Up @@ -339,7 +339,7 @@ impl HandleUntyped {
ref_change_sender.send(RefChange::Increment(id)).unwrap();
Self {
id,
handle_type: HandleType::Strong(ref_change_sender),
handle_type: HandleType::Strong(SyncCell::new(ref_change_sender)),
}
}

Expand Down Expand Up @@ -396,7 +396,7 @@ impl HandleUntyped {
);
}
let handle_type = match &self.handle_type {
HandleType::Strong(sender) => HandleType::Strong(sender.clone()),
HandleType::Strong(sender) => HandleType::Strong(SyncCell::new(sender.get().clone())),
HandleType::Weak => HandleType::Weak,
};
// ensure we don't send the RefChange event when "self" is dropped
Expand All @@ -412,10 +412,10 @@ impl HandleUntyped {
impl Drop for HandleUntyped {
fn drop(&mut self) {
match self.handle_type {
HandleType::Strong(ref sender) => {
HandleType::Strong(ref mut sender) => {
// ignore send errors because this means the channel is shut down / the game has
// stopped
let _ = sender.send(RefChange::Decrement(self.id));
let _ = sender.get().send(RefChange::Decrement(self.id));
}
HandleType::Weak => {}
}
Expand Down Expand Up @@ -455,7 +455,7 @@ impl Eq for HandleUntyped {}
impl Clone for HandleUntyped {
fn clone(&self) -> Self {
match self.handle_type {
HandleType::Strong(ref sender) => HandleUntyped::strong(self.id, sender.clone()),
HandleType::Strong(ref sender) => HandleUntyped::strong(self.id, sender.get().clone()),
HandleType::Weak => HandleUntyped::weak(self.id),
}
}
Expand All @@ -466,15 +466,17 @@ pub(crate) enum RefChange {
Decrement(HandleId),
}

#[derive(Clone)]
pub(crate) struct RefChangeChannel {
pub sender: Sender<RefChange>,
pub receiver: Receiver<RefChange>,
pub sender: SyncCell<Sender<RefChange>>,
pub receiver: SyncCell<Receiver<RefChange>>,
}

impl Default for RefChangeChannel {
fn default() -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
RefChangeChannel { sender, receiver }
let (sender, receiver) = std::sync::mpsc::channel();
RefChangeChannel {
sender: SyncCell::new(sender),
receiver: SyncCell::new(receiver),
}
}
}
Loading