Files
2026-05-16 06:54:17 +00:00

123 lines
4.4 KiB
Python

"""HTTP client with retries, delays, and error handling."""
import asyncio
import logging
import httpx
logger = logging.getLogger(__name__)
class HTTPClient:
"""HTTP client with configurable retries, delays, and timeout."""
def __init__(
self,
base_url: str = "",
user_agent: str = "personal-finn-eiendom-analyzer/0.1",
request_delay_seconds: float = 0.0,
retries: int = 1,
timeout_seconds: float = 30.0,
):
"""
Initialize HTTP client.
Args:
base_url: Base URL for requests
user_agent: User-Agent header value
request_delay_seconds: Delay between requests (to be respectful)
retries: Number of retry attempts for failed connections
timeout_seconds: Request timeout
"""
self.base_url = base_url
self.user_agent = user_agent
self.request_delay_seconds = request_delay_seconds
self.timeout = httpx.Timeout(timeout_seconds)
self.transport = httpx.AsyncHTTPTransport(retries=retries)
self.last_request_time: float | None = None
async def get(self, url: str, **kwargs) -> httpx.Response:
"""
Make async GET request with delay and error handling.
Args:
url: URL to fetch
**kwargs: Additional httpx arguments
Returns:
httpx.Response
Raises:
httpx.HTTPStatusError if status is 4xx or 5xx
"""
headers = kwargs.pop("headers", {})
if "User-Agent" not in headers:
headers["User-Agent"] = self.user_agent
for attempt in range(self._get_retries() + 1):
await self._apply_delay()
async with httpx.AsyncClient(
timeout=self.timeout,
base_url=self.base_url if not url.startswith("http") else "",
) as client:
try:
response = await client.get(url, headers=headers, **kwargs)
if response.status_code < 500:
response.raise_for_status()
logger.debug(f"GET {url} -> {response.status_code}")
return response
if attempt < self._get_retries():
await asyncio.sleep(2**attempt)
continue
response.raise_for_status()
return response
except httpx.HTTPStatusError as e:
logger.error(f"HTTP {e.response.status_code} for {url}")
raise
except httpx.RequestError as e:
logger.error(f"Request failed for {url}: {e}")
raise
def _get_retries(self) -> int:
"""Get retries count from transport."""
if hasattr(self.transport, "_retries"):
return self.transport._retries
return 1
async def post(self, url: str, **kwargs) -> httpx.Response:
"""Make async POST request with delay and error handling."""
headers = kwargs.pop("headers", {})
if "User-Agent" not in headers:
headers["User-Agent"] = self.user_agent
for attempt in range(self._get_retries() + 1):
await self._apply_delay()
async with httpx.AsyncClient(
timeout=self.timeout,
base_url=self.base_url if not url.startswith("http") else "",
) as client:
try:
response = await client.post(url, headers=headers, **kwargs)
if response.status_code < 500:
response.raise_for_status()
logger.debug(f"POST {url} -> {response.status_code}")
return response
if attempt < self._get_retries():
await asyncio.sleep(2**attempt)
continue
response.raise_for_status()
return response
except httpx.HTTPStatusError as e:
logger.error(f"HTTP {e.response.status_code} for {url}")
raise
except httpx.RequestError as e:
logger.error(f"Request failed for {url}: {e}")
raise
async def _apply_delay(self):
"""Apply delay between requests if configured."""
if self.request_delay_seconds > 0:
await asyncio.sleep(self.request_delay_seconds)