diff --git a/finn_eiendom/analysis.py b/finn_eiendom/analysis.py index 408f187..971df3c 100644 --- a/finn_eiendom/analysis.py +++ b/finn_eiendom/analysis.py @@ -16,7 +16,9 @@ The cached result is invalidated automatically the moment any piece of underlying data changes, because the deps_hash will differ. """ +import asyncio import logging +from typing import Any from . import ad as ad_module from . import cache, eiendom_no, scoring, search @@ -43,6 +45,10 @@ from .models import EiendomUnit, FinnAd, SimilarUnit logger = logging.getLogger(__name__) +# Max parallel ad + eiendom.no fetches in analyze_search phase 1. +# High enough to be fast; low enough to avoid FINN rate-limiting. +FETCH_CONCURRENCY = 5 + def _normalize_description(text: str | None) -> str: return text.lower() if text else "" @@ -91,8 +97,6 @@ def _build_ad_summary( risks.append("Risk flags are detected in description or metadata.") if ad.common_costs and ad.common_costs > 5000: risks.append("Common costs are relatively high and should be reviewed.") - if enriched and enriched.sale_status and enriched.sale_status.upper() != "FOR_SALE": - risks.append("Eiendom.no sale status does not indicate an active sale.") if not enriched: risks.append("Missing Eiendom.no data increases uncertainty.") @@ -208,8 +212,27 @@ async def analyze_ad( result = { "finnkode": finn_ad.finnkode, + "url": finn_ad.url, "title": finn_ad.title, "address": finn_ad.address, + "district": finn_ad.district, + "property_type": finn_ad.property_type, + "ownership_type": finn_ad.ownership_type, + "floor": finn_ad.floor, + "area_m2": finn_ad.area_m2, + "bedrooms": finn_ad.bedrooms, + "rooms": finn_ad.rooms, + "total_price": finn_ad.total_price, + "asking_price": finn_ad.asking_price, + "shared_debt": finn_ad.shared_debt, + "common_costs": finn_ad.common_costs, + "construction_year": finn_ad.construction_year, + "has_balcony": finn_ad.has_balcony, + "has_terrace": finn_ad.has_terrace, + "has_elevator": finn_ad.has_elevator, + 
"has_parking": finn_ad.has_parking, + "has_garage": finn_ad.has_garage, + "eiendom_unit_code": finn_ad.eiendom_unit_code, "score": scores, "categories": categories, "summary": summary, @@ -226,12 +249,26 @@ async def analyze_ad( return result -async def _analyze_card(card, conn, *, include_eiendom_no: bool, client) -> dict: - """Fetch details + enrich a single search card. Raises on unrecoverable - errors; the caller is responsible for catching and skipping.""" - finn_ad = cache.get_finn_ad(conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS) - if finn_ad is None: - finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client) +async def _fetch_card_to_db( + card, + conn, + *, + include_eiendom_no: bool, + client, +) -> tuple["FinnAd | None", "str | None"]: + """Phase 1 worker: fetch ad details + resolve Eiendom.no unit, persist to DB. + + Returns (finn_ad, unit_code). Both can be None on failure -- the caller + treats None as a skip without aborting the whole batch. + """ + try: + finn_ad = cache.get_finn_ad(conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS) + if finn_ad is None: + finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client) + save_finn_ad(conn, finn_ad) + except Exception as exc: + logger.warning("Failed to fetch ad %s: %s", card.finnkode, exc) + return None, None unit_code = None if include_eiendom_no: @@ -239,11 +276,9 @@ async def _analyze_card(card, conn, *, include_eiendom_no: bool, client) -> dict matched_unit = await eiendom_no.search_unit_from_finn_url(card.url) unit_code = matched_unit.unit_code if matched_unit else None except Exception as exc: - # A failed unit resolution is non-fatal -- proceed without enrichment. 
logger.warning("Eiendom.no unit search failed for %s: %s", card.finnkode, exc) - unit_code = None - return await analyze_ad(finn_ad, unit_code=unit_code) + return finn_ad, unit_code async def analyze_search( @@ -254,13 +289,24 @@ async def analyze_search( include_eiendom_no: bool = True, client=None, use_cache: bool = True, + ctx: Any = None, ) -> dict: """Analyze a FINN search URL and enrich matching listings. - Search-level results are NOT cached as a whole (the search page itself - is cached at the HTML level). Individual ad analyses ARE cached via - ``analyze_ad``, so re-running a search only re-scores ads whose - underlying data has changed. + Two-phase parallel execution + ---------------------------- + Phase 1 (parallel, I/O bound): + All resale cards are fetched concurrently behind a semaphore of size + ``FETCH_CONCURRENCY``. Each worker fetches the ad detail page and + resolves the Eiendom.no unit in one shot, then writes both to SQLite. + Progress is reported via ``ctx`` if provided. + + Phase 2 (sequential, cache bound): + Scoring reads entirely from SQLite -- no network -- and is fast. + Results are sorted by total score and returned. + + Individual ad analyses ARE cached via ``analyze_ad``; re-running a search + only re-scores ads whose underlying data has changed. """ conn = cache.init_db(FINN_CACHE_PATH) cards = await search.fetch_search_pages( @@ -269,38 +315,75 @@ async def analyze_search( client=client, use_cache=use_cache, ) + + resale_cards = [c for c in cards[:detail_limit] if _is_resale_listing(c.url)] + skipped_count = len(cards[:detail_limit]) - len(resale_cards) + + if ctx is not None: + await ctx.info( + f"Found {len(cards)} listings, {len(resale_cards)} resale ads to fetch." 
+ ) + + # ------------------------------------------------------------------ + # Phase 1: parallel fetch to DB + # ------------------------------------------------------------------ + fetched: dict[str, tuple] = {} # finnkode -> (FinnAd, unit_code | None) + fetch_counter = 0 + sem = asyncio.Semaphore(FETCH_CONCURRENCY) + + if not fetch_details: + resale_cards = [] + + async def _fetch_worker(card, idx: int) -> None: + nonlocal fetch_counter + async with sem: + finn_ad, unit_code = await _fetch_card_to_db( + card, conn, include_eiendom_no=include_eiendom_no, client=client + ) + fetched[card.finnkode] = (finn_ad, unit_code) + fetch_counter += 1 + if ctx is not None: + await ctx.report_progress(fetch_counter, len(resale_cards)) + status = "enriched" if unit_code else "no eiendom match" + await ctx.info( + f"[{fetch_counter}/{len(resale_cards)}] {card.finnkode} fetched ({status})" + ) + + await asyncio.gather(*[_fetch_worker(c, i) for i, c in enumerate(resale_cards)]) + + # ------------------------------------------------------------------ + # Phase 2: score from DB (reads cache, fast) + # ------------------------------------------------------------------ + if ctx is not None: + await ctx.info(f"All data fetched. Scoring {len(resale_cards)} ads...") + results = [] enriched_count = 0 - skipped_count = 0 - cache_hits = 0 - if fetch_details: - for card in cards[:detail_limit]: - # Project / new-build ads are not resale listings and fetch_ad_details - # cannot resolve them -- skip up front rather than 404 mid-run. 
- if not _is_resale_listing(card.url): - logger.info("Skipping non-resale card %s (%s)", card.finnkode, card.url) - skipped_count += 1 - continue + for card in resale_cards: + finn_ad, unit_code = fetched.get(card.finnkode, (None, None)) + if finn_ad is None: + skipped_count += 1 + continue + try: + result = await analyze_ad(finn_ad, unit_code=unit_code) + except Exception as exc: + logger.warning("Skipping card %s during scoring: %s", card.finnkode, exc) + skipped_count += 1 + continue - # One bad card (stale finnkode, removed ad, transient network error) - # must not abort the whole search -- isolate each card. - try: - result = await _analyze_card( - card, conn, include_eiendom_no=include_eiendom_no, client=client - ) - except Exception as exc: - logger.warning("Skipping card %s: %s", card.finnkode, exc) - skipped_count += 1 - continue - - if result.get("eiendom_unit"): - enriched_count += 1 - # Track analysis cache hits via the absence of recompute logging - # (the flag is not propagated up here; rely on debug logs). - results.append(result) + if result.get("eiendom_unit"): + enriched_count += 1 + results.append(result) results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True) + + if ctx is not None: + await ctx.info( + f"Done. {len(results)} analyzed, {enriched_count} enriched, " + f"{skipped_count} skipped." 
+ ) + + return { "search_url": search_url, "search_cards": [card.model_dump(mode="json") for card in cards], diff --git a/finn_eiendom/mcp_server.py b/finn_eiendom/mcp_server.py index d3c1389..b6b26db 100644 --- a/finn_eiendom/mcp_server.py +++ b/finn_eiendom/mcp_server.py @@ -1,11 +1,15 @@ """FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment.""" +import base64 import json import logging from typing import Any import os +import asyncio +import httpx from mcp.server.transport_security import TransportSecuritySettings -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import Context, FastMCP +from mcp.types import ImageContent, TextContent from .eiendom_no import ( build_unit_vector, @@ -39,6 +43,120 @@ from .service import ( logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Response shaping +# --------------------------------------------------------------------------- + + +def _slim_listing(rank: int, item: dict) -> dict: + """Collapse one full analyze_ad result into a compact listing card. + + Drops: listing_description, unit_images, unit_vector, full timestamps, + and similar_units beyond the 15 most recent; all score fields are kept. + Derives: avg_comp_sqm_price from similar_units. + """ + eu = item.get("eiendom_unit") or {} + comps = item.get("similar_units") or [] + sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")] + avg_comp_sqm = round(sum(sqm_prices) / len(sqm_prices)) if sqm_prices else None + + # Slim comps: drop internal IDs, coords, redundant status fields. + # Sort by recency, keep 15 most recent — older comps lose relevance fast.
+ def _slim_comp(c: dict) -> dict: + return { + "unit_code": c.get("unit_code"), + "address": c.get("address"), + "usable_area": c.get("usable_area"), + "rooms": c.get("rooms"), + "floor": c.get("floor"), + "construction_year": c.get("construction_year"), + "listing_price": c.get("listing_price"), + "selling_price": c.get("selling_price"), + "shared_debt": c.get("shared_debt"), + "sqm_price": c.get("sqm_price"), + "common_costs": c.get("common_costs"), + "days_on_market": c.get("days_on_market"), + "finalized_at": (c.get("finalized_at") or "")[:10], + } + + sorted_comps = sorted(comps, key=lambda c: c.get("finalized_at") or "", reverse=True) + slim_comps = [_slim_comp(c) for c in sorted_comps[:15]] + + score = item.get("score") or {} + summary = item.get("summary") or {} + + # Keep full score breakdown — 12 dimensions + nearby_transit = ~220 bytes, all signal. + # Drop nothing from scores. + slim_score = {k: v for k, v in score.items()} + + eiendom: dict | None = None + if eu: + eiendom = { + "unit_code": eu.get("unit_code"), + "usable_area": eu.get("usable_area"), + "estimated_price": eu.get("estimated_selling_price"), + "estimated_range": [ + eu.get("estimated_selling_price_lower"), + eu.get("estimated_selling_price_upper"), + ], + "listing_sqm_price": eu.get("listing_sqm_price"), + "market_placement": eu.get("market_placement_score"), + "sale_status": eu.get("sale_status"), + "days_on_market": eu.get("days_on_market"), + "avg_comp_sqm_price": avg_comp_sqm, + "comp_count": len(comps), + } + + return { + "rank": rank, + "finnkode": item.get("finnkode"), + "url": item.get("url"), + "title": item.get("title"), + "address": item.get("address"), + "district": item.get("district"), + "property_type": item.get("property_type"), + "ownership_type": item.get("ownership_type"), + "floor": item.get("floor"), + "area_m2": item.get("area_m2"), + "bedrooms": item.get("bedrooms"), + "rooms": item.get("rooms"), + "total_price": item.get("total_price"), + "asking_price": 
item.get("asking_price"), + "shared_debt": item.get("shared_debt"), + "common_costs": item.get("common_costs"), + "construction_year": item.get("construction_year"), + "has_balcony": item.get("has_balcony"), + "has_terrace": item.get("has_terrace"), + "has_elevator": item.get("has_elevator"), + "has_parking": item.get("has_parking"), + "has_garage": item.get("has_garage"), + "eiendom_unit_code": item.get("eiendom_unit_code"), + "score": slim_score, + "categories": item.get("categories"), + "why_interesting": summary.get("why_interesting"), + "risks": summary.get("risks"), + "eiendom": eiendom, + "similar_units": slim_comps, + } + + +def _build_slim_search_result(full: dict) -> dict: + """Convert full analyze_search output to a compact MCP-safe response. + + Removes search_cards (redundant), drops all fat fields from individual + listings. Target: <200KB for 30 analyzed ads. + """ + listings = [ + _slim_listing(rank + 1, item) + for rank, item in enumerate(full.get("analysis") or []) + ] + return { + "search_url": full.get("search_url"), + "summary": full.get("summary"), + "listings": listings, + } + + def _build_transport_security() -> TransportSecuritySettings: allowed = os.getenv("MCP_ALLOWED_HOSTS", "") if allowed: @@ -57,10 +175,13 @@ mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security() description=( "Analyze a FINN.no real estate search URL. Scrapes listing cards," " fetches details, enriches with Eiendom.no data, scores, and ranks." + " Fetches all ads in parallel (phase 1) then scores from cache (phase 2)." + " Progress updates are emitted during phase 1." 
) ) async def finn_analyze_search( search_url: str, + ctx: Context, max_pages: int = 3, detail_limit: int = 20, include_details: bool = True, @@ -74,8 +195,9 @@ async def finn_analyze_search( include_details=include_details, detail_limit=detail_limit, include_eiendom_no=include_eiendom_no, + ctx=ctx, ) - return json.dumps(result, default=str) + return json.dumps(_build_slim_search_result(result), default=str) except Exception as e: logger.error(f"Error analyzing search: {e}") return json.dumps({"error": True, "message": str(e)}) @@ -143,17 +265,60 @@ async def finn_get_eiendom_unit(unit_code: str, force_refresh: bool = False) -> @mcp.tool( description=( "Fetch and analyze unit images for visual assessment of a property. " - "Returns property photos with metadata for evaluating views, condition, and layout." + "Downloads photos and returns them as visual image content so Claude can " + "directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere." ) ) -async def finn_analyze_unit_images(unit_code: str, force_refresh: bool = False) -> str: - """Fetch and return unit images for visual analysis.""" +async def finn_analyze_unit_images( + unit_code: str, + force_refresh: bool = False, + max_images: int = 8, +) -> list: + """Fetch unit images and return as vision-compatible image content blocks.""" try: result = await get_unit_images(unit_code, force_refresh=force_refresh) - return render_unit_images(result, "markdown") + all_urls = result.get("unit_images") or [] + urls = all_urls[:max_images] + + header = ( + f"{result.get('address', unit_code)} | " + f"{result.get('rooms')} rom | " + f"{result.get('usable_area')}m² | " + f"{len(all_urls)} bilder totalt, viser {len(urls)}" + ) + content: list = [TextContent(type="text", text=header)] + + async def _fetch(url: str) -> ImageContent | None: + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(url) + if resp.status_code != 200: + return None + + # Resize to max 1024px 
on longest side before encoding. + # Raw real estate photos are 2-4MB — must compress to stay + # within the 1MB MCP tool result limit across multiple images. + from PIL import Image + import io + img = Image.open(io.BytesIO(resp.content)) + img.thumbnail((1024, 1024), Image.LANCZOS) + if img.mode in ("RGBA", "P"): + img = img.convert("RGB") + buf = io.BytesIO() + img.save(buf, format="JPEG", quality=75, optimize=True) + b64 = base64.b64encode(buf.getvalue()).decode() + return ImageContent(type="image", data=b64, mimeType="image/jpeg") + except Exception as exc: + logger.warning("Failed to fetch/resize image %s: %s", url, exc) + return None + + fetched = await asyncio.gather(*[_fetch(u) for u in urls]) + content.extend(img for img in fetched if img is not None) + return content + except Exception as e: logger.error(f"Error fetching unit images for {unit_code}: {e}") - return json.dumps({"error": True, "message": str(e)}) + return [TextContent(type="text", text=json.dumps({"error": True, "message": str(e)}))] @mcp.tool( @@ -332,4 +497,4 @@ def main() -> None: if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/finn_eiendom/service.py b/finn_eiendom/service.py index dfbb153..0898e87 100644 --- a/finn_eiendom/service.py +++ b/finn_eiendom/service.py @@ -215,6 +215,7 @@ async def analyze_search( detail_limit: int = 20, include_details: bool = True, include_eiendom_no: bool = True, + ctx: Any = None, ) -> dict[str, Any]: """Analyze a FINN search URL and return a ranked shortlist. @@ -227,6 +228,7 @@ async def analyze_search( fetch_details=include_details, detail_limit=detail_limit, include_eiendom_no=include_eiendom_no, + ctx=ctx, ) diff --git a/pyproject.toml b/pyproject.toml index 4bd8fab..9304778 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "lxml>=5.0.0", "mcp[cli]>=1.0.0", "msgpack>=1.0.0", + "pillow>=10.0.0", "pydantic>=2.8.0", "pydantic-settings>=2.4.0", "python-dotenv>=1.0.0",