Skip to content

Commit 46b39e7

Browse files
dmwnzcmeesterscoderabbitai[bot]
authored
feat: pass a bash script to srun rather than the command directly (#37)
Related to snakemake/snakemake-executor-plugin-slurm#379 and matching PR snakemake/snakemake-executor-plugin-slurm#380 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added a runtime setting to stream job commands as a script to the scheduler via stdin. * **Improvements** * Enhanced logging to include generated script content when streaming is enabled. * Retains existing submission behavior when streaming is disabled. * **Bug Fixes** * More reliable delivery of streamed scripts with improved error detection and reporting. * **Tests** * Tests updated to provide concrete executor settings for the new option. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Christian Meesters <cmeesters@users.noreply.github.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 901c000 commit 46b39e7

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

snakemake_executor_plugin_slurm_jobstep/__init__.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@
77
import socket
88
import subprocess
99
import sys
10+
from dataclasses import dataclass, field
1011
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
1112
from snakemake_interface_executor_plugins.executors.real import RealExecutor
1213
from snakemake_interface_executor_plugins.jobs import (
1314
JobExecutorInterface,
1415
)
15-
from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings
16+
from snakemake_interface_executor_plugins.settings import (
17+
CommonSettings,
18+
ExecMode,
19+
ExecutorSettingsBase,
20+
)
1621
from snakemake_interface_common.exceptions import WorkflowError
1722

1823

@@ -38,6 +43,25 @@
3843
)
3944

4045

46+
@dataclass
47+
class ExecutorSettings(ExecutorSettingsBase):
48+
"""Settings for the SLURM jobstep executor plugin."""
49+
50+
pass_command_as_script: bool = field(
51+
default=False,
52+
metadata={
53+
"help": (
54+
"Pass to srun the command to be executed as a shell script "
55+
"(fed through stdin) instead of wrapping it in the command line "
56+
"call. Useful when a limit exists on SLURM command line length (ie. "
57+
"max_submit_line_size)."
58+
),
59+
"env_var": False,
60+
"required": False,
61+
},
62+
)
63+
64+
4165
# Required:
4266
# Implementation of your executor
4367
class Executor(RealExecutor):
@@ -58,6 +82,7 @@ def run_job(self, job: JobExecutorInterface):
5882
# snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.
5983

6084
jobsteps = dict()
85+
srun_script = None
6186
# TODO revisit special handling for group job levels via srun at a later stage
6287
# if job.is_group():
6388

@@ -118,14 +143,40 @@ def run_job(self, job: JobExecutorInterface):
118143

119144
call = "srun -n1 --cpu-bind=q "
120145
call += f" {get_cpu_setting(job, self.gpu_job)} "
121-
call += f" {self.format_job_exec(job)}"
146+
if self.workflow.executor_settings.pass_command_as_script:
147+
# format the job to execute with all the snakemake parameters
148+
# into a script
149+
srun_script = self.format_job_exec(job)
150+
# the process will read the srun script from stdin
151+
call += " sh -s"
152+
else:
153+
call += f" {self.format_job_exec(job)}"
122154

123155
self.logger.debug(f"This job is a group job: {job.is_group()}")
124156
self.logger.debug(f"The call for this job is: {call}")
125157
self.logger.debug(f"Job is running on host: {socket.gethostname()}")
158+
if srun_script is not None:
159+
self.logger.debug(f"The script for this job is: \n{srun_script}")
126160
# this dict is to support the to be implemented feature of oversubscription in
127161
# "ordinary" group jobs.
128-
jobsteps[job] = subprocess.Popen(call, shell=True)
162+
jobsteps[job] = subprocess.Popen(
163+
call, shell=True, text=True, stdin=subprocess.PIPE
164+
)
165+
if srun_script is not None:
166+
try:
167+
# pass the srun bash script via stdin
168+
jobsteps[job].stdin.write(srun_script)
169+
jobsteps[job].stdin.close()
170+
except BrokenPipeError:
171+
# subprocess terminated before reading stdin
172+
self.logger.error(
173+
f"Failed to write script to stdin for job {job}. "
174+
"Subprocess may have terminated prematurely."
175+
)
176+
self.report_job_error(SubmittedJobInfo(job))
177+
raise WorkflowError(
178+
f"Job {job} failed: subprocess terminated before reading script"
179+
)
129180

130181
job_info = SubmittedJobInfo(job)
131182
self.report_job_submission(job_info)

tests/tests.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import snakemake.common.tests
33
from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase
44

5+
from snakemake_executor_plugin_slurm_jobstep import ExecutorSettings
6+
57

68
class TestWorkflowsBase(snakemake.common.tests.TestWorkflowsLocalStorageBase):
79
__test__ = True
@@ -11,7 +13,7 @@ def get_executor(self) -> str:
1113

1214
def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
1315
# instatiate ExecutorSettings of this plugin as appropriate
14-
return None
16+
return ExecutorSettings()
1517

1618

1719
# def test_issue_41():

0 commit comments

Comments
 (0)