"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment.""" import base64 import json import logging from typing import Any import os import asyncio import httpx from mcp.server.transport_security import TransportSecuritySettings from mcp.server.fastmcp import Context, FastMCP from mcp.types import ImageContent, TextContent from .eiendom_no import ( build_unit_vector, decode_unit_vector, get_similar_units, get_unit, search_unit_from_finn_url, ) from .formatting import ( render_ad, render_comparison, render_diff, render_shortlist, render_similar_units, render_unit_images, ) from .service import ( analyze_ad, analyze_ad_against_comps, analyze_search, compare_ads, find_similar_to_liked, get_new_ads_since_last_run, get_or_fetch_ad, get_or_fetch_eiendom_unit, get_shortlist, get_unit_images, save_feedback, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Response shaping # --------------------------------------------------------------------------- def _slim_listing(rank: int, item: dict) -> dict: """Collapse one full analyze_ad result into a compact listing card. Keeps: listing_description (for AI interpretation), price_history, cache_age, score breakdown. Drops: unit_images, unit_vector, internal eiendom_unit timestamps. Derives: avg_comp_sqm_price from similar_units. """ eu = item.get("eiendom_unit") or {} comps = item.get("similar_units") or [] sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")] avg_comp_sqm = round(sum(sqm_prices) / len(sqm_prices)) if sqm_prices else None # Slim comps: drop internal IDs, coords, redundant status fields. # Sort by recency, keep 15 most recent — older comps lose relevance fast. def _slim_comp(c: dict) -> dict: return { "unit_code": c.get("unit_code"), "address": c.get("address"), "usable_area": c.get("usable_area"), "rooms": c.get("rooms"), "floor": c.get("floor"), "construction_year": c.get("construction_year"), "listing_price": c.get("listing_price"), "selling_price": c.get("selling_price"), "shared_debt": c.get("shared_debt"), "sqm_price": c.get("sqm_price"), "common_costs": c.get("common_costs"), "days_on_market": c.get("days_on_market"), "finalized_at": (c.get("finalized_at") or "")[:10], } sorted_comps = sorted(comps, key=lambda c: c.get("finalized_at") or "", reverse=True) slim_comps = [_slim_comp(c) for c in sorted_comps[:15]] score = item.get("score") or {} summary = item.get("summary") or {} price_history = item.get("price_history") or [] cache_age = item.get("cache_age") # Keep full score breakdown — 12 dimensions + nearby_transit = ~220 bytes, all signal. # Drop nothing from scores. slim_score = {k: v for k, v in score.items()} eiendom: dict | None = None if eu: eiendom = { "unit_code": eu.get("unit_code"), "usable_area": eu.get("usable_area"), "estimated_price": eu.get("estimated_selling_price"), "estimated_range": [ eu.get("estimated_selling_price_lower"), eu.get("estimated_selling_price_upper"), ], "listing_sqm_price": eu.get("listing_sqm_price"), "market_placement": eu.get("market_placement_score"), "sale_status": eu.get("sale_status"), "days_on_market": eu.get("days_on_market"), "avg_comp_sqm_price": avg_comp_sqm, "comp_count": len(comps), } return { "rank": rank, "finnkode": item.get("finnkode"), "url": item.get("url"), "title": item.get("title"), "address": item.get("address"), "listing_description": item.get("listing_description"), "district": item.get("district"), "property_type": item.get("property_type"), "ownership_type": item.get("ownership_type"), "floor": item.get("floor"), "area_m2": item.get("area_m2"), "bedrooms": item.get("bedrooms"), "rooms": item.get("rooms"), "total_price": item.get("total_price"), "asking_price": item.get("asking_price"), "shared_debt": item.get("shared_debt"), "common_costs": item.get("common_costs"), "construction_year": item.get("construction_year"), "has_balcony": item.get("has_balcony"), "has_terrace": item.get("has_terrace"), "has_elevator": item.get("has_elevator"), "has_parking": item.get("has_parking"), "has_garage": item.get("has_garage"), "eiendom_unit_code": item.get("eiendom_unit_code"), "score": slim_score, "categories": item.get("categories"), "why_interesting": summary.get("why_interesting"), "risks": summary.get("risks"), "cache_age": cache_age, "price_history": price_history[:5], # Last 5 price records "eiendom": eiendom, "similar_units": slim_comps, } def _build_slim_search_result(full: dict) -> dict: """Convert full analyze_search output to a compact MCP-safe response. Removes search_cards (redundant), drops all fat fields from individual listings. Target: <200KB for 30 analyzed ads. """ listings = [ _slim_listing(rank + 1, item) for rank, item in enumerate(full.get("analysis") or []) ] return { "search_url": full.get("search_url"), "summary": full.get("summary"), "listings": listings, } def _build_transport_security() -> TransportSecuritySettings: allowed = os.getenv("MCP_ALLOWED_HOSTS", "") if allowed: hosts = [h.strip() for h in allowed.split(",")] return TransportSecuritySettings( enable_dns_rebinding_protection=True, allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"] + hosts, ) return TransportSecuritySettings(enable_dns_rebinding_protection=False) mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security()) @mcp.tool( description=( "Analyze a FINN.no real estate search URL. Scrapes listing cards," " fetches details, enriches with Eiendom.no data, scores, and ranks." " Fetches all ads in parallel (phase 1) then scores from cache (phase 2)." " Progress updates are emitted during phase 1." ) ) async def finn_analyze_search( search_url: str, ctx: Context, max_pages: int = 3, detail_limit: int = 20, include_details: bool = True, include_eiendom_no: bool = True, ) -> str: """Analyze a FINN search URL and return ranked listing results.""" try: result = await analyze_search( search_url, max_pages=max_pages, include_details=include_details, detail_limit=detail_limit, include_eiendom_no=include_eiendom_no, ctx=ctx, ) return json.dumps(_build_slim_search_result(result), default=str) except Exception as e: logger.error(f"Error analyzing search: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description=( "Fetch full detail for a FINN listing by finnkode." " Checks cache first; use force_refresh=True to bypass." ) ) async def finn_get_ad(finnkode: str, force_refresh: bool = False) -> str: """Fetch FINN ad details by finnkode.""" try: ad = await get_or_fetch_ad(finnkode, force_refresh=force_refresh) return ad.model_dump_json() except Exception as e: logger.error(f"Error fetching ad {finnkode}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Resolve an Eiendom.no unit_code from a FINN listing URL. " "Returns unit_code, address, lat, lng or an error if not found." ) async def finn_resolve_eiendom_unit(finn_url: str) -> str: """Resolve Eiendom.no unit from FINN URL.""" try: unit = await search_unit_from_finn_url(finn_url) if unit is None: return json.dumps( { "error": True, "message": "Eiendom.no unit could not be resolved from FINN URL", } ) return json.dumps( { "unit_code": unit.unit_code, "address": unit.address, "lat": unit.lat, "lng": unit.lng, } ) except Exception as e: logger.error(f"Error resolving unit from {finn_url}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Fetch full Eiendom.no unit data by unit_code. Checks SQLite cache (24h TTL)." ) async def finn_get_eiendom_unit(unit_code: str, force_refresh: bool = False) -> str: """Fetch Eiendom.no unit details by unit_code.""" try: unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh) if unit is None: return json.dumps({"error": True, "message": "Eiendom.no unit not found"}) return unit.model_dump_json() except Exception as e: logger.error(f"Error fetching unit {unit_code}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description=( "Fetch and analyze unit images for visual assessment of a property. " "Downloads photos and returns them as visual image content so Claude can " "directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere." ) ) async def finn_analyze_unit_images( unit_code: str, force_refresh: bool = False, max_images: int = 8, ) -> list: """Fetch unit images and return as vision-compatible image content blocks.""" try: result = await get_unit_images(unit_code, force_refresh=force_refresh) all_urls = result.get("unit_images") or [] urls = all_urls[:max_images] header = ( f"{result.get('address', unit_code)} | " f"{result.get('rooms')} rom | " f"{result.get('usable_area')}m² | " f"{len(all_urls)} bilder totalt, viser {len(urls)}" ) content: list = [TextContent(type="text", text=header)] async def _fetch(url: str) -> ImageContent | None: try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.get(url) if resp.status_code != 200: return None # Resize to max 1024px on longest side before encoding. # Raw real estate photos are 2-4MB — must compress to stay # within the 1MB MCP tool result limit across multiple images. from PIL import Image import io img = Image.open(io.BytesIO(resp.content)) img.thumbnail((1024, 1024), Image.LANCZOS) if img.mode in ("RGBA", "P"): img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=75, optimize=True) b64 = base64.b64encode(buf.getvalue()).decode() return ImageContent(type="image", data=b64, mimeType="image/jpeg") except Exception as exc: logger.warning("Failed to fetch/resize image %s: %s", url, exc) return None fetched = await asyncio.gather(*[_fetch(u) for u in urls]) content.extend(img for img in fetched if img is not None) return content except Exception as e: logger.error(f"Error fetching unit images for {unit_code}: {e}") return [TextContent(type="text", text=json.dumps({"error": True, "message": str(e)}))] @mcp.tool( description="Fetch comparable recently-sold or for-sale units from Eiendom.no using a " "base64-encoded unit vector. Returns list of similar units with sale prices." ) async def finn_get_similar_units(unit_vector: str, listing_status: str = "RECENTLY_SOLD") -> str: """Fetch similar units from Eiendom.no.""" try: units = await get_similar_units(unit_vector, listing_status) return json.dumps([unit.model_dump() for unit in units], default=str) except Exception as e: logger.error(f"Error fetching similar units: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Build a base64-encoded unit vector for a given Eiendom.no unit_code. " "The vector is used as input to finn_get_similar_units." ) async def finn_build_unit_vector(unit_code: str) -> str: """Build unit vector for Eiendom.no unit.""" try: unit = await get_unit(unit_code) if unit is None: return json.dumps({"error": True, "message": "Eiendom.no unit not found"}) return json.dumps({"unit_code": unit.unit_code, "unit_vector": build_unit_vector(unit)}) except Exception as e: logger.error(f"Error building unit vector for {unit_code}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Decode a base64 unit vector into human-readable JSON (lat, lon, property type, " "floor, rooms, construction year, area, price)." ) def finn_decode_unit_vector(unit_vector: str) -> str: """Decode unit vector to readable format.""" try: result = decode_unit_vector(unit_vector) return json.dumps(result) except Exception as e: logger.error(f"Error decoding unit vector: {e}") return json.dumps({"error": True, "message": str(e)}) # ============================================================================ # Additional analysis and enrichment tools # ============================================================================ @mcp.tool( description=( "Fetch and enrich a single FINN ad with optional Eiendom.no data and comparable units." ) ) async def finn_analyze_ad( finnkode: str, include_eiendom_no: bool = True, include_similar_units: bool = False, ) -> str: """Analyze and enrich a single FINN ad.""" try: result = await analyze_ad( finnkode, include_eiendom_no=include_eiendom_no, include_similar_units=include_similar_units, ) return json.dumps(result, default=str) except Exception as e: logger.error(f"Error analyzing ad {finnkode}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description=( "Evaluate one FINN listing against comparable recently-sold properties from Eiendom.no." ) ) async def finn_analyze_ad_against_comps( finnkode: str, listing_status: str = "RECENTLY_SOLD" ) -> str: """Analyze ad against comparable sales.""" try: result = await analyze_ad_against_comps(finnkode, listing_status=listing_status) return json.dumps(result, default=str) except Exception as e: logger.error(f"Error analyzing ad {finnkode} against comps: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description=( "Find properties similar to a listing the user has liked. " "Requires that the user has marked the listing with verdict='liked'." ) ) async def finn_find_similar_to_liked_ad( finnkode: str, mode: str = "recommendations", listing_status: str = "FOR_SALE" ) -> str: """Find properties similar to a liked ad.""" try: result = await find_similar_to_liked(finnkode, mode=mode, listing_status=listing_status) return render_similar_units(result, "json") except Exception as e: logger.error(f"Error finding similar to {finnkode}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool(description="Compare multiple FINN listings side by side with optional enrichment.") async def finn_compare_ads( finnkoder: list[str], include_eiendom_no: bool = True, include_comps: bool = True, ) -> str: """Compare multiple ads.""" try: result = await compare_ads( finnkoder, include_eiendom_no=include_eiendom_no, include_comps=include_comps, ) return render_comparison(result, "json") except Exception as e: logger.error(f"Error comparing ads: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Store user feedback (verdict, notes) for a FINN listing. " "Enables similar-unit recommendations and shortlist filtering." ) async def finn_save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> str: """Save user feedback for a listing.""" try: result = save_feedback(finnkode, verdict, notes) return json.dumps(result, default=str) except Exception as e: logger.error(f"Error saving feedback for {finnkode}: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description="Fetch the stored shortlist from a previous search run. " "Returns the ranked listings with all enrichment data." ) def finn_get_shortlist(run_id: int | None = None, limit: int = 10) -> str: """Get stored shortlist.""" try: result = get_shortlist(run_id, limit) return render_shortlist(result, "json") except Exception as e: logger.error(f"Error fetching shortlist: {e}") return json.dumps({"error": True, "message": str(e)}) @mcp.tool( description=( "Detect new, removed, and changed listings in a FINN search URL " "compared to the previous run. Shows price/status diffs on changed listings." ) ) async def finn_get_new_ads_since_last_run(search_url: str) -> str: """Get new/removed/changed listings since last run.""" try: result = get_new_ads_since_last_run(search_url) return render_diff(result, "json") except Exception as e: logger.error(f"Error fetching diff: {e}") return json.dumps({"error": True, "message": str(e)}) def main() -> None: """Run the FastMCP server over stdio (standard MCP transport).""" mcp.run(transport="stdio") if __name__ == "__main__": main()