import logging
import threading
from abc import abstractmethod
from functools import reduce
from typing import Set, Union
import numpy as np
import pandas as pd
from solrat.engine.functions.decorators import VERBOSE
_max_looper_id = threading.local()
def _get_max_looper_id() -> int:
return getattr(_max_looper_id, "level", 0)
def _set_max_looper_id(value: int) -> None:
_max_looper_id.level = value
[docs]
def get_unique_name() -> str:
    """
    Generate a looper name that has not been handed out before in the calling thread.

    The name has the form ``__looper_unique_<id>__`` where ``<id>`` is the current value of
    the thread-local counter; the counter is advanced by one on every call.
    """
    current_id = _get_max_looper_id()
    _set_max_looper_id(current_id + 1)
    return f"__looper_unique_{current_id}__"
[docs]
class Looper:
    """
    Base Looper class.

    A looper owns a column name. The name is either given explicitly through :meth:`set_name`
    or generated on first use by :meth:`get_name`. ``is_name_user_set`` records which of the
    two happened and stays ``None`` for as long as the looper has no name.
    """

    def __init__(self):
        self.name = None
        # None: no name yet; True: assigned via set_name(); False: generated by get_name().
        self.is_name_user_set = None

    def set_name(self, name: str):
        """Assign an explicit name. The looper must not have a name already (asserted)."""
        assert self.name is None
        self.name, self.is_name_user_set = name, True

    def get_name(self) -> str:
        """Return the looper name, generating a unique one first if none exists yet."""
        if self.name is not None:
            return self.name
        self.name = get_unique_name()
        self.is_name_user_set = False
        return self.name

    def add_to_dependencies(self, dependencies: Set[str]):
        """
        Add this looper's name to ``dependencies`` if the name was set by the user.

        Asserts that the name has already been set or generated.
        """
        not_ready_msg = (
            "Looper name should be set or determined before adding to dependencies. "
            "Check that all loopers are passed to Frame."
        )
        assert self.is_name_user_set is not None, not_ready_msg
        if not self.is_name_user_set:
            return
        dependencies.add(self.get_name())

    def is_name_set(self) -> bool:  # Todo remove
        """Tell whether the looper currently has a name (user-set or generated)."""
        return not (self.name is None)

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the column names this looper depends on directly; none for the base class."""
        return set()

    @abstractmethod
    def fill_frame(self, frame: pd.DataFrame, explode: bool = True) -> pd.DataFrame:
        """
        Add this looper's column to ``frame`` and return the resulting frame.

        The base implementation does nothing and returns ``None``; subclasses provide the
        behaviour. Note that ``Looper`` has no ``ABC`` base, so the decorator alone does not
        prevent instantiation.
        """

    def __sub__(self, other):
        """Build a ``Difference`` looper ``self - other``; plain numbers are wrapped in ``Value``."""
        rhs = Value(other) if isinstance(other, (int, float)) else other
        assert isinstance(rhs, Looper)
        return Difference(self, rhs)
[docs]
class DummyOrAlreadyMerged(Looper):
    """
    Looper class for the pre-merged variables.

    It never fills a frame itself (:meth:`fill_frame` always raises) and may optionally
    forward dependency queries to another looper given as ``dependency``.
    """

    def __init__(self, dependency: Union[Looper, None] = None):
        super().__init__()
        self.dependency: Union[Looper, None] = dependency

    def add_to_dependencies(self, dependencies: Set[str]):
        """Add own name if it was user-set; otherwise delegate to ``dependency`` when present."""
        if self.is_name_user_set:
            dependencies.add(self.get_name())
        elif self.dependency is not None:
            self.dependency.add_to_dependencies(dependencies)

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the dependencies reported by ``dependency``, or an empty set without one."""
        direct: Set[str] = set()
        wrapped = self.dependency
        if wrapped is None:
            return direct
        wrapped.add_to_dependencies(direct)
        return direct

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """Not supported for this looper: always raises ``ValueError``."""
        raise ValueError()
[docs]
class Value(Looper):
    """
    Looper class for a single value.

    Filling a frame produces a constant column holding that value.
    """

    def __init__(self, value: Union[int, float, str]):
        super().__init__()
        self.value = value

    def add_to_dependencies(self, dependencies: Set[str]):
        """A constant depends on nothing, so ``dependencies`` is left untouched."""
        pass

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Write the value into a column named after this looper and return the frame.

        The column is assigned on the passed frame. Numeric values are then cast to ``float``
        and strings to ``str`` (both via ``astype``, which returns a new frame); any other
        value type is returned as assigned.
        """
        name = self.get_name()
        frame[name] = self.value
        if isinstance(self.value, (int, float)):
            column_type = float
        elif isinstance(self.value, str):
            column_type = str
        else:
            return frame
        return frame.astype({name: column_type})
[docs]
def wrap_in_value_if_needed(x: Union[Looper, int, float]) -> Looper:
    """
    Helper function to wrap int or float into :any:`Value` looper.

    Loopers are returned unchanged; any other type raises ``ValueError``.
    """
    if not isinstance(x, Looper):
        if not isinstance(x, (int, float)):
            raise ValueError(f"Cannot wrap type {type(x)} in Value looper.")
        return Value(x)
    return x
[docs]
class FromTo(Looper):
    """
    Inclusive range looper: for every row, the values ``start, start + 1, ...`` up to ``end``.

    ``start`` and ``end`` may be loopers or plain numbers; plain numbers are wrapped through
    ``wrap_in_value_if_needed``.
    """

    def __init__(self, start: Union[Looper, int, float], end: Union[Looper, int, float]):
        super().__init__()
        self.start: Looper = wrap_in_value_if_needed(start)
        self.end: Looper = wrap_in_value_if_needed(end)

    def add_to_dependencies(self, dependencies: Set[str]):
        """Add own name if it was user-set; otherwise add the dependencies of both bounds."""
        if self.is_name_user_set:
            dependencies.add(self.get_name())
            return
        self.start.add_to_dependencies(dependencies)
        self.end.add_to_dependencies(dependencies)

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the dependencies contributed by the ``start`` and ``end`` loopers."""
        cols = set()
        self.start.add_to_dependencies(cols)
        self.end.add_to_dependencies(cols)
        return cols

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Add the range column to ``frame`` and return the resulting frame.

        :param frame: frame to extend; the new column is assigned on it.
        :param explode: when true, the per-row lists are exploded into one row per value and
            the column is cast to ``float``; when false, the column keeps one list per row.
        """
        name = self.get_name()
        # Bounds whose name was not set by the user are materialised as temporary columns.
        if not self.start.is_name_user_set:
            frame = self.start.fill_frame(frame)
        if not self.end.is_name_user_set:
            frame = self.end.fill_frame(frame)
        start_name = self.start.get_name()
        end_name = self.end.get_name()
        # Iterate the two bound columns directly instead of a row-wise DataFrame.apply:
        # it avoids building a Series per row and, unlike apply, still yields a proper
        # (empty) column when the frame has no rows.
        ranges = [list(np.arange(lo, hi + 1)) for lo, hi in zip(frame[start_name], frame[end_name])]
        frame[name] = pd.Series(ranges, index=frame.index, dtype=object)
        if not self.start.is_name_user_set:
            frame = frame.drop(columns=[start_name])
        if not self.end.is_name_user_set:
            frame = frame.drop(columns=[end_name])
        if explode:
            frame = frame.explode(name)
            # Only an exploded column holds scalars; list cells cannot be cast to float.
            frame = frame.astype({name: float})
        return frame
[docs]
class Projection(Looper):
    r"""
    Projection looper: from -X to X (inclusive), in steps of one.

    ``vector`` may be a looper or a plain number; a plain number is wrapped through
    ``wrap_in_value_if_needed``.
    """

    def __init__(self, vector: Union[Looper, int, float]):
        super().__init__()
        self.vector: Looper = wrap_in_value_if_needed(vector)

    def add_to_dependencies(self, dependencies: Set[str]):
        """Add own name if it was user-set; otherwise add the dependencies of ``vector``."""
        if self.is_name_user_set:
            dependencies.add(self.get_name())
            return
        self.vector.add_to_dependencies(dependencies)

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the dependencies contributed by the ``vector`` looper."""
        cols = set()
        self.vector.add_to_dependencies(cols)
        return cols

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Add the projection column to ``frame`` and return the resulting frame.

        :param frame: frame to extend; the new column is assigned on it.
        :param explode: when true, the per-row lists are exploded into one row per value and
            the column is cast to ``float``; when false, the column keeps one list per row.
        """
        name = self.get_name()
        # A vector whose name was not set by the user is materialised as a temporary column.
        if not self.vector.is_name_user_set:
            frame = self.vector.fill_frame(frame)
        vector_name = self.vector.get_name()
        # Iterate the column directly instead of a row-wise DataFrame.apply: cheaper, and it
        # still yields a proper (empty) column when the frame has no rows.
        projections = [list(np.arange(-magnitude, magnitude + 1)) for magnitude in frame[vector_name]]
        frame[name] = pd.Series(projections, index=frame.index, dtype=object)
        if not self.vector.is_name_user_set:
            frame = frame.drop(columns=[vector_name])
        if explode:
            frame = frame.explode(name)
            # Only an exploded column holds scalars; list cells cannot be cast to float.
            frame = frame.astype({name: float})
        return frame
[docs]
class Triangular(Looper):
    r"""
    Triangular looper: from :math:`|A-B|` to :math:`A+B`, in steps of one.

    Both vectors may be loopers or plain numbers; plain numbers are wrapped through
    ``wrap_in_value_if_needed``.
    """

    def __init__(self, vector1: Union[Looper, int, float], vector2: Union[Looper, int, float]):
        super().__init__()
        self.vector1: Looper = wrap_in_value_if_needed(vector1)
        self.vector2: Looper = wrap_in_value_if_needed(vector2)

    def add_to_dependencies(self, dependencies: Set[str]):
        """Add own name if it was user-set; otherwise add the dependencies of both vectors."""
        if self.is_name_user_set:
            dependencies.add(self.get_name())
            return
        self.vector1.add_to_dependencies(dependencies)
        self.vector2.add_to_dependencies(dependencies)

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the dependencies contributed by ``vector1`` and ``vector2``."""
        cols = set()
        self.vector1.add_to_dependencies(cols)
        self.vector2.add_to_dependencies(cols)
        return cols

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Add the triangular-range column to ``frame`` and return the resulting frame.

        :param frame: frame to extend; the new column is assigned on it.
        :param explode: when true, the per-row lists are exploded into one row per value and
            the column is cast to ``float``; when false, the column keeps one list per row.
        """
        name = self.get_name()
        # Vectors whose name was not set by the user are materialised as temporary columns.
        if not self.vector1.is_name_user_set:
            frame = self.vector1.fill_frame(frame)
        if not self.vector2.is_name_user_set:
            frame = self.vector2.fill_frame(frame)
        vector1_name = self.vector1.get_name()
        vector2_name = self.vector2.get_name()
        # Iterate the two columns directly instead of a row-wise DataFrame.apply: cheaper,
        # and it still yields a proper (empty) column when the frame has no rows.
        ranges = [
            list(np.arange(np.abs(a - b), a + b + 1))
            for a, b in zip(frame[vector1_name], frame[vector2_name])
        ]
        frame[name] = pd.Series(ranges, index=frame.index, dtype=object)
        if not self.vector1.is_name_user_set:
            frame = frame.drop(columns=[vector1_name])
        if not self.vector2.is_name_user_set:
            frame = frame.drop(columns=[vector2_name])
        if explode:
            frame = frame.explode(name)
            # Only an exploded column holds scalars; list cells cannot be cast to float.
            frame = frame.astype({name: float})
        return frame
[docs]
class Difference(Looper):
    """
    Difference looper: A - B

    The result is a single column computed element-wise from the two operand columns.
    """

    def __init__(self, left: Looper, right: Looper):
        super().__init__()
        self.left = wrap_in_value_if_needed(left)
        self.right = wrap_in_value_if_needed(right)

    def add_to_dependencies(self, dependencies: Set[str]):
        """Add own name if it was user-set; otherwise add the dependencies of both operands."""
        if not self.is_name_user_set:
            for operand in (self.left, self.right):
                operand.add_to_dependencies(dependencies)
            return
        dependencies.add(self.get_name())

    def get_directly_dependent_columns(self) -> Set[str]:
        """Return the dependencies contributed by the left and right operands."""
        direct = set()
        for operand in (self.left, self.right):
            operand.add_to_dependencies(direct)
        return direct

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Add the ``left - right`` column to ``frame`` and return the resulting frame.

        Operands without a user-set name are filled in as temporary columns and dropped again
        after the subtraction. ``explode`` is accepted for interface compatibility and not used.
        """
        name = self.get_name()
        operands = (self.left, self.right)
        for operand in operands:
            if not operand.is_name_user_set:
                frame = operand.fill_frame(frame)
        minuend = frame[self.left.get_name()]
        subtrahend = frame[self.right.get_name()]
        frame[name] = minuend - subtrahend
        for operand in operands:
            if not operand.is_name_user_set:
                frame = frame.drop(columns=[operand.get_name()])
        return frame
[docs]
class Intersection(Looper):
    """
    Intersection looper: only common values among multiple loopers.

    Nesting an ``Intersection`` directly inside another one is rejected by an assertion.
    """

    def __init__(self, *args: Looper):
        super().__init__()
        self.loopers = [wrap_in_value_if_needed(arg) for arg in args]
        assert all(not isinstance(looper, Intersection) for looper in self.loopers)

    def get_directly_dependent_columns(self) -> set:
        """Return the dependencies contributed by all intersected loopers."""
        cols = set()
        for looper in self.loopers:
            looper.add_to_dependencies(cols)
        return cols

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Add the intersection column to ``frame`` and return the resulting frame.

        Each looper without a user-set name is filled unexploded (one list per row), the
        per-row values are intersected with ``np.intersect1d``, and the result is exploded.
        Rows whose intersection is empty become NaN on explode; they are logged at ``VERBOSE``
        level and dropped. The final column is cast to ``float``.

        :param frame: frame to extend; the new column is assigned on it.
        :param explode: must be true (asserted).
        """
        assert explode, "Intersection looper requires explode=True"
        name = self.get_name()
        for looper in self.loopers:
            if not looper.is_name_user_set:
                frame = looper.fill_frame(frame, explode=False)
        cols = [looper.get_name() for looper in self.loopers]
        # Walk the rows by position over plain lists instead of a row-wise DataFrame.apply:
        # cheaper, and it still yields a proper (empty) column when the frame has no rows.
        column_values = [frame[col].tolist() for col in cols]
        common = [
            list(reduce(np.intersect1d, [values[position] for values in column_values]))
            for position in range(len(frame))
        ]
        frame[name] = pd.Series(common, index=frame.index, dtype=object)
        for looper in self.loopers:
            if not looper.is_name_user_set:
                frame = frame.drop(columns=[looper.get_name()])
        frame = frame.explode(name)
        if frame[name].isna().any():
            msg = (
                f"NaN values found in intersection looper {name}. "
                f"This typically means that not all triangular conditions were accounted for in SumLimits. "
                f"This warning is expected for current implementation of LL04 multi-term atom."
            )
            logging.log(VERBOSE, msg)
            frame = frame.dropna(subset=[name])
        frame = frame.astype({name: float})
        return frame
[docs]
class Constraint(DummyOrAlreadyMerged):
    """
    Restricts the values of some variable to a list of values.

    This is meant to be an artificial constraint, not triangular/etc. It is always created
    without a dependency looper.
    """

    def __init__(self):
        super().__init__(dependency=None)
[docs]
class ApplyConstraint(Looper):
    """
    ApplyConstraint looper: for applying the constraint to another looper.

    The wrapped looper is filled first; rows whose value is not allowed by the constraint
    column are then removed, and the looper column is renamed to this looper's name.
    """

    def __init__(self, looper: Looper, constraint: Constraint):
        super().__init__()
        self.looper = wrap_in_value_if_needed(looper)
        self.constraint = constraint

    def get_directly_dependent_columns(self) -> set:
        """Return the dependencies contributed by the wrapped looper and the constraint."""
        cols = set()
        self.looper.add_to_dependencies(cols)
        self.constraint.add_to_dependencies(cols)
        return cols

    def fill_frame(self, frame: pd.DataFrame, explode=True) -> pd.DataFrame:
        """
        Fill the wrapped looper, keep only the rows permitted by the constraint, and return
        the frame with a fresh ``0..n-1`` index.

        The constraint column must already be present in ``frame``. Each of its cells is
        treated either as a container of allowed values or as ``None``, meaning "no
        restriction for this row" (inferred from how the cells are used here; confirm
        against the code that fills the column).

        :param frame: frame to extend and filter.
        :param explode: must be true (asserted).
        """
        assert explode, "ApplyConstraint looper requires explode=True"
        assert not self.looper.is_name_user_set, "Cannot constrain a named looper"
        frame = self.looper.fill_frame(frame).reset_index(drop=True)
        looper_name = self.looper.get_name()
        constraint_name = self.constraint.get_name()
        # The None check has to come first: ``value in None`` raises TypeError, which made
        # the "unconstrained row" case unreachable.
        keep = np.array(
            [
                allowed is None or value in allowed
                for value, allowed in zip(frame[looper_name], frame[constraint_name])
            ],
            dtype=bool,
        )
        # A boolean ndarray is always row selection; an empty Python list would instead be
        # read as "select no columns" and strip an empty frame of all its columns.
        frame = frame.loc[keep].reset_index(drop=True)
        frame = frame.rename(columns={looper_name: self.get_name()})
        return frame