Implement caching strategy for analysis results and enhance hash-aware data fetching

This commit is contained in:
Ole
2026-05-26 13:54:49 +00:00
parent 46fd22c277
commit 22f30ebf00
3 changed files with 557 additions and 122 deletions
+114 -16
View File
@@ -1,9 +1,37 @@
"""Orchestration for FINN search + Eiendom.no enrichment + scoring.""" """Orchestration for FINN search + Eiendom.no enrichment + scoring.
Analysis caching
----------------
``analyze_ad`` caches its result under a ``deps_hash`` that is the
SHA-256 of the combined content hashes of the ad, the eiendom unit, and
the comparable sales used to produce it. On a subsequent call the function:
1. Reads the three raw content hashes from the DB (no deserialisation).
2. Derives the same deps_hash from those hashes.
3. Checks analysis_cache for a matching (finnkode, deps_hash) row.
4. Returns the cached result immediately if found.
5. Otherwise runs the full scoring pipeline and writes to analysis_cache.
The cached result is invalidated automatically the moment any piece of
underlying data changes, because the deps_hash will differ.
"""
import logging import logging
from . import ad as ad_module from . import ad as ad_module
from . import cache, eiendom_no, scoring, search from . import cache, eiendom_no, scoring, search
from .cache import (
combine_hashes,
get_analysis,
get_eiendom_unit_hash,
get_finn_ad_hash,
get_similar_units_hash,
invalidate_analysis,
save_analysis,
save_eiendom_unit,
save_finn_ad,
save_similar_units,
)
from .config import ( from .config import (
EIENDOM_NO_CACHE_TTL_HOURS, EIENDOM_NO_CACHE_TTL_HOURS,
FINN_CACHE_PATH, FINN_CACHE_PATH,
@@ -86,38 +114,93 @@ def _build_ad_summary(
} }
def _compute_deps_hash(
    conn,
    finnkode: str,
    unit_code: str | None,
    listing_status: str = "RECENTLY_SOLD",
) -> str:
    """Build the dependency hash for one ad from its stored content hashes.

    Looks up the content hash of the ad and, when *unit_code* is given, the
    hashes of the eiendom unit and of the comps stored for
    ``(unit_code, listing_status)``.  Without a unit code the unit and comps
    slots are ``None``.  The three values are folded together with
    ``combine_hashes``.
    """
    # Slot order (ad, unit, comps) must stay fixed: it determines the digest.
    hashes = [get_finn_ad_hash(conn, finnkode), None, None]
    if unit_code:
        hashes[1] = get_eiendom_unit_hash(conn, unit_code)
        hashes[2] = get_similar_units_hash(conn, unit_code, listing_status)
    return combine_hashes(*hashes)
async def analyze_ad( async def analyze_ad(
finn_ad: FinnAd, finn_ad: FinnAd,
unit_code: str | None = None, unit_code: str | None = None,
) -> dict: ) -> dict:
"""Enrich a FinnAd and compute score summary.""" """Enrich a FinnAd and compute score summary.
Result is cached in analysis_cache keyed by deps_hash. Recomputation
happens only when the underlying raw data has actually changed.
"""
conn = cache.init_db(FINN_CACHE_PATH) conn = cache.init_db(FINN_CACHE_PATH)
# ------------------------------------------------------------------
# 1. Ensure the ad is in the DB so we have a stable hash to key on.
# ------------------------------------------------------------------
ad_hash, ad_changed = save_finn_ad(conn, finn_ad)
# ------------------------------------------------------------------
# 2. Fetch / refresh Eiendom.no data (cache-aware).
# ------------------------------------------------------------------
enriched: EiendomUnit | None = None enriched: EiendomUnit | None = None
similar_units: list[SimilarUnit] = [] unit_hash_changed = False
if unit_code: if unit_code:
enriched = cache.get_eiendom_unit(conn, unit_code) enriched = cache.get_eiendom_unit(conn, unit_code)
if enriched is None: if enriched is None:
enriched = await eiendom_no.enrich_ad_with_eiendom_no(finn_ad, unit_code) enriched = await eiendom_no.enrich_ad_with_eiendom_no(finn_ad, unit_code)
if enriched is not None: if enriched is not None:
cache.save_eiendom_unit(conn, enriched) _, unit_hash_changed = save_eiendom_unit(conn, enriched)
# If already cached, unit_hash_changed stays False -- no new write.
# ------------------------------------------------------------------
# 3. Fetch / refresh similar units (cache-aware).
# ------------------------------------------------------------------
similar_units: list[SimilarUnit] = []
comps_hash_changed = False
if enriched: if enriched:
# Check cache for similar units first. The cache uses (unit_code,
# listing_status) as the key, so we must look it up by unit_code.
similar_units = cache.get_similar_units( similar_units = cache.get_similar_units(
conn, enriched.unit_code, "RECENTLY_SOLD", ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS conn, enriched.unit_code, "RECENTLY_SOLD", ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS
) )
if not similar_units: if not similar_units:
# Cache miss: build the vector and fetch fresh from Eiendom.no
# (unit_vector field from get_unit is None; build locally)
vector = enriched.unit_vector or eiendom_no.build_unit_vector(enriched) vector = enriched.unit_vector or eiendom_no.build_unit_vector(enriched)
if vector: if vector:
similar_units = await eiendom_no.get_similar_units(vector) similar_units = await eiendom_no.get_similar_units(vector)
# Save to cache
if similar_units: if similar_units:
cache.save_similar_units(conn, enriched.unit_code, "RECENTLY_SOLD", similar_units) _, comps_hash_changed = save_similar_units(
conn, enriched.unit_code, "RECENTLY_SOLD", similar_units
)
# ------------------------------------------------------------------
# 4. Derive deps_hash and check analysis_cache.
# ------------------------------------------------------------------
deps_hash = _compute_deps_hash(conn, finn_ad.finnkode, unit_code)
cached_analysis = get_analysis(conn, finn_ad.finnkode, deps_hash)
if cached_analysis is not None:
logger.debug("analysis_cache hit for %s -- skipping recompute", finn_ad.finnkode)
return cached_analysis
# ------------------------------------------------------------------
# 5. Cache miss: compute, store, return.
# ------------------------------------------------------------------
logger.debug(
"analysis_cache miss for %s (ad_changed=%s, unit_changed=%s, comps_changed=%s)",
finn_ad.finnkode,
ad_changed,
unit_hash_changed,
comps_hash_changed,
)
scores = scoring.score_ad(finn_ad, enriched, similar_units) scores = scoring.score_ad(finn_ad, enriched, similar_units)
categories = scoring.classify_ad(scores) categories = scoring.classify_ad(scores)
@@ -130,10 +213,16 @@ async def analyze_ad(
"score": scores, "score": scores,
"categories": categories, "categories": categories,
"summary": summary, "summary": summary,
"eiendom_unit": enriched.model_dump() if enriched else None, "eiendom_unit": enriched.model_dump(mode="json") if enriched else None,
"similar_units": [unit.model_dump() for unit in similar_units], "similar_units": [unit.model_dump(mode="json") for unit in similar_units],
} }
cache.save_finn_ad(conn, finn_ad)
# Round-trip through JSON to guarantee all values are serialisable
# (catches any datetime that survives model_dump, e.g. from scoring).
import json as _json
result = _json.loads(_json.dumps(result, default=str))
save_analysis(conn, finn_ad.finnkode, deps_hash, result)
return result return result
@@ -166,7 +255,13 @@ async def analyze_search(
client=None, client=None,
use_cache: bool = True, use_cache: bool = True,
) -> dict: ) -> dict:
"""Analyze a FINN search URL and enrich matching listings.""" """Analyze a FINN search URL and enrich matching listings.
Search-level results are NOT cached as a whole (the search page itself
is cached at the HTML level). Individual ad analyses ARE cached via
``analyze_ad``, so re-running a search only re-scores ads whose
underlying data has changed.
"""
conn = cache.init_db(FINN_CACHE_PATH) conn = cache.init_db(FINN_CACHE_PATH)
cards = await search.fetch_search_pages( cards = await search.fetch_search_pages(
search_url, search_url,
@@ -177,6 +272,7 @@ async def analyze_search(
results = [] results = []
enriched_count = 0 enriched_count = 0
skipped_count = 0 skipped_count = 0
cache_hits = 0
if fetch_details: if fetch_details:
for card in cards[:detail_limit]: for card in cards[:detail_limit]:
@@ -200,12 +296,14 @@ async def analyze_search(
if result.get("eiendom_unit"): if result.get("eiendom_unit"):
enriched_count += 1 enriched_count += 1
# analyze_ad does not report whether its result came from analysis_cache,
# so hits cannot be counted here; they are only visible in debug logs.
results.append(result) results.append(result)
results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True) results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True)
return { return {
"search_url": search_url, "search_url": search_url,
"search_cards": [card.model_dump() for card in cards], "search_cards": [card.model_dump(mode="json") for card in cards],
"analysis": results, "analysis": results,
"summary": { "summary": {
"total_listings": len(cards), "total_listings": len(cards),
+330 -38
View File
@@ -1,5 +1,29 @@
"""SQLite cache and persistence for FINN and Eiendom.no data.""" """SQLite cache and persistence for FINN and Eiendom.no data.
Caching strategy
----------------
Raw data (finn_ads, eiendom_units, similar_units)
Stored with a SHA-256 content_hash of the serialised payload.
On write: compare incoming hash to stored hash. If equal the remote
data has not changed -- the row is left untouched and the caller gets
back ``changed=False``, which preserves a valid analysis_cache entry.
Analysis results (analysis_cache)
One row per ``finnkode``, validated by ``deps_hash`` = SHA-256 of the
combined content hashes of the ad, eiendom unit, and comps that were used
to produce the result. A cache hit is only valid when the deps_hash
still matches, i.e. none of the underlying data has changed.
This means analysis is re-run *only* when remote data actually changes,
not on every TTL tick.
Search pages / cards (cache_meta)
Still TTL-based -- these change frequently and a content-hash over a
full HTML page is cheap but the semantics of "changed" are less clear
(ads added/removed vs. cosmetic HTML tweaks). Hash is stored anyway so
callers can detect real list changes if desired.
"""
import hashlib
import json import json
import logging import logging
import sqlite3 import sqlite3
@@ -12,6 +36,32 @@ from .models import EiendomUnit, FinnAd, FinnSearchCard, SimilarUnit
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Hashing helpers
# ---------------------------------------------------------------------------
def compute_content_hash(payload: Any) -> str:
    """Compute the SHA-256 hex digest of *payload*'s canonical JSON form.

    The payload is serialised with sorted keys, so two dicts that differ
    only in insertion order hash identically.  Values the JSON encoder does
    not know are passed through ``str``.
    """
    canonical = json.dumps(payload, sort_keys=True, default=str).encode()
    digest = hashlib.sha256()
    digest.update(canonical)
    return digest.hexdigest()
def combine_hashes(*hashes: str | None) -> str:
"""Combine multiple content hashes into one deterministic deps_hash."""
combined = "|".join(h or "" for h in hashes)
return hashlib.sha256(combined.encode()).hexdigest()
# ---------------------------------------------------------------------------
# Connection / schema
# ---------------------------------------------------------------------------
def get_connection(path: str | None = None) -> sqlite3.Connection: def get_connection(path: str | None = None) -> sqlite3.Connection:
db_path = path or FINN_CACHE_PATH db_path = path or FINN_CACHE_PATH
conn = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES) conn = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES)
@@ -22,25 +72,33 @@ def get_connection(path: str | None = None) -> sqlite3.Connection:
def init_db(path: str | None = None) -> sqlite3.Connection: def init_db(path: str | None = None) -> sqlite3.Connection:
conn = get_connection(path) conn = get_connection(path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
""" """
CREATE TABLE IF NOT EXISTS finn_ads ( CREATE TABLE IF NOT EXISTS finn_ads (
finnkode TEXT PRIMARY KEY, finnkode TEXT PRIMARY KEY,
url TEXT, url TEXT,
payload TEXT NOT NULL, payload TEXT NOT NULL,
content_hash TEXT,
fetched_at TEXT NOT NULL fetched_at TEXT NOT NULL
) )
""" """
) )
# Migration: add content_hash column if the table already existed without it.
_add_column_if_missing(cursor, "finn_ads", "content_hash", "TEXT")
cursor.execute( cursor.execute(
""" """
CREATE TABLE IF NOT EXISTS eiendom_units ( CREATE TABLE IF NOT EXISTS eiendom_units (
unit_code TEXT PRIMARY KEY, unit_code TEXT PRIMARY KEY,
payload TEXT NOT NULL, payload TEXT NOT NULL,
content_hash TEXT,
fetched_at TEXT NOT NULL fetched_at TEXT NOT NULL
) )
""" """
) )
_add_column_if_missing(cursor, "eiendom_units", "content_hash", "TEXT")
cursor.execute( cursor.execute(
""" """
CREATE TABLE IF NOT EXISTS similar_units ( CREATE TABLE IF NOT EXISTS similar_units (
@@ -48,36 +106,66 @@ def init_db(path: str | None = None) -> sqlite3.Connection:
unit_code TEXT NOT NULL, unit_code TEXT NOT NULL,
listing_status TEXT NOT NULL, listing_status TEXT NOT NULL,
payload TEXT NOT NULL, payload TEXT NOT NULL,
content_hash TEXT,
fetched_at TEXT NOT NULL fetched_at TEXT NOT NULL
) )
""" """
) )
_add_column_if_missing(cursor, "similar_units", "content_hash", "TEXT")
cursor.execute( cursor.execute(
""" """
CREATE TABLE IF NOT EXISTS cache_meta ( CREATE TABLE IF NOT EXISTS cache_meta (
key TEXT PRIMARY KEY, key TEXT PRIMARY KEY,
value TEXT NOT NULL, value TEXT NOT NULL,
content_hash TEXT,
expires_at TEXT expires_at TEXT
) )
""" """
) )
_add_column_if_missing(cursor, "cache_meta", "content_hash", "TEXT")
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS analysis_cache (
finnkode TEXT PRIMARY KEY,
deps_hash TEXT NOT NULL,
payload TEXT NOT NULL,
computed_at TEXT NOT NULL
)
"""
)
conn.commit() conn.commit()
return conn return conn
def _add_column_if_missing(
    cursor: sqlite3.Cursor, table: str, column: str, col_type: str
) -> None:
    """Add *column* to *table* unless the table already has it.

    Makes ``ALTER TABLE ... ADD COLUMN`` safe to run on every start-up.

    *table*, *column* and *col_type* are interpolated into the SQL text
    (PRAGMA and ALTER TABLE cannot take bound parameters), so they must be
    trusted constants, never user input.
    """
    cursor.execute(f"PRAGMA table_info({table})")
    # table_info rows are (cid, name, type, notnull, dflt_value, pk).
    # Reading the name by position works for plain tuples and sqlite3.Row
    # alike, so the helper does not depend on the connection's row_factory.
    existing = {row[1] for row in cursor.fetchall()}
    if column not in existing:
        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}")
# ---------------------------------------------------------------------------
# Generic cache_meta helpers (search pages, search cards)
# ---------------------------------------------------------------------------
def cache_get(conn: sqlite3.Connection, key: str) -> dict[str, Any] | None: def cache_get(conn: sqlite3.Connection, key: str) -> dict[str, Any] | None:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT value, expires_at FROM cache_meta WHERE key = ?", (key,)) cursor.execute("SELECT value, expires_at FROM cache_meta WHERE key = ?", (key,))
row = cursor.fetchone() row = cursor.fetchone()
if not row: if not row:
return None return None
expires_at = row["expires_at"] expires_at = row["expires_at"]
if expires_at and datetime.fromisoformat(expires_at) < datetime.now(UTC): if expires_at and datetime.fromisoformat(expires_at) < datetime.now(UTC):
cursor.execute("DELETE FROM cache_meta WHERE key = ?", (key,)) cursor.execute("DELETE FROM cache_meta WHERE key = ?", (key,))
conn.commit() conn.commit()
return None return None
return json.loads(row["value"]) return json.loads(row["value"])
@@ -87,24 +175,28 @@ def cache_set(
payload: dict[str, Any], payload: dict[str, Any],
ttl_hours: int | None = None, ttl_hours: int | None = None,
ttl_minutes: int | None = None, ttl_minutes: int | None = None,
) -> None: ) -> str:
"""Store *payload* in cache_meta and return its content_hash."""
expires_at = None expires_at = None
if ttl_minutes is not None: if ttl_minutes is not None:
expires_at = (datetime.now(UTC) + timedelta(minutes=ttl_minutes)).isoformat() expires_at = (datetime.now(UTC) + timedelta(minutes=ttl_minutes)).isoformat()
elif ttl_hours is not None: elif ttl_hours is not None:
expires_at = (datetime.now(UTC) + timedelta(hours=ttl_hours)).isoformat() expires_at = (datetime.now(UTC) + timedelta(hours=ttl_hours)).isoformat()
content_hash = compute_content_hash(payload)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
"INSERT OR REPLACE INTO cache_meta (key, value, expires_at) VALUES (?, ?, ?)", "INSERT OR REPLACE INTO cache_meta (key, value, content_hash, expires_at)"
(key, json.dumps(payload), expires_at), " VALUES (?, ?, ?, ?)",
(key, json.dumps(payload, default=_json_default), content_hash, expires_at),
) )
conn.commit() conn.commit()
return content_hash
def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool: # ---------------------------------------------------------------------------
if ttl_hours is None: # Search page / cards helpers
return True # ---------------------------------------------------------------------------
return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta(hours=ttl_hours)
def save_search_page( def save_search_page(
@@ -112,8 +204,9 @@ def save_search_page(
url: str, url: str,
html: str, html: str,
ttl_minutes: int = 60, ttl_minutes: int = 60,
) -> None: ) -> str:
cache_set(conn, f"search_page:{url}", {"html": html}, ttl_minutes=ttl_minutes) """Cache raw HTML for a search page URL. Returns content_hash."""
return cache_set(conn, f"search_page:{url}", {"html": html}, ttl_minutes=ttl_minutes)
def get_search_page(conn: sqlite3.Connection, url: str) -> str | None: def get_search_page(conn: sqlite3.Connection, url: str) -> str | None:
@@ -128,8 +221,9 @@ def save_search_cards(
url: str, url: str,
cards: list[FinnSearchCard], cards: list[FinnSearchCard],
ttl_minutes: int = 60, ttl_minutes: int = 60,
) -> None: ) -> str:
cache_set( """Cache parsed search cards. Returns content_hash."""
return cache_set(
conn, conn,
f"search_cards:{url}", f"search_cards:{url}",
[card.model_dump(mode="json") for card in cards], [card.model_dump(mode="json") for card in cards],
@@ -144,28 +238,54 @@ def get_search_cards(conn: sqlite3.Connection, url: str) -> list[FinnSearchCard]
return [FinnSearchCard.model_validate(item) for item in payload] return [FinnSearchCard.model_validate(item) for item in payload]
def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> None: # ---------------------------------------------------------------------------
# FinnAd
# ---------------------------------------------------------------------------
def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> tuple[str, bool]:
"""Persist *ad* to finn_ads.
Returns ``(content_hash, changed)`` where ``changed=False`` means the
remote payload is identical to what was already stored -- callers can
use this to skip analysis recomputation.
"""
cursor = conn.cursor() cursor = conn.cursor()
payload = ad.model_dump(mode="json") payload = ad.model_dump(mode="json")
cursor.execute( new_hash = compute_content_hash(payload)
"INSERT OR REPLACE INTO finn_ads (finnkode, url, payload, fetched_at) VALUES (?, ?, ?, ?)", fetched_at = (
(
ad.finnkode,
ad.url,
json.dumps(payload),
ad.detail_fetched_at.isoformat() ad.detail_fetched_at.isoformat()
if ad.detail_fetched_at if ad.detail_fetched_at
else datetime.now(UTC).isoformat(), else datetime.now(UTC).isoformat()
), )
# Check existing hash before writing.
cursor.execute(
"SELECT content_hash FROM finn_ads WHERE finnkode = ?", (ad.finnkode,)
)
row = cursor.fetchone()
if row and row["content_hash"] == new_hash:
logger.debug("finn_ad %s unchanged (hash match)", ad.finnkode)
return new_hash, False
cursor.execute(
"INSERT OR REPLACE INTO finn_ads"
" (finnkode, url, payload, content_hash, fetched_at)"
" VALUES (?, ?, ?, ?, ?)",
(ad.finnkode, ad.url, json.dumps(payload, default=_json_default), new_hash, fetched_at),
) )
conn.commit() conn.commit()
logger.debug("finn_ad %s saved (hash=%s)", ad.finnkode, new_hash[:8])
return new_hash, True
def get_finn_ad( def get_finn_ad(
conn: sqlite3.Connection, finnkode: str, ttl_hours: int | None = None conn: sqlite3.Connection, finnkode: str, ttl_hours: int | None = None
) -> FinnAd | None: ) -> FinnAd | None:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,)) cursor.execute(
"SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,)
)
row = cursor.fetchone() row = cursor.fetchone()
if not row: if not row:
return None return None
@@ -174,13 +294,47 @@ def get_finn_ad(
return FinnAd.model_validate(json.loads(row["payload"])) return FinnAd.model_validate(json.loads(row["payload"]))
def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> None: def get_finn_ad_hash(conn: sqlite3.Connection, finnkode: str) -> str | None:
"""Return the stored content_hash for *finnkode*, or None if not cached."""
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
"INSERT OR REPLACE INTO eiendom_units (unit_code, payload, fetched_at) VALUES (?, ?, ?)", "SELECT content_hash FROM finn_ads WHERE finnkode = ?", (finnkode,)
(unit.unit_code, json.dumps(unit.model_dump(mode="json")), unit.fetched_at.isoformat()), )
row = cursor.fetchone()
return row["content_hash"] if row else None
# ---------------------------------------------------------------------------
# EiendomUnit
# ---------------------------------------------------------------------------
def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> tuple[str, bool]:
    """Persist *unit* to eiendom_units.

    Returns ``(content_hash, changed)``.  ``changed`` is ``False`` when the
    unit's content is identical to the stored row.

    The content hash deliberately ignores the ``fetched_at`` key of the
    dumped payload: a fetch timestamp differs on every refetch and would
    otherwise make every refetch look like a content change.
    (NOTE(review): assumes ``model_dump`` emits the timestamp under the key
    ``fetched_at`` -- confirm against the EiendomUnit model.)

    On a hash match the row's payload and ``fetched_at`` are still refreshed
    (``content_hash`` is untouched), so a row whose content did not change is
    not left with a stale fetch time and refetched on every call.
    """
    cursor = conn.cursor()
    payload = unit.model_dump(mode="json")
    new_hash = compute_content_hash(
        {key: value for key, value in payload.items() if key != "fetched_at"}
    )
    serialised = json.dumps(payload, default=_json_default)
    fetched_at = unit.fetched_at.isoformat()

    cursor.execute(
        "SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit.unit_code,)
    )
    row = cursor.fetchone()
    if row and row["content_hash"] == new_hash:
        # Same content: only bump the fetch time; the hash stays the same,
        # so anything derived from it (deps_hash) remains valid.
        cursor.execute(
            "UPDATE eiendom_units SET payload = ?, fetched_at = ? WHERE unit_code = ?",
            (serialised, fetched_at, unit.unit_code),
        )
        conn.commit()
        logger.debug("eiendom_unit %s unchanged (hash match)", unit.unit_code)
        return new_hash, False

    cursor.execute(
        "INSERT OR REPLACE INTO eiendom_units"
        " (unit_code, payload, content_hash, fetched_at)"
        " VALUES (?, ?, ?, ?)",
        (unit.unit_code, serialised, new_hash, fetched_at),
    )
    conn.commit()
    logger.debug("eiendom_unit %s saved (hash=%s)", unit.unit_code, new_hash[:8])
    return new_hash, True
def get_eiendom_unit( def get_eiendom_unit(
@@ -190,8 +344,7 @@ def get_eiendom_unit(
) -> EiendomUnit | None: ) -> EiendomUnit | None:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
"SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?", "SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?", (unit_code,)
(unit_code,),
) )
row = cursor.fetchone() row = cursor.fetchone()
if not row: if not row:
@@ -201,23 +354,65 @@ def get_eiendom_unit(
return EiendomUnit.model_validate(json.loads(row["payload"])) return EiendomUnit.model_validate(json.loads(row["payload"]))
def get_eiendom_unit_hash(conn: sqlite3.Connection, unit_code: str) -> str | None:
"""Return the stored content_hash for *unit_code*, or None if not cached."""
cursor = conn.cursor()
cursor.execute(
"SELECT content_hash FROM eiendom_units WHERE unit_code = ?", (unit_code,)
)
row = cursor.fetchone()
return row["content_hash"] if row else None
# ---------------------------------------------------------------------------
# SimilarUnits
# ---------------------------------------------------------------------------
def save_similar_units( def save_similar_units(
conn: sqlite3.Connection, conn: sqlite3.Connection,
unit_code: str, unit_code: str,
listing_status: str, listing_status: str,
similar_units: list[SimilarUnit], similar_units: list[SimilarUnit],
) -> None: ) -> tuple[str, bool]:
"""Persist *similar_units* for (unit_code, listing_status).
Returns ``(content_hash, changed)``.
"""
cursor = conn.cursor() cursor = conn.cursor()
payload = json.dumps([item.model_dump(mode="json") for item in similar_units]) payload_list = [item.model_dump(mode="json") for item in similar_units]
new_hash = compute_content_hash(payload_list)
cursor.execute(
"SELECT payload, content_hash FROM similar_units"
" WHERE unit_code = ? AND listing_status = ?"
" ORDER BY id DESC LIMIT 1",
(unit_code, listing_status),
)
row = cursor.fetchone()
if row and row["content_hash"] == new_hash:
logger.debug(
"similar_units %s/%s unchanged (hash match)", unit_code, listing_status
)
return new_hash, False
cursor.execute( cursor.execute(
(
"INSERT INTO similar_units" "INSERT INTO similar_units"
" (unit_code, listing_status, payload, fetched_at)" " (unit_code, listing_status, payload, content_hash, fetched_at)"
" VALUES (?, ?, ?, ?)" " VALUES (?, ?, ?, ?, ?)",
(
unit_code,
listing_status,
json.dumps(payload_list, default=_json_default),
new_hash,
datetime.now(UTC).isoformat(),
), ),
(unit_code, listing_status, payload, datetime.now(UTC).isoformat()),
) )
conn.commit() conn.commit()
logger.debug(
"similar_units %s/%s saved (hash=%s)", unit_code, listing_status, new_hash[:8]
)
return new_hash, True
def get_similar_units( def get_similar_units(
@@ -228,11 +423,9 @@ def get_similar_units(
) -> list[SimilarUnit]: ) -> list[SimilarUnit]:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
(
"SELECT payload, fetched_at FROM similar_units" "SELECT payload, fetched_at FROM similar_units"
" WHERE unit_code = ? AND listing_status = ?" " WHERE unit_code = ? AND listing_status = ?"
" ORDER BY id DESC LIMIT 1" " ORDER BY id DESC LIMIT 1",
),
(unit_code, listing_status), (unit_code, listing_status),
) )
row = cursor.fetchone() row = cursor.fetchone()
@@ -241,3 +434,102 @@ def get_similar_units(
if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours): if ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours):
return [] return []
return [SimilarUnit.model_validate(item) for item in json.loads(row["payload"])] return [SimilarUnit.model_validate(item) for item in json.loads(row["payload"])]
def get_similar_units_hash(
conn: sqlite3.Connection, unit_code: str, listing_status: str
) -> str | None:
"""Return the stored content_hash for (unit_code, listing_status), or None."""
cursor = conn.cursor()
cursor.execute(
"SELECT content_hash FROM similar_units"
" WHERE unit_code = ? AND listing_status = ?"
" ORDER BY id DESC LIMIT 1",
(unit_code, listing_status),
)
row = cursor.fetchone()
return row["content_hash"] if row else None
# ---------------------------------------------------------------------------
# Analysis cache
# ---------------------------------------------------------------------------
def get_analysis(
conn: sqlite3.Connection, finnkode: str, deps_hash: str
) -> dict[str, Any] | None:
"""Return cached analysis for *finnkode* if deps_hash still matches.
``deps_hash`` encodes the combined hashes of the ad, eiendom unit, and
comps that were used to produce the analysis. Any change to underlying
data produces a different deps_hash and the cache is considered stale.
"""
cursor = conn.cursor()
cursor.execute(
"SELECT payload, deps_hash FROM analysis_cache WHERE finnkode = ?",
(finnkode,),
)
row = cursor.fetchone()
if not row:
return None
if row["deps_hash"] != deps_hash:
logger.debug(
"analysis_cache miss for %s (deps_hash changed %s%s)",
finnkode,
row["deps_hash"][:8],
deps_hash[:8],
)
return None
logger.debug("analysis_cache hit for %s", finnkode)
return json.loads(row["payload"])
def _json_default(obj: Any) -> Any:
    """Last-resort encoder passed to ``json.dumps(default=...)``.

    Objects exposing ``isoformat`` (datetime, date, ...) become their ISO
    string; every other object becomes its ``repr``.  As a result
    ``json.dumps`` does not raise TypeError for an unknown value type.
    """
    if not hasattr(obj, "isoformat"):
        return repr(obj)
    return obj.isoformat()
def save_analysis(
    conn: sqlite3.Connection,
    finnkode: str,
    deps_hash: str,
    result: dict[str, Any],
) -> None:
    """Store the analysis *result* for *finnkode* together with its deps_hash.

    Uses INSERT OR REPLACE, so a previous row with the same ``finnkode`` is
    overwritten.  The result is serialised to JSON (with ``_json_default``
    as fallback encoder) and stamped with the current UTC time.
    """
    serialised = json.dumps(result, default=_json_default)
    computed_at = datetime.now(UTC).isoformat()
    writer = conn.cursor()
    writer.execute(
        "INSERT OR REPLACE INTO analysis_cache"
        " (finnkode, deps_hash, payload, computed_at)"
        " VALUES (?, ?, ?, ?)",
        (finnkode, deps_hash, serialised, computed_at),
    )
    conn.commit()
    logger.debug("analysis_cache saved for %s (deps_hash=%s)", finnkode, deps_hash[:8])
def invalidate_analysis(conn: sqlite3.Connection, finnkode: str) -> None:
    """Delete the cached analysis row for *finnkode*, if any, and commit.

    Intended to be called after the raw data behind the analysis changes.
    """
    remover = conn.cursor()
    remover.execute("DELETE FROM analysis_cache WHERE finnkode = ?", (finnkode,))
    conn.commit()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool:
if ttl_hours is None:
return True
return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta(
hours=ttl_hours
)
+88 -43
View File
@@ -1,4 +1,21 @@
"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units.""" """Service layer for cache-aware fetching of FINN ads and Eiendom.no units.
Hash-aware fetch pattern
------------------------
Every ``get_or_fetch_*`` function follows the same contract:
1. TTL check -- if cached row is fresh enough, return it directly.
2. Remote fetch -- if TTL expired (or force_refresh), fetch from network.
3. Hash check -- compare incoming payload hash to stored hash.
If equal the remote data has not changed; skip the DB write so that
the analysis_cache entry for this finnkode remains valid.
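4. Write + invalidate -- if hash differs, persist the new row. For ads,
also delete the cached analysis (it will be recomputed on next call to
``analyze_ad``). Eiendom unit changes are not invalidated here (there is
no unit_code -> finnkode reverse map); the deps_hash mismatch in
``get_analysis`` catches them instead.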
This means analysis results survive TTL resets as long as the remote
data has not actually changed.
"""
import logging import logging
from typing import Any from typing import Any
@@ -10,11 +27,12 @@ from .cache import (
get_finn_ad, get_finn_ad,
get_similar_units as get_cached_similar_units, get_similar_units as get_cached_similar_units,
init_db, init_db,
invalidate_analysis,
save_eiendom_unit, save_eiendom_unit,
save_finn_ad, save_finn_ad,
save_similar_units, save_similar_units,
) )
from .config import EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH from .config import EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH, FINN_CACHE_TTL_AD_HOURS
from .eiendom_no import ( from .eiendom_no import (
build_unit_vector, build_unit_vector,
decode_unit_vector, decode_unit_vector,
@@ -29,12 +47,25 @@ logger = logging.getLogger(__name__)
async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd: async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd:
"""Get FinnAd from cache or fetch fresh. Never returns None.""" """Get FinnAd from cache or fetch fresh. Never returns None.
On a TTL expiry or force_refresh the ad is re-fetched from FINN.
If the remote payload hash matches the stored hash the DB row is
NOT updated, so analysis_cache entries for this finnkode stay valid.
If the hash differs the row is updated and any cached analysis is
invalidated.
"""
conn = init_db(FINN_CACHE_PATH) conn = init_db(FINN_CACHE_PATH)
ad = None if force_refresh else get_finn_ad(conn, finnkode, ttl_hours=24) ad = None if force_refresh else get_finn_ad(conn, finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS)
if ad is None: if ad is not None:
return ad
# Cache miss or force_refresh: fetch from remote.
ad = await fetch_ad_details(finnkode) ad = await fetch_ad_details(finnkode)
save_finn_ad(conn, ad) _, changed = save_finn_ad(conn, ad)
if changed:
logger.debug("finn_ad %s updated -- invalidating analysis cache", finnkode)
invalidate_analysis(conn, finnkode)
return ad return ad
@@ -67,10 +98,13 @@ async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None:
ad.eiendom_unit_code = unit.unit_code ad.eiendom_unit_code = unit.unit_code
conn = init_db(FINN_CACHE_PATH) conn = init_db(FINN_CACHE_PATH)
save_finn_ad(conn, ad) # persist backfill; do NOT cache `unit` here --
# the resolver returns a partial record (code + # Persist the backfilled unit_code. If the hash changes (new field),
# address + coords). The full unit comes from # invalidate the analysis cache so it is recomputed with the enriched ad.
# get_or_fetch_eiendom_unit -> get_unit(). _, changed = save_finn_ad(conn, ad)
if changed:
invalidate_analysis(conn, ad.finnkode)
logger.info("Resolved finnkode %s -> unit %s", ad.finnkode, unit.unit_code) logger.info("Resolved finnkode %s -> unit %s", ad.finnkode, unit.unit_code)
return ad.eiendom_unit_code return ad.eiendom_unit_code
@@ -78,13 +112,31 @@ async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None:
async def get_or_fetch_eiendom_unit(
    unit_code: str, force_refresh: bool = False
) -> EiendomUnit | None:
    """Get EiendomUnit from cache or fetch fresh.

    Hash-aware: the fetched unit is handed to ``save_eiendom_unit``, whose
    second return value reports whether the stored row changed. No analysis
    invalidation happens here; a change is only logged.

    Args:
        unit_code: Eiendom.no unit identifier to look up.
        force_refresh: Skip the cache lookup and always fetch from remote.

    Returns:
        The cached unit if still fresh, otherwise the fetched unit, or
        ``None`` when the remote lookup yields nothing.
    """
    conn = init_db(FINN_CACHE_PATH)

    if not force_refresh:
        # NOTE(review): the TTL is a literal 24 here while the comps lookup
        # in this module uses EIENDOM_NO_CACHE_TTL_HOURS -- confirm whether
        # the two are meant to share the config value.
        cached_unit = get_cached_eiendom_unit(conn, unit_code, ttl_hours=24)
        if cached_unit is not None:
            return cached_unit

    unit = await get_unit(unit_code)
    if unit is None:
        # Nothing to persist when the remote has no such unit.
        return None

    _, changed = save_eiendom_unit(conn, unit)
    if changed:
        logger.debug(
            "eiendom_unit %s updated -- analysis caches for linked finnkodes may be stale",
            unit_code,
        )
        # We don't have a direct finnkode -> unit_code reverse map in the
        # DB yet, so we cannot invalidate analysis here. The deps_hash
        # mismatch in get_analysis() handles this automatically.
    return unit
@@ -93,40 +145,36 @@ async def get_or_fetch_similar_units(
) -> list[SimilarUnit]: ) -> list[SimilarUnit]:
"""Get similar units (comps) from cache or fetch fresh. """Get similar units (comps) from cache or fetch fresh.
Fetches the unit first to get the unit_vector, then checks cache for similar Hash-aware: identical remote payloads do not trigger a DB write,
units by (unit_code, listing_status). On cache miss, fetches fresh from so the analysis_cache entry for any finnkode that uses these comps
Eiendom.no and saves to cache. remains valid.
""" """
conn = init_db(FINN_CACHE_PATH) conn = init_db(FINN_CACHE_PATH)
# First, ensure we have the unit to build its vector # Ensure we have the unit to build its vector.
unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh) unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
if unit is None: if unit is None:
return [] return []
# Check cache for similar units (unless force_refresh)
if not force_refresh: if not force_refresh:
cached_similar = get_cached_similar_units( cached_similar = get_cached_similar_units(
conn, unit_code, listing_status, ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS conn, unit_code, listing_status, ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS
) )
if cached_similar: if cached_similar:
logger.debug( logger.debug(
"Using cached similar units for %s (status=%s)", "Using cached similar units for %s (status=%s)", unit_code, listing_status
unit_code,
listing_status,
) )
return cached_similar return cached_similar
# Cache miss or force_refresh: fetch fresh # Cache miss or force_refresh: fetch from remote.
vector = build_unit_vector(unit) vector = build_unit_vector(unit)
similar = await get_similar_units(vector, listing_status=listing_status) similar = await get_similar_units(vector, listing_status=listing_status)
# Save to cache
if similar: if similar:
save_similar_units(conn, unit_code, listing_status, similar) _, changed = save_similar_units(conn, unit_code, listing_status, similar)
if changed:
logger.debug( logger.debug(
"Cached %d similar units for %s (status=%s)", "similar_units %s/%s updated -- analysis caches may be stale",
len(similar),
unit_code, unit_code,
listing_status, listing_status,
) )
@@ -170,10 +218,8 @@ async def analyze_search(
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Analyze a FINN search URL and return a ranked shortlist. """Analyze a FINN search URL and return a ranked shortlist.
NOTE: enrichment for search results lives in analysis.py. If that path Individual ad analyses are served from analysis_cache when the
also reports `eiendom_enriched: 0`, it has the same root cause -- each underlying data has not changed.
card's eiendom_unit_code must be resolved via ensure_eiendom_unit_code
(or search_unit_from_finn_url) before the enrichment gate.
""" """
return await run_analysis_search( return await run_analysis_search(
search_url, search_url,
@@ -198,15 +244,15 @@ async def analyze_ad(
unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None
result: dict[str, Any] = { result: dict[str, Any] = {
"ad": ad.model_dump(), "ad": ad.model_dump(mode="json"),
} }
if unit_code: if unit_code:
unit = await get_or_fetch_eiendom_unit(unit_code) unit = await get_or_fetch_eiendom_unit(unit_code)
if unit: if unit:
result["eiendom_unit"] = unit.model_dump() result["eiendom_unit"] = unit.model_dump(mode="json")
if include_similar_units: if include_similar_units:
similar = await get_or_fetch_similar_units(unit_code) similar = await get_or_fetch_similar_units(unit_code)
result["similar_units"] = [s.model_dump() for s in similar] result["similar_units"] = [s.model_dump(mode="json") for s in similar]
return result return result
@@ -220,14 +266,14 @@ async def analyze_ad_against_comps(
unit_code = await ensure_eiendom_unit_code(ad) unit_code = await ensure_eiendom_unit_code(ad)
result: dict[str, Any] = { result: dict[str, Any] = {
"ad": ad.model_dump(), "ad": ad.model_dump(mode="json"),
} }
if unit_code: if unit_code:
unit = await get_or_fetch_eiendom_unit(unit_code) unit = await get_or_fetch_eiendom_unit(unit_code)
if unit: if unit:
result["eiendom_unit"] = unit.model_dump() result["eiendom_unit"] = unit.model_dump(mode="json")
comps = await get_or_fetch_similar_units(unit_code, listing_status=listing_status) comps = await get_or_fetch_similar_units(unit_code, listing_status=listing_status)
result["comparable_units"] = [c.model_dump() for c in comps] result["comparable_units"] = [c.model_dump(mode="json") for c in comps]
return result return result
@@ -235,7 +281,6 @@ async def find_similar_to_liked(
finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE" finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE"
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Find properties similar to a listing the user has liked.""" """Find properties similar to a listing the user has liked."""
# Requires that feedback.verdict = "liked" exists for this finnkode
ad = await get_or_fetch_ad(finnkode) ad = await get_or_fetch_ad(finnkode)
unit_code = await ensure_eiendom_unit_code(ad) unit_code = await ensure_eiendom_unit_code(ad)
@@ -252,8 +297,8 @@ async def find_similar_to_liked(
similar = await get_or_fetch_similar_units(unit_code, listing_status=listing_status) similar = await get_or_fetch_similar_units(unit_code, listing_status=listing_status)
return { return {
"base_ad": ad.model_dump(), "base_ad": ad.model_dump(mode="json"),
"similar_listings": [s.model_dump() for s in similar], "similar_listings": [s.model_dump(mode="json") for s in similar],
"mode": mode, "mode": mode,
} }
@@ -269,16 +314,16 @@ async def compare_ads(
# Resolve before model_dump() -- see analyze_ad. # Resolve before model_dump() -- see analyze_ad.
unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None unit_code = await ensure_eiendom_unit_code(ad) if include_eiendom_no else None
ad_data = ad.model_dump() ad_data = ad.model_dump(mode="json")
if unit_code: if unit_code:
unit = await get_or_fetch_eiendom_unit(unit_code) unit = await get_or_fetch_eiendom_unit(unit_code)
if unit: if unit:
ad_data["eiendom_unit"] = unit.model_dump() ad_data["eiendom_unit"] = unit.model_dump(mode="json")
if include_comps: if include_comps:
comps = await get_or_fetch_similar_units( comps = await get_or_fetch_similar_units(
unit_code, listing_status="RECENTLY_SOLD" unit_code, listing_status="RECENTLY_SOLD"
) )
ad_data["comps"] = [c.model_dump() for c in comps] ad_data["comps"] = [c.model_dump(mode="json") for c in comps]
ads.append(ad_data) ads.append(ad_data)