Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 0 additions & 113 deletions crates/turborepo-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,119 +387,6 @@ impl<T: TaskDefinitionInfo + Clone> Engine<Built, T> {
self
}

/// Creates an `Engine` with only interruptible tasks, i.e. non-persistent
/// tasks and persistent tasks that are allowed to be interrupted
pub fn create_engine_for_interruptible_tasks(&self) -> Self {
let new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
let def = self
.task_definitions
.get(task)
.expect("task should have definition");

if !def.persistent() || def.interruptible() {
Some(node.clone())
} else {
None
}
}
TaskNode::Root => Some(node.clone()),
},
|_, _| Some(()),
);

let root_index = new_graph
.node_indices()
.find(|index| new_graph[*index] == TaskNode::Root)
.expect("root node should be present");

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_non_interruptible_tasks: false,
}
}

/// Creates an `Engine` that is only the tasks that are not interruptible,
/// i.e. persistent and not allowed to be restarted
pub fn create_engine_for_non_interruptible_tasks(&self) -> Self {
let mut new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
let def = self
.task_definitions
.get(task)
.expect("task should have definition");

if def.persistent() && !def.interruptible() {
Some(node.clone())
} else {
None
}
}
TaskNode::Root => Some(node.clone()),
},
|_, _| Some(()),
);

let root_index = new_graph
.node_indices()
.find(|index| new_graph[*index] == TaskNode::Root)
.expect("root node should be present");

// Connect persistent tasks to root
for index in new_graph.node_indices() {
if new_graph[index] == TaskNode::Root {
continue;
}

new_graph.add_edge(index, root_index, ());
}

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_non_interruptible_tasks: true,
}
}

pub fn dependencies(&self, task_id: &TaskId) -> Option<Vec<&TaskNode>> {
self.neighbors(task_id, petgraph::Direction::Outgoing)
}
Expand Down
49 changes: 0 additions & 49 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,55 +428,6 @@ mod test {
assert!(engine.validate(&graph, 1, UIMode::Stream, true).is_err());
}

#[tokio::test]
async fn test_prune_persistent_tasks() {
// Verifies that we can prune the `Engine` to include only the persistent tasks
// or only the non-persistent tasks.

let mut engine: Engine<Building> = Engine::new();

// add two packages with a persistent build task
for package in ["a", "b"] {
let build_task_id = TaskId::new(package, "build");
let dev_task_id = TaskId::new(package, "dev");

engine.get_index(&build_task_id);
engine.add_definition(build_task_id.clone(), TaskDefinition::default());

engine.get_index(&dev_task_id);
engine.add_definition(
dev_task_id,
TaskDefinition {
persistent: true,
task_dependencies: vec![Spanned::new(TaskName::from(build_task_id))],
..Default::default()
},
);
}

let engine = engine.seal();

let non_interruptible_tasks_engine = engine.create_engine_for_non_interruptible_tasks();
for node in non_interruptible_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = non_interruptible_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(def.persistent, "task should be persistent");
}
}

let interruptible_tasks_engine = engine.create_engine_for_interruptible_tasks();
for node in interruptible_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = interruptible_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(!def.persistent, "task should not be persistent");
}
}
}

#[tokio::test]
async fn test_get_subgraph_for_package() {
// Verifies that we can prune the `Engine` to include only the persistent tasks
Expand Down
25 changes: 0 additions & 25 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ type WuiResult = UIResult<WebUISender>;
type TuiResult = UIResult<TuiSender>;

impl Run {
fn has_non_interruptible_tasks(&self) -> bool {
self.engine.has_non_interruptible_tasks
}
/// Emit run prelude through `turborepo_log`. In stream mode,
/// `TerminalSink` writes these to stdout; in TUI mode, `TuiSink`
/// captures them for the log panel.
Expand Down Expand Up @@ -244,28 +241,6 @@ impl Run {
&self.root_turbo_json
}

pub fn create_run_for_non_interruptible_tasks(&self) -> Self {
let mut new_run = Self {
// ProcessManager is shared via an `Arc`,
// so we want to explicitly recreate it instead of cloning
processes: ProcessManager::new(self.processes.use_pty()),
..self.clone()
};

let new_engine = new_run.engine.create_engine_for_non_interruptible_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub fn create_run_for_interruptible_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_for_interruptible_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

// Produces the transitive closure of the filtered packages,
// i.e. the packages relevant for this run.
#[instrument(skip(self), ret)]
Expand Down
Loading
Loading