Enhance analysis functionality with parallel fetching and response shaping; add image processing for unit images

This commit is contained in:
Ole
2026-05-26 20:50:58 +00:00
parent 2933b8c1ea
commit 5b772b2ae5
4 changed files with 300 additions and 49 deletions
+124 -41
View File
@@ -16,7 +16,9 @@ The cached result is invalidated automatically the moment any piece of
underlying data changes, because the deps_hash will differ. underlying data changes, because the deps_hash will differ.
""" """
import asyncio
import logging import logging
from typing import Any
from . import ad as ad_module from . import ad as ad_module
from . import cache, eiendom_no, scoring, search from . import cache, eiendom_no, scoring, search
@@ -43,6 +45,10 @@ from .models import EiendomUnit, FinnAd, SimilarUnit
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Max parallel ad + eiendom.no fetches in analyze_search phase 1.
# High enough to be fast; low enough to avoid FINN rate-limiting.
FETCH_CONCURRENCY = 5
def _normalize_description(text: str | None) -> str: def _normalize_description(text: str | None) -> str:
return text.lower() if text else "" return text.lower() if text else ""
@@ -91,8 +97,6 @@ def _build_ad_summary(
risks.append("Risk flags are detected in description or metadata.") risks.append("Risk flags are detected in description or metadata.")
if ad.common_costs and ad.common_costs > 5000: if ad.common_costs and ad.common_costs > 5000:
risks.append("Common costs are relatively high and should be reviewed.") risks.append("Common costs are relatively high and should be reviewed.")
if enriched and enriched.sale_status and enriched.sale_status.upper() != "FOR_SALE":
risks.append("Eiendom.no sale status does not indicate an active sale.")
if not enriched: if not enriched:
risks.append("Missing Eiendom.no data increases uncertainty.") risks.append("Missing Eiendom.no data increases uncertainty.")
@@ -208,8 +212,27 @@ async def analyze_ad(
result = { result = {
"finnkode": finn_ad.finnkode, "finnkode": finn_ad.finnkode,
"url": finn_ad.url,
"title": finn_ad.title, "title": finn_ad.title,
"address": finn_ad.address, "address": finn_ad.address,
"district": finn_ad.district,
"property_type": finn_ad.property_type,
"ownership_type": finn_ad.ownership_type,
"floor": finn_ad.floor,
"area_m2": finn_ad.area_m2,
"bedrooms": finn_ad.bedrooms,
"rooms": finn_ad.rooms,
"total_price": finn_ad.total_price,
"asking_price": finn_ad.asking_price,
"shared_debt": finn_ad.shared_debt,
"common_costs": finn_ad.common_costs,
"construction_year": finn_ad.construction_year,
"has_balcony": finn_ad.has_balcony,
"has_terrace": finn_ad.has_terrace,
"has_elevator": finn_ad.has_elevator,
"has_parking": finn_ad.has_parking,
"has_garage": finn_ad.has_garage,
"eiendom_unit_code": finn_ad.eiendom_unit_code,
"score": scores, "score": scores,
"categories": categories, "categories": categories,
"summary": summary, "summary": summary,
@@ -226,12 +249,26 @@ async def analyze_ad(
return result return result
async def _analyze_card(card, conn, *, include_eiendom_no: bool, client) -> dict: async def _fetch_card_to_db(
"""Fetch details + enrich a single search card. Raises on unrecoverable card,
errors; the caller is responsible for catching and skipping.""" conn,
finn_ad = cache.get_finn_ad(conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS) *,
if finn_ad is None: include_eiendom_no: bool,
finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client) client,
) -> tuple["FinnAd | None", "str | None"]:
"""Phase 1 worker: fetch ad details + resolve Eiendom.no unit, persist to DB.
Returns (finn_ad, unit_code). Both can be None on failure -- the caller
treats None as a skip without aborting the whole batch.
"""
try:
finn_ad = cache.get_finn_ad(conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS)
if finn_ad is None:
finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client)
save_finn_ad(conn, finn_ad)
except Exception as exc:
logger.warning("Failed to fetch ad %s: %s", card.finnkode, exc)
return None, None
unit_code = None unit_code = None
if include_eiendom_no: if include_eiendom_no:
@@ -239,11 +276,9 @@ async def _analyze_card(card, conn, *, include_eiendom_no: bool, client) -> dict
matched_unit = await eiendom_no.search_unit_from_finn_url(card.url) matched_unit = await eiendom_no.search_unit_from_finn_url(card.url)
unit_code = matched_unit.unit_code if matched_unit else None unit_code = matched_unit.unit_code if matched_unit else None
except Exception as exc: except Exception as exc:
# A failed unit resolution is non-fatal -- proceed without enrichment.
logger.warning("Eiendom.no unit search failed for %s: %s", card.finnkode, exc) logger.warning("Eiendom.no unit search failed for %s: %s", card.finnkode, exc)
unit_code = None
return await analyze_ad(finn_ad, unit_code=unit_code) return finn_ad, unit_code
async def analyze_search( async def analyze_search(
@@ -254,13 +289,24 @@ async def analyze_search(
include_eiendom_no: bool = True, include_eiendom_no: bool = True,
client=None, client=None,
use_cache: bool = True, use_cache: bool = True,
ctx: Any = None,
) -> dict: ) -> dict:
"""Analyze a FINN search URL and enrich matching listings. """Analyze a FINN search URL and enrich matching listings.
Search-level results are NOT cached as a whole (the search page itself Two-phase parallel execution
is cached at the HTML level). Individual ad analyses ARE cached via ----------------------------
``analyze_ad``, so re-running a search only re-scores ads whose Phase 1 (parallel, I/O bound):
underlying data has changed. All resale cards are fetched concurrently behind a semaphore of size
``FETCH_CONCURRENCY``. Each worker fetches the ad detail page and
resolves the Eiendom.no unit in one shot, then writes both to SQLite.
Progress is reported via ``ctx`` if provided.
Phase 2 (sequential, cache bound):
Scoring reads entirely from SQLite -- no network -- and is fast.
Results are sorted by total score and returned.
Individual ad analyses ARE cached via ``analyze_ad``; re-running a search
only re-scores ads whose underlying data has changed.
""" """
conn = cache.init_db(FINN_CACHE_PATH) conn = cache.init_db(FINN_CACHE_PATH)
cards = await search.fetch_search_pages( cards = await search.fetch_search_pages(
@@ -269,38 +315,75 @@ async def analyze_search(
client=client, client=client,
use_cache=use_cache, use_cache=use_cache,
) )
resale_cards = [c for c in cards[:detail_limit] if _is_resale_listing(c.url)]
skipped_count = len(cards[:detail_limit]) - len(resale_cards)
if ctx is not None:
await ctx.info(
f"Found {len(cards)} listings, {len(resale_cards)} resale ads to fetch."
)
# ------------------------------------------------------------------
# Phase 1: parallel fetch to DB
# ------------------------------------------------------------------
fetched: dict[str, tuple] = {} # finnkode -> (FinnAd, unit_code | None)
fetch_counter = 0
sem = asyncio.Semaphore(FETCH_CONCURRENCY)
if not fetch_details:
resale_cards = []
async def _fetch_worker(card, idx: int) -> None:
nonlocal fetch_counter
async with sem:
finn_ad, unit_code = await _fetch_card_to_db(
card, conn, include_eiendom_no=include_eiendom_no, client=client
)
fetched[card.finnkode] = (finn_ad, unit_code)
fetch_counter += 1
if ctx is not None:
await ctx.report_progress(fetch_counter, len(resale_cards))
status = "enriched" if unit_code else "no eiendom match"
await ctx.info(
f"[{fetch_counter}/{len(resale_cards)}] {card.finnkode} fetched ({status})"
)
await asyncio.gather(*[_fetch_worker(c, i) for i, c in enumerate(resale_cards)])
# ------------------------------------------------------------------
# Phase 2: score from DB (reads cache, fast)
# ------------------------------------------------------------------
if ctx is not None:
await ctx.info(f"All data fetched. Scoring {len(resale_cards)} ads...")
results = [] results = []
enriched_count = 0 enriched_count = 0
skipped_count = 0
cache_hits = 0
if fetch_details: for card in resale_cards:
for card in cards[:detail_limit]: finn_ad, unit_code = fetched.get(card.finnkode, (None, None))
# Project / new-build ads are not resale listings and fetch_ad_details if finn_ad is None:
# cannot resolve them -- skip up front rather than 404 mid-run. skipped_count += 1
if not _is_resale_listing(card.url): continue
logger.info("Skipping non-resale card %s (%s)", card.finnkode, card.url) try:
skipped_count += 1 result = await analyze_ad(finn_ad, unit_code=unit_code)
continue except Exception as exc:
logger.warning("Skipping card %s during scoring: %s", card.finnkode, exc)
skipped_count += 1
continue
# One bad card (stale finnkode, removed ad, transient network error) if result.get("eiendom_unit"):
# must not abort the whole search -- isolate each card. enriched_count += 1
try: results.append(result)
result = await _analyze_card(
card, conn, include_eiendom_no=include_eiendom_no, client=client
)
except Exception as exc:
logger.warning("Skipping card %s: %s", card.finnkode, exc)
skipped_count += 1
continue
if result.get("eiendom_unit"):
enriched_count += 1
# Track analysis cache hits via the absence of recompute logging
# (the flag is not propagated up here; rely on debug logs).
results.append(result)
results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True) results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True)
if ctx is not None:
await ctx.info(
f"Done. {len(results)} analyzed, {enriched_count} enriched, "
f"{skipped_count} skipped."
)
return { return {
"search_url": search_url, "search_url": search_url,
"search_cards": [card.model_dump(mode="json") for card in cards], "search_cards": [card.model_dump(mode="json") for card in cards],
+173 -8
View File
@@ -1,11 +1,15 @@
"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment.""" """FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment."""
import base64
import json import json
import logging import logging
from typing import Any from typing import Any
import os import os
import asyncio
import httpx
from mcp.server.transport_security import TransportSecuritySettings from mcp.server.transport_security import TransportSecuritySettings
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import Context, FastMCP
from mcp.types import ImageContent, TextContent
from .eiendom_no import ( from .eiendom_no import (
build_unit_vector, build_unit_vector,
@@ -39,6 +43,120 @@ from .service import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Response shaping
# ---------------------------------------------------------------------------
def _slim_listing(rank: int, item: dict, *, max_comps: int = 15) -> dict:
    """Collapse one full analysis result into a compact listing card.

    Only the keys copied explicitly below survive; every other key of
    ``item`` is dropped from the card.

    Args:
        rank: Position of the listing in the ranked output, stored as-is
            under ``"rank"``.
        item: One analysis result dict. Missing keys are tolerated and come
            back as ``None`` (or an empty dict/list for ``score`` and
            ``similar_units``).
        max_comps: Maximum number of comparable units to keep, most recent
            first. Negative values are treated as 0.

    Returns:
        A dict holding the core listing facts, a shallow copy of the full
        ``score`` mapping, ``why_interesting`` and ``risks`` lifted out of
        ``summary``, a condensed ``eiendom`` block (``None`` when ``item``
        has no ``eiendom_unit``) and the slimmed ``similar_units`` list.
    """
    eu = item.get("eiendom_unit") or {}
    comps = item.get("similar_units") or []
    score = item.get("score") or {}
    summary = item.get("summary") or {}

    # The average is taken over ALL comps with a usable sqm price, not only
    # the ones that survive the max_comps cut further down.
    sqm_prices = [c["sqm_price"] for c in comps if c.get("sqm_price")]
    avg_comp_sqm = round(sum(sqm_prices) / len(sqm_prices)) if sqm_prices else None

    def _finalized(c: dict) -> str:
        # Coerce to text so a missing value, an ISO string and a
        # date/datetime object can all be compared and sliced the same way.
        return str(c.get("finalized_at") or "")

    comp_fields = (
        "unit_code",
        "address",
        "usable_area",
        "rooms",
        "floor",
        "construction_year",
        "listing_price",
        "selling_price",
        "shared_debt",
        "sqm_price",
        "common_costs",
        "days_on_market",
    )

    def _slim_comp(c: dict) -> dict:
        slim = {field: c.get(field) for field in comp_fields}
        # Keep only the leading YYYY-MM-DD part of the finalization stamp.
        slim["finalized_at"] = _finalized(c)[:10]
        return slim

    # Sort by recency and keep the newest few; comps without a date sort last.
    newest_first = sorted(comps, key=_finalized, reverse=True)
    slim_comps = [_slim_comp(c) for c in newest_first[: max(max_comps, 0)]]

    eiendom: dict | None = None
    if eu:
        eiendom = {
            "unit_code": eu.get("unit_code"),
            "usable_area": eu.get("usable_area"),
            "estimated_price": eu.get("estimated_selling_price"),
            "estimated_range": [
                eu.get("estimated_selling_price_lower"),
                eu.get("estimated_selling_price_upper"),
            ],
            "listing_sqm_price": eu.get("listing_sqm_price"),
            "market_placement": eu.get("market_placement_score"),
            "sale_status": eu.get("sale_status"),
            "days_on_market": eu.get("days_on_market"),
            "avg_comp_sqm_price": avg_comp_sqm,
            # Total number of comps received, before the max_comps cut.
            "comp_count": len(comps),
        }

    listing_fields = (
        "finnkode",
        "url",
        "title",
        "address",
        "district",
        "property_type",
        "ownership_type",
        "floor",
        "area_m2",
        "bedrooms",
        "rooms",
        "total_price",
        "asking_price",
        "shared_debt",
        "common_costs",
        "construction_year",
        "has_balcony",
        "has_terrace",
        "has_elevator",
        "has_parking",
        "has_garage",
        "eiendom_unit_code",
    )

    card: dict = {"rank": rank}
    card.update((field, item.get(field)) for field in listing_fields)
    card.update(
        {
            # The whole score breakdown is kept; copy it so callers can
            # mutate the card without touching the source result.
            "score": dict(score),
            "categories": item.get("categories"),
            "why_interesting": summary.get("why_interesting"),
            "risks": summary.get("risks"),
            "eiendom": eiendom,
            "similar_units": slim_comps,
        }
    )
    return card
def _build_slim_search_result(full: dict) -> dict:
    """Reduce the full analyze_search payload to a compact MCP-safe response.

    ``search_cards`` is left out (redundant with the listings) and every
    entry under ``analysis`` is passed through ``_slim_listing`` with a
    1-based rank. Target: <200KB for 30 analyzed ads.
    """
    analyses = full.get("analysis") or []

    slimmed = []
    for position, entry in enumerate(analyses, start=1):
        slimmed.append(_slim_listing(position, entry))

    compact = {
        "search_url": full.get("search_url"),
        "summary": full.get("summary"),
        "listings": slimmed,
    }
    return compact
def _build_transport_security() -> TransportSecuritySettings: def _build_transport_security() -> TransportSecuritySettings:
allowed = os.getenv("MCP_ALLOWED_HOSTS", "") allowed = os.getenv("MCP_ALLOWED_HOSTS", "")
if allowed: if allowed:
@@ -57,10 +175,13 @@ mcp = FastMCP("finn_eiendom_mcp", transport_security=_build_transport_security()
description=( description=(
"Analyze a FINN.no real estate search URL. Scrapes listing cards," "Analyze a FINN.no real estate search URL. Scrapes listing cards,"
" fetches details, enriches with Eiendom.no data, scores, and ranks." " fetches details, enriches with Eiendom.no data, scores, and ranks."
" Fetches all ads in parallel (phase 1) then scores from cache (phase 2)."
" Progress updates are emitted during phase 1."
) )
) )
async def finn_analyze_search( async def finn_analyze_search(
search_url: str, search_url: str,
ctx: Context,
max_pages: int = 3, max_pages: int = 3,
detail_limit: int = 20, detail_limit: int = 20,
include_details: bool = True, include_details: bool = True,
@@ -74,8 +195,9 @@ async def finn_analyze_search(
include_details=include_details, include_details=include_details,
detail_limit=detail_limit, detail_limit=detail_limit,
include_eiendom_no=include_eiendom_no, include_eiendom_no=include_eiendom_no,
ctx=ctx,
) )
return json.dumps(result, default=str) return json.dumps(_build_slim_search_result(result), default=str)
except Exception as e: except Exception as e:
logger.error(f"Error analyzing search: {e}") logger.error(f"Error analyzing search: {e}")
return json.dumps({"error": True, "message": str(e)}) return json.dumps({"error": True, "message": str(e)})
@@ -143,17 +265,60 @@ async def finn_get_eiendom_unit(unit_code: str, force_refresh: bool = False) ->
@mcp.tool( @mcp.tool(
description=( description=(
"Fetch and analyze unit images for visual assessment of a property. " "Fetch and analyze unit images for visual assessment of a property. "
"Returns property photos with metadata for evaluating views, condition, and layout." "Downloads photos and returns them as visual image content so Claude can "
"directly assess views, condition, layout, kitchen/bathroom quality, and atmosphere."
) )
) )
async def finn_analyze_unit_images(unit_code: str, force_refresh: bool = False) -> str: async def finn_analyze_unit_images(
"""Fetch and return unit images for visual analysis.""" unit_code: str,
force_refresh: bool = False,
max_images: int = 8,
) -> list:
"""Fetch unit images and return as vision-compatible image content blocks."""
try: try:
result = await get_unit_images(unit_code, force_refresh=force_refresh) result = await get_unit_images(unit_code, force_refresh=force_refresh)
return render_unit_images(result, "markdown") all_urls = result.get("unit_images") or []
urls = all_urls[:max_images]
header = (
f"{result.get('address', unit_code)} | "
f"{result.get('rooms')} rom | "
f"{result.get('usable_area')}m² | "
f"{len(all_urls)} bilder totalt, viser {len(urls)}"
)
content: list = [TextContent(type="text", text=header)]
async def _fetch(url: str) -> ImageContent | None:
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(url)
if resp.status_code != 200:
return None
# Resize to max 1024px on longest side before encoding.
# Raw real estate photos are 2-4MB — must compress to stay
# within the 1MB MCP tool result limit across multiple images.
from PIL import Image
import io
img = Image.open(io.BytesIO(resp.content))
img.thumbnail((1024, 1024), Image.LANCZOS)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=75, optimize=True)
b64 = base64.b64encode(buf.getvalue()).decode()
return ImageContent(type="image", data=b64, mimeType="image/jpeg")
except Exception as exc:
logger.warning("Failed to fetch/resize image %s: %s", url, exc)
return None
fetched = await asyncio.gather(*[_fetch(u) for u in urls])
content.extend(img for img in fetched if img is not None)
return content
except Exception as e: except Exception as e:
logger.error(f"Error fetching unit images for {unit_code}: {e}") logger.error(f"Error fetching unit images for {unit_code}: {e}")
return json.dumps({"error": True, "message": str(e)}) return [TextContent(type="text", text=json.dumps({"error": True, "message": str(e)}))]
@mcp.tool( @mcp.tool(
@@ -332,4 +497,4 @@ def main() -> None:
if __name__ == "__main__": if __name__ == "__main__":
main() main()
+2
View File
@@ -215,6 +215,7 @@ async def analyze_search(
detail_limit: int = 20, detail_limit: int = 20,
include_details: bool = True, include_details: bool = True,
include_eiendom_no: bool = True, include_eiendom_no: bool = True,
ctx: Any = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Analyze a FINN search URL and return a ranked shortlist. """Analyze a FINN search URL and return a ranked shortlist.
@@ -227,6 +228,7 @@ async def analyze_search(
fetch_details=include_details, fetch_details=include_details,
detail_limit=detail_limit, detail_limit=detail_limit,
include_eiendom_no=include_eiendom_no, include_eiendom_no=include_eiendom_no,
ctx=ctx,
) )
+1
View File
@@ -10,6 +10,7 @@ dependencies = [
"lxml>=5.0.0", "lxml>=5.0.0",
"mcp[cli]>=1.0.0", "mcp[cli]>=1.0.0",
"msgpack>=1.0.0", "msgpack>=1.0.0",
"pillow>=10.0.0",
"pydantic>=2.8.0", "pydantic>=2.8.0",
"pydantic-settings>=2.4.0", "pydantic-settings>=2.4.0",
"python-dotenv>=1.0.0", "python-dotenv>=1.0.0",