Files
finn-mcp/finn_eiendom/service.py
T
ole 46fd22c277 Enhance Docker and Compose configurations; add health check endpoint and caching improvements
- Updated Dockerfile to include FINN_CACHE_PATH and create data directory.
- Modified docker-compose.prod.yml to expose port 8010 and adjust resource limits.
- Updated docker-compose.yml to include FINN_CACHE_PATH and ensure proper port mapping.
- Added health check endpoint in http_server.py for container orchestration.
- Improved caching logic in analysis.py and service.py for similar units.
- Refined scoring.py with updated scoring model and constants for better accuracy.

Co-authored-by: Copilot <copilot@github.com>
2026-05-26 12:10:00 +00:00

322 lines
11 KiB
Python

"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units."""
import logging
from typing import Any
from .ad import fetch_ad_details
from .analysis import analyze_search as run_analysis_search
from .cache import (
get_eiendom_unit as get_cached_eiendom_unit,
get_finn_ad,
get_similar_units as get_cached_similar_units,
init_db,
save_eiendom_unit,
save_finn_ad,
save_similar_units,
)
from .config import EIENDOM_NO_CACHE_TTL_HOURS, FINN_CACHE_PATH
from .eiendom_no import (
build_unit_vector,
decode_unit_vector,
get_similar_units,
get_unit,
search_unit_from_finn_url,
)
from .feedback import save_feedback as save_feedback_impl
from .models import EiendomUnit, FinnAd, SimilarUnit
logger = logging.getLogger(__name__)
async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd:
    """Return the FINN ad for ``finnkode``, preferring a cached copy.

    The cache is consulted with a 24-hour TTL unless ``force_refresh`` is
    set. On a miss (or a forced refresh) the ad is fetched with
    ``fetch_ad_details`` and written back to the cache before it is returned.

    Args:
        finnkode: FINN listing code to look up.
        force_refresh: Skip the cache lookup and always fetch.

    Returns:
        The cached or freshly fetched ad. The contract is "never ``None``";
        this presumably relies on ``fetch_ad_details`` raising on failure
        rather than returning ``None`` -- TODO confirm.
    """
    # NOTE(review): init_db is invoked on every call and the handle is never
    # closed here -- confirm that init_db hands back a reusable connection.
    db = init_db(FINN_CACHE_PATH)
    if not force_refresh:
        cached_ad = get_finn_ad(db, finnkode, ttl_hours=24)
        if cached_ad is not None:
            return cached_ad
    fresh_ad = await fetch_ad_details(finnkode)
    save_finn_ad(db, fresh_ad)
    return fresh_ad
async def ensure_eiendom_unit_code(ad: FinnAd) -> str | None:
    """Make sure ``ad.eiendom_unit_code`` is populated, resolving it if needed.

    ``fetch_ad_details`` never fills in ``eiendom_unit_code``; only the
    Eiendom.no resolver (``search_unit_from_finn_url``) can map a FINN
    listing to an Eiendom.no unit. Every enrichment path is gated on this
    field, so skipping the resolve step leaves the gate falsy and enrichment
    silently does nothing.

    When resolution succeeds the ad is mutated in place and the updated ad
    is saved to the cache, so later cache hits avoid the network round trip.

    IMPORTANT: call this BEFORE serialising the ad with ``model_dump()``.
    Dumping first yields a dict with a stale ``eiendom_unit_code: None``
    even though enrichment succeeded.

    Args:
        ad: The FINN ad to backfill; modified in place on success.

    Returns:
        The unit code, or ``None`` when the listing cannot be resolved
        (e.g. new-build project ads, off-market addresses).
    """
    known_code = ad.eiendom_unit_code
    if known_code:
        # Already resolved (for instance by an earlier call): nothing to do.
        return known_code

    resolved = await search_unit_from_finn_url(ad.url)
    resolved_code = None if resolved is None else resolved.unit_code
    if not resolved_code:
        logger.info("No Eiendom.no unit resolved for finnkode %s", ad.finnkode)
        return None

    ad.eiendom_unit_code = resolved_code
    # Persist the backfill only. Do NOT cache `resolved` itself: the resolver
    # returns a partial record (code + address + coords). The full unit comes
    # from get_or_fetch_eiendom_unit -> get_unit().
    db = init_db(FINN_CACHE_PATH)
    save_finn_ad(db, ad)
    logger.info("Resolved finnkode %s -> unit %s", ad.finnkode, resolved_code)
    return ad.eiendom_unit_code
async def get_or_fetch_eiendom_unit(
    unit_code: str, force_refresh: bool = False
) -> EiendomUnit | None:
    """Get an EiendomUnit from cache or fetch it fresh from Eiendom.no.

    The cache lookup uses ``EIENDOM_NO_CACHE_TTL_HOURS`` -- the same
    configured TTL that ``get_or_fetch_similar_units`` applies to comps --
    instead of a hard-coded 24 hours, so all Eiendom.no data expires on one
    configurable schedule.

    Args:
        unit_code: Eiendom.no unit identifier.
        force_refresh: Skip the cache lookup and always call ``get_unit``.

    Returns:
        The cached or freshly fetched unit, or ``None`` when ``get_unit``
        returns ``None``. A ``None`` result is not written to the cache.
    """
    conn = init_db(FINN_CACHE_PATH)
    unit = (
        None
        if force_refresh
        else get_cached_eiendom_unit(
            conn, unit_code, ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS
        )
    )
    if unit is None:
        unit = await get_unit(unit_code)
        # Only successful lookups are cached; a miss is retried next call.
        if unit is not None:
            save_eiendom_unit(conn, unit)
    return unit
async def get_or_fetch_similar_units(
    unit_code: str, listing_status: str = "RECENTLY_SOLD", force_refresh: bool = False
) -> list[SimilarUnit]:
    """Return similar units (comps) for ``unit_code``, using the cache when possible.

    The unit itself is loaded first because its data is needed to build the
    unit vector. The cache is then checked by (unit_code, listing_status);
    on a miss the comps are fetched from Eiendom.no and stored.

    Args:
        unit_code: Eiendom.no unit identifier to find comps for.
        listing_status: Status filter passed to the cache and the fetch.
        force_refresh: Bypass both the unit cache and the comps cache.

    Returns:
        The list of similar units; an empty list when the unit cannot be
        loaded. An empty fetch result is returned but not cached.
    """
    db = init_db(FINN_CACHE_PATH)

    # The unit is required to build the vector used for the comps query.
    base_unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
    if base_unit is None:
        return []

    cache_hit = (
        None
        if force_refresh
        else get_cached_similar_units(
            db, unit_code, listing_status, ttl_hours=EIENDOM_NO_CACHE_TTL_HOURS
        )
    )
    # An empty cached list is treated as a miss and triggers a fresh fetch.
    if cache_hit:
        logger.debug(
            "Using cached similar units for %s (status=%s)",
            unit_code,
            listing_status,
        )
        return cache_hit

    unit_vector = build_unit_vector(base_unit)
    fetched = await get_similar_units(unit_vector, listing_status=listing_status)
    if not fetched:
        return fetched

    save_similar_units(db, unit_code, listing_status, fetched)
    logger.debug(
        "Cached %d similar units for %s (status=%s)",
        len(fetched),
        unit_code,
        listing_status,
    )
    return fetched
async def get_unit_images(unit_code: str, force_refresh: bool = False) -> dict[str, Any]:
    """Return a unit's images plus a few descriptive fields for visual assessment.

    Args:
        unit_code: Eiendom.no unit identifier.
        force_refresh: Forwarded to ``get_or_fetch_eiendom_unit``.

    Returns:
        A dict with ``unit_code``, ``address``, ``unit_images`` (an empty
        list when the unit has none), ``property_type``, ``rooms`` and
        ``usable_area``.

    Raises:
        ValueError: If the unit cannot be loaded.
    """
    loaded = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
    if loaded is None:
        raise ValueError(f"Could not fetch Eiendom.no unit {unit_code}")

    summary: dict[str, Any] = {
        "unit_code": loaded.unit_code,
        "address": loaded.address,
        # Normalise a missing/empty image collection to a list.
        "unit_images": loaded.unit_images or [],
        "property_type": loaded.property_type,
        "rooms": loaded.rooms,
        "usable_area": loaded.usable_area,
    }
    return summary
async def resolve_eiendom_unit_from_finn_url(finn_url: str) -> EiendomUnit | None:
    """Look up the Eiendom.no unit for a FINN listing URL.

    Thin service-layer wrapper around ``search_unit_from_finn_url``; the
    resolver's result is returned unchanged.
    """
    resolved_unit = await search_unit_from_finn_url(finn_url)
    return resolved_unit
# ============================================================================
# Orchestration functions -- delegate to analysis.py
# ============================================================================
async def analyze_search(
    search_url: str,
    *,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
) -> dict[str, Any]:
    """Analyze a FINN search URL and return a ranked shortlist.

    Delegates to ``analysis.analyze_search``; ``include_details`` is passed
    on under that function's ``fetch_details`` keyword.

    NOTE: enrichment for search results lives in analysis.py. If that path
    also reports `eiendom_enriched: 0`, it has the same root cause -- each
    card's eiendom_unit_code must be resolved via ensure_eiendom_unit_code
    (or search_unit_from_finn_url) before the enrichment gate.
    """
    analysis_options: dict[str, Any] = {
        "max_pages": max_pages,
        "fetch_details": include_details,
        "detail_limit": detail_limit,
        "include_eiendom_no": include_eiendom_no,
    }
    return await run_analysis_search(search_url, **analysis_options)
async def analyze_ad(
    finnkode: str,
    *,
    include_eiendom_no: bool = True,
    include_similar_units: bool = False,
) -> dict[str, Any]:
    """Fetch one FINN ad and optionally enrich it with Eiendom.no data.

    Args:
        finnkode: FINN listing code.
        include_eiendom_no: Resolve and attach the Eiendom.no unit.
        include_similar_units: Also attach similar units; only has an
            effect when a unit code was resolved.

    Returns:
        A dict with ``ad`` and, when available, ``eiendom_unit`` and
        ``similar_units``.
    """
    ad = await get_or_fetch_ad(finnkode)

    # The unit code must be resolved BEFORE model_dump(), otherwise the
    # serialised ad carries a stale ``eiendom_unit_code: None``.
    unit_code = None
    if include_eiendom_no:
        unit_code = await ensure_eiendom_unit_code(ad)

    payload: dict[str, Any] = {"ad": ad.model_dump()}
    if not unit_code:
        return payload

    eiendom_unit = await get_or_fetch_eiendom_unit(unit_code)
    if eiendom_unit:
        payload["eiendom_unit"] = eiendom_unit.model_dump()
    if include_similar_units:
        neighbours = await get_or_fetch_similar_units(unit_code)
        payload["similar_units"] = [item.model_dump() for item in neighbours]
    return payload
async def analyze_ad_against_comps(
    finnkode: str, listing_status: str = "RECENTLY_SOLD"
) -> dict[str, Any]:
    """Evaluate one listing against comparable units from Eiendom.no.

    Args:
        finnkode: FINN listing code.
        listing_status: Status filter for the comparable units.

    Returns:
        A dict with ``ad`` and, when the listing resolves to an Eiendom.no
        unit code, ``comparable_units`` plus ``eiendom_unit`` if the unit
        could be loaded.
    """
    ad = await get_or_fetch_ad(finnkode)
    # Resolve the unit code before model_dump() so the serialised ad does
    # not carry a stale ``eiendom_unit_code: None``.
    unit_code = await ensure_eiendom_unit_code(ad)

    report: dict[str, Any] = {"ad": ad.model_dump()}
    if not unit_code:
        return report

    eiendom_unit = await get_or_fetch_eiendom_unit(unit_code)
    if eiendom_unit:
        report["eiendom_unit"] = eiendom_unit.model_dump()
    comparable = await get_or_fetch_similar_units(
        unit_code, listing_status=listing_status
    )
    report["comparable_units"] = [comp.model_dump() for comp in comparable]
    return report
async def find_similar_to_liked(
    finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE"
) -> dict[str, Any]:
    """Find properties similar to a listing the user has liked.

    Intended to require a stored feedback verdict of "liked" for this
    finnkode; that check is not implemented yet (see TODO below).

    Args:
        finnkode: FINN listing code of the liked listing.
        mode: Echoed back in the result; not otherwise used here.
        listing_status: Status filter for the similar listings.

    Returns:
        A dict with ``base_ad``, ``similar_listings`` and ``mode``.

    Raises:
        ValueError: If the listing cannot be resolved to an Eiendom.no unit
            code, or the unit itself cannot be loaded.
    """
    ad = await get_or_fetch_ad(finnkode)
    unit_code = await ensure_eiendom_unit_code(ad)
    if not unit_code:
        unresolved_msg = (
            f"Finnkode {finnkode} could not be resolved to an Eiendom.no unit; "
            "cannot find similar properties"
        )
        raise ValueError(unresolved_msg)

    # TODO: verify feedback verdict = "liked" exists
    base_unit = await get_or_fetch_eiendom_unit(unit_code)
    if not base_unit:
        raise ValueError(f"Cannot enrich finnkode {finnkode} with Eiendom.no data")

    matches = await get_or_fetch_similar_units(unit_code, listing_status=listing_status)
    similar_listings = [match.model_dump() for match in matches]
    return {
        "base_ad": ad.model_dump(),
        "similar_listings": similar_listings,
        "mode": mode,
    }
async def compare_ads(
    finnkoder: list[str], *, include_eiendom_no: bool = True, include_comps: bool = True
) -> dict[str, Any]:
    """Compare multiple FINN listings side by side.

    Listings are processed one at a time, in the order given.

    Args:
        finnkoder: FINN listing codes to compare.
        include_eiendom_no: Resolve and attach each listing's Eiendom.no unit.
        include_comps: Attach recently sold comps; only has an effect for
            listings whose unit code was resolved.

    Returns:
        ``{"listings": [...]}`` with one serialised ad per finnkode,
        optionally extended with ``eiendom_unit`` and ``comps``.
    """
    listings: list[dict[str, Any]] = []
    for code in finnkoder:
        ad = await get_or_fetch_ad(code)

        # Resolve the unit code before model_dump() so the serialised ad
        # does not carry a stale ``eiendom_unit_code: None``.
        if include_eiendom_no:
            unit_code = await ensure_eiendom_unit_code(ad)
        else:
            unit_code = None

        entry = ad.model_dump()
        if unit_code:
            eiendom_unit = await get_or_fetch_eiendom_unit(unit_code)
            if eiendom_unit:
                entry["eiendom_unit"] = eiendom_unit.model_dump()
            if include_comps:
                sold_comps = await get_or_fetch_similar_units(
                    unit_code, listing_status="RECENTLY_SOLD"
                )
                entry["comps"] = [comp.model_dump() for comp in sold_comps]
        listings.append(entry)

    return {"listings": listings}
# ============================================================================
# Helper functions
# ============================================================================
async def build_unit_vector_for_unit_code(unit_code: str) -> dict[str, Any]:
    """Load the unit for ``unit_code`` and build its unit_vector.

    Returns:
        ``{"unit_code": ..., "unit_vector": ...}`` where the vector is the
        result of ``build_unit_vector`` for the loaded unit.

    Raises:
        ValueError: If the unit cannot be loaded.
    """
    loaded = await get_or_fetch_eiendom_unit(unit_code)
    if loaded is None:
        raise ValueError(f"Could not fetch Eiendom.no unit {unit_code}")
    return {"unit_code": unit_code, "unit_vector": build_unit_vector(loaded)}
def decode_unit_vector_to_dict(unit_vector: str) -> dict[str, Any]:
    """Decode a unit_vector string into a dict.

    Thin wrapper around ``decode_unit_vector``; its result is returned as-is.
    """
    decoded = decode_unit_vector(unit_vector)
    return decoded
def save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> dict[str, Any]:
    """Store a user's verdict (and optional notes) for a listing.

    Thin wrapper around the feedback module's ``save_feedback``; arguments
    are forwarded positionally and its result is returned as-is.
    """
    outcome = save_feedback_impl(finnkode, verdict, notes)
    return outcome
def get_shortlist(run_id: int | None = None, limit: int = 10) -> dict[str, Any]:
"""Fetch stored shortlist from a search run."""
# TODO: implement via search_runs table in cache.py
return {"shortlist": [], "run_id": run_id, "limit": limit}
def get_new_ads_since_last_run(search_url: str) -> dict[str, Any]:
    """Detect new/removed/changed listings compared with the previous run.

    Placeholder: currently always reports no differences and echoes
    ``search_url`` back.
    """
    # TODO: implement via search_runs table in cache.py
    diff: dict[str, Any] = {
        bucket: [] for bucket in ("new_ads", "removed_ads", "changed_ads")
    }
    diff["search_url"] = search_url
    return diff