1
1
use std:: {
2
2
fs,
3
- thread,
4
3
path:: { Path , PathBuf } ,
5
4
sync:: { mpsc, Arc } ,
6
5
time:: Duration ,
7
6
} ;
8
7
use crossbeam_channel:: { Receiver , Sender , unbounded, RecvError , select} ;
9
8
use relative_path:: RelativePathBuf ;
10
- use thread_worker:: WorkerHandle ;
11
9
use walkdir:: WalkDir ;
12
10
use notify:: { DebouncedEvent , RecommendedWatcher , RecursiveMode , Watcher as _Watcher} ;
13
11
@@ -49,8 +47,7 @@ enum ChangeKind {
49
47
const WATCHER_DELAY : Duration = Duration :: from_millis ( 250 ) ;
50
48
51
49
pub ( crate ) struct Worker {
52
- worker : thread_worker:: Worker < Task , TaskResult > ,
53
- worker_handle : WorkerHandle ,
50
+ thread_worker : thread_worker:: Worker < Task , TaskResult > ,
54
51
}
55
52
56
53
impl Worker {
@@ -62,82 +59,79 @@ impl Worker {
62
59
// * we want to read all files from a single thread, to guarantee that
63
60
// we always get fresher versions and never go back in time.
64
61
// * we want to tear down everything neatly during shutdown.
65
- let ( worker , worker_handle ) = thread_worker:: spawn (
62
+ let thread_worker = thread_worker:: Worker :: spawn (
66
63
"vfs" ,
67
64
128 ,
68
65
// This are the channels we use to communicate with outside world.
69
66
// If `input_receiver` is closed we need to tear ourselves down.
70
67
// `output_sender` should not be closed unless the parent died.
71
68
move |input_receiver, output_sender| {
72
- // These are `std` channels notify will send events to
73
- let ( notify_sender, notify_receiver) = mpsc:: channel ( ) ;
69
+ // Make sure that the destruction order is
70
+ //
71
+ // * notify_sender
72
+ // * _thread
73
+ // * watcher_sender
74
+ //
75
+ // this is required to avoid deadlocks.
76
+
74
77
// These are the corresponding crossbeam channels
75
78
let ( watcher_sender, watcher_receiver) = unbounded ( ) ;
79
+ let _thread;
80
+ {
81
+ // These are `std` channels notify will send events to
82
+ let ( notify_sender, notify_receiver) = mpsc:: channel ( ) ;
76
83
77
- let mut watcher = notify:: watcher ( notify_sender, WATCHER_DELAY )
78
- . map_err ( |e| log:: error!( "failed to spawn notify {}" , e) )
79
- . ok ( ) ;
80
- // Start a silly thread to transform between two channels
81
- let thread = thread :: spawn ( move || {
82
- notify_receiver
83
- . into_iter ( )
84
- . for_each ( |event| convert_notify_event ( event, & watcher_sender) )
85
- } ) ;
84
+ let mut watcher = notify:: watcher ( notify_sender, WATCHER_DELAY )
85
+ . map_err ( |e| log:: error!( "failed to spawn notify {}" , e) )
86
+ . ok ( ) ;
87
+ // Start a silly thread to transform between two channels
88
+ _thread = thread_worker :: ScopedThread :: spawn ( "notify-convertor" , move || {
89
+ notify_receiver
90
+ . into_iter ( )
91
+ . for_each ( |event| convert_notify_event ( event, & watcher_sender) )
92
+ } ) ;
86
93
87
- // Process requests from the called or notifications from
88
- // watcher until the caller says stop.
89
- loop {
90
- select ! {
91
- // Received request from the caller. If this channel is
92
- // closed, we should shutdown everything.
93
- recv( input_receiver) -> t => match t {
94
- Err ( RecvError ) => {
95
- drop( input_receiver) ;
96
- break
94
+ // Process requests from the called or notifications from
95
+ // watcher until the caller says stop.
96
+ loop {
97
+ select ! {
98
+ // Received request from the caller. If this channel is
99
+ // closed, we should shutdown everything.
100
+ recv( input_receiver) -> t => match t {
101
+ Err ( RecvError ) => {
102
+ drop( input_receiver) ;
103
+ break
104
+ } ,
105
+ Ok ( Task :: AddRoot { root, config } ) => {
106
+ watch_root( watcher. as_mut( ) , & output_sender, root, Arc :: clone( & config) ) ;
107
+ }
108
+ } ,
109
+ // Watcher send us changes. If **this** channel is
110
+ // closed, the watcher has died, which indicates a bug
111
+ // -- escalate!
112
+ recv( watcher_receiver) -> event => match event {
113
+ Err ( RecvError ) => panic!( "watcher is dead" ) ,
114
+ Ok ( ( path, change) ) => {
115
+ handle_change( watcher. as_mut( ) , & output_sender, & * roots, path, change) ;
116
+ }
97
117
} ,
98
- Ok ( Task :: AddRoot { root, config } ) => {
99
- watch_root( watcher. as_mut( ) , & output_sender, root, Arc :: clone( & config) ) ;
100
- }
101
- } ,
102
- // Watcher send us changes. If **this** channel is
103
- // closed, the watcher has died, which indicates a bug
104
- // -- escalate!
105
- recv( watcher_receiver) -> event => match event {
106
- Err ( RecvError ) => panic!( "watcher is dead" ) ,
107
- Ok ( ( path, change) ) => {
108
- handle_change( watcher. as_mut( ) , & output_sender, & * roots, path, change) ;
109
- }
110
- } ,
118
+ }
111
119
}
112
120
}
113
- // Stopped the watcher
114
- drop ( watcher. take ( ) ) ;
115
121
// Drain pending events: we are not interested in them anyways!
116
122
watcher_receiver. into_iter ( ) . for_each ( |_| ( ) ) ;
117
-
118
- let res = thread. join ( ) ;
119
- match & res {
120
- Ok ( ( ) ) => log:: info!( "... Watcher terminated with ok" ) ,
121
- Err ( _) => log:: error!( "... Watcher terminated with err" ) ,
122
- }
123
- res. unwrap ( ) ;
124
123
} ,
125
124
) ;
126
125
127
- Worker { worker , worker_handle }
126
+ Worker { thread_worker }
128
127
}
129
128
130
129
pub ( crate ) fn sender ( & self ) -> & Sender < Task > {
131
- & self . worker . inp
130
+ & self . thread_worker . sender ( )
132
131
}
133
132
134
133
pub ( crate ) fn receiver ( & self ) -> & Receiver < TaskResult > {
135
- & self . worker . out
136
- }
137
-
138
- pub ( crate ) fn shutdown ( self ) -> thread:: Result < ( ) > {
139
- let _ = self . worker . shutdown ( ) ;
140
- self . worker_handle . shutdown ( )
134
+ & self . thread_worker . receiver ( )
141
135
}
142
136
}
143
137
0 commit comments