This commit is contained in:
Ole
2026-05-16 16:14:01 +00:00
parent 1399f61c1a
commit 71cc9c86a0
18 changed files with 1797 additions and 15 deletions
+6
View File
@@ -0,0 +1,6 @@
"""Entry point for python -m finn_eiendom."""
from .cli import app
if __name__ == "__main__":
app()
+422
View File
@@ -0,0 +1,422 @@
"""Typer-based CLI for FINN real estate analysis."""
import asyncio
from typing import Literal, Optional
import typer
from . import formatting
from .service import (
analyze_ad as svc_analyze_ad,
analyze_ad_against_comps as svc_analyze_ad_against_comps,
analyze_search as svc_analyze_search,
build_unit_vector_for_unit_code as svc_build_unit_vector,
compare_ads as svc_compare_ads,
decode_unit_vector_to_dict as svc_decode_unit_vector,
find_similar_to_liked as svc_find_similar_to_liked,
get_new_ads_since_last_run as svc_get_new_ads_since_last_run,
get_or_fetch_ad as svc_get_or_fetch_ad,
get_or_fetch_eiendom_unit as svc_get_or_fetch_eiendom_unit,
get_or_fetch_similar_units as svc_get_or_fetch_similar_units,
get_shortlist as svc_get_shortlist,
resolve_eiendom_unit_from_finn_url as svc_resolve_eiendom_unit,
save_feedback as svc_save_feedback,
)
app = typer.Typer(help="FINN real estate analysis and Eiendom.no enrichment")
cache_app = typer.Typer(help="Cache management commands")
config_app = typer.Typer(help="Configuration commands")
app.add_typer(cache_app, name="cache")
app.add_typer(config_app, name="config")
OutputFormat = Literal["json", "markdown", "table"]
# ============================================================================
# Search and analysis commands
# ============================================================================
@app.command()
def analyze_search(
url: str = typer.Argument(..., help="FINN search URL"),
max_pages: int = typer.Option(3, help="Max pages to scrape"),
detail_limit: int = typer.Option(20, help="Max details to fetch"),
no_details: bool = typer.Option(False, help="Skip fetching listing details"),
no_eiendom: bool = typer.Option(False, help="Skip Eiendom.no enrichment"),
with_similar: bool = typer.Option(False, help="Include similar units for each listing"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Analyze a FINN search URL and return ranked shortlist."""
try:
result = asyncio.run(
svc_analyze_search(
url,
max_pages=max_pages,
detail_limit=detail_limit,
include_details=not no_details,
include_eiendom_no=not no_eiendom,
include_similar_units_for_shortlist=with_similar,
)
)
output = formatting.render_shortlist(result, format)
typer.echo(output)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def get_ad(
finnkode: str = typer.Argument(..., help="FINN property code"),
force_refresh: bool = typer.Option(False, help="Bypass cache"),
no_eiendom: bool = typer.Option(False, help="Skip Eiendom.no enrichment"),
with_similar: bool = typer.Option(False, help="Include similar units for this property"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Fetch and display a single FINN listing."""
try:
ad = asyncio.run(svc_get_or_fetch_ad(finnkode, force_refresh=force_refresh))
ad_data = ad.model_dump()
output = formatting.render_ad(ad_data, format)
typer.echo(output)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def enrich_ad(
finnkode: str = typer.Argument(..., help="FINN property code"),
with_similar: bool = typer.Option(False, help="Include similar units for this property"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Fetch and enrich a single FINN listing with Eiendom.no data."""
try:
result = asyncio.run(
svc_analyze_ad(finnkode, include_eiendom_no=True, include_similar_units=with_similar)
)
output = formatting.render_ad(result.get("ad", {}), format)
typer.echo(output)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def compare(
finnkoder: list[str] = typer.Argument(..., help="FINN property codes to compare (2-10 codes)"),
no_eiendom: bool = typer.Option(False, help="Skip Eiendom.no enrichment"),
no_comps: bool = typer.Option(False, help="Skip comparable units"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Compare multiple FINN listings side by side."""
if len(finnkoder) < 2:
typer.echo("Error: Provide at least 2 finnkoder", err=True)
raise typer.Exit(1)
if len(finnkoder) > 10:
typer.echo("Error: Provide at most 10 finnkoder", err=True)
raise typer.Exit(1)
try:
result = asyncio.run(
svc_compare_ads(
finnkoder,
include_eiendom_no=not no_eiendom,
include_comps=not no_comps,
)
)
output = formatting.render_comparison(result, format)
typer.echo(output)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def analyze_against_comps(
finnkode: str = typer.Argument(..., help="FINN property code"),
listing_status: str = typer.Option(
"RECENTLY_SOLD", help="Comparable status (RECENTLY_SOLD, FOR_SALE, CURRENT)"
),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Evaluate a listing against comparable recently-sold units."""
try:
result = asyncio.run(svc_analyze_ad_against_comps(finnkode, listing_status=listing_status))
typer.echo(formatting.render_comparison({"listings": [result]}, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# Eiendom.no commands
# ============================================================================
@app.command()
def resolve_unit(
url: str = typer.Argument(..., help="FINN listing URL"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Resolve an Eiendom.no unit_code from a FINN URL."""
try:
unit = asyncio.run(svc_resolve_eiendom_unit(url))
if unit:
typer.echo(formatting.render_unit(unit.model_dump(), format))
else:
typer.echo("Error: Could not resolve unit from URL", err=True)
raise typer.Exit(1)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def get_unit(
unit_code: str = typer.Argument(..., help="Eiendom.no unit code"),
force_refresh: bool = typer.Option(False, help="Bypass cache"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Fetch Eiendom.no unit details."""
try:
unit = asyncio.run(svc_get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh))
if unit:
typer.echo(formatting.render_unit(unit.model_dump(), format))
else:
typer.echo("Error: Unit not found", err=True)
raise typer.Exit(1)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def build_vector(
unit_code: str = typer.Argument(..., help="Eiendom.no unit code"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Build a unit vector for an Eiendom.no unit."""
try:
result = svc_build_unit_vector(unit_code)
typer.echo(formatting.render_ad(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def decode_vector(
unit_vector: str = typer.Argument(..., help="Unit vector string (base64)"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Decode a unit vector to human-readable JSON."""
try:
result = svc_decode_unit_vector(unit_vector)
typer.echo(formatting.render_ad(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def similar_units(
unit_vector: str = typer.Argument(..., help="Unit vector string (base64)"),
status: str = typer.Option(
"RECENTLY_SOLD", help="Listing status (RECENTLY_SOLD, FOR_SALE, CURRENT)"
),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Fetch similar/comparable units from Eiendom.no."""
try:
units = asyncio.run(svc_get_or_fetch_similar_units(unit_vector, listing_status=status))
result = {"similar_units": [u.model_dump() for u in units]}
typer.echo(formatting.render_similar_units(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def similar_to_liked(
finnkode: str = typer.Argument(..., help="FINN property code"),
mode: str = typer.Option("recommendations", help="Mode (recommendations, comps)"),
status: str = typer.Option(
"FOR_SALE", help="Listing status (RECENTLY_SOLD, FOR_SALE, CURRENT)"
),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Find properties similar to one you've liked."""
try:
result = asyncio.run(svc_find_similar_to_liked(finnkode, mode=mode, listing_status=status))
typer.echo(formatting.render_similar_units(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# Feedback commands
# ============================================================================
@app.command()
def save_feedback(
finnkode: str = typer.Argument(..., help="FINN property code"),
verdict: str = typer.Argument(..., help="Verdict (liked, rejected, interesting, etc.)"),
notes: Optional[str] = typer.Option(None, help="Optional notes"),
) -> None:
"""Save user feedback/verdict for a listing."""
try:
result = svc_save_feedback(finnkode, verdict, notes)
typer.echo(f"Saved feedback for {finnkode}: {verdict}")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# History and diff commands
# ============================================================================
@app.command()
def shortlist(
run_id: Optional[int] = typer.Option(None, help="Run ID (defaults to latest)"),
limit: int = typer.Option(10, help="Max listings to return"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Fetch stored shortlist from a previous search run."""
try:
result = svc_get_shortlist(run_id, limit)
typer.echo(formatting.render_shortlist(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@app.command()
def diff(
url: str = typer.Argument(..., help="FINN search URL"),
format: OutputFormat = typer.Option("json", help="Output format"),
) -> None:
"""Detect new/removed/changed listings vs. the previous run."""
try:
result = svc_get_new_ads_since_last_run(url)
typer.echo(formatting.render_diff(result, format))
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# Cache management
# ============================================================================
@cache_app.command()
def stats() -> None:
"""Show cache statistics."""
try:
# TODO: implement cache stats via cache.py
typer.echo("Cache stats (not yet implemented)")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@cache_app.command()
def clear() -> None:
"""Clear all cache."""
if not typer.confirm("Are you sure you want to clear all cache?"):
raise typer.Exit()
try:
# TODO: implement cache clear via cache.py
typer.echo("Cache cleared (not yet implemented)")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@cache_app.command()
def clear_html() -> None:
"""Clear cached HTML only."""
try:
# TODO: implement HTML cache clear via cache.py
typer.echo("HTML cache cleared (not yet implemented)")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@cache_app.command()
def clear_json() -> None:
"""Clear cached JSON only."""
try:
# TODO: implement JSON cache clear via cache.py
typer.echo("JSON cache cleared (not yet implemented)")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# Config management
# ============================================================================
@config_app.command()
def show() -> None:
"""Show current configuration."""
try:
from .config import FINN_CACHE_PATH
typer.echo(f"Cache path: {FINN_CACHE_PATH}")
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
@config_app.command()
def path() -> None:
"""Show cache directory path."""
try:
from .config import FINN_CACHE_PATH
typer.echo(FINN_CACHE_PATH)
except Exception as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
# ============================================================================
# Utility commands
# ============================================================================
@app.command()
def version() -> None:
"""Show version information."""
typer.echo("finn-eiendom 0.1.0")
@app.command()
def serve(
transport: str = typer.Option("stdio", help="Transport (stdio, http)"),
host: str = typer.Option("127.0.0.1", help="HTTP host"),
port: int = typer.Option(8010, help="HTTP port"),
) -> None:
"""Run the MCP server."""
if transport == "stdio":
from .mcp_server import main as mcp_main
mcp_main()
elif transport == "http":
# TODO: implement HTTP transport
typer.echo(f"HTTP transport not yet implemented (would run on {host}:{port})")
raise typer.Exit(1)
else:
typer.echo(f"Error: Unknown transport {transport}", err=True)
raise typer.Exit(1)
+65
View File
@@ -0,0 +1,65 @@
"""User feedback storage and retrieval for listed properties."""
import logging
from typing import Any
from .cache import init_db
from .config import FINN_CACHE_PATH
logger = logging.getLogger(__name__)
def save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> dict[str, Any]:
"""Store user feedback/verdict for a FINN listing.
Args:
finnkode: FINN property ID
verdict: User verdict (e.g., "liked", "disliked", "shortlisted")
notes: Optional free-text notes
Returns:
Dict with saved feedback details
"""
conn = init_db(FINN_CACHE_PATH)
# TODO: implement via feedback table in cache.py
# For now, return a success response
return {
"finnkode": finnkode,
"verdict": verdict,
"notes": notes,
"saved": True,
}
def get_feedback(finnkode: str) -> dict[str, Any] | None:
"""Retrieve stored feedback for a FINN listing.
Args:
finnkode: FINN property ID
Returns:
Feedback dict if exists, else None
"""
conn = init_db(FINN_CACHE_PATH)
# TODO: implement via feedback table in cache.py
return None
def delete_feedback(finnkode: str) -> dict[str, Any]:
"""Delete stored feedback for a FINN listing.
Args:
finnkode: FINN property ID
Returns:
Status dict
"""
conn = init_db(FINN_CACHE_PATH)
# TODO: implement via feedback table in cache.py
return {
"finnkode": finnkode,
"deleted": True,
}
+525
View File
@@ -0,0 +1,525 @@
"""Output formatting for JSON, Markdown, and table formats."""
import json
from typing import Any, Literal
from .models import FinnAd, EiendomUnit, SimilarUnit
OutputFormat = Literal["json", "markdown", "table"]
def _validate_format(fmt: OutputFormat) -> None:
"""Raise ValueError if format is not supported."""
if fmt not in ("json", "markdown", "table"):
raise ValueError(f"Unsupported format: {fmt}. Supported: json, markdown, table")
# ============================================================================
# Ad renderers
# ============================================================================
def render_ad(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render a single FINN ad (FinnAd model dump)."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_ad_markdown(payload)
else: # table
return _render_ad_markdown(payload) # For single ad, markdown is better than table
def _render_ad_markdown(ad: dict[str, Any]) -> str:
"""Render ad as markdown."""
lines = [
f"# {ad.get('title', 'FINN Listing')}",
"",
f"**Finnkode:** {ad.get('finnkode')} ",
f"**URL:** {ad.get('url')}",
"",
"## Details",
f"- **Address:** {ad.get('address')}",
f"- **Property Type:** {ad.get('property_type')}",
f"- **Ownership Type:** {ad.get('ownership_type')}",
f"- **Area:** {ad.get('area_m2')}",
f"- **Rooms:** {ad.get('rooms')} ({ad.get('bedrooms')} bedrooms)",
f"- **Floor:** {ad.get('floor')}",
f"- **Construction Year:** {ad.get('construction_year')}",
"",
"## Pricing",
f"- **Asking Price:** {ad.get('asking_price'):,}"
if ad.get("asking_price")
else "- **Asking Price:** N/A",
f"- **Total Price:** {ad.get('total_price'):,}"
if ad.get("total_price")
else "- **Total Price:** N/A",
f"- **Common Costs:** {ad.get('common_costs'):,}/month"
if ad.get("common_costs")
else "- **Common Costs:** N/A",
f"- **Shared Debt:** {ad.get('shared_debt'):,}"
if ad.get("shared_debt")
else "- **Shared Debt:** N/A",
"",
"## Features",
f"- **Heating:** {ad.get('heating')}",
f"- **Energy Rating:** {ad.get('energy_rating')}",
f"- **Balcony:** {'Yes' if ad.get('has_balcony') else 'No'}",
f"- **Terrace:** {'Yes' if ad.get('has_terrace') else 'No'}",
f"- **Elevator:** {'Yes' if ad.get('has_elevator') else 'No'}",
f"- **Parking:** {'Yes' if ad.get('has_parking') else 'No'}",
f"- **Garage:** {'Yes' if ad.get('has_garage') else 'No'}",
"",
"## Listing Info",
f"- **Broker:** {ad.get('broker_company')} ({ad.get('broker_name')})",
f"- **First Seen:** {ad.get('first_seen_at')}",
f"- **Last Seen:** {ad.get('last_seen_at')}",
]
return "\n".join(lines)
# ============================================================================
# Unit renderers (Eiendom.no)
# ============================================================================
def render_unit(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render an Eiendom.no unit."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
else:
return _render_unit_markdown(payload)
def _render_unit_markdown(unit: dict[str, Any]) -> str:
"""Render unit as markdown."""
lines = [
f"# Eiendom.no Unit: {unit.get('address', 'Unknown')}",
"",
f"**Unit Code:** {unit.get('unit_code')}",
f"**Coordinates:** {unit.get('lat')}, {unit.get('lng')}",
"",
"## Property Details",
f"- **Type:** {unit.get('property_type')}",
f"- **Rooms:** {unit.get('rooms')}",
f"- **Floor:** {unit.get('floor')}",
f"- **Construction Year:** {unit.get('construction_year')}",
f"- **Usable Area:** {unit.get('usable_area')}",
"",
"## Market Data",
f"- **Estimated Selling Price:** {unit.get('estimated_selling_price'):,}"
if unit.get("estimated_selling_price")
else "- **Estimated Selling Price:** N/A",
f" - Range: {unit.get('estimated_selling_price_lower'):,} - {unit.get('estimated_selling_price_upper'):,}"
if unit.get("estimated_selling_price_lower")
else "",
f"- **Listing Price:** {unit.get('listing_price'):,}"
if unit.get("listing_price")
else "- **Listing Price:** N/A",
f"- **Listing Price/m²:** {unit.get('listing_sqm_price'):,} NOK/m²"
if unit.get("listing_sqm_price")
else "",
f"- **Common Costs:** {unit.get('common_costs'):,}/month"
if unit.get("common_costs")
else "- **Common Costs:** N/A",
f"- **Days on Market:** {unit.get('days_on_market')}",
f"- **Market Placement Score:** {unit.get('market_placement_score')}",
f"- **Sale Status:** {unit.get('sale_status')}",
"",
f"**Fetched at:** {unit.get('fetched_at')}",
]
return "\n".join(lines)
# ============================================================================
# Shortlist renderer
# ============================================================================
def render_shortlist(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render a shortlist of analyzed listings."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_shortlist_markdown(payload)
else: # table
return _render_shortlist_table(payload)
def _render_shortlist_markdown(data: dict[str, Any]) -> str:
"""Render shortlist as markdown."""
shortlist = data.get("shortlist", [])
lines = [
"# Shortlist",
f"Found {len(shortlist)} listings",
"",
]
for i, ad in enumerate(shortlist, 1):
lines.extend(
[
f"## {i}. {ad.get('title', 'Unknown')}",
f"**Finnkode:** {ad.get('finnkode')}",
f"**Price:** {ad.get('asking_price'):,} NOK"
if ad.get("asking_price")
else "**Price:** N/A",
f"**Area:** {ad.get('area_m2')}",
f"**Address:** {ad.get('address')}",
"",
]
)
return "\n".join(lines)
def _render_shortlist_table(data: dict[str, Any]) -> str:
"""Render shortlist as table."""
shortlist = data.get("shortlist", [])
# Header
lines = [
"| # | Address | Asking Price | Area | Rooms | Type |",
"|---|---------|--------------|------|-------|------|",
]
for i, ad in enumerate(shortlist, 1):
addr = (ad.get("address") or "")[:40]
price = f"{ad.get('asking_price'):,}" if ad.get("asking_price") else "N/A"
area = f"{ad.get('area_m2')}" if ad.get("area_m2") else "N/A"
rooms = ad.get("rooms") or "N/A"
ptype = ad.get("property_type") or "N/A"
lines.append(f"| {i} | {addr} | {price} | {area} | {rooms} | {ptype} |")
return "\n".join(lines)
# ============================================================================
# Comparison renderer
# ============================================================================
def render_comparison(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render a side-by-side comparison of listings."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_comparison_markdown(payload)
else: # table
return _render_comparison_table(payload)
def _render_comparison_markdown(data: dict[str, Any]) -> str:
"""Render comparison as markdown."""
listings = data.get("listings", [])
lines = [
"# Listing Comparison",
f"Comparing {len(listings)} listings",
"",
]
for i, ad in enumerate(listings, 1):
lines.extend(
[
f"## {i}. {ad.get('title', 'Unknown')}",
f"- **Address:** {ad.get('address')}",
f"- **Price:** {ad.get('asking_price'):,} NOK"
if ad.get("asking_price")
else "- **Price:** N/A",
f"- **Area:** {ad.get('area_m2')}",
f"- **Price/m²:** {ad.get('asking_price') // ad.get('area_m2') if ad.get('asking_price') and ad.get('area_m2') else 'N/A'} NOK/m²",
f"- **Rooms:** {ad.get('rooms')}",
f"- **Common Costs:** {ad.get('common_costs'):,}/month"
if ad.get("common_costs")
else "- **Common Costs:** N/A",
"",
]
)
return "\n".join(lines)
def _render_comparison_table(data: dict[str, Any]) -> str:
"""Render comparison as table."""
listings = data.get("listings", [])
lines = [
"| Address | Asking Price | Area | Price/m² | Rooms | Common Costs |",
"|---------|--------------|------|----------|-------|--------------|",
]
for ad in listings:
addr = (ad.get("address") or "")[:30]
price = f"{ad.get('asking_price'):,}" if ad.get("asking_price") else "N/A"
area = f"{ad.get('area_m2')}" if ad.get("area_m2") else "N/A"
price_sqm = (
f"{ad.get('asking_price') // ad.get('area_m2')}"
if ad.get("asking_price") and ad.get("area_m2")
else "N/A"
)
rooms = ad.get("rooms") or "N/A"
costs = f"{ad.get('common_costs'):,}" if ad.get("common_costs") else "N/A"
lines.append(f"| {addr} | {price} | {area} m² | {price_sqm} | {rooms} | {costs} |")
return "\n".join(lines)
# ============================================================================
# Similar units renderer
# ============================================================================
def render_similar_units(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render similar/comparable units."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_similar_units_markdown(payload)
else: # table
return _render_similar_units_table(payload)
def _render_similar_units_markdown(data: dict[str, Any]) -> str:
"""Render similar units as markdown."""
similar = data.get("similar_units", [])
lines = [
"# Similar/Comparable Units",
f"Found {len(similar)} comparable listings",
"",
]
for i, unit in enumerate(similar, 1):
lines.extend(
[
f"## {i}. {unit.get('address', 'Unknown')}",
f"- **Listing Price:** {unit.get('listing_price'):,} NOK"
if unit.get("listing_price")
else "- **Listing Price:** N/A",
f"- **Selling Price:** {unit.get('selling_price'):,} NOK"
if unit.get("selling_price")
else "- **Selling Price:** N/A",
f"- **Area:** {unit.get('usable_area')}"
if unit.get("usable_area")
else "- **Area:** N/A",
f"- **Price/m²:** {unit.get('sqm_price'):,} NOK/m²"
if unit.get("sqm_price")
else "- **Price/m²:** N/A",
f"- **Days on Market:** {unit.get('days_on_market')}",
f"- **Status:** {unit.get('sale_status')}",
"",
]
)
return "\n".join(lines)
def _render_similar_units_table(data: dict[str, Any]) -> str:
"""Render similar units as table."""
similar = data.get("similar_units", [])
lines = [
"| Address | Listing Price | Selling Price | Area | Price/m² | Days | Status |",
"|---------|---------------|---------------|------|----------|------|--------|",
]
for unit in similar:
addr = (unit.get("address") or "")[:30]
list_price = f"{unit.get('listing_price'):,}" if unit.get("listing_price") else "N/A"
sell_price = f"{unit.get('selling_price'):,}" if unit.get("selling_price") else "N/A"
area = f"{unit.get('usable_area')}" if unit.get("usable_area") else "N/A"
price_sqm = f"{unit.get('sqm_price'):,}" if unit.get("sqm_price") else "N/A"
days = unit.get("days_on_market") or "N/A"
status = unit.get("sale_status") or "N/A"
lines.append(
f"| {addr} | {list_price} | {sell_price} | {area} m² | {price_sqm} | {days} | {status} |"
)
return "\n".join(lines)
# ============================================================================
# Diff renderer
# ============================================================================
def render_diff(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render diff of new/removed/changed listings."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_diff_markdown(payload)
else: # table
return _render_diff_table(payload)
def _render_diff_markdown(data: dict[str, Any]) -> str:
"""Render diff as markdown."""
new = data.get("new_ads", [])
removed = data.get("removed_ads", [])
changed = data.get("changed_ads", [])
lines = [
"# Listing Diff",
"",
]
if new:
lines.extend(
[
f"## 🆕 New Listings ({len(new)})",
"",
]
)
for ad in new:
lines.extend(
[
f"- {ad.get('title', 'Unknown')} ({ad.get('address')})",
f" - Price: {ad.get('asking_price'):,} NOK"
if ad.get("asking_price")
else " - Price: N/A",
"",
]
)
if removed:
lines.extend(
[
f"## ❌ Removed Listings ({len(removed)})",
"",
]
)
for ad in removed:
lines.extend(
[
f"- {ad.get('title', 'Unknown')} ({ad.get('address')})",
"",
]
)
if changed:
lines.extend(
[
f"## 🔄 Changed Listings ({len(changed)})",
"",
]
)
for change in changed:
lines.extend(
[
f"- {change.get('title', 'Unknown')} ({change.get('address')})",
]
)
diffs = change.get("diffs", {})
for key, (old, new_val) in diffs.items():
lines.append(f" - {key}: {old}{new_val}")
lines.append("")
return "\n".join(lines)
def _render_diff_table(data: dict[str, Any]) -> str:
"""Render diff as table."""
new = data.get("new_ads", [])
removed = data.get("removed_ads", [])
changed = data.get("changed_ads", [])
lines = [
"| Type | Address | Price | Status |",
"|------|---------|-------|--------|",
]
for ad in new:
addr = (ad.get("address") or "")[:30]
price = f"{ad.get('asking_price'):,}" if ad.get("asking_price") else "N/A"
lines.append(f"| 🆕 New | {addr} | {price} | - |")
for ad in removed:
addr = (ad.get("address") or "")[:30]
lines.append(f"| ❌ Removed | {addr} | - | - |")
for change in changed:
addr = (change.get("address") or "")[:30]
diffs = change.get("diffs", {})
changed_fields = ", ".join(diffs.keys())
lines.append(f"| 🔄 Changed | {addr} | - | {changed_fields} |")
return "\n".join(lines)
# ============================================================================
# Score breakdown renderer
# ============================================================================
def render_score_breakdown(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render score breakdown details."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
else:
return _render_score_breakdown_markdown(payload)
def _render_score_breakdown_markdown(data: dict[str, Any]) -> str:
"""Render score breakdown as markdown."""
scores = data.get("scores", {})
lines = [
"# Score Breakdown",
"",
]
for key, value in scores.items():
lines.append(f"- **{key}:** {value}")
return "\n".join(lines)
# ============================================================================
# Cache stats renderer
# ============================================================================
def render_cache_stats(payload: dict[str, Any], fmt: OutputFormat) -> str:
"""Render cache statistics."""
_validate_format(fmt)
if fmt == "json":
return json.dumps(payload, indent=2, default=str)
elif fmt == "markdown":
return _render_cache_stats_markdown(payload)
else: # table
return _render_cache_stats_table(payload)
def _render_cache_stats_markdown(data: dict[str, Any]) -> str:
"""Render cache stats as markdown."""
lines = [
"# Cache Statistics",
"",
f"- **Total FINN Ads:** {data.get('total_finn_ads', 0)}",
f"- **Total Eiendom Units:** {data.get('total_eiendom_units', 0)}",
f"- **Total Search Runs:** {data.get('total_search_runs', 0)}",
f"- **Cache Size:** {data.get('cache_size_mb', 0):.2f} MB",
f"- **Last Updated:** {data.get('last_updated')}",
]
return "\n".join(lines)
def _render_cache_stats_table(data: dict[str, Any]) -> str:
"""Render cache stats as table."""
lines = [
"| Metric | Value |",
"|--------|-------|",
f"| Total FINN Ads | {data.get('total_finn_ads', 0)} |",
f"| Total Eiendom Units | {data.get('total_eiendom_units', 0)} |",
f"| Total Search Runs | {data.get('total_search_runs', 0)} |",
f"| Cache Size | {data.get('cache_size_mb', 0):.2f} MB |",
f"| Last Updated | {data.get('last_updated')} |",
]
return "\n".join(lines)
+145 -1
View File
@@ -2,6 +2,7 @@
import json
import logging
from typing import Any
from mcp.server.fastmcp import FastMCP
@@ -13,7 +14,24 @@ from .eiendom_no import (
get_unit,
search_unit_from_finn_url,
)
from .service import get_or_fetch_ad, get_or_fetch_eiendom_unit
from .formatting import (
render_ad,
render_comparison,
render_diff,
render_shortlist,
render_similar_units,
)
from .service import (
analyze_ad,
analyze_ad_against_comps,
compare_ads,
find_similar_to_liked,
get_new_ads_since_last_run,
get_or_fetch_ad,
get_or_fetch_eiendom_unit,
get_shortlist,
save_feedback,
)
logger = logging.getLogger(__name__)
@@ -151,6 +169,132 @@ def finn_decode_unit_vector(unit_vector: str) -> str:
return json.dumps({"error": True, "message": str(e)})
# ============================================================================
# Additional analysis and enrichment tools
# ============================================================================
@mcp.tool(
description=(
"Fetch and enrich a single FINN ad with optional Eiendom.no data and comparable units."
)
)
async def finn_analyze_ad(
finnkode: str,
include_eiendom_no: bool = True,
include_similar_units: bool = False,
) -> str:
"""Analyze and enrich a single FINN ad."""
try:
result = await analyze_ad(
finnkode,
include_eiendom_no=include_eiendom_no,
include_similar_units=include_similar_units,
)
return render_ad(result.get("ad", {}), "json")
except Exception as e:
logger.error(f"Error analyzing ad {finnkode}: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(
description=(
"Evaluate one FINN listing against comparable recently-sold properties from Eiendom.no."
)
)
async def finn_analyze_ad_against_comps(
finnkode: str, listing_status: str = "RECENTLY_SOLD"
) -> str:
"""Analyze ad against comparable sales."""
try:
result = await analyze_ad_against_comps(finnkode, listing_status=listing_status)
return json.dumps(result, default=str)
except Exception as e:
logger.error(f"Error analyzing ad {finnkode} against comps: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(
description=(
"Find properties similar to a listing the user has liked. "
"Requires that the user has marked the listing with verdict='liked'."
)
)
async def finn_find_similar_to_liked_ad(
finnkode: str, mode: str = "recommendations", listing_status: str = "FOR_SALE"
) -> str:
"""Find properties similar to a liked ad."""
try:
result = await find_similar_to_liked(finnkode, mode=mode, listing_status=listing_status)
return render_similar_units(result, "json")
except Exception as e:
logger.error(f"Error finding similar to {finnkode}: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(description="Compare multiple FINN listings side by side with optional enrichment.")
async def finn_compare_ads(
finnkoder: list[str],
include_eiendom_no: bool = True,
include_comps: bool = True,
) -> str:
"""Compare multiple ads."""
try:
result = await compare_ads(
finnkoder,
include_eiendom_no=include_eiendom_no,
include_comps=include_comps,
)
return render_comparison(result, "json")
except Exception as e:
logger.error(f"Error comparing ads: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(
description="Store user feedback (verdict, notes) for a FINN listing. "
"Enables similar-unit recommendations and shortlist filtering."
)
async def finn_save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> str:
"""Save user feedback for a listing."""
try:
result = save_feedback(finnkode, verdict, notes)
return json.dumps(result, default=str)
except Exception as e:
logger.error(f"Error saving feedback for {finnkode}: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(
description="Fetch the stored shortlist from a previous search run. "
"Returns the ranked listings with all enrichment data."
)
def finn_get_shortlist(run_id: int | None = None, limit: int = 10) -> str:
"""Get stored shortlist."""
try:
result = get_shortlist(run_id, limit)
return render_shortlist(result, "json")
except Exception as e:
logger.error(f"Error fetching shortlist: {e}")
return json.dumps({"error": True, "message": str(e)})
@mcp.tool(
description=(
"Detect new, removed, and changed listings in a FINN search URL "
"compared to the previous run. Shows price/status diffs on changed listings."
)
)
async def finn_get_new_ads_since_last_run(search_url: str) -> str:
"""Get new/removed/changed listings since last run."""
try:
result = get_new_ads_since_last_run(search_url)
return render_diff(result, "json")
except Exception as e:
logger.error(f"Error fetching diff: {e}")
return json.dumps({"error": True, "message": str(e)})
def main() -> None:
"""Run the FastMCP stdio server."""
mcp.run(transport="stdio")
+3 -3
View File
@@ -60,7 +60,7 @@ class FinnAd(BaseModel):
detail_fetched_at: datetime | None = None
eiendom_unit_code: str | None = None
model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat()})
model_config = ConfigDict()
class EiendomUnit(BaseModel):
@@ -87,7 +87,7 @@ class EiendomUnit(BaseModel):
unit_vector: str | None = None
fetched_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat()})
model_config = ConfigDict()
class SimilarUnit(BaseModel):
@@ -112,7 +112,7 @@ class SimilarUnit(BaseModel):
finalized_at: datetime | None = None
listing_status: str = Field(default="RECENTLY_SOLD")
model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat() if v else None})
model_config = ConfigDict()
class UnitVector(BaseModel):
+177 -4
View File
@@ -1,13 +1,27 @@
"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units."""
import logging
from typing import Any
from .ad import fetch_ad_details
from .cache import get_eiendom_unit as get_cached_eiendom_unit
from .cache import get_finn_ad, init_db, save_eiendom_unit, save_finn_ad
from .analysis import analyze_search as run_analysis_search
from .cache import (
get_eiendom_unit as get_cached_eiendom_unit,
get_finn_ad,
init_db,
save_eiendom_unit,
save_finn_ad,
)
from .config import FINN_CACHE_PATH
from .eiendom_no import get_unit
from .models import EiendomUnit, FinnAd
from .eiendom_no import (
build_unit_vector,
decode_unit_vector,
get_similar_units,
get_unit,
search_unit_from_finn_url,
)
from .feedback import save_feedback as save_feedback_impl
from .models import EiendomUnit, FinnAd, SimilarUnit
logger = logging.getLogger(__name__)
@@ -33,3 +47,162 @@ async def get_or_fetch_eiendom_unit(
if unit is not None:
save_eiendom_unit(conn, unit)
return unit
async def get_or_fetch_similar_units(
unit_code: str, listing_status: str = "RECENTLY_SOLD", force_refresh: bool = False
) -> list[SimilarUnit]:
"""Get similar units (comps) from cache or fetch fresh."""
# Similar units don't have a separate cache table; fetch fresh each time per PRD
# (or cache them in search_runs if doing diff detection)
return await get_similar_units(unit_code, listing_status=listing_status)
async def resolve_eiendom_unit_from_finn_url(finn_url: str) -> EiendomUnit | None:
"""Resolve an Eiendom.no unit from a FINN listing URL."""
return await search_unit_from_finn_url(finn_url)
# ============================================================================
# Orchestration functions — delegate to analysis.py
# ============================================================================
async def analyze_search(
search_url: str,
*,
max_pages: int = 3,
detail_limit: int = 20,
include_details: bool = True,
include_eiendom_no: bool = True,
include_similar_units_for_shortlist: bool = False,
) -> dict[str, Any]:
"""Analyze a FINN search URL and return a ranked shortlist."""
return await run_analysis_search(
search_url,
max_pages=max_pages,
fetch_details=include_details,
detail_limit=detail_limit,
include_eiendom_no=include_eiendom_no,
include_similar_units_for_shortlist=include_similar_units_for_shortlist,
)
async def analyze_ad(
finnkode: str,
*,
include_eiendom_no: bool = True,
include_similar_units: bool = False,
) -> dict[str, Any]:
"""Fetch and enrich a single FINN ad with analysis."""
ad = await get_or_fetch_ad(finnkode)
result: dict[str, Any] = {
"ad": ad.model_dump(),
}
if include_eiendom_no and ad.eiendom_unit_code:
unit = await get_or_fetch_eiendom_unit(ad.eiendom_unit_code)
if unit:
result["eiendom_unit"] = unit.model_dump()
if include_similar_units:
similar = await get_or_fetch_similar_units(ad.eiendom_unit_code)
result["similar_units"] = [s.model_dump() for s in similar]
return result
async def analyze_ad_against_comps(
finnkode: str, listing_status: str = "RECENTLY_SOLD"
) -> dict[str, Any]:
"""Evaluate one listing against recent comparable sales."""
ad = await get_or_fetch_ad(finnkode)
result: dict[str, Any] = {
"ad": ad.model_dump(),
}
if ad.eiendom_unit_code:
unit = await get_or_fetch_eiendom_unit(ad.eiendom_unit_code)
if unit:
result["eiendom_unit"] = unit.model_dump()
comps = await get_or_fetch_similar_units(
ad.eiendom_unit_code, listing_status=listing_status
)
result["comparable_units"] = [c.model_dump() for c in comps]
return result
async def find_similar_to_liked(
finnkode: str, *, mode: str = "recommendations", listing_status: str = "FOR_SALE"
) -> dict[str, Any]:
"""Find properties similar to a listing the user has liked."""
# Requires that feedback.verdict = "liked" exists for this finnkode
ad = await get_or_fetch_ad(finnkode)
if not ad.eiendom_unit_code:
raise ValueError(
f"Finnkode {finnkode} has no Eiendom.no unit_code; cannot find similar properties"
)
# TODO: verify feedback verdict = "liked" exists
unit = await get_or_fetch_eiendom_unit(ad.eiendom_unit_code)
if not unit:
raise ValueError(f"Cannot enrich finnkode {finnkode} with Eiendom.no data")
similar = await get_or_fetch_similar_units(ad.eiendom_unit_code, listing_status=listing_status)
return {
"base_ad": ad.model_dump(),
"similar_listings": [s.model_dump() for s in similar],
"mode": mode,
}
async def compare_ads(
finnkoder: list[str], *, include_eiendom_no: bool = True, include_comps: bool = True
) -> dict[str, Any]:
"""Compare multiple FINN listings side by side."""
ads = []
for finnkode in finnkoder:
ad = await get_or_fetch_ad(finnkode)
ad_data = ad.model_dump()
if include_eiendom_no and ad.eiendom_unit_code:
unit = await get_or_fetch_eiendom_unit(ad.eiendom_unit_code)
if unit:
ad_data["eiendom_unit"] = unit.model_dump()
if include_comps:
comps = await get_or_fetch_similar_units(
ad.eiendom_unit_code, listing_status="RECENTLY_SOLD"
)
ad_data["comps"] = [c.model_dump() for c in comps]
ads.append(ad_data)
return {"listings": ads}
# ============================================================================
# Helper functions
# ============================================================================
def build_unit_vector_for_unit_code(unit_code: str) -> dict[str, Any]:
"""Build a unit_vector dict for a unit_code (msgpack-encoded)."""
return build_unit_vector(unit_code)
def decode_unit_vector_to_dict(unit_vector: str) -> dict[str, Any]:
"""Decode a unit_vector string to a dict."""
return decode_unit_vector(unit_vector)
def save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> dict[str, Any]:
"""Store user feedback/verdict for a listing."""
return save_feedback_impl(finnkode, verdict, notes)
def get_shortlist(run_id: int | None = None, limit: int = 10) -> dict[str, Any]:
"""Fetch stored shortlist from a search run."""
# TODO: implement via search_runs table in cache.py
return {"shortlist": [], "run_id": run_id, "limit": limit}
def get_new_ads_since_last_run(search_url: str) -> dict[str, Any]:
"""Detect new/removed/changed listings vs the previous run."""
# TODO: implement via search_runs table in cache.py
return {"new_ads": [], "removed_ads": [], "changed_ads": [], "search_url": search_url}