Coverage for src/fsl_pipe/job.py: 84%
542 statements
coverage.py v7.8.0, created at 2025-05-14 13:47 +0100
1"""Defines pipeline interface after it has been split into individual jobs based on a FileTree."""
2from functools import lru_cache
3from file_tree.file_tree import FileTree
4from typing import Optional, Set, Dict, Collection, Any, Callable, Sequence, List, Tuple
5from enum import Enum
6from fsl.utils.fslsub import func_to_cmd
7from loguru import logger
8from pathlib import Path
9from fsl_sub import submit, config
10import re
11import inspect
12from contextlib import contextmanager
13from wcmatch.glob import globmatch, GLOBSTAR
14from warnings import warn
15import numpy as np
17class OutputMissing(IOError):
18 """Raised if a job misses the output files."""
19 pass
22class InputMissingRun(IOError):
23 """Raised if a job misses the input files when it is being launched."""
24 pass
25class InputMissingPipe(IOError):
26 """Raised if a job misses input files while it is being added to the pipeline."""
27 def __init__(self, target, missing, dependency_graph=None):
28 self.target = target
29 self.missing = missing
30 if dependency_graph is None:
31 dependency_graph = []
32 self.dependency_graph = dependency_graph
33 super().__init__("")
35 def __str__(self, ):
36 dependency = ' -> '.join([str(d) for d in self.dependency_graph])
37 return f"{self.target} cannot be added to the pipeline as it is missing required input files: {self.missing} (dependency graph: {dependency})"
40class RunMethod(Enum):
41 """How to run the individual jobs."""
43 local = 1
44 submit = 2
45 dask = 3
47 def default():
48 """Return default RunMethod, which is `submit` on a cluster and `local` otherwise."""
49 return RunMethod.submit if config.has_queues() else RunMethod.local
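# Illustrative sketch (not part of this module): choosing how to run a
# hypothetical, already-filtered JobList. Strings are also accepted by
# `JobList.run`, which resolves them through `RunMethod[...]`.
def run_with_default_method(job_list):
    method = RunMethod.default()  # `submit` if fsl_sub reports queues, otherwise `local`
    job_list.run(method=method)   # equivalent to job_list.run("submit") or job_list.run("local")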
52class JobList:
53 """Pipeline with a concrete set of jobs.
55 Produced from a :class:`fsl_pipe.pipeline.Pipeline` based on the provided `FileTree`.
57 The default `JobList` produced by `Pipeline.generate_jobs` is unsorted and
58 needs to be sorted by running `filter` before being run.
59 """
61 def __init__(self, tree: FileTree, jobs: Sequence["SingleJob"], targets: Dict[str, "FileTarget"]):
62 """Create a new concrete set of jobs."""
63 self.tree = tree
64 self.jobs = jobs
65 self.targets = targets
66 self._sort()
68 def __len__(self, ):
69 return len(self.jobs)
71 def filter(self, templates: Collection[str]=None, overwrite=False, overwrite_dependencies=False, skip_missing=False) -> "JobList":
72 """Filter out a subset of jobs.
74 This does three things:
75 - Only include jobs needed to create the templates or file patterns given in `templates`.
76 - Exclude jobs for which output files already exist, unless requested by `overwrite` or `overwrite_dependencies`.
77 - Sort the jobs, so that dependencies are run before the jobs that depend on them.
79 :param templates: target template keys (default: all templates); Any jobs required to produce those files will be kept in the returned pipeline.
80 :param overwrite: if True overwrite the files matching the template even if they already exist, defaults to False
81 :param overwrite_dependencies: if True rerun any dependency jobs even if the output of those jobs already exists, defaults to False
82 :param skip_missing: if True remove any jobs missing input dependencies. If not set, an error is raised whenever inputs are missing.
83 :return: Concrete pipeline with the jobs needed to produce `templates` (with optional overwriting)
84 """
85 if isinstance(templates, str):
86 templates = [templates]
87 def add_target(target: "FileTarget", warn_msg=""):
88 if target.exists() and (not overwrite or target.producer is None):
89 return
90 if target.producer is None:
91 warn(f"No job found that produces {target.filename} {warn_msg}")
92 return
93 try:
94 target.producer.add_to_jobs(all_jobs, overwrite=overwrite, overwrite_dependencies=overwrite_dependencies)
95 except InputMissingPipe as e:
96 if skip_missing:
97 return
98 e.dependency_graph.insert(0, e.target)
99 e.target = target.filename
100 raise
102 all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set())
103 if templates is None:
104 for target in self.targets.values():
105 if target.producer is None:
106 continue
107 add_target(target)
108 else:
109 for template in templates:
110 if template in self.tree.template_keys():
111 for filename in self.tree.get_mult(template).data.flatten():
112 target = get_target(filename, self.targets, from_template=template)
113 add_target(target, f"(part of template {template})")
114 else:
115 matches = get_matching_targets(template, self.targets)
116 if len(matches) == 0:
117 warn(f"No files were found that match {template}. It is not a template in the FileTree nor does it match any files that can be produced by this pipeline.")
118 for target in matches:
119 add_target(target, f"(matches {template})")
120 return JobList(self.tree, all_jobs[0], self.targets)
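# Illustrative sketch (not part of this module): typical use of `filter` on a
# JobList produced by a hypothetical Pipeline/FileTree pair; the "brain_mask"
# template key and the `pipeline`/`tree` objects are placeholders.
def jobs_for_brain_mask(pipeline, tree):
    all_jobs = pipeline.generate_jobs(tree)
    # keep only the jobs needed to produce the "brain_mask" template,
    # skipping any job whose (non-optional) output already exists on disk
    return all_jobs.filter(["brain_mask"], overwrite=False, overwrite_dependencies=False)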
122 def _sort(self, ):
123 """Sorts the job in the `jobs` list in place based on dependencies.
125 A job dependencies should always come before the job itself.
126 This sorting is done purely based on the job inputs/outputs and does not depend on which files actually exist on disk.
127 """
128 all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set())
129 for job in self.jobs:
130 job.add_to_jobs(all_jobs, only_sort=True)
132 # Remove jobs that were already filtered out
133 sorted_jobs = [j for j in all_jobs[0] if j in self.jobs]
134 assert len(self.jobs) == len(sorted_jobs)
135 self.jobs = sorted_jobs
138 def batch(self, use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True, batch_unlabeled=False):
139 """Batches groups of jobs into a single job.
141 Two jobs will be batched if:
143 1. There is no intermediate job that needs to run between the two and cannot be included in the same batch (because of rule 3 below).
144 2. One of the jobs depends on the other. This requirement is disabled by default; enable it by setting `only_connected` to True.
145 3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:
147 a) They have the same batch label (set during pipeline creation using `@pipe(batch=<batch label>)`). This check is enabled by setting the `use_label` keyword to True. If enabled, jobs without a batch label will never be merged.
148 b) The jobs have the same submit parameters (e.g., "architecture", "coprocessor"), except for "jobtime", "name", and "export_vars". "jobram" is also ignored by default; include it in the comparison by setting the `split_on_ram` keyword to True.
149 c) The jobs have the same placeholder values. This prevents jobs from different subjects from being merged with each other. It can be disabled by setting `use_placeholders` to False. Alternatively, only a subset of placeholders can be considered by setting `use_placeholders` to a sequence of placeholder names (e.g., `use_placeholders=["subject", "session"]` will not merge jobs with different "subject" or "session" placeholder values, but will ignore any other placeholders).
151 A new `JobList` with the batched jobs will be returned.
152 """
153 reference = self.copy()
155 all_flags = {key for job in reference.jobs for key in job.submit_params.keys()}.difference(["jobtime", "name", "export_vars"])
156 if not split_on_ram and "jobram" in all_flags:
157 all_flags.remove("jobram")
158 all_flags = sorted(all_flags)
160 job_labels = []
161 for job in reference.jobs:
162 label = []
163 label.append(job.batch if use_label else 1)
164 label.extend(job.submit_params.get(key, None) for key in all_flags)
165 if use_placeholders:
166 if use_placeholders is True:
167 parameters = job.set_parameters
168 else:
169 parameters = {key: value for key, value in job.set_parameters.items() if key in use_placeholders}
170 label.append(frozenset(parameters.items()))
171 job_labels.append(tuple(label))
173 hashable_job_labels = [tuple(label) if isinstance(label, list) else label for label in job_labels]
175 to_batch = {
176 tb: set(job for (job, label) in zip(reference.jobs, job_labels) if label == tb)
177 for tb in set(hashable_job_labels)
178 }
179 jobs = set(reference.jobs)
180 for to_batch, batching in to_batch.items():
181 if to_batch[0] is None and not batch_unlabeled:
182 # do not batch jobs without a label set (if `use_label` is True)
183 continue
184 other_jobs = jobs.difference(batching)
185 new_jobs = batch_connected_jobs(batching, other_jobs)
186 if not only_connected:
187 new_jobs = batch_unconnected_jobs(new_jobs, other_jobs)
188 jobs = set(new_jobs).union(other_jobs)
189 return JobList(reference.tree, jobs, reference.targets)
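# Illustrative sketch (not part of this module): batching similar jobs before
# submission. `job_list` is a hypothetical filtered JobList whose jobs were
# labelled with `@pipe(batch=...)` at pipeline creation.
def submit_batched(job_list):
    # merge jobs with the same batch label and the same "subject" placeholder
    batched = job_list.batch(use_label=True, use_placeholders=["subject"])
    batched.run(RunMethod.submit)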
191 def split_pipeline(self, use_label=False, split_on_ram=False):
192 """Split the pipeline into multiple stages that require different hardware.
194 This uses the same rules as :meth:`batch`, except that placeholder values are always ignored.
195 """
196 batched_jobs = self.batch(
197 use_label=use_label, split_on_ram=split_on_ram,
198 only_connected=False, use_placeholders=False, batch_unlabeled=True
199 )
201 return [stage.to_job_list(batched_jobs.tree) for stage in batched_jobs.jobs]
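# Illustrative sketch (not part of this module): inspecting the stages returned
# by `split_pipeline`. Each stage is a (batch label, submit parameters, JobList)
# tuple produced by `to_job_list`.
def describe_stages(job_list):
    for label, submit_params, stage in job_list.split_pipeline(use_label=True):
        print(label, submit_params.get("coprocessor"), len(stage))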
203 def copy(self, ):
204 """Create a new, independent copy of the JobList."""
205 targets = dict()
206 new_jobs = [job.copy(targets) for job in self.jobs]
207 return JobList(self.tree, new_jobs, targets)
209 def report(self, console=None):
210 """Produce tree reports with the relevant input/output templates."""
211 from rich import color, tree
212 if console is None:
213 from rich.console import Console
214 console = Console()
216 if len(self.jobs) == 0:
217 console.print("No jobs will be run.")
218 return
220 all_inputs = set.union(*[set(j.input_targets) for j in self.jobs])
221 all_outputs = set.union(*[set(j.output_targets) for j in self.jobs])
222 templates = {
223 template
224 for fns in (all_inputs, all_outputs)
225 for target in fns
226 for template in target.from_templates
227 }
229 def proc_line(tree_obj):
230 for t in tree_obj.children:
231 proc_line(t)
232 line = tree_obj.label
233 start = line.index('[cyan]') + 6
234 end = line.index('[/cyan]')
235 key = line[start:end]
236 if key not in templates:
237 return
238 input_count = 0
239 output_count = 0
240 overwrite_count = 0
241 exists_count = 0
243 for fn in self.tree.get_mult(key).data.flatten():
244 target = get_target(fn, self.targets)
245 is_output = False
246 is_input = False
247 if target in all_outputs:
248 is_output = True
249 if target in all_inputs:
250 is_input = True
251 if target.exists() and is_output:
252 overwrite_count += 1
253 elif is_output:
254 output_count += 1
255 elif target.exists():
256 exists_count += 1
257 if is_input:
258 input_count += 1
259 if (input_count + output_count + overwrite_count) == 0:
260 return
261 counter = "/".join([str(number) if color is None else f"[{color}]{number}[/{color}]"
262 for number, color in [
263 (overwrite_count, 'red'),
264 (output_count, 'yellow'),
265 (input_count, 'blue'),
266 ]])
267 tree_obj.label = f"{line} [{counter}]"
269 for rich_obj in self.tree.filter_templates(templates).fill()._generate_rich_report():
270 if isinstance(rich_obj, tree.Tree):
271 proc_line(rich_obj)
272 console.print(rich_obj)
274 def run_datalad(self, ):
275 """Make sure we can run the pipeline.
277 Calls `datalad.get` on all input files and `datalad.unlock` on all output files.
278 """
279 input_targets = set()
280 output_targets = set()
282 for j in self.jobs:
283 input_targets.update(j.input_targets)
284 output_targets.update(j.output_targets)
287 input_targets.difference_update(output_targets)
288 input_fns = [t.filename for t in input_targets]
289 output_fns = [t.filename for t in output_targets if t.exists()]
291 from .datalad import get_dataset
292 ds = get_dataset()
293 ds.get(input_fns)
294 ds.unlock(output_fns)
296 def run(self, method: RunMethod=None, wait_for=(), clean_script="on_success"):
297 """Run all the jobs that are required to produce the given templates.
299 :param method: defines how to run the job
300 :param wait_for: job IDs to wait for before running pipeline
301 :param clean_script: Sets whether the script produced in the log directory when submitting a job to the cluster should be kept after the job finishes. Only used if `method` is "submit". Options:
302 - "never": Script is kept
303 - "on_success": (default) Only remove if script successfully finishes (i.e., no error is raised)
304 - "always": Always remove the script, even if the script raises an error
305 """
306 if method is None:
307 method = RunMethod.default()
308 elif not isinstance(method, RunMethod):
309 method = RunMethod[method]
310 if len(self.jobs) == 0:
311 logger.info("No new jobs being run/submitted")
312 return
314 prev_count = 0
315 run_jobs: Dict[SingleJob, Any] = {}
316 while len(run_jobs) < len(self.jobs):
317 for job in self.jobs:
318 if job in run_jobs or any(j in self.jobs and j not in run_jobs for _, j in job.dependencies(only_missing=False)):
319 continue
320 dependencies = [run_jobs[j] for _, j in job.dependencies(only_missing=False) if j in run_jobs]
321 if len(dependencies) == 0:
322 dependencies = wait_for
323 run_jobs[job] = job(
324 method=method,
325 wait_for=dependencies,
326 clean_script=clean_script,
327 )
328 if len(run_jobs) == prev_count:
329 raise ValueError("Unable to run/submit all jobs. Are there circular dependencies?")
330 prev_count = len(run_jobs)
332 if method == RunMethod.dask:
333 import dask
334 def last_dask_job(*all_jobs):
335 if any(a != 0 for a in all_jobs):
336 return 1
337 return 0
338 if dask.delayed(last_dask_job, name="last_job")(*run_jobs.values()).compute() == 1:
339 raise ValueError("One or more of the jobs have failed.")
340 logger.info("Successfully finished running all jobs using Dask.")
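# Illustrative sketch (not part of this module): running a filtered JobList.
# `job_list` and the IDs in `existing_job_ids` are hypothetical.
def run_after(job_list, existing_job_ids=()):
    # submit to the cluster (or run locally when no queues are configured),
    # holding on previously submitted jobs and keeping the script of any failing job
    job_list.run(method=RunMethod.default(), wait_for=existing_job_ids, clean_script="on_success")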
342 def scale_jobtime(self, scaling):
343 """
344 Scale the submit job times of all jobs by `scaling`.
346 This will only affect jobs submitted to the cluster.
347 """
348 for job in self.jobs:
349 job.scale_jobtime(scaling)
350 return self
353class JobParent:
354 """Parent for `SingleJob` and `BatchJob`.
355 """
356 input_targets: Set["FileTarget"]
357 output_targets: Set["FileTarget"]
358 optional_targets: Set["FileTarget"]
360 #@lru_cache(None)
361 def dependencies(self, only_missing=True) -> Set[Tuple[bool, Optional["SingleJob"]]]:
362 """Return a set of (optional, producer job) tuples for the jobs on which this job depends.
364 By default only dependencies related to missing input files are returned.
366 :param only_missing: set to False to also return dependencies that produce files that already exist on disk
367 """
368 jobs = set()
369 for target in self.input_targets:
370 if not (only_missing and target.exists()):
371 jobs.add((target in self.optional_targets, target.producer))
372 return jobs
374 def missing_output(self, reset_cache=False):
375 """
376 Return the set of output files that do not exist on disk.
378 Optional outputs are not considered.
380 :param reset_cache: set to True to not rely on an existing cached existence check
381 """
382 missing = set()
383 for to_check in self.output_targets:
384 if to_check in self.optional_targets:
385 continue
386 if reset_cache:
387 to_check.reset_existence()
388 if not to_check.exists():
389 missing.add(to_check)
390 return missing
392 def missing_input(self, reset_cache=False, ignore_expected=False):
393 """
394 Return the set of input files that do not exist on disk.
396 Optional inputs are not considered.
398 :param reset_cache: set to True to not rely on an existing cached existence check
399 :param ignore_expected: set to True to ignore any missing files that have a job that will produce them in the pipeline
400 """
401 missing = set()
402 for to_check in self.input_targets:
403 if to_check in self.optional_targets:
404 continue
405 if reset_cache:
406 to_check.reset_existence()
407 if ignore_expected and to_check.producer is not None:
408 continue
409 if not to_check.exists():
410 missing.add(to_check)
411 return missing
413 def add_to_jobs(self, all_jobs, overwrite=False, overwrite_dependencies=False, only_sort=False):
414 """Mark this job and all of its dependencies to run.
416 This job is marked to run if any of its outputs do not yet exist or `overwrite` is True.
417 Its dependencies are marked to run if this job runs and either their output does not exist or `overwrite_dependencies` is True.
419 :param all_jobs: tuple of a list and a set of individual jobs. This job and all required dependency jobs are added to both.
420 :param overwrite: if True mark this job even if the output already exists, defaults to False
421 :param overwrite_dependencies: if True mark the dependencies of this job even if their output already exists, defaults to False
422 """
423 if self in all_jobs[1]:
424 return
425 if not only_sort:
426 if not overwrite and len(self.missing_output()) == 0:
427 return
428 missing = self.missing_input(ignore_expected=True)
429 if len(missing) > 0:
430 raise InputMissingPipe(self, {m.filename for m in missing})
431 subjobs = ([], set(all_jobs[1]))
432 for optional, job in self.dependencies(only_missing=not (overwrite_dependencies or only_sort)):
433 try:
434 if job is not None:
435 job.add_to_jobs(subjobs, overwrite_dependencies, overwrite_dependencies, only_sort)
436 except InputMissingPipe as e:
437 if optional:
438 continue
439 e.dependency_graph.insert(0, e.target)
440 e.target = self
441 raise
442 all_jobs[0].extend(subjobs[0])
443 all_jobs[1].update(subjobs[0])
444 all_jobs[0].append(self)
445 all_jobs[1].add(self)
447 def __call__(self, method: RunMethod, wait_for=(), clean_script="on_success"):
448 """Run the job using the specified `method`."""
449 if method == RunMethod.local:
450 self.prepare_run()
451 missing = self.missing_input()
452 if len(missing) > 0:
453 raise InputMissingRun(f"{self} can not run as it misses required input files: {missing}")
454 logger.info(f"running {self}")
455 self.job = self.function(**self.kwargs)
456 missing = self.missing_output(reset_cache=True)
457 if len(missing) > 0:
458 raise OutputMissing(f"{self} failed to produce required output files: {missing}")
459 elif method == RunMethod.submit:
460 from .pipeline import Template
461 self.prepare_run()
462 local_submit = dict(self.submit_params)
463 if 'logdir' not in local_submit:
464 local_submit['logdir'] = 'log'
465 Path(local_submit['logdir']).mkdir(exist_ok=True, parents=True)
466 if 'name' not in local_submit:
467 local_submit['name'] = self.job_name()
468 cmd = func_to_cmd(self.function, (), self.kwargs, local_submit['logdir'], clean=clean_script)
469 if len(wait_for) == 0:
470 wait_for = None
471 self.job = submit(cmd, jobhold=wait_for, **local_submit)
472 logger.debug(f"submitted {self} with job ID {self.job}")
473 elif method == RunMethod.dask:
474 import dask
475 def dask_job(*other_jobs):
476 if any(a != 0 for a in other_jobs):
477 logger.debug(f"{self} skipped because dependencies failed")
478 return 1
479 try:
480 logger.debug(f"Running {self} using dask")
481 self(RunMethod.local)
482 except Exception as e:
483 logger.exception(f"{self} failed: {e}")
484 return 1
485 logger.debug(f"Successfully ran {self} using dask")
486 return 0
487 self.job = dask.delayed(dask_job, name=str(self))(*wait_for)
488 return self.job
490 def prepare_run(self):
491 """
492 Prepare to run this job.
494 Steps:
495 1. Create the output directories for all output targets
496 """
497 for target in self.output_targets:
498 target.filename.parent.mkdir(parents=True, exist_ok=True)
500 def expected(self, ):
501 """
502 Return true if this job is expected to be able to run.
503 """
504 for target in self.input_targets:
505 if not target.expected():
506 return False
507 return True
510class SingleJob(JobParent):
511 """A single job within a larger pipeline."""
513 def __init__(self, function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None):
514 """
515 Create a single job that can be run locally or submitted.
517 :param function: python function
518 :param kwargs: keyword arguments
519 :param submit_params: instructions to submit job to cluster using `fsl_sub`
520 :param input_targets: set of all filenames expected to exist before this job runs
521 :param output_targets: set of all filenames expected to exist after this job runs
522 :param optionals: set of filenames that are used to generate the dependency graph and yet might not exist
523 :param set_parameters: dictionary with placeholder values used to distinguish this `SingleJob` from all others produced from the same function
524 :param batch: label used to batch multiple jobs into one when submitting to the cluster
525 """
526 self.function = function
527 if set_parameters is None:
528 set_parameters = {}
529 self.set_parameters = set_parameters
530 self.kwargs = kwargs
531 self.input_targets = input_targets
532 self.output_targets = output_targets
533 self.optional_targets = set(optionals)
534 self.submit_params = dict(submit_params)
535 self.batch = batch
536 for target in self.input_targets:
537 target.required_by.add(self)
538 for target in self.output_targets:
539 target.producer = self
541 def copy(self, targets: Dict[str, "FileTarget"]):
542 """
543 Create a copy of the `SingleJob` to be included in a new :class:`JobList`.
545 `targets` contains the set of FileTargets recognised by this new `JobList`.
546 This will be updated based on the input/output targets of this job.
547 """
548 def copy_targets(job_targets):
549 new_targets = set()
550 for target in job_targets:
551 new_target = get_target(target.filename, targets)
552 new_target.from_templates = target.from_templates
553 new_targets.add(new_target)
554 return new_targets
556 return SingleJob(
557 self.function,
558 self.kwargs,
559 self.submit_params,
560 copy_targets(self.input_targets),
561 copy_targets(self.output_targets),
562 copy_targets(self.optional_targets),
563 self.set_parameters,
564 self.batch,
565 )
567 def job_name(self, ):
568 """Return a string representation of this job."""
569 if len(self.set_parameters) > 0:
570 parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()])
571 name = f"{self.function.__name__}_{parameter_string}"
572 else:
573 name = self.function.__name__
574 value = re.sub(r'[^\w\s-]', '', name).strip().lower()
575 return re.sub(r'[-\s]+', '-', value)
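# Illustrative sketch (not part of this module): the slugification that
# `job_name` applies, shown on a made-up name.
def _slugify_example(name: str) -> str:
    value = re.sub(r'[^\w\s-]', '', name).strip().lower()
    return re.sub(r'[-\s]+', '-', value)

# _slugify_example("brain_extract_subject-S01 (rerun)") == "brain_extract_subject-s01-rerun"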
577 def __repr__(self, ):
578 """Print job as a function call."""
579 parameter_string = ', '.join([f"{key}={value}" for key, value in self.set_parameters.items()])
580 return f"{self.function.__name__}({parameter_string})"
582 def to_job_list(self, tree:FileTree):
583 """Convert single job into its own :class:`JobList`."""
584 result = JobList(tree, [self], {}).copy()
585 return self.batch, self.submit_params, result
587 def scale_jobtime(self, scaling):
588 """
589 Scale the submit job time in place by `scaling`.
591 This will only affect jobs submitted to the cluster.
592 """
593 if "jobtime" in self.submit_params.keys():
594 self.submit_params["jobtime"] = int(np.ceil(scaling * self.submit_params["jobtime"]))
597def get_target(filename: Path, all_targets, from_template=None) -> "FileTarget":
598 """
599 Return a :class:`FileTarget` matching the input `filename`.
601 If the `FileTarget` for `filename` is already in `all_targets`, it will be returned.
602 Otherwise a new `FileTarget` will be added to `all_targets` and returned.
604 :param filename: path to the input/intermediate/output filename
605 :param all_targets: dictionary of all FileTargets
606 :param from_template: template key used to obtain the filename
607 """
608 abs_path = Path(filename).absolute()
609 if abs_path not in all_targets:
610 all_targets[abs_path] = FileTarget(filename)
611 if from_template is not None:
612 all_targets[abs_path].from_templates.add(from_template)
613 return all_targets[abs_path]
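# Illustrative sketch (not part of this module): `get_target` caches FileTarget
# instances, so every job referring to the same path shares a single target.
# The path and template key are made up.
def shared_target_demo():
    all_targets = {}
    first = get_target("sub-01/brain.nii.gz", all_targets, from_template="brain")
    second = get_target("sub-01/brain.nii.gz", all_targets)
    assert first is second                # same FileTarget object
    assert "brain" in first.from_templates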
616def get_matching_targets(pattern: str, all_targets) -> List["FileTarget"]:
617 """
618 Return all :class:`FileTarget` that match the given input pattern.
620 :param pattern: filename definition supporting Unix shell-style wildcards
621 :param all_targets: dictionary of all FileTargets
622 """
623 abs_pattern = str(Path(pattern).absolute())
624 matches = []
625 for path, target in all_targets.items():
626 if globmatch(str(path), abs_pattern, flags=GLOBSTAR):
627 matches.append(target)
628 return matches
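# Illustrative sketch (not part of this module): matching registered targets
# with a wildcard pattern instead of a template key. The pattern is made up;
# "**" crosses directory boundaries because of the GLOBSTAR flag.
def masks_for_all_subjects(all_targets):
    return get_matching_targets("sub-*/**/brain_mask.nii.gz", all_targets)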
631class FileTarget:
632 """Input, intermediate, or output file.
634 See :func:`get_target` for instructions on creating a new `FileTarget`.
636 If a specific :class:`SingleJob` produces a filename, this can be indicated by setting :attr:`producer`:
638 .. code-block:: python
640 get_target(filename, all_targets).producer = job
642 This will raise a `ValueError` if the filename is already produced by another job.
644 If a specific :class:`SingleJob` requires a filename, this can be indicated by adding it to :attr:`required_by`:
646 .. code-block:: python
648 get_target(filename, all_targets).required_by.add(job)
650 Filename existence can be checked using :meth:`exists`.
651 This method uses caching. To reset the cache run :meth:`reset_existence`.
653 To check if the filename can be created by this pipeline (or already exists) run :meth:`expected`.
654 """
656 def __init__(self, filename: Path):
657 """
658 Create a new target based on the provided filename.
660 Do not call this method directly.
661 Instead use :func:`get_target`.
663 :param filename: filename
664 """
665 self.filename = Path(filename)
666 self._producer = None
667 self.required_by: Set[SingleJob] = set()
668 self.from_templates: Set[str] = set()
670 def exists(self) -> bool:
671 """
672 Test whether file exists on disk.
674 This check is cached; once the existence has been checked, the same result keeps being returned.
676 To reset use :meth:`reset_existence`.
677 """
678 if not hasattr(self, "_exists"):
679 self._exists = self.filename.is_symlink() or self.filename.exists()
680 return self._exists
682 def reset_existence(self, ):
683 """Ensure existence is checked again when running :meth:`exists`."""
684 if hasattr(self, "_exists"):
685 del self._exists
687 def expected(self, ):
688 """
689 Return whether the file can be produced by the pipeline (or already exists).
691 Returns False if the file does not exist and there is no way to produce it; otherwise returns True.
692 """
693 if self.exists():
694 return True
695 if self.producer is None:
696 return False
697 return self.producer.expected()
699 @property
700 def producer(self, ) -> SingleJob:
701 """Job that can produce this file."""
702 return self._producer
704 @producer.setter
705 def producer(self, new_value):
706 if self._producer is not None:
707 if self._producer is new_value:
708 return
709 raise ValueError(f"{self} can be produced by both {self.producer} and {new_value}")
710 self._producer = new_value
712 def __repr__(self, ):
713 """Print filename of target."""
714 return f"FileTarget({str(self.filename)})"
717@contextmanager
718def update_closure(*dictionaries, **kwargs):
719 """Add the provided dictionaries to the globals dictionary.
721 Inside the `with` block all the dictionary keys will be available in the calling function's global namespace.
722 After the `with` block ends that namespace is cleaned up again.
724 Use like this:
726 .. code-block:: python
728 def my_func():
729 with update_closure({'a': 3}):
730 print(a) # prints 3
731 print(a) # raises a NameError (or prints whatever `a` is set to in the global environment)
732 """
733 func_globals = inspect.stack()[2].frame.f_globals
735 # merge input dictionaries
736 new_kwargs = {}
737 for d in dictionaries:
738 new_kwargs.update(d)
739 new_kwargs.update(kwargs)
741 to_restore = {}
742 to_delete = set()
743 for key in new_kwargs:
744 if key in func_globals:
745 to_restore[key] = func_globals[key]
746 else:
747 to_delete.add(key)
749 func_globals.update(new_kwargs)
750 yield # return to run the code within the with-block
752 # clean up the environment
753 func_globals.update(to_restore)
754 for key in to_delete:
755 del func_globals[key]
756 del func_globals
759def call_batched_jobs(funcs, kwargs):
760 for f, k in zip(funcs, kwargs):
761 f(**k)
763class BatchJob(JobParent):
764 """Batched combination of multiple `SingleJob` instances that will be submitted together.
765 """
766 function = staticmethod(call_batched_jobs)
768 def __init__(self, *sub_jobs):
769 """Creates a new `BatchJob` from sub_jobs.
771 `sub_jobs` can be either `SingleJob` or `BatchJob`. In either case they will be run in the order in which they are supplied.
772 """
773 self.torun: Sequence[SingleJob] = []
774 for job in sub_jobs:
775 if isinstance(job, BatchJob):
776 self.torun.extend(job.torun)
777 else:
778 assert isinstance(job, SingleJob)
779 self.torun.append(job)
780 self.torun_set = set(self.torun)
782 self.output_targets = {t for job in self.torun for t in job.output_targets}
783 for t in self.output_targets:
784 t._producer = self
785 self.input_targets = {t for job in self.torun for t in job.input_targets if t not in self.output_targets}
786 for t in self.input_targets:
787 t.required_by.add(self)
789 self.optional_targets = set.union(
790 {
791 t for t in self.output_targets
792 if not any(t in job.output_targets and t not in job.optional_targets for job in self.torun)
793 },
794 {
795 t for t in self.input_targets
796 if not any(t in job.input_targets and t not in job.optional_targets for job in self.torun)
797 },
798 )
800 as_tuples = set.intersection(*[{(key, value) for key, value in job.set_parameters.items()} for job in self.torun])
801 self.set_parameters = {key: value for key, value in as_tuples}
803 def copy(self, targets: Dict[str, "FileTarget"]):
804 """
805 Create a copy of the `BatchJob` to be included in a new :class:`JobList`.
807 `targets` contains the set of FileTargets recognised by this new `JobList`.
808 This will be updated based on the input/output targets of each job within this batch.
809 """
810 return BatchJob(*[job.copy(targets) for job in self.torun])
812 @property
813 def batch(self, ):
814 b = self.torun[0].batch
815 if all(job.batch is not None and job.batch == b for job in self.torun):
816 return b
817 return None
819 @property
820 def kwargs(self, ):
821 return {
822 'funcs': [job.function for job in self.torun],
823 'kwargs': [job.kwargs for job in self.torun],
824 }
826 @property
827 def submit_params(self, ):
828 def sum_value(name, *times):
829 total = sum(t for t in times if t is not None)
830 return None if total == 0 else total
832 def max_value(name, *rams):
833 if any(r is not None for r in rams):
834 return max(int(r) for r in rams if r is not None)
835 return None
837 def extend(name, *vars):
838 all_vars = set()
839 for v in vars:
840 if v is not None:
841 all_vars.update(v)
842 return list(all_vars)
844 def merge_name(_, *names):
845 if len(names) <= 3:
846 return "-".join(names)
847 return "batch_job"
849 def unique_param(name, *flags):
850 not_none = {f for f in flags if f is not None}
851 if len(not_none) == 0:
852 return None
853 elif len(not_none) == 1:
854 return not_none.pop()
855 else:
856 raise ValueError(f"Multiple different values for submit parameter flag '{name}' found. fsl-pipe does not know which one to choose out of: {not_none}")
858 to_combine = {
859 'jobtime': sum_value,
860 'jobram': max_value,
861 'threads': max_value,
862 'export_vars': extend,
863 'name': merge_name,
864 }
866 all_flags = {flag for job in self.torun for flag in job.submit_params.keys()}
867 return {
868 flag: to_combine.get(flag, unique_param)(flag, *[job.submit_params.get(flag, None) for job in self.torun])
869 for flag in all_flags
870 }
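# Illustrative sketch (not part of this module): how submit parameters are
# merged for a batch of two hypothetical jobs.
#   job A: {"jobtime": 30, "jobram": 4, "coprocessor": "cuda"}
#   job B: {"jobtime": 60, "jobram": 8, "coprocessor": "cuda"}
# The combined batch would be submitted with roughly:
#   {"jobtime": 90,           # summed
#    "jobram": 8,             # maximum
#    "coprocessor": "cuda"}   # must be identical across jobs, otherwise a ValueError is raised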
872 def job_name(self, ):
873 """Return a string representation of this job."""
874 base = "" if self.batch is None else self.batch + '_'
875 func_name = "-".join(sorted({job.function.__name__ for job in self.torun}))
876 if len(self.set_parameters) > 0:
877 parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()])
878 name = f"{func_name}_{parameter_string}"
879 else:
880 name = func_name
881 value = re.sub(r'[^\w\s-]', '', name).strip().lower()
882 return base + re.sub(r'[-\s]+', '-', value)
884 def scale_jobtime(self, scaling):
885 """
886 Scale the submit job time in place by `scaling`.
888 This will only affect jobs submitted to the cluster.
889 """
890 for job in self.torun:
891 job.scale_jobtime(scaling)
893 def __repr__(self, ):
894 """Print job as a function call."""
895 return f"Batch({self.torun})"
897 def to_job_list(self, tree:FileTree):
898 """Convert batched jobs back into a :class:`JobList`."""
899 result = JobList(tree, self.torun, {}).copy()
900 return self.batch, self.submit_params, result
903def has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs):
904 if possible_parent in possible_children:
905 return True
906 for _, child in possible_parent.dependencies(only_missing=False):
907 if child in all_jobs and has_dependencies(child, possible_children, all_jobs):
908 return True
909 return False
912def batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) -> List[JobParent]:
913 """Iteratively combines two jobs
915 Two jobs will be batched if:
916 1. They are both in `to_batch`
917 2. One of the jobs depends on the other
918 3. There is no other job that needs to run between these two jobs.
919 Iteration through the jobs will continue until no further batching is possible.
920 """
921 other_jobs = set(other_jobs)
922 batches = set(to_batch)
923 nbatches_prev = len(batches) * 2
924 while len(batches) != nbatches_prev:
925 nbatches_prev = len(batches)
926 for job in set(batches):
927 if job not in batches:
928 continue
929 for _, dependency in job.dependencies(only_missing=False):
930 if job in batches and dependency in batches:
931 # properties 1 and 2 are met; now checking property 3
932 all_jobs = set.union(batches, other_jobs)
933 indirect_dependency = any(has_dependencies(d, {dependency}, all_jobs) for _, d in job.dependencies(only_missing=False) if d in all_jobs and d != dependency)
934 if not indirect_dependency:
935 batches.remove(job)
936 batches.remove(dependency)
937 batches.add(BatchJob(dependency, job))
938 break
939 return list(batches)
942def batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) -> List[BatchJob]:
943 """Batch jobs into generational sets
945 All jobs that do not depend on any other job in `to_batch` will be added to the first generational batch.
946 All jobs that only depend on the first generational batch will be added to the second.
947 And so forth, until all jobs have been added to a generation.
949 Only dependencies through `other_jobs` will be considered.
950 """
951 to_batch = set(to_batch)
952 all_jobs = set.union(to_batch, other_jobs)
954 generations = []
956 while len(to_batch) > 0:
957 generations.append(set())
958 for job in to_batch:
959 if not any(has_dependencies(dependency, to_batch, all_jobs) for _, dependency in job.dependencies(only_missing=False) if dependency in all_jobs):
960 generations[-1].add(job)
961 to_batch.difference_update(generations[-1])
962 return [jobs.pop() if len(jobs) == 1 else BatchJob(*jobs) for jobs in generations]
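# Illustrative sketch (not part of this module): the generational grouping that
# `batch_unconnected_jobs` produces for four hypothetical jobs, where C depends
# on A and B, and D depends on C:
#   generation 1: {A, B}  -> combined into one BatchJob
#   generation 2: {C}     -> kept as a single job
#   generation 3: {D}     -> kept as a single job
# Each generation only depends on earlier generations, so the resulting jobs can
# be submitted with simple job holds between them.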