initial
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
"""FINN Real Estate MCP Server - Private property analysis platform."""
|
||||
|
||||
# Package version string.
__version__ = "0.1.0"
# Package author credit.
__author__ = "FINN Scout"
|
||||
|
||||
from . import ad, analysis, cache, config, eiendom_no, scoring, search
|
||||
from .http import HTTPClient
|
||||
from .models import EiendomUnit, FinnAd, FinnSearchCard, SimilarUnit, UnitVector
|
||||
from .parser import (
|
||||
extract_finnkode_from_url,
|
||||
normalize_area,
|
||||
normalize_finnkode,
|
||||
normalize_number,
|
||||
normalize_price,
|
||||
)
|
||||
|
||||
# Names exported by ``from <package> import *``: the config module, the data
# models, the parser helpers, the HTTP client, and the remaining submodules
# imported above.
__all__ = [
    "config",
    "FinnAd",
    "FinnSearchCard",
    "EiendomUnit",
    "SimilarUnit",
    "UnitVector",
    "normalize_price",
    "normalize_area",
    "normalize_number",
    "normalize_finnkode",
    "extract_finnkode_from_url",
    "HTTPClient",
    "ad",
    "analysis",
    "cache",
    "eiendom_no",
    "scoring",
    "search",
]
|
||||
@@ -0,0 +1,193 @@
|
||||
"""FINN listing detail scraping and normalization."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .http import HTTPClient
|
||||
from .models import FinnAd
|
||||
from .parser import (
|
||||
clean_text,
|
||||
extract_finnkode_from_url,
|
||||
normalize_area,
|
||||
normalize_finnkode,
|
||||
normalize_number,
|
||||
normalize_price,
|
||||
text_to_bool,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Listing detail page URL; the ``{}`` placeholder receives the finnkode.
FINN_AD_URL_TEMPLATE = "https://www.finn.no/realestate/homes/ad.html?finnkode={}"


async def fetch_ad(finnkode: str, client: HTTPClient | None = None) -> str:
    """Download the raw HTML of a FINN listing.

    Args:
        finnkode: Listing identifier substituted into ``FINN_AD_URL_TEMPLATE``.
        client: HTTP client to use. When falsy, a new ``HTTPClient`` with no
            request delay is created for this call.

    Returns:
        The ``text`` attribute of the response.
    """
    if not client:
        # NOTE(review): this ad-hoc client is never explicitly closed here;
        # confirm whether HTTPClient needs cleanup.
        client = HTTPClient(request_delay_seconds=0.0)
    page = await client.get(FINN_AD_URL_TEMPLATE.format(finnkode))
    return page.text
|
||||
|
||||
|
||||
def _load_property_map(soup: BeautifulSoup) -> dict[str, str]:
    """Collect the page's dt/dd definition pairs into a lookup table.

    Each ``dt`` is paired with the ``dd`` that directly follows it in document
    order. A ``dt`` followed by another ``dt`` (no value) is skipped, and extra
    ``dd`` elements after the first are ignored, so one irregular entry cannot
    shift every later key onto the wrong value (which a positional ``zip`` of
    all ``dt`` and all ``dd`` elements would do).

    Args:
        soup: Parsed listing document.

    Returns:
        Mapping of lower-cased, cleaned ``dt`` text to cleaned ``dd`` text.
        A later duplicate key overwrites an earlier one.
    """
    properties: dict[str, str] = {}
    for dt in soup.find_all("dt"):
        # Next dt-or-dd element in document order; only a dd is a valid value.
        following = dt.find_next(["dt", "dd"])
        if following is None or following.name != "dd":
            continue
        key = clean_text(dt.get_text()) or ""
        value = clean_text(following.get_text()) or ""
        properties[key.lower()] = value
    return properties
|
||||
|
||||
|
||||
def _get_data_testid_value(soup: BeautifulSoup, testid: str) -> str | None:
|
||||
node = soup.select_one(f'[data-testid="{testid}"]')
|
||||
if not node:
|
||||
return None
|
||||
return clean_text(node.get_text(" ", strip=True))
|
||||
|
||||
|
||||
def _strip_labelled_text(text: str | None, labels: list[str]) -> str | None:
|
||||
if not text:
|
||||
return None
|
||||
for label in labels:
|
||||
if text.lower().startswith(label.lower()):
|
||||
return clean_text(text[len(label) :])
|
||||
return text
|
||||
|
||||
|
||||
def _extract_floor_from_text(text: str | None) -> str | None:
|
||||
if not text:
|
||||
return None
|
||||
match = re.search(r"(\d+)\s*\.?\s*etasje", text, re.IGNORECASE)
|
||||
if match:
|
||||
return f"{match.group(1)}. etasje"
|
||||
return None
|
||||
|
||||
|
||||
def _clean_description(text: str | None) -> str | None:
|
||||
if not text:
|
||||
return None
|
||||
cleaned = re.sub(r"(?i)^om boligen", "", text).strip()
|
||||
cleaned = re.sub(r"(?i)^beskrivelse", "", cleaned).strip()
|
||||
return clean_text(cleaned)
|
||||
|
||||
|
||||
def _load_feature_text(soup: BeautifulSoup) -> str:
    """Return the text of the ``object-facilities`` element, or ``""`` if absent."""
    facilities = _get_data_testid_value(soup, "object-facilities")
    return facilities if facilities else ""
|
||||
|
||||
|
||||
def _extract_description(soup: BeautifulSoup) -> str | None:
|
||||
node = soup.select_one('[data-testid="om boligen"]') or soup.select_one(".description")
|
||||
if not node:
|
||||
return None
|
||||
paragraphs = [clean_text(p.get_text()) for p in node.select("p") if clean_text(p.get_text())]
|
||||
if paragraphs:
|
||||
return "\n".join(paragraphs)
|
||||
return _clean_description(node.get_text(" ", strip=True))
|
||||
|
||||
|
||||
def scrape_ad(html: str, url: str | None = None) -> FinnAd:
    """Scrape a FINN listing HTML page into a FinnAd model.

    Values come from two places on the page: the dt/dd property map (keys are
    lower-cased Norwegian labels) and elements identified by ``data-testid``.
    Address, district, ownership type and property type prefer the
    ``data-testid`` element and fall back to the property map; prices, costs,
    area, rooms, bedrooms and construction year prefer the property map and
    fall back to the ``data-testid`` element.

    Args:
        html: Listing page markup; parsed with ``html.parser``.
        url: Listing URL. The finnkode is extracted from it and it is stored
            on the result; ``None`` is treated as an empty string.

    Returns:
        A ``FinnAd`` with ``broker_name`` and ``detail_fetched_at`` left as
        ``None``.
    """
    soup = BeautifulSoup(html, "html.parser")
    title_node = soup.select_one("h1")
    broker_name = soup.select_one(".broker-name")

    properties = _load_property_map(soup)
    # Lower-cased so the substring feature checks below are case-insensitive.
    feature_text = _load_feature_text(soup).lower()
    # Empty string when no finnkode can be derived from the URL.
    finnkode = normalize_finnkode(extract_finnkode_from_url(url or "")) or ""
    address = _get_data_testid_value(soup, "object-address") or properties.get("adresse")
    district = _get_data_testid_value(soup, "local-area-name") or properties.get("område")
    ownership_type = _strip_labelled_text(
        _get_data_testid_value(soup, "info-ownership-type"), ["Eieform", "Eiendomstype"]
    ) or properties.get("eierform")
    property_type = _strip_labelled_text(
        _get_data_testid_value(soup, "info-property-type"), ["Boligtype", "Eiendomstype"]
    ) or properties.get("eiendomstype")

    # NOTE(review): "incicative" is spelled this way in the selector; it
    # presumably mirrors the attribute on FINN's pages. Confirm against a live
    # page before "correcting" it.
    asking_price = normalize_price(
        properties.get("prisantydning") or _get_data_testid_value(soup, "pricing-incicative-price")
    )
    total_price_value = normalize_price(
        properties.get("totalpris") or _get_data_testid_value(soup, "pricing-total-price")
    )
    shared_debt = normalize_price(
        properties.get("fellesgjeld") or _get_data_testid_value(soup, "pricing-joint-debt")
    )
    common_costs = normalize_number(
        properties.get("felles utgifter")
        or _get_data_testid_value(soup, "pricing-common-monthly-cost")
    )
    area_m2 = normalize_area(
        properties.get("boligareal")
        or _get_data_testid_value(soup, "info-usable-i-area")
        or _get_data_testid_value(soup, "info-usable-area")
    )
    rooms = normalize_number(properties.get("rom") or _get_data_testid_value(soup, "info-rooms"))
    bedrooms = normalize_number(
        properties.get("soverom") or _get_data_testid_value(soup, "info-bedrooms")
    )
    # Floor: property map first, then a mention in the title, then the
    # data-testid element.
    floor = (
        properties.get("etasje")
        or _extract_floor_from_text(title_node.get_text() if title_node else "")
        or _get_data_testid_value(soup, "info-floor")
    )
    construction_year = normalize_number(
        properties.get("byggeår") or _get_data_testid_value(soup, "info-construction-year")
    )
    energy_rating = properties.get("energimerking")
    heating = properties.get("oppvarming")
    has_balcony = text_to_bool(properties.get("balkonger/terrasser")) or "balkong" in feature_text
    # Terrace is detected from the facilities text only.
    has_terrace = "terrasse" in feature_text
    has_elevator = text_to_bool(properties.get("heis")) or "heis" in feature_text
    # Any non-empty "parkering/garasje" value counts as parking here (it is
    # not passed through text_to_bool like the checks above).
    has_parking = (
        bool(properties.get("parkering/garasje"))
        or "parkering" in feature_text
        or "garasje" in feature_text
    )
    broker_company = None
    if broker_name:
        # NOTE(review): the ".broker-name" element's text is stored as the
        # broker *company*, and broker_name below is always None. Confirm this
        # mapping is intended.
        broker_company = clean_text(broker_name.get_text())

    listing_description = _extract_description(soup)

    ad = FinnAd(
        finnkode=finnkode,
        url=url or "",
        title=clean_text(title_node.get_text()) if title_node else None,
        address=address,
        postal_area=properties.get("postnummer"),
        district=district,
        property_type=property_type,
        ownership_type=ownership_type,
        asking_price=asking_price,
        total_price=total_price_value,
        shared_debt=shared_debt,
        common_costs=common_costs,
        municipal_fee=normalize_number(properties.get("kommunale avgifter")),
        other_fees=normalize_number(properties.get("andre utgifter")),
        area_m2=area_m2,
        rooms=rooms,
        bedrooms=bedrooms,
        floor=floor,
        construction_year=construction_year,
        energy_rating=energy_rating,
        heating=heating,
        has_balcony=has_balcony,
        has_terrace=has_terrace,
        has_elevator=has_elevator,
        has_parking=has_parking,
        listing_description=listing_description,
        broker_name=None,
        broker_company=broker_company,
        detail_fetched_at=None,
    )
    return ad
|
||||
|
||||
|
||||
async def fetch_ad_details(finnkode: str, client: HTTPClient | None = None) -> FinnAd:
    """Fetch a FINN listing and parse it into a ``FinnAd``.

    Args:
        finnkode: Listing identifier.
        client: Optional HTTP client forwarded to ``fetch_ad``.

    Returns:
        The scraped ad, with ``detail_fetched_at`` set to the current UTC time.
    """
    page = await fetch_ad(finnkode, client=client)
    parsed = scrape_ad(page, url=FINN_AD_URL_TEMPLATE.format(finnkode))
    parsed.detail_fetched_at = datetime.now(UTC)
    return parsed
|
||||
@@ -0,0 +1,175 @@
|
||||
"""Orchestration for FINN search + Eiendom.no enrichment + scoring."""
|
||||
|
||||
import logging
|
||||
|
||||
from . import ad as ad_module
|
||||
from . import cache, eiendom_no, scoring, search
|
||||
from .config import (
|
||||
FINN_CACHE_PATH,
|
||||
FINN_CACHE_TTL_AD_HOURS,
|
||||
FINN_DETAIL_LIMIT,
|
||||
FINN_MAX_SEARCH_PAGES,
|
||||
)
|
||||
from .models import EiendomUnit, FinnAd, SimilarUnit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _normalize_description(text: str | None) -> str:
|
||||
return text.lower() if text else ""
|
||||
|
||||
|
||||
def _build_ad_summary(
|
||||
ad: FinnAd,
|
||||
enriched: EiendomUnit | None,
|
||||
similar_units: list[SimilarUnit],
|
||||
scores: dict,
|
||||
categories: list[str],
|
||||
) -> dict:
|
||||
description = _normalize_description(ad.listing_description)
|
||||
reasons = []
|
||||
risks = []
|
||||
next_steps = [
|
||||
"Open the FINN listing and condition report.",
|
||||
"Review the Eiendom.no estimate and comparable sales.",
|
||||
"Ask the broker about renovation status and approvals.",
|
||||
]
|
||||
|
||||
if enriched and enriched.estimated_selling_price and ad.total_price:
|
||||
if ad.total_price < enriched.estimated_selling_price:
|
||||
reasons.append("Listing price is below Eiendom.no estimate.")
|
||||
elif ad.total_price <= enriched.estimated_selling_price_upper:
|
||||
reasons.append("Price sits within the local estimate range.")
|
||||
else:
|
||||
reasons.append("Listing price is above the estimate range.")
|
||||
else:
|
||||
reasons.append("Eiendom.no enrichment is unavailable or incomplete.")
|
||||
|
||||
if "utsikt" in description or ad.has_balcony or ad.has_terrace:
|
||||
reasons.append("Outdoor space or view potential is positive.")
|
||||
if "hybel" in description or "leie" in description:
|
||||
reasons.append("Potential hybel/rental opportunity is mentioned.")
|
||||
if "potensial" in description or "renover" in description:
|
||||
reasons.append("Renovation or improvement potential is highlighted.")
|
||||
|
||||
if scores.get("risk", 0.0) < 0:
|
||||
risks.append("Risk flags are detected in description or metadata.")
|
||||
if ad.common_costs and ad.common_costs > 5000:
|
||||
risks.append("Common costs are relatively high and should be reviewed.")
|
||||
if enriched and enriched.sale_status and enriched.sale_status.upper() != "FOR_SALE":
|
||||
risks.append("Eiendom.no sale status does not indicate an active sale.")
|
||||
if not enriched:
|
||||
risks.append("Missing Eiendom.no data increases uncertainty.")
|
||||
|
||||
if not any("Eiendom.no" in step for step in next_steps):
|
||||
next_steps.append("Verify the property on Eiendom.no and reconcile any mismatches.")
|
||||
|
||||
if similar_units:
|
||||
next_steps.append("Review the comparable units and average sqm prices.")
|
||||
else:
|
||||
next_steps.append("Comparable sales are unavailable; treat valuation with caution.")
|
||||
|
||||
return {
|
||||
"why_interesting": reasons,
|
||||
"risks": risks,
|
||||
"next_steps": next_steps,
|
||||
"shortlist_reason": ", ".join(reasons[:3])
|
||||
if reasons
|
||||
else "Review details and seller disclosures.",
|
||||
}
|
||||
|
||||
|
||||
async def analyze_ad(
    finn_ad: FinnAd,
    unit_code: str | None = None,
) -> dict:
    """Enrich a FinnAd and compute score summary.

    When ``unit_code`` is given, the Eiendom.no unit is read from the cache or
    fetched and cached. If the unit carries a unit vector, comparable
    "RECENTLY_SOLD" units are likewise read from the cache or fetched and
    cached. The ad is then scored, classified, summarized and saved.

    Args:
        finn_ad: The scraped listing to analyze.
        unit_code: Eiendom.no unit code used for enrichment; no enrichment is
            attempted when it is falsy.

    Returns:
        A dict with the ad's identifying fields, ``score``, ``categories``,
        ``summary``, the dumped ``eiendom_unit`` (or ``None``) and the dumped
        ``similar_units``.
    """
    conn = cache.init_db(FINN_CACHE_PATH)
    # Close the connection on every exit path, including exceptions.
    try:
        enriched: EiendomUnit | None = None
        similar_units: list[SimilarUnit] = []

        if unit_code:
            enriched = cache.get_eiendom_unit(conn, unit_code)
            if enriched is None:
                enriched = await eiendom_no.enrich_ad_with_eiendom_no(finn_ad, unit_code)
                if enriched is not None:
                    cache.save_eiendom_unit(conn, enriched)

        if enriched and enriched.unit_vector:
            similar_units = cache.get_similar_units(conn, enriched.unit_code, "RECENTLY_SOLD")
            if not similar_units:
                similar_units = await eiendom_no.get_similar_units(enriched.unit_vector)
                if similar_units:
                    cache.save_similar_units(
                        conn, enriched.unit_code, "RECENTLY_SOLD", similar_units
                    )

        scores = scoring.score_ad(finn_ad, enriched, similar_units)
        categories = scoring.classify_ad(scores)
        summary = _build_ad_summary(finn_ad, enriched, similar_units, scores, categories)

        result = {
            "finnkode": finn_ad.finnkode,
            "title": finn_ad.title,
            "address": finn_ad.address,
            "score": scores,
            "categories": categories,
            "summary": summary,
            "eiendom_unit": enriched.model_dump() if enriched else None,
            "similar_units": [unit.model_dump() for unit in similar_units],
        }
        cache.save_finn_ad(conn, finn_ad)
        return result
    finally:
        conn.close()
|
||||
|
||||
|
||||
async def analyze_search(
    search_url: str,
    max_pages: int = FINN_MAX_SEARCH_PAGES,
    fetch_details: bool = True,
    detail_limit: int = FINN_DETAIL_LIMIT,
    include_eiendom_no: bool = True,
    client=None,
    use_cache: bool = True,
) -> dict:
    """Analyze a FINN search URL and enrich matching listings.

    Args:
        search_url: FINN search result URL.
        max_pages: Forwarded to ``search.fetch_search_pages``.
        fetch_details: When false, only the search cards are returned and no
            listing is analyzed.
        detail_limit: At most this many cards (from the start of the result
            list) are analyzed.
        include_eiendom_no: Whether to resolve an Eiendom.no unit code for
            each listing.
        client: Optional HTTP client forwarded to the FINN search and detail
            fetches.
        use_cache: Forwarded to ``search.fetch_search_pages``.

    Returns:
        A dict with ``search_url``, dumped ``search_cards``, per-listing
        ``analysis`` sorted by total score (highest first) and a ``summary``
        of counts.
    """
    conn = cache.init_db(FINN_CACHE_PATH)
    results = []
    enriched_count = 0
    # Close the connection on every exit path, including exceptions.
    try:
        cards = await search.fetch_search_pages(
            search_url,
            max_pages=max_pages,
            client=client,
            use_cache=use_cache,
        )

        if fetch_details:
            for card in cards[:detail_limit]:
                finn_ad = cache.get_finn_ad(
                    conn, card.finnkode, ttl_hours=FINN_CACHE_TTL_AD_HOURS
                )
                if finn_ad is None:
                    finn_ad = await ad_module.fetch_ad_details(card.finnkode, client=client)
                unit_code = None
                if include_eiendom_no:
                    # Best effort: a failed unit search is logged and the code
                    # falls back to resolving a unit code from the URL.
                    try:
                        matched_unit = await eiendom_no.search_unit_from_finn_url(card.url)
                    except Exception as exc:
                        logger.warning("Eiendom.no unit search failed: %s", exc)
                        matched_unit = None
                    unit_code = (
                        matched_unit.unit_code
                        if matched_unit
                        else eiendom_no.resolve_unit_from_finn_url(card.url)
                    )
                result = await analyze_ad(finn_ad, unit_code=unit_code)
                if result.get("eiendom_unit"):
                    enriched_count += 1
                results.append(result)
    finally:
        conn.close()

    results.sort(key=lambda item: item["score"].get("total", 0.0), reverse=True)
    return {
        "search_url": search_url,
        "search_cards": [card.model_dump() for card in cards],
        "analysis": results,
        "summary": {
            "total_listings": len(cards),
            "analyzed_listings": len(results),
            "eiendom_enriched": enriched_count,
        },
    }
|
||||
@@ -0,0 +1,243 @@
|
||||
"""SQLite cache and persistence for FINN and Eiendom.no data."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from .config import FINN_CACHE_PATH
|
||||
from .models import EiendomUnit, FinnAd, FinnSearchCard, SimilarUnit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_connection(path: str | None = None) -> sqlite3.Connection:
|
||||
db_path = path or FINN_CACHE_PATH
|
||||
conn = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def init_db(path: str | None = None) -> sqlite3.Connection:
    """Open the cache database and create its tables if they do not exist.

    Creates ``finn_ads``, ``eiendom_units``, ``similar_units`` and
    ``cache_meta``, then commits.

    Args:
        path: Database location, forwarded to ``get_connection``.

    Returns:
        The open connection.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS finn_ads (
            finnkode TEXT PRIMARY KEY,
            url TEXT,
            payload TEXT NOT NULL,
            fetched_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS eiendom_units (
            unit_code TEXT PRIMARY KEY,
            payload TEXT NOT NULL,
            fetched_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS similar_units (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            unit_code TEXT NOT NULL,
            listing_status TEXT NOT NULL,
            payload TEXT NOT NULL,
            fetched_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS cache_meta (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            expires_at TEXT
        )
        """,
    )
    connection = get_connection(path)
    cur = connection.cursor()
    for statement in schema:
        cur.execute(statement)
    connection.commit()
    return connection
|
||||
|
||||
|
||||
def cache_get(conn: sqlite3.Connection, key: str) -> dict[str, Any] | None:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT value, expires_at FROM cache_meta WHERE key = ?", (key,))
|
||||
row = cursor.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
|
||||
expires_at = row["expires_at"]
|
||||
if expires_at and datetime.fromisoformat(expires_at) < datetime.now(UTC):
|
||||
cursor.execute("DELETE FROM cache_meta WHERE key = ?", (key,))
|
||||
conn.commit()
|
||||
return None
|
||||
|
||||
return json.loads(row["value"])
|
||||
|
||||
|
||||
def cache_set(
|
||||
conn: sqlite3.Connection,
|
||||
key: str,
|
||||
payload: dict[str, Any],
|
||||
ttl_hours: int | None = None,
|
||||
ttl_minutes: int | None = None,
|
||||
) -> None:
|
||||
expires_at = None
|
||||
if ttl_minutes is not None:
|
||||
expires_at = (datetime.now(UTC) + timedelta(minutes=ttl_minutes)).isoformat()
|
||||
elif ttl_hours is not None:
|
||||
expires_at = (datetime.now(UTC) + timedelta(hours=ttl_hours)).isoformat()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT OR REPLACE INTO cache_meta (key, value, expires_at) VALUES (?, ?, ?)",
|
||||
(key, json.dumps(payload), expires_at),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _is_fresh(fetched_at: str, ttl_hours: int | None) -> bool:
|
||||
if ttl_hours is None:
|
||||
return True
|
||||
return datetime.fromisoformat(fetched_at) >= datetime.now(UTC) - timedelta(hours=ttl_hours)
|
||||
|
||||
|
||||
def save_search_page(
    conn: sqlite3.Connection,
    url: str,
    html: str,
    ttl_minutes: int = 60,
) -> None:
    """Cache the HTML of a search page under ``search_page:<url>``.

    Args:
        conn: Open database connection.
        url: Search page URL; part of the cache key.
        html: Page markup to store.
        ttl_minutes: Lifetime of the entry in minutes.
    """
    cache_key = f"search_page:{url}"
    cache_set(conn, cache_key, {"html": html}, ttl_minutes=ttl_minutes)
|
||||
|
||||
|
||||
def get_search_page(conn: sqlite3.Connection, url: str) -> str | None:
    """Return the cached HTML for a search page, or ``None`` when not cached."""
    cached = cache_get(conn, f"search_page:{url}")
    return cached.get("html") if cached else None
|
||||
|
||||
|
||||
def save_search_cards(
    conn: sqlite3.Connection,
    url: str,
    cards: list[FinnSearchCard],
    ttl_minutes: int = 60,
) -> None:
    """Cache parsed search cards under ``search_cards:<url>``.

    Args:
        conn: Open database connection.
        url: Search page URL; part of the cache key.
        cards: Cards to store; each is dumped in JSON mode.
        ttl_minutes: Lifetime of the entry in minutes.
    """
    serialized = [card.model_dump(mode="json") for card in cards]
    cache_set(conn, f"search_cards:{url}", serialized, ttl_minutes=ttl_minutes)
|
||||
|
||||
|
||||
def get_search_cards(conn: sqlite3.Connection, url: str) -> list[FinnSearchCard]:
    """Return the cached search cards for ``url``; empty list when not cached."""
    cached = cache_get(conn, f"search_cards:{url}") or []
    return [FinnSearchCard.model_validate(entry) for entry in cached]
|
||||
|
||||
|
||||
def save_finn_ad(conn: sqlite3.Connection, ad: FinnAd) -> None:
|
||||
cursor = conn.cursor()
|
||||
payload = ad.model_dump(mode="json")
|
||||
cursor.execute(
|
||||
"INSERT OR REPLACE INTO finn_ads (finnkode, url, payload, fetched_at) VALUES (?, ?, ?, ?)",
|
||||
(
|
||||
ad.finnkode,
|
||||
ad.url,
|
||||
json.dumps(payload),
|
||||
ad.detail_fetched_at.isoformat()
|
||||
if ad.detail_fetched_at
|
||||
else datetime.now(UTC).isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_finn_ad(
    conn: sqlite3.Connection, finnkode: str, ttl_hours: int | None = None
) -> FinnAd | None:
    """Load a cached listing by finnkode.

    Args:
        conn: Connection whose rows support access by column name.
        finnkode: Listing identifier.
        ttl_hours: Maximum accepted age in hours; ``None`` accepts any age.

    Returns:
        The validated ``FinnAd``, or ``None`` when there is no row or the row
        is older than ``ttl_hours``.
    """
    row = (
        conn.cursor()
        .execute("SELECT payload, fetched_at FROM finn_ads WHERE finnkode = ?", (finnkode,))
        .fetchone()
    )
    if not row:
        return None
    stale = ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours)
    return None if stale else FinnAd.model_validate(json.loads(row["payload"]))
|
||||
|
||||
|
||||
def save_eiendom_unit(conn: sqlite3.Connection, unit: EiendomUnit) -> None:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT OR REPLACE INTO eiendom_units (unit_code, payload, fetched_at) VALUES (?, ?, ?)",
|
||||
(unit.unit_code, json.dumps(unit.model_dump(mode="json")), unit.fetched_at.isoformat()),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_eiendom_unit(
    conn: sqlite3.Connection,
    unit_code: str,
    ttl_hours: int | None = None,
) -> EiendomUnit | None:
    """Load a cached Eiendom.no unit by unit code.

    Args:
        conn: Connection whose rows support access by column name.
        unit_code: Unit identifier.
        ttl_hours: Maximum accepted age in hours; ``None`` accepts any age.

    Returns:
        The validated ``EiendomUnit``, or ``None`` when there is no row or the
        row is older than ``ttl_hours``.
    """
    row = (
        conn.cursor()
        .execute(
            "SELECT payload, fetched_at FROM eiendom_units WHERE unit_code = ?",
            (unit_code,),
        )
        .fetchone()
    )
    if not row:
        return None
    stale = ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours)
    return None if stale else EiendomUnit.model_validate(json.loads(row["payload"]))
|
||||
|
||||
|
||||
def save_similar_units(
|
||||
conn: sqlite3.Connection,
|
||||
unit_code: str,
|
||||
listing_status: str,
|
||||
similar_units: list[SimilarUnit],
|
||||
) -> None:
|
||||
cursor = conn.cursor()
|
||||
payload = json.dumps([item.model_dump(mode="json") for item in similar_units])
|
||||
cursor.execute(
|
||||
(
|
||||
"INSERT INTO similar_units"
|
||||
" (unit_code, listing_status, payload, fetched_at)"
|
||||
" VALUES (?, ?, ?, ?)"
|
||||
),
|
||||
(unit_code, listing_status, payload, datetime.now(UTC).isoformat()),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_similar_units(
    conn: sqlite3.Connection,
    unit_code: str,
    listing_status: str,
    ttl_hours: int | None = None,
) -> list[SimilarUnit]:
    """Load the most recently stored comparables for a unit/status pair.

    Args:
        conn: Connection whose rows support access by column name.
        unit_code: Unit the comparables belong to.
        listing_status: Listing status the comparables were stored under.
        ttl_hours: Maximum accepted age in hours; ``None`` accepts any age.

    Returns:
        The validated comparables from the newest matching row (highest id),
        or an empty list when there is no row or it is older than
        ``ttl_hours``.
    """
    query = (
        "SELECT payload, fetched_at FROM similar_units"
        " WHERE unit_code = ? AND listing_status = ?"
        " ORDER BY id DESC LIMIT 1"
    )
    row = conn.cursor().execute(query, (unit_code, listing_status)).fetchone()
    if not row:
        return []
    stale = ttl_hours is not None and not _is_fresh(row["fetched_at"], ttl_hours)
    if stale:
        return []
    return [SimilarUnit.model_validate(entry) for entry in json.loads(row["payload"])]
|
||||
@@ -0,0 +1,30 @@
|
||||
"""Configuration and environment variables."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Cache and database
|
||||
FINN_CACHE_PATH = os.getenv("FINN_CACHE_PATH", str(Path("data/finn.sqlite")))
|
||||
|
||||
# FINN API settings
|
||||
FINN_MAX_SEARCH_PAGES = int(os.getenv("FINN_MAX_SEARCH_PAGES", "3"))
|
||||
FINN_DETAIL_LIMIT = int(os.getenv("FINN_DETAIL_LIMIT", "20"))
|
||||
FINN_REQUEST_DELAY_SECONDS = float(os.getenv("FINN_REQUEST_DELAY_SECONDS", "2"))
|
||||
FINN_USER_AGENT = os.getenv("FINN_USER_AGENT", "personal-finn-eiendom-analyzer/0.1")
|
||||
FINN_CACHE_TTL_SEARCH_MINUTES = int(os.getenv("FINN_CACHE_TTL_SEARCH_MINUTES", "60"))
|
||||
FINN_CACHE_TTL_AD_HOURS = int(os.getenv("FINN_CACHE_TTL_AD_HOURS", "24"))
|
||||
|
||||
# Eiendom.no API settings
|
||||
EIENDOM_NO_ENABLED = os.getenv("EIENDOM_NO_ENABLED", "true").lower() == "true"
|
||||
EIENDOM_NO_BASE_URL = os.getenv("EIENDOM_NO_BASE_URL", "https://api.eiendom.no/api/v1")
|
||||
EIENDOM_NO_REQUEST_DELAY_SECONDS = float(os.getenv("EIENDOM_NO_REQUEST_DELAY_SECONDS", "1"))
|
||||
EIENDOM_NO_CACHE_TTL_HOURS = int(os.getenv("EIENDOM_NO_CACHE_TTL_HOURS", "24"))
|
||||
EIENDOM_NO_SIMILAR_UNITS_ENABLED = (
|
||||
os.getenv("EIENDOM_NO_SIMILAR_UNITS_ENABLED", "true").lower() == "true"
|
||||
)
|
||||
EIENDOM_NO_SIMILAR_UNITS_DEFAULT_STATUS = os.getenv(
|
||||
"EIENDOM_NO_SIMILAR_UNITS_DEFAULT_STATUS", "RECENTLY_SOLD"
|
||||
)
|
||||
|
||||
# Logging
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
||||
@@ -0,0 +1,236 @@
|
||||
"""Eiendom.no enrichment, unit vector, and similar units client."""
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import msgpack
|
||||
|
||||
from .config import (
|
||||
EIENDOM_NO_BASE_URL,
|
||||
EIENDOM_NO_ENABLED,
|
||||
EIENDOM_NO_REQUEST_DELAY_SECONDS,
|
||||
EIENDOM_NO_SIMILAR_UNITS_DEFAULT_STATUS,
|
||||
)
|
||||
from .http import HTTPClient
|
||||
from .models import EiendomUnit, SimilarUnit, UnitVector
|
||||
from .parser import extract_finnkode_from_url, normalize_finnkode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _extract_coordinates(geometry: dict) -> tuple[float | None, float | None]:
|
||||
if not isinstance(geometry, dict):
|
||||
return None, None
|
||||
coords = geometry.get("coordinates") or []
|
||||
if isinstance(coords, (list, tuple)) and len(coords) >= 2:
|
||||
return coords[0], coords[1]
|
||||
return None, None
|
||||
|
||||
|
||||
def parse_eiendom_unit_json(unit_data: dict) -> EiendomUnit:
    """Map an Eiendom.no unit JSON object onto an ``EiendomUnit``.

    Most fields are read from a nested section (``specification``,
    ``valuation`` or ``latestMarketData``) first and from the top level of
    ``unit_data`` as a fallback. Two robustness rules apply:

    * a section that is missing, ``null`` or not an object is treated as
      empty rather than raising;
    * numeric fields fall back only when the preferred value is ``None``, so
      legitimate zeros (floor 0, 0 days on market) are kept. Text fields still
      fall back on empty strings.

    Args:
        unit_data: Decoded JSON object describing one unit.

    Returns:
        The populated ``EiendomUnit``.
    """

    def section(name: str) -> dict:
        value = unit_data.get(name)
        return value if isinstance(value, dict) else {}

    def pick(*candidates):
        # First candidate that is not None (0 and 0.0 count as present).
        return next((c for c in candidates if c is not None), None)

    lon, lat = _extract_coordinates(unit_data.get("geometry"))
    specification = section("specification")
    valuation = section("valuation")
    market = section("latestMarketData")

    return EiendomUnit(
        unit_code=unit_data.get("unitCode") or "",
        address=unit_data.get("address") or unit_data.get("streetAddress"),
        lat=pick(lat, unit_data.get("lat")),
        lng=pick(lon, unit_data.get("lon")),
        property_type=specification.get("propertyType") or unit_data.get("propertyType"),
        floor=pick(specification.get("floor"), unit_data.get("floor")),
        rooms=pick(specification.get("rooms"), unit_data.get("rooms")),
        construction_year=pick(
            specification.get("constructionYear"), unit_data.get("constructionYear")
        ),
        usable_area=pick(specification.get("usableArea"), unit_data.get("usableArea")),
        estimated_selling_price=pick(
            valuation.get("estimatedSellingPrice"), unit_data.get("estimatedSellingPrice")
        ),
        estimated_selling_price_lower=pick(
            valuation.get("estimatedSellingPriceLower"),
            unit_data.get("estimatedSellingPriceLower"),
        ),
        estimated_selling_price_upper=pick(
            valuation.get("estimatedSellingPriceUpper"),
            unit_data.get("estimatedSellingPriceUpper"),
        ),
        listing_price=pick(market.get("listingPrice"), unit_data.get("listingPrice")),
        listing_sqm_price=pick(
            market.get("squareMeterPrice"), unit_data.get("listingSquareMeterPrice")
        ),
        common_costs=pick(
            market.get("monthlyCosts"), market.get("commonCosts"), unit_data.get("commonCosts")
        ),
        days_on_market=pick(market.get("daysOnMarket"), unit_data.get("daysOnMarket")),
        sale_status=market.get("saleStatus") or unit_data.get("saleStatus"),
        market_placement_score=pick(
            market.get("marketPlacementScore"), unit_data.get("marketPlacementScore")
        ),
    )
|
||||
|
||||
|
||||
def parse_similar_units_json(response_data: dict) -> list[SimilarUnit]:
    """Map a similar-units JSON response onto ``SimilarUnit`` objects.

    Each entry of ``response_data["units"]`` is converted. Most fields are
    read from the entry's ``specification`` or ``marketData`` section first
    and from the entry's top level as a fallback. A missing or ``null``
    ``units`` list yields an empty result; a section that is missing, ``null``
    or not an object is treated as empty; numeric fields fall back only when
    the preferred value is ``None``, so legitimate zeros are kept.

    Args:
        response_data: Decoded JSON response.

    Returns:
        One ``SimilarUnit`` per entry, in response order.
    """

    def pick(*candidates):
        # First candidate that is not None (0 and 0.0 count as present).
        return next((c for c in candidates if c is not None), None)

    units: list[SimilarUnit] = []
    for item in response_data.get("units") or []:
        lon, lat = _extract_coordinates(item.get("geometry"))
        specification = item.get("specification")
        if not isinstance(specification, dict):
            specification = {}
        market = item.get("marketData")
        if not isinstance(market, dict):
            market = {}
        units.append(
            SimilarUnit(
                unit_code=item.get("unitCode") or "",
                address=item.get("address"),
                lat=pick(lat, item.get("lat")),
                lng=pick(lon, item.get("lon")),
                property_type=specification.get("propertyType") or item.get("propertyType"),
                floor=pick(specification.get("floor"), item.get("floor")),
                rooms=pick(specification.get("rooms"), item.get("rooms")),
                construction_year=pick(
                    specification.get("constructionYear"), item.get("constructionYear")
                ),
                usable_area=pick(specification.get("usableArea"), item.get("usableArea")),
                listing_price=pick(market.get("listingPrice"), item.get("listingPrice")),
                selling_price=pick(market.get("sellingPrice"), item.get("sellingPrice")),
                shared_debt=pick(market.get("jointDebt"), item.get("sharedDebt")),
                common_costs=pick(market.get("monthlyCosts"), item.get("commonCosts")),
                sqm_price=pick(market.get("squareMeterPrice"), item.get("squareMeterPrice")),
                days_on_market=pick(market.get("daysOnMarket"), item.get("daysOnMarket")),
                sale_status=market.get("saleStatus") or item.get("saleStatus"),
                finalized_at=item.get("finalizedAt") or market.get("finalizedAt"),
                listing_status=item.get("listingStatus", "RECENTLY_SOLD"),
            )
        )
    return units
|
||||
|
||||
|
||||
def build_unit_vector(unit: EiendomUnit) -> str:
    """Encode a unit's key attributes as a base64url msgpack string.

    Missing coordinates default to ``0.0`` and a missing property type to
    ``"APARTMENT"``; the price is the listing price, falling back to the
    estimated selling price.

    Returns:
        The URL-safe base64 text of the packed ``UnitVector`` dump, with
        trailing ``=`` padding removed.
    """
    vector = UnitVector(
        lon=unit.lng or 0.0,
        lat=unit.lat or 0.0,
        ptype=unit.property_type or "APARTMENT",
        floor=unit.floor,
        rooms=unit.rooms,
        built=unit.construction_year,
        area=unit.usable_area,
        price=unit.listing_price or unit.estimated_selling_price,
    )
    raw = msgpack.packb(vector.model_dump(), use_bin_type=True)
    text = base64.urlsafe_b64encode(raw).decode("utf-8")
    return text.rstrip("=")
|
||||
|
||||
|
||||
def decode_unit_vector(vector_str: str) -> dict:
    """Decode a base64url unit vector back into its msgpack payload.

    Intended for debugging. The ``=`` padding stripped at encode time is
    restored before decoding.
    """
    # Number of "=" needed to reach a multiple of four characters (0 to 3).
    missing = -len(vector_str) % 4
    padded = vector_str + "=" * missing
    raw = base64.urlsafe_b64decode(padded.encode("utf-8"))
    return msgpack.unpackb(raw, raw=False)
|
||||
|
||||
|
||||
async def search_unit_from_finn_url(
    finn_url: str,
    client: HTTPClient | None = None,
) -> EiendomUnit | None:
    """Look up the Eiendom.no unit matching a FINN listing URL.

    Args:
        finn_url: FINN listing URL, sent as the ``search`` query parameter.
        client: HTTP client to use; when falsy, one is created with the
            Eiendom.no base URL and request delay.

    Returns:
        The first unit of the search response parsed into an ``EiendomUnit``,
        or ``None`` when Eiendom.no is disabled, ``finn_url`` is empty or the
        response has no units.
    """
    if not (EIENDOM_NO_ENABLED and finn_url):
        logger.info("Eiendom.no unit search is disabled or finn_url is empty")
        return None

    if not client:
        client = HTTPClient(
            base_url=EIENDOM_NO_BASE_URL,
            request_delay_seconds=EIENDOM_NO_REQUEST_DELAY_SECONDS,
        )
    response = await client.get("/geodata/units/search/", params={"search": finn_url})
    matches = response.json().get("units", [])
    return parse_eiendom_unit_json(matches[0]) if matches else None
|
||||
|
||||
|
||||
async def get_unit(
    unit_code: str,
    client: HTTPClient | None = None,
) -> EiendomUnit | None:
    """Fetch a single Eiendom.no unit by unit code.

    The response may either wrap the unit in a ``units`` list or be the unit
    object itself (recognized by a ``unitCode`` key).

    Args:
        unit_code: Unit identifier placed in the request path.
        client: HTTP client to use; when falsy, one is created with the
            Eiendom.no base URL and request delay.

    Returns:
        The parsed ``EiendomUnit``, or ``None`` when Eiendom.no is disabled,
        the response is not a JSON object, or it contains no unit.
    """
    if not EIENDOM_NO_ENABLED:
        logger.info("Eiendom.no enrichment is disabled")
        return None

    client = client or HTTPClient(
        base_url=EIENDOM_NO_BASE_URL,
        request_delay_seconds=EIENDOM_NO_REQUEST_DELAY_SECONDS,
    )
    path = f"/geodata/units/{unit_code}/"
    response = await client.get(path)
    data = response.json()
    # Check the shape before calling .get(); a non-object response would
    # otherwise raise before the type check could take effect.
    if not isinstance(data, dict):
        return None
    units = data.get("units") or []
    if units:
        return parse_eiendom_unit_json(units[0])
    if data.get("unitCode"):
        return parse_eiendom_unit_json(data)
    return None
|
||||
|
||||
|
||||
async def get_eiendom_unit(
    unit_code: str,
    client: HTTPClient | None = None,
) -> EiendomUnit | None:
    """Alias for ``get_unit``: fetch a single Eiendom.no unit by unit code."""
    unit = await get_unit(unit_code, client=client)
    return unit
|
||||
|
||||
|
||||
async def get_similar_units(
    unit_vector: str,
    listing_status: str = EIENDOM_NO_SIMILAR_UNITS_DEFAULT_STATUS,
    client: HTTPClient | None = None,
) -> list[SimilarUnit]:
    """Fetch comparable units from Eiendom.no for an encoded unit vector.

    Args:
        unit_vector: Encoded vector sent as the ``unit_vector`` query parameter.
        listing_status: Case-insensitive filter. ``RECENTLY_SOLD`` keeps units
            whose sale status is ``SOLD`` and that have ``finalized_at`` set;
            ``FOR_SALE`` keeps units whose sale status is ``FORSALE``; any
            other value returns the parsed list unfiltered.
        client: Optional HTTP client; one targeting ``EIENDOM_NO_BASE_URL``
            is created when omitted.

    Returns:
        The (possibly filtered) similar units; empty when the integration is
        disabled.
    """
    if not EIENDOM_NO_ENABLED:
        logger.info("Eiendom.no similar-units disabled")
        return []

    if not client:
        client = HTTPClient(
            base_url=EIENDOM_NO_BASE_URL,
            request_delay_seconds=EIENDOM_NO_REQUEST_DELAY_SECONDS,
        )

    reply = await client.get(
        "/geodata/units/similar/",
        params={"unit_vector": unit_vector},
    )
    candidates = parse_similar_units_json(reply.json())

    wanted = (listing_status or "").upper()
    if wanted == "RECENTLY_SOLD":
        return [
            comp
            for comp in candidates
            if comp.sale_status and comp.sale_status.upper() == "SOLD" and comp.finalized_at
        ]
    if wanted == "FOR_SALE":
        return [
            comp
            for comp in candidates
            if comp.sale_status and comp.sale_status.upper() == "FORSALE"
        ]
    return candidates
|
||||
|
||||
|
||||
def resolve_unit_from_finn_url(finn_url: str) -> str | None:
    """Return the normalized finnkode embedded in a FINN URL, if any.

    The result is the finnkode extracted from the URL's query string; it is
    returned as an identifier/placeholder, not a looked-up Eiendom.no unit.
    """
    if not finn_url:
        return None
    finnkode = normalize_finnkode(extract_finnkode_from_url(finn_url))
    # Collapse any falsy result (None or empty string) to None.
    return finnkode or None
|
||||
|
||||
|
||||
async def enrich_ad_with_eiendom_no(
    ad: Any,
    unit_code: str | None = None,
    client: HTTPClient | None = None,
) -> EiendomUnit | None:
    """Fetch the Eiendom.no unit for ``unit_code`` and attach its unit vector.

    Args:
        ad: The listing being enriched; not read by this function.
        unit_code: Eiendom.no unit code; nothing is fetched when falsy.
        client: Optional HTTP client forwarded to the unit lookup.

    Returns:
        The unit with ``unit_vector`` populated via ``build_unit_vector``, or
        ``None`` when no unit code was given or the lookup found nothing.
    """
    if not unit_code:
        return None

    unit = await get_eiendom_unit(unit_code, client=client)
    if unit is not None:
        unit.unit_vector = build_unit_vector(unit)
    return unit
|
||||
@@ -0,0 +1,122 @@
|
||||
"""HTTP client with retries, delays, and error handling."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTTPClient:
    """HTTP client with configurable retries, delays, and timeout."""

    def __init__(
        self,
        base_url: str = "",
        user_agent: str = "personal-finn-eiendom-analyzer/0.1",
        request_delay_seconds: float = 0.0,
        retries: int = 1,
        timeout_seconds: float = 30.0,
    ):
        """
        Initialize HTTP client.

        Args:
            base_url: Base URL for requests
            user_agent: User-Agent header value
            request_delay_seconds: Delay between requests (to be respectful)
            retries: Number of extra attempts after a 5xx response
                (negative values are treated as 0)
            timeout_seconds: Request timeout
        """
        self.base_url = base_url
        self.user_agent = user_agent
        self.request_delay_seconds = request_delay_seconds
        # Keep the retry count on the client itself instead of reading it back
        # from a private transport attribute; clamp so the request loop always
        # runs at least once.
        self.retries = max(0, retries)
        self.timeout = httpx.Timeout(timeout_seconds)
        # NOTE(review): this transport is kept for backward compatibility but
        # is never handed to the per-request AsyncClient below — confirm
        # whether connection-level retries were intended.
        self.transport = httpx.AsyncHTTPTransport(retries=self.retries)
        self.last_request_time: float | None = None

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """
        Make async GET request with delay and error handling.

        Args:
            url: URL to fetch
            **kwargs: Additional httpx arguments

        Returns:
            httpx.Response

        Raises:
            httpx.HTTPStatusError if ``raise_for_status()`` rejects the response
            httpx.RequestError if the request itself fails
        """
        return await self._request("GET", url, **kwargs)

    def _get_retries(self) -> int:
        """Return the configured number of retry attempts."""
        return self.retries

    async def post(self, url: str, **kwargs) -> httpx.Response:
        """Make async POST request with delay and error handling.

        Accepts and raises the same things as :meth:`get`.
        """
        return await self._request("POST", url, **kwargs)

    async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Send one request, retrying 5xx responses with exponential backoff."""
        # Copy so the caller's headers mapping is never mutated.
        headers = dict(kwargs.pop("headers", None) or {})
        headers.setdefault("User-Agent", self.user_agent)

        max_retries = self._get_retries()
        for attempt in range(max_retries + 1):
            await self._apply_delay()

            async with httpx.AsyncClient(
                timeout=self.timeout,
                # Absolute URLs bypass the configured base URL.
                base_url=self.base_url if not url.startswith("http") else "",
            ) as client:
                try:
                    response = await client.request(method, url, headers=headers, **kwargs)
                    if response.status_code >= 500 and attempt < max_retries:
                        # Server error with attempts left: back off 1s, 2s, 4s, ...
                        await asyncio.sleep(2**attempt)
                        continue
                    response.raise_for_status()
                    logger.debug("%s %s -> %s", method, url, response.status_code)
                    return response
                except httpx.HTTPStatusError as e:
                    logger.error("HTTP %s for %s", e.response.status_code, url)
                    raise
                except httpx.RequestError as e:
                    logger.error("Request failed for %s: %s", url, e)
                    raise

        # Unreachable: the final attempt always returns or raises.
        raise RuntimeError(f"{method} {url} finished without a response")

    async def _apply_delay(self):
        """Apply delay between requests if configured."""
        if self.request_delay_seconds > 0:
            await asyncio.sleep(self.request_delay_seconds)
|
||||
@@ -0,0 +1,160 @@
|
||||
"""FastMCP stdio server for FINN real estate analysis and Eiendom.no enrichment."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
from .analysis import analyze_search
|
||||
from .eiendom_no import (
|
||||
build_unit_vector,
|
||||
decode_unit_vector,
|
||||
get_similar_units,
|
||||
get_unit,
|
||||
search_unit_from_finn_url,
|
||||
)
|
||||
from .service import get_or_fetch_ad, get_or_fetch_eiendom_unit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
mcp = FastMCP("finn_eiendom_mcp")
|
||||
|
||||
|
||||
@mcp.tool(
    description=(
        "Analyze a FINN.no real estate search URL. Scrapes listing cards,"
        " fetches details, enriches with Eiendom.no data, scores, and ranks."
    )
)
async def finn_analyze_search(
    search_url: str,
    max_pages: int = 3,
    detail_limit: int = 20,
    include_details: bool = True,
    include_eiendom_no: bool = True,
) -> str:
    """Run ``analyze_search`` for a FINN search URL and return JSON text.

    Any exception is logged and reported as
    ``{"error": true, "message": ...}`` rather than raised.
    """
    try:
        analysis = await analyze_search(
            search_url,
            max_pages=max_pages,
            fetch_details=include_details,
            detail_limit=detail_limit,
            include_eiendom_no=include_eiendom_no,
        )
    except Exception as e:
        logger.error(f"Error analyzing search: {e}")
        return json.dumps({"error": True, "message": str(e)})

    try:
        return json.dumps(analysis)
    except Exception as e:
        logger.error(f"Error analyzing search: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
@mcp.tool(
    description=(
        "Fetch full detail for a FINN listing by finnkode."
        " Checks cache first; use force_refresh=True to bypass."
    )
)
async def finn_get_ad(finnkode: str, force_refresh: bool = False) -> str:
    """Return the listing for ``finnkode`` as JSON text.

    Delegates to ``get_or_fetch_ad``; any exception is logged and reported as
    an ``{"error": true, "message": ...}`` payload.
    """
    try:
        listing = await get_or_fetch_ad(finnkode, force_refresh=force_refresh)
        payload = listing.model_dump_json()
    except Exception as e:
        logger.error(f"Error fetching ad {finnkode}: {e}")
        payload = json.dumps({"error": True, "message": str(e)})
    return payload
|
||||
|
||||
|
||||
@mcp.tool(
    description="Resolve an Eiendom.no unit_code from a FINN listing URL. "
    "Returns unit_code, address, lat, lng or an error if not found."
)
async def finn_resolve_eiendom_unit(finn_url: str) -> str:
    """Resolve the Eiendom.no unit for a FINN URL and return JSON text.

    On success the payload holds ``unit_code``, ``address``, ``lat`` and
    ``lng``; otherwise it is an ``{"error": true, "message": ...}`` object.
    """
    try:
        unit = await search_unit_from_finn_url(finn_url)
        if unit is None:
            payload = {
                "error": True,
                "message": "Eiendom.no unit could not be resolved from FINN URL",
            }
        else:
            payload = {
                "unit_code": unit.unit_code,
                "address": unit.address,
                "lat": unit.lat,
                "lng": unit.lng,
            }
        return json.dumps(payload)
    except Exception as e:
        logger.error(f"Error resolving unit from {finn_url}: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
@mcp.tool(
    description="Fetch full Eiendom.no unit data by unit_code. Checks SQLite cache (24h TTL)."
)
async def finn_get_eiendom_unit(unit_code: str, force_refresh: bool = False) -> str:
    """Return the Eiendom.no unit for ``unit_code`` as JSON text.

    Delegates to ``get_or_fetch_eiendom_unit``. A missing unit or any
    exception yields an ``{"error": true, "message": ...}`` payload.
    """
    try:
        unit = await get_or_fetch_eiendom_unit(unit_code, force_refresh=force_refresh)
        if unit is not None:
            return unit.model_dump_json()
        return json.dumps({"error": True, "message": "Eiendom.no unit not found"})
    except Exception as e:
        logger.error(f"Error fetching unit {unit_code}: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
@mcp.tool(
    description="Fetch comparable recently-sold or for-sale units from Eiendom.no using a "
    "base64-encoded unit vector. Returns list of similar units with sale prices."
)
async def finn_get_similar_units(unit_vector: str, listing_status: str = "RECENTLY_SOLD") -> str:
    """Fetch similar units from Eiendom.no and return them as a JSON array.

    Args:
        unit_vector: Encoded unit vector forwarded to ``get_similar_units``.
        listing_status: Status filter forwarded to ``get_similar_units``.

    Returns:
        JSON text: a list of unit objects, or an
        ``{"error": true, "message": ...}`` object if anything fails.
    """
    try:
        units = await get_similar_units(unit_vector, listing_status)
        # mode="json" converts datetimes (e.g. finalized_at) to strings; the
        # default python-mode dump leaves datetime objects that json.dumps
        # rejects, which made every non-empty sold-units result an error.
        return json.dumps([unit.model_dump(mode="json") for unit in units])
    except Exception as e:
        logger.error(f"Error fetching similar units: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
@mcp.tool(
    description="Build a base64-encoded unit vector for a given Eiendom.no unit_code. "
    "The vector is used as input to finn_get_similar_units."
)
async def finn_build_unit_vector(unit_code: str) -> str:
    """Fetch the unit for ``unit_code`` and return its unit vector as JSON text.

    The success payload holds ``unit_code`` and ``unit_vector``; a missing
    unit or any exception yields an ``{"error": true, "message": ...}`` object.
    """
    try:
        unit = await get_unit(unit_code)
        if unit is None:
            return json.dumps({"error": True, "message": "Eiendom.no unit not found"})
        payload = {"unit_code": unit.unit_code, "unit_vector": build_unit_vector(unit)}
        return json.dumps(payload)
    except Exception as e:
        logger.error(f"Error building unit vector for {unit_code}: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
@mcp.tool(
    description="Decode a base64 unit vector into human-readable JSON (lat, lon, property type, "
    "floor, rooms, construction year, area, price)."
)
def finn_decode_unit_vector(unit_vector: str) -> str:
    """Decode ``unit_vector`` with ``decode_unit_vector`` and return JSON text.

    Any exception is logged and reported as an
    ``{"error": true, "message": ...}`` payload.
    """
    try:
        decoded = decode_unit_vector(unit_vector)
        return json.dumps(decoded)
    except Exception as e:
        logger.error(f"Error decoding unit vector: {e}")
        return json.dumps({"error": True, "message": str(e)})
|
||||
|
||||
|
||||
def main() -> None:
    """Start the FastMCP server on the stdio transport."""
    mcp.run(transport="stdio")


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,128 @@
|
||||
"""Pydantic models for FINN ads and Eiendom.no units."""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
|
||||
class FinnSearchCard(BaseModel):
    """One FINN search-result card: the subset of fields visible in a listing grid."""

    # Identity (required)
    finnkode: str
    url: str

    # Headline and location
    title: str | None = None
    address: str | None = None

    # Size and pricing
    area_m2: int | None = None
    asking_price: int | None = None
    total_price: int | None = None
    common_costs: int | None = None

    # Classification
    property_type: str | None = None
    ownership_type: str | None = None
    bedrooms: int | None = None
    floor: str | None = None
    broker_company: str | None = None
|
||||
|
||||
|
||||
class FinnAd(BaseModel):
    """Full FINN listing detail; every field except the identity pair is optional."""

    # Identity (required)
    finnkode: str
    url: str

    # Headline and location
    title: str | None = None
    address: str | None = None
    postal_area: str | None = None
    district: str | None = None

    # Classification
    property_type: str | None = None
    ownership_type: str | None = None

    # Pricing and recurring costs
    asking_price: int | None = None
    total_price: int | None = None
    shared_debt: int | None = None
    common_costs: int | None = None
    municipal_fee: int | None = None
    other_fees: int | None = None

    # Physical attributes
    area_m2: int | None = None
    rooms: int | None = None
    bedrooms: int | None = None
    floor: str | None = None
    construction_year: int | None = None
    energy_rating: str | None = None
    heating: str | None = None

    # Amenities
    has_balcony: bool | None = None
    has_terrace: bool | None = None
    has_elevator: bool | None = None
    has_parking: bool | None = None
    has_garage: bool | None = None

    # Free text and broker
    listing_description: str | None = None
    broker_name: str | None = None
    broker_company: str | None = None

    # Bookkeeping: timestamps default to the current UTC time at construction.
    first_seen_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    last_seen_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    detail_fetched_at: datetime | None = None
    eiendom_unit_code: str | None = None

    # NOTE(review): "serializers" is not a documented pydantic ConfigDict key;
    # confirm whether this setting has any effect on datetime output.
    model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat()})
|
||||
|
||||
|
||||
class EiendomUnit(BaseModel):
    """Eiendom.no unit detail, including price estimates and listing data."""

    # Identity (required)
    unit_code: str

    # Location
    address: str | None = None
    lat: float | None = None
    lng: float | None = None

    # Physical attributes
    property_type: str | None = None
    floor: int | None = None
    rooms: int | None = None
    construction_year: int | None = None
    usable_area: int | None = None

    # Price estimate with its lower/upper bounds
    estimated_selling_price: int | None = None
    estimated_selling_price_lower: int | None = None
    estimated_selling_price_upper: int | None = None

    # Current listing
    listing_price: int | None = None
    listing_sqm_price: int | None = None
    common_costs: int | None = None
    days_on_market: int | None = None
    sale_status: str | None = None
    market_placement_score: str | None = None

    # Encoded vector for the similar-units lookup, filled in after fetching.
    unit_vector: str | None = None
    # Defaults to the current UTC time at construction.
    fetched_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    # NOTE(review): "serializers" is not a documented pydantic ConfigDict key;
    # confirm whether this setting has any effect on datetime output.
    model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat()})
|
||||
|
||||
|
||||
class SimilarUnit(BaseModel):
    """A comparable unit ("comp") returned by the Eiendom.no similar-units lookup."""

    # Identity (required)
    unit_code: str

    # Location
    address: str | None = None
    lat: float | None = None
    lng: float | None = None

    # Physical attributes
    property_type: str | None = None
    floor: int | None = None
    rooms: int | None = None
    construction_year: int | None = None
    usable_area: int | None = None

    # Pricing
    listing_price: int | None = None
    selling_price: int | None = None
    shared_debt: int | None = None
    common_costs: int | None = None
    sqm_price: int | None = None

    # Sale progress
    days_on_market: int | None = None
    sale_status: str | None = None
    finalized_at: datetime | None = None
    listing_status: str = Field(default="RECENTLY_SOLD")

    # NOTE(review): "serializers" is not a documented pydantic ConfigDict key;
    # confirm whether this setting has any effect on datetime output.
    model_config = ConfigDict(serializers={datetime: lambda v: v.isoformat() if v else None})
|
||||
|
||||
|
||||
class UnitVector(BaseModel):
    """Payload describing a unit for the similar-units API."""

    # Required coordinates and property type
    lon: float
    lat: float
    ptype: str  # property type, e.g. APARTMENT or HOUSE

    # Optional descriptors
    floor: int | None = None
    rooms: int | None = None
    built: int | None = None  # construction year
    area: int | None = None  # usable area
    price: int | None = None  # listing or estimated price
|
||||
@@ -0,0 +1,88 @@
|
||||
"""Normalization and parsing helpers."""
|
||||
|
||||
import re
|
||||
|
||||
|
||||
def normalize_price(price_str: str | None) -> int | None:
|
||||
"""
|
||||
Normalize Norwegian formatted price to integer.
|
||||
Example: "7 200 991 kr" -> 7200991
|
||||
"""
|
||||
if not price_str:
|
||||
return None
|
||||
# Remove "kr" and spaces, keep only digits
|
||||
normalized = re.sub(r"[^\d]", "", price_str)
|
||||
try:
|
||||
return int(normalized) if normalized else None
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def normalize_area(area_str: str | None) -> int | None:
|
||||
"""
|
||||
Normalize area string to integer.
|
||||
Example: "77 m²" -> 77
|
||||
"""
|
||||
if not area_str:
|
||||
return None
|
||||
cleaned = area_str.replace(" ", "")
|
||||
match = re.search(r"(\d+(?:[.,]\d+)?)", cleaned)
|
||||
if match:
|
||||
value = match.group(1).replace(",", ".")
|
||||
try:
|
||||
return int(float(value))
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def normalize_number(num_str: str | None) -> int | None:
|
||||
"""
|
||||
Normalize Norwegian formatted number to integer.
|
||||
Handles text like "3 500 kr/mnd" and "7,2".
|
||||
"""
|
||||
if not num_str:
|
||||
return None
|
||||
cleaned = re.sub(r"[^\d,\.]", "", num_str)
|
||||
cleaned = cleaned.replace(" ", "")
|
||||
if "," in cleaned:
|
||||
cleaned = cleaned.replace(".", "").replace(",", ".")
|
||||
else:
|
||||
cleaned = cleaned.replace(".", "")
|
||||
try:
|
||||
return int(float(cleaned)) if cleaned else None
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def normalize_finnkode(finnkode: str | None) -> str | None:
|
||||
"""Normalize finnkode to string, strip whitespace."""
|
||||
if not finnkode:
|
||||
return None
|
||||
return str(finnkode).strip()
|
||||
|
||||
|
||||
def extract_finnkode_from_url(url: str) -> str | None:
|
||||
"""
|
||||
Extract finnkode from FINN URL.
|
||||
Example: https://www.finn.no/realestate/homes/ad.html?finnkode=462400360 -> 462400360
|
||||
"""
|
||||
match = re.search(r"finnkode=(\d+)", url)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
|
||||
def text_to_bool(text: str | None) -> bool:
|
||||
"""Convert text to boolean."""
|
||||
if not text:
|
||||
return False
|
||||
return text.lower() in ("ja", "yes", "true", "1", "y")
|
||||
|
||||
|
||||
def clean_text(text: str | None) -> str | None:
|
||||
"""Clean and normalize text: strip, collapse whitespace."""
|
||||
if not text:
|
||||
return None
|
||||
cleaned = " ".join(text.split())
|
||||
return cleaned if cleaned else None
|
||||
@@ -0,0 +1,146 @@
|
||||
"""Scoring engine for FINN listings enriched with Eiendom.no data."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from .models import EiendomUnit, SimilarUnit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _clamp(value: float, min_value: float, max_value: float) -> float:
|
||||
return max(min_value, min(max_value, value))
|
||||
|
||||
|
||||
def score_market_position(unit: EiendomUnit | None) -> float:
    """Score the listing price against the estimated selling price.

    The score depends on ``listing_price / estimated_selling_price``:
    at most 0.9 gives 20; up to 1.0 falls linearly from 20 to 16; up to 1.1
    falls linearly from 12 to 8; anything higher gives 5.

    Returns 0.0 when the unit is missing, the listing price is unknown, or
    the estimate is unknown or zero.
    """
    # "not estimate" covers both None and 0, so the division below cannot
    # raise ZeroDivisionError.
    if unit is None or unit.listing_price is None or not unit.estimated_selling_price:
        return 0.0

    ratio = unit.listing_price / unit.estimated_selling_price
    if ratio <= 0.9:
        return 20.0
    if ratio <= 1.0:
        return 16.0 + (1.0 - ratio) * 40.0
    if ratio <= 1.1:
        return 12.0 - (ratio - 1.0) * 40.0
    return 5.0
|
||||
|
||||
|
||||
def score_economy(ad: Any, unit: EiendomUnit | None) -> float:
    """Score the ad's total price, preferring the Eiendom.no estimate as baseline.

    With a non-zero estimate, ``total_price / estimate`` maps to 20 (<= 0.95),
    15 (<= 1.0), 10 (<= 1.05) or 6. Without one, the score is 12 when the
    total price does not exceed a known asking price, otherwise 8. A missing
    total price scores 0.
    """
    total = ad.total_price
    if total is None:
        return 0.0

    if unit and unit.estimated_selling_price:
        ratio = total / unit.estimated_selling_price
        # (upper ratio bound, points), checked from cheapest to dearest.
        for upper_bound, points in ((0.95, 20.0), (1.0, 15.0), (1.05, 10.0)):
            if ratio <= upper_bound:
                return points
        return 6.0

    within_asking = bool(ad.asking_price) and total <= ad.asking_price
    return 12.0 if within_asking else 8.0
|
||||
|
||||
|
||||
def score_comparable_sales(listings: list[SimilarUnit], listing_price: int | None) -> float:
    """Score how close the listing price is to the average comparable sale.

    A price equal to the mean of the comps' non-zero selling prices scores 20;
    the score drops by 20 per 100% of relative deviation and is limited to
    the range 0..20. Returns 0.0 when there are no comps, no selling prices,
    or no listing price.
    """
    if listing_price is None or not listings:
        return 0.0

    sold_prices = [price for price in (comp.selling_price for comp in listings) if price]
    if not sold_prices:
        return 0.0

    mean_price = sum(sold_prices) / len(sold_prices)
    ratio = listing_price / mean_price
    raw_score = (1.0 - abs(ratio - 1.0)) * 20.0
    return float(max(0.0, min(20.0, raw_score)))
|
||||
|
||||
|
||||
def score_location(address: str | None, district: str | None) -> float:
|
||||
if not address and not district:
|
||||
return 0.0
|
||||
if district and "oslo" in district.lower():
|
||||
return 15.0
|
||||
if address and "oslo" in address.lower():
|
||||
return 12.0
|
||||
return 7.0
|
||||
|
||||
|
||||
def score_layout_and_potential(description: str | None, rooms: int | None) -> float:
|
||||
score = 0.0
|
||||
if rooms and rooms >= 4:
|
||||
score += 10.0
|
||||
if description and "potensial" in description.lower():
|
||||
score += 8.0
|
||||
return float(_clamp(score, 0.0, 20.0))
|
||||
|
||||
|
||||
def score_outdoor_and_view(description: str | None) -> float:
|
||||
if not description:
|
||||
return 0.0
|
||||
score = 5.0 if "utsikt" in description.lower() or "balkong" in description.lower() else 0.0
|
||||
return float(_clamp(score, 0.0, 15.0))
|
||||
|
||||
|
||||
def score_rental_potential(description: str | None) -> float:
|
||||
if not description:
|
||||
return 0.0
|
||||
score = 10.0 if "hybel" in description.lower() or "leie" in description.lower() else 0.0
|
||||
return score
|
||||
|
||||
|
||||
def score_renovation_upside(description: str | None, asking_price: int | None) -> float:
|
||||
score = 0.0
|
||||
if description and "renover" in description.lower():
|
||||
score += 10.0
|
||||
if asking_price and asking_price > 0:
|
||||
score += 5.0
|
||||
return float(_clamp(score, 0.0, 15.0))
|
||||
|
||||
|
||||
def score_risk(description: str | None, unit: EiendomUnit | None) -> float:
    """Return a risk penalty: -10.0 when there is no Eiendom.no unit or the
    description contains "usikker", otherwise 0.0.
    """
    flagged_text = bool(description) and "usikker" in description.lower()
    return -10.0 if unit is None or flagged_text else 0.0
|
||||
|
||||
|
||||
def score_ad(
    ad: Any, unit: EiendomUnit | None, similar_units: list[SimilarUnit]
) -> dict[str, float]:
    """Compute every component score for an ad plus a clamped total.

    Returns a dict with one entry per component and a ``total`` entry holding
    the sum of the components limited to the range 0..100.
    """
    description = ad.listing_description
    # Comps are compared against the total price, falling back to asking price.
    reference_price = ad.total_price or ad.asking_price

    scores = {
        "economy": score_economy(ad, unit),
        "market_position": score_market_position(unit),
        "comparable_sales": score_comparable_sales(similar_units, reference_price),
        "location": score_location(ad.address, ad.district),
        "layout": score_layout_and_potential(description, ad.rooms),
        "outdoor": score_outdoor_and_view(description),
        "rental_potential": score_rental_potential(description),
        "renovation": score_renovation_upside(description, ad.asking_price),
        "risk": score_risk(description, unit),
    }
    component_sum = sum(scores.values())
    scores["total"] = float(_clamp(component_sum, 0.0, 100.0))
    return scores
|
||||
|
||||
|
||||
def classify_ad(scores: dict[str, float]) -> list[str]:
    """Translate a score dict into category labels.

    Labels are emitted in a fixed order and are not mutually exclusive; a
    missing score is treated as 0.
    """
    total = scores.get("total", 0.0)
    # (condition, label) pairs in output order.
    rules = (
        (total >= 70, "bargain_candidate"),
        (total >= 60, "safe_candidate"),
        (50 <= total < 70, "lifestyle_candidate"),
        (scores.get("renovation", 0.0) >= 8, "renovation_candidate"),
        (scores.get("rental_potential", 0.0) >= 5, "hybel_candidate"),
        (scores.get("risk", 0.0) < 0, "risk_object"),
        (total < 30, "not_interesting"),
        (30 <= total < 60, "manual_review_required"),
    )
    return [label for applies, label in rules if applies]
|
||||
@@ -0,0 +1,194 @@
|
||||
"""FINN search scraping and parsing."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from . import cache
|
||||
from .config import FINN_CACHE_TTL_SEARCH_MINUTES
|
||||
from .http import HTTPClient
|
||||
from .models import FinnSearchCard
|
||||
from .parser import (
|
||||
clean_text,
|
||||
extract_finnkode_from_url,
|
||||
normalize_area,
|
||||
normalize_finnkode,
|
||||
normalize_number,
|
||||
normalize_price,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def fetch_search_page(url: str, client: HTTPClient | None = None) -> str:
    """Download one FINN search page and return its HTML text.

    A client with no request delay is created when none is supplied.
    """
    if not client:
        client = HTTPClient(request_delay_seconds=0.0)
    page = await client.get(url)
    return page.text
|
||||
|
||||
|
||||
async def fetch_search_page_cached(
    url: str,
    client: HTTPClient | None = None,
    conn: cache.sqlite3.Connection | None = None,
    use_cache: bool = True,
) -> str:
    """Return a FINN search page's HTML, using the SQLite cache when allowed.

    Args:
        url: Search page URL, also used as the cache key.
        client: Optional HTTP client for cache misses.
        conn: Optional cache connection; ``cache.init_db()`` supplies one
            when omitted.
        use_cache: When False the cache is not read, but the fetched page is
            still written to it.
    """
    client = client or HTTPClient(request_delay_seconds=0.0)
    conn = conn or cache.init_db()

    cached_html = cache.get_search_page(conn, url) if use_cache else None
    if cached_html:
        logger.debug("Using cached search page: %s", url)
        return cached_html

    fresh_html = await fetch_search_page(url, client=client)
    cache.save_search_page(conn, url, fresh_html, ttl_minutes=FINN_CACHE_TTL_SEARCH_MINUTES)
    return fresh_html
|
||||
|
||||
|
||||
def extract_ad_links(html: str) -> list[str]:
    """Collect the listing hrefs from FINN search HTML.

    For each result card, the first anchor whose href contains "finnkode" is
    taken; the href is whitespace-normalized before being returned.
    """
    soup = BeautifulSoup(html, "html.parser")
    hrefs = []
    for article in soup.select("article.listing-card, article.sf-search-ad"):
        link = article.select_one("a[href*='finnkode']")
        href = link.get("href") if link else None
        if href:
            hrefs.append(clean_text(href) or "")
    return hrefs
|
||||
|
||||
|
||||
def _extract_int_from_text(text: str, pattern: str) -> int | None:
    """Search ``text`` case-insensitively and normalize capture group 1 to an int."""
    found = re.search(pattern, text, re.I)
    return normalize_number(found.group(1)) if found else None
|
||||
|
||||
|
||||
def _extract_area_from_text(text: str) -> int | None:
    """Return the last area figure (number followed by m², m2 or kvm) in ``text``."""
    figures = re.findall(r"(\d+(?:[.,]\d+)?)\s*(?:m²|m2|kvm)", text, re.I)
    if not figures:
        return None
    # When several areas are listed, the final one is used.
    return normalize_area(figures[-1])
|
||||
|
||||
|
||||
def _extract_price_from_text(text: str, label: str) -> int | None:
    """Find a "<label> ... kr" amount in ``text`` and return it as an int.

    ``label`` is inserted into the regular expression as-is, so it is
    interpreted as a pattern, not as literal text.
    """
    found = re.search(rf"{label}[:\s]*([\d\s]+kr)", text, re.I)
    return normalize_price(found.group(1)) if found else None
|
||||
|
||||
|
||||
def extract_search_cards(html: str) -> list[FinnSearchCard]:
    """Parse FINN search HTML into ``FinnSearchCard`` objects.

    Each result card yields one entry. Values come from dedicated elements
    where present; bedrooms, common costs, total price and area otherwise
    fall back to patterns in the card's full text. Cards without a finnkode
    (``data-id`` attribute or link URL) are skipped. ``asking_price`` and
    ``floor`` are never filled from search results.
    """
    logger.debug("Extracting FINN search cards")
    soup = BeautifulSoup(html, "html.parser")

    def _text_of(node):
        # Cleaned text of an optional element.
        return clean_text(node.get_text()) if node else None

    parsed: list[FinnSearchCard] = []
    for article in soup.select("article.listing-card, article.sf-search-ad"):
        link = article.select_one("a[href*='finnkode']")
        url = link.get("href") if link else ""
        finnkode = normalize_finnkode(
            article.get("data-id") or extract_finnkode_from_url(url)
        )
        if not finnkode:
            logger.debug("Skipping card with missing finnkode")
            continue

        full_text = clean_text(article.get_text(" ") or "")

        # Bedrooms: dedicated element, else "<n> soverom" in the card text.
        bedrooms_node = article.select_one(".bedrooms")
        if bedrooms_node:
            bedrooms = normalize_number(bedrooms_node.get_text())
        elif full_text:
            bedrooms = _extract_int_from_text(full_text, r"(\d+)\s*soverom")
        else:
            bedrooms = None

        # Common costs: dedicated element, else a labelled amount in the text.
        costs_node = article.select_one(".common-costs")
        if costs_node:
            common_costs = normalize_number(costs_node.get_text())
        elif full_text:
            common_costs = _extract_int_from_text(
                full_text, r"(?:Fellesutg|Felleskost(?:er)?)[^\d]*(\d+[\d\s]*)kr"
            )
        else:
            common_costs = None

        # Total price: element, then "Totalpris" label, then first "... kr".
        price_node = article.select_one(".price")
        total_price = normalize_price(price_node.get_text()) if price_node else None
        if not total_price and full_text:
            total_price = _extract_price_from_text(full_text, r"Totalpris")
        if not total_price and full_text:
            any_price = re.search(r"([\d\s]+kr)", full_text)
            if any_price:
                total_price = normalize_price(any_price.group(1))

        # Area: dedicated element, else an area figure in the card text.
        area_node = article.select_one(".area")
        if area_node:
            area_m2 = normalize_area(area_node.get_text())
        elif full_text:
            area_m2 = _extract_area_from_text(full_text)
        else:
            area_m2 = None

        parsed.append(
            FinnSearchCard(
                finnkode=finnkode,
                url=url or "",
                title=_text_of(
                    article.select_one(".title, h2.sf-realestate-heading, a.sf-search-ad-link")
                ),
                address=_text_of(article.select_one(".location, .sf-realestate-location")),
                area_m2=area_m2,
                asking_price=None,
                total_price=total_price,
                common_costs=common_costs,
                property_type=_text_of(article.select_one(".property-type")),
                ownership_type=_text_of(article.select_one(".ownership-type")),
                bedrooms=bedrooms,
                floor=None,
                broker_company=_text_of(article.select_one(".broker-company")),
            )
        )
        logger.debug("Parsed FINN search card %s", finnkode)

    return parsed
|
||||
|
||||
|
||||
def find_next_page_url(html: str) -> str | None:
    """Return the href of the page's ``rel="next"`` link, or None if absent.

    The href is returned as written in the markup (whitespace-normalized).
    """
    soup = BeautifulSoup(html, "html.parser")
    pager_link = soup.select_one("a[rel='next']")
    href = pager_link.get("href") if pager_link else None
    return clean_text(href) if href else None
|
||||
|
||||
|
||||
async def fetch_search_pages(
    start_url: str,
    max_pages: int = 1,
    client: HTTPClient | None = None,
    use_cache: bool = True,
) -> list[FinnSearchCard]:
    """Fetch paginated FINN search pages and parse their search cards.

    Args:
        start_url: URL of the first search page.
        max_pages: Upper bound on the number of pages fetched.
        client: Optional HTTP client shared across all pages.
        use_cache: Forwarded to ``fetch_search_page_cached``.

    Returns:
        The cards from every fetched page, in page order. Pagination stops
        early when a page has no ``rel="next"`` link.
    """
    from urllib.parse import urljoin

    client = client or HTTPClient(request_delay_seconds=0.0)
    conn = cache.init_db()
    url = start_url
    all_cards: list[FinnSearchCard] = []

    for _ in range(max_pages):
        html = await fetch_search_page_cached(url, client=client, conn=conn, use_cache=use_cache)
        all_cards.extend(extract_search_cards(html))
        next_url = find_next_page_url(html)
        if not next_url:
            break
        # Pagination hrefs may be relative ("?page=2", "/realestate/...");
        # resolve against the current page so the client gets an absolute
        # URL. Absolute hrefs pass through urljoin unchanged.
        url = urljoin(url, next_url)
        logger.debug("Following next page link: %s", url)

    return all_cards
|
||||
@@ -0,0 +1,35 @@
|
||||
"""Service layer for cache-aware fetching of FINN ads and Eiendom.no units."""
|
||||
|
||||
import logging
|
||||
|
||||
from .ad import fetch_ad_details
|
||||
from .cache import get_eiendom_unit as get_cached_eiendom_unit
|
||||
from .cache import get_finn_ad, init_db, save_eiendom_unit, save_finn_ad
|
||||
from .config import FINN_CACHE_PATH
|
||||
from .eiendom_no import get_unit
|
||||
from .models import EiendomUnit, FinnAd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def get_or_fetch_ad(finnkode: str, force_refresh: bool = False) -> FinnAd:
    """Return the FinnAd for ``finnkode`` from cache, fetching it when needed.

    A cached entry (24-hour TTL) is used unless ``force_refresh`` is set;
    otherwise the ad is fetched and written back to the cache. Never returns
    None.
    """
    conn = init_db(FINN_CACHE_PATH)

    if not force_refresh:
        cached_ad = get_finn_ad(conn, finnkode, ttl_hours=24)
        if cached_ad is not None:
            return cached_ad

    fresh_ad = await fetch_ad_details(finnkode)
    save_finn_ad(conn, fresh_ad)
    return fresh_ad
|
||||
|
||||
|
||||
async def get_or_fetch_eiendom_unit(
    unit_code: str, force_refresh: bool = False
) -> EiendomUnit | None:
    """Return the EiendomUnit for ``unit_code`` from cache, fetching it when needed.

    A cached entry (24-hour TTL) is used unless ``force_refresh`` is set. A
    freshly fetched unit is cached only when the lookup found one; None is
    returned (and nothing cached) otherwise.
    """
    conn = init_db(FINN_CACHE_PATH)

    if not force_refresh:
        cached_unit = get_cached_eiendom_unit(conn, unit_code, ttl_hours=24)
        if cached_unit is not None:
            return cached_unit

    fresh_unit = await get_unit(unit_code)
    if fresh_unit is not None:
        save_eiendom_unit(conn, fresh_unit)
    return fresh_unit
|
||||
Reference in New Issue
Block a user