"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment."""

import asyncio
import base64
import io
import json
import logging
import os
from typing import Any

import httpx
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from mcp.types import ImageContent, TextContent

from .formatting import (
    render_diff,
    render_shortlist,
)
from .service import (
    analyze_ad,
    analyze_search,
    get_new_ads_since_last_run,
    get_shortlist,
    get_unit_images,
    save_feedback,
)

logger = logging.getLogger(__name__)

# Hosts that are always accepted when DNS-rebinding protection is enabled.
_LOOPBACK_HOSTS = ("127.0.0.1:*", "localhost:*", "[::1]:*")

# Image tool tuning: photos are downscaled and re-encoded before being returned.
_IMAGE_MAX_SIDE = 1024
_JPEG_QUALITY = 75
_IMAGE_FETCH_TIMEOUT = 15

# Fields copied verbatim from a comparable unit (order defines output key order).
_COMP_FIELDS = (
    "unit_code",
    "address",
    "usable_area",
    "rooms",
    "floor",
    "construction_year",
    "listing_price",
    "selling_price",
    "shared_debt",
    "sqm_price",
    "common_costs",
    "days_on_market",
)

# Fields copied verbatim from a ranked search-analysis item.
_LISTING_FIELDS = (
    "finnkode",
    "url",
    "title",
    "address",
    "listing_description",
    "district",
    "property_type",
    "ownership_type",
    "floor",
    "area_m2",
    "bedrooms",
    "rooms",
    "total_price",
    "asking_price",
    "shared_debt",
    "common_costs",
    "construction_year",
    "has_balcony",
    "has_terrace",
    "has_elevator",
    "has_parking",
    "has_garage",
    "eiendom_unit_code",
)

# Fields copied verbatim from the ``ad`` part of a single-ad analysis.
_AD_FIELDS = (
    "finnkode",
    "url",
    "title",
    "address",
    "district",
    "listing_description",
    "property_type",
    "ownership_type",
    "floor",
    "area_m2",
    "rooms",
    "bedrooms",
    "total_price",
    "asking_price",
    "shared_debt",
    "common_costs",
    "construction_year",
    "energy_rating",
    "has_balcony",
    "has_terrace",
    "has_elevator",
    "has_parking",
    "has_garage",
    "eiendom_unit_code",
)


# ---------------------------------------------------------------------------
# Response shaping
# ---------------------------------------------------------------------------


def _error_json(exc: Exception) -> str:
    """Serialize an exception as the ``{"error": true, "message": ...}`` payload."""
    return json.dumps({"error": True, "message": str(exc)})


def _slim_comp(c: dict) -> dict:
    """Drop internal IDs, coords, redundant status fields from a comparable unit.

    ``finalized_at`` is truncated to its first 10 characters. The value is
    coerced with ``str`` first so a non-string timestamp does not break the
    slice; string input is unaffected.
    """
    slim = {field: c.get(field) for field in _COMP_FIELDS}
    slim["finalized_at"] = str(c.get("finalized_at") or "")[:10]
    return slim


def _slim_comps(comps: list[dict], keep: int = 15) -> list[dict]:
    """Sort comps by recency, keep the N most recent — older comps lose relevance fast.

    Comps without ``finalized_at`` sort last. Sorting compares the string form
    of the timestamp so string and non-string values can be mixed.
    """
    newest_first = sorted(
        comps,
        key=lambda c: str(c.get("finalized_at") or ""),
        reverse=True,
    )
    return [_slim_comp(c) for c in newest_first[:keep]]


def _avg_comp_sqm(comps: list[dict]) -> int | None:
    """Return the rounded mean ``sqm_price`` of comps that have one, else ``None``."""
    sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")]
    if not sqm_prices:
        return None
    return round(sum(sqm_prices) / len(sqm_prices))


def _slim_eiendom(eu: dict, comps: list[dict]) -> dict:
    """Compact Eiendom.no unit view. Drops unit_images, unit_vector, lat/lng, timestamps."""
    return {
        "unit_code": eu.get("unit_code"),
        "usable_area": eu.get("usable_area"),
        "estimated_price": eu.get("estimated_selling_price"),
        "estimated_range": [
            eu.get("estimated_selling_price_lower"),
            eu.get("estimated_selling_price_upper"),
        ],
        "listing_sqm_price": eu.get("listing_sqm_price"),
        "market_placement": eu.get("market_placement_score"),
        "sale_status": eu.get("sale_status"),
        "days_on_market": eu.get("days_on_market"),
        "avg_comp_sqm_price": _avg_comp_sqm(comps),
        "comp_count": len(comps),
    }


def _slim_listing(rank: int, item: dict) -> dict:
    """Collapse one full analyze_ad result into a compact listing card.

    Keeps: listing_description (for AI interpretation), price_history,
    cache_age, score breakdown.
    Drops: unit_images, unit_vector, internal eiendom_unit timestamps.
    Derives: avg_comp_sqm_price from similar_units.
    """
    eu = item.get("eiendom_unit") or {}
    comps = item.get("similar_units") or []
    score = item.get("score") or {}
    summary = item.get("summary") or {}
    price_history = item.get("price_history") or []

    card: dict[str, Any] = {"rank": rank}
    card.update({field: item.get(field) for field in _LISTING_FIELDS})
    card.update(
        {
            "score": dict(score),
            "categories": item.get("categories"),
            "why_interesting": summary.get("why_interesting"),
            "risks": summary.get("risks"),
            "cache_age": item.get("cache_age"),
            # First 5 records in the order the service returned them
            # (presumably newest first — TODO confirm ordering).
            "price_history": price_history[:5],
            "eiendom": _slim_eiendom(eu, comps) if eu else None,
            "similar_units": _slim_comps(comps),
        }
    )
    return card


def _slim_analyze_ad(result: dict) -> dict:
    """Shape the single-ad analyze_ad result for MCP output.

    The service returns {ad: FinnAd, eiendom_unit: EiendomUnit, similar_units: [...]}.
    Flatten the ad fields up, keep listing_description, attach slim eiendom + comps,
    and strip unit_images / unit_vector / lat / lng / internal timestamps.
    """
    ad = result.get("ad") or {}
    eu = result.get("eiendom_unit") or {}
    comps = result.get("similar_units") or []

    out: dict[str, Any] = {field: ad.get(field) for field in _AD_FIELDS}
    out["eiendom"] = _slim_eiendom(eu, comps) if eu else None
    out["similar_units"] = _slim_comps(comps)
    return out


def _build_slim_search_result(full: dict) -> dict:
    """Convert full analyze_search output to a compact MCP-safe response.

    Removes search_cards (redundant), drops all fat fields from individual listings.
    Target: <200KB for 30 analyzed ads.
    """
    listings = [
        _slim_listing(rank, item)
        for rank, item in enumerate(full.get("analysis") or [], start=1)
    ]
    return {
        "search_url": full.get("search_url"),
        "summary": full.get("summary"),
        "listings": listings,
    }


def _build_transport_security() -> TransportSecuritySettings:
    """Build transport security settings from the ``MCP_ALLOWED_HOSTS`` env var.

    When the variable is unset or empty, DNS-rebinding protection is disabled.
    Otherwise protection is enabled and the comma-separated hosts are allowed
    in addition to the loopback hosts. Blank entries (e.g. from a trailing
    comma) are ignored so no empty host pattern is registered.
    """
    allowed = os.getenv("MCP_ALLOWED_HOSTS", "")
    if not allowed:
        return TransportSecuritySettings(enable_dns_rebinding_protection=False)

    stripped = (part.strip() for part in allowed.split(","))
    hosts = [host for host in stripped if host]
    return TransportSecuritySettings(
        enable_dns_rebinding_protection=True,
        allowed_hosts=[*_LOOPBACK_HOSTS, *hosts],
    )


mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security())


@mcp.tool(
    description=(
        "Analyze a FINN.no real estate search URL. Scrapes listing cards,"
        " fetches details, enriches with Eiendom.no data, scores, and ranks."
        " Fetches all ads in parallel (phase 1) then scores from cache (phase 2)."
        " Progress updates are emitted during phase 1."
    )
)
async def finn_analyze_search(
    search_url: str,
    ctx: Context,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
) -> str:
    """Analyze a FINN search URL and return ranked listing results.

    Returns a JSON string: the slimmed search result on success, or an
    ``{"error": true, "message": ...}`` object if the analysis raised.
    """
    try:
        result = await analyze_search(
            search_url,
            max_pages=max_pages,
            include_details=include_details,
            detail_limit=detail_limit,
            include_eiendom_no=include_eiendom_no,
            ctx=ctx,
        )
        return json.dumps(_build_slim_search_result(result), default=str)
    except Exception as e:
        # Tool boundary: report the failure to the client, keep the traceback in logs.
        logger.exception("Error analyzing search: %s", e)
        return _error_json(e)


def _encode_jpeg_b64(raw: bytes) -> str:
    """Downscale raw image bytes and return them as a base64-encoded JPEG.

    Resize to max 1024px on longest side before encoding. Raw real estate
    photos are 2-4MB — must compress to stay within the 1MB MCP tool result
    limit across multiple images.
    """
    # Imported lazily so Pillow is only needed when the image tool is used.
    from PIL import Image

    img = Image.open(io.BytesIO(raw))
    img.thumbnail((_IMAGE_MAX_SIDE, _IMAGE_MAX_SIDE), Image.LANCZOS)
    # Normalise every mode except RGB and greyscale (alpha, palette, CMYK,
    # ...) to RGB so the JPEG save does not fail on an unsupported mode.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=_JPEG_QUALITY, optimize=True)
    return base64.b64encode(buf.getvalue()).decode()


async def _fetch_image(client: httpx.AsyncClient, url: str) -> ImageContent | None:
    """Download one image and convert it to MCP image content.

    Returns ``None`` (after logging a warning) when the download does not
    answer 200 or when fetching/decoding fails, so one bad image does not
    abort the batch.
    """
    try:
        resp = await client.get(url)
        if resp.status_code != 200:
            return None
        # Decoding and resizing is CPU work; run it off the event loop.
        b64 = await asyncio.to_thread(_encode_jpeg_b64, resp.content)
        return ImageContent(type="image", data=b64, mimeType="image/jpeg")
    except Exception as exc:  # noqa: BLE001 — per-image isolation
        logger.warning("Failed to fetch/resize image %s: %s", url, exc)
        return None


@mcp.tool(
    description=(
        "Fetch and analyze unit images for visual assessment of a property. "
        "Downloads photos and returns them as visual image content so Claude can "
        "directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere."
    )
)
async def finn_analyze_unit_images(
    unit_code: str,
    force_refresh: bool = False,
    max_images: int = 8,
) -> list:
    """Fetch unit images and return as vision-compatible image content blocks.

    The first element is a text header describing the unit; it is followed by
    up to ``max_images`` successfully fetched images. A negative ``max_images``
    is treated as 0. On failure a single text block with an error JSON payload
    is returned.
    """
    try:
        result = await get_unit_images(unit_code, force_refresh=force_refresh)
        all_urls = result.get("unit_images") or []
        urls = all_urls[: max(0, max_images)]

        header = (
            f"{result.get('address', unit_code)} | "
            f"{result.get('rooms')} rom | "
            f"{result.get('usable_area')}m² | "
            f"{len(all_urls)} bilder totalt, viser {len(urls)}"
        )
        content: list = [TextContent(type="text", text=header)]

        if urls:
            # One shared client for the whole batch instead of one per image.
            async with httpx.AsyncClient(timeout=_IMAGE_FETCH_TIMEOUT) as client:
                fetched = await asyncio.gather(
                    *(_fetch_image(client, url) for url in urls)
                )
            content.extend(img for img in fetched if img is not None)
        return content
    except Exception as e:
        logger.exception("Error fetching unit images for %s: %s", unit_code, e)
        return [TextContent(type="text", text=_error_json(e))]


# ============================================================================
# Additional analysis and enrichment tools
# ============================================================================


@mcp.tool(
    description=(
        "Deep-dive one or more FINN listings. Accepts a single finnkode or a list "
        "(batched in one call). Always enriches with Eiendom.no data and comparable "
        "sold units. Returns listing_description plus slim eiendom/comps; excludes "
        "image URLs and internal vectors (use finn_analyze_unit_images for visuals)."
    )
)
async def finn_analyze_ad(finnkode: str | list[str]) -> str:
    """Analyze and enrich one or more FINN ads. Batch input returns a list.

    Each ad is analyzed independently; a failing ad yields an error object in
    its position instead of aborting the batch.
    """
    single = isinstance(finnkode, str)
    finnkoder = [finnkode] if single else list(finnkode)

    async def _one(fk: str) -> dict:
        try:
            result = await analyze_ad(
                fk,
                include_eiendom_no=True,
                include_similar_units=True,
            )
            return _slim_analyze_ad(result)
        except Exception as e:  # noqa: BLE001 — per-item isolation, batch must not abort
            logger.exception("Error analyzing ad %s: %s", fk, e)
            return {"finnkode": fk, "error": True, "message": str(e)}

    results = await asyncio.gather(*(_one(fk) for fk in finnkoder))
    # Single string input → single object; list input → list (preserves order).
    payload: Any = results[0] if single else results
    return json.dumps(payload, default=str)


@mcp.tool(
    description="Store user feedback (verdict, notes) for a FINN listing. "
    "Enables similar-unit recommendations and shortlist filtering."
)
async def finn_save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> str:
    """Save user feedback for a listing and return the service result as JSON."""
    try:
        result = save_feedback(finnkode, verdict, notes)
        return json.dumps(result, default=str)
    except Exception as e:
        logger.exception("Error saving feedback for %s: %s", finnkode, e)
        return _error_json(e)


@mcp.tool(
    description="Fetch the shortlist of listings you have given a verdict "
    "(liked, disliked, maybe, visited). Enriched with cached score and price data."
)
def finn_get_shortlist(verdict: str = "liked", limit: int = 10) -> str:
    """Get stored shortlist filtered by verdict, rendered as JSON."""
    try:
        result = get_shortlist(verdict, limit)
        return render_shortlist(result, "json")
    except Exception as e:
        logger.exception("Error fetching shortlist: %s", e)
        return _error_json(e)


@mcp.tool(
    description=(
        "Detect new, removed, and changed listings in a FINN search URL "
        "compared to the previous run. Shows price/status diffs on changed listings."
    )
)
async def finn_get_new_ads_since_last_run(search_url: str) -> str:
    """Get new/removed/changed listings since last run, rendered as JSON."""
    try:
        # NOTE(review): called without await — assumes the service function is
        # synchronous; confirm against .service.
        result = get_new_ads_since_last_run(search_url)
        return render_diff(result, "json")
    except Exception as e:
        logger.exception("Error fetching diff: %s", e)
        return _error_json(e)


def main() -> None:
    """Run the FastMCP server over stdio (standard MCP transport)."""
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()