-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathsingle_task_snakemake.py
More file actions
376 lines (276 loc) · 10.1 KB
/
single_task_snakemake.py
File metadata and controls
376 lines (276 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import json
import os
import shlex
import sys
from itertools import chain
from pathlib import Path
from textwrap import dedent
from typing import Dict, Optional, Set
from urllib.parse import urlparse, urlunparse
import snakemake
import snakemake.workflow
from snakemake.deployment import singularity
from snakemake.parser import (
INDENT,
Benchmark,
Include,
Input,
Log,
Output,
Params,
Python,
Rule,
Ruleorder,
Shell,
)
from snakemake.rules import Rule as RRule
from snakemake.workflow import Workflow as WWorkflow
# Switch both standard streams to line buffering so that output is flushed
# whenever a newline is written.
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
def eprint(x: str) -> None:
    """Print ``x`` followed by a newline on standard error."""
    # Look the stream up at call time so a replaced ``sys.stderr`` is honoured.
    stream = sys.stderr
    print(x, file=stream)
# LATCH_PRINT_COMPILATION decides whether ", print_compilation=True" is appended
# to the content of `include` blocks (see block_content_with_print_compilation).
#
# Normalise the variable to a real bool. Keeping the raw environment string
# would make explicit "off" spellings such as "0" or "false" truthy and thereby
# enable the flag; every other non-empty value (including "1") enables it.
print_compilation: bool = os.environ.get(
    "LATCH_PRINT_COMPILATION", ""
).strip().lower() not in {"", "0", "false", "no", "off"}
# Job description, passed in as a JSON document through the
# LATCH_SNAKEMAKE_DATA environment variable (KeyError if it is not set).
data = json.loads(os.environ["LATCH_SNAKEMAKE_DATA"])
# Per-rule override data, looked up by rule name further down.
rules = data["rules"]
# Expected outputs; dumped to stderr and used as inputs of the entrypoint rule.
outputs = data["outputs"]
# Optional config overrides injected into every Workflow; empty when absent.
overwrite_config = data.get("overwrite_config", {})
# Keep a handle on the unpatched initializer so the replacement can delegate.
old_workflow_init = WWorkflow.__init__


def new_init(self: WWorkflow, *args, **kwargs):
    """Initialise the workflow with ``overwrite_config`` forced to the job's value.

    Any ``overwrite_config`` keyword supplied by the caller is replaced; all
    other positional and keyword arguments are forwarded untouched.
    """
    forced_kwargs = {**kwargs, "overwrite_config": overwrite_config}
    old_workflow_init(self, *args, **forced_kwargs)


WWorkflow.__init__ = new_init
def eprint_named_list(xs):
    """Write the positional and keyword entries of ``xs`` to stderr.

    ``xs`` is indexed with the keys ``"positional"`` (iterated directly) and
    ``"keyword"`` (iterated through ``.items()``); every value is shown with
    its ``repr``.
    """
    eprint(" Positional:")
    for item in xs["positional"]:
        eprint(f" {item!r}")

    eprint(" Keyword:")
    for name, value in xs["keyword"].items():
        eprint(f" {name}={value!r}")
# Debug dump of everything received through LATCH_SNAKEMAKE_DATA.
eprint("\n>>><<<\n")
eprint("Using LATCH_SNAKEMAKE_DATA:")
for rule, rule_data in rules.items():
    eprint(f" {rule}:")

    # Sections holding positional/keyword collections.
    for heading, section in (
        (" Inputs:", "inputs"),
        (" Outputs:", "outputs"),
        (" Params:", "params"),
    ):
        eprint(heading)
        eprint_named_list(rule_data[section])

    # Sections holding a single value.
    for heading, section in (
        (" Benchmark:", "benchmark"),
        (" Log:", "log"),
        (" Shellcmd:", "shellcmd"),
    ):
        eprint(heading)
        eprint(f" {rule_data[section]}")

eprint("\nExpected outputs:")
for x in outputs:
    eprint(f"{x!r}")
eprint("\n>>><<<\n")
# Add a custom entrypoint rule
_real_rule_start = Rule.start


def rule_start(self, aux=""):
    """Replacement for ``Rule.start`` that injects a ``latch_entrypoint`` rule.

    For a rule listed in ``rules``, the source of an extra rule named
    ``latch_entrypoint`` is prepended to the first chunk produced by the
    original ``Rule.start``. That rule takes every entry of ``outputs`` as an
    input and has an empty body. For any other rule the first chunk is passed
    through unchanged.

    Args:
        aux: Forwarded verbatim to the original ``Rule.start``.

    Yields:
        A single string: the optional entrypoint source followed by the first
        item yielded by the original ``Rule.start``.
    """
    prefix = ""
    if self.rulename in rules:
        outputs_str = ",\n".join(f" {repr(x)}" for x in outputs)
        # The template is indented here for readability and dedented at
        # runtime. `pass` must stay indented relative to `def`, otherwise the
        # generated function has no body and the compiled code is invalid.
        # `__outputs__` is substituted after dedenting so that the rendered
        # output lines cannot influence the common indentation.
        prefix = dedent("""
            @workflow.rule(name='latch_entrypoint', lineno=1, snakefile='workflow/Snakefile')
            @workflow.input(
            __outputs__
            )
            @workflow.norun()
            @workflow.run
            def __rule_latch_entrypoint(input, output, params, wildcards, threads, resources, log, version, rule, conda_env, container_img, singularity_args, use_singularity, env_modules, bench_record, jobid, is_shell, bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, conda_base_path, basedir, runtime_sourcecache_path, __is_snakemake_rule_func=True):
                pass
            """).replace("__outputs__", outputs_str)

    # Only the first chunk of the original generator is consumed.
    yield prefix + next(_real_rule_start(self, aux))


Rule.start = rule_start
def render_annotated_str(x) -> str:
    """Render a plain or annotated value as a Snakefile source expression.

    Args:
        x: Either an arbitrary value, rendered with ``repr``, or a dict with a
            ``"value"`` entry and an optional ``"flags"`` entry holding at
            most one flag.

    Returns:
        ``repr`` of the value, wrapped in ``directory(...)`` or
        ``report(..., caption=..., category=...)`` when the matching flag is
        present. Any other single flag (e.g. ``temp``) leaves the value
        unwrapped.

    Raises:
        RuntimeError: If more than one flag is attached to the value.
    """
    if not isinstance(x, dict):
        return repr(x)

    res = repr(x["value"])
    # Tolerate a missing or null "flags" entry instead of failing on it.
    flags = dict(x.get("flags") or {})

    if len(flags) > 1:
        raise RuntimeError(f"can only have one flag for {res} but found: {repr(flags)}")

    if "directory" in flags:
        return f"directory({res})"

    if "report" in flags:
        report_vals = flags["report"]
        # Both arguments end up in generated source code, so both must be
        # rendered as literals; an unquoted category string would not parse.
        return (
            f"report({res}, caption={repr(report_vals['caption'])},"
            f" category={repr(report_vals['category'])})"
        )

    # A temporary modifier is no different from a normal file as all files
    # are deleted on Latch after a job completes.
    # https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#protected-and-temporary-files
    return res
def render_annotated_str_list(xs) -> str:
    """Render ``xs`` as source code, handling both single values and lists.

    A ``list`` becomes a bracketed, comma-separated list of its rendered
    elements; anything else is rendered as a single annotated string.
    """
    if isinstance(xs, list):
        rendered = [render_annotated_str(item) for item in xs]
        return "[" + ", ".join(rendered) + "]"

    return render_annotated_str(xs)
def emit_overrides(self, token):
    """Emit the replacement arguments for one rule section.

    The section kind is derived from the type of ``self`` (``Input``,
    ``Output``, ``Params``, ``Benchmark`` or ``Log``) and the replacement data
    is read from ``rules[self.rulename]``.

    Args:
        token: Token attached to every emitted chunk.

    Yields:
        ``(text, token)`` pairs: for each rendered argument the argument
        itself, then ``","``, a newline and ``INDENT * self.base_indent``.

    Raises:
        ValueError: If ``self`` is not one of the supported section types.
    """
    cur_data = rules[self.rulename]
    eprint(f"\nOverriding {self.rulename} {self.__class__.__name__}")

    if isinstance(self, Input):
        xs = cur_data["inputs"]
    elif isinstance(self, Output):
        xs = cur_data["outputs"]
    elif isinstance(self, Params):
        xs = cur_data["params"]
    elif isinstance(self, Benchmark):
        xs = {"positional": [cur_data["benchmark"]], "keyword": {}}
    elif isinstance(self, Log):
        xs = {"positional": [cur_data["log"]], "keyword": {}}
    else:
        raise ValueError(f"tried to emit overrides for unknown state: {type(self)}")

    positional = xs["positional"]

    # Positional entries may be plain strings or lists (the render helpers
    # accept both), so only inspect flags when the first entry is a dict.
    first = positional[0] if len(positional) > 0 else None
    first_flags = first.get("flags") if isinstance(first, dict) else None

    if (
        isinstance(self, Output)
        and first_flags is not None
        and "multiext" in first_flags
    ):
        # Collapse all outputs back into a single multiext(...) call; the flag
        # value is used as its first argument.
        filename = repr(first_flags["multiext"])
        # NOTE(review): only the last dot-separated component is kept, so an
        # extension such as ".tar.gz" is rendered as ".gz".
        exts = [repr("." + x["value"].split(".")[-1]) for x in positional]
        positional_data = (f"multiext({filename},{','.join(exts)})",)
    else:
        positional_data = (render_annotated_str_list(x) for x in positional)

    # Params keyword values are emitted with a plain repr; everything else
    # goes through the annotated-string renderer.
    modifier_fn = render_annotated_str_list
    if isinstance(self, Params):
        modifier_fn = repr

    keyword_data = (f"{k}={modifier_fn(v)}" for k, v in xs["keyword"].items())

    for rendered in chain(positional_data, keyword_data):
        eprint(f" {rendered}")

        yield rendered, token
        yield ",", token
        yield "\n", token

        # we'll need to re-indent the commented-out originals too
        yield INDENT * self.base_indent, token
# Section class name -> names of the rules whose overrides were already emitted.
emitted_overrides_per_type: Dict[str, Set[str]] = {}


def skipping_block_content(self, token):
    """Drop the section's original content, emitting the overrides once.

    Nothing is yielded for rules that are not listed in ``rules``. For listed
    rules the overrides are emitted the first time this is called for a given
    section type and rule; later calls yield nothing.
    """
    if self.rulename not in rules:
        return

    section_kind = type(self).__name__
    if section_kind not in emitted_overrides_per_type:
        emitted_overrides_per_type[section_kind] = set()
    already_emitted = emitted_overrides_per_type[section_kind]

    if self.rulename in already_emitted:
        return

    yield from emit_overrides(self, token)
    # Recorded only after the overrides have been fully consumed.
    already_emitted.add(self.rulename)
def block_content_with_print_compilation(self, token):
    """Yield the token's text, optionally with a ``print_compilation`` argument.

    When the module-level ``print_compilation`` flag is truthy,
    ``", print_compilation=True"`` is appended to ``token.string``.
    """
    text = token.string
    if print_compilation:
        text = f"{text}, print_compilation=True"
    yield text, token
def empty_generator(self, token):
    """Generator that ignores its arguments and never yields anything."""
    yield from ()
# Replace the original content of these rule sections with the override data.
Input.block_content = skipping_block_content
Output.block_content = skipping_block_content
Params.block_content = skipping_block_content
Benchmark.block_content = skipping_block_content
Log.block_content = skipping_block_content
# Emit nothing for the content of `ruleorder` statements.
Ruleorder.block_content = empty_generator
# Optionally append `print_compilation=True` to `include` content.
Include.block_content = block_content_with_print_compilation
class SkippingRule(Rule):
    """Rule automaton that only emits code for rules listed in ``rules``."""

    def start(self, aux=""):
        """Emit the rule header, or a bare ``...`` for rules that are skipped."""
        if self.rulename in rules:
            yield from super().start(aux)
            return

        # Rules can be nested in conditional statements:
        #
        # if (<condition>):
        #     rule A:
        #         <stuff>
        #
        # We want correct python code if we remove them.
        yield "..."

    def end(self):
        """Emit the rule footer for listed rules and nothing otherwise."""
        if self.rulename in rules:
            yield from super().end()

    def block_content(self, token):
        """Emit block content for listed rules and nothing otherwise."""
        if self.rulename in rules:
            yield from super().block_content(token)
class SkippingCheckpoint(SkippingRule):
    """Checkpoint flavour of :class:`SkippingRule`."""

    def start(self):
        """Start like a rule, passing ``", checkpoint=True"`` as ``aux``."""
        checkpoint_aux = ", checkpoint=True"
        yield from super().start(aux=checkpoint_aux)
# Register the skipping automata for the `rule` and `checkpoint` keywords.
Python.subautomata["rule"] = SkippingRule
Python.subautomata["checkpoint"] = SkippingCheckpoint
class ReplacingShell(Shell):
    """Shell automaton whose command is replaced by the job's ``shellcmd``."""

    def __init__(self, snakefile, rulename, base_indent=0, dedent=0, root=True):
        """Set ``overwrite_cmd`` for listed rules, then initialise as usual.

        Every ``{`` and ``}`` in the replacement command is doubled.
        """
        if rulename in rules:
            cmd: str = rules[rulename]["shellcmd"]
            brace_doubling = str.maketrans({"{": "{{", "}": "}}"})
            # NOTE(review): presumably the parent initializer reads
            # `overwrite_cmd`, so it is assigned before delegating - confirm.
            self.overwrite_cmd = cmd.translate(brace_doubling)

        super().__init__(snakefile, rulename, base_indent, dedent, root)
# Use the replacing automaton for `shell` sections inside rules.
SkippingRule.subautomata["shell"] = ReplacingShell
def get_wildcards(self, requested_output, wildcards_dict=None):
    """Return ``wildcards_dict`` unchanged; ``requested_output`` is ignored."""
    passthrough = wildcards_dict
    return passthrough
# Install the pass-through implementation on snakemake's runtime Rule class.
RRule.get_wildcards = get_wildcards
def get_docker_cmd(
    img_path: str,
    cmd: str,
    args: Optional[str] = "",
    quiet: bool = False,
    envvars: Optional[Dict[str, str]] = None,
    shell_executable: Optional[str] = None,
    container_workdir: Optional[str] = None,
    is_python_script: bool = False,
) -> str:
    """Build a shell-quoted ``docker run`` command line that executes ``cmd``.

    Installed below in place of ``singularity.shellcmd``, hence the signature.

    Args:
        img_path: Image reference passed to ``docker run``.
        cmd: Command string handed to the shell via ``-c``.
        args: Extra ``docker run`` arguments as one shell-style string; an
            empty string or ``None`` adds nothing.
        quiet: Accepted but not used.
        envvars: Environment variables, each passed as ``--env KEY=VALUE``.
        shell_executable: Shell to run inside the container; only its base
            name is used. Defaults to ``sh``.
        container_workdir: Working directory and bind-mount target inside the
            container. Defaults to ``/root``.
        is_python_script: When true, additionally bind-mounts
            ``singularity.SNAKEMAKE_SEARCHPATH`` at
            ``singularity.SNAKEMAKE_MOUNTPOINT``.

    Returns:
        The complete command as a single string joined with ``shlex.join``.

    Raises:
        ValueError: If the working directory contains a double quote, which
            cannot be embedded in the quoted ``--mount`` target.
    """
    if shell_executable is None:
        shell_executable = "sh"
    else:
        # Ensure to just use the name of the executable, not a path,
        # because we cannot be sure where it is located in the container.
        shell_executable = Path(shell_executable).name

    workdir = container_workdir if container_workdir is not None else "/root"
    if '"' in workdir:
        raise ValueError("container_workdir cannot contain double quotes")

    docker_cmd = ["docker", "run"]

    if envvars is not None:
        for k, v in envvars.items():
            docker_cmd.extend(["--env", f"{k}={v}"])

    if is_python_script:
        # mount host snakemake module into container
        docker_cmd.extend([
            "--mount",
            f'type=bind,"src={singularity.SNAKEMAKE_SEARCHPATH}","target={singularity.SNAKEMAKE_MOUNTPOINT}"',
        ])

    # support for containers currently only works with relative input and output
    # paths. This preserves the behavior before the monkey patch
    docker_cmd.extend([
        "--workdir",
        workdir,
        "--mount",
        f'type=bind,src=.,"target={workdir}"',
    ])

    # Truthiness rather than `!= ""`: `shlex.split(None)` would read from
    # standard input instead of yielding no arguments.
    if args:
        docker_cmd.extend(shlex.split(args))

    docker_cmd.extend([
        img_path,
        shell_executable,
        "-c",
        cmd,
    ])

    return shlex.join(docker_cmd)
@property
def docker_path(self):
    """Image reference derived from ``self.url`` without its URL scheme.

    Raises:
        ValueError: If the URL has a scheme other than ``docker``.
    """
    parsed = urlparse(self.url)
    if parsed.scheme not in ("", "docker"):
        raise ValueError("Only docker images are supported")

    without_scheme = urlunparse(parsed._replace(scheme=""))
    # Dropping the scheme leaves a leading "//" whenever a netloc is present.
    if without_scheme.startswith("//"):
        return without_scheme[2:]
    return without_scheme
# Monkey-patch snakemake to use docker container runtime instead of singularity
# because singularity does not work inside sysbox.
# Pulling becomes a no-op, the image path becomes the scheme-less docker
# reference, and container commands are built by get_docker_cmd.
singularity.Image.pull = lambda self, dry_run: None
singularity.Image.path = docker_path
singularity.shellcmd = get_docker_cmd
# Run snakemake
snakemake.main()