|
1 | 1 | import builtins |
2 | 2 | import types |
3 | 3 | from inspect import getmembers, isbuiltin |
4 | | -from typing import Any, Dict, List |
| 4 | +from typing import Any, Dict, List, Union |
5 | 5 |
|
6 | 6 | from je_auto_control.utils.exception.exception_tags import action_is_null_error, add_command_exception, \ |
7 | 7 | executor_list_error |
|
18 | 18 | from je_auto_control.utils.logging.loggin_instance import auto_control_logger |
19 | 19 | from je_auto_control.utils.package_manager.package_manager_class import package_manager |
20 | 20 | from je_auto_control.utils.project.create_project_structure import create_project_dir |
| 21 | +from je_auto_control.utils.scheduler.extend_apscheduler import scheduler_manager |
21 | 22 | from je_auto_control.utils.shell_process.shell_exec import ShellManager |
22 | 23 | from je_auto_control.utils.start_exe.start_another_process import start_exe |
23 | 24 | from je_auto_control.utils.test_record.record_test_class import record_action_to_list, test_record_instance |
@@ -156,6 +157,16 @@ def execute_files(self, execute_files_list: list) -> List[Dict[str, str]]: |
156 | 157 | execute_detail_list.append(self.execute_action(read_action_json(file))) |
157 | 158 | return execute_detail_list |
158 | 159 |
|
| 160 | + def scheduler_event_trigger( |
| 161 | + self, function: str, id: str = None, args: Union[list, tuple] = None, |
| 162 | + kwargs: dict = None, scheduler_type: str = "nonblocking", wait_type: str = "secondly", |
| 163 | + wait_value: int = 1, **trigger_args: Any) -> None: |
| 164 | + if scheduler_type == "nonblocking": |
| 165 | + scheduler_event = scheduler_manager.nonblocking_scheduler_event_dict.get(wait_type) |
| 166 | + else: |
| 167 | + scheduler_event = scheduler_manager.blocking_scheduler_event_dict.get(wait_type) |
| 168 | + scheduler_event(self.event_dict.get(function), id, args, kwargs, wait_value, **trigger_args) |
| 169 | + |
159 | 170 |
|
160 | 171 | executor = Executor() |
161 | 172 | package_manager.executor = executor |
|
0 commit comments