"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units.

Hash-aware fetch pattern
------------------------
The ``get_or_fetch_*`` functions share the same basic contract:

1. TTL check -- if the cached row is fresh enough, return it directly.
2. Remote fetch -- if the TTL has expired (or ``force_refresh`` is set),
   fetch from the network.
3. Hash check -- the ``save_*`` cache helpers return a ``(_, changed)``
   tuple reporting whether the incoming payload differs from what was
   stored. Skipping the actual DB write for an unchanged payload is
   assumed to happen inside those helpers (not visible in this module), so
   that analysis_cache entries for the affected finnkode remain valid.
4. Write + invalidate -- when ``changed`` is true, stale analyses are
   handled differently per entity:

   * ``get_or_fetch_ad`` (and ``ensure_eiendom_unit_code``) explicitly call
     ``invalidate_analysis`` for the affected finnkode.
   * ``get_or_fetch_eiendom_unit`` and ``get_or_fetch_similar_units`` only
     log. There is no finnkode -> unit_code reverse map, so they cannot
     invalidate directly. Per the design note in
     ``get_or_fetch_eiendom_unit``, a deps_hash mismatch in
     ``get_analysis()`` (defined elsewhere) is expected to catch this.

   Invalidated analyses are presumably recomputed by the analysis layer
   (``analysis.py``). This module's ``analyze_ad`` does not read or write
   analysis_cache itself.

The intent is that analysis results survive TTL resets as long as the
remote data has not actually changed.

Note: ``get_or_fetch_ad`` records a price-history snapshot on every remote
fetch, regardless of whether the ad's payload changed.
"""

import logging
from typing import Any

from .ad import fetch_ad_details
from .analysis import analyze_search as run_analysis_search
from .cache import (
    get_eiendom_unit as get_cached_eiendom_unit,
    get_feedback_by_verdict,
    get_finn_ad,
    get_latest_analysis,
    get_similar_units as get_cached_similar_units,
    init_db,
    invalidate_analysis,
    save_eiendom_unit,
    save_finn_ad,
    save_price_history,
    save_similar_units,
)
from .config import (
    # NOTE(review): not referenced anywhere in this module -- confirm whether
    # it is still needed here.
    EIENDOM_NO_CACHE_TTL_ESTIMATE_DAYS,
    EIENDOM_NO_CACHE_TTL_SIMILAR_UNITS_DAYS,
    EIENDOM_NO_CACHE_TTL_STRUCTURAL_DAYS,
    FINN_CACHE_PATH,
    FINN_CACHE_TTL_AD_STRUCTURAL_DAYS,
)
from .eiendom_no import (
    build_unit_vector,
    decode_unit_vector,
    get_similar_units,
    get_unit,
    search_unit_from_finn_url,
)
from .feedback import save_feedback as save_feedback_impl
from .models import EiendomUnit, FinnAd, SimilarUnit

logger = logging.getLogger(__name__)


async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd:
    """Get a FinnAd from the cache, or fetch it fresh from FINN.

    The cache is consulted with a TTL of ``FINN_CACHE_TTL_AD_STRUCTURAL_DAYS``
    (converted to hours). A non-None cached ad is returned without any
    network access or DB writes.

    On a cache miss, TTL expiry, or ``force_refresh`` the ad is re-fetched
    via ``fetch_ad_details`` and passed to ``save_finn_ad``:

    * If ``save_finn_ad`` reports the payload as changed, any cached
      analysis for this finnkode is invalidated.
    * Otherwise analysis_cache entries for this finnkode stay valid.

    A price-history snapshot (total price, asking price, ``sale_status=None``)
    is recorded on every remote fetch, even when the payload is unchanged.

    Args:
        finnkode: FINN listing id.
        force_refresh: Skip the cache read and always fetch remotely.

    Returns:
        The FinnAd. Never returns None; errors from ``fetch_ad_details``
        presumably propagate as exceptions (not visible here).
    """
    conn = init_db(FINN_CACHE_PATH)
    # Convert structural TTL from days to hours
    ttl_hours = FINN_CACHE_TTL_AD_STRUCTURAL_DAYS * 24
    # force_refresh bypasses the cache read entirely. get_finn_ad returns
    # None on a miss (and, presumably, for rows older than ttl_hours).
    ad = None if force_refresh else get_finn_ad(conn, finnkode, ttl_hours=ttl_hours)
    if ad is not None:
        return ad
    # Cache miss or force_refresh: fetch from remote.
    ad = await fetch_ad_details(finnkode)
    # ``changed`` presumably signals that the stored payload hash differed.
    _, changed = save_finn_ad(conn, ad)
    # Record price snapshot for history tracking. This runs on every remote
    # fetch, not only when the payload changed.
    save_price_history(
        conn,
        finnkode,
        total_price=ad.total_price,
        asking_price=ad.asking_price,
        sale_status=None,
    )
    if changed:
        logger.debug("finn_ad %s updated -- invalidating analysis cache", finnkode)
        invalidate_analysis(conn, finnkode)
    return ad


async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None:
    """Backfill ``ad.eiendom_unit_code`` by resolving it from the FINN URL.

    ``fetch_ad_details`` never populates ``eiendom_unit_code`` -- only the
    Eiendom.no resolver (``search_unit_from_finn_url``) can map a FINN
    listing to an Eiendom.no unit. Every enrichment path gates on this field,
    so without an explicit resolve step the gate is always falsy and
    enrichment silently no-ops.

    Resolves once, mutates the ad in place, and persists the backfill to the
    cache so subsequent cache hits skip the network round trip. If the save
    reports a change, the analysis cache for the finnkode is invalidated.

    Only the unit code is kept. The resolved EiendomUnit object itself is not
    cached here, so callers that need the full unit fetch it separately.

    Listings that cannot be resolved are NOT cached as negative results:
    every call for such an ad repeats the ``search_unit_from_finn_url``
    lookup.

    IMPORTANT: callers must run this BEFORE serialising the ad with
    ``model_dump()`` -- otherwise the dumped dict carries a stale
    ``eiendom_unit_code: None`` even though enrichment succeeded.

    Args:
        ad: The FINN ad to enrich; mutated in place on success.

    Returns:
        The unit_code, or ``None`` if the listing cannot be resolved
        (e.g. new-build project ads, off-market addresses).
    """
    # Already resolved (e.g. restored from cache): no network call.
    if ad.eiendom_unit_code:
        return ad.eiendom_unit_code
    unit = await search_unit_from_finn_url(ad.url)
    if unit is None or not unit.unit_code:
        logger.info("No Eiendom.no unit resolved for finnkode %s", ad.finnkode)
        return None
    ad.eiendom_unit_code = unit.unit_code
    conn = init_db(FINN_CACHE_PATH)
    # Persist the backfilled unit_code. If the hash changes (new field),
    # invalidate the analysis cache so it is recomputed with the enriched ad.
    _, changed = save_finn_ad(conn, ad)
    if changed:
        invalidate_analysis(conn, ad.finnkode)
    logger.info("Resolved finnkode %s -> unit %s", ad.finnkode, unit.unit_code)
    return ad.eiendom_unit_code


async def get_or_fetch_eiendom_unit(
    unit_code: str, force_refresh: bool = False
) -> EiendomUnit | None:
    """Get an EiendomUnit from the cache, or fetch it fresh from Eiendom.no.

    The cache is consulted with a TTL of ``EIENDOM_NO_CACHE_TTL_STRUCTURAL_DAYS``
    (converted to hours), unless ``force_refresh`` is set.

    On a miss the unit is fetched with ``get_unit`` and saved. A changed
    payload is only logged: analysis caches are not invalidated here (see the
    comment in the body).

    Args:
        unit_code: Eiendom.no unit identifier.
        force_refresh: Skip the cache read and always fetch remotely.

    Returns:
        The unit, or ``None`` if ``get_unit`` returned None. Nothing is
        persisted in that case.
    """
    conn = init_db(FINN_CACHE_PATH)
    # Convert structural TTL from days to hours
    ttl_hours = EIENDOM_NO_CACHE_TTL_STRUCTURAL_DAYS * 24
    unit = None if force_refresh else get_cached_eiendom_unit(conn, unit_code, ttl_hours=ttl_hours)
    if unit is not None:
        return unit
    unit = await get_unit(unit_code)
    if unit is not None:
        _, changed = save_eiendom_unit(conn, unit)
        if changed:
            logger.debug(
                "eiendom_unit %s updated -- analysis caches for linked finnkodes may be stale",
                unit_code,
            )
            # We don't have a direct finnkode -> unit_code reverse map in the
            # DB yet, so we cannot invalidate analysis here. Per the design,
            # a deps_hash mismatch in get_analysis() (defined elsewhere) is
            # expected to catch this.
    return unit


async def get_or_fetch_similar_units(
    unit_code: str, listing_status: str = "RECENTLY_SOLD", force_refresh: bool = False
) -> list[SimilarUnit]:
    """Get similar units (comps) from the cache, or fetch them fresh.

    The unit itself is obtained first via ``get_or_fetch_eiendom_unit``. That
    call inherits ``force_refresh``, so a forced comps refresh also refreshes
    the unit. The unit fetch happens even when the comps cache is warm.

    Cache caveats:

    * An EMPTY cached list is treated as a cache miss, so units with no
      comps are re-fetched from the network on every call.
    * Empty fetch results are not persisted.

    A changed payload is only logged; analysis caches are not invalidated
    here.

    Args:
        unit_code: Eiendom.no unit identifier.
        listing_status: Comps category passed through to the cache and to
            ``get_similar_units`` (default ``"RECENTLY_SOLD"``).
        force_refresh: Skip both cache reads and fetch remotely.

    Returns:
        ``[]`` if the unit cannot be fetched. Otherwise the cached comps, or
        whatever ``get_similar_units`` returned.
    """
    conn = init_db(FINN_CACHE_PATH)
    # Ensure we have the unit to build its vector.
    unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
    if unit is None:
        return []
    if not force_refresh:
        # Convert similar units TTL from days to hours
        ttl_hours = EIENDOM_NO_CACHE_TTL_SIMILAR_UNITS_DAYS * 24
        cached_similar = get_cached_similar_units(
            conn, unit_code, listing_status, ttl_hours=ttl_hours
        )
        # Truthiness check: an empty cached list falls through to a refetch.
        if cached_similar:
            logger.debug("Using cached similar units for %s (status=%s)", unit_code, listing_status)
            return cached_similar
    # Cache miss or force_refresh: fetch from remote.
    vector = build_unit_vector(unit)
    similar = await get_similar_units(vector, listing_status=listing_status)
    # Only non-empty results are persisted.
    if similar:
        _, changed = save_similar_units(conn, unit_code, listing_status, similar)
        if changed:
            logger.debug(
                "similar_units %s/%s updated -- analysis caches may be stale",
                unit_code,
                listing_status,
            )
    return similar


async def get_unit_images(unit_code: str, force_refresh: bool = False) -> dict[str, Any]:
    """Fetch unit images and basic unit facts for visual assessment.

    Args:
        unit_code: Eiendom.no unit identifier.
        force_refresh: Passed through to ``get_or_fetch_eiendom_unit``.

    Returns:
        A dict with ``unit_code``, ``address``, ``unit_images`` (``[]`` when
        the unit has none / a falsy value), ``property_type``, ``rooms`` and
        ``usable_area``.

    Raises:
        ValueError: if the unit cannot be fetched.
    """
    unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
    if unit is None:
        raise ValueError(f"Could not fetch Eiendom.no unit {unit_code}")
    return {
        "unit_code": unit.unit_code,
        "address": unit.address,
        "unit_images": unit.unit_images or [],
        "property_type": unit.property_type,
        "rooms": unit.rooms,
        "usable_area": unit.usable_area,
    }


async def resolve_eiendom_unit_from_finn_url(finn_url: str) -> EiendomUnit | None:
    """Resolve an Eiendom.no unit from a FINN listing URL.

    Thin pass-through to ``search_unit_from_finn_url``. Unlike
    ``ensure_eiendom_unit_code``, nothing is persisted to the cache here.
    """
    return await search_unit_from_finn_url(finn_url)


# ============================================================================
# Orchestration functions -- analyze_search delegates to analysis.py; the
# others compose the fetch helpers above.
# ============================================================================


async def analyze_search(
    search_url: str,
    *,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
    ctx: Any = None,
) -> dict[str, Any]:
    """Analyze a FINN search URL and return a ranked shortlist.

    Delegates entirely to ``analysis.analyze_search``. Note that
    ``include_details`` is forwarded under the name ``fetch_details``.

    Caching of individual ad analyses happens inside the analysis layer (not
    visible here). Per the module-level contract, cached analyses are
    presumably reused when the underlying data has not changed.
    """
    return await run_analysis_search(
        search_url,
        max_pages=max_pages,
        fetch_details=include_details,
        detail_limit=detail_limit,
        include_eiendom_no=include_eiendom_no,
        ctx=ctx,
    )


async def analyze_ad(
    finnkode: str,
    *,
    include_eiendom_no: bool = True,
    include_similar_units: bool = False,
) -> dict[str, Any]:
    """Fetch a single FINN ad and enrich it with Eiendom.no data.

    Despite the name, this function does not compute or read a scored
    analysis. It returns the serialised ad plus optional enrichment.

    Args:
        finnkode: FINN listing id.
        include_eiendom_no: Resolve the Eiendom.no unit and attach it.
        include_similar_units: Also attach comps (default listing status
            ``"RECENTLY_SOLD"``); only applies when the unit was fetched.

    Returns:
        A dict with key ``"ad"`` and, when enrichment succeeds,
        ``"eiendom_unit"`` and optionally ``"similar_units"``.
    """
    ad = await get_or_fetch_ad(finnkode)
    # Resolve BEFORE model_dump() so the serialised ad carries the backfilled
    # eiendom_unit_code instead of a stale None.
    unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None
    result: dict[str, Any] = {
        "ad": ad.model_dump(mode="json"),
    }
    if unit_code:
        unit = await get_or_fetch_eiendom_unit(unit_code)
        if unit:
            result["eiendom_unit"] = unit.model_dump(mode="json")
            if include_similar_units:
                similar = await get_or_fetch_similar_units(unit_code)
                result["similar_units"] = [s.model_dump(mode="json") for s in similar]
    return result


async def analyze_ad_against_comps(
    finnkode: str, listing_status: str = "RECENTLY_SOLD"
) -> dict[str, Any]:
    """Evaluate one listing against comparable units.

    Always attempts Eiendom.no resolution (there is no opt-out flag).
    Enrichment keys are only added when the unit resolves and can be fetched.

    Args:
        finnkode: FINN listing id.
        listing_status: Comps category (default ``"RECENTLY_SOLD"``).

    Returns:
        A dict with ``"ad"`` and, on successful enrichment,
        ``"eiendom_unit"`` and ``"comparable_units"``.
    """
    ad = await get_or_fetch_ad(finnkode)
    # Resolve before model_dump() -- see analyze_ad.
    unit_code = await ensure_eiendom_unit_code(ad)
    result: dict[str, Any] = {
        "ad": ad.model_dump(mode="json"),
    }
    if unit_code:
        unit = await get_or_fetch_eiendom_unit(unit_code)
        if unit:
            result["eiendom_unit"] = unit.model_dump(mode="json")
            comps = await get_or_fetch_similar_units(unit_code, listing_status=listing_status)
            result["comparable_units"] = [c.model_dump(mode="json") for c in comps]
    return result


async def find_similar_to_liked(
    finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE"
) -> dict[str, Any]:
    """Find properties similar to a listing the user has liked.

    Whether the listing is actually marked "liked" is NOT checked yet (see
    the TODO). ``mode`` does not affect the lookup; it is only echoed back in
    the result.

    Args:
        finnkode: FINN listing id of the base listing.
        mode: Free-form label returned as-is under ``"mode"``.
        listing_status: Comps category (default ``"FOR_SALE"``).

    Returns:
        A dict with ``"base_ad"``, ``"similar_listings"`` and ``"mode"``.

    Raises:
        ValueError: if the listing cannot be resolved to an Eiendom.no unit,
            or the unit cannot be fetched.
    """
    ad = await get_or_fetch_ad(finnkode)
    unit_code = await ensure_eiendom_unit_code(ad)
    if not unit_code:
        raise ValueError(
            f"Finnkode {finnkode} could not be resolved to an Eiendom.no unit; "
            "cannot find similar properties"
        )
    # TODO: verify feedback verdict = "liked" exists
    # The unit is fetched here only as an existence check;
    # get_or_fetch_similar_units fetches it again internally.
    unit = await get_or_fetch_eiendom_unit(unit_code)
    if not unit:
        raise ValueError(f"Cannot enrich finnkode {finnkode} with Eiendom.no data")
    similar = await get_or_fetch_similar_units(unit_code, listing_status=listing_status)
    return {
        "base_ad": ad.model_dump(mode="json"),
        "similar_listings": [s.model_dump(mode="json") for s in similar],
        "mode": mode,
    }


async def compare_ads(
    finnkoder: list[str], *, include_eiendom_no: bool = True, include_comps: bool = True
) -> dict[str, Any]:
    """Compare multiple FINN listings side by side.

    Listings are processed sequentially (one await at a time), and the
    output preserves the input order.

    Args:
        finnkoder: FINN listing ids to compare.
        include_eiendom_no: Resolve and attach each listing's Eiendom.no unit.
        include_comps: Also attach ``"RECENTLY_SOLD"`` comps; only applies
            when the unit was fetched.

    Returns:
        ``{"listings": [...]}``. Each entry is the serialised ad, optionally
        extended with ``"eiendom_unit"`` and ``"comps"``.
    """
    ads = []
    for finnkode in finnkoder:
        ad = await get_or_fetch_ad(finnkode)
        # Resolve before model_dump() -- see analyze_ad.
        unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None
        ad_data = ad.model_dump(mode="json")
        if unit_code:
            unit = await get_or_fetch_eiendom_unit(unit_code)
            if unit:
                ad_data["eiendom_unit"] = unit.model_dump(mode="json")
                if include_comps:
                    comps = await get_or_fetch_similar_units(
                        unit_code, listing_status="RECENTLY_SOLD"
                    )
                    ad_data["comps"] = [c.model_dump(mode="json") for c in comps]
        ads.append(ad_data)
    return {"listings": ads}


# ============================================================================
# Helper functions
# ============================================================================


async def build_unit_vector_for_unit_code(unit_code: str) -> dict[str, Any]:
    """Build a unit_vector for a unit_code by fetching and encoding the unit.

    Returns:
        ``{"unit_code": ..., "unit_vector": ...}`` where the vector is
        whatever ``build_unit_vector`` produces.

    Raises:
        ValueError: if the unit cannot be fetched.
    """
    unit = await get_or_fetch_eiendom_unit(unit_code)
    if unit is None:
        raise ValueError(f"Could not fetch Eiendom.no unit {unit_code}")
    vector = build_unit_vector(unit)
    return {"unit_code": unit_code, "unit_vector": vector}


def decode_unit_vector_to_dict(unit_vector: str) -> dict[str, Any]:
    """Decode a unit_vector string to a dict (pass-through to ``decode_unit_vector``)."""
    return decode_unit_vector(unit_vector)


def save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> dict[str, Any]:
    """Store user feedback/verdict for a listing.

    Synchronous pass-through to ``feedback.save_feedback``.
    """
    return save_feedback_impl(finnkode, verdict, notes)


def get_shortlist(
    verdict: str = "liked", limit: int = 10
) -> dict[str, Any]:
    """Fetch the shortlist of listings the user has given *verdict*.

    Reads from the ``user_feedback`` table and enriches each finnkode with
    its most recent cached analysis (score, price, categories) when
    available. Entries with no cached analysis still appear, carrying the
    stored verdict and notes so nothing the user flagged is silently dropped.

    ``limit`` is applied by ``get_feedback_by_verdict`` BEFORE score sorting.
    The result is therefore the sorted version of whichever *limit* rows that
    query returns, not necessarily the top-*limit* by score.

    Args:
        verdict: Feedback verdict to filter on (default ``"liked"``).
        limit: Maximum number of feedback rows to read.

    Returns:
        ``{"shortlist": [...], "verdict": verdict, "limit": limit}``. Entries
        are sorted by score descending; entries without a score come last.
    """
    conn = init_db(FINN_CACHE_PATH)
    feedback_rows = get_feedback_by_verdict(conn, verdict, limit=limit)
    shortlist: list[dict[str, Any]] = []
    for fb in feedback_rows:
        finnkode = fb["finnkode"]
        entry: dict[str, Any] = {
            "finnkode": finnkode,
            "verdict": fb["verdict"],
            "notes": fb["notes"],
            "url": f"https://www.finn.no/realestate/homes/ad.html?finnkode={finnkode}",
        }
        analysis = get_latest_analysis(conn, finnkode)
        if analysis:
            # Assumes the cached analysis stores "score" as a dict with a
            # "total" key and "eiendom_unit" as a dict; the shape is defined
            # by the analysis layer -- confirm there.
            score = analysis.get("score") or {}
            eiendom = analysis.get("eiendom_unit") or {}
            entry.update(
                {
                    "title": analysis.get("title"),
                    "address": analysis.get("address"),
                    "area_m2": analysis.get("area_m2"),
                    "total_price": analysis.get("total_price"),
                    "asking_price": analysis.get("asking_price"),
                    "score": score.get("total"),
                    "categories": analysis.get("categories", []),
                    "market_placement": eiendom.get("market_placement_score"),
                }
            )
        shortlist.append(entry)
    # Highest score first; un-enriched entries (score None) sink to the bottom.
    # The leading boolean ranks any scored entry (even score 0) above unscored ones.
    shortlist.sort(key=lambda e: (e.get("score") is not None, e.get("score") or 0), reverse=True)
    return {"shortlist": shortlist, "verdict": verdict, "limit": limit}


def get_new_ads_since_last_run(search_url: str) -> dict[str, Any]:
    """Detect new/removed/changed listings vs the previous run.

    Not implemented yet: always returns empty ``new_ads``, ``removed_ads``
    and ``changed_ads`` lists alongside the given ``search_url``.
    """
    # TODO: implement via search_runs table in cache.py
    return {"new_ads": [], "removed_ads": [], "changed_ads": [], "search_url": search_url}