# Source code for open_geodata_api.core.search

"""
STAC Search with optimized 3-tier fallback strategy - Performance Enhanced
"""

import warnings
from typing import Dict, Optional, Any, List, Union
from .collections import STACItemCollection

try:
    import pystac_client
    import planetary_computer
    PYSTAC_AVAILABLE = True
except ImportError:
    PYSTAC_AVAILABLE = False

class STACSearch:
    """Wrap a STAC search response and lazily expand it through fallbacks.

    The object starts from the items found in ``search_results`` (tier 1).
    When that result holds exactly ``_API_ITEM_CAP`` items and the caller
    asked for more (or set no limit), ``get_all_items()`` tries, in order,
    a pystac-client search (tier 2) and the client's own
    ``_fallback_chunking_search`` (tier 3) before settling for the simple
    result.  The ``limit`` found in the original search parameters is
    applied to whatever tier produced the items, unless limit enforcement
    was switched off with ``set_limit_enforcement(False)``.
    """

    # Item count at which the simple search result is treated as truncated.
    _API_ITEM_CAP = 100

    def __init__(self, search_results: Dict, provider: str = "unknown",
                 client_instance=None,
                 original_search_params: Optional[Dict] = None,
                 search_url: Optional[str] = None, verbose: bool = False):
        """Store the raw response and pre-compute whether a fallback is needed.

        Args:
            search_results: Raw response dict; items are read from its
                ``'items'`` key, else ``'features'``, else an empty list.
            provider: Provider label passed on to the created collections/items.
            client_instance: Client used by the fallback tiers (may be None,
                in which case no fallback is attempted).
            original_search_params: Parameters of the original query; its
                ``'limit'`` entry, if any, caps the returned items.
            search_url: URL handed to the chunking fallback.
            verbose: When True, progress messages are printed.
        """
        self._results = search_results
        self._items = search_results.get('items', search_results.get('features', []))
        self.provider = provider
        self._client = client_instance
        self._original_params = original_search_params or {}
        self._search_url = search_url
        self._verbose = verbose

        # Fallback bookkeeping: each tier runs at most once per cache cycle.
        self._fallback_attempted = False
        self._pystac_attempted = False
        self._chunking_attempted = False

        # Cache state.
        self._all_items_cached = search_results.get('all_items_cached', False)
        self._all_items_cache = None
        self._fallback_metadata_cache = {}

        # Limit requested by the caller (None when unspecified).
        self._original_limit = self._original_params.get('limit')
        self._respect_limit = True

        self._needs_fallback = self._calculate_fallback_need()

        # Nothing more to fetch: build the collection right away.
        if not self._needs_fallback:
            self._cache_simple_results()

    def _calculate_fallback_need(self) -> bool:
        """Return True when the simple result looks truncated at the item cap."""
        item_count = len(self._items)

        # Enough items for the requested limit already.
        if self._original_limit and item_count >= self._original_limit:
            return False

        # Fewer items than the cap: the result was not truncated.
        if item_count < self._API_ITEM_CAP:
            return False

        # Exactly at the cap while the caller wanted more (or everything).
        return item_count == self._API_ITEM_CAP and (
            not self._original_limit or self._original_limit > self._API_ITEM_CAP
        )

    def _apply_limit_if_needed(self, items_list: List) -> List:
        """Return ``items_list`` truncated to the original limit when enforced."""
        if (self._respect_limit and self._original_limit
                and len(items_list) > self._original_limit):
            if self._verbose:
                print(f"🔧 Applying limit: {self._original_limit} items (was {len(items_list)})")
            return items_list[:self._original_limit]
        return items_list

    def _cache_simple_results(self) -> STACItemCollection:
        """Cache (and return) the collection built from the simple results."""
        limited_items = self._apply_limit_if_needed(self._items)
        self._all_items_cache = STACItemCollection(limited_items, provider=self.provider)
        self._all_items_cached = True
        return self._all_items_cache

    def _current_raw_items(self) -> List:
        """Return the raw items of the cache, else the limited simple items."""
        # Compare against None: an empty cached collection is still a cache hit.
        if self._all_items_cache is not None:
            return self._all_items_cache._items
        return self._apply_limit_if_needed(self._items)

    def get_all_items(self) -> STACItemCollection:
        """Return all items, running the fallback tiers only when needed.

        A cached collection is returned as-is.  Otherwise the simple result
        is used directly when no fallback is needed; if one is needed, the
        pystac-client tier and then the chunking tier are tried once each,
        and the simple result is the last resort.
        """
        if self._all_items_cache is not None:
            return self._all_items_cache

        if not self._needs_fallback:
            return self._cache_simple_results()

        if not self._fallback_attempted and self._client:
            self._fallback_attempted = True

            if self._verbose:
                print(f"🔄 Attempting fallback strategies for {len(self._items)} items...")

            # Try pystac-client first
            pystac_result = self._try_pystac_fallback()
            if pystac_result:
                return pystac_result

            # Try chunking search as last resort
            chunking_result = self._try_chunking_fallback()
            if chunking_result:
                return chunking_result

            if self._verbose:
                print("⚠️ All fallback strategies failed, returning simple search results")

        # Simple search results with the limit applied.
        limited_items = self._apply_limit_if_needed(self._items)
        self._all_items_cache = STACItemCollection(limited_items, provider=self.provider)
        return self._all_items_cache

    def _try_pystac_fallback(self) -> Optional[STACItemCollection]:
        """Tier 2: re-run the query through pystac-client.

        Returns the cached collection on success, or None when the tier was
        already attempted, pystac-client is unavailable, the client provides
        no catalog, or any error occurs (best effort; errors are only
        reported in verbose mode).
        """
        if self._pystac_attempted or not PYSTAC_AVAILABLE:
            return None

        self._pystac_attempted = True

        try:
            if self._verbose:
                print("🔄 Tier 2: Trying pystac-client fallback...")

            pystac_catalog = self._client._create_pystac_catalog_fallback()
            if not pystac_catalog:
                return None

            pystac_search = pystac_catalog.search(**self._original_params)

            # get_all_items() is deprecated in pystac-client; prefer
            # item_collection() and keep the old call for older versions.
            fetch_all = getattr(pystac_search, "item_collection", None)
            if fetch_all is None:
                fetch_all = pystac_search.get_all_items

            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=FutureWarning, module="pystac_client")
                warnings.filterwarnings("ignore", message=".*get_all_items.*deprecated.*")
                pystac_items = fetch_all()

            all_items_dicts = [item.to_dict() for item in pystac_items]
            limited_items = self._apply_limit_if_needed(all_items_dicts)

            if self._verbose:
                print(f" ✅ pystac-client retrieved {len(limited_items)} items")

            self._all_items_cache = STACItemCollection(limited_items, provider=self.provider)
            self._all_items_cached = True
            return self._all_items_cache

        except Exception as e:  # deliberate best effort: fall through to next tier
            if self._verbose:
                print(f" ❌ pystac-client fallback failed: {e}")
            return None

    def _try_chunking_fallback(self) -> Optional[STACItemCollection]:
        """Tier 3: delegate to the client's ``_fallback_chunking_search``.

        Returns the cached collection on success, or None when the tier was
        already attempted, the client lacks the method, or any error occurs.
        """
        if self._chunking_attempted:
            return None

        self._chunking_attempted = True

        try:
            if self._verbose:
                print("🔄 Tier 3: Trying chunking fallback...")

            if not hasattr(self._client, '_fallback_chunking_search'):
                return None

            chunked_items = self._client._fallback_chunking_search(
                self._original_params, self._search_url, verbose=self._verbose
            )
            limited_items = self._apply_limit_if_needed(chunked_items)

            if self._verbose:
                print(f" ✅ Chunking retrieved {len(limited_items)} items")

            self._all_items_cache = STACItemCollection(limited_items, provider=self.provider)
            self._all_items_cached = True
            return self._all_items_cache

        except Exception as e:  # deliberate best effort: caller uses simple results
            if self._verbose:
                print(f" ❌ Chunking fallback failed: {e}")
            return None

    def item_collection(self) -> STACItemCollection:
        """Alias for get_all_items()."""
        return self.get_all_items()

    def items(self):
        """Yield a ``STACItem`` for each cached item, else each simple item.

        This does not trigger the fallback tiers; call ``get_all_items()``
        first to iterate over the expanded result.
        """
        # Function-scope import kept from the original, resolved once per call.
        from .items import STACItem

        for item_data in self._current_raw_items():
            yield STACItem(item_data, provider=self.provider)

    def matched(self) -> Optional[int]:
        """Return the cached item count, else the response's matched count.

        Without a cache the value comes from ``'numberMatched'`` or
        ``'matched'`` in the raw response and may be None.
        """
        if self._all_items_cache is not None:
            return len(self._all_items_cache._items)
        return self._results.get('numberMatched', self._results.get('matched'))

    def total_items(self) -> Optional[int]:
        """Return the cached item count, else the response's ``'total_returned'``."""
        if self._all_items_cache is not None:
            return len(self._all_items_cache._items)
        return self._results.get('total_returned')

    def search_params(self) -> Optional[dict]:
        """Return the response's ``'search_params'``, else the original parameters."""
        return self._results.get('search_params', self._original_params)

    def all_keys(self) -> List[str]:
        """Return all keys of the raw search response."""
        return list(self._results.keys())

    def list_product_ids(self) -> List[str]:
        """Return the ``"id"`` of every item yielded by ``items()``."""
        return [item.get("id") for item in self.items()]

    def get_fallback_status(self) -> Dict[str, Any]:
        """Return a snapshot of the fallback, cache and limit bookkeeping."""
        cache = self._all_items_cache
        return {
            'needs_fallback': self._needs_fallback,
            'fallback_attempted': self._fallback_attempted,
            'pystac_attempted': self._pystac_attempted,
            'chunking_attempted': self._chunking_attempted,
            'all_items_cached': self._all_items_cached,
            'original_items_count': len(self._items),
            'cached_items_count': len(cache._items) if cache is not None else None,
            'original_limit': self._original_limit,
            'respect_limit': self._respect_limit
        }

    def set_limit_enforcement(self, enforce: bool):
        """Switch limit enforcement on/off and discard the cached result.

        All fallback tiers are re-armed so that the next ``get_all_items()``
        call can fetch again under the new setting (previously the per-tier
        flags stayed set and the fallbacks could never re-run).
        """
        self._respect_limit = enforce

        # Drop the cache and restore the cache flag to its initial value.
        self._all_items_cache = None
        self._all_items_cached = self._results.get('all_items_cached', False)

        # Re-arm every tier, not only the umbrella flag.
        self._fallback_attempted = False
        self._pystac_attempted = False
        self._chunking_attempted = False

        self._needs_fallback = self._calculate_fallback_need()

    def __len__(self):
        """Return the number of cached items, else of the limited simple items."""
        return len(self._current_raw_items())

    def __repr__(self):
        """Return a summary with item count, provider, cache status and limit."""
        count = len(self)
        status = "cached" if self._all_items_cached else "simple"
        limit_info = f", limit={self._original_limit}" if self._original_limit else ""
        return f"STACSearch({count} items found, provider='{self.provider}', {status}{limit_info})"