"""
Base client with silent 3-tier fallback strategy
"""
import requests
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any
from abc import ABC, abstractmethod
from ..core.search import STACSearch
try:
import pystac_client
import planetary_computer
PYSTAC_AVAILABLE = True
except ImportError:
PYSTAC_AVAILABLE = False
[docs]
class BaseSTACClient(ABC):
"""Abstract base class with improved error handling."""
[docs]
def __init__(self, base_url: str, provider_name: str, verbose: bool = False):
self.base_url = base_url
self.search_url = f"{base_url}/search"
self.provider_name = provider_name
self.verbose = verbose
self.collections = self._fetch_collections()
self._collection_details = {}
def _fetch_collections(self):
"""Fetch all collections from the STAC API."""
url = f"{self.base_url}/collections"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
collections = data.get('collections', [])
return {col['id']: f"{self.base_url}/collections/{col['id']}" for col in collections}
except requests.RequestException as e:
# 🔥 FIXED: Better error handling for mocking
verbose = getattr(self, 'verbose', False) # Safe attribute access
if verbose:
print(f"Error fetching collections: {e}")
return {}
[docs]
def list_collections(self):
"""Return a list of all available collection names."""
return sorted(list(self.collections.keys()))
[docs]
def search_collections(self, keyword):
"""Search for collections containing a specific keyword."""
keyword = keyword.lower()
return [col for col in self.collections.keys() if keyword in col.lower()]
[docs]
def get_collection_info(self, collection_name):
"""Get detailed information about a specific collection."""
if collection_name not in self.collections:
return None
if collection_name not in self._collection_details:
try:
response = requests.get(self.collections[collection_name])
response.raise_for_status()
self._collection_details[collection_name] = response.json()
except requests.RequestException as e:
if self.verbose:
print(f"Error fetching collection details: {e}")
return None
return self._collection_details[collection_name]
[docs]
def collections_title(self):
"""
Get dictionary of collection IDs mapped to their titles.
Returns
-------
dict
Dictionary with collection IDs as keys and titles as values
"""
collections_dict = {}
collection_ids = self.list_collections()
for coll_id in collection_ids:
try:
coll_info = self.get_collection_info(coll_id)
if coll_info:
title = coll_info.get('title', 'No title')
collections_dict[coll_id] = title
else:
collections_dict[coll_id] = 'No title'
except Exception as e:
if self.verbose:
print("Warning: Failed to get title for collection {}: {}".format(coll_id, e))
collections_dict[coll_id] = 'Error fetching title'
return collections_dict
[docs]
def available_collections(self):
"""
Get the full JSON response from the collections endpoint without modifications.
Returns
-------
dict
Complete JSON response from collections endpoint
"""
url = "{}/collections".format(self.base_url)
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
if self.verbose:
print("Error fetching full collections response: {}".format(e))
return {
'collections': [],
'links': [],
'error': str(e)
}
def _format_datetime_rfc3339(self, datetime_input: Union[str, datetime]) -> str:
"""Convert datetime to RFC3339 format."""
if not datetime_input:
return None
if isinstance(datetime_input, datetime):
return datetime_input.strftime('%Y-%m-%dT%H:%M:%SZ')
datetime_str = str(datetime_input)
if 'T' in datetime_str and datetime_str.endswith('Z'):
return datetime_str
if '/' in datetime_str:
start_date, end_date = datetime_str.split('/')
if 'T' not in start_date:
start_rfc3339 = f"{start_date}T00:00:00Z"
else:
start_rfc3339 = start_date if start_date.endswith('Z') else f"{start_date}Z"
if 'T' not in end_date:
end_rfc3339 = f"{end_date}T23:59:59Z"
else:
end_rfc3339 = end_date if end_date.endswith('Z') else f"{end_date}Z"
return f"{start_rfc3339}/{end_rfc3339}"
if 'T' not in datetime_str:
return f"{datetime_str}T00:00:00Z"
if not datetime_str.endswith('Z'):
return f"{datetime_str}Z"
return datetime_str
def _build_search_payload(self, collections, intersects, bbox, datetime, query, limit, days=None):
"""
Build search payload from parameters.
Args:
collections: List of collection names
intersects: GeoJSON geometry for intersection
bbox: Bounding box [west, south, east, north]
datetime: Specific datetime or datetime range
query: Additional query parameters
limit: Maximum number of results
days: Number of days back from today to search (convenience parameter)
Returns:
Dictionary containing the search payload
"""
search_payload = {}
if collections:
search_payload["collections"] = collections
if intersects:
search_payload["intersects"] = intersects
if bbox:
search_payload["bbox"] = bbox
# 🆕 ENHANCED: Handle 'days' parameter with priority over datetime
if days is not None:
from datetime import datetime as dt, timedelta
# 🔥 FIX: Use dt.now() instead of datetime.now()
# Convert days to datetime range (days back from today)
end_date = dt.now()
start_date = end_date - timedelta(days=days)
datetime_range = f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
search_payload["datetime"] = datetime_range
if self.verbose:
print(f"🗓️ Using days parameter: {days} days back ({start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')})")
elif datetime:
# Use explicit datetime if days is not specified
if isinstance(datetime, list):
search_payload["datetime"] = "/".join(datetime)
else:
search_payload["datetime"] = self._format_datetime_rfc3339(datetime)
if query:
search_payload["query"] = query
if limit:
search_payload["limit"] = limit
return search_payload
[docs]
@abstractmethod
def search(self, collections: Optional[List[str]] = None, **kwargs) -> STACSearch:
"""Abstract search method to be implemented by each provider."""
pass
@abstractmethod
def _create_pystac_catalog_fallback(self):
"""Create pystac-client catalog for fallback strategy."""
pass
def _fallback_chunking_search(self, search_params: Dict, search_url: str = None, verbose: bool = False) -> List[Dict]:
"""🔄 FALLBACK TIER 3: Own chunking implementation (silent)."""
if not search_url:
search_url = self.search_url
# Simple time-based chunking strategy
if "datetime" in search_params and "/" in search_params["datetime"]:
return self._chunked_time_search(search_params, search_url, verbose)
else:
return self._simple_pagination_fallback(search_params, search_url, verbose)
def _chunked_time_search(self, search_params: Dict, search_url: str, verbose: bool = False) -> List[Dict]:
"""Silent time-based chunking for fallback."""
start_date_str, end_date_str = search_params["datetime"].split("/")
start_dt = datetime.fromisoformat(start_date_str.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end_date_str.replace('Z', '+00:00'))
all_items = []
chunk_days = 30 # Conservative chunk size
current_dt = start_dt
headers = {'Content-Type': 'application/json', 'Accept': 'application/geo+json'}
while current_dt < end_dt:
chunk_end = min(current_dt + timedelta(days=chunk_days), end_dt)
chunk_datetime = f"{current_dt.strftime('%Y-%m-%dT%H:%M:%SZ')}/{chunk_end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
chunk_params = search_params.copy()
chunk_params["datetime"] = chunk_datetime
chunk_params["limit"] = 100
try:
response = requests.post(search_url, json=chunk_params, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
chunk_items = data.get('features', data.get('items', []))
all_items.extend(chunk_items)
if verbose:
print(f" 📅 Chunk {current_dt.strftime('%Y-%m-%d')}: {len(chunk_items)} items")
except Exception as e:
if verbose:
print(f" ⚠️ Chunk failed: {e}")
current_dt = chunk_end
return all_items
def _simple_pagination_fallback(self, search_params: Dict, search_url: str, verbose: bool = False) -> List[Dict]:
"""Silent pagination for non-time searches."""
all_items = []
headers = {'Content-Type': 'application/json', 'Accept': 'application/geo+json'}
page_params = {**search_params, "limit": 100}
for page in range(10): # Max 10 pages for safety
try:
response = requests.post(search_url, json=page_params, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
page_items = data.get('features', data.get('items', []))
if not page_items:
break
all_items.extend(page_items)
# Simple offset-based pagination
page_params = {**search_params, "limit": 100, "offset": len(all_items)}
if verbose:
print(f" 📄 Page {page + 1}: {len(page_items)} items")
except Exception as e:
if verbose:
print(f" ⚠️ Page {page + 1} failed: {e}")
break
return all_items
[docs]
def create_bbox_from_center(self, lat: float, lon: float, buffer_km: float = 10) -> List[float]:
"""Create a bounding box around a center point."""
buffer_deg = buffer_km / 111.0
return [lon - buffer_deg, lat - buffer_deg, lon + buffer_deg, lat + buffer_deg]
[docs]
def create_geojson_polygon(self, coordinates: List[List[float]]) -> Dict:
"""Create a GeoJSON polygon for area of interest."""
if coordinates[0] != coordinates[-1]:
coordinates.append(coordinates[0])
return {"type": "Polygon", "coordinates": [coordinates]}
def __repr__(self):
return f"{self.__class__.__name__}({len(self.collections)} collections, provider='{self.provider_name}')"