Coverage for src/fsl_pipe/job.py: 84%

542 statements  

coverage.py v7.8.0, created at 2025-05-14 13:47 +0100

1"""Defines pipeline interface after it has been split into individual jobs based on a FileTree.""" 

2from functools import lru_cache 

3from file_tree.file_tree import FileTree 

4from typing import Optional, Set, Dict, Collection, Any, Callable, Sequence, List, Tuple 

5from enum import Enum 

6from fsl.utils.fslsub import func_to_cmd 

7from loguru import logger 

8from pathlib import Path 

9from fsl_sub import submit, config 

10import re 

11import inspect 

12from contextlib import contextmanager 

13from wcmatch.glob import globmatch, GLOBSTAR 

14from warnings import warn 

15import numpy as np 

16 

17class OutputMissing(IOError): 

18 """Raised if a job misses the output files.""" 

19 pass 

20 

21 

22class InputMissingRun(IOError): 

23 """Raised if a job misses the input files when it is being launched.""" 

24 pass 

25class InputMissingPipe(IOError): 

26 """Raised if a job misses input files while it is being added to the pipeline.""" 

27 def __init__(self, target, missing, dependency_graph=None): 

28 self.target = target 

29 self.missing = missing 

30 if dependency_graph is None: 

31 dependency_graph = [] 

32 self.dependency_graph = dependency_graph 

33 super().__init__("") 

34 

35 def __str__(self, ): 

36 dependency = ' -> '.join([str(d) for d in self.dependency_graph]) 

37 return f"{self.target} can not be added to pipeline as it is missing required input files: {self.missing} (dependency_graph: {dependency} )" 

38 

39 

40class RunMethod(Enum): 

41 """How to run the individual jobs.""" 

42 

43 local = 1 

44 submit = 2 

45 dask = 3 

46 

47 def default(): 

48 """Return default RunMethod, which is `submit` on a cluster and `local` otherwise.""" 

49 return RunMethod.submit if config.has_queues() else RunMethod.local 

50 

51 
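# A small sketch of how a RunMethod is typically chosen (grounded in the code above and in
# `JobList.run` further below):
#
#     RunMethod.default()   # RunMethod.submit if fsl_sub reports queues, RunMethod.local otherwise
#     RunMethod["submit"]   # string-to-member lookup, as done by JobList.run(method="submit")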

52class JobList: 

53 """Pipeline with a concrete set of jobs. 

54  

55 Produced from a :class:`fsl_pipe.pipeline.Pipeline` based on the provided `FileTree`. 

56 

57 The default `JobList` produced by `Pipeline.generate_jobs` is unsorted and

58 needs to be sorted by running `filter` before it is run.

59 """ 

60 

61 def __init__(self, tree: FileTree, jobs: Sequence["SingleJob"], targets: Dict[str, "FileTarget"]): 

62 """Create a new concrete set of jobs.""" 

63 self.tree = tree 

64 self.jobs = jobs 

65 self.targets = targets 

66 self._sort() 

67 

68 def __len__(self, ): 

69 return len(self.jobs) 

70 

71 def filter(self, templates: Collection[str]=None, overwrite=False, overwrite_dependencies=False, skip_missing=False) -> "JobList": 

72 """Filter out a subset of jobs. 

73 

74 This does a total of three things: 

75 - Only include jobs needed to create the templates or file patterns given in `templates`. 

76 - Exclude jobs for which output files already exist, unless requested by `overwrite` or `overwrite_dependencies`. 

77 - Sort the jobs, so that dependencies are run before the jobs that depend on them. 

78 

79 :param templates: target template keys (default: all templates); any jobs required to produce those files will be kept in the returned pipeline.

80 :param overwrite: if True, overwrite the files matching the templates even if they already exist, defaults to False

81 :param overwrite_dependencies: if True, rerun any dependent jobs even if the output of those jobs already exists, defaults to False

82 :param skip_missing: if True, remove any jobs with missing input dependencies. If not set, an error is raised whenever inputs are missing.

83 :return: concrete pipeline with the jobs needed to produce `templates` (with optional overwriting)

84 """ 

85 if isinstance(templates, str): 

86 templates = [templates] 

87 def add_target(target: "FileTarget", warn_msg=""): 

88 if target.exists() and (not overwrite or target.producer is None): 

89 return 

90 if target.producer is None: 

91 warn(f"No job found that produces {target.filename} {warn_msg}") 

92 return 

93 try: 

94 target.producer.add_to_jobs(all_jobs, overwrite=overwrite, overwrite_dependencies=overwrite_dependencies) 

95 except InputMissingPipe as e: 

96 if skip_missing: 

97 return 

98 e.dependency_graph.insert(0, e.target) 

99 e.target = target.filename 

100 raise 

101 

102 all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set()) 

103 if templates is None: 

104 for target in self.targets.values(): 

105 if target.producer is None: 

106 continue 

107 add_target(target) 

108 else: 

109 for template in templates: 

110 if template in self.tree.template_keys(): 

111 for filename in self.tree.get_mult(template).data.flatten(): 

112 target = get_target(filename, self.targets, from_template=template) 

113 add_target(target, f"(part of template {template})") 

114 else: 

115 matches = get_matching_targets(template, self.targets) 

116 if len(matches) == 0: 

117 warn(f"No files were found that match {template}. It is not a template in the FileTree nor does it match any files that can be produced by this pipeline.") 

118 for target in matches: 

119 add_target(target, f"(matches {template})") 

120 return JobList(self.tree, all_jobs[0], self.targets) 

121 
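# A minimal usage sketch (hedged: `pipe` is a hypothetical fsl_pipe.pipeline.Pipeline and
# `tree` a FileTree defined elsewhere; "some_template" and the generate_jobs call are illustrative):
#
#     jobs = pipe.generate_jobs(tree)          # JobList covering the full pipeline
#     to_run = jobs.filter(["some_template"],  # keep only jobs needed for this template
#                          overwrite=False,    # skip jobs whose outputs already exist
#                          overwrite_dependencies=False)
#     to_run.run()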

122 def _sort(self, ): 

123 """Sorts the job in the `jobs` list in place based on dependencies. 

124 

125 A job dependencies should always come before the job itself. 

126 This sorting is done purely based on the job inputs/outputs and does not depend on which files actually exist on disk. 

127 """ 

128 all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set()) 

129 for job in self.jobs: 

130 job.add_to_jobs(all_jobs, only_sort=True) 

131 

132 # Remove jobs that were already filtered out 

133 sorted_jobs = [j for j in all_jobs[0] if j in self.jobs] 

134 assert len(self.jobs) == len(sorted_jobs) 

135 self.jobs = sorted_jobs 

136 

137 

138 def batch(self, use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True, batch_unlabeled=False): 

139 """Batches groups of jobs into a single job. 

140 

141 Two jobs will be batched if: 

142 

143 1. There is no intermediate job that needs to run between the two and that cannot be included in the same batch (because of rule 3 below).

144 2. One of the jobs depends on the other. This requirement is disabled by default; enable it by setting `only_connected` to True.

145 3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:

146

147 a) They have the same batch label (set during pipeline creation using the `@pipe(batch=<batch label>)` decorator). This check is enabled by setting the `use_label` keyword to True. If enabled, jobs without a batch label will never be merged.

148 b) The jobs have the same submit parameters (e.g., "architecture", "coprocessor"), except for "jobtime", "name", and "export_vars". "jobram" is also ignored unless the `split_on_ram` keyword is set to True.

149 c) The jobs have the same placeholder values. This will prevent jobs from different subjects from being merged with each other. It can be disabled by setting `use_placeholders` to False. Alternatively, only a subset of placeholders can be considered by setting `use_placeholders` to a sequence of placeholder names (e.g., `use_placeholders=["subject", "session"]` will not merge jobs with different "subject" or "session" placeholder values, but will ignore any other placeholders).

150 

151 A new `JobList` with the batched jobs will be returned. 

152 """ 

153 reference = self.copy() 

154 

155 all_flags = {key for job in reference.jobs for key in job.submit_params.keys()}.difference(["jobtime", "name", "export_vars"]) 

156 if not split_on_ram and "jobram" in all_flags: 

157 all_flags.remove("jobram") 

158 all_flags = sorted(all_flags) 

159 

160 job_labels = [] 

161 for job in reference.jobs: 

162 label = [] 

163 label.append(job.batch if use_label else 1) 

164 label.extend(job.submit_params.get(key, None) for key in all_flags) 

165 if use_placeholders: 

166 if use_placeholders is True: 

167 parameters = job.set_parameters 

168 else: 

169 parameters = {key: value for key, value in job.set_parameters.items() if key in use_placeholders} 

170 label.append(frozenset(parameters.items())) 

171 job_labels.append(tuple(label)) 

172 

173 hashable_job_labels = [tuple(label) if isinstance(label, list) else label for label in job_labels] 

174 

175 to_batch = { 

176 tb: set(job for (job, label) in zip(reference.jobs, job_labels) if label == tb) 

177 for tb in set(hashable_job_labels) 

178 } 

179 jobs = set(reference.jobs) 

180 for to_batch, batching in to_batch.items(): 

181 if to_batch[0] is None and not batch_unlabeled: 

182 # do not batch jobs without a label set (if `use_label` is True) 

183 continue 

184 other_jobs = jobs.difference(batching) 

185 new_jobs = batch_connected_jobs(batching, other_jobs) 

186 if not only_connected: 

187 new_jobs = batch_unconnected_jobs(new_jobs, other_jobs) 

188 jobs = set(new_jobs).union(other_jobs) 

189 return JobList(reference.tree, jobs, reference.targets) 

190 
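# A hedged sketch of batching (assuming `jobs` is an existing JobList):
#
#     batched = jobs.batch(
#         use_label=True,                  # only merge jobs sharing a @pipe(batch=...) label
#         only_connected=False,            # also merge similar jobs that do not depend on each other
#         split_on_ram=False,              # ignore differing "jobram" when deciding similarity
#         use_placeholders=["subject"],    # never merge jobs from different subjects
#     )
#     batched.run()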

191 def split_pipeline(self, use_label=False, split_on_ram=False): 

192 """Split the pipeline into multiple stages that require different hardware. 

193 

194 This uses the same rules as :meth:`batch`, except that placeholder values are always ignored. 

195 """ 

196 batched_jobs = self.batch( 

197 use_label=use_label, split_on_ram=split_on_ram, 

198 only_connected=False, use_placeholders=False, batch_unlabeled=True 

199 ) 

200 

201 return [stage.to_job_list(batched_jobs.tree) for stage in batched_jobs.jobs] 

202 
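# Each stage returned by `split_pipeline` is the (batch label, submit parameters, JobList)
# tuple produced by `to_job_list`; a hedged sketch (assuming `jobs` is an existing JobList):
#
#     for label, submit_params, stage in jobs.split_pipeline(use_label=True):
#         stage.run(method="submit")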

203 def copy(self, ): 

204 """Create a new, independent copy of the JobList.""" 

205 targets = dict() 

206 new_jobs = [job.copy(targets) for job in self.jobs] 

207 return JobList(self.tree, new_jobs, targets) 

208 

209 def report(self, console=None): 

210 """Produce tree reports with the relevant input/output templates.""" 

211 from rich import color, tree 

212 if console is None: 

213 from rich.console import Console 

214 console = Console() 

215 

216 if len(self.jobs) == 0: 

217 console.print("No jobs will be run.") 

218 return 

219 

220 all_inputs = set.union(*[set(j.input_targets) for j in self.jobs]) 

221 all_outputs = set.union(*[set(j.output_targets) for j in self.jobs]) 

222 templates = { 

223 template 

224 for fns in (all_inputs, all_outputs) 

225 for target in fns 

226 for template in target.from_templates 

227 } 

228 

229 def proc_line(tree_obj): 

230 for t in tree_obj.children: 

231 proc_line(t) 

232 line = tree_obj.label 

233 start = line.index('[cyan]') + 6 

234 end = line.index('[/cyan]') 

235 key = line[start:end] 

236 if key not in templates: 

237 return 

238 input_count = 0 

239 output_count = 0 

240 overwrite_count = 0 

241 exists_count = 0 

242 

243 for fn in self.tree.get_mult(key).data.flatten(): 

244 target = get_target(fn, self.targets) 

245 is_output = False 

246 is_input = False 

247 if target in all_outputs: 

248 is_output = True 

249 if target in all_inputs: 

250 is_input = True 

251 if target.exists() and is_output: 

252 overwrite_count += 1 

253 elif is_output: 

254 output_count += 1 

255 elif target.exists(): 

256 exists_count += 1 

257 if is_input: 

258 input_count += 1 

259 if (input_count + output_count + overwrite_count) == 0: 

260 return 

261 counter = "/".join([str(number) if color is None else f"[{color}]{number}[/{color}]" 

262 for number, color in [ 

263 (overwrite_count, 'red'), 

264 (output_count, 'yellow'), 

265 (input_count, 'blue'), 

266 ]]) 

267 tree_obj.label = f"{line} [{counter}]" 

268 

269 for rich_obj in self.tree.filter_templates(templates).fill()._generate_rich_report(): 

270 if isinstance(rich_obj, tree.Tree): 

271 proc_line(rich_obj) 

272 console.print(rich_obj) 

273 

274 def run_datalad(self, ): 

275 """Make sure we can run the pipeline. 

276 

277 Calls `datalad.get` on all input files and `datalad.unlock` on all output files. 

278 """ 

279 input_targets = set() 

280 output_targets = set() 

281 

282 for j in self.jobs:

283 input_targets.update(j.input_targets)

284 output_targets.update(j.output_targets)

285

286 

287 input_targets.difference_update(output_targets) 

288 input_fns = [t.filename for t in input_targets] 

289 output_fns = [t.filename for t in output_targets if t.exists()]

290 

291 from .datalad import get_dataset 

292 ds = get_dataset() 

293 ds.get(input_fns) 

294 ds.unlock(output_fns) 

295 

296 def run(self, method: RunMethod=None, wait_for=(), clean_script="on_success"): 

297 """Run all the jobs that are required to produce the given templates. 

298 

299 :param method: defines how to run the job 

300 :param wait_for: job IDs to wait for before running pipeline 

301 :param clean_script: Sets whether the script produced in the log directory when submitting a job to the cluster should be kept after the job finishes. Only used if `method` is "submit". Options: 

302 - "never": Script is kept 

303 - "on_success": (default) Only remove if script successfully finishes (i.e., no error is raised) 

304 - "always": Always remove the script, even if the script raises an error 

305 """ 

306 if method is None: 

307 method = RunMethod.default() 

308 elif not isinstance(method, RunMethod): 

309 method = RunMethod[method] 

310 if len(self.jobs) == 0: 

311 logger.info("No new jobs being run/submitted") 

312 return 

313 

314 prev_count = 0 

315 run_jobs: Dict[SingleJob, Any] = {} 

316 while len(run_jobs) < len(self.jobs): 

317 for job in self.jobs: 

318 if job in run_jobs or any(j in self.jobs and j not in run_jobs for _, j in job.dependencies(only_missing=False)): 

319 continue 

320 dependencies = [run_jobs[j] for _, j in job.dependencies(only_missing=False) if j in run_jobs] 

321 if len(dependencies) == 0: 

322 dependencies = wait_for 

323 run_jobs[job] = job( 

324 method=method, 

325 wait_for=dependencies, 

326 clean_script=clean_script, 

327 ) 

328 if len(run_jobs) == prev_count: 

329 raise ValueError("Unable to run/submit all jobs. Are there circular dependencies?") 

330 prev_count = len(run_jobs) 

331 

332 if method == RunMethod.dask: 

333 import dask 

334 def last_dask_job(*all_jobs): 

335 if any(a != 0 for a in all_jobs): 

336 return 1 

337 return 0 

338 if dask.delayed(last_dask_job, name="last_job")(*run_jobs.values()).compute() == 1: 

339 raise ValueError("One or more of the jobs have failed.") 

340 logger.info("Successfully finished running all jobs using Dask.") 

341 
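# A hedged sketch of running a filtered JobList (assuming `to_run` from a previous `filter` call;
# the job ID is illustrative):
#
#     to_run.run()                          # uses RunMethod.default()
#     to_run.run(method="submit",           # converted via RunMethod["submit"]
#                wait_for=(123456,),        # hold until this fsl_sub job ID finishes
#                clean_script="never")      # keep the wrapper scripts in the log directory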

342 def scale_jobtime(self, scaling): 

343 """ 

344 Scale the submit job times of all jobs by `scaling`. 

345  

346 This will only affect jobs submitted to the cluster. 

347 """ 

348 for job in self.jobs: 

349 job.scale_jobtime(scaling) 

350 return self 

351 
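# Because `scale_jobtime` returns `self`, it can be chained; a hedged sketch (hypothetical
# template key, assuming `jobs` is an existing JobList):
#
#     jobs.filter(["some_template"]).scale_jobtime(1.5).run()   # request 50% more cluster time
#
# Each job's "jobtime" submit parameter is multiplied by the scaling factor and rounded up.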

352 

353class JobParent: 

354 """Parent for `SingleJob` and `BatchJob`. 

355 """ 

356 input_targets: Set["FileTarget"] 

357 output_targets: Set["FileTarget"] 

358 optional_targets: Set["FileTarget"] 

359 

360 #@lru_cache(None) 

361 def dependencies(self, only_missing=True) -> Set[Tuple[bool, Optional["SingleJob"]]]:

362 """Return the jobs on which this job depends, as (is_optional, job) tuples.

363

364 By default this only returns dependencies related to missing input files.

365 

366 :param only_missing: set to False to also return dependencies that produce files that already exist on disk 

367 """ 

368 jobs = set() 

369 for target in self.input_targets: 

370 if not (only_missing and target.exists()): 

371 jobs.add((target in self.optional_targets, target.producer)) 

372 return jobs 

373 

374 def missing_output(self, reset_cache=False): 

375 """ 

376 Return the set of required output targets that do not yet exist on disk.

377 

378 Optional outputs are not considered. 

379 

380 :param reset_cache: set to True to not rely on an existing cached existence check 

381 """ 

382 missing = set() 

383 for to_check in self.output_targets: 

384 if to_check in self.optional_targets: 

385 continue 

386 if reset_cache: 

387 to_check.reset_existence() 

388 if not to_check.exists(): 

389 missing.add(to_check) 

390 return missing 

391 

392 def missing_input(self, reset_cache=False, ignore_expected=False): 

393 """ 

394 Return the set of required input targets that do not yet exist on disk.

395 

396 Optional inputs are not considered. 

397 

398 :param reset_cache: set to True to not rely on an existing cached existence check 

399 :param ignore_expected: set to True to ignore any missing files that have a job that will produce them in the pipeline 

400 """ 

401 missing = set() 

402 for to_check in self.input_targets: 

403 if to_check in self.optional_targets: 

404 continue 

405 if reset_cache: 

406 to_check.reset_existence() 

407 if ignore_expected and to_check.producer is not None: 

408 continue 

409 if not to_check.exists(): 

410 missing.add(to_check) 

411 return missing 

412 

413 def add_to_jobs(self, all_jobs, overwrite=False, overwrite_dependencies=False, only_sort=False): 

414 """Mark this job and all of its dependencies to run. 

415 

416 This job is marked to run if any of its outputs do not yet exist or `overwrite` is True.

417 Its dependencies are marked to run if this job runs and either their output does not exist or `overwrite_dependencies` is True.

418

419 :param all_jobs: tuple of a list and a set of individual jobs. This job and all required dependencies are added to both.

420 :param overwrite: if True mark this job even if the output already exists, defaults to False 

421 :param overwrite_dependencies: if True mark the dependencies of this job even if their output already exists, defaults to False 

422 """ 

423 if self in all_jobs[1]: 

424 return 

425 if not only_sort: 

426 if not overwrite and len(self.missing_output()) == 0: 

427 return 

428 missing = self.missing_input(ignore_expected=True) 

429 if len(missing) > 0: 

430 raise InputMissingPipe(self, {m.filename for m in missing}) 

431 subjobs = ([], set(all_jobs[1])) 

432 for optional, job in self.dependencies(only_missing=not (overwrite_dependencies or only_sort)): 

433 try: 

434 if job is not None: 

435 job.add_to_jobs(subjobs, overwrite_dependencies, overwrite_dependencies, only_sort)

436 except InputMissingPipe as e: 

437 if optional: 

438 continue 

439 e.dependency_graph.insert(0, e.target) 

440 e.target = self 

441 raise 

442 all_jobs[0].extend(subjobs[0]) 

443 all_jobs[1].update(subjobs[0]) 

444 all_jobs[0].append(self) 

445 all_jobs[1].add(self) 

446 

447 def __call__(self, method: RunMethod, wait_for=(), clean_script="on_success"): 

448 """Run the job using the specified `method`.""" 

449 if method == RunMethod.local: 

450 self.prepare_run() 

451 missing = self.missing_input() 

452 if len(missing) > 0: 

453 raise InputMissingRun(f"{self} can not run as it misses required input files: {missing}") 

454 logger.info(f"running {self}") 

455 self.job = self.function(**self.kwargs) 

456 missing = self.missing_output(reset_cache=True) 

457 if len(missing) > 0: 

458 raise OutputMissing(f"{self} failed to produce required output files: {missing}") 

459 elif method == RunMethod.submit: 

460 from .pipeline import Template 

461 self.prepare_run() 

462 local_submit = dict(self.submit_params) 

463 if 'logdir' not in local_submit: 

464 local_submit['logdir'] = 'log' 

465 Path(local_submit['logdir']).mkdir(exist_ok=True, parents=True) 

466 if 'name' not in local_submit: 

467 local_submit['name'] = self.job_name() 

468 cmd = func_to_cmd(self.function, (), self.kwargs, local_submit['logdir'], clean=clean_script) 

469 if len(wait_for) == 0: 

470 wait_for = None 

471 self.job = submit(cmd, jobhold=wait_for, **local_submit) 

472 logger.debug(f"submitted {self} with job ID {self.job}") 

473 elif method == RunMethod.dask: 

474 import dask 

475 def dask_job(*other_jobs): 

476 if any(a != 0 for a in other_jobs): 

477 logger.debug(f"{self} skipped because dependencies failed") 

478 return 1 

479 try: 

480 logger.debug(f"Running {self} using dask") 

481 self(RunMethod.local) 

482 except Exception as e: 

483 logger.exception(f"{self} failed: {e}") 

484 return 1 

485 logger.debug(f"Running {self} using dask") 

486 return 0 

487 self.job = dask.delayed(dask_job, name=str(self))(*wait_for) 

488 return self.job 

489 

490 def prepare_run(self): 

491 """ 

492 Prepare to run this job. 

493 

494 Steps: 

495 1. Creates the output directories

496 """ 

497 for target in self.output_targets: 

498 target.filename.parent.mkdir(parents=True, exist_ok=True) 

499 

500 def expected(self, ): 

501 """ 

502 Return true if this job is expected to be able to run. 

503 """ 

504 for target in self.input_targets: 

505 if not target.expected(): 

506 return False 

507 return True 

508 

509 

510class SingleJob(JobParent): 

511 """A single job within a larger pipeline.""" 

512 

513 def __init__(self, function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None): 

514 """ 

515 Create a single job that can be run locally or submitted. 

516 

517 :param function: python function 

518 :param kwargs: keyword arguments 

519 :param submit_params: instructions to submit job to cluster using `fsl_sub` 

520 :param input_targets: set of all filenames expected to exist before this job runs 

521 :param output_targets: set of all filenames expected to exist after this job runs 

522 :param optionals: set of filenames that are used to generate the dependency graph and yet might not exist 

523 :param set_parameters: dictionary with placeholder values used to distinguish this SingleJob from all others produced by the same function

524 :param batch: label used to batch multiple jobs into one when submitting to the cluster 

525 """ 

526 self.function = function 

527 if set_parameters is None: 

528 set_parameters = {} 

529 self.set_parameters = set_parameters 

530 self.kwargs = kwargs 

531 self.input_targets = input_targets 

532 self.output_targets = output_targets 

533 self.optional_targets = set(optionals) 

534 self.submit_params = dict(submit_params) 

535 self.batch = batch 

536 for target in self.input_targets: 

537 target.required_by.add(self) 

538 for target in self.output_targets: 

539 target.producer = self 

540 

541 def copy(self, targets: Dict[str, "FileTarget"]): 

542 """ 

543 Create a copy of the `SingleJob` to be included in a new :class:`JobList`. 

544 

545 `targets` contains the FileTargets recognised by this new `JobList`.

546 This will be updated based on the input/output targets of this job. 

547 """ 

548 def copy_targets(job_targets): 

549 new_targets = set() 

550 for target in job_targets: 

551 new_target = get_target(target.filename, targets) 

552 new_target.from_templates = target.from_templates 

553 new_targets.add(new_target) 

554 return new_targets 

555 

556 return SingleJob( 

557 self.function, 

558 self.kwargs, 

559 self.submit_params, 

560 copy_targets(self.input_targets), 

561 copy_targets(self.output_targets), 

562 copy_targets(self.optional_targets), 

563 self.set_parameters, 

564 self.batch, 

565 ) 

566 

567 def job_name(self, ): 

568 """Return a string representation of this job.""" 

569 if len(self.set_parameters) > 0: 

570 parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()]) 

571 name = f"{self.function.__name__}_{parameter_string}" 

572 else: 

573 name = self.function.__name__ 

574 value = re.sub(r'[^\w\s-]', '', name).strip().lower() 

575 return re.sub(r'[-\s]+', '-', value) 

576 

577 def __repr__(self, ): 

578 """Print job as a function call.""" 

579 parameter_string = ', '.join([f"{key}={value}" for key, value in self.set_parameters.items()]) 

580 return f"{self.function.__name__}({parameter_string})" 

581 

582 def to_job_list(self, tree:FileTree): 

583 """Convert single job into its own :class:`JobList`.""" 

584 result = JobList(tree, [self], {}).copy() 

585 return self.batch, self.submit_params, result 

586 

587 def scale_jobtime(self, scaling): 

588 """ 

589 Scale the submit job time in place by `scaling`. 

590  

591 This will only affect jobs submitted to the cluster. 

592 """ 

593 if "jobtime" in self.submit_params.keys(): 

594 self.submit_params["jobtime"] = int(np.ceil(scaling * self.submit_params["jobtime"])) 

595 

596 

597def get_target(filename: Path, all_targets, from_template=None) -> "FileTarget": 

598 """ 

599 Return a :class:`FileTarget` matching the input `filename`. 

600 

601 If the `FileTarget` for `filename` is already in `all_targets`, it will be returned. 

602 Otherwise a new `FileTarget` will be added to `all_targets` and returned. 

603 

604 :param filename: path to the input/intermediate/output filename 

605 :param all_targets: dictionary of all FileTargets 

606 :param from_template: template key used to obtain the filename 

607 """ 

608 abs_path = Path(filename).absolute() 

609 if abs_path not in all_targets: 

610 all_targets[abs_path] = FileTarget(filename) 

611 if from_template is not None: 

612 all_targets[abs_path].from_templates.add(from_template) 

613 return all_targets[abs_path] 

614 
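# A sketch of how targets are registered and de-duplicated (hypothetical filenames/template):
#
#     all_targets = {}
#     t1 = get_target("sub-01/T1w.nii.gz", all_targets, from_template="T1w")
#     t2 = get_target(Path("sub-01/T1w.nii.gz"), all_targets)
#     assert t1 is t2              # same absolute path -> same FileTarget instance
#     t1.from_templates            # {"T1w"}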

615 

616def get_matching_targets(pattern: str, all_targets) -> List["FileTarget"]: 

617 """ 

618 Return all :class:`FileTarget` that match the given input pattern. 

619 

620 :param pattern: filename definition supporting Unix shell-style wildcards 

621 :param all_targets: dictionary of all FileTargets 

622 """ 

623 abs_pattern = str(Path(pattern).absolute()) 

624 matches = [] 

625 for path, target in all_targets.items(): 

626 if globmatch(str(path), abs_pattern, flags=GLOBSTAR): 

627 matches.append(target) 

628 return matches 

629 
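# A hedged sketch of pattern matching against registered targets (hypothetical patterns;
# both patterns are resolved relative to the current working directory):
#
#     get_matching_targets("sub-*/T1w.nii.gz", all_targets)   # shell-style wildcards
#     get_matching_targets("**/*.nii.gz", all_targets)        # `**` recursion via the GLOBSTAR flag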

630 

631class FileTarget: 

632 """Input, intermediate, or output file. 

633 

634 See :func:`get_target` for instructions on creating a new `FileTarget`. 

635 

636 If a specific :class:`SingleJob` produces a filename, this can be indicated by setting :attr:`producer`: 

637 

638 .. code-block:: python 

639 

640 get_target(filename, all_targets).producer = job 

641 

642 This will raise a `ValueError` if the filename is already produced by another job. 

643 

644 If a specific :class:`SingleJob` requires a filename, this can be indicated by adding it to :attr:`required_by`: 

645 

646 .. code-block:: python 

647 

648 get_target(filename, all_targets).required_by.add(job) 

649 

650 Filename existence can be checked using :meth:`exists`. 

651 This method uses caching. To reset the cache run :meth:`reset_existence`. 

652 

653 To check if the filename can be created by this pipeline (or already exists) run :meth:`expected`. 

654 """ 

655 

656 def __init__(self, filename: Path): 

657 """ 

658 Create a new target based on the provided filename. 

659 

660 Do not call this method directly.  

661 Instead use :func:`get_target`. 

662 

663 :param filename: filename 

664 """ 

665 self.filename = Path(filename) 

666 self._producer = None 

667 self.required_by: Set[SingleJob] = set() 

668 self.from_templates: Set[str] = set() 

669 

670 def exists(self) -> bool: 

671 """ 

672 Test whether file exists on disk. 

673 

674 The result is cached; once existence has been checked, the same result is returned on subsequent calls.

675 

676 To reset use :meth:`reset_existence`. 

677 """ 

678 if not hasattr(self, "_exists"): 

679 self._exists = self.filename.is_symlink() or self.filename.exists() 

680 return self._exists 

681 

682 def reset_existence(self, ): 

683 """Ensure existence is checked again when running :meth:`exists`.""" 

684 if hasattr(self, "_exists"): 

685 del self._exists 

686 

687 def expected(self, ): 

688 """ 

689 Return whether the file can be produced by the pipeline (or already exists). 

690 

691 Returns False if the file does not exist and there is no way to produce it; otherwise, returns True.

692 """ 

693 if self.exists(): 

694 return True 

695 if self.producer is None: 

696 return False 

697 return self.producer.expected() 

698 

699 @property 

700 def producer(self, ) -> SingleJob: 

701 """Job that can produce this file.""" 

702 return self._producer 

703 

704 @producer.setter 

705 def producer(self, new_value): 

706 if self._producer is not None: 

707 if self._producer is new_value: 

708 return 

709 raise ValueError(f"{self} can be produced by both {self.producer} and {new_value}") 

710 self._producer = new_value 

711 

712 def __repr__(self, ): 

713 """Print filename of target.""" 

714 return f"FileTarget({str(self.filename)})" 

715 
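# A sketch of the caching and producer behaviour described above (hypothetical filename/jobs):
#
#     target = get_target("out/result.nii.gz", all_targets)
#     target.exists()              # checks the filesystem once and caches the answer
#     target.reset_existence()     # force the next exists() call to re-check the disk
#     target.producer = job_a      # fine the first time (or when re-setting the same job)
#     target.producer = job_b      # raises ValueError: two different jobs claim the same output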

716 

717@contextmanager 

718def update_closure(*dictionaries, **kwargs): 

719 """Add the provided dictionaries to the globals dictionary. 

720 

721 Inside the `with` block all the dictionary keys will be available in the local environment. 

722 After the `with` block ends the local environment will be cleaned up again. 

723  

724 Use like this: 

725 

726 .. code-block:: python 

727 

728 def my_func():

729 with update_closure({'a': 3}):

730 print(a) # prints 3 

731 print(a) # raises a NameError (or prints whatever `a` is set to in the global environment) 

732 """ 

733 func_globals = inspect.stack()[2].frame.f_globals 

734 

735 # merge input dictionaries 

736 new_kwargs = {} 

737 for d in dictionaries: 

738 new_kwargs.update(d) 

739 new_kwargs.update(kwargs) 

740 

741 to_restore = {} 

742 to_delete = set() 

743 for key in new_kwargs: 

744 if key in func_globals: 

745 to_restore[key] = func_globals[key] 

746 else: 

747 to_delete.add(key) 

748 

749 func_globals.update(new_kwargs) 

750 yield # return to run the code within the with-block 

751 

752 # clean up the environment 

753 func_globals.update(to_restore) 

754 for key in to_delete: 

755 del func_globals[key] 

756 del func_globals 

757 

758 

759def call_batched_jobs(funcs, kwargs): 

760 for f, k in zip(funcs, kwargs): 

761 f(**k) 

762 
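# `call_batched_jobs` simply runs each function with its own keyword arguments, in order;
# a sketch with hypothetical functions:
#
#     call_batched_jobs([func_a, func_b], [{"x": 1}, {"y": 2}])
#     # equivalent to: func_a(x=1); func_b(y=2)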

763class BatchJob(JobParent): 

764 """Batched combination of multiple `SingleJob` instances that will be submitted together. 

765 """ 

766 function = staticmethod(call_batched_jobs) 

767 

768 def __init__(self, *sub_jobs): 

769 """Creates a new `BatchJob` from sub_jobs. 

770 

771 `sub_jobs` can be either `SingleJob` or `BatchJob`. In either case they will be run in the order in which they are supplied. 

772 """ 

773 self.torun: Sequence[SingleJob] = [] 

774 for job in sub_jobs: 

775 if isinstance(job, BatchJob): 

776 self.torun.extend(job.torun) 

777 else: 

778 assert isinstance(job, SingleJob) 

779 self.torun.append(job) 

780 self.torun_set = set(self.torun) 

781 

782 self.output_targets = {t for job in self.torun for t in job.output_targets} 

783 for t in self.output_targets: 

784 t._producer = self 

785 self.input_targets = {t for job in self.torun for t in job.input_targets if t not in self.output_targets} 

786 for t in self.input_targets: 

787 t.required_by.add(self) 

788 

789 self.optional_targets = set.union( 

790 { 

791 t for t in self.output_targets 

792 if not any(t in job.output_targets and t not in job.optional_targets for job in self.torun) 

793 }, 

794 { 

795 t for t in self.input_targets 

796 if not any(t in job.input_targets and t not in job.optional_targets for job in self.torun) 

797 }, 

798 ) 

799 

800 as_tuples = set.intersection(*[{(key, value) for key, value in job.set_parameters.items()} for job in self.torun]) 

801 self.set_parameters = {key: value for key, value in as_tuples} 

802 

803 def copy(self, targets: Dict[str, "FileTarget"]): 

804 """ 

805 Create a copy of the `BatchJob` to be included in a new :class:`JobList`. 

806 

807 `targets` contains the FileTargets recognised by this new `JobList`.

808 This will be updated based on the input/output targets of each job within this batch. 

809 """ 

810 return BatchJob(*[job.copy(targets) for job in self.torun]) 

811 

812 @property 

813 def batch(self, ): 

814 b = self.torun[0].batch 

815 if all(job.batch is not None and job.batch == b for job in self.torun): 

816 return b 

817 return None 

818 

819 @property 

820 def kwargs(self, ): 

821 return { 

822 'funcs': [job.function for job in self.torun], 

823 'kwargs': [job.kwargs for job in self.torun], 

824 } 

825 

826 @property 

827 def submit_params(self, ): 

828 def sum_value(name, *times): 

829 total = sum(t for t in times if t is not None) 

830 return None if total == 0 else total 

831 

832 def max_value(name, *rams): 

833 if any(r is not None for r in rams): 

834 return max(int(r) for r in rams if r is not None) 

835 return None 

836 

837 def extend(name, *vars): 

838 all_vars = set() 

839 for v in vars: 

840 if v is not None: 

841 all_vars.update(v) 

842 return list(all_vars) 

843 

844 def merge_name(_, *names): 

845 if len(names) <= 3: 

846 return "-".join(names) 

847 return "batch_job" 

848 

849 def unique_param(name, *flags): 

850 not_none = {f for f in flags if f is not None} 

851 if len(not_none) == 0: 

852 return None 

853 elif len(not_none) == 1: 

854 return not_none.pop() 

855 else: 

856 raise ValueError(f"Multiple different values for submit parameter flag '{name}' found. fsl-pipe does not know which one to choose out of: {not_none}") 

857 

858 to_combine = { 

859 'jobtime': sum_value, 

860 'jobram': max_value, 

861 'threads': max_value, 

862 'export_vars': extend, 

863 'name': merge_name, 

864 } 

865 

866 all_flags = {flag for job in self.torun for flag in job.submit_params.keys()} 

867 return { 

868 flag: to_combine.get(flag, unique_param)(flag, *[job.submit_params.get(flag, None) for job in self.torun]) 

869 for flag in all_flags 

870 } 

871 
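# A sketch of how submit parameters are merged for a batch (values are illustrative):
# "jobtime" is summed, "jobram"/"threads" take the maximum, "export_vars" are combined,
# and any other flag must be identical across the batched jobs or a ValueError is raised.
#
#     # j1.submit_params == {"jobtime": 30, "jobram": 8,  "coprocessor": "cuda"}
#     # j2.submit_params == {"jobtime": 45, "jobram": 16, "coprocessor": "cuda"}
#     # BatchJob(j1, j2).submit_params
#     # -> {"jobtime": 75, "jobram": 16, "coprocessor": "cuda"}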

872 def job_name(self, ): 

873 """Return a string representation of this job.""" 

874 base = "" if self.batch is None else self.batch + '_' 

875 func_name = "-".join(sorted({job.function.__name__ for job in self.torun})) 

876 if len(self.set_parameters) > 0: 

877 parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()]) 

878 name = f"{func_name}_{parameter_string}" 

879 else: 

880 name = func_name 

881 value = re.sub(r'[^\w\s-]', '', name).strip().lower() 

882 return base + re.sub(r'[-\s]+', '-', value) 

883 

884 def scale_jobtime(self, scaling): 

885 """ 

886 Scale the submit job time in place by `scaling`. 

887  

888 This will only affect jobs submitted to the cluster. 

889 """ 

890 for job in self.torun: 

891 job.scale_jobtime(scaling) 

892 

893 def __repr__(self, ): 

894 """Print job as a function call.""" 

895 return f"Batch({self.torun})" 

896 

897 def to_job_list(self, tree:FileTree): 

898 """Convert batched jobs back into a :class:`JobList`.""" 

899 result = JobList(tree, self.torun, {}).copy() 

900 return self.batch, self.submit_params, result 

901 

902 

903def has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs): 

904 if possible_parent in possible_children: 

905 return True 

906 for _, child in possible_parent.dependencies(only_missing=False): 

907 if child in all_jobs and has_dependencies(child, possible_children, all_jobs): 

908 return True 

909 return False 

910 

911 

912def batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) -> List[JobParent]: 

913 """Iteratively combines two jobs  

914 

915 Two jobs will be batched if: 

916 1. They are both in `to_batch` 

917 2. One of the jobs depends on the other 

918 3. There is no other job that needs to run between these two jobs. 

919 Iteration through the jobs will continue until no further batching is possible. 

920 """ 

921 other_jobs = set(other_jobs) 

922 batches = set(to_batch) 

923 nbatches_prev = len(batches) * 2 

924 while len(batches) != nbatches_prev: 

925 nbatches_prev = len(batches) 

926 for job in set(batches): 

927 if job not in batches: 

928 continue 

929 for _, dependency in job.dependencies(only_missing=False): 

930 if job in batches and dependency in batches: 

931 # properties 1 and 2 are met; now checking property 3 

932 all_jobs = set.union(batches, other_jobs) 

933 indirect_dependency = any(has_dependencies(d, {dependency}, all_jobs) for _, d in job.dependencies(only_missing=False) if d in all_jobs and d != dependency) 

934 if not indirect_dependency: 

935 batches.remove(job) 

936 batches.remove(dependency) 

937 batches.add(BatchJob(dependency, job)) 

938 break 

939 return list(batches) 

940 
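# A hedged sketch of batching a simple two-job chain (hypothetical functions `step1`/`step2`;
# the second job consumes the first job's output, so the two are merged into one batch that
# runs step1 before step2):
#
#     targets = {}
#     t1 = get_target("intermediate.txt", targets)
#     t2 = get_target("final.txt", targets)
#     j1 = SingleJob(step1, {}, {}, set(), {t1}, set())
#     j2 = SingleJob(step2, {}, {}, {t1}, {t2}, set())
#     batch_connected_jobs([j1, j2], [])   # -> a single BatchJob running j1 then j2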

941 

942def batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) -> List[BatchJob]: 

943 """Batch jobs into generational sets 

944 

945 All jobs that do not depend on any other job in `to_batch` will be added to the first generational batch. 

946 All jobs that only depend on the first generational batch will be added to the second. 

947 And so forth, until all jobs have been added to a generation. 

948 

949 Only dependencies through `other_jobs` will be considered. 

950 """ 

951 to_batch = set(to_batch) 

952 all_jobs = set.union(to_batch, other_jobs) 

953 

954 generations = [] 

955 

956 while len(to_batch) > 0: 

957 generations.append(set()) 

958 for job in to_batch: 

959 if not any(has_dependencies(dependency, to_batch, all_jobs) for _, dependency in job.dependencies(only_missing=False) if dependency in all_jobs): 

960 generations[-1].add(job) 

961 to_batch.difference_update(generations[-1]) 

962 return [jobs.pop() if len(jobs) == 1 else BatchJob(*jobs) for jobs in generations] 

963 

964