55d93894ac
feat(scripts): Add backfill script for content_hash in cache tables feat(scripts): Create recompute script for analysis_cache population test(tests): Implement comprehensive tests for analysis module functions fix(tests): Update CLI tests to assert errors on stderr instead of stdout fix(tests): Adjust MCP integration tests to pass context parameter correctly fix(tests): Modify service tests to return hash on save functions for consistency
69 lines
1.8 KiB
Python
69 lines
1.8 KiB
Python
"""User feedback storage and retrieval for listed properties."""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from .cache import delete_feedback as cache_delete_feedback
|
|
from .cache import get_feedback as cache_get_feedback
|
|
from .cache import get_feedback_by_verdict
|
|
from .cache import init_db
|
|
from .cache import save_feedback as cache_save_feedback
|
|
from .config import FINN_CACHE_PATH
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def save_feedback(finnkode: str, verdict: str, notes: str | None = None) -> dict[str, Any]:
    """Persist a user's verdict on a FINN listing.

    Opens the cache database configured at ``FINN_CACHE_PATH`` via
    ``init_db`` and hands the write off to the cache layer's
    ``save_feedback``.

    Args:
        finnkode: FINN property ID the feedback applies to.
        verdict: Verdict label (e.g., "liked", "disliked", "shortlisted").
        notes: Free-text notes to store with the verdict, if any.

    Returns:
        Whatever the cache layer returns for the save; annotated as a dict
        describing the stored feedback.
    """
    return cache_save_feedback(init_db(FINN_CACHE_PATH), finnkode, verdict, notes)
|
|
|
|
|
|
def get_feedback(finnkode: str) -> dict[str, Any] | None:
    """Look up the feedback recorded for a single FINN listing.

    The lookup is delegated to the cache layer's ``get_feedback`` using a
    connection obtained from ``init_db(FINN_CACHE_PATH)``.

    Args:
        finnkode: FINN property ID to look up.

    Returns:
        The feedback dict for the listing, or None when nothing is stored.
    """
    db = init_db(FINN_CACHE_PATH)
    stored = cache_get_feedback(db, finnkode)
    return stored
|
|
|
|
|
|
def get_feedback_by_verdict_impl(verdict: str, limit: int = 100) -> list[dict[str, Any]]:
    """List stored feedback entries that carry a given verdict.

    Thin wrapper over the cache layer's ``get_feedback_by_verdict``; the
    connection comes from ``init_db(FINN_CACHE_PATH)`` and ``limit`` is
    forwarded as a keyword argument.

    Args:
        verdict: Verdict label to filter on.
        limit: Upper bound on the number of entries requested (default 100).

    Returns:
        List of feedback dicts as produced by the cache layer.
    """
    db = init_db(FINN_CACHE_PATH)
    matches = get_feedback_by_verdict(db, verdict, limit=limit)
    return matches
|
|
|
|
|
|
def delete_feedback(finnkode: str) -> dict[str, Any]:
    """Remove the feedback recorded for a FINN listing.

    Delegates to the cache layer's ``delete_feedback`` using a connection
    obtained from ``init_db(FINN_CACHE_PATH)``.

    Args:
        finnkode: FINN property ID whose feedback should be removed.

    Returns:
        Status dict as reported by the cache layer.
    """
    return cache_delete_feedback(init_db(FINN_CACHE_PATH), finnkode)
|