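"""Extract NetCDF time series at GeoJSON feature locations.

For each (model, scenario, ensemble) group of CMIP6-style files, this script
pulls the grid cell nearest each feature's centroid and merges the per-file
time series into a single JSON document.
"""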
import click
import xarray as xr
import geopandas as gpd
import json
import numpy as np
import os
import pandas as pd
import re
from datetime import datetime
import cftime

# Global flag to skip uploading (not yet used anywhere in this script)
skip_upload = False

def convert_time(obj, output='compact'):
    """Convert assorted time representations to the requested format.

    output: 'iso', 'datetime', 'compact' (YYYYMMDDHHMMSS as an int), or 'unix'.
    """
    if isinstance(obj, str):  # Handle ctime-style strings, e.g. 'Mon Jan  5 12:00:00 2015'
        dt_obj = datetime.strptime(obj, '%a %b %d %H:%M:%S %Y')
    elif isinstance(obj, np.datetime64):  # Handle numpy datetime64
        dt_obj = pd.Timestamp(obj).to_pydatetime()
    elif isinstance(obj, pd.Timestamp):  # Handle pandas Timestamp explicitly
        dt_obj = obj.to_pydatetime()
    elif isinstance(obj, datetime):  # Handle Python datetime objects
        dt_obj = obj
    elif isinstance(obj, cftime.datetime):  # Base class covers every cftime calendar
        dt_obj = datetime(obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second)
    elif isinstance(obj, (int, float)):
        if obj > 1e18:  # Assume a nanoseconds timestamp
            dt_obj = datetime.fromtimestamp(obj / 1e9)
        elif obj > 1e10:  # Assume a milliseconds timestamp
            dt_obj = datetime.fromtimestamp(obj / 1000)
        else:  # Assume a seconds timestamp
            dt_obj = datetime.fromtimestamp(obj)
    else:
        return obj  # Return as-is if the type is unrecognized

    if output == 'iso':
        return dt_obj.isoformat()
    elif output == 'datetime':
        return dt_obj
    elif output == 'compact':
        return int(dt_obj.strftime('%Y%m%d%H%M%S'))
    elif output == 'unix':
        return dt_obj.timestamp()
    raise ValueError(f"Unknown output format: {output!r}")
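
# Example: convert_time(np.datetime64('2015-01-16'), 'compact') -> 20150116000000
# ('compact' output is timezone-independent; 'unix' uses the local timezone).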

def convert_longitude(lon):
    """
    Converts longitude values from the 0-360 range to -180 to 180.
    This ensures compatibility with NetCDF data that uses either convention.
    Accepts scalars or arrays and always returns a NumPy array.
    """
    lon = np.asarray(lon, dtype=float)
    # Shift values above 180 into the western hemisphere
    return np.where(lon > 180, lon - 360, lon)
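
# Example: convert_longitude(210.0) -> array(-150.); convert_longitude([10, 190]) -> array([10., -170.])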

def parse_filename(filename):
    """Parse a CMIP6-style monthly file name into its metadata fields."""
    pattern = (
        r'(?P<variable>\w+)_Amon_(?P<model>[\w\-]+)_ssp(?P<scenario>\d+)_'
        r'(?P<ensemble>r\d+i\d+p\d+f?\d*)_?(?P<grid>\w+)?_(?P<time_range>\d{4,6}-\d{4,6})\.nc'
    )
    # Match against the base name so full paths still parse
    match = re.match(pattern, os.path.basename(filename))
    if match:
        return {
            'Variable': match.group('variable'),
            'Model': match.group('model'),
            'Scenario': f'ssp{match.group("scenario")}',
            'Ensemble': match.group('ensemble'),
            'Grid': match.group('grid'),
            'Time Range': match.group('time_range')
        }
    else:
        return {'Filename': filename}
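
# Example (hypothetical file name):
#   parse_filename('tas_Amon_CanESM5_ssp585_r1i1p1f1_gn_201501-210012.nc')
#   -> {'Variable': 'tas', 'Model': 'CanESM5', 'Scenario': 'ssp585',
#       'Ensemble': 'r1i1p1f1', 'Grid': 'gn', 'Time Range': '201501-210012'}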

@click.command()
@click.argument('geojson_file', type=click.Path(exists=True))
@click.argument('netcdf_files', nargs=-1, type=click.Path(exists=True))
@click.option('--sliding-variable', default='time', show_default=True, help="The variable representing time or another sliding dimension.")
@click.option('--x-variable', default='lon', show_default=True, help="The variable representing longitude.")
@click.option('--y-variable', default='lat', show_default=True, help="The variable representing latitude.")
@click.option('--output-json', default='mergednetcdf.json', show_default=True, help="Output JSON file name.")
def extract_netcdf_data(geojson_file, netcdf_files, sliding_variable, x_variable, y_variable, output_json):
    """Extracts NetCDF data for each feature in the GeoJSON file and merges multiple NetCDF files by time index."""

    # Load GeoJSON
    gdf = gpd.read_file(geojson_file)

    # Group NetCDF files by (Model, Scenario, Ensemble) parsed from their file names
    model_scenario_ensemble_files = {}

    for netcdf_file in netcdf_files:
        click.echo(netcdf_file)
        parsed = parse_filename(netcdf_file)
        if 'Model' not in parsed:
            raise click.ClickException(f"Could not parse metadata from file name: {netcdf_file}")
        model_scenario_ensemble = f"{parsed['Model']}_{parsed['Scenario']}_{parsed['Ensemble']}"

        model_scenario_ensemble_files.setdefault(model_scenario_ensemble, []).append(netcdf_file)
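
    # model_scenario_ensemble_files now maps a run key such as
    # 'CanESM5_ssp585_r1i1p1f1' (hypothetical) to that run's time-slice files.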

    output_data = {}

    for model_scenario_ensemble, netcdf_group in model_scenario_ensemble_files.items():
        # Per-column storage for this group's merged time series
        merged_series = {}
        variable_descriptions = {}
        variable_names = []

        for netcdf_file in netcdf_group:
            ds = xr.open_dataset(netcdf_file)

            # Ensure the necessary coordinates exist
            if sliding_variable not in ds or x_variable not in ds or y_variable not in ds:
                raise ValueError(f"One or more specified variables not found in {netcdf_file}.")

            # Process each feature in the GeoJSON
            for _, feature in gdf.iterrows():
                # Prefer an explicit identifier; fall back to the positional index
                feature_id = str(feature.get("Plant_Code", feature.get("id", _)))
                x, y = feature.geometry.centroid.x, feature.geometry.centroid.y

                # Normalize longitudes so the feature and the grid share one convention (-180 to 180)
                x = convert_longitude(x)
                lon_values = convert_longitude(ds[x_variable].values)

                # Find the nearest grid cell to the feature centroid
                x_idx = np.abs(lon_values - x).argmin().item()
                y_idx = np.abs(ds[y_variable].values - y).argmin().item()

                for var in ds.data_vars:
                    if sliding_variable in ds[var].dims:
                        # Slice a 1-D time series at the selected grid cell
                        series = ds[var].isel({x_variable: x_idx, y_variable: y_idx}).to_pandas()

                        # Variables with extra dimensions come back as DataFrames; skip them
                        if not isinstance(series, pd.Series):
                            continue

                        # The slice is indexed by time; convert the index to UNIX seconds
                        series.index = [convert_time(t, 'unix') for t in series.index]

                        # Include the feature id so different locations do not overwrite each other
                        var_name = f"{var}_{feature_id}"
                        if var_name not in variable_names:
                            variable_names.append(var_name)
                        # Extract a description from the NetCDF metadata
                        description = ds[var].attrs.get("long_name") or ds[var].attrs.get("standard_name") or "No description available"
                        variable_descriptions[var_name] = description

                        # Concatenate time slices of the same variable across files
                        if var_name in merged_series:
                            merged_series[var_name] = pd.concat([merged_series[var_name], series])
                        else:
                            merged_series[var_name] = series

            # Done with this file; release the handle
            ds.close()

        # Align all columns on the union of time indices; gaps stay NaN
        merged_df = pd.DataFrame(merged_series).sort_index()

        # Convert to structured format; time lives in the first column as UNIX seconds
        headers = ["unix_time"] + list(merged_df.columns)

        data_obj = {
            "name": f"merged_data_{model_scenario_ensemble}",
            "description": "Merged NetCDF data from multiple sources",
            "type": "_".join(variable_names),
            "header": headers,
            "rows": [[t, *row] for t, row in zip(merged_df.index, merged_df.itertuples(index=False, name=None))]
        }

        # Make every cell JSON-serializable (NumPy scalars to native Python, timestamps to UNIX)
        for row in data_obj["rows"]:
            for idx, value in enumerate(row):
                if isinstance(value, (pd.Timestamp, datetime, np.datetime64)):
                    row[idx] = convert_time(value, 'unix')
                elif isinstance(value, np.generic):
                    row[idx] = value.item()
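        # Note: json.dump emits bare NaN tokens for missing values by default,
        # which strict JSON parsers may reject.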

        # Generate summary stats
        summary = {}
        for column_name, values in zip(data_obj["header"], zip(*data_obj["rows"])):
            # Let NumPy infer the dtype so numeric columns are actually detected
            values = np.asarray(values)
            if np.issubdtype(values.dtype, np.number) and not np.isnan(values.astype(float)).all():
                summary[column_name] = {
                    "type": "number",
                    "min": float(np.nanmin(values)),
                    "max": float(np.nanmax(values)),
                    "description": variable_descriptions.get(column_name, "No description available")
                }
            else:
                unique_values = set(filter(pd.notna, values))
                summary[column_name] = {
                    "type": "string",
                    "description": variable_descriptions.get(column_name, "No description available")
                }
                if len(unique_values) < 100:
                    summary[column_name]["unique_values"] = list(unique_values)

        data_obj["summary"] = summary
        output_data[model_scenario_ensemble] = [data_obj]

    # Save output to JSON file
    with open(output_json, 'w') as f:
        json.dump(output_data, f, indent=2)

    click.echo(f"Extraction complete. Output saved to {output_json}")


if __name__ == '__main__':
    extract_netcdf_data()
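
# Example invocation (script and data file names are hypothetical):
#   python extract_netcdf.py plants.geojson \
#       tas_Amon_CanESM5_ssp585_r1i1p1f1_gn_201501-210012.nc \
#       tas_Amon_CanESM5_ssp585_r1i1p1f1_gn_210101-230012.nc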