"""SQLite cache and persistence for FINN and Eiendom.no data. Caching strategy ---------------- Raw data (finn_ads, eiendom_units, similar_units) Stored with a SHA-256 content_hash of the serialised payload. On write: compare incoming hash to stored hash. If equal the remote data has not changed -- the row is left untouched and the caller gets back ``changed=False``, which preserves a valid analysis_cache entry. Analysis results (analysis_cache) Keyed by ``(finnkode, deps_hash)`` where deps_hash = SHA-256 of the combined raw payloads of the ad, eiendom unit, and comps that were used to produce the result. A cache hit is only valid when the deps_hash still matches, i.e. none of the underlying data has changed. This means analysis is re-run *only* when remote data actually changes, not on every TTL tick. Search pages / cards (cache_meta) Still TTL-based -- these change frequently and a content-hash over a full HTML page is cheap but the semantics of "changed" are less clear (ads added/removed vs. cosmetic HTML tweaks). Hash is stored anyway so callers can detect real list changes if desired. """ import hashlib import json import logging import sqlite3 from datetime import UTC, datetime, timedelta from typing import Any from .config import FINN_CACHE_PATH from .models import EiendomUnit, FinnAd, FinnSearchCard, SimilarUnit logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Hashing helpers # --------------------------------------------------------------------------- def compute_content_hash(payload: Any) -> str: """Return a stable SHA-256 hex digest of *payload*. *payload* can be a dict, list, or any JSON-serialisable value. Keys are sorted so that insertion order does not affect the hash. """ serialised = json.dumps(payload, sort_keys=True, default=str) return hashlib.sha256(serialised.encode()).hexdigest() def combine_hashes(*hashes: str | None) -> str: """Combine multiple content hashes into one deterministic deps_hash.""" combined = "|".join(h or "" for h in hashes) return hashlib.sha256(combined.encode()).hexdigest() # --------------------------------------------------------------------------- # Connection / schema # --------------------------------------------------------------------------- def get_connection(path: str | None = None) -> sqlite3.Connection: db_path = path or FINN_CACHE_PATH conn = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES) conn.row_factory = sqlite3.Row return conn def init_db(path: str | None = None) -> sqlite3.Connection: conn = get_connection(path) cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS finn_ads ( finnkode TEXT PRIMARY KEY, url TEXT, payload TEXT NOT NULL, content_hash TEXT, fetched_at TEXT NOT NULL ) """ ) # Migration: add content_hash column if the table already existed without it. _add_column_if_missing(cursor, "finn_ads", "content_hash", "TEXT") cursor.execute( """ CREATE TABLE IF NOT EXISTS eiendom_units ( unit_code TEXT PRIMARY KEY, payload TEXT NOT NULL, content_hash TEXT, fetched_at TEXT NOT NULL ) """ ) _add_column_if_missing(cursor, "eiendom_units", "content_hash", "TEXT") cursor.execute( """ CREATE TABLE IF NOT EXISTS similar_units ( id INTEGER PRIMARY KEY AUTOINCREMENT, unit_code TEXT NOT NULL, listing_status TEXT NOT NULL, payload TEXT NOT NULL, content_hash TEXT, fetched_at TEXT NOT NULL ) """ ) _add_column_if_missing(cursor, "similar_units", "content_hash", "TEXT") cursor.execute( """ CREATE TABLE IF NOT EXISTS cache_meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL, content_hash TEXT, expires_at TEXT ) """ ) _add_column_if_missing(cursor, "cache_meta", "content_hash", "TEXT") cursor.execute( """ CREATE TABLE IF NOT EXISTS analysis_cache ( finnkode TEXT PRIMARY KEY, deps_hash TEXT NOT NULL, payload TEXT NOT NULL, computed_at TEXT NOT NULL ) """ ) conn.commit() return conn def _add_column_if_missing( cursor: sqlite3.Cursor, table: str, column: str, col_type: str ) -> None: """ALTER TABLE … ADD COLUMN is idempotent via this guard.""" cursor.execute(f"PRAGMA table_info({table})") existing = {row["name"] for row in cursor.fetchall()} if column not in existing: cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}") # --------------------------------------------------------------------------- # Generic cache_meta helpers (search pages, search cards) # --------------------------------------------------------------------------- def cache_get(conn: sqlite3.Connection, key: str) -> dict[str, Any] | None: cursor = conn.cursor() cursor.execute("SELECT value, expires_at FROM cache_meta WHERE key = ?", (key,)) row = cursor.fetchone() if not row: return None expires_at = row["expires_at"] if expires_at and datetime.fromisoformat(expires_at) < datetime.now(UTC): cursor.execute("DELETE FROM cache_meta WHERE key = ?", (key,)) conn.commit() return None return json.loads(row["value"]) def cache_set( conn: sqlite3.Connection, key: str, payload: dict[str, Any], ttl_hours: int | None = None, ttl_minutes: int | None = None, ) -> str: """Store *payload* in cache_meta and return its content_hash.""" expires_at = None if ttl_minutes is not None: expires_at = (datetime.now(UTC) + timedelta(minutes=ttl_minutes)).isoformat() elif ttl_hours is not None: expires_at = (datetime.now(UTC) + timedelta(hours=ttl_hours)).isoformat() content_hash = compute_content_hash(payload) cursor = conn.cursor() cursor.execute( "INSERT OR REPLACE INTO cache_meta (key, value, content_hash, expires_at)" " VALUES (?, ?, ?, ?)", (key, json.dumps(payload, default=_json_default), content_hash, expires_at), ) conn.commit() return content_hash # --------------------------------------------------------------------------- # Search page / cards helpers # --------------------------------------------------------------------------- def save_search_page( conn: sqlite3.Connection, url: str, html: str, ttl_minutes: int = 60, ) -> str: """Cache raw HTML for a search page URL. Returns content_hash.""" return cache_set(conn, f"search_page:{url}", {"html": html}, ttl_minutes=ttl_minutes) def get_search_page(conn: sqlite3.Connection, url: str) -> str | None: payload = cache_get(conn, f"search_page:{url}") if not payload: return None return payload.get("html") def save_search_cards( conn: sqlite3.Connection, url: str, cards: list[FinnSearchCard], ttl_minutes: int = 60, ) -> str: """Cache parsed search cards. Returns content_hash.""" return cache_set( conn, f"search_cards:{url}", [card.model_dump(mode="json") for card in cards], ttl_minutes=ttl_minutes, ) def get_search_cards(conn: sqlite3.Connection, url: str) -> list[FinnSearchCard]: payload = cache_get(conn, f"search_cards:{url}") if not payload: return [] return [FinnSearchCard.model_validate(item) for item in payload] # --------------------------------------------------------------------------- # FinnAd # --------------------------------------------------------------------------- def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> tuple[str, bool]: """Persist *ad* to finn_ads. Returns ``(content_hash, changed)`` where ``changed=False`` means the remote payload is identical to what was already stored -- callers can use this to skip analysis recomputation. """ cursor = conn.cursor() payload = ad.model_dump(mode="json") new_hash = compute_content_hash(payload) fetched_at = ( ad.detail_fetched_at.isoformat() if ad.detail_fetched_at else datetime.now(UTC).isoformat() ) # Check existing hash before writing. cursor.execute( "SELECT content_hash FROM finn_ads WHERE finnkode = ?", (ad.finnkode,) ) row = cursor.fetchone() if row and row["content_hash"] == new_hash: logger.debug("finn_ad %s unchanged (hash match)", ad.finnkode) return new_hash, False cursor.execute( "INSERT OR REPLACE INTO finn_ads" " (finnkode, url, payload, content_hash, fetched_at)" " VALUES (?, ?, ?, ?, ?)", (ad.finnkode, ad.url, json.dumps(payload, default=_json_default), new_hash, fetched_at), ) conn.commit() logger.debug("finn_ad %s saved (hash=%s)", ad.finnkode, new_hash[:8]) return new_hash, True def get_finn_ad( conn: sqlite3.Connection, finnkode: str, ttl_hours: int | None = None ) -> FinnAd | None: cursor = conn.cursor() cursor.execute( "SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,) ) row = cursor.fetchone() if not row: return None if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours): return None return FinnAd.model_validate(json.loads(row["payload"])) def get_finn_ad_hash(conn: sqlite3.Connection, finnkode: str) -> str | None: """Return the stored content_hash for *finnkode*, or None if not cached.""" cursor = conn.cursor() cursor.execute( "SELECT content_hash FROM finn_ads WHERE finnkode = ?", (finnkode,) ) row = cursor.fetchone() return row["content_hash"] if row else None # --------------------------------------------------------------------------- # EiendomUnit # --------------------------------------------------------------------------- def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> tuple[str, bool]: """Persist *unit* to eiendom_units. Returns ``(content_hash, changed)``. """ cursor = conn.cursor() payload = unit.model_dump(mode="json") new_hash = compute_content_hash(payload) cursor.execute( "SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit.unit_code,) ) row = cursor.fetchone() if row and row["content_hash"] == new_hash: logger.debug("eiendom_unit %s unchanged (hash match)", unit.unit_code) return new_hash, False cursor.execute( "INSERT OR REPLACE INTO eiendom_units" " (unit_code, payload, content_hash, fetched_at)" " VALUES (?, ?, ?, ?)", (unit.unit_code, json.dumps(payload, default=_json_default), new_hash, unit.fetched_at.isoformat()), ) conn.commit() logger.debug("eiendom_unit %s saved (hash=%s)", unit.unit_code, new_hash[:8]) return new_hash, True def get_eiendom_unit( conn: sqlite3.Connection, unit_code: str, ttl_hours: int | None = None, ) -> EiendomUnit | None: cursor = conn.cursor() cursor.execute( "SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?", (unit_code,) ) row = cursor.fetchone() if not row: return None if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours): return None return EiendomUnit.model_validate(json.loads(row["payload"])) def get_eiendom_unit_hash(conn: sqlite3.Connection, unit_code: str) -> str | None: """Return the stored content_hash for *unit_code*, or None if not cached.""" cursor = conn.cursor() cursor.execute( "SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit_code,) ) row = cursor.fetchone() return row["content_hash"] if row else None # --------------------------------------------------------------------------- # SimilarUnits # --------------------------------------------------------------------------- def save_similar_units( conn: sqlite3.Connection, unit_code: str, listing_status: str, similar_units: list[SimilarUnit], ) -> tuple[str, bool]: """Persist *similar_units* for (unit_code, listing_status). Returns ``(content_hash, changed)``. """ cursor = conn.cursor() payload_list = [item.model_dump(mode="json") for item in similar_units] new_hash = compute_content_hash(payload_list) cursor.execute( "SELECT payload, content_hash FROM similar_units" " WHERE unit_code = ? AND listing_status = ?" " ORDER BY id DESC LIMIT 1", (unit_code, listing_status), ) row = cursor.fetchone() if row and row["content_hash"] == new_hash: logger.debug( "similar_units %s/%s unchanged (hash match)", unit_code, listing_status ) return new_hash, False cursor.execute( "INSERT INTO similar_units" " (unit_code, listing_status, payload, content_hash, fetched_at)" " VALUES (?, ?, ?, ?, ?)", ( unit_code, listing_status, json.dumps(payload_list, default=_json_default), new_hash, datetime.now(UTC).isoformat(), ), ) conn.commit() logger.debug( "similar_units %s/%s saved (hash=%s)", unit_code, listing_status, new_hash[:8] ) return new_hash, True def get_similar_units( conn: sqlite3.Connection, unit_code: str, listing_status: str, ttl_hours: int | None = None, ) -> list[SimilarUnit]: cursor = conn.cursor() cursor.execute( "SELECT payload, fetched_at FROM similar_units" " WHERE unit_code = ? AND listing_status = ?" " ORDER BY id DESC LIMIT 1", (unit_code, listing_status), ) row = cursor.fetchone() if not row: return [] if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours): return [] return [SimilarUnit.model_validate(item) for item in json.loads(row["payload"])] def get_similar_units_hash( conn: sqlite3.Connection, unit_code: str, listing_status: str ) -> str | None: """Return the stored content_hash for (unit_code, listing_status), or None.""" cursor = conn.cursor() cursor.execute( "SELECT content_hash FROM similar_units" " WHERE unit_code = ? AND listing_status = ?" " ORDER BY id DESC LIMIT 1", (unit_code, listing_status), ) row = cursor.fetchone() return row["content_hash"] if row else None # --------------------------------------------------------------------------- # Analysis cache # --------------------------------------------------------------------------- def get_analysis( conn: sqlite3.Connection, finnkode: str, deps_hash: str ) -> dict[str, Any] | None: """Return cached analysis for *finnkode* if deps_hash still matches. ``deps_hash`` encodes the combined hashes of the ad, eiendom unit, and comps that were used to produce the analysis. Any change to underlying data produces a different deps_hash and the cache is considered stale. """ cursor = conn.cursor() cursor.execute( "SELECT payload, deps_hash FROM analysis_cache WHERE finnkode = ?", (finnkode,), ) row = cursor.fetchone() if not row: return None if row["deps_hash"] != deps_hash: logger.debug( "analysis_cache miss for %s (deps_hash changed %s→%s)", finnkode, row["deps_hash"][:8], deps_hash[:8], ) return None logger.debug("analysis_cache hit for %s", finnkode) return json.loads(row["payload"]) def _json_default(obj: Any) -> Any: """Fallback serialiser for json.dumps. Converts datetime/date → ISO string; anything else → repr string. Means save_analysis never raises TypeError regardless of what scoring or model_dump() emits. """ if hasattr(obj, "isoformat"): return obj.isoformat() return repr(obj) def save_analysis( conn: sqlite3.Connection, finnkode: str, deps_hash: str, result: dict[str, Any], ) -> None: """Store an analysis result keyed by (finnkode, deps_hash).""" cursor = conn.cursor() cursor.execute( "INSERT OR REPLACE INTO analysis_cache" " (finnkode, deps_hash, payload, computed_at)" " VALUES (?, ?, ?, ?)", (finnkode, deps_hash, json.dumps(result, default=_json_default), datetime.now(UTC).isoformat()), ) conn.commit() logger.debug("analysis_cache saved for %s (deps_hash=%s)", finnkode, deps_hash[:8]) def invalidate_analysis(conn: sqlite3.Connection, finnkode: str) -> None: """Remove any cached analysis for *finnkode* (call after raw data changes).""" conn.cursor().execute( "DELETE FROM analysis_cache WHERE finnkode = ?", (finnkode,) ) conn.commit() # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool: if ttl_hours is None: return True return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta( hours=ttl_hours )