Source code for id_translation.mapping._mapper

import logging
import warnings
from collections.abc import Iterable
from time import perf_counter
from typing import Any, Generic, Self

import numpy as np
import pandas as pd
from rics.collections.dicts import InheritedKeysDict
from rics.misc import get_by_full_name, tname
from rics.strings import format_perf_counter as fmt_perf
from rics.types import LiteralHelper

from . import exceptions
from . import filter_functions as mf
from . import score_functions as sf
from ._cardinality import Cardinality
from ._directional_mapping import DirectionalMapping
from .exceptions import (
    MappingError,
    UnmappedValuesError,
    UnmappedValuesWarning,
    UserMappingError,
    UserMappingWarning,
)
from .types import (
    CandidateType,
    CardinalityType,
    ContextType,
    FilterFunction,
    OnUnknownUserOverride,
    OnUnmapped,
    ScoreFunction,
    UserOverrideFunction,
    ValueType,
)

# ``UserWarning``s emitted while importing ``.support`` are silenced; the filter is restored when the block exits.
# NOTE(review): which warning this is meant to hide is not visible from this file -- confirm before removing.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=UserWarning)
    from .support import MatchScores, enable_verbose_debug_messages

# Read by ``Mapper._get_verbose_logger``: when ``True``, the ``verbose`` child logger is enabled even for
# instances created with ``verbose_logging=False``.
FORCE_VERBOSE: bool = False  # Magic variable used by the verbose context


class Mapper(Generic[ValueType, CandidateType, ContextType]):
    """Optimal value-candidate matching.

    For an introduction to mapping, see the :ref:`mapping-primer` page.

    Args:
        score_function: A callable which accepts a value `k` and an ordered collection of candidates `c`, returning a
            score ``s_i`` for each candidate `c_i` in `c`. Default: ``s_i = float(k == c_i)``. Higher=better match.
        score_function_kwargs: Keyword arguments for `score_function`.
        filter_functions: Function-kwargs pairs of filters to apply before scoring.
        min_score: Minimum score `s_i`, as given by ``score(k, c_i)``, to consider `k` a match for `c_i`. Must be a
            finite positive number.
        overrides: If a dict, assumed to be 1:1 mappings (`value` to `candidate`) which override the scoring logic. If
            :class:`rics.collections.dicts.InheritedKeysDict`, the context passed to :meth:`apply` is used to retrieve
            specific overrides.
        on_unmapped: Action to take if mapping fails for any values.
        on_unknown_user_override: Action to take if an :attr:`~id_translation.mapping.types.UserOverrideFunction`
            returns an unknown candidate. Unknown candidates, i.e. candidates not in the input `candidates` collection,
            will not be used unless `'keep'` is chosen.
        cardinality: Desired cardinality for mapped values. Derive for each matching if ``None``.
        verbose_logging: If ``True``, enable verbose logging for the :meth:`apply` function. Has no effect when the log
            level is above ``logging.DEBUG``.

    Raises:
        ValueError: If `min_score` is not a finite positive number.
    """

    def __init__(
        self,
        score_function: str | ScoreFunction[ValueType, CandidateType, ContextType] = "disabled",
        score_function_kwargs: dict[str, Any] | None = None,
        filter_functions: Iterable[
            tuple[str | FilterFunction[ValueType, CandidateType, ContextType], dict[str, Any]]
        ] = (),
        min_score: float = 0.90,
        overrides: dict[ValueType, CandidateType]
        | InheritedKeysDict[ContextType, ValueType, CandidateType]
        | None = None,
        on_unmapped: OnUnmapped = "ignore",
        on_unknown_user_override: OnUnknownUserOverride = "raise",
        cardinality: CardinalityType | None = Cardinality.ManyToOne,
        verbose_logging: bool = False,
    ) -> None:
        # Written as ``not (x > 0 and isfinite(x))`` so that NaN -- which fails every ordering comparison and is
        # not infinite -- is rejected together with non-positive and infinite limits.
        if not (min_score > 0 and np.isfinite(min_score)):
            raise ValueError(f"Got {min_score=}. The score limit should be a finite positive value.")

        # Functions given by name are resolved against the bundled score/filter function modules.
        self._score = get_by_full_name(score_function, sf) if isinstance(score_function, str) else score_function
        self._score_kwargs = score_function_kwargs or {}
        self._min_score = min_score
        self._overrides: InheritedKeysDict[ContextType, ValueType, CandidateType] | dict[ValueType, CandidateType] = (
            overrides if isinstance(overrides, InheritedKeysDict) else (overrides or {})
        )
        self._on_unmapped = OU_HELPER.check(on_unmapped)
        self._on_unknown_user_override = OUUO_HELPER.check(on_unknown_user_override)
        self._cardinality = None if cardinality is None else Cardinality.parse(cardinality, strict=True)
        self._filters: list[tuple[FilterFunction[ValueType, CandidateType, ContextType], dict[str, Any]]] = [
            ((get_by_full_name(func, mf) if isinstance(func, str) else func), kwargs)
            for func, kwargs in filter_functions
        ]
        self._verbose = verbose_logging
        self._logger = logging.getLogger(__package__).getChild("Mapper")  # This will almost always be overwritten

    def apply(
        self,
        values: Iterable[ValueType],
        candidates: Iterable[CandidateType],
        context: ContextType | None = None,
        override_function: UserOverrideFunction[ValueType, CandidateType, ContextType] | None = None,
        **kwargs: Any,
    ) -> DirectionalMapping[ValueType, CandidateType]:
        """Map values to candidates.

        Args:
            values: Iterable of elements to match to candidates.
            candidates: Iterable of candidates to match with `value`. Duplicate elements will be discarded.
            context: Context in which mapping is being done.
            override_function: A callable that takes inputs ``(value, candidates, context)`` that returns either
                ``None`` (let the regular mapping logic decide) or one of the `candidates`. How non-candidates returned
                is handled is determined by the :attr:`on_unknown_user_override` property.
            **kwargs: Runtime keyword arguments for score and filter functions. May be used to add information which is
                not known when the ``Mapper`` is initialized.

        Returns:
            A :class:`.DirectionalMapping` on the form ``{value: [matched_candidates..]}``. May be turned into a
            plain dict ``{value: candidate}`` by using the :meth:`.DirectionalMapping.flatten` function (only if
            :attr:`.DirectionalMapping.cardinality` is of type :attr:`.Cardinality.one_right`).

        Raises:
            MappingError: If any values failed to match and ``on_unmapped='raise'``.
            BadFilterError: If a filter returns candidates that are not a subset of the original candidates.
            UserMappingError: If `override_function` returns an unknown candidate and
                ``on_unknown_user_override='raise'``.
            MappingError: If passing ``context=None`` (the default) when using context-sensitive overrides (type
                :class:`rics.collections.dicts.InheritedKeysDict`).
        """
        start = perf_counter()

        # Materialize once; both inputs are traversed several times below and may be one-shot iterables.
        candidates = list(candidates)
        values = list(values)

        if not (values and candidates):
            self.logger.debug("Aborting since values=%r and candidates=%r in context=%r.", values, candidates, context)
            return DirectionalMapping(left_to_right={}, _verify=False, cardinality=self.cardinality)

        if self.verbose_logging:
            with enable_verbose_debug_messages():
                scores = self.compute_scores(values, candidates, context, override_function, **kwargs)
        else:
            scores = self.compute_scores(values, candidates, context, override_function, **kwargs)  # pragma: no cover

        dm: DirectionalMapping[ValueType, CandidateType] = self.to_directional_mapping(scores)

        # Only rows holding at least one finite score are considered here. Rows made up solely of infinities (never
        # scored because every candidate was filtered out, or settled by an override/identity match) are skipped.
        unmapped = set(scores.index[~np.isinf(scores).all(axis=1)]).difference(dm.left)
        if unmapped:
            extra = f" in {context=}" if context else ""
            # Kept in a separate name so the debug summary below still shows the candidates as they were given.
            scored_candidates = set(scores)
            self._report_unmapped(f"Could not map {unmapped}{extra} to any of candidates={scored_candidates!r}.")

        verbose_logger = self._get_verbose_logger()
        if verbose_logger.isEnabledFor(logging.DEBUG):
            cardinality = "automatic" if self.cardinality is None else self.cardinality.name
            l2r = dm.left_to_right
            matches = " Matches:\n" + "\n".join(
                f" {v!r} -> {repr(l2r[v]) if v in l2r else '<no matches>'}" for v in values
            )
            verbose_logger.debug(
                f"Mapping with {cardinality=} completed for {values}x{candidates} in {fmt_perf(start)}."
                f"{matches}\nMatched {len(dm.left)}/{len(values)} values with {len(dm.right)} different candidates."
            )

        return dm

    def _report_unmapped(self, msg: str) -> None:
        """Raise, warn or debug-log `msg`, depending on :attr:`on_unmapped`.

        Raises:
            UnmappedValuesError: If ``on_unmapped='raise'``.
        """
        if self.on_unmapped == "raise":
            msg += "\nHint: Set on_unmapped='warn' or on_unmapped='ignore' to allow unmapped values."
            self.logger.error(msg)
            raise UnmappedValuesError(msg)
        elif self.on_unmapped == "warn":
            # The log record gets the bare message; the hint is only added to the emitted Python warning.
            self.logger.warning(msg)
            msg += (
                "\nHint: Set "
                "on_unmapped='ignore' to hide this warning, or "
                f"on_unmapped='raise' to raise an {UnmappedValuesError.__name__}."
            )
            warnings.warn(msg, UnmappedValuesWarning, stacklevel=3)
        else:
            self._get_verbose_logger().debug(msg)

    def compute_scores(
        self,
        values: Iterable[ValueType],
        candidates: Iterable[CandidateType],
        context: ContextType | None = None,
        override_function: UserOverrideFunction[ValueType, CandidateType, ContextType] | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Compute likeness scores.

        Args:
            values: Iterable of elements to match to candidates.
            candidates: Iterable of candidates to match with `value`. Duplicate elements will be discarded.
            context: Context in which mapping is being done.
            override_function: A callable that takes inputs ``(value, candidates, context)`` that returns either
                ``None`` (let the regular mapping logic decide) or one of the `candidates`. How non-candidates returned
                is handled is determined by the :attr:`on_unknown_user_override` property.
            **kwargs: Runtime keyword arguments for score and filter functions. May be used to add information which is
                not known when the ``Mapper`` is initialized.

        Returns:
            A ``DataFrame`` of value-candidate match scores, with ``DataFrame.index=values`` and
            ``DataFrame.columns=candidates``.

        Raises:
            BadFilterError: If a filter returns candidates that are not a subset of the original candidates.
            UserMappingError: If `override_function` returns an unknown candidate and
                ``on_unknown_user_override='raise'``.
        """
        start = perf_counter()

        candidates = list(candidates)
        values = list(values)

        # Every pair starts out at -inf, i.e. "no match", until an override or a score says otherwise.
        scores = pd.DataFrame(
            data=-np.inf,
            columns=pd.Index(candidates, name="candidates").drop_duplicates(),
            index=pd.Index(values, name="values").drop_duplicates(),
            dtype=float,
        )

        extra = f" in {context=}" if context else ""

        if scores.empty:
            # Guard on the level that is actually emitted; guarding on DEBUG hid this warning at default levels.
            if self.logger.isEnabledFor(logging.WARNING):
                end = "" if (values or candidates) else ", but got neither"
                self.logger.warning(
                    f"Abort mapping{extra} of {values}x{candidates}. Both values and candidates must be given{end}."
                )
            return scores

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"Begin computing match scores{extra} for {values}x{candidates} using {self._score}.")

        # Overrides are written into ``scores`` in place; only the values they did not settle are scored below.
        unmapped_values = self._handle_overrides(scores, context, override_function)

        verbose_logger = self._get_verbose_logger()
        for value in unmapped_values:
            filtered_candidates = self._apply_filters(value, candidates, context, kwargs)
            if not filtered_candidates:
                continue

            scores_for_value: Iterable[float]
            if value in filtered_candidates:
                scores_for_value = [(np.inf if value == c else -np.inf) for c in filtered_candidates]  # Identity match
            else:
                if verbose_logger.isEnabledFor(logging.DEBUG):
                    verbose_logger.debug(f"Compute match scores for {value=}.")
                scores_for_value = self._score(value, filtered_candidates, context, **self._score_kwargs, **kwargs)

            # strict=True: a score function returning the wrong number of scores is an error, not a silent truncation.
            for score, candidate in zip(scores_for_value, filtered_candidates, strict=True):
                scores.loc[value, candidate] = score

        if verbose_logger.isEnabledFor(logging.DEBUG):
            verbose_logger.debug(
                f"Computed {len(scores.index)}x{len(scores.columns)} "
                f"match scores in {fmt_perf(start)}:\n{scores.to_string()}"
            )

        return scores

    def to_directional_mapping(
        self,
        scores: pd.DataFrame,
    ) -> DirectionalMapping[ValueType, CandidateType]:
        """Create a ``DirectionalMapping`` from match scores.

        Args:
            scores: A score matrix, where ``scores.index`` are values and ``scores.columns`` are treated as the
                candidates.

        Returns:
            A ``DirectionalMapping``.

        See Also:
            :meth:`.MatchScores.to_directional_mapping`
        """
        return MatchScores(scores, self._min_score, self._get_verbose_logger()).to_directional_mapping(self.cardinality)

    def _get_verbose_logger(self) -> logging.Logger:
        """Return the ``verbose`` child of :attr:`logger`, disabled unless verbose output is requested."""
        logger = self.logger.getChild("verbose")
        # Re-evaluated on every call since the module-level FORCE_VERBOSE flag may change between calls.
        logger.disabled = not (FORCE_VERBOSE or self.verbose_logging)
        return logger

    @property
    def cardinality(self) -> Cardinality | None:
        """Return upper cardinality bound during mapping."""
        return self._cardinality

    @property
    def on_unmapped(self) -> OnUnmapped:
        """Return the action to take if mapping fails for any values."""
        return self._on_unmapped

    @property
    def on_unknown_user_override(self) -> OnUnknownUserOverride:
        """Return the action to take if an override function returns an unknown candidate.

        Returns:
            Action to take if a user-defined override function returns an unknown candidate.
        """
        return self._on_unknown_user_override

    @property
    def verbose_logging(self) -> bool:
        """Return ``True`` if verbose debug-level messages are enabled."""
        return self._verbose

    @property
    def logger(self) -> logging.Logger:
        """Return the ``Logger`` that is used by this instance."""
        return self._logger

    @logger.setter
    def logger(self, logger: logging.Logger) -> None:
        self._logger = logger

    def _handle_overrides(
        self,
        scores: pd.DataFrame,
        context: ContextType | None,
        override_function: UserOverrideFunction[ValueType, CandidateType, ContextType] | None,
    ) -> list[ValueType]:
        """Write function and static overrides into `scores`, in place.

        Function overrides are applied first; static overrides are then looked up only for the values that remain.

        Returns:
            The values in ``scores.index`` that were not settled by any override.
        """
        applied: dict[ValueType, CandidateType] = {}

        def apply(v: ValueType, oc: CandidateType) -> None:
            # Pin the row: the chosen candidate gets +inf, every other candidate -inf.
            scores.loc[v, :] = -np.inf
            scores.loc[v, oc] = np.inf
            unmapped_values.remove(v)
            applied[v] = oc

        unmapped_values = list(scores.index)

        if override_function:
            for value, override_candidate in self._get_function_overrides(
                override_function, scores.index, scores.columns, context
            ):
                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug(
                        f"Using override {value!r} -> {override_candidate!r} returned by {override_function=}."
                    )
                apply(value, override_candidate)

        # The static overrides dict is fully built before iteration starts, so ``apply`` may shrink
        # ``unmapped_values`` inside the loop.
        for value, override_candidate in self._get_static_overrides(unmapped_values, context).items():
            apply(value, override_candidate)

        if self.logger.isEnabledFor(logging.DEBUG) and (self._overrides or override_function is not None):
            num_overrides = len(self._overrides) + int(override_function is not None)
            result = f"and found {len(applied)} matches={applied} in" if applied else "but none were a match for"
            done = "All values mapped by overrides. " if (not unmapped_values and applied) else ""
            self.logger.debug(
                f"{done}Applied {num_overrides} overrides, {result} the given values={list(scores.index)}."
            )

        return unmapped_values

    def _get_static_overrides(
        self,
        values: Iterable[ValueType],
        context: ContextType | None,
    ) -> dict[ValueType, CandidateType]:
        """Return the configured overrides that apply to `values`.

        Raises:
            MappingError: If the overrides are context-sensitive and `context` is ``None``.
        """
        if not self._overrides:
            return {}

        if isinstance(self._overrides, InheritedKeysDict):
            if context is None:
                raise MappingError("Must pass a context in context-sensitive mode.")
            overrides = self._overrides.get(context, {})
        else:
            overrides = self._overrides

        return {value: overrides[value] for value in overrides if value in values}

    def _get_function_overrides(
        self,
        func: UserOverrideFunction[ValueType, CandidateType, ContextType],
        values: Iterable[ValueType],
        candidates: Iterable[CandidateType],
        context: ContextType | None,
    ) -> list[tuple[ValueType, CandidateType]]:
        """Call `func` for every value and collect the overrides it returns.

        Returns:
            A list of ``(value, candidate)`` pairs. Values for which `func` returned ``None`` are left out, as are
            unknown candidates unless ``on_unknown_user_override='keep'``.

        Raises:
            UserMappingError: If `func` returns an unknown candidate and ``on_unknown_user_override='raise'``.
        """
        candidates = set(candidates)

        ans = []
        for value in values:
            user_override = func(value, candidates, context)
            if user_override is None:
                continue

            if user_override not in candidates and self.on_unknown_user_override != "keep":
                msg = (
                    f"The user-defined override function {func} returned an unknown candidate={user_override!r} for"
                    f" {value=}.\nHint: Set on_unknown_user_override='keep' to use this value anyway."
                )
                if self.on_unknown_user_override == "raise":
                    self.logger.error(msg)
                    raise UserMappingError(msg, value, candidates)
                elif self.on_unknown_user_override == "warn":
                    self.logger.warning(msg)
                    warnings.warn(msg, UserMappingWarning, stacklevel=2)
                continue  # The unknown candidate is discarded.

            ans.append((value, user_override))
        return ans

    def _apply_filters(
        self,
        value: ValueType,
        candidates: Iterable[CandidateType],
        context: ContextType | None,
        kwargs: dict[str, Any],
    ) -> set[CandidateType]:
        """Run all configured filters for `value`, stopping early once no candidates remain.

        Args:
            value: The value that candidates are being filtered for.
            candidates: All candidates under consideration.
            context: Context in which mapping is being done.
            kwargs: Runtime keyword arguments, passed to each filter after the filter's own keyword arguments.

        Returns:
            The candidates that passed all filters.

        Raises:
            BadFilterError: If a filter returns candidates that are not a subset of the original candidates.
        """
        candidates = list(candidates)
        filtered_candidates = set(candidates)

        for filter_function, function_kwargs in self._filters:
            filtered_candidates = filter_function(value, filtered_candidates, context, **function_kwargs, **kwargs)

            not_in_original_candidates = filtered_candidates.difference(candidates)
            if not_in_original_candidates:
                raise exceptions.BadFilterError(
                    f"Filter {tname(filter_function)}({value}, candidates, **{kwargs}) created new "
                    f"candidates: {not_in_original_candidates}"
                )

            if not filtered_candidates:
                break

        if self.verbose_logging and self.logger.isEnabledFor(logging.DEBUG) and len(self._filters):
            diff = set(candidates).difference(filtered_candidates)
            removed = f"removing candidates={diff}" if diff else "but did not remove any candidates"
            done = "All candidates removed by filtering. " if not filtered_candidates else ""
            self.logger.debug(f"{done}Applied {len(self._filters)} filters for {value=}, {removed}.")

        return filtered_candidates

    def __repr__(self) -> str:
        score = self._score
        return f"{tname(self)}({score=} >= {self._min_score}, {len(self._filters)} filters)"

    def copy(self, **overrides: Any) -> Self:
        """Make a copy of this ``Mapper``.

        Args:
            **overrides: Keyword arguments to use when instantiating the copy. Options that aren't given will be taken
                from the current instance. See the :class:`Mapper` class documentation for possible choices.

        Returns:
            A copy of this ``Mapper`` with `overrides` applied.
        """
        kwargs: dict[str, Any] = {
            "score_function": self._score,
            "min_score": self._min_score,
            "on_unmapped": self.on_unmapped,
            "on_unknown_user_override": self.on_unknown_user_override,
            "cardinality": self.cardinality,
            "verbose_logging": self.verbose_logging,
            **overrides,
        }

        # Mutable settings are copied so that the new instance does not share containers with this one.
        if "score_function_kwargs" not in kwargs:
            kwargs["score_function_kwargs"] = self._score_kwargs.copy()

        if "filter_functions" not in kwargs:
            kwargs["filter_functions"] = [(func, func_kwargs.copy()) for func, func_kwargs in self._filters]

        if "overrides" not in kwargs:
            kwargs["overrides"] = self._overrides.copy()

        cls = type(self)
        return cls(**kwargs)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Mapper):
            return False

        # The logger is deliberately not part of the comparison; only configuration is compared.
        return all(
            (
                self._score == other._score,
                self._score_kwargs == other._score_kwargs,
                self._filters == other._filters,
                self._min_score == other._min_score,
                self._overrides == other._overrides,
                self._on_unmapped == other._on_unmapped,
                self._on_unknown_user_override == other._on_unknown_user_override,
                self._cardinality == other._cardinality,
                self._verbose == other._verbose,
            )
        )
# Helpers whose ``check`` method is called by ``Mapper.__init__`` on the ``on_unmapped`` and
# ``on_unknown_user_override`` arguments. They are defined after the class, which works because the names are only
# looked up when ``__init__`` runs.
OU_HELPER: LiteralHelper[OnUnmapped] = LiteralHelper(
    OnUnmapped,
    default_name="on_unmapped",
    type_name="OnUnmapped",
)
OUUO_HELPER: LiteralHelper[OnUnknownUserOverride] = LiteralHelper(
    OnUnknownUserOverride,
    default_name="on_unknown_user_override",
    type_name="OnUnknownUserOverride",
)