Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 137 additions & 26 deletions crates/turborepo-daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,40 +691,59 @@ impl<W: PackageChangesWatcher + 'static> proto::turbod_server::Turbod for TurboG

tokio::spawn(async move {
loop {
let event = match package_changes_rx.recv().await {
match package_changes_rx.recv().await {
Err(RecvError::Lagged(_)) => {
warn!("package changes stream lagged");
proto::PackageChangeEvent {
let event = proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::RediscoverPackages(
proto::RediscoverPackages {},
)),
};
if let Err(err) = tx.send(Ok(event)).await {
error!("package changes stream closed: {}", err);
break;
}
}
Err(RecvError::Closed) => {
warn!(
"package changes channel closed, file watching may have failed to \
initialize"
);
let event = proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::Error(
proto::PackageChangeError {
message: "file watching failed to initialize".to_string(),
},
)),
};
let _ = tx.send(Ok(event)).await;
break;
}
Ok(PackageChangeEvent::Package { name }) => {
let event = proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::PackageChanged(
proto::PackageChanged {
package_name: name.to_string(),
},
)),
};
if let Err(err) = tx.send(Ok(event)).await {
error!("package changes stream closed: {}", err);
break;
}
}
Ok(PackageChangeEvent::Rediscover) => {
let event = proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::RediscoverPackages(
proto::RediscoverPackages {},
)),
};
if let Err(err) = tx.send(Ok(event)).await {
error!("package changes stream closed: {}", err);
break;
}
}
Err(err) => proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::Error(
proto::PackageChangeError {
message: err.to_string(),
},
)),
},
Ok(PackageChangeEvent::Package { name }) => proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::PackageChanged(
proto::PackageChanged {
package_name: name.to_string(),
},
)),
},
Ok(PackageChangeEvent::Rediscover) => proto::PackageChangeEvent {
event: Some(proto::package_change_event::Event::RediscoverPackages(
proto::RediscoverPackages {},
)),
},
};

if let Err(err) = tx.send(Ok(event)).await {
error!("package changes stream closed: {}", err);
break;
}
}
});

Expand Down Expand Up @@ -1010,4 +1029,96 @@ mod test {
.expect("server exited");
assert_matches!(close_reason, Ok(CloseReason::Shutdown));
}

/// Verifies that when the package changes broadcast sender is dropped
/// (e.g. because file watching failed to initialize), the gRPC stream
/// forwarding loop sends an error event and terminates instead of
/// spinning forever.
#[tokio::test]
async fn package_changes_stream_closes_on_sender_drop() {
let (package_tx, mut package_rx) = broadcast::channel::<PackageChangeEvent>(16);

// Simulate the forwarding loop from the package_changes handler
let (tx, mut rx) = tokio::sync::mpsc::channel::<
Result<crate::proto::PackageChangeEvent, tonic::Status>,
>(16);

let handle = tokio::spawn(async move {
loop {
match package_rx.recv().await {
Err(broadcast::error::RecvError::Lagged(_)) => {
let event = crate::proto::PackageChangeEvent {
event: Some(
crate::proto::package_change_event::Event::RediscoverPackages(
crate::proto::RediscoverPackages {},
),
),
};
if tx.send(Ok(event)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Closed) => {
let event = crate::proto::PackageChangeEvent {
event: Some(crate::proto::package_change_event::Event::Error(
crate::proto::PackageChangeError {
message: "file watching failed to initialize".to_string(),
},
)),
};
let _ = tx.send(Ok(event)).await;
break;
}
Ok(PackageChangeEvent::Package { name }) => {
let event = crate::proto::PackageChangeEvent {
event: Some(crate::proto::package_change_event::Event::PackageChanged(
crate::proto::PackageChanged {
package_name: name.to_string(),
},
)),
};
if tx.send(Ok(event)).await.is_err() {
break;
}
}
Ok(PackageChangeEvent::Rediscover) => {
let event = crate::proto::PackageChangeEvent {
event: Some(
crate::proto::package_change_event::Event::RediscoverPackages(
crate::proto::RediscoverPackages {},
),
),
};
if tx.send(Ok(event)).await.is_err() {
break;
}
}
};
}
});

// Drop the sender, simulating file watching failure
drop(package_tx);

// The loop should send an error event and terminate
let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("should not time out")
.expect("should receive an event");

let event = event.expect("should be Ok");
assert_matches!(
event.event,
Some(crate::proto::package_change_event::Event::Error(_))
);

// The loop task should complete (not hang)
tokio::time::timeout(Duration::from_secs(1), handle)
.await
.expect("loop should terminate")
.expect("loop should not panic");

// Channel should be closed after the loop exits
assert!(rx.recv().await.is_none());
}
}
13 changes: 11 additions & 2 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use notify::{Config, RecommendedWatcher};
use notify::{Event, EventHandler, RecursiveMode, Watcher};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, watch::error::RecvError};
use tracing::{debug, warn};
use tracing::{debug, error, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};
#[cfg(feature = "manual_recursive_watch")]
use {
Expand Down Expand Up @@ -145,6 +145,10 @@ impl FileSystemWatcher {
let Ok(Ok(watcher)) = task.await else {
// if the watcher fails, just return. we don't set the event sender, and other
// services will never start
error!(
"file watcher failed to start. watch mode and other daemon-dependent \
features will not work"
);
return;
};

Expand All @@ -153,7 +157,12 @@ impl FileSystemWatcher {
if let Err(e) = wait_for_cookie(&cookie_dir, &mut recv_file_events).await {
// if we can't get a cookie here, we should not make the file
// watching available to downstream services
warn!("failed to wait for initial filesystem cookie: {}", e);
error!(
"failed to wait for initial filesystem cookie: {}. This means the file \
system event backend (e.g. FSEvents on macOS) is not delivering events. \
watch mode will not work. Try running `turbo daemon clean` and retrying.",
e
);
return;
}
debug!("filewatching ready");
Expand Down
25 changes: 25 additions & 0 deletions crates/turborepo-filewatch/src/optional_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl<T> std::ops::Deref for SomeRef<'_, T> {

#[cfg(test)]
mod test {
use std::time::Duration;

use futures::FutureExt;
use tokio::sync::watch::error::RecvError;

/// Futures have a method that allow you to fetch the value of a future
/// if it is immediately available. This is useful for, for example,
Expand All @@ -72,4 +75,26 @@ mod test {

assert_eq!(*rx.get().now_or_never().unwrap().unwrap(), 42);
}

/// If the sending half is dropped before any value was ever published,
/// `get()` must resolve to a `RecvError` instead of pending forever.
#[tokio::test]
pub async fn get_returns_error_when_sender_dropped() {
    let (sender, mut receiver) = super::OptionalWatch::<i32>::new();

    // No value is ever sent; the sender simply goes away.
    drop(sender);

    let outcome = receiver.get().await;
    assert!(matches!(outcome, Err(RecvError { .. })));
}

/// With a live sender that never publishes, `get()` stays pending, so a
/// surrounding timeout is what ends the wait.
#[tokio::test]
pub async fn get_with_timeout_returns_elapsed_when_no_value() {
    // Bind the sender so it stays alive for the whole test.
    let (_keep_alive, mut receiver) = super::OptionalWatch::<i32>::new();

    let limit = Duration::from_millis(50);
    let outcome = tokio::time::timeout(limit, receiver.get()).await;

    assert!(outcome.is_err(), "should have timed out");
}
}
23 changes: 19 additions & 4 deletions crates/turborepo-lib/src/package_changes_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,25 @@ impl Subscriber {
}

async fn watch(mut self, exit_rx: oneshot::Receiver<()>) {
let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) else {
// if we get here, it means that file watching has not started, so we should
// just report that the package watcher is not available
tracing::debug!("file watching shut down, package watcher not available");
let file_events_result = tokio::time::timeout(
std::time::Duration::from_secs(5),
self.file_events_lazy.get(),
)
.await;
let Ok(mut file_events) = file_events_result
.map_err(|_elapsed| {
tracing::warn!(
"timed out waiting for file watching to become ready after 5s. This usually \
means the daemon's file watcher failed to initialize. Try running `turbo \
daemon clean` and retrying."
);
})
.and_then(|r| {
r.map(|r| r.resubscribe()).map_err(|_| {
tracing::debug!("file watching shut down, package watcher not available");
})
})
else {
return;
};

Expand Down
19 changes: 19 additions & 0 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ pub enum Error {
},
#[error("Daemon connection closed.")]
ConnectionClosed,
#[error(
"Timed out waiting for the daemon's file watcher to become ready. The daemon may be \
having trouble watching your repository. Try running `turbo daemon clean` and retrying."
)]
DaemonFileWatchingTimeout,
#[error("Failed to subscribe to signal handler. Shutting down.")]
NoSignalHandler,
#[error("Watch interrupted due to signal.")]
Expand Down Expand Up @@ -186,6 +191,16 @@ impl WatchClient {

let mut events = client.package_changes().await?;

// Wait for the initial event from the daemon with a timeout.
// The daemon sends a Rediscover event immediately when the stream opens,
// but the stream won't produce anything until the daemon's file watcher
// is ready. If it never becomes ready, we'd hang here forever.
let initial_event = tokio::time::timeout(std::time::Duration::from_secs(10), events.next())
.await
.map_err(|_| Error::DaemonFileWatchingTimeout)?
.ok_or(Error::ConnectionClosed)?;
let initial_event = initial_event?;

let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;

// We explicitly use a tokio::sync::Mutex here to avoid deadlocks.
Expand All @@ -195,6 +210,10 @@ impl WatchClient {
let notify_run = Arc::new(Notify::new());
let notify_event = notify_run.clone();

// Process the initial event
Self::handle_change_event(&changed_packages, initial_event.event.unwrap())?;
notify_event.notify_one();

let event_fut = async {
while let Some(event) = events.next().await {
let event = event?;
Expand Down
Loading