You're reading an old version of this documentation. If you want up-to-date information, please have a look at stable (v1.6.0).
Source code for file_tree.template
"""Define Placeholders and Template interface."""
import itertools
import os
import re
import string
from collections import defaultdict
from collections.abc import MutableMapping
from functools import cmp_to_key, lru_cache
from glob import glob
from itertools import chain, combinations, product
from pathlib import Path
from typing import (
Any,
Dict,
FrozenSet,
Generator,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
)
import numpy as np
import pandas as pd
import xarray
from parse import compile, extract_format
[docs]
def is_singular(value):
    """Tell whether `value` is one single item rather than a set of options.

    Strings are treated as singular even though they can be iterated over;
    any other object accepted by `iter` is considered to hold multiple options.
    """
    if not isinstance(value, str):
        try:
            iter(value)
        except TypeError:
            # not iterable, hence a single value
            pass
        else:
            return False
    return True
[docs]
class Placeholders(MutableMapping):
    """Dictionary-like object containing the placeholder values.

    It understands about sub-trees
    (i.e., if "<sub_tree>/<placeholder>" does not exist it will return "<placeholder>" instead).
    """

    def __init__(self, *args, **kwargs):
        """Create a new Placeholders as any dictionary."""
        # Values indexed by placeholder name. Linked placeholders are stored
        # together under a frozenset of their names, as a dict from each name
        # to its sequence of values.
        self.mapping = {}
        # For every linked placeholder name: the frozenset key used in `mapping`.
        self.linkages: Dict[str, FrozenSet[str]] = {}
        self.update(dict(*args, **kwargs))

    def copy(self):
        """Create copy of placeholder values.

        The dictionaries holding linked placeholder values are copied as well,
        because `__setitem__` edits them in place; sharing them would let a
        change to the copy leak into the original.
        """
        p = Placeholders()
        p.mapping = {
            key: dict(value)
            if isinstance(key, frozenset) and isinstance(value, dict)
            else value
            for key, value in self.mapping.items()
        }
        p.linkages = dict(self.linkages)
        return p

    def __getitem__(self, key: str):
        """Get placeholder values respecting sub-tree placeholders.

        Raises:
            KeyError: if neither the key nor any of its parent keys has a value.
        """
        actual_key = self.find_key(key)
        if actual_key is None:
            raise KeyError(f"No parameter value available for {key}")
        if actual_key in self.linkages:
            # linked placeholders live inside the dict stored for the whole group
            return self.mapping[self.linkages[actual_key]][actual_key]
        return self.mapping[actual_key]

    def __delitem__(self, key):
        """Delete placeholder values represented by key.

        A tuple or frozenset key removes a whole group of linked placeholders
        including their linkage information.
        """
        if isinstance(key, tuple):
            key = frozenset(key)
        del self.mapping[key]
        if isinstance(key, frozenset):
            for k in key:
                del self.linkages[k]

    def __setitem__(self, key, value):
        """Overwrite placeholder value, adjusting linked placeholders if needed.

        Args:
            key: placeholder name, or a tuple/frozenset of names to create linked placeholders.
            value: new value(s). For a tuple key this is one sequence of values per name;
                for a frozenset key this is stored as given.

        Raises:
            ValueError: if linked values have inconsistent sizes, if a name is already
                linked to other placeholders, or if the new value of a linked placeholder
                is not among its existing values.
        """
        if isinstance(key, tuple):  # create linked placeholders
            if len(key) != len(value):
                raise ValueError(
                    f"Attempting to set linked placeholders for {key}, "
                    + f"but {value} has a different number of elements than {key}"
                )
            if any(len(value[0]) != len(v) for v in value):
                raise ValueError(
                    f"Attempting to set linked placeholders for {key}, "
                    + f"but not all elements in {value} have the same length."
                )
            value = dict(zip(key, value))
            key = frozenset(key)
        if isinstance(key, frozenset):
            for k in key:
                if k in self.linkages:
                    unmatched_keys = [
                        unmatched
                        for unmatched in self.linkages[k]
                        if unmatched not in key
                    ]
                    if len(unmatched_keys) > 0:
                        raise ValueError(
                            f"Attempting to set linked placeholders for {key}, "
                            + f"but {k} is already linked to {unmatched_keys}."
                        )
            self.mapping[key] = value
            for k in key:
                self.linkages[k] = key
                # the individual value is superseded by the linked one
                if k in self.mapping:
                    del self.mapping[k]
        elif key in self.linkages:
            # Linked placeholders can only be narrowed down to a sub-set of
            # their existing values; the linked partners are narrowed with them.
            old_values = self.mapping[self.linkages[key]]
            if is_singular(value):
                nvalue = old_values[key].count(value)
                if nvalue == 0:
                    raise ValueError(
                        f"Can not overwrite placeholder {key} "
                        + f"with new values as it is linked to: {self.linkages[key]}"
                    )
                elif nvalue == 1:
                    # a unique match fixes every linked placeholder to a single value
                    self.unlink(*old_values.keys())
                    idx = old_values[key].index(value)
                    for skey in old_values:
                        self.mapping[skey] = old_values[skey][idx]
                else:
                    idx = [i for i, v in enumerate(old_values[key]) if v == value]
                    for skey in old_values:
                        old_values[skey] = tuple(old_values[skey][i] for i in idx)
            else:
                if all(v in old_values[key] for v in value):
                    idx = []
                    for new_v in value:
                        for i, old_v in enumerate(old_values[key]):
                            if old_v == new_v and i not in idx:
                                idx.append(i)
                    for skey in old_values:
                        old_values[skey] = tuple(old_values[skey][i] for i in idx)
                else:
                    raise ValueError(
                        f"Can not overwrite placeholder {key} "
                        + f"with new values as it is linked to: {self.linkages[key]}"
                    )
        else:
            self.mapping[key] = value

    def __iter__(self):
        """Iterate over all placeholder keys that actually have values."""
        for key in self.mapping:
            if self.mapping[key] is not None:
                yield key

    def __len__(self):
        """Return number of keys in the mapping that have a value other than None."""
        return len([k for k, v in self.mapping.items() if v is not None])

    def __repr__(self):
        """Text representation of placeholder values."""
        return f"Placeholders({self.mapping})"

    def find_key(self, key: str) -> Optional[str]:
        """Find the actual key containing the value.

        Will look for:

            - not None value for the key itself
            - not None value for any parent (i.e, for key "A/B", will look for "B" as well)
            - otherwise will return None

        Args:
            key (str): placeholder name.
                Any non-string key is converted into a frozenset (group of linked placeholders).

        Returns:
            None if no value for the key is available, otherwise the key used to index the value
        """
        if not isinstance(key, str):
            key = frozenset(key)
        elif key in self.linkages:
            return key
        if self.mapping.get(key, None) is not None:
            return key
        elif "/" in key:
            # drop the sub-tree closest to the placeholder name and try again
            *sub_trees, _, actual_key = key.split("/")
            new_key = "/".join([*sub_trees, actual_key])
            return self.find_key(new_key)
        else:
            return None

    def split(self) -> Tuple["Placeholders", "Placeholders"]:
        """Split all placeholders into those with a single value or those with multiple values.

        Placeholders are considered to have multiple values if `is_singular` is False for them
        or if they are part of a group of linked placeholders.
        Anything else is considered a single value (string, int, float, etc.).

        Returns:
            Tuple with two Placeholders objects:

                1. placeholders with single values
                2. placeholders with multiple values
        """
        single_placeholders = Placeholders()
        multi_placeholders = Placeholders()
        for name, value in self.mapping.items():
            if isinstance(name, frozenset) or not is_singular(value):
                multi_placeholders[name] = value
            else:
                single_placeholders[name] = value
        return single_placeholders, multi_placeholders

    def iter_over(self, keys) -> Generator["Placeholders", None, None]:
        """Iterate over the placeholder names.

        Args:
            keys (Sequence[str]): sequence of placeholder names to iterate over

        Raises:
            KeyError: Raised if any of the provided `keys` does not have any value.

        Yields:
            yield Placeholders object, where each of the listed keys only has a single possible value
        """
        keys = list(keys)
        found_keys = [self.find_key(key) for key in keys]
        unfilled = {orig for orig, found in zip(keys, found_keys) if found is None}
        if len(unfilled) > 0:
            raise KeyError(f"Can not iterate over undefined placeholders: {unfilled}")
        # linked placeholders are iterated over as a group (frozenset key)
        actual_keys = [
            self.linkages.get(found, orig) for orig, found in zip(keys, found_keys)
        ]

        unique_keys = []
        iter_values = {}
        for key in actual_keys:
            if key not in unique_keys:
                if isinstance(key, frozenset):  # linked placeholder
                    unique_keys.append(key)
                    iter_values[key] = [
                        {k: self[k][idx] for k in key}
                        for idx in range(len(self[list(key)[0]]))
                    ]
                elif not is_singular(self[key]):  # iterable placeholder
                    unique_keys.append(key)
                    iter_values[key] = self[key]

        for values in product(*[iter_values[k] for k in unique_keys]):
            new_vars = Placeholders(self)
            for key, value in zip(unique_keys, values):
                if isinstance(key, frozenset):
                    del new_vars[key]  # break the placeholders link
                    new_vars.update(value)
                else:
                    new_vars[key] = value
            yield new_vars

    def link(self, *keys):
        """
        Link the placeholders represented by `keys`.

        When iterating over linked placeholders the i-th tree
        will contain the i-th element from all linked placeholders,
        instead of the tree containing all possible combinations of placeholder values.

        This can be thought of using `zip` for linked variables and
        `itertools.product` for unlinked ones.
        """
        self[keys] = [self[key] for key in keys]

    def unlink(self, *keys):
        """
        Unlink the placeholders represented by `keys`.

        See :meth:`link` for how linking affects the iteration
        through placeholders with multiple values.

        Raises a ValueError if the placeholders are not actually linked.
        """
        if keys not in self:
            raise ValueError(f"{keys} were not linked, so cannot unlink them")
        new_vars = {k: self[k] for k in keys}
        del self[keys]
        self.update(new_vars)
[docs]
class MyDataArray:
    """Light-weight stand-in for xarray.DataArray for internal usage.

    The real DataArray object is only created when needed
    (as using them for small arrays is slow...).
    """

    def __init__(self, data, coords=None):
        """Wrap either an xarray.DataArray (no `coords`) or a raw array with explicit `coords`."""
        self.as_xarray = coords is None
        if not self.as_xarray:
            self.data = data
            self.coords = coords
            return
        assert isinstance(data, xarray.DataArray)
        self.data_array = data

    def map(self, func) -> "MyDataArray":
        """Return a new array with `func` applied to every element."""
        if not self.as_xarray:
            mapped = np.array([func(element) for element in self.data.flat])
            return MyDataArray(mapped.reshape(self.data.shape), self.coords)
        return MyDataArray(
            xarray.apply_ufunc(func, self.data_array, vectorize=True)
        )

    def to_xarray(
        self,
    ) -> xarray.DataArray:
        """Return the content as an actual xarray.DataArray."""
        if self.as_xarray:
            return self.data_array
        indices = [_to_index(name, values) for name, values in self.coords]
        return xarray.DataArray(self.data, indices)

    @staticmethod
    def concat(parts, new_index) -> "MyDataArray":
        """Join multiple arrays along a new leading dimension described by `new_index`.

        `new_index` is a (name, values) pair. The raw arrays are stacked directly
        when none of the parts wraps an xarray.DataArray and all parts have
        coordinates matching those of the first part; otherwise the
        concatenation is left to xarray.
        """
        if len(parts) == 0:
            return MyDataArray(np.array([]), [])

        def coords_differ(part):
            # compare the coordinate names with those of the first part
            reference = parts[0].coords
            if len(part.coords) != len(reference):
                return True
            return any(
                np.all(name != ref_name)
                for (name, _), (ref_name, _) in zip(part.coords, reference)
            )

        needs_xarray = any(p.as_xarray for p in parts) or any(
            coords_differ(p) for p in parts
        )
        if needs_xarray:
            as_arrays = [p.to_xarray() for p in parts]
            return MyDataArray(xarray.concat(as_arrays, _to_index(*new_index)))
        stacked = np.stack([p.data for p in parts], axis=0)
        return MyDataArray(stacked, [new_index, *parts[0].coords])
def _to_index(name, values):
    """Build the pandas index used as a coordinate of a MyDataArray.

    A string `name` gives a plain index; any other `name` is used as
    the level names of a multi-index built from tuples in `values`.
    """
    if not isinstance(name, str):
        return pd.MultiIndex.from_tuples(values, names=name)
    return pd.Index(values, name=name)
[docs]
class Template:
    """Represents a single template in the FileTree."""

    def __init__(self, parent: Optional["Template"], unique_part: str):
        """Create a new child template in `parent` directory with `unique_part` filename."""
        self.parent = parent
        self.unique_part = unique_part

    @property
    def as_path(self) -> Path:
        """Return the full path with no placeholders filled in."""
        if self.parent is None:
            return Path(self.unique_part)
        return self.parent.as_path.joinpath(self.unique_part)

    @property
    def as_string(self):
        """Return the full path as a string with no placeholders filled in."""
        if self.parent is None:
            return str(self.unique_part)
        return os.path.join(self.parent.as_string, str(self.unique_part))

    def __str__(self):
        """Return string representation of template."""
        return f"Template({self.as_string})"

    def children(self, templates: Iterable["Template"]) -> List["Template"]:
        """Find children from a sequence of templates.

        Args:
            templates: sequence of possible child templates.

        Returns:
            list of children templates (each one listed only once)
        """
        res = []
        for t in templates:
            if t.parent is self and t not in res:
                res.append(t)
        return res

    def as_multi_line(
        self, other_templates: Dict[str, "Template"], indentation=4
    ) -> str:
        """Generate a string describing this and any child templates.

        Args:
            other_templates (Dict[str, Template]):
                templates including all the child templates and itself.
                The top-level template is expected under the key "".
            indentation (int, optional):
                number of spaces to use as indentation. Defaults to 4.

        Returns:
            str: multi-line string that can be processed by :meth:`file_tree.FileTree.read`
        """
        result = self._as_multi_line_helper(other_templates, indentation)

        is_top_level = other_templates[""] is self
        if not is_top_level and self.parent is None:
            # parent-less templates other than the top-level one get a "!" prefix
            return "!" + result
        else:
            return result

    def _as_multi_line_helper(
        self,
        other_templates: Dict[str, "Template"],
        indentation=4,
        _current_indentation=0,
    ) -> str:
        """Do the work for :meth:`as_multi_line` at the given indentation level."""
        # children without children of their own are listed before the others
        leaves = []
        branches = []
        for t in sorted(
            self.children(other_templates.values()), key=lambda t: t.unique_part
        ):
            if len(t.children(other_templates.values())) == 0:
                leaves.append(t)
            else:
                branches.append(t)

        is_top_level = other_templates[""] is self
        if is_top_level:
            base_line = "."
            assert _current_indentation == 0 and self.parent is None
            # children of the top-level template start without indentation
            _current_indentation = -indentation
        else:
            base_line = _current_indentation * " " + self.unique_part

        all_keys = []
        for key, value in other_templates.items():
            if value is not self:
                continue
            if is_top_level and key == "":
                continue
            all_keys.append(key)
        if is_top_level and len(all_keys) == 0:
            lines = []
        elif len(all_keys) == 1 and all_keys[0] == self.guess_key():
            # the short name can be guessed, so it is not written out
            lines = [base_line]
        else:
            assert len(all_keys) > 0
            lines = [base_line + f' ({",".join(all_keys)})']

        already_done = set()
        for t in leaves + branches:
            if t not in already_done:
                lines.append(
                    t._as_multi_line_helper(
                        other_templates, indentation, indentation + _current_indentation
                    )
                )
                already_done.add(t)
        return "\n".join(lines)

    @property
    def _parts(
        self,
    ):
        """Return the full template string parsed into its parts."""
        return TemplateParts.parse(self.as_string)

    def placeholders(self, valid=None) -> List[str]:
        """Return a list of the placeholder names.

        Args:
            valid: Collection of valid placeholder names.
                An error is raised if any other placeholder is detected.
                By default all placeholder names are fine.

        Returns:
            List[str]: placeholder names in order that they appear in the template
        """
        return self._parts.ordered_placeholders(valid)

    def format_single(
        self, placeholders: Placeholders, check=True, keep_optionals=False
    ) -> str:
        """Format the template with the placeholders filled in.

        Only placeholders with a single value are considered.

        Args:
            placeholders (Placeholders): values to fill into the placeholder
            check (bool): if True, raise an error if any required placeholder is still missing
            keep_optionals: if True keep optional parameters that have not been set
                (will cause the check to fail)

        Raises:
            KeyError: if `check` is True and any placeholder is missing

        Returns:
            str: filled in template
        """
        single_placeholders, _ = placeholders.split()
        template = self._parts.fill_single_placeholders(single_placeholders)
        if not keep_optionals:
            template = template.remove_optionals()
        if check:
            unfilled = template.required_placeholders()
            if len(unfilled) > 0:
                raise KeyError(f"Missing placeholder values for {unfilled}")
        return str(template)

    def format_mult(
        self, placeholders: Placeholders, check=False, filter=False, matches=None
    ) -> xarray.DataArray:
        """Replace placeholders in template with the provided placeholder values.

        Args:
            placeholders: mapping from placeholder names to single or multiple values
            check: if True, raise an error if any required placeholder is still missing
            filter: filter out non-existing files if set to True
            matches: Optional pre-generated list of any matches to the template.

        Raises:
            KeyError: if `check` is True and any placeholder is missing

        Returns:
            xarray.DataArray: array with possible resolved paths.
            If `filter` is set to True the non-existent paths are replaced by empty strings
        """
        parts = self._parts
        resolved = parts.resolve(placeholders)
        if check:
            # A MyDataArray only has `.data` while it still wraps a numpy array;
            # once it wraps an xarray.DataArray the values live in `.data_array`.
            if resolved.as_xarray:
                templates = resolved.data_array.values
            else:
                templates = resolved.data
            for template in templates.flatten():
                unfilled = template.required_placeholders()
                if len(unfilled) > 0:
                    raise KeyError(f"Missing placeholder values for {unfilled}")
        paths = resolved.map(lambda t: str(t))
        if not filter:
            return paths.to_xarray()
        placeholder_dict = dict(placeholders)
        path_matches = [
            str(
                parts.fill_single_placeholders(
                    Placeholders({**placeholder_dict, **match})
                ).remove_optionals()
            )
            for match in (
                self.all_matches(placeholders) if matches is None else matches
            )
        ]
        return paths.map(lambda p: p if p in path_matches else "").to_xarray()

    def optional_placeholders(
        self,
    ) -> Set[str]:
        """Find all placeholders that are only within optional blocks (i.e., they do not require a value).

        Returns:
            Set[str]: names of optional placeholders
        """
        return self._parts.optional_placeholders()

    def required_placeholders(
        self,
    ) -> Set[str]:
        """Find all placeholders that are outside of optional blocks (i.e., they do require a value).

        Returns:
            Set[str]: names of required placeholders
        """
        return self._parts.required_placeholders()

    def guess_key(
        self,
    ) -> str:
        """Propose a short name for the template.

        The proposed short name is created by:

            - taking the basename (i.e., last component) of the path
            - removing the first '.' and everything beyond (to remove the extension)

        .. warning::

            If there are multiple dots within the path's basename,
            this might remove far more than just the extension.

        Returns:
            str: proposed short name for this template (used if user does not provide one)
        """
        parts = self.as_path.parts
        if len(parts) == 0:
            return ""
        else:
            return parts[-1].split(".")[0]

    def add_precursor(self, text) -> "Template":
        """Return a new Template with any placeholder names in the unique part now preceded by `text`.

        Used for adding sub-trees
        """
        parts = TemplateParts.parse(self.unique_part).parts
        updated = "".join([str(p.add_precursor(text)) for p in parts])
        return Template(self.parent, updated)

    def get_all_placeholders(
        self, placeholders: Placeholders, matches=None
    ) -> Placeholders:
        """Fill placeholders with possible values based on what is available on disk.

        Args:
            placeholders: already defined placeholders; only values for names
                without a value in here are collected.
            matches: Optional pre-generated list of any matches to the template.

        Returns:
            Placeholders with sorted values for the previously undefined placeholders
            (None values are sorted first).
        """
        undefined = defaultdict(set)
        for match in self.all_matches(placeholders) if matches is None else matches:
            for name, value in match.items():
                if name not in placeholders:
                    undefined[name].add(value)

        def cmp(item1, item2):
            # None can not be compared with other values, so it always goes first
            if item1 is None:
                return -1
            if item2 is None:
                return 1
            if item1 < item2:
                return -1
            if item1 > item2:
                return 1
            return 0

        return Placeholders(
            {k: sorted(v, key=cmp_to_key(cmp)) for k, v in undefined.items()}
        )

    def all_matches(self, placeholders: Placeholders):
        """Return a sequence of all possible variable values matching existing files on disk.

        Only variable values matching existing placeholder values are returned
        (undefined placeholders are unconstrained).
        """
        single_vars, multi_vars = placeholders.split()
        filled_template: TemplateParts = self._parts.fill_single_placeholders(
            single_vars
        )

        res = []

        def check_name_with_edit(match, name):
            # Checks the value found for `name` and renames sub-tree specific
            # names in `match` to the parent name where needed.
            value = match[name]
            if name in multi_vars and multi_vars.find_key(name) == name:
                if value not in multi_vars[name]:
                    return False
                return True
            if "/" in name:
                del match[name]
                parent_name = name[name.find("/") + 1 :]
                if parent_name in match:
                    return match[parent_name] == value
                match[parent_name] = value
                return check_name_with_edit(match, parent_name)
            return True

        for match in filled_template.all_matches():
            # iterate over a copy of the keys as the check can edit `match`
            if not all(
                check_name_with_edit(match, name) for name in list(match.keys())
            ):
                continue
            res.append(match)
        return res

    def rich_line(self, all_templates):
        """Produce a line for rendering using rich."""
        all_keys = [key for key, value in all_templates.items() if value == self]
        base = self.guess_key()
        unique_part = str(self.unique_part)
        if base in all_keys:
            # highlight the short name within the filename rather than listing it
            all_keys.remove(base)
            unique_part = str.replace(unique_part, base, f"[cyan]{base}[/cyan]")
        if len(all_keys) == 0:
            return unique_part
        return (
            unique_part
            + " ("
            + ", ".join("[cyan]" + key + "[/cyan]" for key in all_keys)
            + ")"
        )
[docs]
def extract_placeholders(template, filename, known_vars=None):
    """
    Determine the placeholder values encoded in a filename.

    :param template: template string matching the given filename
    :param filename: filename
    :param known_vars: already known placeholders
    :return: dictionary from placeholder names to string representations
        (unused placeholders set to None)
    """
    parsed = TemplateParts.parse(template)
    return parsed.extract_placeholders(filename, known_vars)
[docs]
class Part:
    """
    Individual part of a template.

    3 subclasses are defined:

    - :class:`Literal`:
      piece of text
    - :class:`Required`:
      required placeholder to fill in
      (between curly brackets)
    - :class:`OptionalPart`:
      part of text containing optional placeholders
      (between square brackets)

    The methods defined here are the defaults for a part without any placeholders.
    """

    def fill_single_placeholders(
        self, placeholders: Placeholders, ignore_type=False
    ) -> Sequence["Part"]:
        """Fill in the given placeholders (by default there is nothing to fill)."""
        return (self,)

    def optional_placeholders(
        self,
    ) -> Set[str]:
        """Return all placeholders in optional parts (by default none)."""
        return set()

    def required_placeholders(
        self,
    ) -> Set[str]:
        """Return all required placeholders (by default none)."""
        return set()

    def contains_optionals(self, placeholders: Optional[Set[str]] = None):
        """Return True if this part contains the optional placeholders (by default it does not)."""
        return False

    def append_placeholders(self, placeholders: List[str], valid=None):
        """Append the placeholders in this part to the provided list in order (by default none)."""
        pass

    def add_precursor(self, text: str) -> "Part":
        """Prepend any placeholder names by `text` (by default nothing changes)."""
        return self

    def for_defined(self, placeholder_names: Set[str]) -> List["Part"]:
        """Return the template string assuming the placeholders in `placeholder_names` are defined.

        Removes any optional parts, whose placeholders are not in `placeholder_names`.
        By default the part itself is kept.
        """
        return [self]

    def remove_precursors(self, placeholders=None):
        """Remove precursor from placeholder key (by default nothing changes)."""
        return self
[docs]
class Literal(Part):
    """Plain text within a template, i.e., a part without any placeholders."""

    def __init__(self, text: str):
        """
        Store the text that fully defines this literal part.

        :param text: part of the template
        """
        self.text = text

    def __str__(self):
        """Give back the text of this part unaltered."""
        return self.text

    def __eq__(self, other):
        """Compare the text with that of another `Literal`."""
        if isinstance(other, Literal):
            return self.text == other.text
        return NotImplemented
[docs]
class Required(Part):
    """Placeholder part of template that requires a value."""

    def __init__(self, var_name, var_formatting=None):
        """
        Create required part of template (between curly brackets).

        Required placeholder part of template is defined by placeholder name and its format

        :param var_name: name of placeholder
        :param var_formatting: how to format the placeholder (None or empty for no formatting)
        """
        self.var_name = var_name
        self.var_formatting = var_formatting

    def __str__(self):
        """Return this part of the template as a string."""
        if self.var_formatting is None or len(self.var_formatting) == 0:
            return "{" + self.var_name + "}"
        else:
            return "{" + self.var_name + ":" + self.var_formatting + "}"

    def fill_single_placeholders(self, placeholders: Placeholders, ignore_type=False):
        """Fill placeholder values into template obeying typing.

        :param placeholders: values to fill in
        :param ignore_type: if True the value is inserted without applying the formatting
        :return: sequence of parts replacing this one
            (just this part if no value is available)
        """
        value = placeholders.get(self.var_name, None)
        if value is None:
            return (self,)

        # `var_formatting` defaults to None, which means no formatting
        formatting = "" if self.var_formatting is None else self.var_formatting
        if not ignore_type and len(formatting) > 0:
            # convert the value to the type expected by the format specification
            format_type = extract_format(formatting, [])["type"]
            if format_type in list(r"dnbox"):
                value = int(value)
            elif format_type in list(r"f%eg"):
                value = float(value)
            elif format_type in ["t" + ft for ft in "iegachs"] and isinstance(
                value, str
            ):
                from dateutil import parser

                value = parser.parse(value)

        # the filled-in value can itself contain placeholders
        res = TemplateParts.parse(format(value, "" if ignore_type else formatting))
        if len(res.parts) == 1:
            return res.parts
        return res.fill_single_placeholders(
            placeholders, ignore_type=ignore_type
        ).parts

    def required_placeholders(
        self,
    ) -> Set[str]:
        """Return the name of this placeholder as the only required placeholder."""
        return {self.var_name}

    def append_placeholders(self, placeholders, valid=None):
        """Add placeholder name to list of placeholders in template.

        Raises a ValueError if `valid` is provided and does not contain the placeholder name.
        """
        if valid is not None and self.var_name not in valid:
            raise ValueError(f"Placeholder {self.var_name} is not defined")
        placeholders.append(self.var_name)

    def add_precursor(self, text: str) -> "Required":
        """Prepend any placeholder names by `text`."""
        return Required(text + self.var_name, self.var_formatting)

    def remove_precursors(self, placeholders=None):
        """Remove precursor from placeholder key.

        Without `placeholders` everything up to the last "/" is removed.
        Otherwise the name is replaced by the key under which `placeholders`
        has a value for it (the name is kept if there is no such key).
        """
        if placeholders is None:
            new_name = self.var_name.split("/")[-1]
        else:
            key = placeholders.find_key(self.var_name)
            new_name = self.var_name if key is None else key
        return Required(new_name, self.var_formatting)

    def __eq__(self, other):
        """Check whether `other` placeholder matches this one."""
        if not isinstance(other, Required):
            return NotImplemented
        return (
            self.var_name == other.var_name
            and self.var_formatting == other.var_formatting
        )
[docs]
class OptionalPart(Part):
    """Optional part of a template (i.e., between square brackets)."""

    def __init__(self, sub_template: "TemplateParts"):
        """
        Create optional part of template (between square brackets).

        Optional part can contain literal and required parts

        :param sub_template: part of the template within square brackets
        """
        self.sub_template = sub_template

    def __str__(self):
        """Return string representation of optional part."""
        return "[" + str(self.sub_template) + "]"

    def fill_single_placeholders(self, placeholders: Placeholders, ignore_type=False):
        """Fill placeholders into text within optional part.

        Once no placeholders remain the optional part is replaced by a literal.
        """
        new_opt = self.sub_template.fill_single_placeholders(
            placeholders, ignore_type=ignore_type
        )
        if len(new_opt.required_placeholders()) == 0:
            return (Literal(str(new_opt)),)
        return (OptionalPart(new_opt),)

    def optional_placeholders(self):
        """Return set of any placeholders in the optional part of the template."""
        return self.sub_template.required_placeholders()

    def contains_optionals(self, placeholders=None):
        """Check if this optional part contains any of the placeholders listed in `placeholders`.

        If `placeholders` is None, check whether it contains any placeholder at all.
        """
        optionals = self.optional_placeholders()
        if placeholders is None:
            return len(optionals) > 0
        return len(optionals.intersection(placeholders)) > 0

    def append_placeholders(self, placeholders, valid=None):
        """Add any placeholders in the optional part to `placeholders` list."""
        try:
            placeholders.extend(self.sub_template.ordered_placeholders(valid=valid))
        except ValueError:
            # ValueError signals a placeholder outside of `valid`;
            # in that case none of the placeholders in this optional part are added
            pass

    def add_precursor(self, text: str) -> "OptionalPart":
        """Prepend precursor `text` to any placeholders in the optional part."""
        return OptionalPart(
            TemplateParts([p.add_precursor(text) for p in self.sub_template.parts])
        )

    def for_defined(self, placeholder_names: Set[str]) -> List["Part"]:
        """
        Return the template string assuming the placeholders in `placeholder_names` are defined.

        Removes any optional parts, whose placeholders are not in `placeholder_names`.
        """
        if len(self.optional_placeholders().difference(placeholder_names)) > 0:
            return []
        return list(self.sub_template.parts)

    def remove_precursors(self, placeholders=None):
        """Remove precursor from placeholder key."""
        return OptionalPart(self.sub_template.remove_precursors(placeholders))

    def __eq__(self, other):
        """Check whether two optional parts match."""
        if not isinstance(other, OptionalPart):
            return NotImplemented
        return self.sub_template == other.sub_template
[docs]
class TemplateParts:
"""Representation of full template as sequence of `Part` objects."""
optional_re = re.compile(r"(\[.*?\])")
requires_re = re.compile(r"(\{.*?\})")
def __init__(self, parts: Sequence[Part]):
    """Store the given sequence of parts as a tuple.

    A string is rejected as it most likely is a template that still needs parsing.
    """
    if not isinstance(parts, str):
        self.parts = tuple(parts)
        return
    raise ValueError(
        "Input to Template should be a sequence of parts; "
        + "did you mean to call `TemplateParts.parse` instead?"
    )
[docs]
@staticmethod
@lru_cache(1000)
def parse(text: str) -> "TemplateParts":
    """Parse a template string into its constituent parts.

    Text between square brackets becomes an optional part (parsed recursively),
    text between curly brackets a required placeholder and anything else a literal.
    Within curly brackets everything after the first ":" is the formatting
    of the placeholder (which can therefore contain further colons).

    Args:
        text: template as string.

    Raises:
        ValueError: raised if square brackets are found within square brackets

    Returns:
        TemplateParts: object that contains the parts of the template
    """
    parts: List[Part] = []
    for optional_parts in TemplateParts.optional_re.split(text):
        if (
            len(optional_parts) > 0
            and optional_parts[0] == "["
            and optional_parts[-1] == "]"
        ):
            inner = optional_parts[1:-1]
            if "[" in inner or "]" in inner:
                raise ValueError(
                    f"Can not parse {text}, because unmatching square brackets were found"
                )
            parts.append(OptionalPart(TemplateParts.parse(inner)))
            continue
        for required_parts in TemplateParts.requires_re.split(optional_parts):
            if (
                len(required_parts) > 0
                and required_parts[0] == "{"
                and required_parts[-1] == "}"
            ):
                # only split on the first colon; without any colon the formatting is empty
                var_name, _, var_type = required_parts[1:-1].partition(":")
                parts.append(Required(var_name, var_type))
            else:
                parts.append(Literal(required_parts))
    return TemplateParts(parts)
def __str__(self):
    """Concatenate the text of all parts and normalise the result as a path."""
    joined = "".join(str(part) for part in self.parts)
    return os.path.normpath(joined)
[docs]
def optional_placeholders(
    self,
) -> Set[str]:
    """Collect the placeholders that only appear in optional parts."""
    if not self.parts:
        return set()
    in_optional_parts = set()
    for part in self.parts:
        in_optional_parts.update(part.optional_placeholders())
    # placeholders that are also required elsewhere are not optional
    return in_optional_parts.difference(self.required_placeholders())
[docs]
def required_placeholders(
    self,
) -> Set[str]:
    """Collect the required placeholders of all parts."""
    if not self.parts:
        return set()
    required = set()
    for part in self.parts:
        required.update(part.required_placeholders())
    return required
[docs]
def ordered_placeholders(self, valid=None) -> List[str]:
    """List the placeholders in order of appearance (duplicates are kept).

    `valid` is passed on to `append_placeholders` of every part.
    """
    names: List[str] = []
    for part in self.parts:
        part.append_placeholders(names, valid=valid)
    return names
[docs]
def fill_known(self, placeholders: Placeholders, ignore_type=False) -> MyDataArray:
    """Fill in all placeholders for which values are available.

    Any optional parts, where all placeholders have been filled
    will be automatically replaced.
    """
    single, multi = placeholders.split()
    without_precursors = self.remove_precursors(placeholders)
    return without_precursors._fill_known_helper(
        single, multi, ignore_type=ignore_type
    )
def _fill_known_helper(
    self, single: Placeholders, multi: Placeholders, ignore_type=False
) -> MyDataArray:
    """Do work for `fill_known`.

    Fills in the singular values and then recurses over the values of the
    first placeholder in the template that has multiple values.
    """
    filled = self.fill_single_placeholders(single, ignore_type=ignore_type)

    # look for the first remaining placeholder with multiple values
    for name in filled.ordered_placeholders():
        use_name = multi.find_key(name)
        if use_name is not None:
            break
    else:
        # nothing left to iterate over
        return MyDataArray(np.array(filled), [])

    remaining = multi.copy()
    if use_name in multi.linkages:
        # linked placeholders are filled in together
        linked_key = multi.linkages[use_name]
        values = multi[linked_key]
        keys = tuple(sorted(values.keys()))
        index = (keys, zip(*[values[k] for k in keys]))
        del remaining[linked_key]
    else:
        values = {use_name: list(multi[name])}
        index = (use_name, values[use_name])
        del remaining[use_name]

    assert use_name is not None

    sub_arrays = []
    new_single = single.copy()
    for idx in range(len(values[use_name])):
        new_single.mapping.update({n: v[idx] for n, v in values.items()})
        sub_arrays.append(
            filled._fill_known_helper(
                new_single, remaining, ignore_type=ignore_type
            )
        )
    return MyDataArray.concat(sub_arrays, index)
[docs]
def fill_single_placeholders(
    self, placeholders: Placeholders, ignore_type=False
) -> "TemplateParts":
    """
    Replace placeholders by their singular values.

    Assumes that all placeholders are in fact singular.
    """
    new_parts = []
    for part in self.parts:
        # every part is replaced by a sequence of zero or more parts
        new_parts.extend(
            part.fill_single_placeholders(placeholders, ignore_type=ignore_type)
        )
    return TemplateParts(new_parts)
[docs]
def remove_optionals(self, optionals=None) -> "TemplateParts":
    """
    Drop every part for which `contains_optionals(optionals)` is true.

    By default all optionals are removed.
    """
    kept = []
    for part in self.parts:
        if part.contains_optionals(optionals):
            continue
        kept.append(part)
    return TemplateParts(kept)
[docs]
def all_matches(
    self,
) -> List[Dict[str, Any]]:
    """Find all files on disk matching the template.

    Every combination of included optional placeholders is tried in turn.
    Returns a list with the possible combination of values for the placeholders.
    """
    required = self.required_placeholders()
    optional = self.optional_placeholders()

    matches = []
    glob_cache = {}
    optional_choices = chain.from_iterable(
        combinations(optional, n) for n in range(len(optional) + 1)
    )
    for defined_optionals in optional_choices:
        # build a glob pattern with a wildcard for every defined placeholder
        glob_placeholders = Placeholders(
            **{req: "*" for req in required},
            **{opt: "*" for opt in defined_optionals},
        )
        wildcard_template = self.fill_single_placeholders(
            glob_placeholders, ignore_type=True
        ).remove_optionals()
        pattern = str(wildcard_template)
        while "**" in pattern:
            pattern = pattern.replace("**", "*")
        if pattern not in glob_cache:
            glob_cache[pattern] = glob(pattern)

        # parser for the template with only the defined optional parts
        defined = required.union(defined_optionals)
        kept_parts = []
        for part in self.parts:
            kept_parts.extend(part.for_defined(defined))
        parser = TemplateParts(kept_parts).get_parser()

        for fn in glob_cache[pattern]:
            try:
                placeholders = parser(fn)
            except ValueError:
                continue
            for var_name in optional:
                if var_name not in placeholders:
                    placeholders[var_name] = None
            matches.append(placeholders)
    return matches
[docs]
def resolve(self, placeholders, ignore_type=False) -> MyDataArray:
    """
    Resolve the template given a set of placeholders.

    The known placeholders are filled in with ``fill_known``; afterwards
    ``remove_optionals`` is applied to every template in that result.

    :param placeholders: mapping of placeholder names to values
    :param ignore_type: if True, ignore the type formatting when
        filling in placeholders
    :return: the output of ``fill_known`` with ``remove_optionals``
        mapped over its elements
    """

    def drop_optionals(template):
        return template.remove_optionals()

    filled = self.fill_known(placeholders, ignore_type=ignore_type)
    return filled.map(drop_optionals)
[docs]
def optional_subsets(
    self,
) -> Iterator["TemplateParts"]:
    """Yield sub-templates for every combination of optional placeholders.

    For each subset of the optional placeholders (smallest subsets first)
    the result of ``remove_optionals`` called with that subset is yielded.
    """
    names = self.optional_placeholders()
    sizes = range(len(names) + 1)
    exclusions = itertools.chain.from_iterable(
        itertools.combinations(names, size) for size in sizes
    )
    for excluded in exclusions:
        yield self.remove_optionals(excluded)
[docs]
def extract_placeholders(self, filename, known_vars=None):
    """
    Extract the placeholder values from the filename.

    The filename is compared against every sub-template yielded by
    ``optional_subsets``. In each sub-template all placeholders are replaced
    by the capturing group ``(\\S+)`` and the result is used as a regular
    expression. Candidate parses are ranked by ``score`` and the best one
    is returned.

    :param filename: filename
    :param known_vars: already known placeholders
    :return: dictionary from placeholder names to string representations
        (unused placeholders set to None)
    :raises ValueError: if none of the sub-templates matches the filename
    :raises KeyError: if a different parse reaches the same score as the best one
    """
    template = self if known_vars is None else self.fill_known(known_vars)
    while "//" in filename:
        filename = filename.replace("//", "/")

    required = template.required_placeholders()
    optional = template.optional_placeholders()

    results = []
    for to_fill in template.optional_subsets():
        sub_re = str(
            to_fill.fill_known(
                {var: r"(\S+)" for var in required.union(optional)},
            )
        )
        while "//" in sub_re:
            sub_re = sub_re.replace("//", "/")
        sub_re = sub_re.replace(".", r"\.")
        # NOTE(review): `re.match` only anchors at the start of the filename and
        # "." is the only literal character that gets escaped; confirm that
        # trailing text and other regex metacharacters are meant to be accepted.
        match = re.match(sub_re, filename)
        if match is None:
            continue

        extracted_value = {}
        ordered_vars = to_fill.ordered_placeholders()
        assert len(ordered_vars) == len(match.groups())

        # A placeholder used several times must capture the same text each time.
        failed = False
        for var, value in zip(ordered_vars, match.groups()):
            if var in extracted_value:
                if value != extracted_value[var]:
                    failed = True
                    break
            else:
                extracted_value[var] = value
        # Captured values are not allowed to contain a directory separator.
        if failed or any("/" in value for value in extracted_value.values()):
            continue

        for name in optional:
            if name not in extracted_value:
                extracted_value[name] = None
        if known_vars is not None:
            extracted_value.update(known_vars)
        results.append(extracted_value)

    if not results:
        raise ValueError(f"{filename} did not match {template}")

    def score(placeholders):
        """
        Assign score to possible reconstructions of the placeholder values.

        The highest score is given to the set of placeholders that:

            1. has used the largest amount of optional placeholders
            2. has the shortest text within the placeholders (only used if equal at 1)
        """
        used = [v for v in placeholders.values() if v is not None]
        # str(): values copied over from `known_vars` are not necessarily strings.
        length_hint = sum(len(str(v)) for v in used)
        return len(used) * 1000 - length_hint

    best = max(results, key=score)
    best_score = score(best)
    for candidate in results:
        if best != candidate and score(candidate) == best_score:
            raise KeyError(
                f"Multiple equivalent ways found to parse {filename} using {template}"
            )
    return best
[docs]
def get_parser(self):
    """Create function that will parse a filename based on this template.

    :return: function that takes a filename and returns a dictionary from
        placeholder name to the value parsed from that filename; it raises
        ValueError if the filename does not match the template or if a
        parsed string value contains "/"
    :raises ValueError: if the template contains any optional parts
    """
    if any(isinstance(p, OptionalPart) for p in self.parts):
        raise ValueError(
            "Can not parse filename when there are optional parts in the template"
        )

    # Each placeholder gets a unique three-letter alias for the format string.
    # NOTE(review): presumably because placeholder names are not always valid
    # field names for `parse` -- confirm.
    mapping = {
        old_key: "".join(new_key)
        for old_key, new_key in zip(
            self.required_placeholders(),
            itertools.product(string.ascii_letters, repeat=3),
        )
    }
    reverse = {new_key: old_key for old_key, new_key in mapping.items()}

    cleaned = TemplateParts(
        [
            Required(mapping[p.var_name], p.var_formatting)
            if isinstance(p, Required)
            else p
            for p in self.parts
        ]
    )
    parser = compile(str(cleaned), case_sensitive=True)

    def parse_filename(filename):
        """Parse filename based on template."""
        result = parser.parse(filename)
        if result is None:
            raise ValueError(
                f"template string ({self!s}) does not match filename ({filename})"
            )
        named = result.named
        if any(isinstance(value, str) and "/" in value for value in named.values()):
            raise ValueError("Placeholder can not span directories")
        # Translate the aliases back into the original placeholder names.
        return {reverse[key]: value for key, value in named.items()}

    return parse_filename
[docs]
def remove_precursors(self, placeholders=None):
    """Replace keys to those existing in the placeholders.

    The work is delegated to ``remove_precursors`` of every part; the
    results are collected, in order, into a new template.
    If no placeholders provided all precursors are removed.
    """
    stripped = []
    for part in self.parts:
        stripped.append(part.remove_precursors(placeholders))
    return TemplateParts(stripped)
def __eq__(self, other):
    """Check whether other template matches this one.

    Two templates are equal when they have the same number of parts and
    all parts compare equal pairwise. Comparison with anything other than
    a ``TemplateParts`` is left to the other operand.
    """
    if not isinstance(other, TemplateParts):
        return NotImplemented
    if len(self.parts) != len(other.parts):
        return False
    return all(mine == theirs for mine, theirs in zip(self.parts, other.parts))