eb95b98111
- Updated docker-compose files to use local data volumes for development. - Refactored analysis.py to improve code readability and performance, including changes to cache age calculations and hash computations. - Enhanced cache.py to ensure the database directory is created if it doesn't exist and improved SQL query formatting. - Modified cli.py to improve logging and statistics reporting for finn_ads. - Updated config.py to streamline environment variable handling. - Initialized the database eagerly in http_server.py to prevent runtime errors. - Refactored mcp_server.py to slim down data structures and improve response formatting for API calls. - Enhanced service.py to improve feedback handling and shortlist retrieval, ensuring enriched data is returned. - Updated recompute_analysis_cache.py for better SQL query formatting.
384 lines
14 KiB
Python
384 lines
14 KiB
Python
"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment."""
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
import os
|
|
import asyncio
|
|
import httpx
|
|
from mcp.server.transport_security import TransportSecuritySettings
|
|
from mcp.server.fastmcp import Context, FastMCP
|
|
from mcp.types import ImageContent, TextContent
|
|
|
|
from .formatting import (
|
|
render_diff,
|
|
render_shortlist,
|
|
)
|
|
from .service import (
|
|
analyze_ad,
|
|
analyze_search,
|
|
get_new_ads_since_last_run,
|
|
get_shortlist,
|
|
get_unit_images,
|
|
save_feedback,
|
|
)
|
|
|
|
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Response shaping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _slim_comp(c: dict) -> dict:
    """Return a compact copy of one comparable unit.

    Only the whitelisted fields below are carried over; any other key on the
    input is dropped. A whitelisted field missing from ``c`` becomes ``None``.
    ``finalized_at`` is truncated to its first ten characters and becomes
    ``""`` when absent or falsy.
    """
    copied_fields = (
        "unit_code",
        "address",
        "usable_area",
        "rooms",
        "floor",
        "construction_year",
        "listing_price",
        "selling_price",
        "shared_debt",
        "sqm_price",
        "common_costs",
        "days_on_market",
    )
    slim = {field: c.get(field) for field in copied_fields}
    # Keep the leading 10 characters only — presumably the YYYY-MM-DD part of
    # an ISO timestamp string; TODO confirm the upstream format.
    finalized = c.get("finalized_at") or ""
    slim["finalized_at"] = finalized[:10]
    return slim
|
|
|
|
|
|
def _slim_comps(comps: list[dict], keep: int = 15) -> list[dict]:
    """Return the ``keep`` most recently finalized comps in slim form.

    Comps are ordered by their ``finalized_at`` value, newest first; a missing
    or falsy value sorts as the empty string, i.e. last. Each surviving comp is
    passed through ``_slim_comp``. The input list is not modified.
    """
    newest_first = list(comps)
    # Stable sort: comps with equal keys keep their incoming relative order.
    newest_first.sort(key=lambda unit: unit.get("finalized_at") or "", reverse=True)
    return list(map(_slim_comp, newest_first[:keep]))
|
|
|
|
|
|
def _avg_comp_sqm(comps: list[dict]) -> int | None:
|
|
sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")]
|
|
return round(sum(sqm_prices) / len(sqm_prices)) if sqm_prices else None
|
|
|
|
|
|
def _slim_eiendom(eu: dict, comps: list[dict]) -> dict:
    """Build the compact Eiendom.no view of a unit.

    Copies a fixed set of fields from ``eu`` (renaming a few), pairs the lower
    and upper price estimates into ``estimated_range``, and adds two values
    derived from ``comps``: the average sqm price and the comp count. Any
    other key on ``eu`` is not carried over.
    """
    view = {
        "unit_code": eu.get("unit_code"),
        "usable_area": eu.get("usable_area"),
        "estimated_price": eu.get("estimated_selling_price"),
    }
    low = eu.get("estimated_selling_price_lower")
    high = eu.get("estimated_selling_price_upper")
    view["estimated_range"] = [low, high]

    # (output key, source key) pairs, in output order.
    renamed = (
        ("listing_sqm_price", "listing_sqm_price"),
        ("market_placement", "market_placement_score"),
        ("sale_status", "sale_status"),
        ("days_on_market", "days_on_market"),
    )
    for out_key, src_key in renamed:
        view[out_key] = eu.get(src_key)

    view["avg_comp_sqm_price"] = _avg_comp_sqm(comps)
    view["comp_count"] = len(comps)
    return view
|
|
|
|
|
|
def _slim_listing(rank: int, item: dict) -> dict:
    """Reduce one analyzed listing to a compact, ranked card.

    Copies the listing's descriptive fields (including ``listing_description``),
    a shallow copy of its ``score``, the ``why_interesting`` / ``risks`` entries
    of its ``summary``, ``cache_age``, and up to five ``price_history`` entries.
    The Eiendom.no unit and the comparable units are replaced by their slim
    forms; ``eiendom`` is ``None`` when the listing has no (or an empty)
    ``eiendom_unit``.
    """
    unit = item.get("eiendom_unit") or {}
    comparables = item.get("similar_units") or []
    summary = item.get("summary") or {}

    # Fields copied verbatim from the listing, in output order.
    copied_fields = (
        "finnkode",
        "url",
        "title",
        "address",
        "listing_description",
        "district",
        "property_type",
        "ownership_type",
        "floor",
        "area_m2",
        "bedrooms",
        "rooms",
        "total_price",
        "asking_price",
        "shared_debt",
        "common_costs",
        "construction_year",
        "has_balcony",
        "has_terrace",
        "has_elevator",
        "has_parking",
        "has_garage",
        "eiendom_unit_code",
    )

    card: dict = {"rank": rank}
    for field in copied_fields:
        card[field] = item.get(field)

    card["score"] = dict(item.get("score") or {})
    card["categories"] = item.get("categories")
    card["why_interesting"] = summary.get("why_interesting")
    card["risks"] = summary.get("risks")
    card["cache_age"] = item.get("cache_age")
    # First five entries as stored. NOTE(review): whether these are the most
    # recent records depends on the order the service returns — confirm.
    card["price_history"] = (item.get("price_history") or [])[:5]
    card["eiendom"] = _slim_eiendom(unit, comparables) if unit else None
    card["similar_units"] = _slim_comps(comparables)
    return card
|
|
|
|
|
|
def _slim_analyze_ad(result: dict) -> dict:
    """Flatten a single-ad analysis result into the MCP output shape.

    Reads the ``ad``, ``eiendom_unit`` and ``similar_units`` entries of
    ``result``. The ad's fields are lifted to the top level (including
    ``listing_description``), and the unit and comps are attached in slim form
    under ``eiendom`` and ``similar_units``. ``eiendom`` is ``None`` when
    there is no (or an empty) ``eiendom_unit``. Anything not listed here is
    not carried over.
    """
    listing = result.get("ad") or {}
    unit = result.get("eiendom_unit") or {}
    comparables = result.get("similar_units") or []

    # Ad fields copied verbatim, in output order.
    ad_fields = (
        "finnkode",
        "url",
        "title",
        "address",
        "district",
        "listing_description",
        "property_type",
        "ownership_type",
        "floor",
        "area_m2",
        "rooms",
        "bedrooms",
        "total_price",
        "asking_price",
        "shared_debt",
        "common_costs",
        "construction_year",
        "energy_rating",
        "has_balcony",
        "has_terrace",
        "has_elevator",
        "has_parking",
        "has_garage",
        "eiendom_unit_code",
    )

    flattened: dict[str, Any] = {name: listing.get(name) for name in ad_fields}
    flattened["eiendom"] = _slim_eiendom(unit, comparables) if unit else None
    flattened["similar_units"] = _slim_comps(comparables)
    return flattened
|
|
|
|
|
|
def _build_slim_search_result(full: dict) -> dict:
    """Shrink a full search analysis into the compact MCP response.

    Only ``search_url``, ``summary`` and the analyzed listings are kept; every
    other top-level key of ``full`` (e.g. the raw search cards) is left out.
    Each entry of ``full["analysis"]`` becomes a slim listing card, ranked
    from 1 in the order it appears. The stated size goal for this response is
    under 200KB for 30 analyzed ads.
    """
    analyzed = full.get("analysis") or []
    cards = []
    for position, entry in enumerate(analyzed, start=1):
        cards.append(_slim_listing(position, entry))

    slim: dict = {}
    slim["search_url"] = full.get("search_url")
    slim["summary"] = full.get("summary")
    slim["listings"] = cards
    return slim
|
|
|
|
|
|
def _build_transport_security() -> TransportSecuritySettings:
    """Build transport security settings from ``MCP_ALLOWED_HOSTS``.

    When the environment variable is unset or empty, DNS rebinding protection
    is disabled. Otherwise protection is enabled and the allowed hosts are the
    loopback patterns plus each comma-separated entry of the variable.

    Entries are stripped of surrounding whitespace, and blank entries (from a
    trailing comma, doubled commas, or a whitespace-only value) are skipped so
    no empty host pattern ends up in the allow-list.
    """
    allowed = os.getenv("MCP_ALLOWED_HOSTS", "")
    if not allowed:
        return TransportSecuritySettings(enable_dns_rebinding_protection=False)

    stripped = (entry.strip() for entry in allowed.split(","))
    hosts = [host for host in stripped if host]
    return TransportSecuritySettings(
        enable_dns_rebinding_protection=True,
        allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"] + hosts,
    )
|
|
|
|
|
|
# The server instance that the @mcp.tool decorators below register tools on.
# Transport security is derived once, at import time, from the environment.
mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security())
|
|
|
|
|
|
@mcp.tool(
    description=(
        "Analyze a FINN.no real estate search URL. Scrapes listing cards,"
        " fetches details, enriches with Eiendom.no data, scores, and ranks."
        " Fetches all ads in parallel (phase 1) then scores from cache (phase 2)."
        " Progress updates are emitted during phase 1."
    )
)
async def finn_analyze_search(
    search_url: str,
    ctx: Context,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
) -> str:
    """Analyze a FINN search URL and return ranked listings as a JSON string.

    All arguments are forwarded unchanged to ``analyze_search``; ``ctx`` is
    passed along as well (presumably for progress reporting — confirm in the
    service). The full result is reduced with ``_build_slim_search_result``
    before serialization.

    Returns:
        A JSON document with ``search_url``, ``summary`` and ``listings``. On
        any failure, ``{"error": true, "message": ...}`` is returned instead
        of raising, so the tool call itself always yields a result.
    """
    try:
        result = await analyze_search(
            search_url,
            max_pages=max_pages,
            include_details=include_details,
            detail_limit=detail_limit,
            include_eiendom_no=include_eiendom_no,
            ctx=ctx,
        )
        return json.dumps(_build_slim_search_result(result), default=str)
    except Exception as e:
        # logger.exception keeps the traceback; the caller only sees the message.
        logger.exception("Error analyzing search: %s", e)
        return json.dumps({"error": True, "message": str(e)})
|
|
|
|
|
|
def _encode_jpeg_b64(raw: bytes, max_side: int = 1024, quality: int = 75) -> str:
    """Downscale raw image bytes and return them as a base64-encoded JPEG string.

    The image is shrunk in place so its longest side is at most ``max_side``
    pixels, then re-encoded as JPEG. Raises whatever Pillow raises for
    unreadable data; the caller decides how to handle that.
    """
    # Imported lazily so Pillow is only required when images are processed.
    import io

    from PIL import Image

    img = Image.open(io.BytesIO(raw))
    img.thumbnail((max_side, max_side), Image.LANCZOS)
    # Modes the JPEG writer accepts are left alone; everything else (RGBA, P,
    # LA, PA, I, F, ...) is converted so the save below does not fail.
    if img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        img = img.convert("RGB")
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    return base64.b64encode(buf.getvalue()).decode()


@mcp.tool(
    description=(
        "Fetch and analyze unit images for visual assessment of a property. "
        "Downloads photos and returns them as visual image content so Claude can "
        "directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere."
    )
)
async def finn_analyze_unit_images(
    unit_code: str,
    force_refresh: bool = False,
    max_images: int = 8,
) -> list:
    """Fetch a unit's images and return them as image content blocks.

    Args:
        unit_code: Unit identifier passed to ``get_unit_images``.
        force_refresh: Forwarded to ``get_unit_images``.
        max_images: Upper bound on how many images are downloaded; values
            below zero are treated as zero.

    Returns:
        A list whose first element is a text header (address, rooms, area,
        image counts), followed by one JPEG ``ImageContent`` per image that
        was downloaded and re-encoded successfully. Images that fail are
        skipped with a warning. If the lookup itself fails, the list holds a
        single text block with ``{"error": true, "message": ...}``.
    """
    try:
        result = await get_unit_images(unit_code, force_refresh=force_refresh)
        all_urls = result.get("unit_images") or []
        # Clamp so a negative limit cannot turn into a slice from the end.
        urls = all_urls[: max(max_images, 0)]

        header = (
            f"{result.get('address', unit_code)} | "
            f"{result.get('rooms')} rom | "
            f"{result.get('usable_area')}m² | "
            f"{len(all_urls)} bilder totalt, viser {len(urls)}"
        )
        content: list = [TextContent(type="text", text=header)]

        async def _fetch(client: httpx.AsyncClient, url: str) -> ImageContent | None:
            """Download one image and convert it; return None on any failure."""
            try:
                resp = await client.get(url)
                if resp.status_code != 200:
                    return None
                # Raw real estate photos are 2-4MB — they must be resized and
                # compressed to stay within the 1MB MCP tool result limit
                # across multiple images. Decoding is CPU-bound, so it runs in
                # a worker thread to keep the event loop free.
                b64 = await asyncio.to_thread(_encode_jpeg_b64, resp.content)
                return ImageContent(type="image", data=b64, mimeType="image/jpeg")
            except Exception as exc:
                logger.warning("Failed to fetch/resize image %s: %s", url, exc)
                return None

        # One client for all downloads so connections can be reused.
        async with httpx.AsyncClient(timeout=15) as client:
            fetched = await asyncio.gather(*(_fetch(client, u) for u in urls))
        content.extend(img for img in fetched if img is not None)
        return content

    except Exception as e:
        logger.exception("Error fetching unit images for %s: %s", unit_code, e)
        return [TextContent(type="text", text=json.dumps({"error": True, "message": str(e)}))]
|
|
|
|
|
|
# ============================================================================
|
|
# Additional analysis and enrichment tools
|
|
# ============================================================================
|
|
|
|
|
|
@mcp.tool(
    description=(
        "Deep-dive one or more FINN listings. Accepts a single finnkode or a list "
        "(batched in one call). Always enriches with Eiendom.no data and comparable "
        "sold units. Returns listing_description plus slim eiendom/comps; excludes "
        "image URLs and internal vectors (use finn_analyze_unit_images for visuals)."
    )
)
async def finn_analyze_ad(finnkode: str | list[str]) -> str:
    """Analyze and enrich one or more FINN ads and return the result as JSON.

    Args:
        finnkode: A single finnkode, or a list of them. All codes are analyzed
            concurrently with Eiendom.no enrichment and similar units enabled.

    Returns:
        A JSON object for a single string input, or a JSON array (same order
        as the input) for a list input. A code that fails is represented by
        ``{"finnkode": ..., "error": true, "message": ...}`` and does not
        abort the rest of the batch.
    """
    single = isinstance(finnkode, str)
    finnkoder = [finnkode] if single else list(finnkode)

    async def _one(fk: str) -> dict:
        """Analyze one code, turning any failure into an error record."""
        try:
            result = await analyze_ad(
                fk,
                include_eiendom_no=True,
                include_similar_units=True,
            )
            return _slim_analyze_ad(result)
        except Exception as e:  # noqa: BLE001 — per-item isolation, batch must not abort
            # logger.exception keeps the traceback for the failing item.
            logger.exception("Error analyzing ad %s: %s", fk, e)
            return {"finnkode": fk, "error": True, "message": str(e)}

    results = await asyncio.gather(*(_one(fk) for fk in finnkoder))

    # Single string input → single object; list input → list (preserves order).
    payload: Any = results[0] if single else results
    return json.dumps(payload, default=str)
|
|
|
|
|
|
@mcp.tool(
    description="Store user feedback (verdict, notes) for a FINN listing. "
    "Enables similar-unit recommendations and shortlist filtering."
)
async def finn_save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> str:
    """Save user feedback for a listing and return the service result as JSON.

    The arguments are passed straight to ``save_feedback``. On failure,
    ``{"error": true, "message": ...}`` is returned instead of raising.
    """
    try:
        # NOTE(review): save_feedback is called without await — presumably a
        # synchronous function; confirm against the service module.
        result = save_feedback(finnkode, verdict, notes)
        return json.dumps(result, default=str)
    except Exception as e:
        # logger.exception keeps the traceback; the caller only sees the message.
        logger.exception("Error saving feedback for %s: %s", finnkode, e)
        return json.dumps({"error": True, "message": str(e)})
|
|
|
|
|
|
@mcp.tool(
    description="Fetch the shortlist of listings you have given a verdict "
    "(liked, disliked, maybe, visited). Enriched with cached score and price data."
)
def finn_get_shortlist(verdict: str = "liked", limit: int = 10) -> str:
    """Return the stored shortlist for one verdict, rendered as JSON.

    ``verdict`` and ``limit`` are passed to ``get_shortlist`` and the result
    is rendered with ``render_shortlist(..., "json")``. On failure,
    ``{"error": true, "message": ...}`` is returned instead of raising.
    """
    try:
        result = get_shortlist(verdict, limit)
        return render_shortlist(result, "json")
    except Exception as e:
        # logger.exception keeps the traceback; the caller only sees the message.
        logger.exception("Error fetching shortlist: %s", e)
        return json.dumps({"error": True, "message": str(e)})
|
|
|
|
|
|
@mcp.tool(
    description=(
        "Detect new, removed, and changed listings in a FINN search URL "
        "compared to the previous run. Shows price/status diffs on changed listings."
    )
)
async def finn_get_new_ads_since_last_run(search_url: str) -> str:
    """Return the diff for a search URL since the last run, rendered as JSON.

    The result of ``get_new_ads_since_last_run(search_url)`` is rendered with
    ``render_diff(..., "json")``. On failure, ``{"error": true, "message": ...}``
    is returned instead of raising.
    """
    try:
        # NOTE(review): the service call is not awaited — presumably it is
        # synchronous; if it is a coroutine function this needs an await.
        result = get_new_ads_since_last_run(search_url)
        return render_diff(result, "json")
    except Exception as e:
        # logger.exception keeps the traceback; the caller only sees the message.
        logger.exception("Error fetching diff: %s", e)
        return json.dumps({"error": True, "message": str(e)})
|
|
|
|
|
|
def main() -> None:
    """Start the module's FastMCP server on the stdio transport.

    Hands control to ``mcp.run``; takes no arguments and returns nothing.
    """
    mcp.run(transport="stdio")
|
|
|
|
|
|
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|