Files
ole eb95b98111 Refactor and enhance various components of the FINN real estate analysis tool
- Updated docker-compose files to use local data volumes for development.
- Refactored analysis.py to improve code readability and performance, including changes to cache age calculations and hash computations.
- Enhanced cache.py to ensure the database directory is created if it doesn't exist and improved SQL query formatting.
- Modified cli.py to improve logging and statistics reporting for finn_ads.
- Updated config.py to streamline environment variable handling.
- Initialized the database eagerly in http_server.py to prevent runtime errors.
- Refactored mcp_server.py to slim down data structures and improve response formatting for API calls.
- Enhanced service.py to improve feedback handling and shortlist retrieval, ensuring enriched data is returned.
- Updated recompute_analysis_cache.py for better SQL query formatting.
2026-05-29 15:17:11 +00:00

384 lines
14 KiB
Python

"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment."""
import base64
import json
import logging
from typing import Any
import os
import asyncio
import httpx
from mcp.server.transport_security import TransportSecuritySettings
from mcp.server.fastmcp import Context, FastMCP
from mcp.types import ImageContent, TextContent
from .formatting import (
render_diff,
render_shortlist,
)
from .service import (
analyze_ad,
analyze_search,
get_new_ads_since_last_run,
get_shortlist,
get_unit_images,
save_feedback,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Response shaping
# ---------------------------------------------------------------------------
def _slim_comp(c: dict) -> dict:
"""Drop internal IDs, coords, redundant status fields from a comparable unit."""
return {
"unit_code": c.get("unit_code"),
"address": c.get("address"),
"usable_area": c.get("usable_area"),
"rooms": c.get("rooms"),
"floor": c.get("floor"),
"construction_year": c.get("construction_year"),
"listing_price": c.get("listing_price"),
"selling_price": c.get("selling_price"),
"shared_debt": c.get("shared_debt"),
"sqm_price": c.get("sqm_price"),
"common_costs": c.get("common_costs"),
"days_on_market": c.get("days_on_market"),
"finalized_at": (c.get("finalized_at") or "")[:10],
}
def _slim_comps(comps: list[dict], keep: int = 15) -> list[dict]:
"""Sort comps by recency, keep the N most recent — older comps lose relevance fast."""
sorted_comps = sorted(comps, key=lambda c: c.get("finalized_at") or "", reverse=True)
return [_slim_comp(c) for c in sorted_comps[:keep]]
def _avg_comp_sqm(comps: list[dict]) -> int | None:
sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")]
return round(sum(sqm_prices) / len(sqm_prices)) if sqm_prices else None
def _slim_eiendom(eu: dict, comps: list[dict]) -> dict:
    """Build the compact view of an Eiendom.no unit.

    Only the fields assigned below are taken from ``eu``; every other key on
    the unit is left out. Two values are derived from ``comps``: the average
    comparable square-metre price and the number of comparables.
    """
    lower_bound = eu.get("estimated_selling_price_lower")
    upper_bound = eu.get("estimated_selling_price_upper")

    view: dict = {}
    view["unit_code"] = eu.get("unit_code")
    view["usable_area"] = eu.get("usable_area")
    view["estimated_price"] = eu.get("estimated_selling_price")
    view["estimated_range"] = [lower_bound, upper_bound]
    view["listing_sqm_price"] = eu.get("listing_sqm_price")
    view["market_placement"] = eu.get("market_placement_score")
    view["sale_status"] = eu.get("sale_status")
    view["days_on_market"] = eu.get("days_on_market")
    # Derived from the comparables rather than read from the unit itself.
    view["avg_comp_sqm_price"] = _avg_comp_sqm(comps)
    view["comp_count"] = len(comps)
    return view
def _slim_listing(rank: int, item: dict) -> dict:
    """Reduce one analyzed search item to a compact listing card.

    The card carries ``rank``, a fixed set of fields copied from ``item``, a
    shallow copy of the score dict, the ``why_interesting`` / ``risks``
    entries of the summary, at most five price-history entries, a slim
    Eiendom.no view (``None`` when the item has no ``eiendom_unit``) and the
    slimmed comparable units. Keys of ``item`` that are not referenced here
    do not appear in the card.
    """
    unit = item.get("eiendom_unit") or {}
    comparables = item.get("similar_units") or []
    score = item.get("score") or {}
    summary = item.get("summary") or {}
    history = item.get("price_history") or []

    copied_fields = (
        "finnkode",
        "url",
        "title",
        "address",
        "listing_description",
        "district",
        "property_type",
        "ownership_type",
        "floor",
        "area_m2",
        "bedrooms",
        "rooms",
        "total_price",
        "asking_price",
        "shared_debt",
        "common_costs",
        "construction_year",
        "has_balcony",
        "has_terrace",
        "has_elevator",
        "has_parking",
        "has_garage",
        "eiendom_unit_code",
    )

    card: dict = {"rank": rank}
    for field in copied_fields:
        card[field] = item.get(field)

    card["score"] = dict(score)
    card["categories"] = item.get("categories")
    card["why_interesting"] = summary.get("why_interesting")
    card["risks"] = summary.get("risks")
    card["cache_age"] = item.get("cache_age")
    # Takes the first five entries. The original comment called these the
    # "last 5 price records" — presumably the service returns history
    # newest-first; TODO confirm against the service.
    card["price_history"] = history[:5]
    if unit:
        card["eiendom"] = _slim_eiendom(unit, comparables)
    else:
        card["eiendom"] = None
    card["similar_units"] = _slim_comps(comparables)
    return card
def _slim_analyze_ad(result: dict) -> dict:
    """Flatten a single-ad analysis result into the MCP response shape.

    Reads the ``ad``, ``eiendom_unit`` and ``similar_units`` entries of
    ``result`` (each may be missing or falsy). A fixed set of ad fields is
    lifted to the top level, followed by a slim Eiendom.no view (``None``
    when there is no unit) and the slimmed comparable units. Nothing else
    from ``result`` is carried over.
    """
    ad = result.get("ad") or {}
    unit = result.get("eiendom_unit") or {}
    comparables = result.get("similar_units") or []

    ad_fields = (
        "finnkode",
        "url",
        "title",
        "address",
        "district",
        "listing_description",
        "property_type",
        "ownership_type",
        "floor",
        "area_m2",
        "rooms",
        "bedrooms",
        "total_price",
        "asking_price",
        "shared_debt",
        "common_costs",
        "construction_year",
        "energy_rating",
        "has_balcony",
        "has_terrace",
        "has_elevator",
        "has_parking",
        "has_garage",
        "eiendom_unit_code",
    )

    out: dict[str, Any] = {field: ad.get(field) for field in ad_fields}
    out["eiendom"] = _slim_eiendom(unit, comparables) if unit else None
    out["similar_units"] = _slim_comps(comparables)
    return out
def _build_slim_search_result(full: dict) -> dict:
"""Convert full analyze_search output to a compact MCP-safe response.
Removes search_cards (redundant), drops all fat fields from individual
listings. Target: <200KB for 30 analyzed ads.
"""
listings = [
_slim_listing(rank + 1, item) for rank, item in enumerate(full.get("analysis") or [])
]
return {
"search_url": full.get("search_url"),
"summary": full.get("summary"),
"listings": listings,
}
def _build_transport_security() -> TransportSecuritySettings:
    """Build transport security settings from the ``MCP_ALLOWED_HOSTS`` env var.

    ``MCP_ALLOWED_HOSTS`` is read as a comma-separated list of host entries.

    Returns:
        Settings with DNS-rebinding protection disabled when the variable is
        unset or empty. Otherwise settings with protection enabled, allowing
        the loopback patterns plus every non-blank entry from the variable.
    """
    allowed = os.getenv("MCP_ALLOWED_HOSTS", "")
    if not allowed:
        return TransportSecuritySettings(enable_dns_rebinding_protection=False)

    # Trailing or doubled commas ("a.example,,b.example,") would otherwise
    # put empty strings into the allow-list; keep only real entries.
    hosts = [host for host in (part.strip() for part in allowed.split(",")) if host]
    return TransportSecuritySettings(
        enable_dns_rebinding_protection=True,
        allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"] + hosts,
    )
# Module-level FastMCP server instance; the @mcp.tool decorators in this module
# register their functions on it, and main() runs it.
mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security())
@mcp.tool(
    description=(
        "Analyze a FINN.no real estate search URL. Scrapes listing cards,"
        " fetches details, enriches with Eiendom.no data, scores, and ranks."
        " Fetches all ads in parallel (phase 1) then scores from cache (phase 2)."
        " Progress updates are emitted during phase 1."
    )
)
async def finn_analyze_search(
    search_url: str,
    ctx: Context,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
) -> str:
    """Run the search analysis and return the slimmed result as a JSON string.

    All arguments are forwarded to ``analyze_search``. Any exception raised
    while analyzing, slimming or serializing is logged and reported as a JSON
    object with ``error`` set to true and the exception text in ``message``.
    """
    options = {
        "max_pages": max_pages,
        "include_details": include_details,
        "detail_limit": detail_limit,
        "include_eiendom_no": include_eiendom_no,
        "ctx": ctx,
    }
    try:
        full = await analyze_search(search_url, **options)
        slim = _build_slim_search_result(full)
        return json.dumps(slim, default=str)
    except Exception as exc:
        logger.error(f"Error analyzing search: {exc}")
        failure = {"error": True, "message": str(exc)}
        return json.dumps(failure)
def _encode_image_as_jpeg(raw: bytes, max_side: int = 1024, quality: int = 75) -> str:
    """Downscale raw image bytes and return them as base64-encoded JPEG text.

    The image is shrunk so that neither side exceeds ``max_side`` pixels and
    is then re-encoded as JPEG at ``quality``. This work is synchronous and
    CPU-bound, so async callers should run it off the event loop.

    Raises:
        Whatever Pillow raises for undecodable or unconvertible image data.
    """
    # Imported lazily, as the original code did, so Pillow is only required
    # when images are actually processed.
    import io

    from PIL import Image

    img = Image.open(io.BytesIO(raw))
    img.thumbnail((max_side, max_side), Image.LANCZOS)
    # The JPEG writer cannot store alpha or palette data. Converting every
    # mode other than RGB / greyscale covers RGBA and P as before, and also
    # LA, PA and similar modes that previously failed at save time.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    return base64.b64encode(buf.getvalue()).decode()


@mcp.tool(
    description=(
        "Fetch and analyze unit images for visual assessment of a property. "
        "Downloads photos and returns them as visual image content so Claude can "
        "directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere."
    )
)
async def finn_analyze_unit_images(
    unit_code: str,
    force_refresh: bool = False,
    max_images: int = 8,
) -> list:
    """Fetch unit images and return them as image content blocks.

    Args:
        unit_code: Unit identifier passed to ``get_unit_images``.
        force_refresh: Forwarded to ``get_unit_images``.
        max_images: Upper bound on the number of images downloaded; values
            below zero are treated as zero.

    Returns:
        A list starting with a text header block, followed by one JPEG image
        block per image that was downloaded and re-encoded successfully.
        Images that fail are skipped. If the lookup itself fails, the list
        holds a single text block with a JSON error object.
    """
    try:
        result = await get_unit_images(unit_code, force_refresh=force_refresh)
        all_urls = result.get("unit_images") or []
        # A negative limit would slice from the end of the list instead of
        # limiting the count, so clamp it.
        urls = all_urls[: max(max_images, 0)]
        header = (
            f"{result.get('address', unit_code)} | "
            f"{result.get('rooms')} rom | "
            f"{result.get('usable_area')}m² | "
            f"{len(all_urls)} bilder totalt, viser {len(urls)}"
        )
        content: list = [TextContent(type="text", text=header)]
        if not urls:
            return content

        # One client for all downloads so connections can be reused, instead
        # of building and tearing down a client per image.
        async with httpx.AsyncClient(timeout=15) as client:

            async def _fetch(url: str) -> ImageContent | None:
                try:
                    resp = await client.get(url)
                    if resp.status_code != 200:
                        logger.warning(
                            "Skipping image %s: HTTP status %s", url, resp.status_code
                        )
                        return None
                    # Raw photos must be downscaled and recompressed to keep
                    # the combined tool result small. Pillow work is blocking,
                    # so run it in a worker thread rather than on the loop.
                    b64 = await asyncio.to_thread(_encode_image_as_jpeg, resp.content)
                    return ImageContent(type="image", data=b64, mimeType="image/jpeg")
                except Exception as exc:  # per-image isolation: one bad image must not fail the batch
                    logger.warning("Failed to fetch/resize image %s: %s", url, exc)
                    return None

            fetched = await asyncio.gather(*(_fetch(u) for u in urls))

        content.extend(img for img in fetched if img is not None)
        return content
    except Exception as e:
        logger.error(f"Error fetching unit images for {unit_code}: {e}")
        return [TextContent(type="text", text=json.dumps({"error": True, "message": str(e)}))]
# ============================================================================
# Additional analysis and enrichment tools
# ============================================================================
@mcp.tool(
    description=(
        "Deep-dive one or more FINN listings. Accepts a single finnkode or a list "
        "(batched in one call). Always enriches with Eiendom.no data and comparable "
        "sold units. Returns listing_description plus slim eiendom/comps; excludes "
        "image URLs and internal vectors (use finn_analyze_unit_images for visuals)."
    )
)
async def finn_analyze_ad(finnkode: str | list[str]) -> str:
    """Analyze one or several FINN ads and return the result as JSON.

    A string input yields a single JSON object; a list input yields a JSON
    array in the same order as the input. A failure for one ad is reported
    inline as an error object for that finnkode and does not abort the rest.
    """
    is_single = isinstance(finnkode, str)
    codes = [finnkode] if is_single else list(finnkode)

    async def _analyze(code: str) -> dict:
        try:
            enriched = await analyze_ad(
                code,
                include_eiendom_no=True,
                include_similar_units=True,
            )
            return _slim_analyze_ad(enriched)
        except Exception as exc:  # noqa: BLE001 — per-item isolation, batch must not abort
            logger.error(f"Error analyzing ad {code}: {exc}")
            return {"finnkode": code, "error": True, "message": str(exc)}

    outcomes = await asyncio.gather(*(_analyze(code) for code in codes))
    # Mirror the input shape: one object for a string, a list for a list.
    payload: Any = outcomes[0] if is_single else outcomes
    return json.dumps(payload, default=str)
@mcp.tool(
    description=(
        "Store user feedback (verdict, notes) for a FINN listing. "
        "Enables similar-unit recommendations and shortlist filtering."
    )
)
async def finn_save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> str:
    """Store feedback via ``save_feedback`` and return its result as JSON.

    On any exception the error is logged and a JSON object with ``error`` set
    to true and the exception text in ``message`` is returned instead.
    """
    try:
        saved = save_feedback(finnkode, verdict, notes)
        return json.dumps(saved, default=str)
    except Exception as exc:
        logger.error(f"Error saving feedback for {finnkode}: {exc}")
        failure = {"error": True, "message": str(exc)}
        return json.dumps(failure)
@mcp.tool(
    description=(
        "Fetch the shortlist of listings you have given a verdict "
        "(liked, disliked, maybe, visited). Enriched with cached score and price data."
    )
)
def finn_get_shortlist(verdict: str = "liked", limit: int = 10) -> str:
    """Return the stored shortlist for ``verdict``, rendered as JSON.

    ``verdict`` and ``limit`` are passed to ``get_shortlist`` and the result
    is rendered by ``render_shortlist`` in its ``"json"`` format. On any
    exception the error is logged and a JSON error object is returned.
    """
    try:
        shortlist = get_shortlist(verdict, limit)
        rendered = render_shortlist(shortlist, "json")
        return rendered
    except Exception as exc:
        logger.error(f"Error fetching shortlist: {exc}")
        failure = {"error": True, "message": str(exc)}
        return json.dumps(failure)
@mcp.tool(
    description=(
        "Detect new, removed, and changed listings in a FINN search URL "
        "compared to the previous run. Shows price/status diffs on changed listings."
    )
)
async def finn_get_new_ads_since_last_run(search_url: str) -> str:
    """Return the diff for ``search_url`` since the last run, rendered as JSON.

    The diff comes from ``get_new_ads_since_last_run`` (called without
    ``await``) and is rendered by ``render_diff`` in its ``"json"`` format.
    On any exception the error is logged and a JSON error object is returned.
    """
    try:
        diff = get_new_ads_since_last_run(search_url)
        rendered = render_diff(diff, "json")
        return rendered
    except Exception as exc:
        logger.error(f"Error fetching diff: {exc}")
        failure = {"error": True, "message": str(exc)}
        return json.dumps(failure)
def main() -> None:
    """Start the module-level FastMCP server on the stdio transport."""
    mcp.run(transport="stdio")


# Only start the server when this file is executed as a script.
if __name__ == "__main__":
    main()