"""Orchestration for FINN search + Eiendom.no enrichment + scoring.

Analysis caching
----------------
``analyze_ad`` caches its result under a ``deps_hash``. The hash is derived,
via ``cache.combine_hashes``, from the stored raw content hashes of three
inputs: the ad, the Eiendom.no unit, and the comparable sales used to
produce the result. Each call:

1. Persists the ad and makes sure the Eiendom.no unit and comparable sales
   are in the DB (served from cache when present, fetched otherwise).
2. Reads the three raw content hashes from the DB (no deserialisation) and
   combines them into the deps_hash.
3. Checks analysis_cache for a matching (finnkode, deps_hash) row and, if
   found, returns it with ``cache_age`` refreshed.
4. Otherwise runs the full scoring pipeline and writes to analysis_cache.

A cached result stops matching as soon as any of the three underlying
payloads changes, because the deps_hash will differ. The hash covers input
data only: changing the scoring code itself does NOT invalidate analyses
that are already cached.
"""

import asyncio
import json
import logging
from datetime import UTC, datetime
from typing import Any

from . import ad as ad_module
from . import cache, eiendom_no, scoring, search
from .cache import (
    combine_hashes,
    get_analysis,
    get_eiendom_unit_hash,
    get_finn_ad_hash,
    get_price_history,
    get_similar_units_hash,
    invalidate_analysis,
    save_analysis,
    save_eiendom_unit,
    save_finn_ad,
    save_search_run,
    save_similar_units,
)
from .config import (
    EIENDOM_NO_CACHE_TTL_SIMILAR_UNITS_DAYS,
    EIENDOM_NO_CACHE_TTL_STRUCTURAL_DAYS,
    FINN_CACHE_PATH,
    FINN_CACHE_TTL_AD_STRUCTURAL_DAYS,
    FINN_DETAIL_LIMIT,
    FINN_MAX_SEARCH_PAGES,
)
from .models import EiendomUnit, FinnAd, SimilarUnit

logger = logging.getLogger(__name__)

# Max parallel ad + eiendom.no fetches in analyze_search phase 1.
# High enough to be fast; low enough to avoid FINN rate-limiting.
FETCH_CONCURRENCY = 5

# Listing status under which comparable sales are read, written, and hashed.
# All three places must agree, or the deps_hash will not see cached comps.
_SIMILAR_UNITS_STATUS = "RECENTLY_SOLD"

# Common costs above this value are flagged as a risk. Units are whatever
# FinnAd.common_costs stores (presumably NOK/month -- TODO confirm).
_HIGH_COMMON_COSTS = 5000

# FinnAd attributes copied verbatim into the analysis result, in output order.
_AD_RESULT_FIELDS = (
    "finnkode",
    "url",
    "title",
    "address",
    "listing_description",
    "district",
    "property_type",
    "ownership_type",
    "floor",
    "area_m2",
    "bedrooms",
    "rooms",
    "total_price",
    "asking_price",
    "shared_debt",
    "common_costs",
    "construction_year",
    "has_balcony",
    "has_terrace",
    "has_elevator",
    "has_parking",
    "has_garage",
    "eiendom_unit_code",
)


def _normalize_description(text: str | None) -> str:
    """Return *text* lower-cased for keyword matching, or "" when missing."""
    return text.lower() if text else ""


def _is_resale_listing(url: str) -> bool:
    """True for ordinary resale ads.

    Project / new-build ads use different URL paths that fetch_ad_details
    cannot resolve (it builds a /homes/ URL).
    """
    return "/realestate/homes/" in url


def _build_ad_summary(
    ad: FinnAd,
    enriched: EiendomUnit | None,
    similar_units: list[SimilarUnit],
    scores: dict,
    categories: list[str],
) -> dict:
    """Build the human-readable summary section of an analysis result.

    Args:
        ad: The FINN listing being analysed.
        enriched: Matched Eiendom.no unit, or None when enrichment failed.
        similar_units: Comparable sales; may be empty.
        scores: Output of ``scoring.score_ad``. Only ``"risk"`` is read here.
        categories: Output of ``scoring.classify_ad``. Currently unused;
            kept so the signature stays stable.

    Returns:
        Dict with ``why_interesting``, ``risks``, ``next_steps`` (lists of
        strings) and ``shortlist_reason`` (a string).
    """
    description = _normalize_description(ad.listing_description)
    reasons: list[str] = []
    risks: list[str] = []
    next_steps = [
        "Open the FINN listing and condition report.",
        "Review the Eiendom.no estimate and comparable sales.",
        "Ask the broker about renovation status and approvals.",
    ]

    estimate = enriched.estimated_selling_price if enriched else None
    if estimate and ad.total_price:
        # The upper bound may be absent even when a point estimate exists.
        # Comparing against None would raise TypeError, so fall back to the
        # point estimate as the top of the range.
        upper = enriched.estimated_selling_price_upper
        if upper is None:
            upper = estimate
        if ad.total_price < estimate:
            reasons.append("Listing price is below Eiendom.no estimate.")
        elif ad.total_price <= upper:
            reasons.append("Price sits within the local estimate range.")
        else:
            reasons.append("Listing price is above the estimate range.")
    else:
        reasons.append("Eiendom.no enrichment is unavailable or incomplete.")

    if "utsikt" in description or ad.has_balcony or ad.has_terrace:
        reasons.append("Outdoor space or view potential is positive.")
    if "hybel" in description or "leie" in description:
        reasons.append("Potential hybel/rental opportunity is mentioned.")
    if any(word in description for word in ("potensial", "renover", "renovation")):
        reasons.append("Renovation or improvement potential is highlighted.")

    if scores.get("risk", 0.0) < 0:
        risks.append("Risk flags are detected in description or metadata.")
    if ad.common_costs and ad.common_costs > _HIGH_COMMON_COSTS:
        risks.append("Common costs are relatively high and should be reviewed.")
    if not enriched:
        risks.append("Missing Eiendom.no data increases uncertainty.")

    if similar_units:
        next_steps.append("Review the comparable units and average sqm prices.")
    else:
        next_steps.append("Comparable sales are unavailable; treat valuation with caution.")

    return {
        "why_interesting": reasons,
        "risks": risks,
        "next_steps": next_steps,
        # The price branch above always adds a reason; the fallback is defensive.
        "shortlist_reason": ", ".join(reasons[:3]) if reasons else "Review details and seller disclosures.",
    }


def _compute_deps_hash(
    conn,
    finnkode: str,
    unit_code: str | None,
    listing_status: str = _SIMILAR_UNITS_STATUS,
) -> str:
    """Derive a deps_hash from the three stored raw content hashes.

    Reads only the hash columns -- no payload deserialisation. Without a
    ``unit_code``, the unit and comps hashes are passed to
    ``combine_hashes`` as None.
    """
    ad_hash = get_finn_ad_hash(conn, finnkode)
    if unit_code:
        unit_hash = get_eiendom_unit_hash(conn, unit_code)
        comps_hash = get_similar_units_hash(conn, unit_code, listing_status)
    else:
        unit_hash = comps_hash = None
    return combine_hashes(ad_hash, unit_hash, comps_hash)


def _parse_utc_timestamp(value: str | None) -> datetime | None:
    """Parse a stored ISO-8601 timestamp as an aware datetime.

    Returns None for empty values. Naive timestamps are treated as UTC, so
    they can be subtracted from ``datetime.now(UTC)`` without raising
    TypeError. NOTE(review): confirm that the cache layer writes UTC.
    """
    if not value:
        return None
    parsed = datetime.fromisoformat(value)
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)


def _get_cache_age(conn, finnkode: str) -> dict | None:
    """Return how long ago the stored FINN ad was fetched and last verified.

    ``structural_minutes`` is measured from ``finn_ads.fetched_at``, and
    ``price_minutes`` from ``finn_ads.last_verified_at``. Both are rounded
    to one decimal. Returns None when the row or either timestamp is missing.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT fetched_at, last_verified_at FROM finn_ads WHERE finnkode = ?",
            (finnkode,),
        )
        row = cursor.fetchone()
    finally:
        cursor.close()
    if row is None:
        return None

    # Column access by name assumes init_db configured a Row-style row factory.
    fetched_at = _parse_utc_timestamp(row["fetched_at"])
    last_verified_at = _parse_utc_timestamp(row["last_verified_at"])
    if fetched_at is None or last_verified_at is None:
        return None

    now = datetime.now(UTC)
    return {
        "structural_minutes": round((now - fetched_at).total_seconds() / 60, 1),
        "price_minutes": round((now - last_verified_at).total_seconds() / 60, 1),
    }


async def analyze_ad(
    finn_ad: FinnAd,
    unit_code: str | None = None,
) -> dict:
    """Enrich a FinnAd and compute its score summary.

    The result is cached in analysis_cache, keyed by deps_hash. It is only
    recomputed when the underlying raw data has actually changed. On a
    cache hit, ``cache_age`` is recomputed, because it describes the
    present, not the moment the analysis was first produced.

    Args:
        finn_ad: The listing to analyse. Its ``eiendom_unit_code`` is
            backfilled from ``unit_code`` when missing (mutates the object).
        unit_code: Eiendom.no unit code. When None, no Eiendom.no data or
            comparable sales are used.

    Returns:
        JSON-serialisable dict with listing fields, ``score``,
        ``categories``, ``summary``, ``price_history``, ``cache_age``,
        ``eiendom_unit`` and ``similar_units``.
    """
    # NOTE(review): a connection is obtained per call and not closed here;
    # whether that leaks depends on cache.init_db -- confirm.
    conn = cache.init_db(FINN_CACHE_PATH)

    # 0. Backfill eiendom_unit_code if provided.
    if unit_code and not finn_ad.eiendom_unit_code:
        finn_ad.eiendom_unit_code = unit_code

    # 1. Ensure the ad is in the DB so we have a stable hash to key on.
    _, ad_changed = save_finn_ad(conn, finn_ad)

    # 2. Fetch / refresh Eiendom.no data (cache-aware).
    enriched: EiendomUnit | None = None
    unit_hash_changed = False
    if unit_code:
        enriched = cache.get_eiendom_unit(conn, unit_code)
        if enriched is None:
            enriched = await eiendom_no.enrich_ad_with_eiendom_no(finn_ad, unit_code)
            if enriched is not None:
                _, unit_hash_changed = save_eiendom_unit(conn, enriched)
        # If already cached, unit_hash_changed stays False -- no new write.

    # 3. Fetch / refresh similar units (cache-aware).
    similar_units: list[SimilarUnit] = []
    comps_hash_changed = False
    if enriched:
        ttl_hours = EIENDOM_NO_CACHE_TTL_SIMILAR_UNITS_DAYS * 24
        similar_units = cache.get_similar_units(
            conn, enriched.unit_code, _SIMILAR_UNITS_STATUS, ttl_hours=ttl_hours
        )
        if not similar_units:
            vector = enriched.unit_vector or eiendom_no.build_unit_vector(enriched)
            if vector:
                similar_units = await eiendom_no.get_similar_units(vector)
                if similar_units:
                    _, comps_hash_changed = save_similar_units(
                        conn, enriched.unit_code, _SIMILAR_UNITS_STATUS, similar_units
                    )

    # 4. Derive deps_hash and check analysis_cache.
    deps_hash = _compute_deps_hash(conn, finn_ad.finnkode, unit_code)
    cached_analysis = get_analysis(conn, finn_ad.finnkode, deps_hash)
    if cached_analysis is not None:
        logger.debug("analysis_cache hit for %s -- skipping recompute", finn_ad.finnkode)
        # The stored cache_age was computed when the analysis ran; refresh it.
        cached_analysis["cache_age"] = _get_cache_age(conn, finn_ad.finnkode)
        return cached_analysis

    # 5. Cache miss: compute, store, return.
    logger.debug(
        "analysis_cache miss for %s (ad_changed=%s, unit_changed=%s, comps_changed=%s)",
        finn_ad.finnkode,
        ad_changed,
        unit_hash_changed,
        comps_hash_changed,
    )
    scores = scoring.score_ad(finn_ad, enriched, similar_units)
    categories = scoring.classify_ad(scores)
    summary = _build_ad_summary(finn_ad, enriched, similar_units, scores, categories)

    result = {field: getattr(finn_ad, field) for field in _AD_RESULT_FIELDS}
    result.update(
        {
            "score": scores,
            "categories": categories,
            "summary": summary,
            "price_history": get_price_history(conn, finn_ad.finnkode, limit=20),
            "cache_age": _get_cache_age(conn, finn_ad.finnkode),
            "eiendom_unit": enriched.model_dump(mode="json") if enriched else None,
            "similar_units": [unit.model_dump(mode="json") for unit in similar_units],
        }
    )

    # Round-trip through JSON to guarantee all values are serialisable.
    # default=str stringifies anything json cannot encode natively (e.g. a
    # datetime that survives model_dump or comes from scoring).
    result = json.loads(json.dumps(result, default=str))

    save_analysis(conn, finn_ad.finnkode, deps_hash, result)
    return result


async def _fetch_card_to_db(
    card,
    conn,
    *,
    include_eiendom_no: bool,
    client,
) -> tuple[FinnAd | None, str | None]:
    """Phase 1 worker: fetch ad details, resolve the Eiendom.no unit code, persist.

    The ad is served from the DB cache when it is fresh enough. Otherwise
    it is fetched and saved. A resolved unit code is backfilled onto the
    ad, which is then saved again.

    Returns:
        ``(finn_ad, unit_code)``. ``finn_ad`` is None when the ad could not
        be fetched. ``unit_code`` is None when lookup was disabled, found
        nothing, or failed. The caller treats None as a skip rather than
        aborting the batch.
    """
    try:
        finn_ad = cache.get_finn_ad(
            conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_STRUCTURAL_DAYS * 24
        )
        if finn_ad is None:
            finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client)
            save_finn_ad(conn, finn_ad)
    except Exception as exc:
        # Best-effort per ad: one failed fetch must not abort the whole search.
        logger.warning("Failed to fetch ad %s: %s", card.finnkode, exc)
        return None, None

    unit_code = None
    if include_eiendom_no:
        try:
            matched_unit = await eiendom_no.search_unit_from_finn_url(card.url)
            unit_code = matched_unit.unit_code if matched_unit else None
            # Backfill unit_code into the ad object and persist, so the
            # cached ad has eiendom_unit_code populated.
            if unit_code and not finn_ad.eiendom_unit_code:
                finn_ad.eiendom_unit_code = unit_code
                save_finn_ad(conn, finn_ad)
        except Exception as exc:
            logger.warning("Eiendom.no unit search failed for %s: %s", card.finnkode, exc)

    return finn_ad, unit_code


async def analyze_search(
    search_url: str,
    max_pages: int = FINN_MAX_SEARCH_PAGES,
    fetch_details: bool = True,
    detail_limit: int = FINN_DETAIL_LIMIT,
    include_eiendom_no: bool = True,
    client=None,
    use_cache: bool = True,
    ctx: Any = None,
) -> dict:
    """Analyze a FINN search URL and enrich matching listings.

    Two-phase execution
    -------------------
    Phase 1 (parallel, I/O bound):
        The first ``detail_limit`` cards that are resale ads are fetched
        concurrently, behind a semaphore of size ``FETCH_CONCURRENCY``. Each
        worker fetches the ad detail page and resolves the Eiendom.no unit
        code, then writes the ad to SQLite. Progress is reported via ``ctx``
        when provided. Skipped entirely when ``fetch_details`` is False.

    Phase 2 (sequential):
        Each fetched ad is passed to ``analyze_ad``. That reads from SQLite
        where possible, but may still call Eiendom.no for unit details and
        comparable sales that are not cached yet. Results are sorted by
        total score, descending.

    Individual ad analyses are cached via ``analyze_ad``. Re-running a
    search only re-scores ads whose underlying data has changed.

    Returns:
        Dict with ``search_url``, ``search_cards`` (all cards found),
        ``analysis`` (per-ad results) and ``summary`` (counts).
    """
    conn = cache.init_db(FINN_CACHE_PATH)
    cards = await search.fetch_search_pages(
        search_url,
        max_pages=max_pages,
        client=client,
        use_cache=use_cache,
    )
    candidate_cards = cards[:detail_limit]
    resale_cards = [c for c in candidate_cards if _is_resale_listing(c.url)]
    skipped_count = len(candidate_cards) - len(resale_cards)
    # Decide before reporting, so the progress message matches what is fetched.
    if not fetch_details:
        resale_cards = []

    if ctx is not None:
        await ctx.info(f"Found {len(cards)} listings, {len(resale_cards)} resale ads to fetch.")

    # ------------------------------------------------------------------
    # Phase 1: parallel fetch to DB
    # ------------------------------------------------------------------
    fetched: dict[str, tuple] = {}  # finnkode -> (FinnAd | None, unit_code | None)
    fetch_counter = 0
    sem = asyncio.Semaphore(FETCH_CONCURRENCY)

    async def _fetch_worker(card) -> None:
        nonlocal fetch_counter
        async with sem:
            finn_ad, unit_code = await _fetch_card_to_db(
                card, conn, include_eiendom_no=include_eiendom_no, client=client
            )
            fetched[card.finnkode] = (finn_ad, unit_code)
            fetch_counter += 1
            if ctx is not None:
                await ctx.report_progress(fetch_counter, len(resale_cards))
                status = "enriched" if unit_code else "no eiendom match"
                await ctx.info(
                    f"[{fetch_counter}/{len(resale_cards)}] {card.finnkode} fetched ({status})"
                )

    await asyncio.gather(*(_fetch_worker(card) for card in resale_cards))

    # ------------------------------------------------------------------
    # Phase 2: score (sequential)
    # ------------------------------------------------------------------
    if ctx is not None:
        await ctx.info(f"All data fetched. Scoring {len(resale_cards)} ads...")

    results = []
    enriched_count = 0
    for card in resale_cards:
        finn_ad, unit_code = fetched.get(card.finnkode, (None, None))
        if finn_ad is None:
            skipped_count += 1
            continue
        try:
            result = await analyze_ad(finn_ad, unit_code=unit_code)
        except Exception as exc:
            # Best-effort per ad: one bad listing must not sink the search.
            logger.warning("Skipping card %s during scoring: %s", card.finnkode, exc)
            skipped_count += 1
            continue
        if result.get("eiendom_unit"):
            enriched_count += 1
        results.append(result)

    results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True)

    if ctx is not None:
        await ctx.info(
            f"Done. {len(results)} analyzed, {enriched_count} enriched, {skipped_count} skipped."
        )

    # Record this search run (every card found, not just analysed ones).
    save_search_run(conn, search_url, [card.finnkode for card in cards])

    return {
        "search_url": search_url,
        "search_cards": [card.model_dump(mode="json") for card in cards],
        "analysis": results,
        "summary": {
            "total_listings": len(cards),
            "analyzed_listings": len(results),
            "skipped_listings": skipped_count,
            "eiendom_enriched": enriched_count,
        },
    }