import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Generic, Self
from ... import logging as _logging
from .. import Cardinality, DirectionalMapping
from ..exceptions import AmbiguousScoreError
from ..types import CandidateType, ValueType
from ._score_matrix import ScoreMatrix
# Sentinel score. In this module a score of ``inf`` is reported as a "short-circuit or override" match, while
# ``-inf`` marks a record that was either filtered or superseded by such a match.
inf = float("inf")
[docs]
@dataclass(frozen=True)
class Record(Generic[ValueType, CandidateType]):
    """A scored ``value -> candidate`` pairing.

    Instances are immutable. The ``<`` operator looks at :attr:`score` only, which lets a list of records be sorted
    by score.
    """

    value: ValueType
    """A hashable :class:`value <.ValueType>`."""
    candidate: CandidateType
    """A hashable :class:`candidate <.CandidateType>`."""
    score: float
    """Likeness score computed by some scoring function."""

    def __lt__(self, other: Self) -> bool:
        """Return ``True`` if this record has a strictly lower score than `other`."""
        own_score = self.score
        other_score = other.score
        return own_score < other_score

    def __str__(self) -> str:
        """Render as ``<value repr> -> '<candidate>'; score=<score with three decimals>``."""
        template = "{!r} -> '{}'; score={:.3f}"
        return template.format(self.value, self.candidate, self.score)
[docs]
@dataclass(frozen=True)
class Reject(Generic[ValueType, CandidateType]):
    """Data concerning the rejection of a match."""

    record: Record[ValueType, CandidateType]
    """A :class:`Record` to describe."""
    superseding_value: Record[ValueType, CandidateType] | None = None
    """A :class:`Record` that prevents matching of the current value."""
    superseding_candidate: Record[ValueType, CandidateType] | None = None
    """A :class:`Record` that prevents matching of the current candidate."""

    def explain(self, min_score: float, full: bool = False) -> str:
        """Create a string which explains the rejection.

        The explanation is chosen as follows:

        * Score is ``-inf``: superseded by a short-circuit or override if either superseding record has an infinite
          score (the value side is checked first), otherwise reported as filtered.
        * Score is below `min_score`: reported as below threshold.
        * Otherwise: reported as superseded on the value and/or candidate of the superseding record(s).

        Args:
            min_score: Minimum score to accept a match.
            full: If ``True`` show full information about superseding matches.

        Returns:
            An explanatory string.
        """
        if self.record.score == -inf:
            if self.superseding_value and self.superseding_value.score == inf:
                extra = f": {self.superseding_value}" if full else ""
                why = f" (superseded by short-circuit or override{extra})"
            elif self.superseding_candidate and self.superseding_candidate.score == inf:
                extra = f": {self.superseding_candidate}" if full else ""
                # The closing parenthesis was previously missing from this branch only.
                why = f" (superseded by short-circuit or override{extra})"
            else:
                why = " (filtered)"
        elif self.record.score < min_score:
            why = f" < {min_score} (below threshold)"
        else:
            ands = []
            if self.superseding_value:
                extra = f": {self.superseding_value}" if full else ""
                ands.append(f"value={self.superseding_value.value!r}{extra}")
            if self.superseding_candidate:
                extra = f": {self.superseding_candidate}" if full else ""
                ands.append(f"candidate={self.superseding_candidate.candidate!r}{extra}")
            why = f" (superseded on {' and '.join(ands)})"

        return f"{self.record}{why}."
[docs]
class ScoreHelper(Generic[ValueType, CandidateType]):
    """High-level selection operations.

    Args:
        matrix: A :class:`.ScoreMatrix` instance.
        min_score: Minimum score to make a `value -> candidate` match.
        logger: Explicit ``Logger`` instance to use.
    """

    def __init__(
        self,
        matrix: ScoreMatrix[ValueType, CandidateType],
        min_score: float,
        logger: logging.Logger | None = None,
    ) -> None:
        self._min_score = min_score
        self._matrix = matrix
        self._logger = logging.getLogger(__name__) if logger is None else logger

    @property
    def logger(self) -> logging.Logger:
        """Return the ``Logger`` that is used by this instance."""
        return self._logger

    def _verbose_logging_enabled(self) -> bool:
        """Return ``True`` if verbose match diagnostics should be produced.

        Both the package-level ``ENABLE_VERBOSE_LOGGING`` flag and the ``DEBUG`` level of :attr:`logger` must be on.
        """
        return bool(_logging.ENABLE_VERBOSE_LOGGING and self.logger.isEnabledFor(logging.DEBUG))

    def to_directional_mapping(
        self,
        cardinality: Cardinality | None = None,
    ) -> DirectionalMapping[ValueType, CandidateType]:
        """Create a ``DirectionalMapping`` with a given target ``Cardinality``.

        When verbose logging is enabled, every accepted match and every value that could not be mapped is reported
        at ``DEBUG`` level together with the rejected records involved.

        Args:
            cardinality: Explicit cardinality to set, see :attr:`~.DirectionalMapping.cardinality`. If ``None``, use the
                actual cardinality when selecting all matches with scores :meth:`at or above <above>` the minimum.

        Returns:
            A ``DirectionalMapping``.
        """
        matches: list[Record[ValueType, CandidateType]]
        rejections: list[Reject[ValueType, CandidateType]]
        matches, rejections = self._match(cardinality)

        min_score = self._min_score
        logger = self.logger
        verbose = self._verbose_logging_enabled()

        left_to_right: dict[ValueType, list[CandidateType]] = {}
        for record in matches:
            left_to_right.setdefault(record.value, []).append(record.candidate)
            if not verbose:
                continue

            # Rejected records that passed the threshold but lost to this record on value and/or candidate.
            supersedes = [
                rr
                for rr in rejections
                if record in (rr.superseding_value, rr.superseding_candidate) and rr.record.score >= min_score
            ]
            reason = "(short-circuit or override)" if record.score == inf else f">= {min_score}"
            msg = f"Accepted: {record} {reason}."
            if supersedes:
                s = "\n".join(" " + rr.explain(min_score) for rr in supersedes)
                msg += f" This match supersedes {len(supersedes)} other matches:\n{s}"
            logger.debug(msg)

        values = set(self._matrix.values)
        if rejections and verbose:
            for value in values.difference(left_to_right):
                value_reasons = "\n".join(
                    " " + reject.explain(min_score, full=True)
                    for reject in rejections
                    if reject.record.value == value
                )
                logger.debug(f"Could not map {value=}. Rejected matches:\n{value_reasons}")

        return DirectionalMapping(
            cardinality=cardinality,
            left_to_right={value: tuple(left_to_right[value]) for value in values.intersection(left_to_right)},
            _verify=False,
        )

    def _match(
        self,
        cardinality: Cardinality | None = None,
    ) -> tuple[list[Record[ValueType, CandidateType]], list[Reject[ValueType, CandidateType]]]:
        """Select matches for the given `cardinality`.

        Records are sorted by descending score before selection. Rejections are only tracked when verbose logging
        is enabled; in that case records below the threshold are included so that they can be explained.

        Args:
            cardinality: Target cardinality. Anything other than ``OneToOne``, ``OneToMany`` or ``ManyToOne``
                (including ``None``) selects every record that meets the threshold.

        Returns:
            A tuple ``(matches, rejections)``. The rejection list is empty when verbose logging is disabled.
        """
        rejections: list[Reject[ValueType, CandidateType]] | None = None
        records: list[Record[ValueType, CandidateType]] = self.above()

        if self._verbose_logging_enabled():
            rejections = []
            records.extend(self.below())

        records.sort(reverse=True)

        if cardinality is Cardinality.OneToOne:
            matches = self._select_one_to_one(records, rejections)
        elif cardinality is Cardinality.OneToMany:
            matches = self._select_one_to_many(records, rejections)
        elif cardinality is Cardinality.ManyToOne:
            matches = self._select_many_to_one(records, rejections)
        else:
            matches = self._select_many_to_many(records, rejections)

        # The selectors are generators which record rejections while being iterated; exhaust them before reading.
        accepted = list(matches)
        return accepted, rejections or []

    def above(self) -> list[Record[ValueType, CandidateType]]:
        """Records with scores `at or above` the threshold."""
        min_score = self._min_score
        return self._from_dict({k: v for k, v in self._matrix.to_dict().items() if v >= min_score})

    def below(self) -> list[Record[ValueType, CandidateType]]:
        """Records with scores strictly `below` the threshold."""
        min_score = self._min_score
        return self._from_dict({k: v for k, v in self._matrix.to_dict().items() if v < min_score})

    @classmethod
    def _from_dict(cls, d: dict[tuple[ValueType, CandidateType], float]) -> list[Record[ValueType, CandidateType]]:
        """Convert a ``{(value, candidate): score}`` dict into a list of :class:`Record` instances."""
        return [Record(value, candidate, score) for (value, candidate), score in d.items()]

    def _raise_if_ambiguous(
        self,
        record: Record,  # type: ignore[type-arg]
        matches: dict,  # type: ignore[type-arg]
        kind: str,
        cardinality: Cardinality,
    ) -> None:
        """Raise if `record` ties with the match already accepted for the same key.

        Args:
            record: The record under consideration.
            matches: Accepted matches, keyed by value if ``kind == "value"`` and by candidate otherwise.
            kind: Which attribute of `record` to use as the key; ``"value"`` or anything else for the candidate.
            cardinality: Cardinality being selected for; its name is included in the error.

        Raises:
            AmbiguousScoreError: If the scores are equal and neither of them is infinite.
        """
        if record.score == inf:
            # Overrides are allowed to be infinite; the first one will be chosen. It's up to the user to manage them.
            return

        key = record.value if kind == "value" else record.candidate
        if key not in matches:
            return

        old_match = matches[key]
        if old_match.score == inf:
            # Overrides are allowed to be infinite; the first one will be chosen. It's up to the user to manage them.
            return

        if record.score == old_match.score:
            raise AmbiguousScoreError(
                kind=kind,
                key=key,
                match0=record,
                match1=old_match,
                cardinality=cardinality.name,
                scores=self._matrix.to_string(),
            )

    def _select_one_to_one(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: list[Reject[ValueType, CandidateType]] | None = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records such that each value and each candidate is used at most once.

        The first record seen for a value/candidate wins, so `records` is expected to be ordered best-first (as done
        by :meth:`_match`). Skipped records are appended to `rejections` if a list is given.
        """
        mvs: dict[ValueType, Record[ValueType, CandidateType]] = {}
        mcs: dict[CandidateType, Record[ValueType, CandidateType]] = {}

        for record in records:
            self._raise_if_ambiguous(record, mcs, "candidate", Cardinality.OneToOne)
            self._raise_if_ambiguous(record, mvs, "value", Cardinality.OneToOne)

            if record.score < self._min_score or record.value in mvs or record.candidate in mcs:
                if rejections is not None:
                    rejections.append(
                        Reject(
                            record,
                            superseding_value=mvs.get(record.value),
                            superseding_candidate=mcs.get(record.candidate),
                        )
                    )
                continue

            mvs[record.value] = record
            mcs[record.candidate] = record
            yield record

    def _select_one_to_many(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: list[Reject[ValueType, CandidateType]] | None = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records such that each candidate is used at most once; values may repeat.

        The first record seen for a candidate wins, so `records` is expected to be ordered best-first (as done by
        :meth:`_match`). Skipped records are appended to `rejections` if a list is given.
        """
        mcs: dict[CandidateType, Record[ValueType, CandidateType]] = {}

        for record in records:
            self._raise_if_ambiguous(record, mcs, "candidate", Cardinality.OneToMany)

            if record.score < self._min_score or record.candidate in mcs:
                if rejections is not None:
                    rejections.append(Reject(record, superseding_candidate=mcs.get(record.candidate)))
                continue

            mcs[record.candidate] = record
            yield record

    def _select_many_to_one(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: list[Reject[ValueType, CandidateType]] | None = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records such that each value is used at most once; candidates may repeat.

        The first record seen for a value wins, so `records` is expected to be ordered best-first (as done by
        :meth:`_match`). Skipped records are appended to `rejections` if a list is given.
        """
        mvs: dict[ValueType, Record[ValueType, CandidateType]] = {}

        for record in records:
            self._raise_if_ambiguous(record, mvs, "value", Cardinality.ManyToOne)

            if record.score < self._min_score or record.value in mvs:
                if rejections is not None:
                    rejections.append(Reject(record, superseding_value=mvs.get(record.value)))
                continue

            mvs[record.value] = record
            yield record

    def _select_many_to_many(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: list[Reject[ValueType, CandidateType]] | None = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield every record whose score meets the threshold.

        Records below the threshold are appended to `rejections` if a list is given.
        """
        for record in records:
            if record.score < self._min_score:
                if rejections is not None:
                    rejections.append(Reject(record))
                continue

            yield record