You're reading an old version of this documentation. If you want up-to-date information, please have a look at stable (v1.6.0).

Source code for file_tree.template

from pathlib import Path
import numpy as np
import re
import itertools
from typing import Generator, List, Optional, Sequence, Set, Tuple, Dict, Iterator, Any, FrozenSet, Iterable
import xarray
import pandas as pd
from collections.abc import MutableMapping
from itertools import product, combinations, chain
from glob import glob
from collections import defaultdict
from datetime import datetime
from parse import extract_format, compile
from functools import cmp_to_key, lru_cache
import pandas as pd
import os
import string
from rich.tree import Tree as RichTree


def is_singular(value):
    """Report whether `value` is a single value rather than a set of options.

    Strings are always treated as singular, even though they can be iterated.
    Any other object is singular exactly when ``iter(value)`` raises
    ``TypeError``.
    """
    if isinstance(value, str):
        return True
    try:
        iter(value)
    except TypeError:
        return True
    else:
        return False
class Placeholders(MutableMapping):
    """Dictionary-like object containing the placeholder values.

    It understands about sub-trees (i.e., if "<sub_tree>/<placeholder>" does not
    exist it will return "<placeholder>" instead).

    Placeholders can be linked by assigning to a tuple (or frozenset) of names.
    Linked placeholders are stored in `mapping` under a single frozenset key
    whose value is a dict from each linked name to its sequence of values;
    `linkages` maps every linked name to that frozenset.
    """

    def __init__(self, *args, **kwargs):
        self.mapping = {}
        # maps each linked placeholder name to the frozenset of names it is linked with
        self.linkages: Dict[str, FrozenSet[str]] = {}
        self.update(dict(*args, **kwargs))

    def copy(self):
        """Return a new Placeholders with shallow copies of `mapping` and `linkages`."""
        p = Placeholders()
        p.mapping = dict(self.mapping)
        p.linkages = dict(self.linkages)
        return p

    def __getitem__(self, key: str):
        """Return the value for `key`, falling back to parent (sub-tree-less) keys.

        Raises:
            KeyError: if neither the key nor any of its parents has a value
        """
        actual_key = self.find_key(key)
        if actual_key is None:
            raise KeyError(f"No parameter value available for {key}")
        if actual_key in self.linkages:
            return self.mapping[self.linkages[actual_key]][actual_key]
        return self.mapping[actual_key]

    def __delitem__(self, key):
        """Delete a placeholder; a tuple/frozenset key deletes a whole link."""
        if isinstance(key, tuple):
            key = frozenset(key)
        del self.mapping[key]
        if isinstance(key, frozenset):
            for k in key:
                del self.linkages[k]

    def _restrict_link(self, link, indices):
        """Keep only the entries at `indices` for every placeholder in `link`.

        A new dict is stored rather than mutating the old one, because the old
        dict may be shared with other Placeholders objects (see :meth:`copy`).
        """
        old_values = self.mapping[link]
        self.mapping[link] = {
            name: tuple(values[i] for i in indices)
            for name, values in old_values.items()
        }

    def __setitem__(self, key, value):
        """Set a placeholder value.

        - A tuple key creates linked placeholders; `value` must then hold one
          equally long sequence per name in the tuple.
        - A frozenset key stores `value` (a dict from name to values) as a link.
        - Setting a name that is already linked restricts the link to the
          entries matching the new value(s); if exactly one entry matches a
          singular value, the link is dissolved and every linked name gets its
          single matching value.

        Raises:
            ValueError: if the linked values are inconsistent, if a name is
                already linked to names outside the new link, or if the new
                value(s) do not occur in the existing linked values
        """
        if isinstance(key, tuple):  # create linked placeholders
            if len(key) != len(value):
                raise ValueError(f"Attempting to set linked placeholders for {key}, but {value} has a different number of elements than {key}")
            if any([len(value[0]) != len(v) for v in value]):
                raise ValueError(f"Attempting to set linked placeholders for {key}, but not all elements in {value} have the same length")
            value = {k: v for k, v in zip(key, value)}
            key = frozenset(key)

        if isinstance(key, frozenset):
            for k in key:
                unmatched_keys = [unmatched for unmatched in self.linkages.get(k, ()) if unmatched not in key]
                if len(unmatched_keys) > 0:
                    raise ValueError(f"Attempting to set linked placeholders for {key}, but {k} is already linked to {unmatched_keys}")
            # drop links that are being absorbed into the new (larger) link,
            # so that no stale frozenset entry is left behind in the mapping
            for k in key:
                previous = self.linkages.get(k)
                if previous is not None and previous != key:
                    self.mapping.pop(previous, None)
            self.mapping[key] = value
            for k in key:
                self.linkages[k] = key
                if k in self.mapping:
                    del self.mapping[k]
        elif key in self.linkages:
            link = self.linkages[key]
            old_values = self.mapping[link]
            if is_singular(value):
                nvalue = old_values[key].count(value)
                if nvalue == 0:
                    raise ValueError(f"Can not overwrite placeholder {key} with new values as it is linked to: {self.linkages[key]}")
                elif nvalue == 1:
                    # a single match: dissolve the link and store plain values
                    idx = old_values[key].index(value)
                    del self[link]
                    for skey, svalues in old_values.items():
                        self.mapping[skey] = svalues[idx]
                else:
                    idx = [i for i, v in enumerate(old_values[key]) if v == value]
                    self._restrict_link(link, idx)
            else:
                if not all(v in old_values[key] for v in value):
                    raise ValueError(f"Can not overwrite placeholder {key} with new values as it is linked to: {self.linkages[key]}")
                idx = []
                for new_v in value:
                    for i, old_v in enumerate(old_values[key]):
                        if old_v == new_v and i not in idx:
                            idx.append(i)
                self._restrict_link(link, idx)
        else:
            self.mapping[key] = value

    def __iter__(self):
        """Iterate over the keys in `mapping` whose value is not None."""
        for key in self.mapping:
            if self.mapping[key] is not None:
                yield key

    def __len__(self):
        # count the same keys that __iter__ yields, so len() and iteration agree
        return sum(1 for _ in self)

    def __repr__(self):
        return f"Placeholders({self.mapping})"

    def find_key(self, key: str) -> Optional[str]:
        """Finds the actual key containing the value

        Will look for:

            - not None value for the key itself
            - not None value for any parent (i.e, for key "A/B", will look for "B" as well)
            - otherwise will return None

        Args:
            key (str): placeholder name (a non-string iterable is interpreted
                as the frozenset key of linked placeholders)

        Returns:
            Optional[str]: None if no value for the key is available, otherwise the key used to index the value
        """
        if not isinstance(key, str):
            key = frozenset(key)
            # linked keys have no sub-tree fallback
            return key if self.mapping.get(key, None) is not None else None
        if key in self.linkages:
            return key
        if self.mapping.get(key, None) is not None:
            return key
        if '/' in key:
            # drop the sub-tree name directly in front of the placeholder name
            *sub_trees, _, actual_key = key.split('/')
            new_key = '/'.join([*sub_trees, actual_key])
            return self.find_key(new_key)
        return None

    def split(self) -> Tuple["Placeholders", "Placeholders"]:
        """Splits all placeholders into those with a single value or those with multiple values

        Linked placeholders and any value for which :func:`is_singular` is
        False are considered to have multiple values. Anything else is
        considered a single value (string, int, float, None, etc.)

        Returns:
            Tuple[Placeholders, Placeholders]: first the placeholders with single values, then those with multiple values
        """
        single_placeholders = Placeholders()
        multi_placeholders = Placeholders()
        for name, value in self.mapping.items():
            if isinstance(name, frozenset) or not is_singular(value):
                multi_placeholders[name] = value
            else:
                single_placeholders[name] = value
        return single_placeholders, multi_placeholders

    def iter_over(self, keys) -> Generator["Placeholders", None, None]:
        """Iterate over the values of the given placeholder names

        Args:
            keys (Sequence[str]): sequence of placeholder names to iterate over

        Raises:
            KeyError: if any of the keys has no value (raised when the
                generator is first advanced)

        Returns:
            Generator[Placeholders]: yields Placeholders object, where each of the listed keys only has a single possible value
        """
        actual_keys = []
        unfilled = set()
        for key in keys:
            found = self.find_key(key)
            if found is None:
                unfilled.add(key)
            # linked placeholders are iterated through their shared frozenset key
            actual_keys.append(self.linkages.get(found, key))
        if len(unfilled) > 0:
            raise KeyError(f"Can not iterate over undefined placeholders: {unfilled}")

        unique_keys = []
        iter_values = {}
        for key in actual_keys:
            if key in unique_keys:
                continue
            if isinstance(key, frozenset):  # linked placeholder
                unique_keys.append(key)
                iter_values[key] = [
                    {k: self[k][idx] for k in key}
                    for idx in range(len(self[list(key)[0]]))
                ]
            elif not is_singular(self[key]):  # iterable placeholder
                unique_keys.append(key)
                iter_values[key] = self[key]

        for values in product(*[iter_values[k] for k in unique_keys]):
            new_vars = Placeholders(self)
            for key, value in zip(unique_keys, values):
                if isinstance(key, frozenset):
                    del new_vars[key]  # break the placeholders link
                    new_vars.update(value)
                else:
                    new_vars[key] = value
            yield new_vars
class MyDataArray:
    """Thin internal stand-in for xarray.DataArray.

    The object either wraps a real ``xarray.DataArray`` (``as_xarray`` is True,
    stored in ``data_array``) or holds a raw array in ``data`` together with a
    list of ``(name, values)`` pairs in ``coords``. Building the DataArray is
    postponed as long as possible (using them for small arrays is slow).
    """

    def __init__(self, data, coords=None):
        # no coordinates given means `data` already is a DataArray
        self.as_xarray = coords is None
        if not self.as_xarray:
            self.data = data
            self.coords = coords
        else:
            assert isinstance(data, xarray.DataArray)
            self.data_array = data

    def map(self, func) -> "MyDataArray":
        """Apply `func` to every element and return the result as a new MyDataArray."""
        if self.as_xarray:
            return MyDataArray(xarray.apply_ufunc(func, self.data_array, vectorize=True))
        mapped = np.array([func(element) for element in self.data.flat])
        return MyDataArray(mapped.reshape(self.data.shape), self.coords)

    def to_xarray(self, ) -> "xarray.DataArray":
        """Return the content as an xarray.DataArray, building it if needed."""
        if self.as_xarray:
            return self.data_array
        indices = [_to_index(name, values) for name, values in self.coords]
        return xarray.DataArray(self.data, indices)

    @staticmethod
    def concat(parts, new_index) -> "MyDataArray":
        """Combine `parts` along a new leading dimension described by `new_index`.

        `new_index` is a ``(name, values)`` pair. The raw-array representation
        is kept only if none of the parts wraps a DataArray and all parts have
        the same number of coordinates with matching names; otherwise the
        concatenation is delegated to xarray.
        """
        if len(parts) == 0:
            return MyDataArray(np.array([]), [])

        needs_xarray = any(part.as_xarray for part in parts)
        if not needs_xarray:
            reference = parts[0].coords
            for part in parts:
                if len(part.coords) != len(reference):
                    needs_xarray = True
                    break
                if any(
                    np.all(name1 != name2)
                    for (name1, _), (name2, _) in zip(part.coords, reference)
                ):
                    needs_xarray = True
                    break

        if needs_xarray:
            as_arrays = [part.to_xarray() for part in parts]
            return MyDataArray(xarray.concat(as_arrays, _to_index(*new_index)))

        stacked = np.stack([part.data for part in parts], axis=0)
        coords = [new_index, *parts[0].coords]
        return MyDataArray(stacked, coords)
def _to_index(name, values): if isinstance(name, str): return pd.Index(values, name=name) else: return pd.MultiIndex.from_tuples(values, names=name)
class Template:
    """Single (possibly nested) path template.

    A template is defined by its `parent` template (None for a top-level
    template) and the `unique_part` of the path it adds to that parent.
    """

    def __init__(self, parent: Optional["Template"], unique_part: str):
        self.parent = parent
        self.unique_part = unique_part

    @property
    def as_path(self, ) -> Path:
        """The full path with no placeholders filled in
        """
        if self.parent is None:
            return Path(self.unique_part)
        return self.parent.as_path.joinpath(self.unique_part)

    @property
    def as_string(self, ):
        """The full template as a string with no placeholders filled in
        """
        if self.parent is None:
            return str(self.unique_part)
        return os.path.join(self.parent.as_string, str(self.unique_part))

    def __str__(self, ):
        return f"Template({self.as_string})"

    def children(self, templates: Iterable["Template"]) -> List["Template"]:
        """From a sequence of templates find the children

        Returns:
            List[Template]: list of children templates (without duplicates, in order of first appearance)
        """
        res = []
        for t in templates:
            if t.parent is self and t not in res:
                res.append(t)
        return res

    def as_multi_line(self, other_templates: Dict[str, "Template"], indentation=4) -> str:
        """Generates a string describing this and any child templates

        Args:
            other_templates (Dict[str, Template]): templates including all the child templates and itself
                (the top-level template is expected under the key "")
            indentation (int, optional): number of spaces to use as indentation. Defaults to 4

        Returns:
            str: multi-line string that can be processed by :meth:`file_tree.FileTree.read`
        """
        result = self._as_multi_line_helper(other_templates, indentation)

        is_top_level = other_templates[""] is self
        if not is_top_level and self.parent is None:
            # parent-less templates other than the top-level one are marked with '!'
            return '!' + result
        return result

    def _as_multi_line_helper(self, other_templates: Dict[str, "Template"], indentation=4, _current_indentation=0) -> str:
        """Recursive worker for :meth:`as_multi_line` (lists leaves before branches)."""
        all_templates = other_templates.values()
        leaves = []
        branches = []
        for t in sorted(self.children(all_templates), key=lambda t: t.unique_part):
            if len(t.children(all_templates)) == 0:
                leaves.append(t)
            else:
                branches.append(t)

        is_top_level = other_templates[""] is self
        if is_top_level:
            base_line = '.'
            assert _current_indentation == 0 and self.parent is None
            # children of the top-level template are not indented
            _current_indentation = -indentation
        else:
            base_line = _current_indentation * ' ' + self.unique_part

        all_keys = []
        for key, value in other_templates.items():
            if value is not self:
                continue
            if is_top_level and key == "":
                continue
            all_keys.append(key)

        if is_top_level and len(all_keys) == 0:
            lines = []
        elif len(all_keys) == 1 and all_keys[0] == self.guess_key():
            # the key can be guessed from the path, so it does not need to be listed
            lines = [base_line]
        else:
            assert len(all_keys) > 0
            lines = [base_line + f' ({",".join(all_keys)})']

        already_done = set()
        for t in leaves + branches:
            if t not in already_done:
                lines.append(t._as_multi_line_helper(other_templates, indentation, indentation + _current_indentation))
                already_done.add(t)
        return '\n'.join(lines)

    @property
    def _parts(self, ):
        return TemplateParts.parse(self.as_string)

    def placeholders(self, valid=None) -> List[str]:
        """Returns a list of the placeholder names

        Returns:
            List[str]: placeholder names in order that they appear in the template
        """
        return self._parts.ordered_placeholders(valid)

    def format_single(self, placeholders: "Placeholders", check=True, keep_optionals=False) -> str:
        """Formats the template with the placeholders filled in

        Only placeholders with a single value are considered.

        Args:
            placeholders (Placeholders): values to fill into the placeholder
            check (bool): if True, raise an error when required placeholders are still missing
            keep_optionals: if True keep optional parameters that have not been set (will cause the check to fail)

        Raises:
            KeyError: if `check` is True and any placeholder is missing

        Returns:
            str: filled in template
        """
        single_placeholders, _ = placeholders.split()
        template = self._parts.fill_single_placeholders(single_placeholders)
        if not keep_optionals:
            template = template.remove_optionals()
        if check:
            unfilled = template.required_placeholders()
            if len(unfilled) > 0:
                raise KeyError(f"Missing placeholder values for {unfilled}")
        return str(template)

    def format_mult(self, placeholders: "Placeholders", check=False, filter=False, matches=None) -> "xarray.DataArray":
        """Replaces placeholders in template with the provided placeholder values

        Args:
            placeholders (Placeholders): mapping from placeholder names to single or multiple values
            check (bool): if True, raise an error when required placeholders are still missing
            filter (bool): filter out non-existing files if set to True
            matches: pre-computed result of :meth:`all_matches`; computed here if None and `filter` is True

        Raises:
            KeyError: if `check` is True and any placeholder is missing

        Returns:
            xarray.DataArray: array with possible resolved paths. If `filter` is set to True the non-existent paths are replaced by an empty string
        """
        parts = self._parts
        resolved = parts.resolve(placeholders)
        if check:
            # the resolved templates live in a different attribute depending on the representation
            templates = resolved.data_array.values if resolved.as_xarray else resolved.data
            for template in templates.flatten():
                unfilled = template.required_placeholders()
                if len(unfilled) > 0:
                    raise KeyError(f"Missing placeholder values for {unfilled}")
        paths = resolved.map(lambda t: str(t))
        if not filter:
            return paths.to_xarray()

        placeholder_dict = dict(placeholders)
        if matches is None:
            matches = self.all_matches(placeholders)
        # set gives constant-time membership tests for every resolved path
        path_matches = {
            str(parts.fill_single_placeholders(Placeholders({**placeholder_dict, **match})).remove_optionals())
            for match in matches
        }
        return paths.map(lambda p: p if p in path_matches else '').to_xarray()

    def optional_placeholders(self, ) -> Set[str]:
        """Finds all placeholders that are only within optional blocks (i.e., they do not require a value)

        Returns:
            Set[str]: names of optional placeholders
        """
        return self._parts.optional_placeholders()

    def required_placeholders(self, ) -> Set[str]:
        """Finds all placeholders that are outside of optional blocks (i.e., they do require a value)

        Returns:
            Set[str]: names of required placeholders
        """
        return self._parts.required_placeholders()

    def guess_key(self, ) -> str:
        """Proposes a short name for the template

        The proposed short name is created by:

            - taking the basename (i.e., last component) of the path
            - removing the first '.' and everything beyond (to remove the extension)

        .. warning::

            If there are multiple dots within the path's basename, this might remove far more than just the extension.

        Returns:
            str: proposed short name for this template (used if user does not provide one)
        """
        parts = self.as_path.parts
        if len(parts) == 0:
            return ""
        return parts[-1].split('.')[0]

    def add_precursor(self, text) -> "Template":
        """Returns a new Template with any placeholder names in the unique part now preceded by `text`

        Used for adding sub-trees
        """
        parts = TemplateParts.parse(self.unique_part).parts
        updated = ''.join([str(p.add_precursor(text)) for p in parts])
        return Template(self.parent, updated)

    def get_all_placeholders(self, placeholders: "Placeholders", matches=None) -> "Placeholders":
        """Fill placeholders with possible values based on what is available on disk

        Args:
            placeholders (Placeholders): already defined placeholder values
            matches: pre-computed result of :meth:`all_matches`; computed here if None

        Returns:
            Placeholders: sorted values found in the matches for each placeholder
                that is not defined in `placeholders` (None is sorted first)
        """
        undefined = defaultdict(set)
        for match in self.all_matches(placeholders) if matches is None else matches:
            for name, value in match.items():
                if name not in placeholders:
                    undefined[name].add(value)

        def none_first(value):
            # (False, None) sorts before every (True, value), so None itself is never compared
            return (value is not None, value)

        return Placeholders({k: sorted(v, key=none_first) for k, v in undefined.items()})

    def all_matches(self, placeholders: "Placeholders"):
        """Returns a sequence of all possible variable values matching existing files on disk

        Only variable values matching existing placeholder values are returned
        (undefined placeholders are unconstrained).
        """
        single_vars, multi_vars = placeholders.split()
        filled_template = self._parts.fill_single_placeholders(single_vars)

        def check_name_with_edit(match, name):
            """Check one matched value; may rename sub-tree keys in `match` in place."""
            value = match[name]
            if name in multi_vars and multi_vars.find_key(name) == name:
                return value in multi_vars[name]
            if '/' in name:
                # move the value to the name without its first sub-tree and check that instead
                del match[name]
                parent_name = name[name.find('/') + 1:]
                if parent_name in match:
                    return match[parent_name] == value
                match[parent_name] = value
                return check_name_with_edit(match, parent_name)
            return True

        res = []
        for match in filled_template.all_matches():
            # iterate over a snapshot of the keys, because the check can edit `match`
            if all(check_name_with_edit(match, name) for name in list(match.keys())):
                res.append(match)
        return res

    def rich_line(self, all_templates):
        """Produces a line for rendering using rich"""
        all_keys = [key for key, value in all_templates.items() if value == self]
        base = self.guess_key()
        unique_part = str(self.unique_part)
        if base in all_keys:
            # highlight the key inside the path instead of listing it separately
            all_keys.remove(base)
            unique_part = str.replace(unique_part, base, f"[cyan]{base}[/cyan]")
        if len(all_keys) == 0:
            return unique_part
        return unique_part + " (" + ', '.join("[cyan]" + key + "[/cyan]" for key in all_keys) + ")"
def extract_placeholders(template, filename, known_vars=None):
    """
    Extracts the placeholder values from the filename

    The template string is parsed with ``TemplateParts.parse`` and the work is
    delegated to ``TemplateParts.extract_placeholders``.

    :param template: template matching the given filename
    :param filename: filename
    :param known_vars: already known placeholders
    :return: dictionary from placeholder names to string representations (unused placeholders set to None)
    """
    parsed = TemplateParts.parse(template)
    return parsed.extract_placeholders(filename, known_vars)
class Part:
    """
    Individual part of a template

    3 subclasses are defined:

    - :class:`Literal`: piece of text
    - :class:`Required`: required placeholder to fill in (between curly brackets)
    - :class:`OptionalPart`: part of text containing optional placeholders (between square brackets)

    The methods here are the defaults for a part without any placeholders.
    """

    def fill_single_placeholders(self, placeholders: "Placeholders", ignore_type=False) -> Sequence["Part"]:
        """
        Fills in the given placeholders

        By default nothing is filled in and a 1-tuple with this part is returned.
        """
        return (self,)

    def optional_placeholders(self, ) -> Set[str]:
        """
        Returns all placeholders in optional parts (none by default)
        """
        return set()

    def required_placeholders(self, ) -> Set[str]:
        """
        Returns all required placeholders (none by default)
        """
        return set()

    def contains_optionals(self, placeholders: Set["Part"]=None):
        """
        Returns True if this part contains the optional placeholders (never by default)
        """
        return False

    def append_placeholders(self, placeholders: List[str], valid=None):
        """
        Appends the placeholders in this part to the provided list in order

        By default the list is left untouched.
        """
        return None

    def add_precursor(self, text: str) -> "Part":
        """Prepends any placeholder names by `text` (by default returns this part unchanged).
        """
        return self

    def for_defined(self, placeholder_names: Set[str]) -> List["Part"]:
        """Returns the template string assuming the placeholders in `placeholder_names` are defined

        Removes any optional parts, whose placeholders are not in `placeholder_names`.
        By default a list holding just this part is returned.
        """
        return [self]

    def remove_precursors(self, placeholders=None):
        """Returns this part unchanged (there are no placeholder names to shorten)."""
        return self
class Literal(Part):
    """Piece of plain text within a template."""

    def __init__(self, text: str):
        """
        Literal part is defined purely by the text it contains

        :param text: part of the template
        """
        self.text = text

    def __str__(self):
        """
        Returns this part of the template as a string
        """
        return self.text

    def __eq__(self, other):
        # two literals are equal when they hold the same text
        if isinstance(other, Literal):
            return self.text == other.text
        return NotImplemented
class Required(Part):
    """Placeholder within a template (between curly brackets)."""

    def __init__(self, var_name, var_formatting=None):
        """
        Required part of template (between curly brackets)

        Required placeholder part of template is defined by placeholder name and its format

        :param var_name: name of placeholder
        :param var_formatting: how to format the placeholder (None or an empty string for no formatting)
        """
        self.var_name = var_name
        self.var_formatting = var_formatting

    def __str__(self):
        """
        Returns this part of the template as a string
        """
        if self.var_formatting is None or len(self.var_formatting) == 0:
            return '{' + self.var_name + '}'
        else:
            return '{' + self.var_name + ':' + self.var_formatting + '}'

    def fill_single_placeholders(self, placeholders: Placeholders, ignore_type=False):
        """Replaces this placeholder by its formatted value, if one is available.

        The value is first converted to the type implied by the format
        specification (int, float or datetime), unless `ignore_type` is True, in
        which case the format specification is ignored altogether. The formatted
        text is parsed as a template again, so any placeholders contained in the
        value are filled in as well.

        :param placeholders: values for the placeholders
        :param ignore_type: if True, insert the value without applying the format
        :return: sequence of parts replacing this one (just this part if no value is available)
        """
        value = placeholders.get(self.var_name, None)
        if value is None:
            return (self, )

        # a missing format specification behaves like an empty one
        formatting = self.var_formatting or ''
        if not ignore_type and len(formatting) > 0:
            format_type = extract_format(formatting, [])["type"]
            if format_type in list(r"dnbox"):
                value = int(value)
            elif format_type in list(r"f%eg"):
                value = float(value)
            elif format_type in ['t' + ft for ft in 'iegachs'] and isinstance(value, str):
                from dateutil import parser
                # `parser` is the module; the parsing function is `parser.parse`
                value = parser.parse(value)

        res = TemplateParts.parse(format(value, '' if ignore_type else formatting))
        if len(res.parts) == 1:
            return res.parts
        return res.fill_single_placeholders(placeholders, ignore_type=ignore_type).parts

    def required_placeholders(self, ):
        """Returns a set with just the name of this placeholder."""
        return {self.var_name}

    def append_placeholders(self, placeholders, valid=None):
        """Appends the name of this placeholder to the list `placeholders`.

        :raises ValueError: if `valid` is given and does not contain the name
        """
        if valid is not None and self.var_name not in valid:
            raise ValueError(f"Placholder {self.var_name} is not defined")
        placeholders.append(self.var_name)

    def add_precursor(self, text: str) -> "Required":
        """Prepends any placeholder names by `text`.
        """
        return Required(text + self.var_name, self.var_formatting)

    def remove_precursors(self, placeholders=None):
        """Returns a copy with a shortened placeholder name.

        Without `placeholders` only the text after the last '/' is kept.
        Otherwise the name is replaced by the key found by
        ``placeholders.find_key`` (and kept as it is if no key is found).
        """
        if placeholders is None:
            new_name = self.var_name.split('/')[-1]
        else:
            key = placeholders.find_key(self.var_name)
            new_name = self.var_name if key is None else key
        return Required(new_name, self.var_formatting)

    def __eq__(self, other):
        if not isinstance(other, Required):
            return NotImplemented
        return self.var_name == other.var_name and self.var_formatting == other.var_formatting
class OptionalPart(Part):
    """Part of a template that can be left out (between square brackets)."""

    def __init__(self, sub_template: "TemplateParts"):
        """
        Optional part of template (between square brackets)

        Optional part can contain literal and required parts

        :param sub_template: part of the template within square brackets
        """
        self.sub_template = sub_template

    @staticmethod
    def _raw_text(sub_template) -> str:
        """Joins the parts of `sub_template` without any path normalisation.

        ``str(sub_template)`` passes the text through ``os.path.normpath``,
        which would drop a trailing '/' of a fragment like "ses-{session}/"
        and turn an empty fragment into ".".
        """
        return ''.join(str(p) for p in sub_template.parts)

    def __str__(self):
        return '[' + self._raw_text(self.sub_template) + ']'

    def fill_single_placeholders(self, placeholders: Placeholders, ignore_type=False):
        """Fills in the placeholders within the optional part.

        Once no unfilled placeholders remain, the part is no longer optional
        and is returned as a :class:`Literal`.
        """
        new_opt = self.sub_template.fill_single_placeholders(placeholders, ignore_type=ignore_type)
        if len(new_opt.required_placeholders()) == 0:
            return (Literal(self._raw_text(new_opt)), )
        return (OptionalPart(new_opt), )

    def optional_placeholders(self, ):
        """Returns the placeholders within the square brackets."""
        return self.sub_template.required_placeholders()

    def contains_optionals(self, placeholders=None):
        """Returns True if this part contains any of the given optional placeholders.

        If `placeholders` is None, returns True if the part contains any placeholder at all.
        """
        own_placeholders = self.optional_placeholders()
        if placeholders is None:
            return len(own_placeholders) > 0
        return len(own_placeholders.intersection(placeholders)) > 0

    def append_placeholders(self, placeholders, valid=None):
        """Appends the placeholders of the optional part to the list `placeholders`.

        Nothing is appended if any of them is not in `valid`.
        """
        try:
            placeholders.extend(self.sub_template.ordered_placeholders(valid=valid))
        except ValueError:
            # an invalid placeholder in an optional part is not an error; the part is just skipped
            pass

    def add_precursor(self, text: str) -> "OptionalPart":
        """Prepends any placeholder names by `text`.
        """
        return OptionalPart(TemplateParts([p.add_precursor(text) for p in self.sub_template.parts]))

    def for_defined(self, placeholder_names: Set[str]) -> List["Part"]:
        """Returns the template string assuming the placeholders in `placeholder_names` are defined

        Removes any optional parts, whose placeholders are not in `placeholder_names`.
        """
        if len(self.optional_placeholders().difference(placeholder_names)) > 0:
            return []
        return list(self.sub_template.parts)

    def remove_precursors(self, placeholders=None):
        """Returns a copy with the placeholder names in the optional part shortened."""
        return OptionalPart(self.sub_template.remove_precursors(placeholders))

    def __eq__(self, other):
        if not isinstance(other, OptionalPart):
            return NotImplemented
        return (self.sub_template == other.sub_template)
class TemplateParts:
    """
    The parts of a larger template

    `parts` is a tuple of :class:`Part` objects (literals, required
    placeholders and optional parts) in the order they appear in the template.
    """
    optional_re = re.compile(r'(\[.*?\])')
    requires_re = re.compile(r'(\{.*?\})')

    def __init__(self, parts: Sequence[Part]):
        if isinstance(parts, str):
            raise ValueError("Input to Template should be a sequence of parts; " +
                             "did you mean to call `TemplateParts.parse` instead?")
        self.parts = tuple(parts)

    @staticmethod
    @lru_cache(1000)
    def parse(text: str) -> "TemplateParts":
        """Parses a template string into its constituent parts

        Raises:
            ValueError: raised if a parsing error is encountered (nested or unmatched square brackets)

        Returns:
            TemplateParts: object that contains the parts of the template
        """
        parts: List[Part] = []
        for optional_parts in TemplateParts.optional_re.split(text):
            if len(optional_parts) > 0 and optional_parts[0] == '[' and optional_parts[-1] == ']':
                if '[' in optional_parts[1:-1] or ']' in optional_parts[1:-1]:
                    raise ValueError(f'Can not parse {text}, because unmatching square brackets were found')
                parts.append(OptionalPart(TemplateParts.parse(optional_parts[1:-1])))
            else:
                for required_parts in TemplateParts.requires_re.split(optional_parts):
                    if len(required_parts) > 0 and required_parts[0] == '{' and required_parts[-1] == '}':
                        # only the first ':' separates name and format, so the format may contain ':' itself
                        var_name, _, var_type = required_parts[1:-1].partition(':')
                        parts.append(Required(var_name, var_type))
                    else:
                        # empty literals are kept on purpose
                        parts.append(Literal(required_parts))
        return TemplateParts(parts)

    def __str__(self):
        """
        Returns the template as a string (normalised using ``os.path.normpath``)
        """
        return os.path.normpath(''.join([str(p) for p in self.parts]))

    def optional_placeholders(self, ) -> Set[str]:
        """Set of optional placeholders (those that only appear within optional parts)
        """
        optionals = set().union(*[p.optional_placeholders() for p in self.parts])
        return optionals.difference(self.required_placeholders())

    def required_placeholders(self, ) -> Set[str]:
        """Set of required placeholders
        """
        return set().union(*[p.required_placeholders() for p in self.parts])

    def ordered_placeholders(self, valid=None) -> List[str]:
        """Sequence of all placeholders in order (can contain duplicates)
        """
        ordered_vars: List[str] = []
        for p in self.parts:
            p.append_placeholders(ordered_vars, valid=valid)
        return ordered_vars

    def fill_known(self, placeholders: Placeholders, ignore_type=False) -> MyDataArray:
        """Fill in the known placeholders

        Any optional parts, where all placeholders have been filled will be automatically replaced

        Returns:
            MyDataArray: array of TemplateParts with one dimension for every
                placeholder (or set of linked placeholders) with multiple values
        """
        single, multi = placeholders.split()
        return self.remove_precursors(placeholders)._fill_known_helper(single, multi, ignore_type=ignore_type)

    def _fill_known_helper(self, single: Placeholders, multi: Placeholders, ignore_type=False) -> MyDataArray:
        """Recursive worker for :meth:`fill_known`.

        Fills in the single-valued placeholders and then expands the first
        placeholder in the template that has multiple values, recursing for
        each of its values.
        """
        new_template = self.fill_single_placeholders(single, ignore_type=ignore_type)
        for name in new_template.ordered_placeholders():
            use_name = multi.find_key(name)
            if use_name is None:
                continue

            new_multi = multi.copy()
            if use_name in multi.linkages:
                values = multi[multi.linkages[use_name]]
                keys = tuple(sorted(values.keys()))
                # materialise the tuples; a bare zip object can only be consumed once
                index = (keys, list(zip(*[values[k] for k in keys])))
                del new_multi[new_multi.linkages[use_name]]
            else:
                values = {use_name: list(multi[name])}
                index = (use_name, values[use_name])
                del new_multi[use_name]

            parts = []
            new_single = single.copy()
            for idx in range(len(values[use_name])):
                new_vals = {n: v[idx] for n, v in values.items()}
                new_single.mapping.update(new_vals)
                parts.append(new_template._fill_known_helper(new_single, new_multi, ignore_type=ignore_type))
            return MyDataArray.concat(parts, index)
        # nothing left to expand: 0-dimensional array holding the template itself
        return MyDataArray(np.array(new_template, dtype=object), [])

    def fill_single_placeholders(self, placeholders: Placeholders, ignore_type=False) -> "TemplateParts":
        """
        Fills in placeholders with singular values

        Assumes that all placeholders are in fact singular
        """
        res = [p.fill_single_placeholders(placeholders, ignore_type=ignore_type) for p in self.parts]
        return TemplateParts(list(chain(*res)))

    def remove_optionals(self, optionals=None) -> "TemplateParts":
        """
        Removes any optionals containing the provided placeholders (default: remove all)
        """
        return TemplateParts([p for p in self.parts if not p.contains_optionals(optionals)])

    def all_matches(self, ) -> List[Dict[str, Any]]:
        """Finds all potential matches to existing templates

        Every combination of optional placeholders is tried: the template is
        turned into a glob pattern, and each file found on disk is parsed to
        obtain the placeholder values.

        Returns a list with the possible combination of values for the placeholders.
        Optional placeholders that were not used in a match are set to None.
        """
        required = self.required_placeholders()
        optional = self.optional_placeholders()

        matches = []
        already_globbed = {}
        for n_defined in range(len(optional) + 1):
            for defined_optionals in combinations(optional, n_defined):
                defined = required.union(defined_optionals)
                glob_placeholders = Placeholders(dict.fromkeys(defined, '*'))
                new_glob = str(self.fill_single_placeholders(glob_placeholders, ignore_type=True).remove_optionals())
                while '**' in new_glob:
                    new_glob = new_glob.replace('**', '*')

                # different combinations can lead to the same glob pattern; only hit the disk once
                if new_glob not in already_globbed:
                    already_globbed[new_glob] = glob(new_glob)

                res = []
                for p in self.parts:
                    res.extend(p.for_defined(defined))
                parser = TemplateParts(res).get_parser()

                for fn in already_globbed[new_glob]:
                    try:
                        placeholders = parser(fn)
                    except ValueError:
                        # file matches the glob pattern, but not the actual template
                        continue
                    for var_name in optional:
                        if var_name not in placeholders:
                            placeholders[var_name] = None
                    matches.append(placeholders)
        return matches

    def resolve(self, placeholders, ignore_type=False) -> MyDataArray:
        """
        Resolves the template given a set of placeholders

        :param placeholders: mapping of placeholder names to values
        :param ignore_type: if True, ignore the type formatting when filling in placeholders
        :return: array of templates with the known placeholders filled in and any remaining optional parts removed
        """
        return self.fill_known(placeholders, ignore_type=ignore_type).map(lambda t: t.remove_optionals())

    def optional_subsets(self, ) -> Iterator["TemplateParts"]:
        """
        Yields template sub-sets with every combination optional placeholders
        """
        optionals = self.optional_placeholders()
        for n_optional in range(len(optionals) + 1):
            for exclude_optional in combinations(optionals, n_optional):
                yield self.remove_optionals(exclude_optional)

    def extract_placeholders(self, filename, known_vars=None):
        """
        Extracts the placeholder values from the filename

        Every combination of optional parts is converted into a regular
        expression (each placeholder becoming a ``(\\S+)`` group), which is
        matched against the start of the filename. The best match is the one
        using the most placeholders and, if equal, the shortest values.

        :param filename: filename
        :param known_vars: already known placeholders
        :return: dictionary from placeholder names to string representations (unused placeholders set to None)
        :raises ValueError: if the filename does not match the template
        :raises KeyError: if there are multiple equally good ways to match the filename
        """
        if known_vars is not None:
            # only single values can be filled into the template up front
            known_single, _ = Placeholders(known_vars).split()
            template = self.fill_single_placeholders(known_single)
        else:
            template = self

        while '//' in filename:
            filename = filename.replace('//', '/')

        required = template.required_placeholders()
        optional = template.optional_placeholders()
        regex_values = Placeholders({var: r'(\S+)' for var in required.union(optional)})

        results = []
        for to_fill in template.optional_subsets():
            # ignore_type: the regular expression should be inserted as it is
            sub_re = str(to_fill.fill_single_placeholders(regex_values, ignore_type=True))
            while '//' in sub_re:
                sub_re = sub_re.replace('//', '/')
            sub_re = sub_re.replace('.', r'\.')
            match = re.match(sub_re, filename)
            if match is None:
                continue

            extracted_value = {}
            ordered_vars = to_fill.ordered_placeholders()
            assert len(ordered_vars) == len(match.groups())

            failed = False
            for var, value in zip(ordered_vars, match.groups()):
                if var in extracted_value:
                    # a placeholder used multiple times should have the same value every time
                    if value != extracted_value[var]:
                        failed = True
                        break
                else:
                    extracted_value[var] = value
            if failed or any('/' in value for value in extracted_value.values()):
                continue
            for name in template.optional_placeholders():
                if name not in extracted_value:
                    extracted_value[name] = None
            if known_vars is not None:
                extracted_value.update(known_vars)
            results.append(extracted_value)

        if len(results) == 0:
            raise ValueError("{} did not match {}".format(filename, template))

        def score(placeholders):
            """
            The highest score is given to the set of placeholders that:

                1. has used the largest amount of optional placeholders
                2. has the shortest text within the placeholders (only used if equal at 1)
            """
            used = [v for v in placeholders.values() if v is not None]
            # known values are not necessarily strings, hence the str()
            length_hint = sum(len(str(v)) for v in used)
            return len(used) * 1000 - length_hint

        best = max(results, key=score)
        for var in results:
            if best != var and score(best) == score(var):
                raise KeyError("Multiple equivalent ways found to parse {} using {}".format(filename, template))
        return best

    def get_parser(self):
        """Returns a function extracting the placeholder values from a filename.

        The returned function maps a filename to a dictionary from placeholder
        name to value, and raises ValueError if the filename does not match the
        template or if any string value contains a '/'.

        :raises ValueError: if the template contains optional parts
        """
        if any(isinstance(p, OptionalPart) for p in self.parts):
            raise ValueError("Can not parse filename when there are optional parts in the template")

        # placeholder names are replaced by three-letter names before compiling the
        # pattern, and mapped back to the original names afterwards
        mapping = {
            old_key: ''.join(new_key)
            for old_key, new_key in zip(self.required_placeholders(), product(string.ascii_letters, repeat=3))
        }
        reverse = {new_key: old_key for old_key, new_key in mapping.items()}

        cleaned = TemplateParts([
            Required(mapping[p.var_name], p.var_formatting) if isinstance(p, Required) else p
            for p in self.parts
        ])
        parser = compile(str(cleaned), case_sensitive=True)

        def parse_filename(filename):
            result = parser.parse(filename)
            if result is None:
                raise ValueError(f"template string ({str(self)}) does not mach filename ({filename})")
            named = result.named
            if any(isinstance(value, str) and '/' in value for value in named.values()):
                raise ValueError("Placeholder can not span directories")
            return {reverse[key]: value for key, value in named.items()}

        return parse_filename

    def remove_precursors(self, placeholders=None):
        """Replaces keys to those existing in the placeholders

        If no placeholders provided all precursors are removed
        """
        return TemplateParts([p.remove_precursors(placeholders) for p in self.parts])

    def __eq__(self, other):
        if not isinstance(other, TemplateParts):
            return NotImplemented
        return (len(self.parts) == len(other.parts)) and all(p1 == p2 for p1, p2 in zip(self.parts, other.parts))