|
| 1 | +from shapely.geometry import Polygon |
1 | 2 | import geopandas as gpd |
2 | | -from shapely.geometry import shape |
3 | | -from typing import List, Dict |
| 3 | +import numpy as np |
| 4 | +from typing import List, Dict, Any, Tuple |
4 | 5 |
|
| 6 | +# Reusable functions |
| 7 | +def filter_quality(properties: List[Dict[str, Any]], |
| 8 | + geometries: List[Dict[str, Any]], |
| 9 | + ids: List[str], |
| 10 | + min_points: int = 5, |
| 11 | + min_view_angle: float = 3) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[str]]: |
| 12 | + """Filter out scenes with insufficient points or bad quality.""" |
| 13 | + index_sub = [i for i, prop in enumerate(properties) |
| 14 | + if prop.get('ground_control', False) and |
| 15 | + prop.get('quality_category', '') == 'standard' and |
| 16 | + prop.get('view_angle', 0) < min_view_angle and |
| 17 | + len(geometries[i]['coordinates'][0]) >= min_points] |
5 | 18 |
|
6 | | -def create_gdf(features: List[Dict]) -> gpd.GeoDataFrame: |
| 19 | + return ([properties[i] for i in index_sub], |
| 20 | + [geometries[i] for i in index_sub], |
| 21 | + [ids[i] for i in index_sub]) |
| 22 | + |
| 23 | +def compute_central_coordinates(geometries: List[Dict[str, Any]]) -> Tuple[np.ndarray, np.ndarray]: |
| 24 | + """Compute central latitude and longitude for each polygon.""" |
| 25 | + central_lon = np.array([np.mean([pt[0] for pt in geom['coordinates'][0]]) for geom in geometries]) |
| 26 | + central_lat = np.array([np.mean([pt[1] for pt in geom['coordinates'][0]]) for geom in geometries]) |
| 27 | + return central_lon, central_lat |
| 28 | + |
| 29 | +def compute_local_times(properties: List[Dict[str, Any]], central_lon: np.ndarray) -> np.ndarray: |
| 30 | + """Compute local times from UTC and central longitude.""" |
| 31 | + hours = np.array([int(p['acquired'].split('T')[1].split('Z')[0].split(':')[0]) for p in properties]) |
| 32 | + minutes = np.array([int(p['acquired'].split('T')[1].split('Z')[0].split(':')[1]) for p in properties]) |
| 33 | + seconds = np.array([float(p['acquired'].split('T')[1].split('Z')[0].split(':')[2]) for p in properties]) |
| 34 | + utc_times = hours + minutes / 60 + seconds / 3600 |
| 35 | + local_times = utc_times + central_lon / 15 |
| 36 | + return local_times |
| 37 | + |
| 38 | +def geometries_to_polygons(geometries: List[Dict[str, Any]]) -> List[Polygon]: |
| 39 | + """Convert GeoJSON geometries to Shapely Polygons.""" |
| 40 | + return [Polygon(sub['coordinates'][0]) for sub in geometries] |
| 41 | + |
| 42 | +def calculate_intersections(polygons: List[Polygon], |
| 43 | + properties: List[Dict[str, Any]], |
| 44 | + min_sun_angle: float = 0, |
| 45 | + area_threshold: float = 25.0) -> Tuple[np.ndarray, np.ndarray]: |
7 | 46 | """ |
8 | | - Convert Planet features into GeoDataFrame. |
| 47 | + Compute pairwise intersected areas and sun angle differences between polygons. |
| 48 | + Only compares polygons from the same instrument and different satellites. |
9 | 49 | """ |
| 50 | + n = len(polygons) |
| 51 | + area_2d = np.zeros((n, n), dtype=np.float32) |
| 52 | + sun_diff_2d = np.zeros((n, n), dtype=np.float32) |
| 53 | + sun_angles = np.array([90 - p['sun_elevation'] for p in properties]) |
| 54 | + instruments = [p['instrument'] for p in properties] |
| 55 | + satellite_ids = [p['satellite_id'] for p in properties] |
| 56 | + |
| 57 | + for i in range(n): |
| 58 | + for j in range(i + 1, n): |
| 59 | + if instruments[i] == instruments[j] and satellite_ids[i] != satellite_ids[j]: |
| 60 | + if polygons[i].bounds[2] >= polygons[j].bounds[0] and \ |
| 61 | + polygons[i].bounds[3] >= polygons[j].bounds[1]: |
| 62 | + intersection = polygons[i].intersection(polygons[j]) |
| 63 | + if intersection.area > 0: |
| 64 | + area_2d[i, j] = intersection.area |
| 65 | + area_2d[j, i] = area_2d[i, j] |
| 66 | + sun_diff_2d[i, j] = abs(sun_angles[i] - sun_angles[j]) |
| 67 | + sun_diff_2d[j, i] = sun_diff_2d[i, j] |
| 68 | + return area_2d, sun_diff_2d |
| 69 | + |
| 70 | +def process_tiles(all_properties: List[List[Dict[str, Any]]], |
| 71 | + all_geometries: List[List[Dict[str, Any]]], |
| 72 | + all_ids: List[List[str]], |
| 73 | + min_view_angle: float = 3, |
| 74 | + min_sun_angle: float = 0) -> gpd.GeoDataFrame: |
| 75 | + """ |
| 76 | + Process multiple tiles/datasets and return a unified GeoDataFrame. |
| 77 | + """ |
| 78 | + merged_properties = [] |
| 79 | + merged_geometries = [] |
| 80 | + merged_ids = [] |
| 81 | + |
| 82 | + # Step 1: Filter and merge tiles |
| 83 | + for props, geoms, ids in zip(all_properties, all_geometries, all_ids): |
| 84 | + f_props, f_geoms, f_ids = filter_quality(props, geoms, ids, min_view_angle=min_view_angle) |
| 85 | + merged_properties.extend(f_props) |
| 86 | + merged_geometries.extend(f_geoms) |
| 87 | + merged_ids.extend(f_ids) |
| 88 | + |
| 89 | + # Step 2: Compute central coordinates and local times |
| 90 | + central_lon, central_lat = compute_central_coordinates(merged_geometries) |
| 91 | + local_times = compute_local_times(merged_properties, central_lon) |
| 92 | + |
| 93 | + # Step 3: Convert to Shapely Polygons |
| 94 | + polygons = geometries_to_polygons(merged_geometries) |
10 | 95 |
|
11 | | - geometries = [shape(f["geometry"]) for f in features] |
12 | | - properties = [f["properties"] for f in features] |
| 96 | + # Step 4: Compute intersections and sun differences |
| 97 | + area_2d, sun_diff_2d = calculate_intersections(polygons, merged_properties, min_sun_angle=min_sun_angle) |
13 | 98 |
|
14 | | - gdf = gpd.GeoDataFrame(properties, geometry=geometries) |
15 | | - gdf.set_crs(epsg=4326, inplace=True) |
| 99 | + # Step 5: Create GeoDataFrame |
| 100 | + gdf = gpd.GeoDataFrame({ |
| 101 | + "id": list(range(len(merged_ids))), |
| 102 | + "name": merged_ids, |
| 103 | + "geometry": polygons, |
| 104 | + "view_angle": [p['view_angle'] for p in merged_properties], |
| 105 | + "acquired": [p['acquired'] for p in merged_properties], |
| 106 | + "cloud_cover": [p['cloud_cover'] for p in merged_properties], |
| 107 | + "sun_elevation": [p['sun_elevation'] for p in merged_properties], |
| 108 | + "sun_angle": [90 - p['sun_elevation'] for p in merged_properties], |
| 109 | + "satellite_id": [p['satellite_id'] for p in merged_properties], |
| 110 | + "central_lon": central_lon, |
| 111 | + "central_lat": central_lat, |
| 112 | + "local_times": local_times, |
| 113 | + "max_sun_diff": [sun_diff_2d[i, :].max() for i in range(len(polygons))] |
| 114 | + }) |
| 115 | + gdf.set_index('id', inplace=True) |
16 | 116 | return gdf |
0 commit comments