Coverage for src/fsl_pipe/pipeline.py: 70%

431 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-14 13:47 +0100

1""" 

2Defines the pipeline before a FileTree is provided. 

3 

4A pipeline is a collection of functions with mapping from the function parameters to input/output/reference/placeholder basenames. 

5""" 

6from typing import List, Optional, Set, Tuple, Dict, Collection, Callable, Union, Any 

7import argparse 

8from dataclasses import dataclass 

9from file_tree import FileTree 

10from copy import copy 

11import inspect 

12import xarray 

13from collections import namedtuple 

14from fsl_sub import submit 

15from file_tree.template import is_singular 

16from pathlib import Path 

17from numpy import unique 

18import graphviz 

19import fnmatch 

20try: 

21 import datalad 

22except ImportError: 

23 datalad = None 

24 

25class Pipeline: 

26 """Collection of python functions forming a pipeline. 

27 

28 You can either create a new pipeline (`from fsl_pipe import Pipeline; pipe = Pipeline()`) or use a pre-existing one (`from fsl_pipe import pipe`) 

29 

30 Scripts are added to a pipeline by using the pipeline as a decorator (see :meth:`__call__`). 

31 

32 To run the pipeline based on instructions from the command line run :meth:`pipe.cli(tree) <cli>`,  

33 where tree is a FileTree defining the directory structure of the pipeline input & output files. 

34 

35 :ivar scripts: list of :class:`PipedFunction`, which define the python functions forming the pipeline and their input/output templates 

36 """ 

37 

def __init__(self, scripts=None, default_output=None, default_submit=None):
    """
    Create a new (by default empty) pipeline.

    :param scripts: optional initial sequence of :class:`PipedFunction` objects.
    :param default_output: optional list of template keys produced when the user requests no specific output.
    :param default_submit: optional dict of `fsl_sub.submit` flags applied to every job (validated on construction).
    """
    self.scripts: List[PipedFunction] = [] if scripts is None else list(scripts)
    self.default_output: List[str] = [] if default_output is None else list(default_output)
    if default_submit is None:
        default_submit = {}
    # Fail early on unrecognised fsl_sub flags rather than at submission time.
    check_submit_parameters(default_submit)
    self.default_submit = default_submit

50 

def __call__(self, function=None, *, kwargs=None, no_iter=None, placeholders=None, as_path=False, submit=None, batch=None):
    """Add a python function as a :class:`PipedFunction` to the pipeline.

    This is the main route through which jobs are added to the pipeline,
    typically by using the pipeline object as a decorator:

    .. code-block:: python

        from fsl_pipe import pipe, In, Out, Ref, Var

        @pipe(submit=dict(jobtime=40))
        def func(in_path: In, out_path: Out, ref_path: Ref, placeholder_key: Var):
            pass

    :param function: Optionally provide the function directly. This can be useful when not using `pipe` as a decorator, e.g.:

    .. code-block:: python

        from fsl_pipe import pipe, In, Out, Ref, Var
        from shutil import copyfile

        pipe(copyfile, kwargs={'src': In('template_in'), 'dst': Out('template_out')})

    :param kwargs: maps function keyword arguments to template names (In/Out/Ref), placeholder names (Var), or anything else. Templates or placeholders are replaced by their values based on the file-tree. Anything else is passed on unaltered.
    :param no_iter: optional set of parameters not to iterate over.
    :param placeholders: dictionary overriding the placeholders set in the filetree for this specific function. This can, for example, be used to run this function on a sub-set of subjects.
    :param as_path: if set to true, provide template filenames to the function as `pathlib.Path` objects instead of strings.
    :param submit: dictionary with the flags to pass on to `fsl_sub.submit`, when submitting jobs to a cluster queue.
    :param batch: label used to batch multiple jobs into one when submitting to the cluster.
    """
    if submit is None:
        submit = {}
    # dict(**a, **b) deliberately raises on duplicate keys, so per-function
    # submit flags cannot silently override the pipeline-wide defaults.
    submit_params = dict(**self.default_submit, **submit)
    check_submit_parameters(submit_params)

    def register(func):
        # One PipedFunction per decorated callable; returned unchanged so the
        # decorator is transparent to the caller.
        piped = PipedFunction(
            func,
            submit_params=submit_params,
            kwargs=kwargs,
            no_iter=no_iter,
            override=placeholders,
            as_path=as_path,
            batch=batch,
        )
        self.scripts.append(piped)
        return func

    return register if function is None else register(function)

91 

def generate_jobs(self, tree: FileTree):
    """
    Split the pipeline into individual jobs.

    Produces an unsorted `JobList`, which can be sorted by running `filter` on it.

    :param tree: set of templates for the input/output/reference files with all possible placeholder values
    :return: `JobList` covering every job from every script in this pipeline
    """
    # Local import avoids a circular dependency between pipeline.py and job.py.
    from .job import JobList, FileTarget

    # Shared filename->FileTarget map so jobs can be linked through common files.
    targets: Dict[str, FileTarget] = {}
    all_jobs = [job for script in self.scripts for job in script.get_jobs(tree, targets)]
    return JobList(tree, all_jobs, targets)

106 

def default_parser(self, tree: FileTree, parser: Optional[argparse.ArgumentParser]=None, include_vars=None, exclude_vars=()) -> argparse.ArgumentParser:
    """
    Add default `fsl-pipe` arguments to an argument parser (will create a new argument parser if needed).

    :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files (defaults to datalad tree).
    :param parser: Optional argument parser that will be updated with the default `fsl-pipe` arguments (by default a new one is created).
    :param include_vars: if provided, only include expose placeholders in this list to the command line
    :param exclude_vars: exclude placeholders in this list from the command line
    :return: Argument parser with the default `fsl-pipe` arguments. If one was provided as input, that one will be returned.
    :raises ValueError: if the pipeline contains no scripts.
    """
    # Local import avoids a circular dependency between pipeline.py and job.py.
    from .job import RunMethod
    if parser is None:
        parser = argparse.ArgumentParser(description="Runs the pipeline")
    if len(self.scripts) == 0:
        raise ValueError("The pipeline does not contain any scripts...")
    # Union of the output templates of every script: everything the pipeline can produce.
    templates = set.union(*[script.filter_templates(True, tree.template_keys()) for script in self.scripts])

    # If no explicit default output was configured, offer every producible template.
    if len(self.default_output) == 0:
        default_output = list(sorted(templates))
    else:
        default_output = self.default_output

    parser.add_argument("templates", nargs="*", default=default_output,
                        help=f"Set to one or more template keys or file patterns (e.g., \"*.txt\"). Only these templates/files will be produced. If not provided all templates will be produced ({', '.join(default_output)}).")
    parser.add_argument("-m", '--pipeline-method', default=RunMethod.default().name,
                        choices=[m.name for m in RunMethod],
                        help=f"method used to run the jobs (default: {RunMethod.default().name})")
    parser.add_argument("-o", '--overwrite', action='store_true', help="If set overwrite any requested files.")
    parser.add_argument("-d", '--overwrite_dependencies', action='store_true',
                        help="If set also overwrites dependencies of requested files.")
    parser.add_argument("-j", '--job-hold', default='',
                        help='Place a hold on the whole pipeline until job has completed.')
    parser.add_argument('--skip-missing', action='store_true',
                        help='If set skip running any jobs depending on missing data. This replaces the default behaviour of raising an error if any required input data is missing.')
    parser.add_argument("-q", '--quiet', action='store_true', help="Suppresses the report on what will be run (might speed up starting up the pipeline substantially).")
    parser.add_argument("-b", "--batch", action="store_true", help="Batch jobs based on submit parameters and pipeline labels before running them.")
    parser.add_argument("--scale-jobtime", type=float, default=1., help="Scale the developer-set job times by the given value. Set to a number above 1 if you are analysing a particularly large datasets and the individual jobs might be slower than expected. Only affects jobs submitted to a cluster (i.e., `--pipeline_method=submit`).")
    # The --gui flag is always exposed; the help text warns if the optional GUI package is absent.
    gui_help = "Start a GUI to select which output files should be produced by the pipeline."
    try:
        import fsl_pipe_gui
    except ModuleNotFoundError:
        gui_help = gui_help + " Missing requirement 'fsl-pipe-gui' to run the GUI. Please install this with `conda/pip install fsl-pipe-gui`."
    parser.add_argument("-g", "--gui", action="store_true", help=gui_help)
    # Only offer --datalad when the datalad package imported successfully at module load.
    if datalad is not None:
        parser.add_argument("--datalad", action='store_true',
                            help="run datalad get on all input data before running/submitting the jobs")

    def add_placeholder_flag(variable):
        # Recurse into collections of placeholder names (e.g., tree.placeholders.keys()).
        if not isinstance(variable, str):
            for v in variable:
                add_placeholder_flag(v)
            return
        # Skip sub-tree-scoped placeholders ("sub/key") and anything excluded/not included.
        if '/' in variable:
            return
        if variable in exclude_vars:
            return
        if include_vars is not None and variable not in include_vars:
            return
        value = tree.placeholders[variable]
        default = str(value) if is_singular(value) else ','.join([str(v) for v in unique(value)])
        parser.add_argument(f"--{variable}", nargs='+', help=f"Use to set the possible values of {variable} to the selected values (default: {default})")

    add_placeholder_flag(tree.placeholders.keys())

    return parser

172 

def run_cli(self, args: argparse.Namespace, tree: FileTree):
    """
    Run the pipeline based on arguments extracted from the default argument parser.

    :param args: Namespace consisting of arguments extracted from the command line (produced by `argparse.ArgumentParser.parse_args`). This is expected to contain:

        - templates (defaults to `self.default_output`): list of which templates the pipeline should produce
        - pipeline_method: string with method used to run the jobs
        - overwrite: boolean, which if true overwrite any requested files
        - overwrite_dependencies: boolean, which if true also overwrites dependencies of requested files
        - job-hold: string with comma-separated list, which contains job(s) that will be waited for
        - skip-missing: whether to skip jobs depending on missing data instead of raising an error
        - datalad (defaults to False): if true, run datalad get on all input data before running/submitting the jobs
        - {placeholder} (defaults to values in FileTree): sequence of strings overwriting the possible values for a particular placeholder

    :param tree: Definition of pipeline input/output files
    """
    # Local import avoids a circular dependency between pipeline.py and job.py.
    from .job import RunMethod
    # Work on a copy so placeholder overrides do not leak into the caller's tree.
    tree = tree.copy()

    def set_placeholder(key):
        # Recurse into collections of placeholder names.
        if not isinstance(key, str):
            for k in key:
                set_placeholder(k)
            return
        # Only override placeholders the user explicitly set on the command line.
        if getattr(args, key, None) is not None:
            tree.placeholders[key] = args.__dict__[key]
    for var in tree.placeholders.keys():
        set_placeholder(var)

    # Hand over to the GUI instead of the batch path if requested.
    if args.gui:
        self.gui(tree, overwrite_dependencies=args.overwrite_dependencies, run_method=RunMethod[args.pipeline_method])
        return
    requested_templates = getattr(
        args, 'templates',
        None if len(self.default_output) == 0 else self.default_output
    )
    concrete = self.generate_jobs(tree)
    # Sort/prune the job list down to what is needed for the requested templates.
    torun = concrete.filter(requested_templates, overwrite=args.overwrite, overwrite_dependencies=args.overwrite_dependencies, skip_missing=args.skip_missing)
    if not args.quiet:
        torun.report()
    if getattr(args, "batch", False):
        # Batch by label only if at least one job actually carries a batch label.
        use_label = any(job.batch is not None for job in concrete.jobs)
        torun = torun.batch(use_label=use_label)
    torun.scale_jobtime(args.scale_jobtime)
    # args.datalad only exists when datalad imported successfully (see default_parser).
    if datalad is not None and args.datalad:
        torun.run_datalad()
    torun.run(RunMethod[args.pipeline_method], wait_for=() if args.job_hold == '' else args.job_hold.split(','))

221 

def cli(self, tree: Optional[FileTree]=None, include_vars=None, exclude_vars=(), cli_arguments=None):
    """
    Run the pipeline from the command line.

    :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files (defaults to datalad tree).
    :param include_vars: if provided, only include expose variables in this list to the command line
    :param exclude_vars: exclude variables in this list from the command line
    :param cli_arguments: list of command line arguments. If not set the arguments used in the python call (`sys.argv`) are used.
    :raises ValueError: if no FileTree is given and none can be derived from a datalad dataset.
    """
    if tree is None:
        # Without an explicit tree we can only proceed inside a datalad dataset.
        if datalad is None:
            raise ValueError("Argument 'tree' missing: please provide a FileTree describing the directory structure for the pipeline")
        from .datalad import get_tree
        try:
            tree = get_tree()
        except IOError:
            raise ValueError("Pipeline run outside of a datalad dataset, so a FileTree needs to be explicitly provided using the 'tree' argument")
    # get_tree() may itself return None — treat that as "no tree found".
    if tree is None:
        raise ValueError("No reference FileTree for pipeline found")

    parser = self.default_parser(tree=tree, include_vars=include_vars, exclude_vars=exclude_vars)
    args = parser.parse_args(cli_arguments)
    self.run_cli(args, tree)

245 

def gui(self, tree: FileTree, **kwargs):
    """
    Run the fsl-pipe-gui interface to select pipeline output.

    Requires the optional `fsl-pipe-gui` package.

    :param tree: `file_tree.FileTree` object describing the directory structure for the input/output files.
    :param overwrite_depencencies: set to True to overwrite dependencies
    :param run_method: overrides the default method to run the jobs
    :raises ModuleNotFoundError: if `fsl-pipe-gui` is not installed.
    """
    try:
        from fsl_pipe_gui import run_gui
    except ModuleNotFoundError:
        # Re-raise with an actionable installation hint.
        raise ModuleNotFoundError(
            "'fsl-pipe-gui' needs to be installed to run the graphical user interface. Please run `pip/conda install fsl-pipe-gui` and try again."
        )
    run_gui(self, tree, **kwargs)

259 

def move_to_subtree(self, sub_tree=None, other_mappings=None):
    """
    Create a new pipeline that runs in a sub-tree of a larger tree rather than at the top level.

    :param sub_tree: name of the sub-tree in the FileTree
    :param other_mappings: other mappings between templates or placeholder names and their new values
    :return: new `Pipeline`; this pipeline is left untouched.
    """
    mappings = {} if other_mappings is None else other_mappings
    moved_scripts = [script.move_to_subtree(sub_tree, mappings) for script in self.scripts]
    moved_output = [_update_key(key, sub_tree, mappings) for key in self.default_output]
    moved_submit = _update_kwargs(self.default_submit, sub_tree, mappings)
    return Pipeline(moved_scripts, moved_output, moved_submit)

272 

@classmethod
def merge(cls, pipelines: Collection["Pipeline"]):
    """
    Combine multiple pipelines into a single one.

    Scripts are copied, so configuring them on the merged pipeline leaves the
    originals untouched.

    :param pipelines: pipelines containing part of the jobs
    :return: parent pipeline containing all of the jobs in pipelines
    """
    # NOTE(review): the `default_submit` settings of the merged pipelines are
    # not carried over into the combined pipeline — confirm this is intended.
    combined = Pipeline()
    for sub_pipeline in pipelines:
        combined.scripts.extend(script.copy() for script in sub_pipeline.scripts)
        combined.default_output.extend(sub_pipeline.default_output)
    return combined

286 

def find(self, function: Union[str, Callable]):
    """
    Iterate over any pipeline scripts that run the provided function.

    Either the function itself or the name of the function can be given.
    Iterates over a snapshot of the script list, so scripts may safely be
    removed while iterating (see :meth:`remove`).
    """
    for script in list(self.scripts):
        wrapped = script.function
        name_matches = getattr(wrapped, '__name__', None) == function
        if wrapped == function or name_matches:
            yield script

296 

def remove(self, function: Union[str, Callable]):
    """
    Remove any pipeline scripts that run the provided function from the pipeline.

    Either the function itself or the name of the function can be given.
    """
    # `find` yields from a snapshot, so removing while iterating is safe.
    for matching_script in self.find(function):
        self.scripts.remove(matching_script)

305 

def configure(self, kwargs):
    """
    Override the values passed on to the keyword arguments of all the scripts.

    Any keywords not expected by a script will be silently skipped for that script.

    :param kwargs: mapping from keyword argument names to their new values
    """
    for script in self.scripts:
        try:
            # check=False + allow_new_keywords=False: unknown keys are dropped
            # per-script rather than raising.
            script.configure(kwargs, allow_new_keywords=False, check=False)
        except KeyError:
            # A script that rejects the keywords is simply left unchanged.
            pass

317 

def add_to_graph(self, graph: graphviz.Digraph=None, tree: FileTree=None):
    """
    Add all the pipeline functions to the provided graph.

    :param graph: GraphViz graph object (will be altered); a new SVG digraph is created if not given
    :param tree: concrete FileTree
    :return: the graph containing one node per script
    """
    target_graph = graphviz.Digraph("pipeline", format="svg") if graph is None else graph
    # Shared colour assignment so the same placeholder gets the same colour in every node.
    colour_map = {}
    for node_index, script in enumerate(self.scripts):
        script.add_node(target_graph, node_index, tree, colour_map)
    return target_graph

331 

332 

333class PipedFunction: 

334 """Represents a function stored in a pipeline.""" 

335 

def __init__(self, function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None):
    """
    Wrap a function with additional information to run it in a pipeline.

    :param function: python function that will be run in pipeline.
    :param submit_params: parameters to submit job running python function to cluster using `fsl_sub`.
    :param kwargs: maps function keyword arguments to templates, variables, or actual values.
    :param no_iter: which parameters to not iterate over (i.e., they are passed to the function in an array).
    :param override: dictionary overriding the placeholders set in the filetree.
    :param as_path: whether to pass on pathlib.Path objects instead of strings to the functions (default: True).
    :param batch: label used to batch multiple jobs into one when submitting to the cluster.
    """
    self.function = function
    self.submit_params = submit_params
    self.override = {} if override is None else override
    self.as_path = as_path
    self.batch = batch

    # Seed the keyword mapping from the function's type annotations;
    # explicitly supplied kwargs take precedence afterwards.
    self.all_kwargs: Dict[str, Any] = {}
    self.configure(function.__annotations__)
    if kwargs is not None:
        self.configure(kwargs)

    # Normalise no_iter to a set (a bare string means a single placeholder).
    if no_iter is None:
        normalised_no_iter = set()
    elif isinstance(no_iter, str):
        normalised_no_iter = {no_iter}
    else:
        normalised_no_iter = set(no_iter)
    self.explicit_no_iter = normalised_no_iter

367 

def copy(self):
    """
    Create a copy of this PipedFunction for pipeline merging.

    The copy gets its own `all_kwargs` dict and `explicit_no_iter` set, so
    configuring one copy does not affect the other.

    :return: shallow copy with independent mutable containers.
    """
    # Local alias: inside the class the module-level `copy` is visible, but
    # aliasing makes this method self-contained and avoids shadowing confusion.
    from copy import copy as _shallow_copy
    new_script = _shallow_copy(self)
    new_script.all_kwargs = dict(self.all_kwargs)
    # BUGFIX: this previously reassigned `self.explicit_no_iter`, mutating the
    # original object instead of giving the copy its own independent set.
    new_script.explicit_no_iter = set(self.explicit_no_iter)
    return new_script

374 

@property
def no_iter(self) -> Set[str]:
    """Sequence of placeholder names that should not be iterated over."""
    # Placeholders flagged no_iter via their PlaceHolder marker; the marker's
    # explicit key (if any) wins over the keyword-argument name.
    marked = set()
    for kwarg_name, placeholder in self.placeholders.items():
        if placeholder.no_iter:
            marked.add(kwarg_name if placeholder.key is None else placeholder.key)
    return marked | self.explicit_no_iter

381 

def configure(self, kwargs, allow_new_keywords=True, check=True):
    """Override the values passed on to the keyword arguments of the script.

    :param kwargs: new placeholders/templates/values for keyword arguments
    :param allow_new_keywords: if set to False, don't allow new keywords
    :param check: if set to False, do not raise on unexpected keywords (they are silently skipped)
    :raises KeyError: if `check` is set and `kwargs` contains keys the function does not accept
    """
    bad_keys = []
    signature = inspect.signature(self.function)
    # A **kwargs parameter means the function accepts arbitrary extra keywords.
    has_kws = any(param.kind == param.VAR_KEYWORD for param in signature.parameters.values())

    if not allow_new_keywords:
        existing_kws = set(signature.parameters)
        if has_kws:
            # Keys configured earlier were absorbed by **kwargs; keep accepting them.
            existing_kws.update(self.all_kwargs.keys())
        if check:
            # BUGFIX: compare against `existing_kws` (not `signature.parameters`)
            # so **kwargs-absorbed keys are not flagged, and only raise when
            # there actually are unexpected keys (previously this raised
            # unconditionally whenever `check` was set).
            bad_keys = {key for key in kwargs.keys() if key not in existing_kws}
            if len(bad_keys) > 0:
                raise KeyError(f"Tried to configure {self} with keys that are not expected by the function: {bad_keys}")
    for key, value in kwargs.items():
        if not allow_new_keywords and key not in existing_kws:
            continue
        self.all_kwargs[key] = value

403 

@property
def placeholders(self):
    """Return dictionary with placeholder values overriden for this function."""
    return {name: marker for name, marker in self.all_kwargs.items() if isinstance(marker, PlaceHolder)}

408 

@property
def templates(self):
    """Return dictionary with templates used as input, output, or reference for this function."""
    return {name: marker for name, marker in self.all_kwargs.items() if isinstance(marker, Template)}

413 

@property
def kwargs(self):
    """Return dictionary with keyword arguments that will be passed on to the function."""
    # Anything that is neither a template nor a placeholder marker is passed through unaltered.
    marker_types = (Template, PlaceHolder)
    return {name: value for name, value in self.all_kwargs.items() if not isinstance(value, marker_types)}

420 

def filter_templates(self, output=False, all_templates=None) -> Set[str]:
    """
    Find all input or output template keys.

    :param output: if set to True select the output rather than the input templates
    :param all_templates: sequence of all possible templates (required if any Template keys use globbing)
    :return: set of input or output templates
    :raises ValueError: if a template key uses globbing but `all_templates` was not provided
    """
    res = set()
    for kwarg_key, template in self.templates.items():
        if ((template.input and not output) or
                (template.output and output)):
            # The marker's explicit key (if any) wins over the keyword-argument name.
            template_key = kwarg_key if template.key is None else template.key

            # apply globbing if template key contains * or ?
            if '*' in template_key or '?' in template_key:
                if all_templates is None:
                    # BUGFIX: previous message was a garbled double negative
                    # ("but no template keys are not provided").
                    raise ValueError(f"Template {template_key} uses globbing, but the template keys to match against were not provided")
                res.update(fnmatch.filter(all_templates, template_key))
            else:
                res.add(template_key)
    return res

443 

def iter_over(self, tree: FileTree) -> Tuple[str, ...]:
    """
    Find all the placeholders that should be iterated over before calling the function.

    These are all the placeholders that affect the input templates, but are not part of `self.no_iter`.

    :param tree: set of templates with placeholder values
    :return: placeholder names to be iterated over sorted by name
    :raises ValueError: if an output template depends on a placeholder that no input template depends on
    """
    # Apply this function's placeholder overrides before inspecting the tree.
    tree = tree.update(**self.override)
    in_vars = self.all_placeholders(tree, False)
    # Map every placeholder through the tree's linkage table so linked
    # placeholders are treated as a single iteration axis.
    # NOTE(review): exact linkage semantics come from file_tree — confirm
    # `linkages.get(key, key)` resolves to the canonical linked key.
    in_vars_linked = {tree.placeholders.linkages.get(key, key) for key in in_vars}
    out_vars = {tree.placeholders.linkages.get(key, key) for key in self.all_placeholders(tree, True)}

    # Normalise no_iter keys (find_key canonicalises, then resolve linkage).
    updated_no_iter = {
        tree.placeholders.linkages.get(
            tree.placeholders.find_key(key),
            tree.placeholders.find_key(key),
        ) for key in self.no_iter
    }
    # Every output placeholder must be driven by some input placeholder or be in no_iter.
    all_in = in_vars_linked.union(updated_no_iter)
    if len(out_vars.difference(all_in)) > 0:
        raise ValueError(f"{self}: Output template depends on {out_vars.difference(all_in)}, which none of the input templates depend on")
    # Iterate over the input placeholders that are not excluded via no_iter.
    return tuple(sorted(
        {v for v in in_vars if tree.placeholders.linkages.get(v, v) not in updated_no_iter},
    ))

470 

def get_jobs(self, tree: FileTree, all_targets: Dict):
    """
    Get a list of all individual jobs defined by this function.

    One `SingleJob` is created per unique combination of iterated placeholder
    values; duplicate keyword sets (e.g., when a placeholder does not actually
    affect this function) are skipped.

    :param tree: set of templates with placeholder values
    :param all_targets: mapping from filenames to Target objects used to match input/output filenames between jobs
    :return: sequence of jobs
    """
    # Local import avoids a circular dependency between pipeline.py and job.py.
    from .job import SingleJob
    tree = tree.update(**self.override)
    to_iter = self.iter_over(tree)
    jobs = []
    done_kwargs = set()
    def freeze_value(value):
        # Convert each keyword value into something hashable so a full
        # keyword set can be stored in the deduplication set below.
        if isinstance(value, xarray.DataArray):
            return str(value)
        elif isinstance(value, dict):
            return frozenset(value.items())
        else:
            return value

    for sub_tree in tree.iter_vars(to_iter):
        # Resolve templates/placeholders into concrete kwargs and file targets
        # for this specific combination of placeholder values.
        kwargs, in_fns, out_fns, optionals = _single_job_kwargs(self.all_kwargs, sub_tree, tree, all_targets, as_path=self.as_path)
        # Submit parameters may also reference templates/placeholders, but are
        # always passed as strings (as_path=False) and must be single-valued.
        job_submit_params, _, _, _ = _single_job_kwargs(self.submit_params, sub_tree, tree, all_targets, as_path=False, is_submit_param=True)

        # Skip placeholder combinations that produce an identical call.
        frozen_kwargs = frozenset([(key, freeze_value(value)) for (key, value) in kwargs.items()])
        if frozen_kwargs in done_kwargs:
            continue
        done_kwargs.add(frozen_kwargs)

        jobs.append(SingleJob(
            self.function,
            kwargs,
            job_submit_params,
            in_fns, out_fns, optionals,
            {name: sub_tree.placeholders.get(name, None) for name in to_iter},
            self.batch,
        ))
    return jobs

510 

def all_placeholders(self, tree: FileTree, output=False) -> Set[str]:
    """
    Identify the multi-valued placeholders affecting the input/output templates of this function.

    :param tree: set of templates with placeholder values
    :param output: if set to True returns the placeholders for the output rather than input templates
    :return: set of all multi-valued placeholders that affect the input/output templates
    :raises ValueError: if any of the placeholders found has no value set in the tree
    """
    res = set()
    # Placeholders referenced by the relevant (input or output) templates.
    for t in self.filter_templates(output, tree.template_keys()):
        res.update(tree.get_template(t).placeholders())
    # Placeholders requested directly as function arguments (Var markers)
    # count as inputs only.
    if not output:
        for key, variable in self.placeholders.items():
            res.add(key if variable.key is None else variable.key)

    # Every placeholder must have a value defined in the tree.
    bad_keys = {key for key in res if key not in tree.placeholders}
    if len(bad_keys) > 0:
        raise ValueError(f"No value set for placeholders: {bad_keys}")

    # Keep only multi-valued placeholders (single-valued ones need no iteration).
    _, only_multi = tree.placeholders.split()
    return {only_multi.find_key(key) for key in res if key in only_multi}

532 

533 

def move_to_subtree(self, sub_tree=None, other_mappings=None):
    """
    Create a new wrapped function that runs in a sub-tree of a larger tree rather than at the top level.

    :param sub_tree: name of the sub-tree in the FileTree
    :param other_mappings: other mappings between templates or placeholder names and their new values
    :return: new `PipedFunction`; this one is left untouched.
    """
    mappings = {} if other_mappings is None else other_mappings
    relocated = copy(self)
    relocated.all_kwargs = _update_kwargs(self.all_kwargs, sub_tree, mappings)
    relocated.override = {_update_key(key, sub_tree, mappings): value for key, value in self.override.items()}
    relocated.submit_params = _update_kwargs(self.submit_params, sub_tree, mappings)
    return relocated

548 

def __repr__(self):
    """Return a short description naming the wrapped function."""
    return "PipedFunction({})".format(self.function.__name__)

552 

def add_node(self, graph: graphviz.Graph, index, tree: FileTree, placeholder_color: Dict[str, str]):
    """
    Add a node representing this function for a pipeline diagram.

    The function becomes a red box node; its input templates get edges into it
    and its output templates get edges out of it (blue nodes). Multi-valued
    placeholders are listed under the labels, consistently coloured across nodes.

    :param graph: input pipeline diagram (will be altered)
    :param index: unique integer identifier to use within the graph
    :param tree: concrete FileTree used to resolve templates and placeholders
    :param placeholder_color: shared mapping placeholder name -> colour (updated in place)
    """
    # Seaborn "colorblind" palette; cycled as new placeholders are encountered.
    seaborn_colors = ['#0173b2', '#de8f05', '#029e73', '#d55e00', '#cc78bc', '#ca9161', '#fbafe4', '#949494', '#ece133', '#56b4e9']
    # Prefix with the index so two scripts wrapping the same function get distinct nodes.
    identifier = str(index) + self.function.__name__
    label = self.function.__name__
    if tree is not None:
        placs = self.iter_over(tree)
        if len(placs) > 0:
            for p in placs:
                if p not in placeholder_color:
                    placeholder_color[p] = seaborn_colors[len(placeholder_color) % len(seaborn_colors)]
            # Graphviz HTML-like label (note the surrounding < >).
            plac_text = ';'.join(f'<font color="{placeholder_color[p]}">{p}</font>' for p in placs)
            label = fr"<{label}<br/>{plac_text}>"

    graph.node(identifier, label=label, color='red', shape='box')
    # First pass: input templates (edges into the function); second: outputs.
    # NOTE(review): `tree.template_keys()` is called here even though `tree`
    # may be None (see the guards above and in add_to_graph) — confirm callers
    # always pass a tree, otherwise this line raises AttributeError.
    for output in (False, True):
        for t in self.filter_templates(output, tree.template_keys()):
            label = t
            if tree is not None:
                all_plac = tree.get_template(t).required_placeholders() | tree.get_template(t).optional_placeholders()
                # Only show placeholders that actually have multiple values.
                multi_plac = sorted([p for p in all_plac if p not in tree.placeholders or not is_singular(tree.placeholders[p])])
                for p in multi_plac:
                    if p not in placeholder_color:
                        placeholder_color[p] = seaborn_colors[len(placeholder_color) % len(seaborn_colors)]
                multi_plac = ';'.join(f'<font color="{placeholder_color[p]}">{p}</font>' for p in multi_plac)
                if len(multi_plac) > 0:
                    label = fr"<{t}<br/>{multi_plac}>"
            graph.node(t, label, color='blue')
            if output:
                graph.edge(identifier, t)
            else:
                graph.edge(t, identifier)

590 

@dataclass
class Template(object):
    """Represents a keyword argument that will be mapped to a template path in the FileTree.

    :param key: template key in FileTree.
    :param input: Set to true if file should be considered as input (i.e., it should exist before the job is run).
    :param output: Set to true if file should be considered as output (i.e., it is expected to exist after the job is run).
    :param optional: Set to true if input/output file should be considered for creating the dependency graph, but still might not exist.
    """

    key: Optional[str] = None
    input: bool = False
    output: bool = False
    optional: bool = False

    def __call__(self, key=None, optional=False):
        """Override the template key and whether it is optional.

        The input/output flags of this template are preserved; a new
        `Template` is returned and this one is left unchanged.
        """
        return Template(key=key, input=self.input, output=self.output, optional=optional)

609 

@dataclass
class PlaceHolder(object):
    """Represents a keyword argument that will be mapped to a placeholder value.

    :param key: placeholder key in FileTree.
    :param no_iter: if True the pipeline will not iterate over individual placeholder values, but rather run a single job with all possible placeholder values at once.
    """

    key: Optional[str] = None
    no_iter: bool = False

    def __call__(self, key=None, no_iter=None):
        """Return a new PlaceHolder with an overridden key and/or `no_iter` flag.

        When `no_iter` is not given, the current flag is kept.
        """
        keep_no_iter = self.no_iter if no_iter is None else no_iter
        return PlaceHolder(key=key, no_iter=keep_no_iter)

626 

627 

628"""Use to mark keyword arguments that represent input filenames of the wrapped function. 

629 

630These filenames are expected to exist before the function is run. 

631 

632The actual filename is extracted from the FileTree based on the template key. 

633This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`). 

634""" 

635In = Template(input=True) 

636 

637"""Use to mark keyword arguments that represent output filenames of the wrapped function. 

638 

639These filenames are expected to exist after the function is run. 

640An error is raised if they do not. 

641 

642The actual filename is extracted from the FileTree based on the template key. 

643This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`). 

644""" 

645Out = Template(output=True) 

646 

647"""Use to mark keyword arguments that represent reference filenames of the wrapped function. 

648 

649These filenames might or might not exist before or after the job is run. 

650 

651The actual filename is extracted from the FileTree based on the template key. 

652This template key will by default match the keyword argument name, but can be overriden by calling `In` (e.g., `In("other_key")`). 

653""" 

654Ref = Template() 

655 

656"""Use to mark keyword arguments that represent placeholder values in the pipeline. 

657 

658Placeholder values are returned as :any:`PlaceholderValue` objects. 

659""" 

660Var = PlaceHolder() 

661 

662 

def to_templates_dict(input_files=(), output_files=(), reference_files=()):
    """Convert a sequence of input/output/reference files into a template dictionary.

    Each entry is either a template key (string; the part after the last '/'
    becomes the keyword-argument name) or a (keyword_name, template_key) pair.

    Args:
        input_files (sequence, optional): Template keys representing input files. Defaults to ().
        output_files (sequence, optional): Template keys representing output files. Defaults to ().
        reference_files (sequence, optional): Template keys representing reference paths. Defaults to ().

    Raises:
        KeyError: If the same template key is used as more than one of the input/output/reference options

    Returns:
        dict: mapping of the keyword argument names to the Template objects
    """
    res = {}
    for files, cls in [
        (input_files, In),
        (output_files, Out),
        (reference_files, Ref),
    ]:
        for name in files:
            if isinstance(name, str):
                # "sub/key" maps the keyword "key" to the template "sub/key".
                short_name = name.split('/')[-1]
            else:
                short_name, name = name
            # BUGFIX: `res` is keyed by short_name, so collisions must be
            # detected on short_name (previously `name in res` let e.g.
            # "a/key" and "b/key" silently overwrite each other).
            if short_name in res:
                raise KeyError(f"Dual definition for template {name}")
            res[short_name] = cls(name)

    return res

693 

694 

"""
Named tuple representing a placeholder value with three fields.

:param key: string with placeholder key in pipeline.
:param index: integer with the index (or multiple integers with the indices if `no_iter` is set for this placeholder) of the placeholder values in list of all possible placeholder values.
:param value: actual placeholder value (or sequence of values if `no_iter` is set for this placeholder)
"""
PlaceholderValue = namedtuple("PlaceholderValue", ["key", "index", "value"])

703 

704 

def _single_job_kwargs(wrapper_kwargs, single_tree: FileTree, full_tree: FileTree, all_targets, as_path=False, is_submit_param=False):
    """Create the keywords, input, and output filenames for a single job.

    Resolves every :class:`Template`/:class:`PlaceHolder` marker in `wrapper_kwargs`
    against `single_tree` (the tree for this one job), while `full_tree` supplies
    the full set of placeholder values so indices can be computed.

    Returns a 4-tuple: (resolved keyword dict, input filenames, output filenames,
    optional filenames).
    """
    # local import to avoid a circular dependency with .job
    from .job import get_target
    final_dict = {}
    input_filenames = []
    output_filenames = []
    optional_filenames = []
    for kwarg_key, value in wrapper_kwargs.items():
        if isinstance(value, Template):
            # the template key defaults to the keyword argument name
            key = kwarg_key if value.key is None else value.key

            # a key containing glob characters matches multiple template keys
            use_glob = '*' in key or '?' in key
            iter_keys = fnmatch.filter(single_tree.template_keys(), key) if use_glob else [key]
            res = {}
            for proc_key in iter_keys:
                try:
                    # single filename for this template in this job's tree
                    proc_res = single_tree.get(proc_key)
                    is_xarray = False
                except KeyError:
                    # template still varies over placeholders -> xarray of filenames
                    proc_res = single_tree.get_mult(proc_key)
                    is_xarray = True
                if is_xarray:
                    if is_submit_param:
                        raise ValueError(f"Submit parameter {kwarg_key} is set to the template {key}, which has more than one possible value")
                    # register every filename in the multi-valued template
                    filenames = [get_target(fn, all_targets, proc_key) for fn in proc_res.data.flatten()]
                    if value.input:
                        input_filenames.extend(filenames)
                    if value.output:
                        output_filenames.extend(filenames)
                    if value.optional:
                        optional_filenames.extend(filenames)
                    # convert every element to Path/str for the wrapped function
                    proc_res = xarray.apply_ufunc(Path if as_path else str, proc_res, vectorize=True)
                else:
                    filename = get_target(proc_res, all_targets, proc_key)
                    if value.input:
                        input_filenames.append(filename)
                    if value.output:
                        output_filenames.append(filename)
                    if value.optional:
                        optional_filenames.append(filename)
                    proc_res = (Path if as_path else str)(proc_res)
                res[proc_key] = proc_res
            if not use_glob:
                # non-glob keys resolve to the single value, not a dict
                res = res[key]
        elif isinstance(value, PlaceHolder):
            # the placeholder key defaults to the keyword argument name
            key = single_tree.placeholders.find_key(kwarg_key if value.key is None else value.key)
            plac_value = single_tree.placeholders[key]
            # all possible values of this placeholder in the full tree
            ref_values = [full_tree.placeholders[key]] if is_singular(full_tree.placeholders[key]) else list(full_tree.placeholders[key])

            if is_singular(plac_value):
                res = PlaceholderValue(key, ref_values.index(plac_value), plac_value)
                if is_submit_param:
                    # submit parameters receive the raw value, not the namedtuple
                    res = res.value
            else:
                # `no_iter` placeholder: keep all values and their indices
                res = PlaceholderValue(key, tuple(ref_values.index(v) for v in plac_value), tuple(plac_value))
                if is_submit_param:
                    raise ValueError(f"Submit parameter {kwarg_key} is set to the placeholder {key}, which has more than one possible value")
        else:
            # plain value: passed through unchanged
            res = value
        final_dict[kwarg_key] = res
    return final_dict, input_filenames, output_filenames, optional_filenames

766 

767 

def check_submit_parameters(submit_params: Dict):
    """
    Check that the submit parameters are actually valid.

    Raises a ValueError if there are any submit parameters set not expected by fsl_sub.
    """
    valid_keywords = set(inspect.signature(submit).parameters)
    unknown = set(submit_params) - valid_keywords
    if unknown:
        raise ValueError(f"Unrecognised fsl_sub submit keywords: {unknown}")
    if 'jobhold' in submit_params:
        raise ValueError("Job-holds are managed by the fsl-pipe and cannot be set by the pipeline definition.")

780 

781 

782def _update_key(key, sub_tree, other_mappings): 

783 """Update template key with sub_tree precursor, unless listed in other_mappings.""" 

784 if key in other_mappings: 

785 return other_mappings[key] 

786 elif sub_tree is not None: 

787 return sub_tree + '/' + key 

788 else: 

789 return key 

790 

791 

def _update_kwargs(input_dict, sub_tree, other_mappings):
    """Update placeholders and templates in input_dict with sub_tree precursor, unless listed in other_mappings."""
    updated = {}
    for name, item in input_dict.items():
        if isinstance(item, (PlaceHolder, Template)):
            # the marker's key defaults to the keyword argument name
            base_key = name if item.key is None else item.key
            updated[name] = item(_update_key(base_key, sub_tree, other_mappings))
        else:
            # plain values pass through untouched
            updated[name] = item
    return updated

801 

802 

# Default module-level pipeline instance (used via `from fsl_pipe import pipe`).
pipe = Pipeline()