"""
Example: Compute Lifted Index (LI) from WRF output using wrf-python + MetPy
LI = T_env(500 hPa) - T_parcel(500 hPa)
- Pressure: wrf.getvar("pressure") -> hPa
- Temperature: wrf.getvar("temp", units="K")
- Dewpoint: wrf.getvar("td", units="K")
- PSFC: wrf.getvar("PSFC") / 100.0 -> hPa
- T500: wrf.vinterp(..., level=500 hPa)
"""
import os
import glob
import numpy as np
from netCDF4 import Dataset
import wrf
from scipy.ndimage import gaussian_filter
import metpy.calc as mpcalc
from metpy.units import units
def compute_lifted_index(
pressure,
temperature,
dewpoint,
psfc_hpa,
T500,
smooth_sigma=1.0,
lowest_layer_dp=50.0,
):
"""
Compute Lifted Index (LI) using full vertical profiles and a moist-adiabatic parcel lift.
LI = T_env(500 hPa) - T_parcel(500 hPa)
Parameters
----------
pressure : xarray.DataArray
3D full pressure [hPa], shape (nz, ny, nx).
temperature : xarray.DataArray
3D temperature [K], shape (nz, ny, nx).
dewpoint : xarray.DataArray
3D dewpoint temperature [K], shape (nz, ny, nx).
psfc_hpa : xarray.DataArray
2D surface pressure [hPa], shape (ny, nx).
T500 : xarray.DataArray
2D environmental temperature at 500 hPa [K], shape (ny, nx).
smooth_sigma : float or None
Sigma for Gaussian smoothing (in grid points) applied to T500.
Set to None to disable smoothing.
lowest_layer_dp : float
Pressure depth [hPa] above the surface over which to average T and Td
for the starting parcel (e.g. 50 hPa).
Returns
-------
LI_da : xarray.DataArray
2D field of Lifted Index [K], same horizontal shape/coords as T500.
"""
# Convert to numpy (may be masked arrays)
pres_np = wrf.to_np(pressure) # (nz, ny, nx), hPa
temp_np = wrf.to_np(temperature) # (nz, ny, nx), K
dew_np = wrf.to_np(dewpoint) # (nz, ny, nx), K
psfc_np = wrf.to_np(psfc_hpa) # (ny, nx), hPa
# Environmental 500-hPa temperature field (optionally smoothed)
t500_np = wrf.to_np(T500) # (ny, nx), K
if smooth_sigma is not None:
t500_np = gaussian_filter(t500_np, sigma=smooth_sigma)
nz, ny, nx = pres_np.shape
li_np = np.full((ny, nx), np.nan, dtype=np.float32)
for j in range(ny):
for i in range(nx):
p_col = pres_np[:, j, i]
T_col = temp_np[:, j, i]
Td_col = dew_np[:, j, i]
psfc_here = psfc_np[j, i]
# Convert masked arrays to plain ndarrays with NaNs where masked
if np.ma.isMaskedArray(p_col):
p_vals = p_col.filled(np.nan)
else:
p_vals = np.array(p_col, dtype=float)
if np.ma.isMaskedArray(T_col):
T_vals = T_col.filled(np.nan)
else:
T_vals = np.array(T_col, dtype=float)
if np.ma.isMaskedArray(Td_col):
Td_vals = Td_col.filled(np.nan)
else:
Td_vals = np.array(Td_col, dtype=float)
# Skip bad/missing columns
if (
np.any(np.isnan(p_vals))
or np.any(np.isnan(T_vals))
or np.any(np.isnan(Td_vals))
or np.isnan(psfc_here)
):
continue
# Attach units for temperature/dewpoint
T_q = T_vals * units.kelvin
Td_q = Td_vals * units.kelvin
# Lowest `lowest_layer_dp` hPa above the surface
layer_mask = (p_vals <= psfc_here) & (
p_vals >= psfc_here - lowest_layer_dp
)
if np.any(layer_mask):
T0_q = T_q[layer_mask].mean()
Td0_q = Td_q[layer_mask].mean()
else:
# Fallback: use lowest model level
T0_q = T_q[0]
Td0_q = Td_q[0]
# Sort pressure from high -> low (required by parcel_profile)
sort_idx = np.argsort(p_vals)[::-1] # largest p -> smallest
p_sorted_vals = p_vals[sort_idx]
p_sorted = p_sorted_vals * units.hectopascal
# Compute parcel profile along this sorted pressure column
parcel_T = mpcalc.parcel_profile(p_sorted, T0_q, Td0_q) # K
# Interpolate parcel T to 500 hPa
# np.interp expects x ascending, so flip arrays (low -> high)
p_asc = p_sorted.magnitude[::-1] # hPa ascending
T_parcel_asc = parcel_T.to("kelvin").magnitude[::-1] # K
# Require that 500 hPa be within the pressure range
if (500.0 < p_asc.min()) or (500.0 > p_asc.max()):
continue
T_parcel_500 = np.interp(500.0, p_asc, T_parcel_asc) # K
T_env_500 = t500_np[j, i] # K
# LI = T_env(500) - T_parcel(500)
li_np[j, i] = T_env_500 - T_parcel_500
# Wrap back into an xarray.DataArray using T500 as a template
LI_da = T500.copy(data=li_np)
LI_da.name = "li"
LI_da.attrs["description"] = "Lifted Index (T_env(500 hPa) - T_parcel(500 hPa))"
LI_da.attrs["units"] = "K"
return LI_da
# ----------------------------------------------------------------------
# Minimal example usage on WRF output
# ----------------------------------------------------------------------
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print(
"\nUsage: python li_example.py /path/to/wrf/run d02\n"
" where /path/to/wrf/run contains wrfout_d02* files\n"
)
sys.exit(1)
path_wrf = sys.argv[1]
domain = sys.argv[2]
for ncfile_path in sorted(glob.glob(os.path.join(path_wrf, f"wrfout_{domain}*"))):
ncfile = Dataset(ncfile_path)
print(f"Processing {os.path.basename(ncfile_path)}")
# 3D pressure [hPa]
pressure = wrf.getvar(ncfile, "pressure") # hPa
# 3D temperature [K]
temperature = wrf.getvar(ncfile, "temp", units="K")
# 3D dewpoint [K] from WRF diagnostic
dewpoint = wrf.getvar(ncfile, "td", units="K")
# Surface pressure [Pa] -> [hPa]
psfc_pa = wrf.getvar(ncfile, "PSFC") # Pa
psfc = psfc_pa / 100.0 # hPa
psfc.attrs["units"] = "hPa"
# 500 hPa environmental temperature
geopotential_height = wrf.getvar(ncfile, "geopt")
T500 = wrf.vinterp(
ncfile,
temperature,
"pressure",
[500],
field_type="tk",
extrapolate=True,
squeeze=True,
meta=True,
).squeeze()
# Compute LI field
LI_da = compute_lifted_index(
pressure=pressure,
temperature=temperature,
dewpoint=dewpoint,
psfc_hpa=psfc,
T500=T500,
smooth_sigma=1.0, # smoothing for plotting; set None to disable
lowest_layer_dp=50.0, # depth of mixed parcel layer [hPa]
)
# Example: print stats; you can save or plot LI_da as needed
LI = wrf.to_np(LI_da)
print(
f" LI stats: min={np.nanmin(LI):.2f} K, max={np.nanmax(LI):.2f} K"
)
ncfile.close()