|
1 | 1 | use std::{
|
2 | 2 | fs,
|
3 |
| - thread, |
4 | 3 | path::{Path, PathBuf},
|
5 | 4 | sync::{mpsc, Arc},
|
6 | 5 | time::Duration,
|
7 | 6 | };
|
8 |
| -use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; |
| 7 | +use crossbeam_channel::{Sender, unbounded, RecvError, select}; |
9 | 8 | use relative_path::RelativePathBuf;
|
10 |
| -use thread_worker::WorkerHandle; |
11 | 9 | use walkdir::WalkDir;
|
12 | 10 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
|
13 | 11 |
|
@@ -48,37 +46,42 @@ enum ChangeKind {
|
48 | 46 |
|
49 | 47 | const WATCHER_DELAY: Duration = Duration::from_millis(250);
|
50 | 48 |
|
51 |
| -pub(crate) struct Worker { |
52 |
| - worker: thread_worker::Worker<Task, TaskResult>, |
53 |
| - worker_handle: WorkerHandle, |
54 |
| -} |
55 |
| - |
56 |
| -impl Worker { |
57 |
| - pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
58 |
| - // This is a pretty elaborate setup of threads & channels! It is |
59 |
| - // explained by the following concerns: |
60 |
| - // * we need to burn a thread translating from notify's mpsc to |
61 |
| - // crossbeam_channel. |
62 |
| - // * we want to read all files from a single thread, to guarantee that |
63 |
| - // we always get fresher versions and never go back in time. |
64 |
| - // * we want to tear down everything neatly during shutdown. |
65 |
| - let (worker, worker_handle) = thread_worker::spawn( |
66 |
| - "vfs", |
67 |
| - 128, |
68 |
| - // This are the channels we use to communicate with outside world. |
69 |
| - // If `input_receiver` is closed we need to tear ourselves down. |
70 |
| - // `output_sender` should not be closed unless the parent died. |
71 |
| - move |input_receiver, output_sender| { |
| 49 | +pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>; |
| 50 | +pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
| 51 | + // This is a pretty elaborate setup of threads & channels! It is |
| 52 | + // explained by the following concerns: |
| 53 | + // * we need to burn a thread translating from notify's mpsc to |
| 54 | + // crossbeam_channel. |
| 55 | + // * we want to read all files from a single thread, to guarantee that |
| 56 | + // we always get fresher versions and never go back in time. |
| 57 | + // * we want to tear down everything neatly during shutdown. |
| 58 | + Worker::spawn( |
| 59 | + "vfs", |
| 60 | + 128, |
| 61 | + // This are the channels we use to communicate with outside world. |
| 62 | + // If `input_receiver` is closed we need to tear ourselves down. |
| 63 | + // `output_sender` should not be closed unless the parent died. |
| 64 | + move |input_receiver, output_sender| { |
| 65 | + // Make sure that the destruction order is |
| 66 | + // |
| 67 | + // * notify_sender |
| 68 | + // * _thread |
| 69 | + // * watcher_sender |
| 70 | + // |
| 71 | + // this is required to avoid deadlocks. |
| 72 | + |
| 73 | + // These are the corresponding crossbeam channels |
| 74 | + let (watcher_sender, watcher_receiver) = unbounded(); |
| 75 | + let _thread; |
| 76 | + { |
72 | 77 | // These are `std` channels notify will send events to
|
73 | 78 | let (notify_sender, notify_receiver) = mpsc::channel();
|
74 |
| - // These are the corresponding crossbeam channels |
75 |
| - let (watcher_sender, watcher_receiver) = unbounded(); |
76 | 79 |
|
77 | 80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
|
78 | 81 | .map_err(|e| log::error!("failed to spawn notify {}", e))
|
79 | 82 | .ok();
|
80 | 83 | // Start a silly thread to transform between two channels
|
81 |
| - let thread = thread::spawn(move || { |
| 84 | + _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { |
82 | 85 | notify_receiver
|
83 | 86 | .into_iter()
|
84 | 87 | .for_each(|event| convert_notify_event(event, &watcher_sender))
|
@@ -110,35 +113,11 @@ impl Worker {
|
110 | 113 | },
|
111 | 114 | }
|
112 | 115 | }
|
113 |
| - // Stopped the watcher |
114 |
| - drop(watcher.take()); |
115 |
| - // Drain pending events: we are not interested in them anyways! |
116 |
| - watcher_receiver.into_iter().for_each(|_| ()); |
117 |
| - |
118 |
| - let res = thread.join(); |
119 |
| - match &res { |
120 |
| - Ok(()) => log::info!("... Watcher terminated with ok"), |
121 |
| - Err(_) => log::error!("... Watcher terminated with err"), |
122 |
| - } |
123 |
| - res.unwrap(); |
124 |
| - }, |
125 |
| - ); |
126 |
| - |
127 |
| - Worker { worker, worker_handle } |
128 |
| - } |
129 |
| - |
130 |
| - pub(crate) fn sender(&self) -> &Sender<Task> { |
131 |
| - &self.worker.inp |
132 |
| - } |
133 |
| - |
134 |
| - pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { |
135 |
| - &self.worker.out |
136 |
| - } |
137 |
| - |
138 |
| - pub(crate) fn shutdown(self) -> thread::Result<()> { |
139 |
| - let _ = self.worker.shutdown(); |
140 |
| - self.worker_handle.shutdown() |
141 |
| - } |
| 116 | + } |
| 117 | + // Drain pending events: we are not interested in them anyways! |
| 118 | + watcher_receiver.into_iter().for_each(|_| ()); |
| 119 | + }, |
| 120 | + ) |
142 | 121 | }
|
143 | 122 |
|
144 | 123 | fn watch_root(
|
|
0 commit comments