Source code for id_translation.fetching._abstract_fetcher

import logging
import warnings
from abc import abstractmethod
from collections.abc import Iterable, Sequence
from contextlib import contextmanager
from copy import deepcopy
from time import perf_counter
from typing import Any, Self, final

from rics.collections.dicts import InheritedKeysDict, reverse_dict
from rics.misc import tname
from rics.strings import format_seconds as fmt_sec

from .. import logging as _logging
from ..exceptions import ConnectionStatusError
from ..mapping import HeuristicScore, Mapper
from ..mapping.exceptions import MappingWarning
from ..mapping.score_functions import modified_hamming
from ..offline.types import PlaceholdersTuple, PlaceholderTranslations, SourcePlaceholderTranslations
from ..types import ID, IdType, SourceType
from . import exceptions
from ._cache_access import CacheAccess
from ._fetcher import Fetcher
from .exceptions import CacheAccessNotAvailableError
from .types import FetchInstruction, IdsToFetch


class AbstractFetcher(Fetcher[SourceType, IdType]):
    """Base class for retrieving translations from an external source.

    Args:
        mapper: A :class:`.Mapper` instance used to adapt placeholder names in sources to wanted names, i.e. the names
            of the placeholders that are in the translation :class:`.Format` being used.
        allow_fetch_all: If ``False``, an error will be raised when :meth:`fetch_all` is called.
        selective_fetch_all: If ``True``, fetch only from those :attr:`~.HasSources.sources` that contain the required
            :attr:`~.HasSources.placeholders` (after mapping). May reduce the number of sources retrieved.
        identifiers: A collection of hierarchical identifiers, exposed as :attr:`identifiers`. If any identifier ends
            with ``.toml``, the first such identifier is attached as the ``fetcher_config_file`` attribute of log
            records handled by :attr:`logger`.
        optional: If ``True``, this fetcher may be discarded if source/placeholder-enumeration fails in multi-fetcher
            mode.
        cache_access: A :class:`.CacheAccess` instance. Defaults to a NOOP-implementation (i.e. always fetch new data).
    """

    def __init__(
        self,
        *,
        mapper: Mapper[str, str, SourceType] | None = None,
        allow_fetch_all: bool = True,
        selective_fetch_all: bool = True,
        identifiers: Sequence[str] | None = None,
        optional: bool = False,
        cache_access: CacheAccess[SourceType, IdType] | None = None,
    ) -> None:
        self._mapper: Mapper[str, str, SourceType] = mapper or Mapper(**self.default_mapper_kwargs())
        if self._mapper.on_unmapped == "raise":
            # Fetchers map optional placeholders too; a raising mapper turns every unmappable one into an error.
            warnings.warn(
                "Using on_unmapped='raise' will treat optional placeholders as "
                "required placeholders during normal operation.",
                category=MappingWarning,
                stacklevel=2,
            )

        self._allow_fetch_all: bool = allow_fetch_all
        self._selective_fetch_all = selective_fetch_all

        identifiers = () if identifiers is None else (*identifiers,)
        logger, mapper_logger = self._configure_loggers(identifiers)
        self.logger = logger
        self._mapper.logger = mapper_logger
        self._identifiers: tuple[str, ...] = identifiers

        self._optional = optional
        # Sources and placeholders are discovered lazily; see initialize_sources().
        self._placeholders: dict[SourceType, list[str]] | None = None

        if cache_access is None:
            # Shared sentinel instance; `cache_access` checks for it by identity.
            cache_access = _NOOP_CACHE_ACCESS
        else:
            cache_access.set_parent(self)
        self._cache_access = cache_access
[docs] @final # Prevent accidental overriding def initialize_sources(self, task_id: int | None = None, *, force: bool = False) -> None: if not (self._placeholders is None or force): return start = perf_counter() if task_id is None: task_id = _logging.generate_task_id(start) logger = self.logger if _logging.ENABLE_VERBOSE_LOGGING and logger.isEnabledFor(logging.DEBUG): logger.debug( f"Begin initialization of '{self._cls_name()}' at {hex(id(self))}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.initialize_sources, "enter"), ), ) self._placeholders = self._initialize_sources(task_id) if self._placeholders is None: msg = f"Call to {self._cls_name()}.{self.initialize_sources.__name__}() failed." raise RuntimeError(msg) if logger.isEnabledFor(logging.INFO): seconds = perf_counter() - start logger.info( f"Finished initialization of '{self._cls_name()}' in {fmt_sec(seconds)}: {self}", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.initialize_sources, "exit"), seconds=seconds, ), )
@abstractmethod def _initialize_sources(self, task_id: int) -> dict[SourceType, list[str]]: """Perform a full (re) discovery of sources and placeholders.""" @final @property def selective_fetch_all(self) -> bool: """If set, reduce the amount of data fetched by :meth:`fetch_all`.""" return self._selective_fetch_all @final @property def placeholders(self) -> dict[SourceType, list[str]]: if self._placeholders is None: self.initialize_sources() return self.placeholders return self._placeholders @final @property def sources(self) -> list[SourceType]: return list(self.placeholders) @final @property def identifiers(self) -> tuple[str, ...]: """A collection of hierarchical identifiers for this fetcher.""" return self._identifiers
[docs] def map_placeholders( self, source: SourceType, placeholders: Iterable[str], *, candidates: Iterable[str] | None = None, task_id: int | None = None, ) -> dict[str, str | None]: """Map `placeholder` names to the actual names seen in `source`. This method calls ``Mapper.apply(values=placeholders, candidates=candidates, context=source)`` using the local :attr:`.AbstractFetcher.mapper` instance. Args: source: The source to map placeholders for. placeholders: Desired :attr:`~.Format.placeholders`. candidates: A subset of candidates (placeholder names) in `source` to map with `placeholders`. task_id: Used for logging. Returns: A dict ``{wanted_placeholder_name: actual_placeholder_name_in_source}``, where `actual_placeholder_name_in_source` will be ``None`` if the wanted placeholder could not be mapped to any of the candidates available for the source. Raises: UnknownSourceError: If `source` is not in :attr:`sources`. See Also: 🔑 This is a key event method. See :ref:`key-events` for details. """ start = perf_counter() if self._placeholders is not None and source not in self._placeholders: # Check the underlying attribute to avoid infinite recursion in _initialize_sources()-implementations that # call this method. This is typically done indirectly via id_column(), which requires explicit candidates. raise exceptions.UnknownSourceError({source}, self.sources) if candidates is None: # May lead to infinite recursion if _initialize_sources() calls map_placeholders(candidates=None). 
candidates = self.placeholders[source] candidates = set(candidates) placeholders = set(placeholders) logger = self.logger emit_key_event = self._placeholders or (_logging.ENABLE_VERBOSE_LOGGING and logger.isEnabledFor(logging.DEBUG)) if emit_key_event: logger.debug( f"Begin mapping of wanted {placeholders=} to actual placeholders={candidates} for {source=}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.map_placeholders, "enter"), values=list(placeholders), candidates=list(candidates), context=source, ), ) dm = self.mapper.apply(placeholders, candidates, context=source, task_id=task_id) ans: dict[str, str | None] = dm.flatten() # type: ignore[assignment] for not_mapped in placeholders.difference(ans): ans[not_mapped] = None if emit_key_event: seconds = perf_counter() - start logger.debug( f"Finished placeholder mapping for {source=}: {ans}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.map_placeholders, "exit"), seconds=seconds, mapping=ans, context=source, ), ) return ans
[docs] def id_column( self, source: SourceType, *, candidates: Iterable[str], task_id: int | None = None, ) -> str | None: """Return the ID column for `source`.""" if not candidates: msg = f"Bad {candidates=} argument; must be a non-empty collection." raise TypeError(msg) return self.map_placeholders(source, [ID], candidates=candidates, task_id=task_id)[ID]
@property def mapper(self) -> Mapper[str, str, SourceType]: """Return the ``Mapper`` instance used for placeholder name mapping.""" return self._mapper @property def cache_access(self) -> CacheAccess[SourceType, IdType]: """Return the :class:`.CacheAccess` for this fetcher.""" cache_access = self._cache_access if cache_access is not _NOOP_CACHE_ACCESS: return cache_access link = "https://id-translation.readthedocs.io/en/stable/documentation/examples/caching/caching.html" msg = f"{self} does not have a `CacheAccess`.\nHint: {link}" raise CacheAccessNotAvailableError(msg) @property def online(self) -> bool: return False # pragma: no cover
[docs] def assert_online(self) -> None: """Raise an error if offline. Raises: ConnectionStatusError: If not online. """ if not self.online: # pragma: no cover raise ConnectionStatusError("disconnected")
@property def allow_fetch_all(self) -> bool: return self._allow_fetch_all @property def logger(self) -> logging.Logger: """Return the ``Logger`` that is used by this instance.""" return self._logger @logger.setter def logger(self, logger: logging.Logger) -> None: self._logger = logger @property def optional(self) -> bool: return self._optional
[docs] def fetch( self, ids_to_fetch: Iterable[IdsToFetch[SourceType, IdType]], placeholders: Iterable[str] = (), required: Iterable[str] = (), task_id: int | None = None, enable_uuid_heuristics: bool = False, ) -> SourcePlaceholderTranslations[SourceType]: start = perf_counter() if task_id is None: task_id = _logging.generate_task_id(start) placeholders = tuple(placeholders) required_placeholders = set(required) ids_to_fetch = tuple(ids_to_fetch) logger = self.logger if logger.isEnabledFor(logging.DEBUG): num_ids = {itf.source: len(itf.ids) for itf in ids_to_fetch} logger.debug( f"Begin fetching {placeholders=} for {sum(num_ids.values())} IDs from {len(num_ids)} sources: {num_ids}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.fetch, "enter"), placeholders=placeholders, required_placeholders=tuple(required_placeholders), num_ids=num_ids, fetch_all=False, ), ) rv = { itf.source: self._fetch_translations( itf.source, placeholders, required_placeholders=required_placeholders, ids=itf.ids, task_id=task_id, enable_uuid_heuristics=enable_uuid_heuristics, ) for itf in ids_to_fetch } if logger.isEnabledFor(logging.INFO): seconds = perf_counter() - start num_ids = {itf.source: len(itf.ids) for itf in ids_to_fetch} placeholders_returned = {source: len(pht.placeholders) for source, pht in rv.items()} pretty = self._format_fetch_result(rv, num_ids) logger.info( f"Finished fetching from {len(rv)} sources in {fmt_sec(seconds)}: {pretty}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.fetch, "exit"), seconds=seconds, sources=[*rv], placeholders_returned=placeholders_returned, num_ids=num_ids, num_ids_returned={source: len(pht.records) for source, pht in rv.items()}, fetch_all=False, ), ) return rv
[docs] def fetch_all( self, placeholders: Iterable[str] = (), *, required: Iterable[str] = (), sources: set[SourceType] | None = None, task_id: int | None = None, enable_uuid_heuristics: bool = False, ) -> SourcePlaceholderTranslations[SourceType]: start = perf_counter() if task_id is None: task_id = _logging.generate_task_id() if not self._allow_fetch_all: raise exceptions.ForbiddenOperationError("FETCH_ALL", reason=f"not allowed by {self}.") placeholders = tuple(placeholders) required_placeholders = set(required) logger = self.logger if logger.isEnabledFor(logging.DEBUG): wanted = self.sources if sources is None else list(sources) logger.debug( f"Begin fetching all IDs for {placeholders=} for {len(wanted)}/{len(self.sources)}: {wanted}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.fetch_all, "enter"), placeholders=placeholders, required_placeholders=tuple(required_placeholders), wanted_sources=wanted, num_ids=None, fetch_all=True, ), ) with self._fetch_all_mapping_context(): rv = self._fetch_all( tuple(placeholders), required_placeholders=set(required), wanted_sources=sources, task_id=task_id, enable_uuid_heuristics=enable_uuid_heuristics, ) if logger.isEnabledFor(logging.INFO): seconds = perf_counter() - start pretty = self._format_fetch_result(rv, None) logger.info( f"Finished fetching all IDs from {len(self.sources) if sources is None else len(sources)}/" f"{len(self.sources)} sources in {fmt_sec(seconds)}: {pretty}.", extra=dict( task_id=task_id, event_key=_logging.get_event_key(self.fetch, "exit"), seconds=seconds, sources=[*rv], placeholders_returned={source: len(pht.placeholders) for source, pht in rv.items()}, num_ids=None, num_ids_returned={source: len(pht.records) for source, pht in rv.items()}, fetch_all=True, ), ) return rv
def _fetch_all( self, placeholders: PlaceholdersTuple, required_placeholders: set[str], wanted_sources: set[SourceType] | None, task_id: int, enable_uuid_heuristics: bool, ) -> SourcePlaceholderTranslations[SourceType]: if wanted_sources is None: wanted_sources = {*self.sources} if self._selective_fetch_all: # There's nothing stopping us from doing this for regular fetching. But we assume that then the user wants # fetching to fail if explicit IDs can't be translated as specified. sources = [ source for source in wanted_sources if required_placeholders.issubset(self._wanted_to_actual(source, required_placeholders, task_id)) ] if discarded := wanted_sources.difference(sources): self.logger.info( f"Ignoring {len(discarded)} sources {discarded} since required " f"placeholders {sorted(required_placeholders)} could not be mapped by {self}.", extra={"task_id": task_id}, ) else: sources = [*wanted_sources] source_translations = {} for source in sources: translations = self._fetch_translations( source, placeholders or (*self.placeholders[source],), required_placeholders=required_placeholders, task_id=task_id, enable_uuid_heuristics=enable_uuid_heuristics, ) source_translations[source] = translations return source_translations @contextmanager def _fetch_all_mapping_context(self): # type: ignore # noqa original_mapper = self._mapper on_unmapped = "ignore" selective_fetch_all = self._selective_fetch_all if not (selective_fetch_all and original_mapper.on_unmapped != on_unmapped): yield return if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug( f"Using Mapper.{on_unmapped=} until the current {self.fetch_all.__qualname__}-operation" f" finishes, since {selective_fetch_all=}." ) self._mapper = self._mapper.copy(on_unmapped=on_unmapped) try: yield finally: self._mapper = original_mapper
[docs] @abstractmethod def fetch_translations(self, instr: FetchInstruction[SourceType, IdType]) -> PlaceholderTranslations[SourceType]: """Retrieve placeholder translations from the source. Args: instr: A single :class:`.FetchInstruction` for IDs to fetch. If IDs is ``None``, the fetcher should retrieve data for as many IDs as possible. Returns: Placeholder translation elements. Raises: UnknownPlaceholderError: If the placeholder is unknown to the fetcher. See Also: 🔑 This is a key event method. See :ref:`key-events` for details. """
def _fetch_translations( self, source: SourceType, placeholders: PlaceholdersTuple, *, required_placeholders: set[str], task_id: int, enable_uuid_heuristics: bool, ids: set[IdType] | None = None, ) -> PlaceholderTranslations[SourceType]: placeholders = self._deduplicate(placeholders) reverse_mappings, instr = self._make_fetch_instruction( source, placeholders, required_placeholders=required_placeholders, ids=ids, task_id=task_id, enable_uuid_heuristics=enable_uuid_heuristics, ) translations: PlaceholderTranslations[SourceType] | None = None cache = self._cache_access logger: logging.Logger | None = None store_cache = False def log_cache(msg: str, event: str) -> None: if logger is None: return pretty = f"{type(cache).__name__}[{source=}]" logger.debug(msg.format(pretty), extra={"source": instr.source, "task_id": task_id, "cache_event": event}) if cache.enabled: logger = self.logger if not logger.isEnabledFor(logging.DEBUG): logger = None translations = cache.load(instr) store_cache = translations is None if logger: value = f"{len(translations.records)} IDs" if translations else None cache_event = "hit" if translations else "miss" log_cache(f"{{}}.load() returned {value}.", cache_event) if translations is None: translations = self._call_user_impl(instr) if reverse_mappings: # The mapping is only in reverse from the Fetchers point-of-view; we're mapping back to "proper" values. 
translations.placeholders = tuple(reverse_mappings.get(p, p) for p in translations.placeholders) translations.id_pos = translations.placeholders.index(ID) translations.placeholder_aliases.update(reverse_mappings) available_placeholders = {*translations.placeholders, *translations.placeholder_aliases} unmapped_required_placeholders = required_placeholders.difference(available_placeholders) if unmapped_required_placeholders: self._verify_placeholders(reverse_mappings or {}, source, unmapped_required_placeholders) if store_cache: log_cache(f"Calling {{}}.store() with {len(translations.records)} IDs.", "store") cache.store(instr, translations) return translations @classmethod def _deduplicate(cls, placeholders: PlaceholdersTuple) -> PlaceholdersTuple: unique = [] # Hashing is slower for few elements for placeholder in placeholders: if placeholder not in unique: unique.append(placeholder) return tuple(unique) def _call_user_impl(self, instr: FetchInstruction[SourceType, IdType]) -> PlaceholderTranslations[SourceType]: start = perf_counter() source = instr.source placeholders = instr.placeholders logger = self.logger if logger.isEnabledFor(logging.DEBUG): logger.debug( f"Begin fetching {'all' if instr.ids is None else len(instr.ids)} IDs from {source=}. Placeholders: {placeholders}.", extra=dict( event_key=_logging.get_event_key(self.fetch_translations, "enter"), source=source, placeholders=instr.placeholders, num_ids=None if instr.ids is None else len(instr.ids), fetch_all=instr.fetch_all, task_id=instr.task_id, ), ) translations = self.fetch_translations(instr) if logger.isEnabledFor(logging.DEBUG): seconds = perf_counter() - start logger.debug( f"Finished fetching {len(translations.records)} IDs from source='{translations.source}'" f" in {fmt_sec(seconds)}. 
Placeholders: {translations.placeholders}.", extra=dict( task_id=instr.task_id, event_key=_logging.get_event_key(self.fetch_translations, "exit"), seconds=seconds, source=source, placeholders=translations.placeholders, num_ids=None if instr.ids is None else len(instr.ids), num_ids_returned=len(translations.records), fetch_all=instr.fetch_all, ), ) return translations def _verify_placeholders(self, reverse_mappings: dict[str, str], source: SourceType, unmapped: set[str]) -> None: hint = "" if unmapped.intersection(reverse_mappings.values()): r = reverse_dict(reverse_mappings) bad_mappings = {b: r[b] for b in unmapped} hint = ( f"\nHint: Mapping {bad_mappings} for required placeholders (keys) were made to placeholders that do not" f" exist. The override configuration {self.mapper._overrides} may be incorrect." ) raise exceptions.UnknownPlaceholderError( f"Required placeholders {unmapped} not recognized. For {source=}, known placeholders are: " f"{sorted(self.placeholders[source])} for {self}.{hint}" ) def _make_fetch_instruction( self, source: SourceType, placeholders: PlaceholdersTuple, required_placeholders: set[str], ids: set[IdType] | None, task_id: int, enable_uuid_heuristics: bool, ) -> tuple[dict[str, str], FetchInstruction[SourceType, IdType]]: required_placeholders.add(ID) if ID not in placeholders: placeholders = (ID, *placeholders) wanted_to_actual = self._wanted_to_actual(source, placeholders, task_id) actual_to_wanted = {actual: wanted for wanted, actual in wanted_to_actual.items() if wanted != actual} if actual_to_wanted: # We'll just map what we can here. If anything is missing it'll be caught later. 
def apply(c: Iterable[str]) -> Iterable[str]: return (wanted_to_actual[p] for p in c if p in wanted_to_actual) placeholders = tuple(apply(placeholders)) required_placeholders = set(apply(required_placeholders)) return ( actual_to_wanted, FetchInstruction( source=source, placeholders=placeholders, required=required_placeholders, ids=None if ids is None else set(ids), task_id=task_id, enable_uuid_heuristics=enable_uuid_heuristics, ), ) def _wanted_to_actual( self, source: SourceType, wanted_placeholders: Iterable[str], task_id: int | None = None ) -> dict[str, str]: wanted_to_actual = self.map_placeholders(source, wanted_placeholders, task_id=task_id) return {wanted: actual for wanted, actual in wanted_to_actual.items() if actual is not None}
[docs] @classmethod def default_mapper_kwargs(cls) -> dict[str, Any]: """Return default :class:`.Mapper` arguments for ``AbstractFetcher`` implementations.""" return dict( score_function=HeuristicScore( cls.default_score_function, # type: ignore heuristics=[("force_lower_case", {})], ), overrides=InheritedKeysDict(), )
[docs] @classmethod def default_score_function(cls, value: str, candidates: Iterable[str], context: str) -> Iterable[float]: """Compute score for candidates.""" return modified_hamming(value, candidates, context)
def __str__(self) -> str: class NoSources: def __repr__(self) -> str: return "<no sources>" sources = self.sources or NoSources() return f"{type(self).__name__}({sources=})" def _cls_name(self) -> str: return tname(self, include_module=True).removeprefix(__package__ + ".") def __deepcopy__(self, memo: dict[int, Any] = {}) -> Self: # noqa: B006 cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): setattr(result, k, deepcopy(v, memo)) return result @classmethod def _configure_loggers(cls, identifiers: tuple[str, ...]) -> tuple[logging.Logger, logging.Logger]: config_file: str | None = None for identifier in identifiers: if identifier.endswith(".toml"): config_file = identifier break adapter = _AbstractFetcherLogAdapter(cls.__module__ + "." + cls.__name__, config_file=config_file) logger = logging.getLogger(__package__) logger.addFilter(adapter) mapper_logger = logger.getChild("map") mapper_logger.addFilter(adapter) return logger, mapper_logger @classmethod def _format_fetch_result( cls, source_translation: dict[SourceType, PlaceholderTranslations[SourceType]], num_ids_requested: dict[SourceType, int] | None, ) -> str: def add_requested(source: SourceType) -> str: if num_ids_requested is None: return "" return f"/{num_ids_requested[source]}" return ", ".join( f"[{source!r} x {pht.placeholders} x {len(pht.records)}{add_requested(source)} IDs]" for source, pht in source_translation.items() )
class _AbstractFetcherLogAdapter(logging.Filter):
    """Logging filter that stamps fetcher metadata onto records.

    Sets ``fetcher_config_file`` and ``fetcher_class`` on every record it sees, and never drops records.
    """

    def __init__(
        self,
        cls: str,
        *,
        config_file: str | None,
    ) -> None:
        super().__init__()
        self.config_file = config_file
        self.cls = cls

    def filter(self, record: logging.LogRecord) -> bool:
        record.fetcher_config_file = self.config_file
        record.fetcher_class = self.cls
        return True


class NoopCacheAccess(CacheAccess[Any, Any]):
    """A :class:`.CacheAccess` that is never enabled; used by fetchers that were created without a cache."""

    @property
    def enabled(self) -> bool:
        return False

    def _raise(self, *_: Any, **__: Any) -> None:
        raise NotImplementedError

    store = _raise
    load = _raise

    def __deepcopy__(self, memo: dict[int, Any]) -> Self:
        # AbstractFetcher compares against the module-level _NOOP_CACHE_ACCESS sentinel by identity. A deep copy
        # must return the same instance; otherwise copied fetchers would appear to have a real cache.
        return self


_NOOP_CACHE_ACCESS = NoopCacheAccess()