1010use std:: {
1111 fs,
1212 path:: { Component , Path } ,
13+ sync:: atomic:: AtomicUsize ,
1314} ;
1415
1516use crossbeam_channel:: { never, select, unbounded, Receiver , Sender } ;
16- use notify:: { Config , RecommendedWatcher , RecursiveMode , Watcher } ;
17+ use notify:: { Config , EventKind , RecommendedWatcher , RecursiveMode , Watcher } ;
1718use paths:: { AbsPath , AbsPathBuf , Utf8PathBuf } ;
18- use vfs:: loader;
19+ use rayon:: iter:: { IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator } ;
20+ use vfs:: loader:: { self , LoadingProgress } ;
1921use walkdir:: WalkDir ;
2022
2123#[ derive( Debug ) ]
@@ -104,35 +106,61 @@ impl NotifyActor {
104106 let config_version = config. version ;
105107
106108 let n_total = config. load . len ( ) ;
107- self . send ( loader:: Message :: Progress {
109+ self . watched_entries . clear ( ) ;
110+
111+ let send = |msg| ( self . sender ) ( msg) ;
112+ send ( loader:: Message :: Progress {
108113 n_total,
109- n_done : None ,
114+ n_done : LoadingProgress :: Started ,
110115 config_version,
111116 dir : None ,
112117 } ) ;
113118
114- self . watched_entries . clear ( ) ;
115-
116- for ( i, entry) in config. load . into_iter ( ) . enumerate ( ) {
117- let watch = config. watch . contains ( & i) ;
118- if watch {
119- self . watched_entries . push ( entry. clone ( ) ) ;
119+ let ( entry_tx, entry_rx) = unbounded ( ) ;
120+ let ( watch_tx, watch_rx) = unbounded ( ) ;
121+ let processed = AtomicUsize :: new ( 0 ) ;
122+ config. load . into_par_iter ( ) . enumerate ( ) . for_each ( move |( i, entry) | {
123+ let do_watch = config. watch . contains ( & i) ;
124+ if do_watch {
125+ _ = entry_tx. send ( entry. clone ( ) ) ;
120126 }
121- let files =
122- self . load_entry ( entry, watch, |file| loader:: Message :: Progress {
123- n_total,
124- n_done : Some ( i) ,
125- dir : Some ( file) ,
126- config_version,
127- } ) ;
128- self . send ( loader:: Message :: Loaded { files } ) ;
129- self . send ( loader:: Message :: Progress {
127+ let files = Self :: load_entry (
128+ |f| _ = watch_tx. send ( f. to_owned ( ) ) ,
129+ entry,
130+ do_watch,
131+ |file| {
132+ send ( loader:: Message :: Progress {
133+ n_total,
134+ n_done : LoadingProgress :: Progress (
135+ processed. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ,
136+ ) ,
137+ dir : Some ( file) ,
138+ config_version,
139+ } )
140+ } ,
141+ ) ;
142+ send ( loader:: Message :: Loaded { files } ) ;
143+ send ( loader:: Message :: Progress {
130144 n_total,
131- n_done : Some ( i + 1 ) ,
145+ n_done : LoadingProgress :: Progress (
146+ processed. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: AcqRel ) + 1 ,
147+ ) ,
132148 config_version,
133149 dir : None ,
134150 } ) ;
151+ } ) ;
152+ for path in watch_rx {
153+ self . watch ( & path) ;
154+ }
155+ for entry in entry_rx {
156+ self . watched_entries . push ( entry) ;
135157 }
158+ self . send ( loader:: Message :: Progress {
159+ n_total,
160+ n_done : LoadingProgress :: Finished ,
161+ config_version,
162+ dir : None ,
163+ } ) ;
136164 }
137165 Message :: Invalidate ( path) => {
138166 let contents = read ( path. as_path ( ) ) ;
@@ -142,60 +170,67 @@ impl NotifyActor {
142170 } ,
143171 Event :: NotifyEvent ( event) => {
144172 if let Some ( event) = log_notify_error ( event) {
145- let files = event
146- . paths
147- . into_iter ( )
148- . filter_map ( |path| {
149- Some (
150- AbsPathBuf :: try_from ( Utf8PathBuf :: from_path_buf ( path) . ok ( ) ?)
173+ if let EventKind :: Create ( _) | EventKind :: Modify ( _) | EventKind :: Remove ( _) =
174+ event. kind
175+ {
176+ let files = event
177+ . paths
178+ . into_iter ( )
179+ . filter_map ( |path| {
180+ Some (
181+ AbsPathBuf :: try_from (
182+ Utf8PathBuf :: from_path_buf ( path) . ok ( ) ?,
183+ )
151184 . expect ( "path is absolute" ) ,
152- )
153- } )
154- . filter_map ( |path| {
155- let meta = fs:: metadata ( & path) . ok ( ) ?;
156- if meta. file_type ( ) . is_dir ( )
157- && self
185+ )
186+ } )
187+ . filter_map ( |path| {
188+ let meta = fs:: metadata ( & path) . ok ( ) ?;
189+ if meta. file_type ( ) . is_dir ( )
190+ && self
191+ . watched_entries
192+ . iter ( )
193+ . any ( |entry| entry. contains_dir ( & path) )
194+ {
195+ self . watch ( path. as_ref ( ) ) ;
196+ return None ;
197+ }
198+
199+ if !meta. file_type ( ) . is_file ( ) {
200+ return None ;
201+ }
202+ if !self
158203 . watched_entries
159204 . iter ( )
160- . any ( |entry| entry. contains_dir ( & path) )
161- {
162- self . watch ( path) ;
163- return None ;
164- }
165-
166- if !meta. file_type ( ) . is_file ( ) {
167- return None ;
168- }
169- if !self
170- . watched_entries
171- . iter ( )
172- . any ( |entry| entry. contains_file ( & path) )
173- {
174- return None ;
175- }
176-
177- let contents = read ( & path) ;
178- Some ( ( path, contents) )
179- } )
180- . collect ( ) ;
181- self . send ( loader:: Message :: Changed { files } ) ;
205+ . any ( |entry| entry. contains_file ( & path) )
206+ {
207+ return None ;
208+ }
209+
210+ let contents = read ( & path) ;
211+ Some ( ( path, contents) )
212+ } )
213+ . collect ( ) ;
214+ self . send ( loader:: Message :: Changed { files } ) ;
215+ }
182216 }
183217 }
184218 }
185219 }
186220 }
221+
187222 fn load_entry (
188- & mut self ,
223+ mut watch : impl FnMut ( & Path ) ,
189224 entry : loader:: Entry ,
190- watch : bool ,
191- make_message : impl Fn ( AbsPathBuf ) -> loader :: Message ,
225+ do_watch : bool ,
226+ send_message : impl Fn ( AbsPathBuf ) ,
192227 ) -> Vec < ( AbsPathBuf , Option < Vec < u8 > > ) > {
193228 match entry {
194229 loader:: Entry :: Files ( files) => files
195230 . into_iter ( )
196231 . map ( |file| {
197- if watch {
198- self . watch ( file. clone ( ) ) ;
232+ if do_watch {
233+ watch ( file. as_ref ( ) ) ;
199234 }
200235 let contents = read ( file. as_path ( ) ) ;
201236 ( file, contents)
@@ -205,15 +240,15 @@ impl NotifyActor {
205240 let mut res = Vec :: new ( ) ;
206241
207242 for root in & dirs. include {
208- self . send ( make_message ( root. clone ( ) ) ) ;
243+ send_message ( root. clone ( ) ) ;
209244 let walkdir =
210245 WalkDir :: new ( root) . follow_links ( true ) . into_iter ( ) . filter_entry ( |entry| {
211246 if !entry. file_type ( ) . is_dir ( ) {
212247 return true ;
213248 }
214249 let path = entry. path ( ) ;
215250
216- if path_is_parent_symlink ( path) {
251+ if path_might_be_cyclic ( path) {
217252 return false ;
218253 }
219254
@@ -230,10 +265,10 @@ impl NotifyActor {
230265 )
231266 . ok ( ) ?;
232267 if depth < 2 && is_dir {
233- self . send ( make_message ( abs_path. clone ( ) ) ) ;
268+ send_message ( abs_path. clone ( ) ) ;
234269 }
235- if is_dir && watch {
236- self . watch ( abs_path. clone ( ) ) ;
270+ if is_dir && do_watch {
271+ watch ( abs_path. as_ref ( ) ) ;
237272 }
238273 if !is_file {
239274 return None ;
@@ -255,12 +290,13 @@ impl NotifyActor {
255290 }
256291 }
257292
258- fn watch ( & mut self , path : AbsPathBuf ) {
293+ fn watch ( & mut self , path : & Path ) {
259294 if let Some ( ( watcher, _) ) = & mut self . watcher {
260- log_notify_error ( watcher. watch ( path. as_ref ( ) , RecursiveMode :: NonRecursive ) ) ;
295+ log_notify_error ( watcher. watch ( path, RecursiveMode :: NonRecursive ) ) ;
261296 }
262297 }
263- fn send ( & mut self , msg : loader:: Message ) {
298+
299+ fn send ( & self , msg : loader:: Message ) {
264300 ( self . sender ) ( msg) ;
265301 }
266302}
@@ -279,7 +315,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
279315/// heuristic is not sufficient to catch all symlink cycles (it's
280316/// possible to construct cycle using two or more symlinks), but it
281317/// catches common cases.
282- fn path_is_parent_symlink ( path : & Path ) -> bool {
318+ fn path_might_be_cyclic ( path : & Path ) -> bool {
283319 let Ok ( destination) = std:: fs:: read_link ( path) else {
284320 return false ;
285321 } ;
0 commit comments