@@ -82,6 +82,12 @@ class UnknownStatus(LauncherError):
8282 pass
8383
8484
85+ class NotRunning (LauncherError ):
86+ """Raised when a launcher is no longer running"""
87+
88+ pass
89+
90+
8591class BaseLauncher (LoggingConfigurable ):
8692 """An abstraction for starting, stopping and signaling a process."""
8793
@@ -408,7 +414,10 @@ def from_dict(cls, d, **kwargs):
408414 def _reconstruct_process (self , d ):
409415 """Reconstruct our process"""
410416 if 'pid' in d and d ['pid' ] > 0 :
411- self .process = psutil .Process (d ['pid' ])
417+ try :
418+ self .process = psutil .Process (d ['pid' ])
419+ except psutil .NoSuchProcess as e :
420+ raise NotRunning (f"Process { d ['pid' ]} " )
412421 self ._start_waiting ()
413422
414423 def _wait (self ):
@@ -465,10 +474,18 @@ def start(self):
465474 async def join (self , timeout = None ):
466475 """Wait for the process to exit"""
467476 with ThreadPoolExecutor (1 ) as pool :
477+ wait = partial (self .process .wait , timeout )
468478 try :
469- await asyncio .wrap_future (
470- pool .submit (partial (self .process .wait , timeout ))
471- )
479+ try :
480+ future = pool .submit (wait )
481+ except RuntimeError :
482+ # e.g. called during process shutdown,
483+ # which raises
484+ # RuntimeError: cannot schedule new futures after interpreter shutdown
485+ # Instead, do the blocking call
486+ wait ()
487+ else :
488+ await asyncio .wrap_future (future )
472489 except psutil .TimeoutExpired :
473490 raise TimeoutError (
474491 f"Process { self .pid } did not complete in { timeout } seconds."
@@ -638,8 +655,20 @@ def to_dict(self):
638655 @classmethod
639656 def from_dict (cls , d , ** kwargs ):
640657 self = super ().from_dict (d , ** kwargs )
658+ n = 0
641659 for i , engine_dict in d ['engines' ].items ():
642- self .launchers [i ] = self .launcher_class .from_dict (engine_dict , parent = self )
660+ try :
661+ self .launchers [i ] = self .launcher_class .from_dict (
662+ engine_dict , parent = self
663+ )
664+ except NotRunning as e :
665+ self .log .error (f"Engine { i } not running: { e } " )
666+ else :
667+ n += 1
668+ if n == 0 :
669+ raise NotRunning ("No engines left" )
670+ else :
671+ self .n = n
643672 return self
644673
645674 def start (self , n ):
@@ -1184,9 +1213,17 @@ def wait_one(self, timeout):
11841213
11851214 async def join (self , timeout = None ):
11861215 with ThreadPoolExecutor (1 ) as pool :
1187- await asyncio .wrap_future (
1188- pool .submit (partial (self .wait_one , timeout = timeout ))
1189- )
1216+ wait = partial (self .wait_one , timeout = timeout )
1217+ try :
1218+ future = pool .submit (wait )
1219+ except RuntimeError :
1220+ # e.g. called during process shutdown,
1221+ # which raises
1222+ # RuntimeError: cannot schedule new futures after interpreter shutdown
1223+ # Instead, do the blocking call
1224+ wait ()
1225+ else :
1226+ await asyncio .wrap_future (future )
11901227
11911228 def signal (self , sig ):
11921229 if self .state == 'running' :
0 commit comments