Source code for typhon.files.handlers.tovs

from datetime import datetime, timedelta

import numpy as np
import numexpr as ne
from netCDF4 import Dataset
from scipy.interpolate import CubicSpline
from typhon.utils import Timer
import xarray as xr

from .common import NetCDF4, expects_file_info
from .testers import check_lat_lon

__all__ = [
    'AVHRR_GAC_HDF',
    'MHS_HDF',
]


[docs]class AAPP_HDF(NetCDF4): """Base class for handling TOVS satellite data converted with AAPP tools """ # This file handler always wants to return at least time, lat and lon # fields. These fields are required for this: standard_fields = { "Data/scnlintime", # milliseconds since midnight "Data/scnlinyr", "Data/scnlindy", "Data/scnlin", "Geolocation/Latitude", "Geolocation/Longitude" }
[docs] def __init__(self, **kwargs): """ Args: **kwargs: Additional key word arguments for base class. """ # Call the base class initializer super().__init__(**kwargs)
[docs] @expects_file_info() def get_info(self, file_info, **kwargs): with Dataset(file_info.path, "r") as file: file_info.times[0] = \ datetime(int(file.startdatayr[0]), 1, 1) \ + timedelta(days=int(file.startdatady[0]) - 1) \ + timedelta(milliseconds=int(file.startdatatime_ms[0])) file_info.times[1] = \ datetime(int(file.enddatayr), 1, 1) \ + timedelta(days=int(file.enddatady) - 1) \ + timedelta(milliseconds=int(file.enddatatime_ms)) return file_info
@staticmethod def _get_time_field(dataset, user_fields): time = \ (dataset["Data/scnlinyr"].values - 1970).astype('datetime64[Y]') \ + (dataset["Data/scnlindy"].values - 1).astype('timedelta64[D]') \ + dataset["Data/scnlintime"].values.astype("timedelta64[ms]") dataset["time"] = "scnline", time # Remove the time fields that we do not need any longer (expect the # user asked for them explicitly) dataset = dataset.drop_vars( {"Data/scnlinyr", "Data/scnlindy", "Data/scnlintime"} - set(user_fields), ) return dataset @staticmethod def _mask_and_scale(dataset): # xarray.open_dataset can mask and scale automatically, but it does not # know the attribute *Scale* (which is specific for AAPP files): for var in dataset.variables: # We want to remove some attributes after applying them but # OrderedDict does not allow to pop the values: attrs = dict(dataset[var].attrs) mask = attrs.pop('FillValue', None) if mask is not None: dataset[var] = dataset[var].where( # Also cover overflow errors as they are in # NSS.MHSX.NN.D07045.S2234.E0021.B0896162.GC.h5 (dataset[var] != mask) & (dataset[var] != -2147483648.0) ) scaling = attrs.pop('Scale', None) if scaling is not None: dataset[var] = dataset[var].astype(float) * scaling dataset[var].attrs = attrs def _test_coords(self, dataset, wanted=None): # Maximal these dimensions (or less) should be in the dataset: if wanted is None: wanted = {'channel', 'scnline', 'scnpos'} reality = set(dataset.dims.keys()) if reality - wanted: raise ValueError( f"Unexpected dimension in AAPP file! {reality - wanted}" )
[docs]class MHS_HDF(AAPP_HDF): """File handler for MHS level 1C HDF files """
[docs] def __init__(self, **kwargs): super(MHS_HDF, self).__init__(**kwargs) # Map the standard fields to standard names (make also the names of all # dimensions more meaningful): self.mapping = { "Geolocation/Latitude": "lat", "Geolocation/Longitude": "lon", "Data/scnlin": "scnline", "Data/phony_dim_0": "scnline", "Data/phony_dim_1": "scnpos", "Data/phony_dim_2": "channel", "Geolocation/phony_dim_3": "scnline", "Geolocation/phony_dim_4": "scnpos", }
[docs] @expects_file_info() def read(self, file_info, mask_and_scale=True, **kwargs): """Read and parse MHS AAPP HDF5 files and load them to xarray Args: file_info: Path and name of the file as string or FileInfo object. This can also be a tuple/list of file names or a path with asterisk. mask_and_scale: Where the data contains missing values, it will be masked with NaNs. Furthermore, data with scaling attributes will be scaled with them. **kwargs: Additional keyword arguments that are valid for :class:`~typhon.files.handlers.common.NetCDF4`. Returns: A xrarray.Dataset object. """ # Make sure that the standard fields are always gonna be imported: user_fields = kwargs.pop("fields", {}) if user_fields: fields = self.standard_fields | set(user_fields) else: fields = None # We catch the user mapping here, since we do not want to deal with # user-defined names in the further processing. Instead, we use our own # mapping user_mapping = kwargs.pop("mapping", None) # Load the dataset from the file: dataset = super().read( file_info, fields=fields, mapping=self.mapping, mask_and_scale=mask_and_scale, **kwargs ) scnlines = dataset["scnline"].values dataset = dataset.assign_coords( scnline=dataset["scnline"] ) dataset["scnline"] = np.arange(1, dataset.scnline.size+1) dataset["scnpos"] = np.arange(1, 91) dataset["channel"] = "channel", np.arange(1, 6) # Create the time variable (is built from several other variables): dataset = self._get_time_field(dataset, user_fields) if mask_and_scale: self._mask_and_scale(dataset) # Make a fast check whether everything is alright self._test_coords(dataset) # Check the latitudes and longitudes: check_lat_lon(dataset) if user_mapping is not None: dataset = dataset.rename(user_mapping) return dataset
[docs]class AVHRR_GAC_HDF(AAPP_HDF): """File handler for AVHRR GAC level 1C HDF files """
[docs] def __init__(self, **kwargs): super(AVHRR_GAC_HDF, self).__init__(**kwargs) # Map the standard fields to standard names (make also the names of all # dimensions more meaningful): self.mapping = { "Geolocation/Latitude": "lat", "Geolocation/Longitude": "lon", "Data/scnlin": "scnline", "Data/phony_dim_0": "scnline", "Data/phony_dim_1": "scnpos", "Data/phony_dim_2": "channel", "Data/phony_dim_3": "calib", "Geolocation/phony_dim_4": "scnline", "Geolocation/phony_dim_5": "packed_pixels", }
[docs] @expects_file_info() def read(self, file_info, mask_and_scale=True, interpolate_packed_pixels=True, max_nans_interpolation=10, **kwargs): """Read and parse MHS AAPP HDF5 files and load them to xarray Args: file_info: Path and name of the file as string or FileInfo object. This can also be a tuple/list of file names or a path with asterisk. mask_and_scale: Where the data contains missing values, it will be masked with NaNs. Furthermore, data with scaling attributes will be scaled with them. interpolate_packed_pixels: Geo-location data is packed and must be interpolated to use them as reference for each pixel. max_nans_interpolation: How many NaN values are allowed in latitude and longitudes before raising an error? **kwargs: Additional keyword arguments that are valid for :class:`~typhon.files.handlers.common.NetCDF4`. Returns: A xrarray.Dataset object. """ # Make sure that the standard fields are always gonna be imported: user_fields = kwargs.pop("fields", {}) if user_fields: fields = self.standard_fields | set(user_fields) else: fields = None # We catch the user mapping here, since we do not want to deal with # user-defined names in the further processing. Instead, we use our own # mapping user_mapping = kwargs.pop("mapping", None) # Load the dataset from the file: dataset = super().read( file_info, fields=fields, mapping=self.mapping, mask_and_scale=mask_and_scale, **kwargs ) # Keep the original scnlines scnlines = dataset["scnline"].values dataset = dataset.assign_coords( scnline=dataset["scnline"] ) dataset["scnline"] = np.arange(1, dataset.scnline.size+1) dataset["scnpos"] = np.arange(1, 2049) dataset["channel"] = "channel", np.arange(1, 6) # Currently, the AAPP converting tool seems to have a bug. Instead of # retrieving 409 pixels per scanline, one gets 2048 pixels. The # additional values are simply duplicates (or rather quintuplicates): dataset = dataset.sel(scnpos=slice(4, None, 5)) dataset["scnpos"] = np.arange(1, 410) # Create the time variable (is built from several other variables): dataset = self._get_time_field(dataset, user_fields) if mask_and_scale: self._mask_and_scale(dataset) # All geolocation fields are packed in the AVHRR GAC files: if interpolate_packed_pixels: self._interpolate_packed_pixels(dataset, max_nans_interpolation) allowed_coords = {'channel', 'calib', 'scnline', 'scnpos'} else: allowed_coords = {'channel', 'calib', 'scnline', 'scnpos', 'packed_pixels'} # Make a fast check whether everything is alright self._test_coords(dataset, allowed_coords) # Check the latitudes and longitudes: check_lat_lon(dataset) if user_mapping is not None: dataset = dataset.rename(user_mapping) return dataset
@staticmethod def _interpolate_packed_pixels(dataset, max_nans_interpolation): given_pos = np.arange(5, 409, 8) new_pos = np.arange(1, 410) lat_in = np.deg2rad(dataset["lat"].values) lon_in = np.deg2rad(dataset["lon"].values) # We cannot define given positions for each scanline, but we have to # set them for all equally. Hence, we skip every scan position of all # scan lines even if only one contains a NaN value: nan_scnpos = \ np.isnan(lat_in).sum(axis=0) + np.isnan(lon_in).sum(axis=0) valid_pos = nan_scnpos == 0 if valid_pos.sum() < 52 - max_nans_interpolation: raise ValueError( "Too many NaNs in latitude and longitude of this AVHRR file. " "Cannot guarantee a good interpolation!" ) # Filter NaNs because CubicSpline cannot handle it: lat_in = lat_in[:, valid_pos] lon_in = lon_in[:, valid_pos] given_pos = given_pos[valid_pos] x_in = np.cos(lon_in) * np.cos(lat_in) y_in = np.sin(lon_in) * np.cos(lat_in) z_in = np.sin(lat_in) xf = CubicSpline(given_pos, x_in, axis=1, extrapolate=True)(new_pos) yf = CubicSpline(given_pos, y_in, axis=1, extrapolate=True)(new_pos) zf = CubicSpline(given_pos, z_in, axis=1, extrapolate=True)(new_pos) lon = np.rad2deg(np.arctan2(yf, xf)) lat = np.rad2deg(np.arctan2(zf, np.sqrt(xf ** 2 + yf ** 2))) dataset["lat"] = ("scnline", "scnpos"), lat dataset["lon"] = ("scnline", "scnpos"), lon # The other packed variables will be simply padded: for var_name, var in dataset.data_vars.items(): if "packed_pixels" not in var.dims: continue nan_scnpos = np.isnan(var).sum(axis=0) valid_pos = nan_scnpos == 0 given_pos = np.arange(5, 409, 8)[valid_pos] dataset[var_name] = xr.DataArray( CubicSpline( given_pos, var.values[:, valid_pos], axis=1, extrapolate=True)(new_pos), dims=("scnline", "scnpos") )