diff --git a/finn_eiendom/analysis.py b/finn_eiendom/analysis.py index 2e816f0..408f187 100644 --- a/finn_eiendom/analysis.py +++ b/finn_eiendom/analysis.py @@ -1,9 +1,37 @@ -"""Orchestration for FINN search + Eiendom.no enrichment + scoring.""" +"""Orchestration for FINN search + Eiendom.no enrichment + scoring. + +Analysis caching +---------------- +``analyze_ad`` caches its result under a ``deps_hash`` that is the +SHA-256 of the combined raw payloads of the ad, the eiendom unit, and the +comparable sales used to produce it. On a subsequent call the function: + + 1. Reads the three raw content hashes from the DB (no deserialisation). + 2. Derives the same deps_hash from those hashes. + 3. Checks analysis_cache for a matching (finnkode, deps_hash) row. + 4. Returns the cached result immediately if found. + 5. Otherwise runs the full scoring pipeline and writes to analysis_cache. + +The cached result is invalidated automatically the moment any piece of +underlying data changes, because the deps_hash will differ. +""" import logging from . import ad as ad_module from . import cache, eiendom_no, scoring, search +from .cache import ( + combine_hashes, + get_analysis, + get_eiendom_unit_hash, + get_finn_ad_hash, + get_similar_units_hash, + invalidate_analysis, + save_analysis, + save_eiendom_unit, + save_finn_ad, + save_similar_units, +) from .config import ( EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH, @@ -86,38 +114,93 @@ def _build_ad_summary( } +def _compute_deps_hash( + conn, + finnkode: str, + unit_code: str | None, + listing_status: str = "RECENTLY_SOLD", +) -> str: + """Derive a deps_hash from the three stored raw content hashes. + + Reads only the hash column -- no payload deserialisation. + """ + ad_hash = get_finn_ad_hash(conn, finnkode) + unit_hash = get_eiendom_unit_hash(conn, unit_code) if unit_code else None + comps_hash = ( + get_similar_units_hash(conn, unit_code, listing_status) if unit_code else None + ) + return combine_hashes(ad_hash, unit_hash, comps_hash) + + async def analyze_ad( finn_ad: FinnAd, unit_code: str | None = None, ) -> dict: - """Enrich a FinnAd and compute score summary.""" + """Enrich a FinnAd and compute score summary. + + Result is cached in analysis_cache keyed by deps_hash. Recomputation + happens only when the underlying raw data has actually changed. + """ conn = cache.init_db(FINN_CACHE_PATH) + + # ------------------------------------------------------------------ + # 1. Ensure the ad is in the DB so we have a stable hash to key on. + # ------------------------------------------------------------------ + ad_hash, ad_changed = save_finn_ad(conn, finn_ad) + + # ------------------------------------------------------------------ + # 2. Fetch / refresh Eiendom.no data (cache-aware). + # ------------------------------------------------------------------ enriched: EiendomUnit | None = None - similar_units: list[SimilarUnit] = [] + unit_hash_changed = False if unit_code: enriched = cache.get_eiendom_unit(conn, unit_code) if enriched is None: enriched = await eiendom_no.enrich_ad_with_eiendom_no(finn_ad, unit_code) if enriched is not None: - cache.save_eiendom_unit(conn, enriched) + _, unit_hash_changed = save_eiendom_unit(conn, enriched) + # If already cached, unit_hash_changed stays False -- no new write. + + # ------------------------------------------------------------------ + # 3. Fetch / refresh similar units (cache-aware). + # ------------------------------------------------------------------ + similar_units: list[SimilarUnit] = [] + comps_hash_changed = False if enriched: - # Check cache for similar units first. The cache uses (unit_code, - # listing_status) as the key, so we must look it up by unit_code. similar_units = cache.get_similar_units( conn, enriched.unit_code, "RECENTLY_SOLD", ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS ) - if not similar_units: - # Cache miss: build the vector and fetch fresh from Eiendom.no - # (unit_vector field from get_unit is None; build locally) vector = enriched.unit_vector or eiendom_no.build_unit_vector(enriched) if vector: similar_units = await eiendom_no.get_similar_units(vector) - # Save to cache if similar_units: - cache.save_similar_units(conn, enriched.unit_code, "RECENTLY_SOLD", similar_units) + _, comps_hash_changed = save_similar_units( + conn, enriched.unit_code, "RECENTLY_SOLD", similar_units + ) + + # ------------------------------------------------------------------ + # 4. Derive deps_hash and check analysis_cache. + # ------------------------------------------------------------------ + deps_hash = _compute_deps_hash(conn, finn_ad.finnkode, unit_code) + + cached_analysis = get_analysis(conn, finn_ad.finnkode, deps_hash) + if cached_analysis is not None: + logger.debug("analysis_cache hit for %s -- skipping recompute", finn_ad.finnkode) + return cached_analysis + + # ------------------------------------------------------------------ + # 5. Cache miss: compute, store, return. + # ------------------------------------------------------------------ + logger.debug( + "analysis_cache miss for %s (ad_changed=%s, unit_changed=%s, comps_changed=%s)", + finn_ad.finnkode, + ad_changed, + unit_hash_changed, + comps_hash_changed, + ) scores = scoring.score_ad(finn_ad, enriched, similar_units) categories = scoring.classify_ad(scores) @@ -130,10 +213,16 @@ async def analyze_ad( "score": scores, "categories": categories, "summary": summary, - "eiendom_unit": enriched.model_dump() if enriched else None, - "similar_units": [unit.model_dump() for unit in similar_units], + "eiendom_unit": enriched.model_dump(mode="json") if enriched else None, + "similar_units": [unit.model_dump(mode="json") for unit in similar_units], } - cache.save_finn_ad(conn, finn_ad) + + # Round-trip through JSON to guarantee all values are serialisable + # (catches any datetime that survives model_dump, e.g. from scoring). + import json as _json + result = _json.loads(_json.dumps(result, default=str)) + + save_analysis(conn, finn_ad.finnkode, deps_hash, result) return result @@ -166,7 +255,13 @@ async def analyze_search( client=None, use_cache: bool = True, ) -> dict: - """Analyze a FINN search URL and enrich matching listings.""" + """Analyze a FINN search URL and enrich matching listings. + + Search-level results are NOT cached as a whole (the search page itself + is cached at the HTML level). Individual ad analyses ARE cached via + ``analyze_ad``, so re-running a search only re-scores ads whose + underlying data has changed. + """ conn = cache.init_db(FINN_CACHE_PATH) cards = await search.fetch_search_pages( search_url, @@ -177,6 +272,7 @@ async def analyze_search( results = [] enriched_count = 0 skipped_count = 0 + cache_hits = 0 if fetch_details: for card in cards[:detail_limit]: @@ -200,12 +296,14 @@ async def analyze_search( if result.get("eiendom_unit"): enriched_count += 1 + # Track analysis cache hits via the absence of recompute logging + # (the flag is not propagated up here; rely on debug logs). results.append(result) results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True) return { "search_url": search_url, - "search_cards": [card.model_dump() for card in cards], + "search_cards": [card.model_dump(mode="json") for card in cards], "analysis": results, "summary": { "total_listings": len(cards), @@ -213,4 +311,4 @@ async def analyze_search( "skipped_listings": skipped_count, "eiendom_enriched": enriched_count, }, - } + } \ No newline at end of file diff --git a/finn_eiendom/cache.py b/finn_eiendom/cache.py index 8bf78ba..b46b9c3 100644 --- a/finn_eiendom/cache.py +++ b/finn_eiendom/cache.py @@ -1,5 +1,29 @@ -"""SQLite cache and persistence for FINN and Eiendom.no data.""" +"""SQLite cache and persistence for FINN and Eiendom.no data. +Caching strategy +---------------- +Raw data (finn_ads, eiendom_units, similar_units) + Stored with a SHA-256 content_hash of the serialised payload. + On write: compare incoming hash to stored hash. If equal the remote + data has not changed -- the row is left untouched and the caller gets + back ``changed=False``, which preserves a valid analysis_cache entry. + +Analysis results (analysis_cache) + Keyed by ``(finnkode, deps_hash)`` where deps_hash = SHA-256 of the + combined raw payloads of the ad, eiendom unit, and comps that were used + to produce the result. A cache hit is only valid when the deps_hash + still matches, i.e. none of the underlying data has changed. + This means analysis is re-run *only* when remote data actually changes, + not on every TTL tick. + +Search pages / cards (cache_meta) + Still TTL-based -- these change frequently and a content-hash over a + full HTML page is cheap but the semantics of "changed" are less clear + (ads added/removed vs. cosmetic HTML tweaks). Hash is stored anyway so + callers can detect real list changes if desired. +""" + +import hashlib import json import logging import sqlite3 @@ -12,6 +36,32 @@ from .models import EiendomUnit, FinnAd, FinnSearchCard, SimilarUnit logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Hashing helpers +# --------------------------------------------------------------------------- + + +def compute_content_hash(payload: Any) -> str: + """Return a stable SHA-256 hex digest of *payload*. + + *payload* can be a dict, list, or any JSON-serialisable value. + Keys are sorted so that insertion order does not affect the hash. + """ + serialised = json.dumps(payload, sort_keys=True, default=str) + return hashlib.sha256(serialised.encode()).hexdigest() + + +def combine_hashes(*hashes: str | None) -> str: + """Combine multiple content hashes into one deterministic deps_hash.""" + combined = "|".join(h or "" for h in hashes) + return hashlib.sha256(combined.encode()).hexdigest() + + +# --------------------------------------------------------------------------- +# Connection / schema +# --------------------------------------------------------------------------- + + def get_connection(path: str | None = None) -> sqlite3.Connection: db_path = path or FINN_CACHE_PATH conn = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES) @@ -22,62 +72,100 @@ def get_connection(path: str | None = None) -> sqlite3.Connection: def init_db(path: str | None = None) -> sqlite3.Connection: conn = get_connection(path) cursor = conn.cursor() + cursor.execute( """ CREATE TABLE IF NOT EXISTS finn_ads ( - finnkode TEXT PRIMARY KEY, - url TEXT, - payload TEXT NOT NULL, - fetched_at TEXT NOT NULL + finnkode TEXT PRIMARY KEY, + url TEXT, + payload TEXT NOT NULL, + content_hash TEXT, + fetched_at TEXT NOT NULL ) """ ) + # Migration: add content_hash column if the table already existed without it. + _add_column_if_missing(cursor, "finn_ads", "content_hash", "TEXT") + cursor.execute( """ CREATE TABLE IF NOT EXISTS eiendom_units ( - unit_code TEXT PRIMARY KEY, - payload TEXT NOT NULL, - fetched_at TEXT NOT NULL + unit_code TEXT PRIMARY KEY, + payload TEXT NOT NULL, + content_hash TEXT, + fetched_at TEXT NOT NULL ) """ ) + _add_column_if_missing(cursor, "eiendom_units", "content_hash", "TEXT") + cursor.execute( """ CREATE TABLE IF NOT EXISTS similar_units ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - unit_code TEXT NOT NULL, + id INTEGER PRIMARY KEY AUTOINCREMENT, + unit_code TEXT NOT NULL, listing_status TEXT NOT NULL, - payload TEXT NOT NULL, - fetched_at TEXT NOT NULL + payload TEXT NOT NULL, + content_hash TEXT, + fetched_at TEXT NOT NULL ) """ ) + _add_column_if_missing(cursor, "similar_units", "content_hash", "TEXT") + cursor.execute( """ CREATE TABLE IF NOT EXISTS cache_meta ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + content_hash TEXT, expires_at TEXT ) """ ) + _add_column_if_missing(cursor, "cache_meta", "content_hash", "TEXT") + + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS analysis_cache ( + finnkode TEXT PRIMARY KEY, + deps_hash TEXT NOT NULL, + payload TEXT NOT NULL, + computed_at TEXT NOT NULL + ) + """ + ) + conn.commit() return conn +def _add_column_if_missing( + cursor: sqlite3.Cursor, table: str, column: str, col_type: str +) -> None: + """ALTER TABLE … ADD COLUMN is idempotent via this guard.""" + cursor.execute(f"PRAGMA table_info({table})") + existing = {row["name"] for row in cursor.fetchall()} + if column not in existing: + cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}") + + +# --------------------------------------------------------------------------- +# Generic cache_meta helpers (search pages, search cards) +# --------------------------------------------------------------------------- + + def cache_get(conn: sqlite3.Connection, key: str) -> dict[str, Any] | None: cursor = conn.cursor() cursor.execute("SELECT value, expires_at FROM cache_meta WHERE key = ?", (key,)) row = cursor.fetchone() if not row: return None - expires_at = row["expires_at"] if expires_at and datetime.fromisoformat(expires_at) < datetime.now(UTC): cursor.execute("DELETE FROM cache_meta WHERE key = ?", (key,)) conn.commit() return None - return json.loads(row["value"]) @@ -87,24 +175,28 @@ def cache_set( payload: dict[str, Any], ttl_hours: int | None = None, ttl_minutes: int | None = None, -) -> None: +) -> str: + """Store *payload* in cache_meta and return its content_hash.""" expires_at = None if ttl_minutes is not None: expires_at = (datetime.now(UTC) + timedelta(minutes=ttl_minutes)).isoformat() elif ttl_hours is not None: expires_at = (datetime.now(UTC) + timedelta(hours=ttl_hours)).isoformat() + + content_hash = compute_content_hash(payload) cursor = conn.cursor() cursor.execute( - "INSERT OR REPLACE INTO cache_meta (key, value, expires_at) VALUES (?, ?, ?)", - (key, json.dumps(payload), expires_at), + "INSERT OR REPLACE INTO cache_meta (key, value, content_hash, expires_at)" + " VALUES (?, ?, ?, ?)", + (key, json.dumps(payload, default=_json_default), content_hash, expires_at), ) conn.commit() + return content_hash -def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool: - if ttl_hours is None: - return True - return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta(hours=ttl_hours) +# --------------------------------------------------------------------------- +# Search page / cards helpers +# --------------------------------------------------------------------------- def save_search_page( @@ -112,8 +204,9 @@ def save_search_page( url: str, html: str, ttl_minutes: int = 60, -) -> None: - cache_set(conn, f"search_page:{url}", {"html": html}, ttl_minutes=ttl_minutes) +) -> str: + """Cache raw HTML for a search page URL. Returns content_hash.""" + return cache_set(conn, f"search_page:{url}", {"html": html}, ttl_minutes=ttl_minutes) def get_search_page(conn: sqlite3.Connection, url: str) -> str | None: @@ -128,8 +221,9 @@ def save_search_cards( url: str, cards: list[FinnSearchCard], ttl_minutes: int = 60, -) -> None: - cache_set( +) -> str: + """Cache parsed search cards. Returns content_hash.""" + return cache_set( conn, f"search_cards:{url}", [card.model_dump(mode="json") for card in cards], @@ -144,28 +238,54 @@ def get_search_cards(conn: sqlite3.Connection, url: str) -> list[FinnSearchCard] return [FinnSearchCard.model_validate(item) for item in payload] -def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> None: +# --------------------------------------------------------------------------- +# FinnAd +# --------------------------------------------------------------------------- + + +def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> tuple[str, bool]: + """Persist *ad* to finn_ads. + + Returns ``(content_hash, changed)`` where ``changed=False`` means the + remote payload is identical to what was already stored -- callers can + use this to skip analysis recomputation. + """ cursor = conn.cursor() payload = ad.model_dump(mode="json") + new_hash = compute_content_hash(payload) + fetched_at = ( + ad.detail_fetched_at.isoformat() + if ad.detail_fetched_at + else datetime.now(UTC).isoformat() + ) + + # Check existing hash before writing. cursor.execute( - "INSERT OR REPLACE INTO finn_ads (finnkode, url, payload, fetched_at) VALUES (?, ?, ?, ?)", - ( - ad.finnkode, - ad.url, - json.dumps(payload), - ad.detail_fetched_at.isoformat() - if ad.detail_fetched_at - else datetime.now(UTC).isoformat(), - ), + "SELECT content_hash FROM finn_ads WHERE finnkode = ?", (ad.finnkode,) + ) + row = cursor.fetchone() + if row and row["content_hash"] == new_hash: + logger.debug("finn_ad %s unchanged (hash match)", ad.finnkode) + return new_hash, False + + cursor.execute( + "INSERT OR REPLACE INTO finn_ads" + " (finnkode, url, payload, content_hash, fetched_at)" + " VALUES (?, ?, ?, ?, ?)", + (ad.finnkode, ad.url, json.dumps(payload, default=_json_default), new_hash, fetched_at), ) conn.commit() + logger.debug("finn_ad %s saved (hash=%s)", ad.finnkode, new_hash[:8]) + return new_hash, True def get_finn_ad( conn: sqlite3.Connection, finnkode: str, ttl_hours: int | None = None ) -> FinnAd | None: cursor = conn.cursor() - cursor.execute("SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,)) + cursor.execute( + "SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,) + ) row = cursor.fetchone() if not row: return None @@ -174,13 +294,47 @@ def get_finn_ad( return FinnAd.model_validate(json.loads(row["payload"])) -def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> None: +def get_finn_ad_hash(conn: sqlite3.Connection, finnkode: str) -> str | None: + """Return the stored content_hash for *finnkode*, or None if not cached.""" cursor = conn.cursor() cursor.execute( - "INSERT OR REPLACE INTO eiendom_units (unit_code, payload, fetched_at) VALUES (?, ?, ?)", - (unit.unit_code, json.dumps(unit.model_dump(mode="json")), unit.fetched_at.isoformat()), + "SELECT content_hash FROM finn_ads WHERE finnkode = ?", (finnkode,) + ) + row = cursor.fetchone() + return row["content_hash"] if row else None + + +# --------------------------------------------------------------------------- +# EiendomUnit +# --------------------------------------------------------------------------- + + +def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> tuple[str, bool]: + """Persist *unit* to eiendom_units. + + Returns ``(content_hash, changed)``. + """ + cursor = conn.cursor() + payload = unit.model_dump(mode="json") + new_hash = compute_content_hash(payload) + + cursor.execute( + "SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit.unit_code,) + ) + row = cursor.fetchone() + if row and row["content_hash"] == new_hash: + logger.debug("eiendom_unit %s unchanged (hash match)", unit.unit_code) + return new_hash, False + + cursor.execute( + "INSERT OR REPLACE INTO eiendom_units" + " (unit_code, payload, content_hash, fetched_at)" + " VALUES (?, ?, ?, ?)", + (unit.unit_code, json.dumps(payload, default=_json_default), new_hash, unit.fetched_at.isoformat()), ) conn.commit() + logger.debug("eiendom_unit %s saved (hash=%s)", unit.unit_code, new_hash[:8]) + return new_hash, True def get_eiendom_unit( @@ -190,8 +344,7 @@ def get_eiendom_unit( ) -> EiendomUnit | None: cursor = conn.cursor() cursor.execute( - "SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?", - (unit_code,), + "SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?", (unit_code,) ) row = cursor.fetchone() if not row: @@ -201,23 +354,65 @@ def get_eiendom_unit( return EiendomUnit.model_validate(json.loads(row["payload"])) +def get_eiendom_unit_hash(conn: sqlite3.Connection, unit_code: str) -> str | None: + """Return the stored content_hash for *unit_code*, or None if not cached.""" + cursor = conn.cursor() + cursor.execute( + "SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit_code,) + ) + row = cursor.fetchone() + return row["content_hash"] if row else None + + +# --------------------------------------------------------------------------- +# SimilarUnits +# --------------------------------------------------------------------------- + + def save_similar_units( conn: sqlite3.Connection, unit_code: str, listing_status: str, similar_units: list[SimilarUnit], -) -> None: +) -> tuple[str, bool]: + """Persist *similar_units* for (unit_code, listing_status). + + Returns ``(content_hash, changed)``. + """ cursor = conn.cursor() - payload = json.dumps([item.model_dump(mode="json") for item in similar_units]) + payload_list = [item.model_dump(mode="json") for item in similar_units] + new_hash = compute_content_hash(payload_list) + cursor.execute( + "SELECT payload, content_hash FROM similar_units" + " WHERE unit_code = ? AND listing_status = ?" + " ORDER BY id DESC LIMIT 1", + (unit_code, listing_status), + ) + row = cursor.fetchone() + if row and row["content_hash"] == new_hash: + logger.debug( + "similar_units %s/%s unchanged (hash match)", unit_code, listing_status + ) + return new_hash, False + + cursor.execute( + "INSERT INTO similar_units" + " (unit_code, listing_status, payload, content_hash, fetched_at)" + " VALUES (?, ?, ?, ?, ?)", ( - "INSERT INTO similar_units" - " (unit_code, listing_status, payload, fetched_at)" - " VALUES (?, ?, ?, ?)" + unit_code, + listing_status, + json.dumps(payload_list, default=_json_default), + new_hash, + datetime.now(UTC).isoformat(), ), - (unit_code, listing_status, payload, datetime.now(UTC).isoformat()), ) conn.commit() + logger.debug( + "similar_units %s/%s saved (hash=%s)", unit_code, listing_status, new_hash[:8] + ) + return new_hash, True def get_similar_units( @@ -228,11 +423,9 @@ def get_similar_units( ) -> list[SimilarUnit]: cursor = conn.cursor() cursor.execute( - ( - "SELECT payload, fetched_at FROM similar_units" - " WHERE unit_code = ? AND listing_status = ?" - " ORDER BY id DESC LIMIT 1" - ), + "SELECT payload, fetched_at FROM similar_units" + " WHERE unit_code = ? AND listing_status = ?" + " ORDER BY id DESC LIMIT 1", (unit_code, listing_status), ) row = cursor.fetchone() @@ -241,3 +434,102 @@ def get_similar_units( if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours): return [] return [SimilarUnit.model_validate(item) for item in json.loads(row["payload"])] + + +def get_similar_units_hash( + conn: sqlite3.Connection, unit_code: str, listing_status: str +) -> str | None: + """Return the stored content_hash for (unit_code, listing_status), or None.""" + cursor = conn.cursor() + cursor.execute( + "SELECT content_hash FROM similar_units" + " WHERE unit_code = ? AND listing_status = ?" + " ORDER BY id DESC LIMIT 1", + (unit_code, listing_status), + ) + row = cursor.fetchone() + return row["content_hash"] if row else None + + +# --------------------------------------------------------------------------- +# Analysis cache +# --------------------------------------------------------------------------- + + +def get_analysis( + conn: sqlite3.Connection, finnkode: str, deps_hash: str +) -> dict[str, Any] | None: + """Return cached analysis for *finnkode* if deps_hash still matches. + + ``deps_hash`` encodes the combined hashes of the ad, eiendom unit, and + comps that were used to produce the analysis. Any change to underlying + data produces a different deps_hash and the cache is considered stale. + """ + cursor = conn.cursor() + cursor.execute( + "SELECT payload, deps_hash FROM analysis_cache WHERE finnkode = ?", + (finnkode,), + ) + row = cursor.fetchone() + if not row: + return None + if row["deps_hash"] != deps_hash: + logger.debug( + "analysis_cache miss for %s (deps_hash changed %s→%s)", + finnkode, + row["deps_hash"][:8], + deps_hash[:8], + ) + return None + logger.debug("analysis_cache hit for %s", finnkode) + return json.loads(row["payload"]) + + +def _json_default(obj: Any) -> Any: + """Fallback serialiser for json.dumps. + Converts datetime/date → ISO string; anything else → repr string. + Means save_analysis never raises TypeError regardless of what scoring + or model_dump() emits. + """ + if hasattr(obj, "isoformat"): + return obj.isoformat() + return repr(obj) + + +def save_analysis( + conn: sqlite3.Connection, + finnkode: str, + deps_hash: str, + result: dict[str, Any], +) -> None: + """Store an analysis result keyed by (finnkode, deps_hash).""" + cursor = conn.cursor() + cursor.execute( + "INSERT OR REPLACE INTO analysis_cache" + " (finnkode, deps_hash, payload, computed_at)" + " VALUES (?, ?, ?, ?)", + (finnkode, deps_hash, json.dumps(result, default=_json_default), datetime.now(UTC).isoformat()), + ) + conn.commit() + logger.debug("analysis_cache saved for %s (deps_hash=%s)", finnkode, deps_hash[:8]) + + +def invalidate_analysis(conn: sqlite3.Connection, finnkode: str) -> None: + """Remove any cached analysis for *finnkode* (call after raw data changes).""" + conn.cursor().execute( + "DELETE FROM analysis_cache WHERE finnkode = ?", (finnkode,) + ) + conn.commit() + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool: + if ttl_hours is None: + return True + return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta( + hours=ttl_hours + ) \ No newline at end of file diff --git a/finn_eiendom/service.py b/finn_eiendom/service.py index 506f4d4..dfbb153 100644 --- a/finn_eiendom/service.py +++ b/finn_eiendom/service.py @@ -1,4 +1,21 @@ -"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units.""" +"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units. + +Hash-aware fetch pattern +------------------------ +Every ``get_or_fetch_*`` function follows the same contract: + + 1. TTL check -- if cached row is fresh enough, return it directly. + 2. Remote fetch -- if TTL expired (or force_refresh), fetch from network. + 3. Hash check -- compare incoming payload hash to stored hash. + If equal the remote data has not changed; skip the DB write so that + the analysis_cache entry for this finnkode remains valid. + 4. Write + invalidate -- if hash differs, persist the new row and + delete any cached analysis (it will be recomputed on next call to + ``analyze_ad``). + +This means analysis results survive TTL resets as long as the remote +data has not actually changed. +""" import logging from typing import Any @@ -10,11 +27,12 @@ from .cache import ( get_finn_ad, get_similar_units as get_cached_similar_units, init_db, + invalidate_analysis, save_eiendom_unit, save_finn_ad, save_similar_units, ) -from .config import EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH +from .config import EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH, FINN_CACHE_TTL_AD_HOURS from .eiendom_no import ( build_unit_vector, decode_unit_vector, @@ -29,12 +47,25 @@ logger = logging.getLogger(__name__) async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd: - """Get FinnAd from cache or fetch fresh. Never returns None.""" + """Get FinnAd from cache or fetch fresh. Never returns None. + + On a TTL expiry or force_refresh the ad is re-fetched from FINN. + If the remote payload hash matches the stored hash the DB row is + NOT updated, so analysis_cache entries for this finnkode stay valid. + If the hash differs the row is updated and any cached analysis is + invalidated. + """ conn = init_db(FINN_CACHE_PATH) - ad = None if force_refresh else get_finn_ad(conn, finnkode, ttl_hours=24) - if ad is None: - ad = await fetch_ad_details(finnkode) - save_finn_ad(conn, ad) + ad = None if force_refresh else get_finn_ad(conn, finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS) + if ad is not None: + return ad + + # Cache miss or force_refresh: fetch from remote. + ad = await fetch_ad_details(finnkode) + _, changed = save_finn_ad(conn, ad) + if changed: + logger.debug("finn_ad %s updated -- invalidating analysis cache", finnkode) + invalidate_analysis(conn, finnkode) return ad @@ -67,10 +98,13 @@ async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None: ad.eiendom_unit_code = unit.unit_code conn = init_db(FINN_CACHE_PATH) - save_finn_ad(conn, ad) # persist backfill; do NOT cache `unit` here -- - # the resolver returns a partial record (code + - # address + coords). The full unit comes from - # get_or_fetch_eiendom_unit -> get_unit(). + + # Persist the backfilled unit_code. If the hash changes (new field), + # invalidate the analysis cache so it is recomputed with the enriched ad. + _, changed = save_finn_ad(conn, ad) + if changed: + invalidate_analysis(conn, ad.finnkode) + logger.info("Resolved finnkode %s -> unit %s", ad.finnkode, unit.unit_code) return ad.eiendom_unit_code @@ -78,13 +112,31 @@ async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None: async def get_or_fetch_eiendom_unit( unit_code: str, force_refresh: bool = False ) -> EiendomUnit | None: - """Get EiendomUnit from cache or fetch fresh.""" + """Get EiendomUnit from cache or fetch fresh. + + Hash-aware: if the remote payload is identical to what is stored, + the DB row is not updated (analysis_cache stays valid). + """ conn = init_db(FINN_CACHE_PATH) - unit = None if force_refresh else get_cached_eiendom_unit(conn, unit_code, ttl_hours=24) - if unit is None: - unit = await get_unit(unit_code) - if unit is not None: - save_eiendom_unit(conn, unit) + unit = ( + None + if force_refresh + else get_cached_eiendom_unit(conn, unit_code, ttl_hours=24) + ) + if unit is not None: + return unit + + unit = await get_unit(unit_code) + if unit is not None: + _, changed = save_eiendom_unit(conn, unit) + if changed: + logger.debug( + "eiendom_unit %s updated -- analysis caches for linked finnkodes may be stale", + unit_code, + ) + # We don't have a direct finnkode → unit_code reverse map in the + # DB yet, so we cannot invalidate analysis here. The deps_hash + # mismatch in get_analysis() handles this automatically. return unit @@ -93,43 +145,39 @@ async def get_or_fetch_similar_units( ) -> list[SimilarUnit]: """Get similar units (comps) from cache or fetch fresh. - Fetches the unit first to get the unit_vector, then checks cache for similar - units by (unit_code, listing_status). On cache miss, fetches fresh from - Eiendom.no and saves to cache. + Hash-aware: identical remote payloads do not trigger a DB write, + so the analysis_cache entry for any finnkode that uses these comps + remains valid. """ conn = init_db(FINN_CACHE_PATH) - # First, ensure we have the unit to build its vector + # Ensure we have the unit to build its vector. unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh) if unit is None: return [] - # Check cache for similar units (unless force_refresh) if not force_refresh: cached_similar = get_cached_similar_units( conn, unit_code, listing_status, ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS ) if cached_similar: logger.debug( - "Using cached similar units for %s (status=%s)", - unit_code, - listing_status, + "Using cached similar units for %s (status=%s)", unit_code, listing_status ) return cached_similar - # Cache miss or force_refresh: fetch fresh + # Cache miss or force_refresh: fetch from remote. vector = build_unit_vector(unit) similar = await get_similar_units(vector, listing_status=listing_status) - # Save to cache if similar: - save_similar_units(conn, unit_code, listing_status, similar) - logger.debug( - "Cached %d similar units for %s (status=%s)", - len(similar), - unit_code, - listing_status, - ) + _, changed = save_similar_units(conn, unit_code, listing_status, similar) + if changed: + logger.debug( + "similar_units %s/%s updated -- analysis caches may be stale", + unit_code, + listing_status, + ) return similar @@ -170,10 +218,8 @@ async def analyze_search( ) -> dict[str, Any]: """Analyze a FINN search URL and return a ranked shortlist. - NOTE: enrichment for search results lives in analysis.py. If that path - also reports `eiendom_enriched: 0`, it has the same root cause -- each - card's eiendom_unit_code must be resolved via ensure_eiendom_unit_code - (or search_unit_from_finn_url) before the enrichment gate. + Individual ad analyses are served from analysis_cache when the + underlying data has not changed. """ return await run_analysis_search( search_url, @@ -198,15 +244,15 @@ async def analyze_ad( unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None result: dict[str, Any] = { - "ad": ad.model_dump(), + "ad": ad.model_dump(mode="json"), } if unit_code: unit = await get_or_fetch_eiendom_unit(unit_code) if unit: - result["eiendom_unit"] = unit.model_dump() + result["eiendom_unit"] = unit.model_dump(mode="json") if include_similar_units: similar = await get_or_fetch_similar_units(unit_code) - result["similar_units"] = [s.model_dump() for s in similar] + result["similar_units"] = [s.model_dump(mode="json") for s in similar] return result @@ -220,14 +266,14 @@ async def analyze_ad_against_comps( unit_code = await ensure_eiendom_unit_code(ad) result: dict[str, Any] = { - "ad": ad.model_dump(), + "ad": ad.model_dump(mode="json"), } if unit_code: unit = await get_or_fetch_eiendom_unit(unit_code) if unit: - result["eiendom_unit"] = unit.model_dump() + result["eiendom_unit"] = unit.model_dump(mode="json") comps = await get_or_fetch_similar_units(unit_code, listing_status=listing_status) - result["comparable_units"] = [c.model_dump() for c in comps] + result["comparable_units"] = [c.model_dump(mode="json") for c in comps] return result @@ -235,7 +281,6 @@ async def find_similar_to_liked( finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE" ) -> dict[str, Any]: """Find properties similar to a listing the user has liked.""" - # Requires that feedback.verdict = "liked" exists for this finnkode ad = await get_or_fetch_ad(finnkode) unit_code = await ensure_eiendom_unit_code(ad) @@ -252,8 +297,8 @@ async def find_similar_to_liked( similar = await get_or_fetch_similar_units(unit_code, listing_status=listing_status) return { - "base_ad": ad.model_dump(), - "similar_listings": [s.model_dump() for s in similar], + "base_ad": ad.model_dump(mode="json"), + "similar_listings": [s.model_dump(mode="json") for s in similar], "mode": mode, } @@ -269,16 +314,16 @@ async def compare_ads( # Resolve before model_dump() -- see analyze_ad. unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None - ad_data = ad.model_dump() + ad_data = ad.model_dump(mode="json") if unit_code: unit = await get_or_fetch_eiendom_unit(unit_code) if unit: - ad_data["eiendom_unit"] = unit.model_dump() + ad_data["eiendom_unit"] = unit.model_dump(mode="json") if include_comps: comps = await get_or_fetch_similar_units( unit_code, listing_status="RECENTLY_SOLD" ) - ad_data["comps"] = [c.model_dump() for c in comps] + ad_data["comps"] = [c.model_dump(mode="json") for c in comps] ads.append(ad_data) @@ -318,4 +363,4 @@ def get_shortlist(run_id: int | None = None, limit: int = 10) -> dict[str, Any]: def get_new_ads_since_last_run(search_url: str) -> dict[str, Any]: """Detect new/removed/changed listings vs the previous run.""" # TODO: implement via search_runs table in cache.py - return {"new_ads": [], "removed_ads": [], "changed_ads": [], "search_url": search_url} + return {"new_ads": [], "removed_ads": [], "changed_ads": [], "search_url": search_url} \ No newline at end of file