"""
STAC Item Collections module - Complete with all essential functions including missing ones
"""
import re
import json
from typing import List, Dict, Any, Optional, Union
from urllib.parse import urlparse
from pathlib import Path
from datetime import datetime, timedelta
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
try:
import geopandas as gpd
GEOPANDAS_AVAILABLE = True
except ImportError:
GEOPANDAS_AVAILABLE = False
try:
import geojson
GEOJSON_AVAILABLE = True
except ImportError:
GEOJSON_AVAILABLE = False
[docs]
class STACItemCollection:
"""
Complete STAC Item Collection with all essential functions including the missing ones.
"""
[docs]
def __init__(self, items: List[Dict], provider: str = "unknown"):
"""
Initialize STAC Item Collection.
Args:
items: List of STAC item dictionaries
provider: Provider name (e.g., "planetary_computer", "earthsearch")
"""
self._items = items
self.provider = provider
self._cached_dataframe = None
self._parent_search = None
def __len__(self):
"""Return number of items in collection."""
return len(self._items)
def __getitem__(self, index):
"""Get item by index as STACItem object."""
from .items import STACItem
return STACItem(self._items[index], provider=self.provider)
def __iter__(self):
"""Iterate over items as STACItem objects."""
from .items import STACItem
for item_data in self._items:
yield STACItem(item_data, provider=self.provider)
@property
def items(self):
"""Get all items as list of STACItem objects."""
from .items import STACItem
return [STACItem(item_data, provider=self.provider) for item_data in self._items]
@property
def raw_items(self):
"""Get raw item dictionaries (for internal use)."""
return self._items
# ========================================
# MISSING FUNCTIONS - ADDED
# ========================================
[docs]
def get_available_bands(self) -> List[str]:
"""
🆕 ADDED: Get list of all available bands/assets across the collection.
Returns:
Sorted list of unique band/asset names available in the collection
"""
all_bands = set()
for item in self._items:
assets = item.get('assets', {})
all_bands.update(assets.keys())
return sorted(list(all_bands))
[docs]
def get_all_urls(self, signed: Optional[bool] = None) -> Dict[str, Dict[str, str]]:
"""
🆕 ADDED: Get all URLs in the requested format.
Format: {<product_id>: {<band_name>: <url>, <band_name>: <url>, ...}, ...}
Args:
signed: Whether to sign URLs (auto-detected by provider if None)
Returns:
Dictionary with product_id -> {band_name: url} mapping
"""
all_urls = {}
from .items import STACItem
for i, item_data in enumerate(self._items):
stac_item = STACItem(item_data, provider=self.provider)
product_id = item_data.get('id', f'item_{i}')
try:
# Get all asset URLs for this item
item_urls = stac_item.get_all_asset_urls(signed=signed)
all_urls[product_id] = item_urls
except Exception as e:
print(f"⚠️ Error processing item {product_id}: {e}")
all_urls[product_id] = {}
return all_urls
[docs]
def get_band_urls(self, band_names: Optional[List[str]] = None,
asset_type: str = "all", signed: Optional[bool] = None) -> Dict[str, Dict[str, str]]:
"""
🆕 ADDED: Get URLs for specific bands or asset types with filtering options.
Args:
band_names: List of specific band names to include (None for all)
asset_type: Filter by asset type:
- "all": All assets (default)
- "image": Only image/tiff assets
- "bands": Only spectral bands (B01, B02, etc.)
- "visual": Only visual/RGB assets
signed: Whether to sign URLs (auto-detected by provider if None)
Returns:
Dictionary with product_id -> {band_name: url} mapping
"""
filtered_urls = {}
from .items import STACItem
for i, item_data in enumerate(self._items):
stac_item = STACItem(item_data, provider=self.provider)
product_id = item_data.get('id', f'item_{i}')
try:
# Get all asset URLs for this item
all_item_urls = stac_item.get_all_asset_urls(signed=signed)
# Apply filtering
filtered_item_urls = {}
for asset_key, asset_url in all_item_urls.items():
# Check if asset should be included
include_asset = False
# Filter by band names if specified
if band_names:
if asset_key in band_names:
include_asset = True
else:
# Filter by asset type
if asset_type == "all":
include_asset = True
elif asset_type == "image":
# Check if it's an image/tiff asset
asset_data = item_data.get('assets', {}).get(asset_key, {})
asset_mime_type = asset_data.get('type', '').lower()
if any(img_type in asset_mime_type for img_type in ['image/tiff', 'image/geotiff', 'application/geotiff']):
include_asset = True
elif asset_type == "bands":
# Check if it's a spectral band (B01, B02, etc.)
if re.match(r'^B\d+A?$', asset_key) or asset_key.lower() in ['red', 'green', 'blue', 'nir', 'swir']:
include_asset = True
elif asset_type == "visual":
# Check if it's a visual asset
if asset_key.lower() in ['visual', 'true-color', 'rgb', 'thumbnail', 'preview']:
include_asset = True
if include_asset:
filtered_item_urls[asset_key] = asset_url
filtered_urls[product_id] = filtered_item_urls
except Exception as e:
print(f"⚠️ Error processing item {product_id}: {e}")
filtered_urls[product_id] = {}
return filtered_urls
# ========================================
# UPDATED FUNCTIONS
# ========================================
[docs]
def to_simple_products_list(self, include_urls: bool = True,
url_bands: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""
🔧 UPDATED: Convert collection to simple products list with optional URLs.
Args:
include_urls: Whether to include URLs with href key
url_bands: Specific bands to include URLs for (None for all)
Returns:
List of simplified product dictionaries with optional URLs
"""
simple_products = []
from .items import STACItem
for i, item_data in enumerate(self._items):
properties = item_data.get('properties', {})
simple_product = {
'id': item_data.get('id', f'item_{i}'),
'collection': item_data.get('collection', 'unknown'),
'datetime': properties.get('datetime', 'unknown'),
'cloud_cover': properties.get('eo:cloud_cover'),
'asset_count': len(item_data.get('assets', {})),
'provider': self.provider
}
# Add URLs if requested
if include_urls:
try:
stac_item = STACItem(item_data, provider=self.provider)
if url_bands:
# Get URLs for specific bands
urls = {}
for band in url_bands:
if stac_item.has_asset(band):
urls[band] = stac_item.get_asset_url(band)
simple_product['href'] = urls
else:
# Get all URLs
simple_product['href'] = stac_item.get_all_asset_urls()
except Exception as e:
print(f"⚠️ Error getting URLs for item {simple_product['id']}: {e}")
simple_product['href'] = {}
simple_products.append(simple_product)
return simple_products
# ========================================
# EXISTING ESSENTIAL FUNCTIONS (MAINTAINED)
# ========================================
[docs]
def to_list(self) -> List[Dict]:
"""Convert collection to list of dictionaries."""
return self._items.copy()
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Convert collection to dictionary format."""
return {
'type': 'FeatureCollection',
'provider': self.provider,
'total_items': len(self._items),
'features': self._items.copy(),
'metadata': {
'date_range': self.get_date_range(),
'extensions': self.list_asset_extensions(),
'collections': self.get_unique_collections(),
'available_bands': self.get_available_bands()
}
}
[docs]
def to_geojson(self, filename: Optional[str] = None) -> Dict[str, Any]:
"""Convert collection to GeoJSON format."""
if GEOJSON_AVAILABLE:
try:
features = []
for item in self._items:
if item.get('geometry'):
features.append(geojson.Feature(
geometry=item['geometry'],
properties=item.get('properties', {}),
id=item.get('id')
))
collection = geojson.FeatureCollection(features)
if filename:
with open(filename, 'w') as f:
geojson.dump(collection, f, indent=2)
print(f"✅ GeoJSON saved to {filename}")
return collection
except Exception as e:
print(f"⚠️ geojson library error: {e}, falling back to manual creation")
# Manual GeoJSON creation
geojson_data = {
'type': 'FeatureCollection',
'features': []
}
for item in self._items:
if item.get('geometry'):
feature = {
'type': 'Feature',
'geometry': item['geometry'],
'properties': item.get('properties', {}),
'id': item.get('id')
}
geojson_data['features'].append(feature)
if filename:
with open(filename, 'w') as f:
json.dump(geojson_data, f, indent=2)
print(f"✅ GeoJSON saved to {filename}")
if not GEOJSON_AVAILABLE:
print("💡 For better GeoJSON support, install: pip install geojson")
return geojson_data
[docs]
def get_all_assets(self) -> Dict[str, List[str]]:
"""Get all unique assets across all items."""
all_assets = {}
for i, item in enumerate(self._items):
assets = item.get('assets', {})
item_id = item.get('id', f'item_{i}')
for asset_key in assets.keys():
if asset_key not in all_assets:
all_assets[asset_key] = []
all_assets[asset_key].append(item_id)
return all_assets
[docs]
def get_assets_by_collection(self) -> Dict[str, List[str]]:
"""Get assets grouped by collection."""
collection_assets = {}
for item in self._items:
collection = item.get('collection', 'unknown')
assets = list(item.get('assets', {}).keys())
if collection not in collection_assets:
collection_assets[collection] = set()
collection_assets[collection].update(assets)
return {
collection: sorted(list(assets))
for collection, assets in collection_assets.items()
}
[docs]
def to_products_dict(self) -> Dict[str, Dict[str, Any]]:
"""Convert collection to products dictionary with detailed metadata."""
products = {}
for i, item in enumerate(self._items):
item_id = item.get('id', f'item_{i}')
properties = item.get('properties', {})
assets = item.get('assets', {})
products[item_id] = {
'id': item_id,
'collection': item.get('collection', 'unknown'),
'datetime': properties.get('datetime', 'unknown'),
'cloud_cover': properties.get('eo:cloud_cover'),
'geometry': item.get('geometry'),
'bbox': item.get('bbox'),
'assets': list(assets.keys()),
'asset_count': len(assets),
'provider': self.provider,
'properties': properties
}
return products
[docs]
def get_common_bands(self, min_occurrence: float = 0.5) -> List[str]:
"""Get commonly available bands/assets across the collection."""
if not self._items:
return []
asset_counts = {}
total_items = len(self._items)
for item in self._items:
assets = item.get('assets', {})
for asset_key in assets.keys():
asset_counts[asset_key] = asset_counts.get(asset_key, 0) + 1
min_count = int(total_items * min_occurrence)
common_bands = [
asset for asset, count in asset_counts.items()
if count >= min_count
]
return sorted(common_bands)
# ========================================
# PATTERN MATCHING AND FILTERING
# ========================================
[docs]
def get_assets_by_pattern(self, pattern: str, match_type: str = "extension") -> List[str]:
"""Get asset names that match the specified pattern."""
matching_assets = []
for item in self._items:
assets = item.get('assets', {})
for asset_key, asset_data in assets.items():
if self._asset_matches_pattern(asset_key, asset_data, pattern, match_type):
if asset_key not in matching_assets:
matching_assets.append(asset_key)
return matching_assets
def _asset_matches_pattern(self, asset_key: str, asset_data: Dict,
pattern: str, match_type: str) -> bool:
"""Check if an asset matches the specified pattern."""
pattern_lower = pattern.lower()
if match_type == "extension":
return self._check_extension_match(asset_data, pattern_lower)
elif match_type == "mime":
return self._check_mime_match(asset_data, pattern_lower)
elif match_type == "name":
return pattern_lower in asset_key.lower()
elif match_type == "url":
asset_url = asset_data.get('href', '')
return pattern_lower in asset_url.lower()
else:
return self._check_extension_match(asset_data, pattern_lower)
def _check_extension_match(self, asset_data: Dict, pattern: str) -> bool:
"""Check if asset's actual file extension matches pattern."""
asset_url = asset_data.get('href', '')
if not asset_url:
return False
try:
parsed_url = urlparse(asset_url)
url_path = parsed_url.path
file_extension = Path(url_path).suffix.lower()
pattern_clean = pattern.lstrip('.')
extension_clean = file_extension.lstrip('.')
return extension_clean == pattern_clean or pattern in file_extension
except Exception:
return False
def _check_mime_match(self, asset_data: Dict, pattern: str) -> bool:
"""Check if asset's MIME type matches pattern."""
asset_type = asset_data.get('type', '').lower()
return pattern in asset_type
[docs]
def get_assets_by_extension(self, extension: str) -> List[str]:
"""Convenience method to get assets by file extension."""
if not extension.startswith('.'):
extension = '.' + extension
return self.get_assets_by_pattern(extension, match_type="extension")
[docs]
def get_assets_by_mime_type(self, mime_type: str) -> List[str]:
"""Convenience method to get assets by MIME type."""
return self.get_assets_by_pattern(mime_type, match_type="mime")
[docs]
def list_asset_extensions(self) -> Dict[str, List[str]]:
"""List all unique file extensions and which assets have them."""
extensions_map = {}
for item in self._items:
assets = item.get('assets', {})
for asset_key, asset_data in assets.items():
asset_url = asset_data.get('href', '')
if asset_url:
try:
parsed_url = urlparse(asset_url)
file_extension = Path(parsed_url.path).suffix.lower()
if file_extension:
if file_extension not in extensions_map:
extensions_map[file_extension] = []
if asset_key not in extensions_map[file_extension]:
extensions_map[file_extension].append(asset_key)
except Exception:
continue
return extensions_map
# ========================================
# FILTERING FUNCTIONS
# ========================================
[docs]
def filter_by_cloud_cover(self, max_cloud_cover: float) -> 'STACItemCollection':
"""Filter items by cloud cover percentage."""
filtered_items = []
for item in self._items:
cloud_cover = item.get('properties', {}).get('eo:cloud_cover', 0)
if cloud_cover <= max_cloud_cover:
filtered_items.append(item)
return STACItemCollection(filtered_items, provider=self.provider)
[docs]
def filter_by_date_range(self, start_date: Optional[str] = None,
end_date: Optional[str] = None,
days_back: Optional[int] = None,
auto_fix_dates: bool = True) -> 'STACItemCollection':
"""
🔧 ENHANCED: Filter items by date range with automatic invalid date correction.
Args:
start_date: Start date (YYYY-MM-DD format) - optional if using days_back
end_date: End date (YYYY-MM-DD format) - optional if using days_back
days_back: Number of days back from today - alternative to start_date/end_date
auto_fix_dates: Whether to automatically fix invalid dates (default: True)
Returns:
New STACItemCollection with filtered items
Examples:
# Using date range with auto-correction
filtered = collection.filter_by_date_range("2024-01-01", "2024-02-31") # Auto-fixes to 2024-02-29
# Using days back (last 30 days)
filtered = collection.filter_by_date_range(days_back=30)
# Disable auto-correction
filtered = collection.filter_by_date_range("2024-01-01", "2024-02-31", auto_fix_dates=False)
"""
# Validate input parameters
if not start_date and not end_date and not days_back:
print("⚠️ No filtering parameters provided. Returning original collection.")
return STACItemCollection(self._items.copy(), provider=self.provider)
# Calculate date range based on parameters
try:
if days_back is not None:
# Option 1: Use days_back parameter
if start_date:
# days_back from specific start_date
start_dt = self._parse_date(start_date)
end_dt = start_dt + timedelta(days=days_back)
else:
# days_back from today
end_dt = datetime.now()
start_dt = end_dt - timedelta(days=days_back)
else:
# Option 2: Use start_date and end_date
start_dt = self._parse_date(start_date) if start_date else datetime.min
end_dt = self._parse_date(end_date) if end_date else datetime.max
# Convert to date objects for comparison
start_date_obj = start_dt.date()
end_date_obj = end_dt.date()
print(f"🗓️ Filtering items from {start_date_obj} to {end_date_obj}")
except ValueError as e:
if auto_fix_dates:
print(f"❌ Date parsing error: {e}")
print("💡 Use YYYY-MM-DD format for dates or set auto_fix_dates=True")
else:
print(f"❌ Date parsing error with auto_fix_dates=False: {e}")
return STACItemCollection([], provider=self.provider)
# Filter items
filtered_items = []
processed_items = 0
for item in self._items:
item_datetime = item.get('properties', {}).get('datetime', '')
if item_datetime:
try:
# Parse item datetime
item_dt = self._parse_item_datetime(item_datetime)
item_date = item_dt.date()
# Check if item falls within date range
if start_date_obj <= item_date <= end_date_obj:
filtered_items.append(item)
processed_items += 1
except Exception as e:
# Skip items with invalid datetime
continue
print(f"✅ Filtered {len(filtered_items)} items from {processed_items} total items")
return STACItemCollection(filtered_items, provider=self.provider)
def _parse_date(self, date_str: str) -> datetime:
"""
🔧 FIXED: Parse date string with automatic invalid date correction.
Args:
date_str: Date string in YYYY-MM-DD format
Returns:
datetime object with corrected date if needed
Raises:
ValueError: If date string format or month is invalid
"""
import calendar
import re
if not date_str:
raise ValueError("Date string cannot be empty")
# First try direct parsing for valid dates
date_formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S.%fZ'
]
for fmt in date_formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
# If direct parsing fails, try to fix invalid dates
try:
# Handle YYYY-MM-DD format with potential invalid day
match = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})$", date_str)
if match:
year, month, day = map(int, match.groups())
# Validate month
if month < 1 or month > 12:
raise ValueError(f"Invalid month: {month}. Must be 1-12.")
# Get last valid day of the month
last_day = calendar.monthrange(year, month)[1]
if day > last_day:
print(f"⚠️ Invalid date {date_str}: Day {day} doesn't exist in {calendar.month_name[month]} {year}")
print(f"🔧 Auto-correcting to {year}-{month:02d}-{last_day:02d}")
day = last_day
return datetime(year, month, day)
except Exception:
pass
# Try pandas if available as last resort
if PANDAS_AVAILABLE:
try:
return pd.to_datetime(date_str).to_pydatetime()
except Exception:
pass
raise ValueError(f"Unable to parse date: '{date_str}'. Use YYYY-MM-DD format.")
def _parse_item_datetime(self, datetime_str: str) -> datetime:
"""Parse item datetime with robust error handling."""
if PANDAS_AVAILABLE:
try:
return pd.to_datetime(datetime_str).to_pydatetime()
except Exception:
pass
datetime_formats = [
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%d'
]
for fmt in datetime_formats:
try:
return datetime.strptime(datetime_str, fmt)
except ValueError:
continue
raise ValueError(f"Unable to parse item datetime: '{datetime_str}'")
[docs]
def get_unique_collections(self) -> List[str]:
"""Get list of unique collection names."""
collections = set()
for item in self._items:
collection = item.get('collection', '')
if collection:
collections.add(collection)
return list(collections)
[docs]
def get_date_range(self) -> Dict[str, str]:
"""Get date range of items in collection."""
dates = []
for item in self._items:
item_datetime = item.get('properties', {}).get('datetime', '')
if item_datetime:
try:
dates.append(self._parse_item_datetime(item_datetime))
except Exception:
continue
if dates:
return {
'start': min(dates).strftime('%Y-%m-%d'),
'end': max(dates).strftime('%Y-%m-%d')
}
return {'start': 'unknown', 'end': 'unknown'}
# ========================================
# PANDAS/GEOPANDAS DEPENDENT FUNCTIONS
# ========================================
[docs]
def to_dataframe(self, include_geometry: bool = True) -> 'pd.DataFrame':
"""Convert collection to pandas/geopandas DataFrame."""
if not PANDAS_AVAILABLE:
raise ImportError(
"❌ pandas is required for to_dataframe().\n"
"💡 Install with: pip install pandas"
)
if self._cached_dataframe is not None:
return self._cached_dataframe
# Build DataFrame from items
df_data = []
for item in self._items:
row = {
'id': item.get('id', ''),
'collection': item.get('collection', ''),
'datetime': item.get('properties', {}).get('datetime', ''),
'provider': self.provider
}
# Add all properties
properties = item.get('properties', {})
for key, value in properties.items():
if key not in row:
row[key] = value
# Add geometry info
geometry = item.get('geometry', {})
if geometry:
row['geometry_type'] = geometry.get('type', '')
# Add bbox if available
bbox = item.get('bbox', [])
if bbox and len(bbox) >= 4:
row['bbox_west'] = bbox[0]
row['bbox_south'] = bbox[1]
row['bbox_east'] = bbox[2]
row['bbox_north'] = bbox[3]
# Add asset count
assets = item.get('assets', {})
row['asset_count'] = len(assets)
df_data.append(row)
# Create DataFrame
if include_geometry and GEOPANDAS_AVAILABLE:
try:
from shapely.geometry import shape
geometries = []
for item in self._items:
geom = item.get('geometry')
if geom:
geometries.append(shape(geom))
else:
geometries.append(None)
df = gpd.GeoDataFrame(df_data, geometry=geometries)
except Exception:
df = pd.DataFrame(df_data)
if include_geometry:
print("⚠️ Failed to create GeoDataFrame, falling back to regular DataFrame")
else:
df = pd.DataFrame(df_data)
if include_geometry and not GEOPANDAS_AVAILABLE:
print("💡 For geometry support, install: pip install geopandas")
self._cached_dataframe = df
return df
[docs]
def to_geodataframe(self) -> 'gpd.GeoDataFrame':
"""Convert collection to geopandas GeoDataFrame."""
if not GEOPANDAS_AVAILABLE:
raise ImportError(
"❌ geopandas is required for to_geodataframe().\n"
"💡 Install with: pip install geopandas"
)
return self.to_dataframe(include_geometry=True)
# ========================================
# EXPORT AND UTILITY FUNCTIONS
# ========================================
[docs]
def export_urls_json(self, filename: str, asset_keys: Optional[List[str]] = None):
"""Export all URLs to JSON file for external processing."""
all_urls = {}
processed_count = 0
from .items import STACItem
for item in self._items:
stac_item = STACItem(item, provider=self.provider)
item_id = item.get('id', f'item_{processed_count}')
try:
if asset_keys:
item_urls = {}
for asset_key in asset_keys:
if stac_item.has_asset(asset_key):
item_urls[asset_key] = stac_item.get_asset_url(asset_key)
all_urls[item_id] = item_urls
else:
all_urls[item_id] = stac_item.get_all_asset_urls()
processed_count += 1
except Exception as e:
print(f"⚠️ Error processing item {item_id}: {e}")
continue
export_data = {
'provider': self.provider,
'total_items': len(self._items),
'processed_items': processed_count,
'exported_at': datetime.now().isoformat(),
'asset_keys': asset_keys or 'all',
'urls': all_urls
}
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
print(f"✅ Exported {processed_count} items to {filename}")
[docs]
def print_collection_summary(self):
"""Print a comprehensive summary of the collection."""
date_range = self.get_date_range()
extensions = self.list_asset_extensions()
common_bands = self.get_common_bands()
available_bands = self.get_available_bands()
print(f"📦 STAC Collection Summary")
print(f"=" * 50)
print(f"🔗 Provider: {self.provider}")
print(f"📊 Total Items: {len(self._items)}")
print(f"📅 Date Range: {date_range['start']} to {date_range['end']}")
print(f"📋 Collections: {self.get_unique_collections()}")
print(f"🎯 Available Bands ({len(available_bands)}): {available_bands[:10]}{'...' if len(available_bands) > 10 else ''}")
print(f"🔗 Common Bands: {common_bands[:10]}{'...' if len(common_bands) > 10 else ''}")
print(f"\n📋 File Extensions:")
for ext, assets in list(extensions.items())[:5]:
print(f" {ext}: {len(assets)} assets")
print(f"\n💡 Usage Examples:")
print(f" # Get available bands: bands = collection.get_available_bands()")
print(f" # Get all URLs: urls = collection.get_all_urls()")
print(f" # Get band URLs: urls = collection.get_band_urls(['B04', 'B03', 'B02'])")
print(f" # Get image URLs: urls = collection.get_band_urls(asset_type='image')")
print(f" # Simple products: products = collection.to_simple_products_list(include_urls=True)")
# Show dependency status
print(f"\n📦 Optional Dependencies:")
print(f" pandas: {'✅ Available' if PANDAS_AVAILABLE else '❌ Not installed (pip install pandas)'}")
print(f" geopandas: {'✅ Available' if GEOPANDAS_AVAILABLE else '❌ Not installed (pip install geopandas)'}")
print(f" geojson: {'✅ Available' if GEOJSON_AVAILABLE else '❌ Not installed (pip install geojson)'}")
[docs]
def check_dependencies(self):
"""Check status of optional dependencies."""
deps = {
'pandas': PANDAS_AVAILABLE,
'geopandas': GEOPANDAS_AVAILABLE,
'geojson': GEOJSON_AVAILABLE
}
print("📦 Dependency Status:")
for dep, available in deps.items():
status = "✅ Available" if available else "❌ Not installed"
install_cmd = f"pip install {dep}" if not available else ""
print(f" {dep}: {status} {install_cmd}")
return deps
def __repr__(self):
"""String representation of the collection."""
return f"STACItemCollection({len(self._items)} items, provider='{self.provider}')"
def __str__(self):
"""Human-readable string representation."""
stats = self.get_date_range()
return f"STACItemCollection: {len(self._items)} items from {self.provider} ({stats['start']} to {stats['end']})"