Coverage for src/fsl_pipe/pipeline.py: 70%
431 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-14 13:47 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-14 13:47 +0100
1"""
2Defines the pipeline before a FileTree is provided.
4A pipeline is a collection of functions with mapping from the function parameters to input/output/reference/placeholder basenames.
5"""
6from typing import List, Optional, Set, Tuple, Dict, Collection, Callable, Union, Any
7import argparse
8from dataclasses import dataclass
9from file_tree import FileTree
10from copy import copy
11import inspect
12import xarray
13from collections import namedtuple
14from fsl_sub import submit
15from file_tree.template import is_singular
16from pathlib import Path
17from numpy import unique
18import graphviz
19import fnmatch
20try:
21 import datalad
22except ImportError:
23 datalad = None
class Pipeline:
    """Collection of python functions forming a pipeline.

    You can either create a new pipeline (`from fsl_pipe import Pipeline; pipe = Pipeline()`) or use a pre-existing one (`from fsl_pipe import pipe`)

    Scripts are added to a pipeline by using the pipeline as a decorator (see :meth:`__call__`).

    To run the pipeline based on instructions from the command line run :meth:`pipe.cli(tree) <cli>`,
    where tree is a FileTree defining the directory structure of the pipeline input & output files.

    :ivar scripts: list of :class:`PipedFunction`, which define the python functions forming the pipeline and their input/output templates
    """

    def __init__(self, scripts=None, default_output=None, default_submit=None):
        """Create a new empty pipeline.

        :param scripts: optional sequence of :class:`PipedFunction` to pre-populate the pipeline.
        :param default_output: optional sequence of template keys produced by default when the user requests none.
        :param default_submit: optional dict of `fsl_sub.submit` flags applied to every job (validated here).
        """
        if scripts is None:
            scripts = []
        if default_output is None:
            default_output = []
        if default_submit is None:
            default_submit = {}
        self.scripts: List[PipedFunction] = list(scripts)
        self.default_output: List[str] = list(default_output)
        # fail early if the default submit flags are not understood by fsl_sub
        check_submit_parameters(default_submit)
        self.default_submit = default_submit

    def __call__(self, function=None, *, kwargs=None, no_iter=None, placeholders=None, as_path=False, submit=None, batch=None):
        """Add a python function as a :class:`PipedFunction` to the pipeline.

        This is the main route through which jobs are added to the pipeline.

        .. code-block:: python

            from fsl_pipe import pipe, In, Out, Ref, Var

            @pipe(submit=dict(jobtime=40))
            def func(in_path: In, out_path: Out, ref_path: Ref, placeholder_key: Var):
                pass

        :param function: Optionally provide the function directly. This can be useful when not using `pipe` as a decorator, e.g.:

            .. code-block:: python

                from fsl_pipe import pipe, In, Out, Ref, Var
                from shutil import copyfile

                pipe(copyfile, kwargs={'src': In('template_in'), 'dst': Out('template_out')})

        :param kwargs: maps function keyword arguments to template names (In/Out/Ref), placeholder names (Var), or anything else. Templates or placeholders are replaced by their values based on the file-tree. Anything else is passed on unaltered.
        :param no_iter: optional set of parameters not to iterate over.
        :param as_path: if set to true, provide template filenames to the function as `pathlib.Path` objects instead of strings.
        :param placeholders: dictionary overriding the placeholders set in the filetree for this specific function. This can, for example, be used to run this function on a sub-set of subjects.
        :param submit: dictionary with the flags to pass on to `fsl_sub.submit`, when submitting jobs to a cluster queue.
        :param batch: label used to batch multiple jobs into one when submitting to the cluster
        """
        if submit is None:
            submit = {}
        # per-function submit flags override the pipeline-wide defaults
        submit_params = dict(**self.default_submit, **submit)
        check_submit_parameters(submit_params)

        def wrapper(func):
            self.scripts.append(PipedFunction(func, submit_params=submit_params, kwargs=kwargs, no_iter=no_iter, override=placeholders, as_path=as_path, batch=batch))
            return func

        # support both `@pipe(...)` (returns decorator) and `pipe(func, ...)` (registers directly)
        if function is None:
            return wrapper
        wrapper(function)
        return function

    def generate_jobs(self, tree: FileTree):
        """
        Split the pipeline into individual jobs.

        Produces an unsorted `JobList`, which can be sorted by running `filter` on it.

        :param tree: set of templates for the input/output/reference files with all possible placeholder values
        """
        from .job import JobList, FileTarget
        # shared registry so the same filename maps to one FileTarget across all jobs
        all_targets: Dict[str, FileTarget] = {}
        jobs = []
        for script in self.scripts:
            jobs.extend(script.get_jobs(tree, all_targets))
        return JobList(tree, jobs, all_targets)

    def default_parser(self, tree: FileTree, parser: Optional[argparse.ArgumentParser]=None, include_vars=None, exclude_vars=()) -> argparse.ArgumentParser:
        """
        Add default `fsl-pipe` arguments to an argument parser (will create a new argument parser if needed).

        :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files (defaults to datalad tree).
        :param parser: Optional argument parser that will be updated with the default `fsl-pipe` arguments (by default a new one is created).
        :param include_vars: if provided, only expose placeholders in this list to the command line
        :param exclude_vars: exclude placeholders in this list from the command line
        :return: Argument parser with the default `fsl-pipe` arguments. If one was provided as input, that one will be returned.
        """
        from .job import RunMethod
        if parser is None:
            parser = argparse.ArgumentParser(description="Runs the pipeline")
        if len(self.scripts) == 0:
            raise ValueError("The pipeline does not contain any scripts...")
        # all output templates across all scripts (used as fallback default output)
        templates = set.union(*[script.filter_templates(True, tree.template_keys()) for script in self.scripts])

        if len(self.default_output) == 0:
            default_output = list(sorted(templates))
        else:
            default_output = self.default_output

        parser.add_argument("templates", nargs="*", default=default_output,
                            help=f"Set to one or more template keys or file patterns (e.g., \"*.txt\"). Only these templates/files will be produced. If not provided all templates will be produced ({', '.join(default_output)}).")
        parser.add_argument("-m", '--pipeline-method', default=RunMethod.default().name,
                            choices=[m.name for m in RunMethod],
                            help=f"method used to run the jobs (default: {RunMethod.default().name})")
        parser.add_argument("-o", '--overwrite', action='store_true', help="If set overwrite any requested files.")
        parser.add_argument("-d", '--overwrite_dependencies', action='store_true',
                            help="If set also overwrites dependencies of requested files.")
        parser.add_argument("-j", '--job-hold', default='',
                            help='Place a hold on the whole pipeline until job has completed.')
        parser.add_argument('--skip-missing', action='store_true',
                            help='If set skip running any jobs depending on missing data. This replaces the default behaviour of raising an error if any required input data is missing.')
        parser.add_argument("-q", '--quiet', action='store_true', help="Suppresses the report on what will be run (might speed up starting up the pipeline substantially).")
        parser.add_argument("-b", "--batch", action="store_true", help="Batch jobs based on submit parameters and pipeline labels before running them.")
        parser.add_argument("--scale-jobtime", type=float, default=1., help="Scale the developer-set job times by the given value. Set to a number above 1 if you are analysing a particularly large datasets and the individual jobs might be slower than expected. Only affects jobs submitted to a cluster (i.e., `--pipeline_method=submit`).")
        # the --gui flag is always available, but the help text warns if the GUI package is missing
        gui_help = "Start a GUI to select which output files should be produced by the pipeline."
        try:
            import fsl_pipe_gui
        except ModuleNotFoundError:
            gui_help = gui_help + " Missing requirement 'fsl-pipe-gui' to run the GUI. Please install this with `conda/pip install fsl-pipe-gui`."
        parser.add_argument("-g", "--gui", action="store_true", help=gui_help)
        if datalad is not None:
            parser.add_argument("--datalad", action='store_true',
                                help="run datalad get on all input data before running/submitting the jobs")

        def add_placeholder_flag(variable):
            # adds a `--<placeholder>` flag; recurses when given a collection of names
            if not isinstance(variable, str):
                for v in variable:
                    add_placeholder_flag(v)
                return
            if '/' in variable:
                # sub-tree placeholders are not exposed on the command line
                return
            if variable in exclude_vars:
                return
            if include_vars is not None and variable not in include_vars:
                return
            value = tree.placeholders[variable]
            default = str(value) if is_singular(value) else ','.join([str(v) for v in unique(value)])
            parser.add_argument(f"--{variable}", nargs='+', help=f"Use to set the possible values of {variable} to the selected values (default: {default})")

        add_placeholder_flag(tree.placeholders.keys())

        return parser

    def run_cli(self, args: argparse.Namespace, tree: FileTree):
        """
        Run the pipeline based on arguments extracted from the default argument parser.

        :param args: Namespace consisting of arguments extracted from the command line (produced by `argparse.ArgumentParser.parse_args`). This is expected to contain:

            - templates (defaults to `self.default_output`): list of which templates the pipeline should produce
            - pipeline_method: string with method used to run the jobs
            - overwrite: boolean, which if true overwrite any requested files
            - overwrite_dependencies: boolean, which if true also overwrites dependencies of requested files
            - job-hold: string with comma-separated list, which contains job(s) that will be waited for
            - skip-missing: whether to skip jobs depending on missing data instead of raising an error
            - datalad (defaults to False): if true, run datalad get on all input data before running/submitting the jobs
            - {placeholder} (defaults to values in FileTree): sequence of strings overwriting the possible values for a particular placeholder

        :param tree: Definition of pipeline input/output files
        """
        from .job import RunMethod
        # work on a copy so the caller's tree is not mutated by placeholder overrides
        tree = tree.copy()

        def set_placeholder(key):
            if not isinstance(key, str):
                for k in key:
                    set_placeholder(k)
                return
            if getattr(args, key, None) is not None:
                tree.placeholders[key] = args.__dict__[key]
        for var in tree.placeholders.keys():
            set_placeholder(var)

        if args.gui:
            self.gui(tree, overwrite_dependencies=args.overwrite_dependencies, run_method=RunMethod[args.pipeline_method])
            return
        requested_templates = getattr(
            args, 'templates',
            None if len(self.default_output) == 0 else self.default_output
        )
        concrete = self.generate_jobs(tree)
        torun = concrete.filter(requested_templates, overwrite=args.overwrite, overwrite_dependencies=args.overwrite_dependencies, skip_missing=args.skip_missing)
        if not args.quiet:
            torun.report()
        if getattr(args, "batch", False):
            # only use batch labels if at least one job actually defines one
            use_label = any(job.batch is not None for job in concrete.jobs)
            torun = torun.batch(use_label=use_label)
        torun.scale_jobtime(args.scale_jobtime)
        if datalad is not None and args.datalad:
            torun.run_datalad()
        torun.run(RunMethod[args.pipeline_method], wait_for=() if args.job_hold == '' else args.job_hold.split(','))

    def cli(self, tree: Optional[FileTree]=None, include_vars=None, exclude_vars=(), cli_arguments=None):
        """
        Run the pipeline from the command line.

        :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files (defaults to datalad tree).
        :param include_vars: if provided, only expose variables in this list to the command line
        :param exclude_vars: exclude variables in this list from the command line
        :param cli_arguments: list of command line arguments. If not set the arguments used in the python call (`sys.argv`) are used.
        """
        if tree is None:
            # fall back to discovering the tree from the surrounding datalad dataset
            if datalad is None:
                raise ValueError("Argument 'tree' missing: please provide a FileTree describing the directory structure for the pipeline")
            from .datalad import get_tree
            try:
                tree = get_tree()
            except IOError:
                raise ValueError("Pipeline run outside of a datalad dataset, so a FileTree needs to be explicitly provided using the 'tree' argument")
            if tree is None:
                raise ValueError("No reference FileTree for pipeline found")

        parser = self.default_parser(tree=tree, include_vars=include_vars, exclude_vars=exclude_vars)
        args = parser.parse_args(cli_arguments)
        self.run_cli(args, tree)

    def gui(self, tree: FileTree, **kwargs):
        """
        Run the fsl-pipe-gui interface to select pipeline output.

        :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files.
        :param overwrite_dependencies: set to True to overwrite dependencies
        :param run_method: overrides the default method to run the jobs
        """
        try:
            from fsl_pipe_gui import run_gui
        except ModuleNotFoundError:
            raise ModuleNotFoundError("'fsl-pipe-gui' needs to be installed to run the graphical user interface. Please run `pip/conda install fsl-pipe-gui` and try again.")
        run_gui(self, tree, **kwargs)

    def move_to_subtree(self, sub_tree=None, other_mappings=None):
        """
        Create a new pipeline that runs in a sub-tree of a larger tree rather than at the top level.

        :param sub_tree: name of the sub-tree in the FileTree
        :param other_mappings: other mappings between templates or placeholder names and their new values
        """
        if other_mappings is None:
            other_mappings = {}
        all_scripts = [script.move_to_subtree(sub_tree, other_mappings) for script in self.scripts]
        new_default_submit = _update_kwargs(self.default_submit, sub_tree, other_mappings)
        return Pipeline(all_scripts, [_update_key(key, sub_tree, other_mappings) for key in self.default_output], new_default_submit)

    @classmethod
    def merge(cls, pipelines: Collection["Pipeline"]):
        """
        Combine multiple pipelines into a single one.

        :param pipelines: pipelines containing part of the jobs
        :return: parent pipeline containing all of the jobs in pipelines
        """
        new_pipeline = Pipeline()
        for pipeline in pipelines:
            # copy the scripts so configuring the merged pipeline does not alter the originals
            new_pipeline.scripts.extend([s.copy() for s in pipeline.scripts])
            new_pipeline.default_output.extend(pipeline.default_output)
        return new_pipeline

    def find(self, function: Union[str, Callable]):
        """
        Iterate over any pipeline scripts that run the provided function.

        Either the function itself or the name of the function can be given.
        """
        # iterate over a copy so callers (e.g., `remove`) can mutate self.scripts safely
        for script in list(self.scripts):
            if script.function == function or getattr(script.function, '__name__', None) == function:
                yield script

    def remove(self, function: Union[str, Callable]):
        """
        Remove any pipeline scripts that run the provided function from the pipeline.

        Either the function itself or the name of the function can be given.
        """
        for script in self.find(function):
            self.scripts.remove(script)

    def configure(self, kwargs):
        """
        Override the values passed on to the keyword arguments of all the scripts.

        Any keywords not expected by a script will be silently skipped for that script.
        """
        for script in self.scripts:
            try:
                script.configure(kwargs, allow_new_keywords=False, check=False)
            except KeyError:
                pass

    def add_to_graph(self, graph: graphviz.Digraph=None, tree: FileTree=None):
        """
        Add all the pipeline functions to the provided graph.

        :param graph: GraphViz graph object (will be altered)
        :param tree: concrete FileTree
        """
        if graph is None:
            graph = graphviz.Digraph("pipeline", format="svg")
        # shared mapping so the same placeholder gets the same colour across nodes
        placeholder_color = {}
        for idx, script in enumerate(self.scripts):
            script.add_node(graph, idx, tree, placeholder_color)
        return graph
class PipedFunction:
    """Represents a function stored in a pipeline."""

    def __init__(self, function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None):
        """
        Wrap a function with additional information to run it in a pipeline.

        :param function: python function that will be run in pipeline.
        :param submit_params: parameters to submit job running python function to cluster using `fsl_sub`.
        :param kwargs: maps function keyword arguments to templates, variables, or actual values.
        :param no_iter: which parameters to not iterate over (i.e., they are passed to the function in an array).
        :param override: dictionary overriding the placeholders set in the filetree.
        :param as_path: whether to pass on pathlib.Path objects instead of strings to the functions (default: True).
        :param batch: label used to batch multiple jobs into one when submitting to the cluster.
        """
        self.function = function
        self.submit_params = submit_params
        if override is None:
            override = {}
        self.override = override
        self.as_path = as_path
        self.batch = batch

        self.all_kwargs: Dict[str, Any] = {}
        # the function's annotations (In/Out/Ref/Var markers) seed the keyword mapping;
        # explicit `kwargs` can then extend or override them
        self.configure(function.__annotations__)

        if kwargs is not None:
            self.configure(kwargs)

        if no_iter is None:
            no_iter = set()
        elif isinstance(no_iter, str):
            no_iter = [no_iter]
        self.explicit_no_iter = set(no_iter)

    def copy(self, ):
        """Create a copy of this PipedFunction for pipeline merging."""
        new_script = copy(self)
        new_script.all_kwargs = dict(self.all_kwargs)
        # bugfix: assign the fresh set to the new script (previously rebound on `self`,
        # so original and copy shared one mutable set)
        new_script.explicit_no_iter = set(self.explicit_no_iter)
        return new_script

    @property
    def no_iter(self, ) -> Set[str]:
        """Sequence of placeholder names that should not be iterated over."""
        res = {key if value.key is None else value.key for key, value in self.placeholders.items() if value.no_iter}
        res.update(self.explicit_no_iter)
        return res

    def configure(self, kwargs, allow_new_keywords=True, check=True):
        """Override the values passed on to the keyword arguments of the script.

        :param kwargs: new placeholders/templates/values for keyword arguments
        :param allow_new_keywords: if set to False, don't allow new keywords
        :param check: if set to False, silently skip unexpected keywords instead of raising a KeyError
        :raises KeyError: if `check` is True and `allow_new_keywords` is False and `kwargs` contains keys the function does not accept
        """
        bad_keys = []
        signature = inspect.signature(self.function)
        has_kws = any(param.kind == param.VAR_KEYWORD for param in signature.parameters.values())

        if not allow_new_keywords:
            existing_kws = set(signature.parameters)
            if has_kws:
                # a **kwargs catch-all means previously-configured keys are also acceptable
                existing_kws.update(self.all_kwargs.keys())
            if check:
                # bugfix: compare against `existing_kws` (consistent with the skip logic
                # below) and only raise when there actually are unexpected keys
                bad_keys = {key for key in kwargs.keys() if key not in existing_kws}
                if len(bad_keys) > 0:
                    raise KeyError(f"Tried to configure {self} with keys that are not expected by the function: {bad_keys}")
        for key, value in kwargs.items():
            if not allow_new_keywords and key not in existing_kws:
                continue
            self.all_kwargs[key] = value

    @property
    def placeholders(self, ):
        """Return dictionary with placeholder values overriden for this function."""
        return {key: value for key, value in self.all_kwargs.items() if isinstance(value, PlaceHolder)}

    @property
    def templates(self, ):
        """Return dictionary with templates used as input, output, or reference for this function."""
        return {key: value for key, value in self.all_kwargs.items() if isinstance(value, Template)}

    @property
    def kwargs(self, ):
        """Return dictionary with keyword arguments that will be passed on to the function."""
        return {key: value for key, value in self.all_kwargs.items() if not any(
            isinstance(value, cls) for cls in (Template, PlaceHolder)
        )}

    def filter_templates(self, output=False, all_templates=None) -> Set[str]:
        """
        Find all input or output template keys.

        :param output: if set to True select the output rather than input templates
        :param all_templates: sequence of all possible templates (required if any Template keys use globbing)
        :return: set of input or output templates
        """
        res = set()
        for kwarg_key, template in self.templates.items():
            if ((template.input and not output) or
                    (template.output and output)):
                template_key = kwarg_key if template.key is None else template.key

                # apply globbing if template key contains * or ?
                if '*' in template_key or '?' in template_key:
                    if all_templates is None:
                        raise ValueError(f"Template {template_key} uses globbing, but the template keys are not provided")
                    res.update(fnmatch.filter(all_templates, template_key))
                else:
                    res.add(template_key)
        return res

    def iter_over(self, tree: "FileTree") -> Tuple[str, ...]:
        """
        Find all the placeholders that should be iterated over before calling the function.

        These are all the placeholders that affect the input templates, but are not part of `self.no_iter`.

        :param tree: set of templates with placeholder values
        :return: placeholder names to be iterated over sorted by name
        """
        tree = tree.update(**self.override)
        in_vars = self.all_placeholders(tree, False)
        # resolve linked placeholders to a single canonical key before comparing
        in_vars_linked = {tree.placeholders.linkages.get(key, key) for key in in_vars}
        out_vars = {tree.placeholders.linkages.get(key, key) for key in self.all_placeholders(tree, True)}

        updated_no_iter = {
            tree.placeholders.linkages.get(
                tree.placeholders.find_key(key),
                tree.placeholders.find_key(key),
            ) for key in self.no_iter
        }
        all_in = in_vars_linked.union(updated_no_iter)
        if len(out_vars.difference(all_in)) > 0:
            raise ValueError(f"{self}: Output template depends on {out_vars.difference(all_in)}, which none of the input templates depend on")
        return tuple(sorted(
            {v for v in in_vars if tree.placeholders.linkages.get(v, v) not in updated_no_iter},
        ))

    def get_jobs(self, tree: "FileTree", all_targets: Dict):
        """
        Get a list of all individual jobs defined by this function.

        :param tree: set of templates with placeholder values
        :param all_targets: mapping from filenames to Target objects used to match input/output filenames between jobs
        :return: sequence of jobs
        """
        from .job import SingleJob
        tree = tree.update(**self.override)
        to_iter = self.iter_over(tree)
        jobs = []
        done_kwargs = set()

        def freeze_value(value):
            # convert unhashable values into hashable proxies for duplicate detection
            if isinstance(value, xarray.DataArray):
                return str(value)
            elif isinstance(value, dict):
                return frozenset(value.items())
            else:
                return value

        for sub_tree in tree.iter_vars(to_iter):
            kwargs, in_fns, out_fns, optionals = _single_job_kwargs(self.all_kwargs, sub_tree, tree, all_targets, as_path=self.as_path)
            job_submit_params, _, _, _ = _single_job_kwargs(self.submit_params, sub_tree, tree, all_targets, as_path=False, is_submit_param=True)

            # skip iterations that would run the function with identical arguments
            frozen_kwargs = frozenset([(key, freeze_value(value)) for (key, value) in kwargs.items()])
            if frozen_kwargs in done_kwargs:
                continue
            done_kwargs.add(frozen_kwargs)

            jobs.append(SingleJob(
                self.function,
                kwargs,
                job_submit_params,
                in_fns, out_fns, optionals,
                {name: sub_tree.placeholders.get(name, None) for name in to_iter},
                self.batch,
            ))
        return jobs

    def all_placeholders(self, tree: "FileTree", output=False) -> Set[str]:
        """
        Identify the multi-valued placeholders affecting the input/output templates of this function.

        :param tree: set of templates with placeholder values
        :param output: if set to True returns the placeholders for the output rather than input templates
        :return: set of all placeholders that affect the input/output templates
        """
        res = set()
        for t in self.filter_templates(output, tree.template_keys()):
            res.update(tree.get_template(t).placeholders())
        if not output:
            # Var keyword arguments count as input placeholders as well
            for key, variable in self.placeholders.items():
                res.add(key if variable.key is None else variable.key)

        bad_keys = {key for key in res if key not in tree.placeholders}
        if len(bad_keys) > 0:
            raise ValueError(f"No value set for placeholders: {bad_keys}")

        # only placeholders with multiple values require iteration
        _, only_multi = tree.placeholders.split()
        return {only_multi.find_key(key) for key in res if key in only_multi}

    def move_to_subtree(self, sub_tree=None, other_mappings=None):
        """
        Create a new wrapped function that runs in a sub-tree of a larger tree rather than at the top level.

        :param sub_tree: name of the sub-tree in the FileTree
        :param other_mappings: other mappings between templates or placeholder names and their new values
        """
        if other_mappings is None:
            other_mappings = {}
        new_script = copy(self)
        new_script.all_kwargs = _update_kwargs(self.all_kwargs, sub_tree, other_mappings)
        new_script.override = {_update_key(key, sub_tree, other_mappings): value for key, value in self.override.items()}
        new_script.submit_params = _update_kwargs(self.submit_params, sub_tree, other_mappings)
        return new_script

    def __repr__(self, ):
        """Print function name."""
        return f"PipedFunction({self.function.__name__})"

    def add_node(self, graph: "graphviz.Graph", index, tree: "FileTree", placeholder_color: Dict[str, str]):
        """
        Add a node representing this function for a pipeline diagram.

        :param graph: input pipeline diagram (will be altered)
        :param index: unique integer identifier to use within the graph
        :param tree: optional concrete FileTree used to colour-code the placeholders
        :param placeholder_color: shared mapping from placeholder name to colour (will be extended)
        """
        seaborn_colors = ['#0173b2', '#de8f05', '#029e73', '#d55e00', '#cc78bc', '#ca9161', '#fbafe4', '#949494', '#ece133', '#56b4e9']
        identifier = str(index) + self.function.__name__
        label = self.function.__name__
        if tree is not None:
            placs = self.iter_over(tree)
            if len(placs) > 0:
                for p in placs:
                    if p not in placeholder_color:
                        placeholder_color[p] = seaborn_colors[len(placeholder_color) % len(seaborn_colors)]
                plac_text = ';'.join(f'<font color="{placeholder_color[p]}">{p}</font>' for p in placs)
                label = fr"<{label}<br/>{plac_text}>"
        graph.node(identifier, label=label, color='red', shape='box')
        # bugfix: guard against tree being None (the guards below already expect it)
        template_keys = None if tree is None else tree.template_keys()
        for output in (False, True):
            for t in self.filter_templates(output, template_keys):
                label = t
                if tree is not None:
                    all_plac = tree.get_template(t).required_placeholders() | tree.get_template(t).optional_placeholders()
                    multi_plac = sorted([p for p in all_plac if p not in tree.placeholders or not is_singular(tree.placeholders[p])])
                    for p in multi_plac:
                        if p not in placeholder_color:
                            placeholder_color[p] = seaborn_colors[len(placeholder_color) % len(seaborn_colors)]
                    multi_plac = ';'.join(f'<font color="{placeholder_color[p]}">{p}</font>' for p in multi_plac)
                    if len(multi_plac) > 0:
                        label = fr"<{t}<br/>{multi_plac}>"
                graph.node(t, label, color='blue')
                if output:
                    graph.edge(identifier, t)
                else:
                    graph.edge(t, identifier)
@dataclass
class Template(object):
    """Represents a keyword argument that will be mapped to a template path in the FileTree.

    :param key: template key in FileTree.
    :param input: Set to true if file should be considered as input (i.e., it should exist before the job is run).
    :param output: Set to true if file should be considered as output (i.e., it is expected to exist after the job is run).
    :param optional: Set to true if input/output file should be considered for creating the dependency graph, but still might not exist.
    """

    key: Optional[str] = None
    input: bool = False
    output: bool = False
    optional: bool = False

    def __call__(self, key=None, optional=False):
        """Create a new Template with an overridden key and optional flag, preserving the input/output role."""
        return Template(
            key=key,
            input=self.input,
            output=self.output,
            optional=optional,
        )
@dataclass
class PlaceHolder(object):
    """Represents a keyword argument that will be mapped to a placeholder value.

    :param key: placeholder key in FileTree.
    :param no_iter: if True the pipeline will not iterate over individual placeholder values, but rather run a single job with all possible placeholder values at once.
    """

    key: Optional[str] = None
    no_iter: bool = False

    def __call__(self, key=None, no_iter=None):
        """Create a new PlaceHolder, optionally overriding the key and the `no_iter` flag."""
        # an unset `no_iter` inherits the current flag
        updated_no_iter = self.no_iter if no_iter is None else no_iter
        return PlaceHolder(key, updated_no_iter)
628"""Use to mark keyword arguments that represent input filenames of the wrapped function.
630These filenames are expected to exist before the function is run.
632The actual filename is extracted from the FileTree based on the template key.
633This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`).
634"""
635In = Template(input=True)
637"""Use to mark keyword arguments that represent output filenames of the wrapped function.
639These filenames are expected to exist after the function is run.
640An error is raised if they do not.
642The actual filename is extracted from the FileTree based on the template key.
643This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`).
644"""
645Out = Template(output=True)
647"""Use to mark keyword arguments that represent reference filenames of the wrapped function.
649These filenames might or might not exist before or after the job is run.
651The actual filename is extracted from the FileTree based on the template key.
652This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`).
653"""
654Ref = Template()
656"""Use to mark keyword arguments that represent placeholder values in the pipeline.
658Placeholder values are returned as :any:`PlaceholderValue` objects.
659"""
660Var = PlaceHolder()
def to_templates_dict(input_files=(), output_files=(), reference_files=()):
    """Convert a sequence of input/output/reference files into a template dictionary.

    Each entry can be either a template key (string; the keyword argument name is its
    last '/'-separated component) or a `(keyword_name, template_key)` pair.

    Args:
        input_files (sequence, optional): Template keys representing input files. Defaults to ().
        output_files (sequence, optional): Template keys representing output files. Defaults to ().
        reference_files (sequence, optional): Template keys representing reference paths. Defaults to ().

    Raises:
        KeyError: If the same template key is used as more than one of the input/output/reference options

    Returns:
        dict: mapping of the keyword argument names to the Template objects
    """
    res = {}
    # bugfix: track full template keys separately; `res` is keyed by the short
    # keyword name, so the old `name in res` check could never detect duplicates
    seen_templates = set()
    for files, cls in [
        (input_files, In),
        (output_files, Out),
        (reference_files, Ref),
    ]:
        for name in files:
            if isinstance(name, str):
                short_name = name.split('/')[-1]
            else:
                short_name, name = name
            if name in seen_templates:
                raise KeyError(f"Dual definition for template {name}")
            seen_templates.add(name)
            res[short_name] = cls(name)

    return res
695"""
696Named tuple representing a placeholder value with three fields.
698:param key: string with placeholder key in pipeline.
699:param index: integer with the index (or multiple integers with the indices if `no_iter` is set for this placeholder) of the placeholder values in list of all possible placeholder values.
700:param value: actual placeholder value (or sequence of values if `no_iter is set for this placeholder)
701"""
702PlaceholderValue = namedtuple("PlaceholderValue", ["key", "index", "value"])
def _single_job_kwargs(wrapper_kwargs, single_tree: FileTree, full_tree: FileTree, all_targets, as_path=False, is_submit_param=False):
    """Create the keywords, input, and output filenames for a single job.

    :param wrapper_kwargs: mapping from keyword argument names to Template/PlaceHolder markers or plain values.
    :param single_tree: FileTree restricted to this job's placeholder values.
    :param full_tree: FileTree with the full set of placeholder values (used to compute placeholder indices).
    :param all_targets: shared mapping from filenames to target objects, so jobs producing/consuming the same file are linked.
    :param as_path: if True, filenames are passed on as `pathlib.Path` instead of `str`.
    :param is_submit_param: if True the values are destined for `fsl_sub.submit` and must be single-valued.
    :return: tuple of (keyword dict, input filenames, output filenames, optional filenames).
    """
    from .job import get_target
    final_dict = {}
    input_filenames = []
    output_filenames = []
    optional_filenames = []
    for kwarg_key, value in wrapper_kwargs.items():
        if isinstance(value, Template):
            # template key defaults to the keyword argument name
            key = kwarg_key if value.key is None else value.key

            # a key containing * or ? is a glob matching multiple templates
            use_glob = '*' in key or '?' in key
            iter_keys = fnmatch.filter(single_tree.template_keys(), key) if use_glob else [key]
            res = {}
            for proc_key in iter_keys:
                try:
                    # template resolves to a single filename for this job
                    proc_res = single_tree.get(proc_key)
                    is_xarray = False
                except KeyError:
                    # template still depends on multiple placeholder values -> xarray of filenames
                    proc_res = single_tree.get_mult(proc_key)
                    is_xarray = True
                if is_xarray:
                    if is_submit_param:
                        raise ValueError(f"Submit parameter {kwarg_key} is set to the template {key}, which has more than one possible value")
                    filenames = [get_target(fn, all_targets, proc_key) for fn in proc_res.data.flatten()]
                    if value.input:
                        input_filenames.extend(filenames)
                    if value.output:
                        output_filenames.extend(filenames)
                    if value.optional:
                        optional_filenames.extend(filenames)
                    # convert every filename in the array to Path/str
                    proc_res = xarray.apply_ufunc(Path if as_path else str, proc_res, vectorize=True)
                else:
                    filename = get_target(proc_res, all_targets, proc_key)
                    if value.input:
                        input_filenames.append(filename)
                    if value.output:
                        output_filenames.append(filename)
                    if value.optional:
                        optional_filenames.append(filename)
                    proc_res = (Path if as_path else str)(proc_res)
                res[proc_key] = proc_res
            if not use_glob:
                # non-glob templates pass the single value rather than a one-entry dict
                res = res[key]
        elif isinstance(value, PlaceHolder):
            key = single_tree.placeholders.find_key(kwarg_key if value.key is None else value.key)
            plac_value = single_tree.placeholders[key]
            # all possible values for this placeholder, used to derive the index
            ref_values = [full_tree.placeholders[key]] if is_singular(full_tree.placeholders[key]) else list(full_tree.placeholders[key])

            if is_singular(plac_value):
                res = PlaceholderValue(key, ref_values.index(plac_value), plac_value)
                if is_submit_param:
                    # submit parameters get the raw value rather than a PlaceholderValue
                    res = res.value
            else:
                res = PlaceholderValue(key, tuple(ref_values.index(v) for v in plac_value), tuple(plac_value))
                if is_submit_param:
                    raise ValueError(f"Submit parameter {kwarg_key} is set to the placeholder {key}, which has more than one possible value")
        else:
            # anything that is not a Template/PlaceHolder is passed through unaltered
            res = value
        final_dict[kwarg_key] = res
    return final_dict, input_filenames, output_filenames, optional_filenames
def check_submit_parameters(submit_params: Dict):
    """
    Check that the submit parameters are actually valid.

    Raises a ValueError if there are any submit parameters set not expected by fsl_sub.
    """
    # compare against the keyword arguments accepted by fsl_sub.submit
    valid_keywords = set(inspect.signature(submit).parameters)
    unrecognised = set(submit_params) - valid_keywords
    if unrecognised:
        raise ValueError(f"Unrecognised fsl_sub submit keywords: {unrecognised}")
    if 'jobhold' in submit_params:
        raise ValueError("Job-holds are managed by the fsl-pipe and cannot be set by the pipeline definition.")
782def _update_key(key, sub_tree, other_mappings):
783 """Update template key with sub_tree precursor, unless listed in other_mappings."""
784 if key in other_mappings:
785 return other_mappings[key]
786 elif sub_tree is not None:
787 return sub_tree + '/' + key
788 else:
789 return key
def _update_kwargs(input_dict, sub_tree, other_mappings):
    """Update placeholders and templates in input_dict with sub_tree precursor, unless listed in other_mappings."""
    updated = {}
    for name, value in input_dict.items():
        if isinstance(value, (PlaceHolder, Template)):
            # markers default their key to the keyword name; remap it into the sub-tree
            current_key = name if value.key is None else value.key
            value = value(_update_key(current_key, sub_tree, other_mappings))
        updated[name] = value
    return updated
# Default shared pipeline instance (importable as `from fsl_pipe import pipe`).
pipe = Pipeline()