"""Module containing classes abstracting datasets
"""
# Any commits made to this module between 2015-05-01 and 2017-03-01
# by Gerrit Holl are developed for the EC project “Fidelity and
# Uncertainty in Climate Data Records from Earth Observations (FIDUCEO)”.
# Grant agreement: 638822
#
# All those contributions are dual-licensed under the MIT license for use
# in typhon, and the GNU General Public License version 3.
import abc
import itertools
import logging
import pathlib
import re
import shelve
import string
import shutil
import tempfile
import datetime
import sys
import collections
import warnings
import numpy
import numpy.lib.arraysetops
import numpy.lib.recfunctions
import netCDF4
import xarray
from .. import config
from .. import utils
from .. import math as tpmath
from ..physics.units import em
from ..constants import MiB
from . import filters
logger = logging.getLogger(__name__)
try:
import progressbar
except ImportError:
progressbar = None
class GranuleLocatorError(Exception):
"""Problem locating granules.
"""
class DataFileError(Exception):
"""Superclass for any datafile problems
    Upon reading large amounts of data, some files will contain
problems. When processing a year of data, we don't want to fail
entirely when one orbit file fails. But if there is a real bug
somewhere, we probably do want to fail. Therefore, the typhon dataset
framework is optionally resilient to datafile related errors, but only
if those derive from DataFileError.
Therefore, users implementing their own reading routine may wish to
catch errors arising from corrupted data, and raise an exception
derived from DataFileError instead. That way, typhon knows that a
problem is due to corrupted data and not due to a bug.
"""
class InvalidFileError(DataFileError, ValueError):
"""Raised when the requested information cannot be obtained from the file
See DataFileError for more information.
"""
class InvalidDataError(DataFileError):
"""Raised when data is not how it should be.
See DataFileError for more information.
"""
all_datasets = {}
class Dataset(metaclass=abc.ABCMeta):
"""Represents a dataset.
This is an abstract class. More specific subclasses are
SingleFileDataset and MultiFileDataset.
To add a dataset, subclass one of the subclasses of Dataset, such as
MultiFileDataset, and implement the abstract methods.
Dataset objects have a limited number of attributes. To limit the
    occurrence of bugs, dynamically setting non-pre-existing attributes is
limited. Attributes can be set either by passing keyword arguments
when creating the object, or by setting the appropriate field in your
typhon configuration file (such as .typhonrc). The configuration
section will correspond to the object name, the key to the attribute,
and the value to the value assigned to the attribute. See also
:mod:`typhon.config`.
Attributes:
start_date (datetime.datetime or numpy.datetime64):
Starting date for dataset. May be used to search through ALL
granules. WARNING! If this is set at a time t_0 before the
            actual first measurement t_1, then the collocation algorithm (see
CollocatedDataset) will conclude that there are 0 collocations
in [t_0, t_1], and will not realise if data in [t_0, t_1] are
actually added later!
end_date (datetime.datetime or numpy.datetime64):
Similar to start_date, but for ending.
name (str):
Name for the dataset. Used to make sure there is only a
single dataset with the same name for any particular dataset.
            If a dataset is initiated with a pre-existing name, the
            previously created object is returned.
aliases (Mapping[str, str]):
            Aliases for fields. A dictionary can be useful if you want to
programmatically loop through the same field for many different
datasets, but they are named differently. For example, an alias
could be "ch4_profile".
unique_fields (Container[str]):
Set of fields that make any individual measurement unique. For
example, the default value is {"time", "lat", "lon"}.
related (Mapping[str, Dataset]):
Dictionary whose keys may refer to other datasets with related
information, such as DMPs or flags.
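
    Example (a hedged sketch; ``MyDataset`` is a hypothetical concrete
    subclass with ``section = "mydataset"``)::

        # In the typhon configuration file (e.g. ~/.typhonrc):
        #
        #   [mydataset]
        #   basedir = /path/to/data

        ds = MyDataset()
        print(ds.basedir)  # set from the configuration file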
"""
_instances = None
start_date = None
end_date = None
name = ""
section = ""
aliases = {}
time_field = "time"
unique_fields = {"time", "lat", "lon"}
mandatory_fields = None # fields that reader relies upon
related = {}
maxsize = 10000*MiB
my_pseudo_fields = None
read_returns = "ndarray"
default_orbit_filters = []
# in case of xarray returns, if concatenation is NOT along time
# coordinate, this should be set to something else
concat_coor = None
# Make singleton: there is no point in having multiple copies of a
# dataset-object around when both relate to the same dataset and
# class.
def __new__(cls, name=None, **kwargs):
name = name or cls.name or cls.__name__
if cls._instances is None:
cls._instances = {}
# make sure subclasses aren't confused for their parents
if not cls in cls._instances:
cls._instances[cls] = {}
if name in cls._instances[cls]:
return cls._instances[cls][name]
self = super().__new__(cls)
cls._instances[cls][name] = self
if not cls in all_datasets:
all_datasets[cls] = {}
all_datasets[cls][name] = self
return self # __init__ takes care of the rest
# through __new__, I make sure there's only a single dataset
# object per class with the same name. However, __init__ will get
# called again despite __new__ returning an older self. Make sure
# we don't get hurt by that.
def __init__(self, **kwargs):
"""Initialise a Dataset object.
All keyword arguments will be translated into attributes.
Does not take positional arguments.
Note that if you create a dataset with a name that already exists,
the existing object is returned, but __init__ is still called
(Python does this, see
https://docs.python.org/3.7/reference/datamodel.html#object.__new__).
"""
self.mandatory_fields = set()
for (k, v) in kwargs.items():
setattr(self, k, v)
self.setlocal()
if self.my_pseudo_fields is None:
self.my_pseudo_fields = collections.OrderedDict()
def __setattr__(self, k, v):
if hasattr(self, k) or hasattr(type(self), k):
super().__setattr__(k, v)
else:
raise AttributeError("Unknown attribute: {}. ".format(k))
def setlocal(self):
"""Set local attributes, from config or otherwise.
"""
if self.section in config.conf:
for k in config.conf[self.section]:
setattr(self, k, config.conf[self.section][k])
@abc.abstractmethod
def find_granules(self, start=datetime.datetime.min,
end=datetime.datetime.max,
include_last_before=False):
"""Loop through all granules for indicated period.
This is a generator that will loop through all granules from
`start` to `end`, inclusive.
See also: `find_granules_sorted`
Arguments:
start (datetime.datetime): Start
Starting datetime. When omitted, start at complete
beginning of dataset.
end (datetime.datetime): End
End datetime. When omitted, continue to end of dataset.
Last granule will start before this datetime, but contents
may continue beyond it.
include_last_before (bool): Be inclusive
When True, also return the last granule /before/ start, so
that a reader is sure to include all data in the covered
period. When False, the first granule yielded is the
first granule starting after start.
Yields:
pathlib.Path objects for all files in dataset. Sorting is not
guaranteed; if you need guaranteed sorting, use
`find_granules_sorted`.
"""
raise NotImplementedError()
@abc.abstractmethod
def find_granules_sorted(self, start=None, end=None):
"""Yield all granules sorted by starting time then ending time.
For details, see `find_granules`.
"""
raise NotImplementedError()
@abc.abstractmethod
def find_most_recent_granule_before(self, instant,
**locator_args):
"""Find granule covering instant
Find granule started most recently before `instant`.
Arguments:
instant (datetime.datetime): Time to search for
                datetime for which a granule is sought.
**locator_args:
Any other keyword arguments that the particular dataset
needs. Commonly, `satname` is needed.
Returns:
pathlib.Path object for sought granule.
"""
...
# Cannot use functools.lru_cache because it cannot handle mutable
# results or arguments
@utils.cache.mutable_cache(maxsize=10)
def read_period(self, start=None,
end=None,
onerror="skip",
fields="all",
pseudo_fields=None,
sorted=True,
locator_args=None,
reader_args=None,
limits=None,
simple_filters=(),
orbit_filters=None,
enforce_no_duplicates=True,
excs=(DataFileError, filters.FilterError)):
"""Read all granules between start and end, in bulk.
Arguments:
start (datetime.datetime): Start
Starting datetime. When omitted, start at complete
beginning of dataset.
end (datetime.datetime): End
End datetime. When omitted, continue to end of dataset.
Last granule will start before this datetime, but contents
may continue beyond it. Can also be a datetime.timedelta,
which will then be interpreted as a length after start.
onerror (str): What to do on errors
When reading many files, some files may have problems. If
onerror is set to "skip", files with errors are skipped
and reading continues with the next file. This is the
default behaviour. If onerror is set to "raise", the
method will reraise the original exception as soon as a
problem occurs.
fields (Iterable[str] or str): What fields to return
What fields to read from dataset. Either a collection
of strings corresponding to fields to read, or the str
"all" (default), which means read all fields.
pseudo_fields (Mapping[str, function]): See documentation for
self.read.
sorted (bool): Should the granules be read in sorted order?
Defaults to true. NB: does not currently guarantee that
the actual results are sorted; this is up to the
individual reading routines!
locator_args (dict): Extra keyword arguments passed on to
granule finding routines.
reader_args (dict): Extra keyword arguments to be passed on to
reading routine. Since these differ per type, those are
probably documented in the class docstring.
limits (dict): Limitations to apply to each granule. For the
exact format, see `:func:typhon.math.array.limit_ndarray`.
simple_filters (container/iterable): collection of functions to be
applied for filtering. Must take ndarray input, must give
ndarray output. For more advanced filtering, pass
instances of implementations of filters.OrbitFilter to
orbit_filters.
orbit_filters (Sequence[filters.OrbitFilter]): more advanced
filters to apply for each orbit or after all orbits.
Those are instances of filters.OrbitFilter and thus have
state, which can be useful for e.g. overlap checking. For
each of those, the system will call .reset() before the
first orbit is read, .filter(...) after each orbit, and
.finalise() at the end.
            enforce_no_duplicates (bool): Check for duplicates
                When True (the default) and granules are read in sorted
                order, raise InvalidDataError if a granule starts at or
                before the latest time already read, which indicates
                that duplicate removal is not working as it should.
            excs (Sequence[Exception]): what exceptions to try and catch
                after every read. Use with onerror.
Returns:
Masked array containing all data in period. Invalid data may
or may not be masked, depending on the behaviour of the
reading routine implemented by the subclass.
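
        Example (a sketch; assumes ``ds`` is an instance of a concrete
        Dataset subclass)::

            M = ds.read_period(
                datetime.datetime(2015, 1, 1),
                datetime.timedelta(days=2),  # end relative to start
                fields=["time", "lat", "lon"],
                onerror="skip")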
"""
if locator_args is None:
locator_args = {}
if reader_args is None:
reader_args = {}
if limits is None:
limits = {}
if orbit_filters is None:
orbit_filters = self.default_orbit_filters
start = start or self.start_date
end = end or self.end_date
if isinstance(end, datetime.timedelta):
end = start + end
finder = self.find_granules_sorted if sorted else self.find_granules
logger.info("Reading {self.name:s} {locator_args!s} "
"for period {start:%Y-%m-%d %H:%M:%S} "
" – {end:%Y-%m-%d %H:%M:%S}".format(**vars()))
dobar = sorted and progressbar and sys.stdout.isatty()
if dobar:
bar = progressbar.ProgressBar(max_value=1,
widgets=[progressbar.Bar("=", "[", "]"), " ",
progressbar.Percentage(),
' (', progressbar.ETA(), ') '])
bar.start()
bar.update(0)
else:
logger.info("Psst! If you install the progressbar2 package, "
"you will get a fancy progressbar!")
anygood = False
arr = None
N = 0
time = self.time_field
for of in orbit_filters:
of.reset()
# NB: https://stackoverflow.com/a/37585628/974555
extra_filter_args = collections.ChainMap(*(of.args_to_reader
for of in orbit_filters))
overlap_filters = [
f for f in orbit_filters if isinstance(f, filters.OverlapFilter)]
late = overlap_filters[0].late if len(overlap_filters) > 0 else True
if len(overlap_filters) > 1:
raise ValueError("Found {:d} overlap filters: {!s}".format(
len(overlap_filters), overlap_filters))
for (g_start, gran) in finder(start, end, return_time=True,
include_last_before=True,
**locator_args):
try:
(cont, extra) = self.read(str(gran), fields=fields,
pseudo_fields=pseudo_fields, **reader_args,
**extra_filter_args)
for of in orbit_filters:
cont = of.filter(cont, **extra)
# FIXME: handle cases where very few scanlines are left
# after filtering. We may find errors downstream if there
# are very few scanlines left.
if (not late and
cont[time].size > 0):
if (enforce_no_duplicates and sorted and
arr is not None and
N>0 and (cont[time][0] <= latest)):
if isinstance(latest, xarray.DataArray):
latest = latest.values.astype("M8[ms]")
if isinstance(cont[time][0], xarray.DataArray):
conttime = cont[time][0].values.astype("M8[ms]")
else:
conttime = cont[time][0]
raise InvalidDataError(
"Reading routine for {!s} returned data starting "
"{:%Y-%m-%d %H:%M:%S}, which precedes last entry "
"for preceding granule at {:%Y-%m-%d %H:%M:%S}. "
"As data are supposed to be sorted in time, this "
"probably means duplicate removal is not working "
"as it should, or you are using an implementation "
"that deliberately leaves duplicates in, and you "
"should be passing enforce_no_duplicates=False to "
"read_period.".format(gran,
conttime.astype(datetime.datetime),
latest.astype(datetime.datetime)))
# NB: when datasets erroneously contain duplicate coordinates
# (I'm looking at you, FIDUCEO/FCDR_HIRS#159!), this
# comparison will cause a failure in xarray. In this case,
# compare values instead.
arrrr = cont[time]
if isinstance(arrrr, xarray.DataArray):
arrrr = arrrr.values
if not (arrrr[1:] >= arrrr[:-1]).all():
                    raise InvalidDataError("Reader for {!s} returned data "
                        "with unsorted time. This must be fixed.".format(
                            gran))
cont = self._apply_limits_and_filters(cont, limits, simple_filters)
except excs as exc:
                if onerror == "skip":
logger.error("Can not read file {}: {}".format(
gran, exc.args[0]))
continue
else:
raise
else:
(arr, N, latest) = self._add_gran_to_data(arr, cont, N, start,
end, g_start)
# contents.append(
# cont[(cont[time]<=end)&(cont[time]>=start)])
if N > 0:
anygood = True
if dobar:
bar.update(max((g_start-start) / (end-start),0))
if dobar:
bar.update(1)
bar.finish()
if anygood:
# NB: filter finalise before or after dataset finalise?
#
arr = self._finalise_arr(arr, N)
for of in orbit_filters:
arr = of.finalise(arr)
if "flags" in self.related:
arr = self.flag(arr)
return arr
else:
raise DataFileError("Can not find any valid data!")
def _apply_limits_and_filters(self, cont, limits, simple_filters):
if isinstance(cont, xarray.Dataset):
if len(limits)>0:
raise NotImplementedError(
"limits not implemented on xarray datasets")
oldsize = cont[self.time_field].size
for f in simple_filters:
cont = f(cont)
logger.debug("Filters reduced number from "
"{:d} to {:d}".format(oldsize, cont[self.time_field].size))
return cont
oldsize = cont.size
cont = tpmath.array.limit_ndarray(cont, limits)
for f in simple_filters:
cont = f(cont)
if cont.size < oldsize:
logger.debug("Applying limitations, reducing "
"{:d} to {:d}".format(oldsize, cont.size))
return cont
def _add_gran_to_data(self, arr, cont, N, start, end, g_start):
# first ensure we only add the segment we actually want
if isinstance(cont, xarray.Dataset):
tm = cont[self.time_field].values.astype("M8[ms]")
cont = cont.loc[dict.fromkeys(
utils.get_time_coordinates(cont)&cont.dims.keys(),
slice(start, end))]
else:
cont = cont[(cont[self.time_field]<end)&(cont[self.time_field]>=start)]
if isinstance(cont, xarray.Dataset):
# xarray reads lazily, but we want to merge all granules into
# a single object at the end, so have to load it into memory
# anyway; rather do it now than all at once later, so that we
# can keep track of how fast things are going
if cont[self.time_field].size > 0: # avoid https://github.com/pydata/xarray/issues/1329
cont.load()
if arr is None:
arr = [cont]
N = cont[self.time_field].size
else:
arr.append(cont)
N += cont[self.time_field].size
            # find the latest time in the most recent non-empty granule
            for a in reversed(arr):
                if a[self.time_field].size > 0:
                    latest = a[self.time_field][-1]
                    break
            else:
                latest = numpy.datetime64(0, "ms")
else:
if arr is None:
arr = cont
N = cont.size
else:
if (N+cont.size) > arr.size: # need to allocate more
frac_done = max((g_start-start) / (end-start),0)
# suppose all future files have on average the
# same size? Until we have reached 10%, simply
# double every time. After that, extrapolate more
# cleverly.
newsize = (int((N+cont[self.time_field].size)//frac_done)
if frac_done > 0.1
else (N+cont[self.time_field].size) * 2)
arr = self._ensure_large_enough(arr, cont, N, newsize,
frac_done)
self._add_cont_to_arr(arr, N, cont)
N += cont[self.time_field].size
if arr[self.time_field].size > 0:
latest = arr[self.time_field][N-1]
else:
latest = numpy.datetime64(0, "ms")
return arr, N, latest
def _ensure_large_enough(self, arr, cont, N, newsize, frac_done):
"""Allocate new space while adding gran to data
Helper for _add_gran_to_data, part of the read_period family of
helpers. Does NOT add cont to arr!"""
if isinstance(cont, xarray.Dataset):
raise NotImplementedError("Not used for xarray datasets. "
"But see version history at "
"https://arts.mi.uni-hamburg.de/trac/rt/browser/typhon/trunk/typhon/datasets/dataset.py?rev=10396#L462 "
"if there is a wish to reimplemented!")
else:
if newsize * arr.itemsize > self.maxsize:
raise MemoryError("This dataset is too large "
"for typhons little mind. Continuing might "
"ultimately need {:,.0f} MiB of RAM. This exceeds my "
"maximum (self.maxsize) of {:,.0f} MiB. "
"Sorry! ".format(
newsize*arr.itemsize/MiB,
self.maxsize/MiB))
logger.debug(
"New size ({:d} items, {:,.0f} MiB) would exceed allocated "
"size ({:d} items, {:,.0f} MiB). I'm {:.3%} "
"through. Allocating new: {:d} items, {:,.0f} "
"MiB. New size: {:d} items, {:,.0f} "
"MiB.".format(N+cont.size,
(cont.nbytes+arr.nbytes)/MiB,
arr.size, arr.nbytes/MiB, frac_done,
newsize-arr.size, (newsize-arr.size)*arr.itemsize/MiB,
newsize, newsize*arr.itemsize/MiB))
mod = (numpy.ma if hasattr(arr, "mask") else numpy)
arr = mod.concatenate(
(arr, mod.zeros(dtype=arr.dtype, shape=newsize-arr.size)))
return arr
def _add_cont_to_arr(self, arr, N, cont):
"""Changes arr in-situ, does not return"""
if isinstance(cont, xarray.Dataset):
# we should already know it's large enough
# for arr[self.time_field] I start at N
# for the other time coordinates at the relative "speed" they
# are behind N
# but this is not guaranteed to be regular so I would need to
            # keep track of each individually, or inspect it on-the-fly
# this approximation may be good enough for pre-allocation
# (which is approximate anyway), when actually storing we need
# to do a better job… for each time coordinate, check when it
# “dies”
raise NotImplementedError("This is not used for xarrays. "
"But see comment in source-code for some thoughts.")
else:
arr[N:(N+cont.size)] = cont
#arr = self._finalise_arr(arr, N)
def _finalise_arr(self, arr, N):
        if isinstance(arr, list):
            logger.debug("Concatenating {N:d} DataArrays...".format(N=N))
            if self.concat_coor is None:
                arr = utils.concat_each_time_coordinate(*arr)
            else:
                arr = xarray.concat(arr, dim=self.concat_coor)
            logger.debug("Done!")
            return arr
        else:
            return self._correct_overallocation(arr, N)
@staticmethod
def _correct_overallocation(arr, N):
if isinstance(arr, xarray.Dataset):
raise RuntimeError("We shouldn't be here. Ever.")
logger.debug("Correcting overallocation ({:d}->{:d})".format(
arr.size, N))
return arr[:N]
@abc.abstractmethod
def _read(self, f, fields="all"):
"""Read granule in file, low-level.
To be implemented by subclass. Do not call this method; call
:func:`Dataset.read` instead.
For documentation, see :func:`Dataset.read`.
"""
raise NotImplementedError()
# Cannot use functools.lru_cache because it cannot handle mutable
# results or arguments
@utils.cache.mutable_cache(maxsize=10)
def read(self, f=None, fields="all", pseudo_fields=None, **kwargs):
"""Read granule in file and do some other fixes
Shall return an ndarray with at least the fields lat, lon, time.
Arguments:
f (str): String containing path to file to read.
fields (Iterable[str] or str): What fields to return.
See :func:`Dataset.read_period` for details.
pseudo_fields (Mapping[str, function]): Additional fields to
calculate on-the-fly after every read. That may be more
attractive from a memory point of view. In this mapping,
the keys will be names added to the dtype of the returned
ndarray (at the top level). The values are functions.
Each function must take four arguments:
content [ndarray or xarray.DataArray]
Content of file.
D [Mapping]
Output of earlier-executed data-fields.
                header [ndarray or xarray.DataArray]
Header of file.
fn [string-like]
Name of file.
                and return an ndarray or xarray.DataArray to be merged
                with content. Dictionaries preserve insertion order;
                make sure to pass the entries in the correct order.
Any further keyword arguments are passed on to the particular
reading routine. For details, please refer to the docstring
for the class.
Returns:
Masked array containing data in file with selected fields.
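
        Example (a sketch; ``mean_bt`` is a hypothetical pseudo-field
        averaging a hypothetical field "bt")::

            def mean_bt(content, D, header, fn):
                return content["bt"].mean(-1)

            (M, extra) = ds.read("/path/to/granule",
                pseudo_fields=collections.OrderedDict(mean_bt=mean_bt))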
"""
if isinstance(f, pathlib.PurePath):
f = str(f)
if pseudo_fields is None:
pseudo_fields = collections.OrderedDict()
# expand pseudo_fields with items from my_pseudo_fields not
# already there, and also process dependent fields
if fields != "all":
fields = list(fields)
for (k, (deps, v, cond)) in self.my_pseudo_fields.items():
if k in fields:
for d in deps:
if (d not in fields and
all(kwargs.get(condfn) in condval for (condfn,
condval) in cond.items())):
fields.append(d)
for mandatory in self.mandatory_fields:
if mandatory not in fields:
fields.append(mandatory)
for (k, (deps, v, cond)) in self.my_pseudo_fields.items():
if (fields=="all" or
k in fields and k not in pseudo_fields) and (
all(kwargs.get(condfn) in condval
for (condfn, condval) in cond.items())):
pseudo_fields[k] = v
logger.debug("Reading {:s}".format(f))
# should not pass pseudo_fields on to reader, it will get confused
fields = (fields if fields == "all" else
[f for f in fields if not f in pseudo_fields])
(M, extra) = (self._read(f, fields=fields, **kwargs)
if f is not None
else self._read(fields=fields, **kwargs))
M = self._add_pseudo_fields(M, pseudo_fields, extra, f)
return (M, extra)
def _add_pseudo_fields(self, M, pseudo_fields, extra, f):
D = collections.OrderedDict()
for (k, fnc) in pseudo_fields.items():
try:
D[k] = fnc(M, D, extra, f)
except TypeError as exc:
if "positional argument" in exc.args[0]:
# backward compatibility
D[k] = fnc(M)
else:
raise
if D != {}:
if self.read_returns == "ndarray":
mod = (numpy.ma if hasattr(M, "mask") else numpy)
newM = mod.zeros(shape=M.shape,
dtype=M.dtype.descr + [(k, v.dtype, v.shape[1:]) for (k,v) in D.items()])
for k in M.dtype.names:
newM[k] = M[k]
elif self.read_returns == "xarray":
newM = M
else:
raise ValueError("read_returns must be ndarray or xarray")
for (k, v) in D.items():
newM[k] = v
M = newM
return M
def __str__(self):
return "<{self.__class__.__name__:s}:{self.name:s}>".format(self=self)
# Leave disk-memoisation out of typhon until dependency on joblib has
# been decided.
#
# @tools.mark_for_disk_cache(
# process=dict(
# my_data=lambda x: x.view(dtype="i1")))
def combine(self, my_data, other_obj, other_data=None, other_args=None, trans=None,
timetol=numpy.timedelta64(1, 's'), time_name="time"):
"""Combine with data from other dataset.
Combine a set of measurements from this dataset with another
dataset, where each individual measurement correspond to exactly
one from the other one, as identified by time/lat/lon, orbitid, or
        measurement id, or other characteristics. The object attribute
unique_fields determines how those are found.
The other dataset may contain flags, DMPs, or different
information altogether.
Arguments:
my_data (ndarray): Data for self.
A (masked) array with a dtype such as returned from
`self.read <Dataset.read>`.
other_obj (Dataset): Dataset to match
Object from a Dataset subclass from which to find matching
data.
other_data (ndarray): Data for other. Optional.
Optionally, pass data for other object. If not provided
or None, this will be read using other_obj.
other_args (dict): Keyword arguments passed to
other_obj.read_period. May need to contain things like
{"locator_args": {"satname": "noaa18"}}
trans (collections.OrderedDict): Dictionary of what field in `my_data`
corresponds to what field in `other_data`. Optional; by
default, merges self.unique_fields and
other_obj.unique_fields, and assumes names between the two
are identical. Order is relevant for optimal recursive
bisection search for matches, which is to be implemented.
timetol (timedelta64): For datetime types, `isclose` does not
work (https://github.com/numpy/numpy/issues/5610). User
must pass an explicit tolerance, defaulting to 1 second.
time_name (str): Name for my time field. Defaults to "time".
Used to determine the period to read from the other
dataset.
Returns:
Masked ndarray of same size as `my_data` and same `dtype` as
returned by `other_obj.read`.
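
        Example (a sketch; assumes ``flag_ds`` is a related Dataset
        instance holding flags, and ``M`` was read from this dataset)::

            flags = ds.combine(M, flag_ds,
                other_args={"locator_args": {"satname": "noaa18"}})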
"""
if trans is None:
flds = list(self.unique_fields & other_obj.unique_fields)
trans = collections.OrderedDict(zip(flds, flds))
if other_args is None:
other_args = {}
if self.read_returns == "ndarray":
if other_data is None:
first = my_data[time_name].min().astype(datetime.datetime)
last = my_data[time_name].max().astype(datetime.datetime)
elif self.read_returns == "xarray":
if other_data is None:
first = my_data[time_name].min().values.astype("M8[ms]"
).astype(datetime.datetime)
last = my_data[time_name].max().values.astype("M8[ms]"
).astype(datetime.datetime)
else:
raise ValueError("read_returns must be ndarray or xarray")
if other_data is None:
other_data = other_obj.read_period(first, last,
**other_args)
(my_prim, other_prim) = next(iter(trans.items()))
if self.read_returns == "ndarray":
# my_data needs to be sorted
try:
my_data._fill_value
except AttributeError:
my_data.sort(order=my_prim, kind="heapsort")
else:
# see
# https://github.com/numpy/numpy/issues/8069
my_data.sort(order=my_prim, kind="heapsort",
fill_value=my_data._fill_value)
elif self.read_returns == "xarray":
# ensure sorted along time dimension
if not len(my_data[my_prim].dims) == len(other_data[other_prim].dims) == 1:
raise ValueError(
"Must use 1-D index for finding combinations, "
"{:s} has {:d} dimensions: {!s}, "
"{:s} has {:d} dimensions: {!s}.".format(
my_prim, len(my_data[my_prim].dims),
my_data[my_prim].dims,
other_prim, len(other_data[other_prim].dims),
other_data[other_prim].dims))
my_dim = my_data[my_prim].dims[0]
other_dim = other_data[other_prim].dims[0]
if not (my_data[my_prim][1:] >= my_data[my_prim][:-1]).all():
warnings.warn("Primary/reference instrument data should "
"be sorted in time to find matching data in other set. "
f"Field {my_prim:s} in {self.name:s} is not sorted in "
"time. Note that this problem may be unavoidable if "
"we are tasked to search for additional information "
"for both pairs of collocations, and times are sorted "
"according to one but not the other. Therefore, I "
"will force the sorting for now, but I'm not entirely "
"sure this cannot lead to problems later. You have "
"been warned.", UserWarning)
my_data = my_data.loc[
{my_data[my_prim].dims[0]:
numpy.argsort(my_data[my_prim])}]
# through interpolation, find times in `other` closest to times in
# myself; hopefully that means the difference is zero
if issubclass(my_data[my_prim].dtype.type, numpy.datetime64):
x = other_data[other_prim].astype("<M8[ms]").astype("f8")
xx = my_data[my_prim].astype("<M8[ms]").astype("f8")
else:
x = other_data[other_prim]
xx = my_data[my_prim]
if self.read_returns == "ndarray":
ii = numpy.interp(xx, x,
numpy.arange(other_data[other_prim].shape[0])).round().astype(
numpy.uint64)
else:
if not other_data[other_prim].dtype.kind=="M":
raise ValueError("When finding combinations based "
"on xarray datasets, I can only do so based on "
"time coordinates, but {:s} has dtype {!s}.".format(
other_prim, other_data[other_prim].dtype))
for (k, v) in other_data.data_vars.items():
if v.dtype.kind == "M":
raise ValueError("Found time data variable "
"that is not a coordinate, I'm confused, "
"should I interpolate it? Assign as a coordinate "
"to be sure!")
ii = dict.fromkeys(utils.get_time_dimensions(other_data))
for k in ii.keys():
x = other_data[k].astype("<M8[ms]").astype("f8")
ii[k] = numpy.interp(xx, x,
numpy.arange(x.shape[0])).round().astype(numpy.int64)
# if self.read_returns == "ndarray":
# other_combi = other_data[ii]
# # now go through fields to see that there is an actual match
# elif self.read_returns == "xarray":
other_combi = other_data[ii]
if self.read_returns == "ndarray":
near = numpy.all([(numpy.isclose(my_data[f_my], other_combi[f_oth])
if issubclass(my_data[f_my].dtype.type,
numpy.inexact) else
abs(my_data[f_my] - other_combi[f_oth]) < timetol
if issubclass(my_data[f_my].dtype.type,
numpy.datetime64) else
my_data[f_my] == other_combi[f_oth])
for (f_my, f_oth) in trans.items()], 0)
elif self.read_returns == "xarray":
near = numpy.all([
(numpy.isclose(my_data[f_my].values,
other_combi[f_oth].values)
if issubclass(my_data[f_my].dtype.type,
numpy.inexact) else
abs(my_data[f_my].values -
other_combi[f_oth].values) < timetol
if issubclass(my_data[f_my].dtype.type,
numpy.datetime64) else
my_data[f_my].values == other_combi[f_oth].values)
for (f_my, f_oth) in trans.items()], 0)
if not near.any():
# check time coverage
first_found = other_data["time"].min().values.astype("M8[ms]").astype(datetime.datetime)
last_found = other_data["time"].max().values.astype("M8[ms]").astype(datetime.datetime)
if (abs(first_found - first) > timetol and
abs(last_found - last) > timetol):
raise ValueError(f"Primary covers "
f"{first:%Y-%m-%d %H:%M:%S}–{last:%Y-%m-%d %H:%M:%S}, "
"but secondary only covers "
f"{first_found:%Y-%m-%d %H:%M:%S}–{last_found:%Y-%m-%d %H:%M:%S}. "
"Finding secondary fails. Perhaps the "
"secondary is coming from a derived dataset compared "
"to where the original collocations were made, and the "
"derived dataset is temporarily absent.")
else:
raise ValueError("Did not find any secondaries! Are times off?")
if not near.all():
            logger.warning("Only {:d}/{:d} ({:%}) of secondaries found".format(
                near.sum(), near.size, near.sum()/near.size))
if self.read_returns == "ndarray":
try:
other_combi.mask[~near] = numpy.ones_like(other_combi.mask[~near])
except AttributeError: # not a MaskedArray yet
other_combi = numpy.ma.masked_where(~near, other_combi)
elif self.read_returns == "xarray":
vars_with_dim = [k for k in other_combi.data_vars.keys()
if other_dim in other_combi[k].dims]
for v in vars_with_dim:
                other_combi[v][{other_dim: ~near}] = numpy.nan
return other_combi
def get_additional_field(self, M, fld):
"""Get additional field.
Get field from other dataset, original objects, or otherwise.
To be implemented by subclass implementations.
Exact fields depend on subclass.
Arguments:
M (ndarray): ndarray with existing data
A (masked) array with a dtype such as returned from
`self.read <Dataset.read>`.
fld (str): Additional field to read from original data
Returns:
ndarray with fields of M + fld.
"""
raise NotImplementedError("Must be implemented by child-class")
def as_xarray_dataset(self):
raise NotImplementedError("Dataset {!s} has not implemented "
"conversion to xarray".format(self))
class SingleFileDataset(Dataset):
"""Represents a dataset where all measurements are in one file.
"""
start_date = datetime.datetime.min
end_date = datetime.datetime.max
srcfile = None
# docstring in parent
def find_granules(self, start=datetime.datetime.min,
end=datetime.datetime.max,
include_last_before=False):
if start < self.end_date and end > self.start_date:
yield self.srcfile
# docstring in parent
def find_granules_sorted(self, start=datetime.datetime.min,
end=datetime.datetime.max,
include_last_before=False):
yield from self.find_granules(start, end)
def get_times_for_granule(self, gran=None):
return (self.start_date, self.end_date)
def read(self, f=None, fields="all"):
return super().read(f or self.srcfile, fields)
def find_most_recent_granule_before(self, instant,
**locator_args):
if instant > self.start_date:
yield from self.find_granules(**locator_args)
else:
raise GranuleLocatorError("Instant out of range: "
"{:%Y-%m-%d %H:%M:%S}".format(instant))
class MultiFileDataset(Dataset):
"""Represents a dataset where measurements are spread over multiple
files.
If filenames contain timestamps, this information is used to determine
the time for a granule or measurement. If filenames do not contain
timestamps, this information is obtained from the file contents.
Attributes:
basedir (pathlib.Path or str):
Describes the directory under which all granules are located.
Can be either a string or a pathlib.Path object.
subdir (pathlib.Path or str):
Describes the directory within basedir where granules are
located. May contain string formatting directives where
            particular fields are replaced, such as `year`, `month`, and
`day`. For example: `subdir = '{year}/{month}'`. Sorting
cannot be more narrow than by day.
re (str):
Regular expression that should match valid granule files within
`basedir` / `subdir`. Should use symbolic group names to capture
relevant information when possible, such as starting time, orbit
number, etc. For time identification, relevant fields are
contained in MultiFileDataset.date_info, where each field also
exists in a version with "_end" appended.
MultiFileDataset.refields contains all recognised fields.
If any _end fields are found, the ending time is equal to the
beginning time with any _end fields replaced. If no _end
fields are found, the `granule_duration` attribute is used to
determine the ending time, or the file is read to get the
ending time (hopefully the header is enough).
granule_cache_file (pathlib.Path or str):
If set, use this file to cache information related to
granules. This is used to cache granule times if those are
not directly inferred from the filename. Otherwise, this is
not used. The full path to this file shall be `basedir` /
`granule_cache_file`.
granule_duration (datetime.timedelta):
If the filename contains starting times but no ending times,
granule_duration is used to determine the ending time. This
should be a datetime.timedelta object.
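
    Example (hypothetical attribute values for a dataset with daily
    subdirectories and timestamps in the filename)::

        basedir = "/data/mydataset"
        subdir = "{year}/{month}/{day}"
        re = r"granule_(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})\.nc"
        granule_duration = datetime.timedelta(minutes=100)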
"""
basedir = None
subdir = ""
re = None
_re = None # compiled version, do not touch
granule_cache_file = None
_granule_start_times = None
granule_duration = None
datefields = "year month day hour minute second".split()
# likely extended later. Note that "tod" is also a datefield,
# interpreted as seconds since 00
refields = ["".join(x)
for x in itertools.product(datefields, ("", "_end"))]
valid_field_values = {}
# When set to a string, interpreted as a path to database of granules
# with corresponding first lines. This may be relevant for cases like
    # the NOAA and MetOp satellite sensors, where subsequent granules
# contain some repetitions of lines from previous granules.
granules_firstline_file = None
granules_firstline_db = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._str2path("basedir", "subdir", "granule_cache_file")
self._open_granule_file()
if self.re is not None:
self._re = re.compile(self.re, re.IGNORECASE)
def _str2path(self, *args):
for attr in args:
if (getattr(self, attr) is not None and
not isinstance(getattr(self, attr), pathlib.PurePath)):
setattr(self, attr, pathlib.Path(getattr(self, attr)))
def __getstate__(self):
state = self.__dict__.copy()
del state["_granule_start_times"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._open_granule_file()
def find_dir_for_time(self, dt):
"""Find the directory containing granules/measurements at (date)time
For a given datetime object, find the directory that contains
        granules/measurement files for this particular time.
Arguments:
dt (datetime.datetime): Timestamp for inquiry.
In reality, any object with `year`, `month`, and `day`
attributes works.
Returns:
            pathlib.Path object pointing to the relevant directory
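
        Example (assuming ``basedir = '/data/mydataset'`` and
        ``subdir = '{year}/{month}/{day}'``)::

            >>> ds.find_dir_for_time(datetime.datetime(2015, 6, 1))
            PosixPath('/data/mydataset/2015/6/1')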
"""
return pathlib.Path(str(self.basedir / self.subdir).format(
year=dt.year, month=dt.month, day=dt.day,
doy=dt.timetuple().tm_yday))
def get_mandatory_fields(self):
fm = string.Formatter()
fields = {f[1] for f in fm.parse(str(self.basedir / self.subdir))}
if fields == {None}:
fields = set()
return fields - set(self.datefields)
def verify_mandatory_fields(self, extra):
mandatory = self.get_mandatory_fields()
found = set(extra.keys())
missing = set()
if not mandatory <= found:
missing = mandatory - found
# I'm deleting elements so I must loop over a copy
for elem in missing.copy():
try:
extra[elem] = getattr(self, elem)
except AttributeError:
pass
else:
missing.remove(elem)
if missing:
raise GranuleLocatorError("Missing fields needed to search for "
"{self.name:s} files. Need: {mandatory:s}. "
"Found: {found:s}. Missing: {missing:s}".format(
self=self,
mandatory=", ".join(mandatory),
found=", ".join(found) or "(none)",
missing=", ".join(missing)))
for field in found:
if field in self.valid_field_values:
if not extra[field] in self.valid_field_values[field]:
raise GranuleLocatorError("Encountered invalid {field!s} "
"when searching for {self.name!s} files. "
"Found: {value!s}. Valid: "
"{valid_values!s}".format(
field=field, self=self, extra=extra,
value=extra[field],
valid_values=self.valid_field_values[field]))
def get_subdir_resolution(self):
"""Return the resolution for the subdir precision.
Returns "year", "month", "day", or None (if there is no subdir).
Based on parsing of self.subdir attribute.
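
        Example (assuming ``subdir = '{year}/{month}'``)::

            >>> ds.get_subdir_resolution()
            'month'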
"""
fm = string.Formatter()
fields = {f[1] for f in fm.parse(str(self.subdir))}
if "day" in fields:
return "day"
if "month" in fields:
if "doy" in fields:
raise ValueError("Format string has both month and doy")
return "month"
if "year" in fields:
if "doy" in fields:
return "day"
return "year"
def iterate_subdirs(self, d_start, d_end, **extra):
"""Iterate through all subdirs in dataset.
        Note that this does not check for existence of those directories.
Yields a 2-element tuple where the first contains information on
year(/month/day), and the second is the path.
Arguments:
d_start (datetime.date): Starting date.
d_end (datetime.date): Ending date
**extra: Any extra keyword arguments. This will be passed on to
format self.basedir / self.subdir, in case the standard fields
like year, month, etc. do not provide enough information.
Yields:
pathlib.Path objects for each directory in the dataset
containing files between `d_start` and `d_end`.
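
        Example (assuming ``basedir = '/data/mydataset'`` and
        ``subdir = '{year}/{month}'``)::

            for (info, path) in ds.iterate_subdirs(
                    datetime.date(2015, 1, 1), datetime.date(2015, 2, 1)):
                print(info, path)
            # {'year': 2015, 'month': 1} /data/mydataset/2015/1
            # {'year': 2015, 'month': 2} /data/mydataset/2015/2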
"""
self.verify_mandatory_fields(extra)
# depending on resolution, iterate by year, month, or day.
# Resolution is determined by provided fields in self.subdir.
d = d_start
res = self.get_subdir_resolution()
pst = str(self.basedir / self.subdir)
if res == "year":
year = d.year
while datetime.date(year, 1, 1) <= d_end:
yield (dict(year=year),
pathlib.Path(pst.format(year=year, **extra)))
year += 1
elif res == "month":
year = d.year
month = d.month
while datetime.date(year, month, 1) <= d_end:
yield (dict(year=year, month=month),
pathlib.Path(pst.format(year=year, month=month,
**extra)))
if month == 12:
year += 1
month = 1
else:
month += 1
elif res == "day":
#while d < d_end:
if any(x[1] == "doy" for x in string.Formatter().parse(pst)):
while d <= d_end:
doy = d.timetuple().tm_yday
yield (dict(year=d.year, doy=doy),
pathlib.Path(pst.format(year=d.year, doy=doy,
**extra)))
d += datetime.timedelta(days=1)
else:
while d <= d_end:
yield (dict(year=d.year, month=d.month, day=d.day),
pathlib.Path(pst.format(
year=d.year, month=d.month, day=d.day,
**extra)))
d = d + datetime.timedelta(days=1)
else:
yield ({}, pathlib.Path(pst.format(**extra)))
def find_granules(self, dt_start=None, dt_end=None,
include_last_before=False, **extra):
"""Yield all granules/measurementfiles in period
Accepts extra keyword arguments. Meaning depends on actual
dataset. Could be something like a satellite name in the case of
sensors occurring on multiple platforms, like HIRS. To see what
keyword arguments are accepted or possibly needed for a particular
dataset, call self.get_path_format_variables()
If keyword argument `return_time` is present and True, yield
tuples of (start_time, path) rather than just `path`.
The results are usually sorted by start time, but this is not
guaranteed and depends on the filesystem. If you need sorted
granules, please use find_granules_sorted.
Arguments:
d_start (datetime.date): Starting date.
d_end (datetime.date): Ending date
include_last_before (bool): Include last granule starting
before.
**extra: Any extra keyword arguments. This will be passed on to
format self.basedir / self.subdir, in case the standard fields
like year, month, etc. do not provide enough information.
Yields:
pathlib.Path objects for each datafile in the dataset between
`dt_start` and `dt_end`.
"""
if dt_start is None:
dt_start = self.start_date
if dt_end is None:
dt_end = self.end_date
return_time = extra.pop("return_time", False)
self.verify_mandatory_fields(extra)
d_start = (dt_start.date()
if isinstance(dt_start, datetime.datetime)
else dt_start)
d_end = (dt_end.date()
if isinstance(dt_end, datetime.datetime)
else dt_end)
found_any_dirs = False
found_any_grans = False
logger.debug(("Searching for {!s} granules between {!s} and {!s} "
).format(self.name, dt_start, dt_end))
before = None
if include_last_before and dt_start > self.start_date:
try:
before = self.find_most_recent_granule_before(dt_start,
return_time=return_time, **extra)
yield before
except GranuleLocatorError: # no problem
pass
for (timeinfo, subdir) in self.iterate_subdirs(d_start, d_end,
**extra):
if subdir.exists() and subdir.is_dir():
logger.debug("Searching directory {!s}".format(subdir))
found_any_dirs = True
for child in subdir.iterdir():
                    if before is not None and child == (
                            before[1] if return_time else before):
                        # already yielded it as "last before"
                        continue
m = self._re.fullmatch(child.name)
if m is not None:
fit = True
for k in m.groupdict().keys() & extra.keys():
if m[k].lower() != extra[k].lower():
fit = False # not the right file
break
if not fit:
continue
found_any_grans = True
try:
(g_start, g_end) = self.get_times_for_granule(child,
**timeinfo)
except InvalidFileError as e:
logger.error(
"Skipping {!s}. Problem: {}".format(
child, e.args[0]))
continue
if g_end >= dt_start and g_start <= dt_end:
if return_time:
yield (g_start, child)
else:
yield child
if not found_any_dirs:
logger.warning("Found no directories. Make sure {self.name:s} has "
"coverage in {dt_start:%Y-%m-%d %H:%M:%S} – "
"{dt_end:%Y-%m-%d %H:%M:%S} and that you spelt any "
"additional information correctly: ".format(self=self,
dt_start=dt_start, dt_end=dt_end) + str(extra) + " Satellite "
"names and other fields are case-sensitive!")
elif not found_any_grans:
logger.warning("Directories searched appear to contain no matching "
"files. Make sure basedir, subdir, and regexp are "
"correct and you did not misspell any extra "
"information: " + str(extra))
def find_granules_sorted(self, dt_start=None, dt_end=None,
include_last_before=False, **extra):
"""Yield all granules, sorted by times.
For documentation, see :func:`~Dataset.find_granules`.
"""
allgran = list(self.find_granules(dt_start, dt_end,
include_last_before, **extra))
# I've been through all granules at least once, so all should be
# cached now; no need for additional hints when granule timeinfo
# obtainable only with hints from subdir, which is not included in
# the re-matching method
if extra.get("return_time", False):
yield from sorted(allgran)
else:
yield from sorted(allgran, key=self.get_times_for_granule)
def find_most_recent_granule_before(self, instant, **locator_args):
# docstring in parent class
return_time = locator_args.pop("return_time", False)
res = self.get_subdir_resolution()
if res == "day":
d0 = datetime.datetime(instant.year, instant.month, instant.day,
0, 0, 0)
d1 = d0 + datetime.timedelta(days=2) # closed interval
d0 = d0 - datetime.timedelta(days=1) # granule may start yesterday
elif res == "month":
d0 = datetime.datetime(instant.year, instant.month, 1,
0, 0, 0)
d1 = d0 + datetime.timedelta(days=31*2)
d0 = d0 - datetime.timedelta(days=31)
elif res == "year":
d0 = datetime.datetime(instant.year-1, 1, 1,
0, 0, 0)
d1 = datetime.datetime(instant.year+1, 12, 31, 23, 59, 59)
else:
d0 = d1 = None # search entire dataset
first = True
for (time_st, gran) in self.find_granules_sorted(d0, d1,
return_time=True, **locator_args):
if time_st > instant:
if not first:
if return_time:
return (lasttime, lastgran)
else:
return lastgran
break
lasttime = time_st
lastgran = gran
first = False
raise GranuleLocatorError("Can not find any granule "
"before {:%Y-%m-%d %H:%M:%S}".format(instant))
@staticmethod
def _getyear(gd, s, alt):
"""Extract year info from group-dict
Taking a group dict and a string, get an int for the year.
The group dict should come from re.fullmatch().groupdict(). The
second argument is typically "year" or "year_end". If this is a
        4-digit string, it is taken as a year. If it is a 2-digit string,
        it is taken as a 2-digit year, which is taken as 19xx for >= 69,
        and 20xx for <= 68, according to POSIX and ISO C standards.
If there is no match, return alt.
Arguments:
gd (dict): Group-dict such as returned by re module
s (str): String to match for
            alt (int): Value returned in case there is no match
Returns:
int: year
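
        Example::

            >>> MultiFileDataset._getyear({"year": "99"}, "year", 0)
            1999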
"""
if s in gd:
i = int(gd[s])
if len(gd[s]) == 2:
return 1900 + i if i> 68 else 2000 + i
elif len(gd[s]) == 4:
return i
else:
raise ValueError("Found {:d}-digit string for the year. "
"Expected 2 or 4 digits. Giving up.".format(len(gd[s])))
else:
return alt
def get_info_for_granule(self, p):
"""Return dict (re.fullmatch) for granule, based on re
Arguments:
p (pathlib.Path): path to granule
Returns:
dict: dictionary with info, such as returned by
:func:`re.fullmatch`.
"""
if not isinstance(p, pathlib.Path):
p = pathlib.Path(p)
m = self._re.fullmatch(p.name)
return m.groupdict()
def get_times_for_granule(self, p, **kwargs):
"""For granule stored in `path`, get start and end times.
May take hints for year, month, day, hour, minute, second, and
their endings, according to self.datefields
Arguments:
p (pathlib.Path): path to granule
**kwargs: Any more info that may be needed
Returns:
(datetime, datetime): Start and end time for granule
"""
if not isinstance(p, pathlib.PurePath):
p = pathlib.PurePath(p)
if str(p) in self._granule_start_times.keys():
(start, end) = self._granule_start_times[str(p)]
else:
gd = self.get_info_for_granule(p)
if (any(f in gd.keys() for f in self.datefields) and
(any(f in gd.keys() for f in {x+"_end" for x in self.datefields})
or self.granule_duration is not None)):
st_date = [int(gd.get(p, kwargs.get(p, 0))) for p in self.datefields]
td = datetime.timedelta()
if st_date[1] == st_date[2] == 0:
if "doy" in gd:
td += datetime.timedelta(days=int(gd["doy"])-1)
# month and day can't be 0...
st_date[1] = st_date[1] or 1
st_date[2] = st_date[2] or 1
# maybe it's a two-year notation
st_date[0] = self._getyear(gd, "year", kwargs.get("year", 0))
try:
start = datetime.datetime(*st_date)
                except ValueError as v:
                    raise InvalidFileError("File {!s} has invalid "
                        "starting datetime, giving up".format(p)) from v
if "tod" in gd and start.time() == datetime.time(0):
td += datetime.timedelta(seconds=int(gd["tod"]))
start += td
if any(k.endswith("_end") for k in gd.keys()):
# FIXME: Does this go well at the end of
                    # year/month/day boundary? Should really make sure
                    # that end is the first time occurrence after
                    # start_time fulfilling the provided information.
end_date = st_date.copy()
end_date[0] = self._getyear(gd, "year_end", kwargs.get("year_end", st_date[0]))
end_date[1:] = [int(gd.get(p+"_end",
kwargs.get(p+"_end", sd_x)))
for (p, sd_x) in zip(self.datefields[1:],
st_date[1:])]
try:
end = datetime.datetime(*end_date)
except ValueError as v:
raise InvalidFileError("File {!s} has invalid "
"ending datetime, giving up".format(p)) from v
if end_date < st_date: # must have crossed date boundary
end += datetime.timedelta(days=1)
elif self.granule_duration is not None:
end = start + self.granule_duration
else:
raise RuntimeError("This code should never execute")
else:
# implementation depends on dataset
(start, end) = self.get_time_from_granule_contents(str(p))
self._granule_start_times[str(p)] = (start, end)
return (start, end)
# not an abstract method because subclasses need to implement it /if
# and only if starting/ending times cannot be determined from the filename
def get_time_from_granule_contents(self, p):
"""Get datetime objects for beginning and end of granule
If it returns None, then use same as start time.
Arguments:
p (pathlib.Path): Path to file
Returns:
(datetime, datetime): 2-tuple for start and end times
"""
raise ValueError(
("To determine starting and end-times for a {0} dataset, "
"I need to read the file. However, {0} has not implemented the "
"get_time_from_granule_contents method.".format(
type(self).__name__)))
def _open_granule_file(self):
if self.granule_cache_file is not None:
p = str(self.basedir / self.granule_cache_file)
try:
self._granule_start_times = shelve.open(p, protocol=4)
except OSError:
logger.error(("Unable to open granule file {} RW. "
"Opening copy instead.").format(p))
tf = tempfile.NamedTemporaryFile()
shutil.copyfile(p, tf.name)
#self._granule_start_times = shelve.open(p, flag='r')
self._granule_start_times = shelve.open(tf.name)
else:
self._granule_start_times = {}
def _filter_firstline(self, f, M):
        # no sensible default exists: a subclass must provide a
        # firstline database before duplicate filtering can work
        raise NotImplementedError("Dataset {self!s} {self.name:s} needs "
            "database for storing a table with the first new scanline per "
            "granule, i.e. the first line not contained in the previous. "
            "Please define `granules_firstline_file` in source-code or "
            "configuration file.".format(self=self))
class SingleMeasurementPerFileDataset(MultiFileDataset):
"""Represents datasets where each file contains one measurement.
An example of this would be ACE-FTS, or some radio-occultation datasets.
Attributes:
filename_fields (Mapping[str, dtype])
dict with {name, dtype} for fields that should be copied from
the filename (as obtained with self.re) into the header
"""
granule_duration = datetime.timedelta(0)
filename_fields = {}
@abc.abstractmethod
def read_single(self, p, fields="all"):
"""Read a single measurement from a single file.
Shall take one argument (a path object) and return a tuple with
(header, measurement). The header shall contain information like
latitude, longitude, time.
Arguments:
p (pathlib.Path): path to file
fields (Iterable[str] or str): What fields to return.
See :func:`Dataset.read_period` for details.
"""
raise NotImplementedError()
_head_dtype = {}
def _read(self, p, fields="all"):
"""Reads a single measurement converted to ndarray
Arguments:
p (pathlib.Path): path to file
fields (Iterable[str] or str): What fields to return.
See :func:`Dataset.read_period` for details.
"""
(head, body) = self.read_single(p, fields=fields)
dt = [(s+body.shape if len(s)==2 else (s[0], s[1], s[2]+body.shape))
for s in body.dtype.descr]
dt.extend([("lat", "f8"), ("lon", "f8"), ("time", "M8[s]")])
dt.extend([(s, self._head_dtype[s])
for s in (head.keys() & self._head_dtype.keys())
if s not in {"lat", "lon", "time"}])
if self.filename_fields:
info = self.get_info_for_granule(p)
dt.extend(self.filename_fields.items())
D = numpy.ma.empty(1, dt)
for nm in body.dtype.names:
D[nm] = body[nm]
for nm in {"lat", "lon", "time"}:
D[nm] = head[nm]
for nm in head.keys() & D.dtype.names:
if nm not in {"lat", "lon", "time"}:
D[nm] = head[nm]
if self.filename_fields:
for nm in self.filename_fields.keys():
D[nm] = info[nm]
return (D, {})
class NetCDFDataset:
"""Mixin for any dataset where the contents are in NetCDF.
This may provide a good default for any NetCDF-based dataset. The
reading routine will take the most commonly occurring dimension as the
ndarray axes, and the rest within structured multidimensional dtype.
USE WITH CARE! PROVISIONAL API!
This one should use xarray, of course! See
https://arts.mi.uni-hamburg.de/trac/rt/ticket/145
"""
read_returns = "ndarray" # can be set to xarray
def _get_dtype_from_vars(self, alldims, allvars, fields, prim):
"""Get dtype from alldims, allvars
"""
return numpy.dtype([
(k,
"f4" if (getattr(v, "Scale", 1)!=1) else v.dtype,
tuple(s for (i, s) in enumerate(v.shape)
if v.dimensions[i] != alldims[prim].name))
for (k, v) in allvars.items()
if ((alldims[prim].name in v.dimensions) if fields=="all"
else (k in fields))])
def _read(self, f, fields="all",
pseudo_fields=None,
prim=None):
if self.read_returns == "ndarray":
return (self._read_ndarray(f, fields, pseudo_fields, prim), {})
elif self.read_returns == "xarray":
ds = xarray.open_dataset(f)
if fields != "all":
ds = ds[fields]
if pseudo_fields is not None:
warnings.warn("Ignoring pseudo-fields", UserWarning)
# if there is any problem loading the file, I want to know
# that now, not whenever I happen to read the data
ds.load()
return (ds, {})
def _read_ndarray(self, f, fields="all", pseudo_fields=None,
prim=None):
if pseudo_fields is None:
pseudo_fields = {}
with netCDF4.Dataset(f, 'r') as ds:
# generic conversion of NetCDF to ndarray; consider the most
# common dimension, make this one the shape of the ndarray,
# copy over all variables that share this dimension, and
# ignore the rest, unless variables are passed explicitly
# first do the “pseudo fields” so I know the dtype before I
# create the structured dtype
extra = {nm: f(ds) for (nm, f) in pseudo_fields.items()}
#
# if contained in one layer of groups, flatten those first
if ds.groups != {}: # NB: empty OrderedDict == {}
alldims = collections.OrderedDict(
itertools.chain.from_iterable(
ds[x].dimensions.items() for x in ds.groups.keys()))
allvars = collections.OrderedDict(
itertools.chain.from_iterable(
ds[x].variables.items() for x in ds.groups.keys()))
else:
alldims = ds.dimensions
allvars = ds.variables
# count most frequent dimensions
cnt = collections.Counter(
itertools.chain.from_iterable(
allvars[var].dimensions for var in allvars.keys()))
prim = prim or cnt.most_common(1)[0][0]
n = alldims[prim].size
M = numpy.zeros(shape=(n,),
dtype=self._get_dtype_from_vars(alldims, allvars,
fields, prim).descr +
[(nm, v.dtype, v.shape[1:]) for (nm, v) in
extra.items()])
M = numpy.ma.masked_array(M)
for v in M.dtype.names:
try:
M[v][...] = extra[v][...]
except KeyError:
# should be direct instead
pass
else:
continue
try:
M[v][...] = allvars[v][...] * getattr(allvars[v], "Scale", 1)
try:
M[v].mask[M[v]==allvars[v]._FillValue] = True
except AttributeError:
# no fill value or not a masked array...?
pass
except TypeError:
pass # probably not a numeric type
return M
class HomemadeDataset(NetCDFDataset, MultiFileDataset):
"""For any dataset created by typhon or its dependencies.
Currently supports only saving to npz, through the save_npz method.
Eventually, should also support other file formats, in particular
NetCDF.
"""
stored_name = None
write_basedir = write_subdir = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._str2path("write_basedir", "write_subdir", "stored_name")
def find_granule_for_time(self, **kwargs):
"""Find granule for specific time.
May or may not exist.
Arguments (kw only) are passed on to format directories stored in
self.basedir / self.subdir / self.stored_name, along with
        self.__dict__. One special keyword, "mode", can be set to
        "write" when writing data, as some datasets in "production
        mode" have different files for writing than reading.
Returns path to granule.
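
        Example (a sketch; keyword values are hypothetical)::

            p = hd.find_granule_for_time(year=2017, month=3, day=1,
                                         mode="write")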
"""
if kwargs.get("mode") == "write":
            # write_basedir/write_subdir default to None at class level,
            # so getattr always succeeds; fall back explicitly instead
            d = ((self.write_basedir or self.basedir) /
                 (self.write_subdir or self.subdir) /
                 self.stored_name)
else:
d = self.basedir / self.subdir / self.stored_name
# next line inspired by http://stackoverflow.com/a/32196401/974555
# which was posted by Alek Kowalczyk on 2015-08-25 and licensed
# under CC-BY-SA 3.0. I believe my code is sufficiently far from
# the original to not be a derivative work and that it can legally
# be MIT-licensed.
subsdict = {attr: getattr(self, attr)
for attr in dir(self)}
subsdict.update(**kwargs)
nm = pathlib.Path(str(d).format(**subsdict))
return nm
def _read(self, f, fields="all"):
if f.endswith("npz") or f.endswith("npy"):
if fields != "all":
warnings.warn("Ignoring fields≠'all'!", UserWarning)
return (numpy.load(f)["arr_0"], {})
elif f.endswith("nc"):
try:
return super()._read(f, fields=fields)
except ValueError as v:
if v.args[0] == ("zero-size array to reduction operation "
"minimum which has no identity"):
# A bug in xarray will cause a ValueError if trying to
# decode the times in a NetCDF file with length 0.
# See https://github.com/pydata/xarray/issues/1329 .
# Although I can't presently solve this (not decoding
# would be undesirable due to inconsistencies), I can
# at least replace it by an exception that means "bad
# data".
raise InvalidDataError("It looks like the NetCDF file "
"is empty or at least has a datetime variable with "
"length 0. Due to "
"https://github.com/pydata/xarray/issues/1329 "
"I can't currently handle that. Original message "
"was: " + v.args[0]) from v
else:
raise
# def find_granules(self, start, end):
# raise StopIteration()
# @abc.abstractmethod
# def quicksave(self, f):
# """Quick save to file
#
# :param str f: File to save to
# """
def save_npz(self, path, M):
"""Save to compressed npz
Arguments:
path (pathlib.Path): Path to store to
M (ndarray): Contents of what to store.
"""
p = pathlib.Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
numpy.savez_compressed(str(path), M)
class MultiSatelliteDataset(metaclass=abc.ABCMeta):
satellites = set()
@property
def valid_field_values(self):
return {"satname": self.satellites}
class HyperSpectral(Dataset, em.FwmuMixin):
"""Superclass for any hyperspectral instrument
"""
freqfile = None
# Not yet transferred from pyatmlab to typhon, not clean enough:
#
# - ProfileDataset
# - StationaryDataset
# - HyperSpectral
class DatasetDeque:
"""A deque-like object sliding through a dataset.
For a particular dataset, keep data corresponding to time period in
memory. When reading new data, discard a corresponding time period of
data in the past. This should be useful to process longer periods of
data that do not fit into memory, to calculate sliding window
statistics, or to perform processing where values nearby in time are
required.
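
    Example (a sketch; assumes ``ds`` implements as_xarray_dataset)::

        dd = DatasetDeque(ds, datetime.timedelta(hours=48),
                          datetime.datetime(2015, 6, 1))
        dd.move(datetime.timedelta(hours=6))  # slide window by 6 hours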
"""
dsobj = window = init_time = center_time = data = None
def __init__(self, ds, window, init_time, *args, **kwargs):
"""Initialise a DatasetDeque
Arguments:
ds [Dataset]: Dataset that this belongs to, for example,
HIRS(). This dataset must have an implementation of
.as_xarray_dataset().
window [timedelta]: Duration that should be kept in memory.
For example, datetime.timedelta(hours=48).
init_time [datetime]: Instant around which initial window
shall be centred.
Remaining arguments passed to read_period.
"""
self.dsobj = ds
self.window = window
self.init_time = init_time
self.center_time = init_time
self.reset(newtime=self.init_time, *args, **kwargs)
def reset(self, newtime=None, *args, **kwargs):
"""Reset to initial conditions
Sets data to indicated time or self.init_time ± self.window/2
Arguments:
newtime [datetime.datetime]: optional, time at which to
center window. If not given, reset to initial time.
Remaining arguments passed to read_period.
"""
        self.center_time = newtime or self.init_time
        self.edges = (self.center_time-self.window/2,
                      self.center_time+self.window/2)
M = self.dsobj.read_period(
*self.edges, *args, **kwargs)
self.data = self.dsobj.as_xarray_dataset(M)
def move(self, period, *args, **kwargs):
"""Read period on the right, discard period on the left
This moves the window to the right by `period` by reading `period`
new data, appending this on the right, and discarding an equally
long period on the left.
Arguments:
period [timedelta]: Duration by which to shift.
Must be positive.
Remaining arguments passed to read_period.
"""
if period > self.window:
raise ValueError("Shifting period ({!s}) exceeds window "
"length ({!s})!".format(period, self.window))
self.center_time += period
ok = False
try:
Mnew = self.dsobj.read_period(
self.edges[1],
self.edges[1]+period,
*args, **kwargs)
except: # syntactically required
raise
else:
datanew = self.dsobj.as_xarray_dataset(Mnew)
ok = True
finally:
self.edges = (self.edges[0] + period,
self.edges[1] + period)
# need to convert to ms or it will become int and indexing will
# fail, see https://github.com/numpy/numpy/issues/8546
# and https://github.com/pydata/xarray/issues/1240
#
# If the very first iteration fails to read the first period and
# is off by X hours, EVERYTHING will be off by X hours if I simply
# reset it based on the first time in self.data! Therefore the
# 'min' function.
# newst = self.data["time"][0].values.astype("M8[ms]") + numpy.timedelta64(period)
newst = min(numpy.datetime64(self.edges[0]), self.data["time"][0].values.astype("M8[ms]") + numpy.timedelta64(period))
# BUG: need workaround for https://github.com/pydata/xarray/issues/1297
all_encodings = {k: v.encoding for (k, v) in self.data.data_vars.items()}
self.data = xarray.concat(
(self.data.sel(time=slice(newst, None)),
datanew),
dim="time")
for (k, v) in self.data.data_vars.items():
v.encoding = all_encodings[k]
def resize(self, window):
"""Resize window
"""
self.window = window
self.edges = (self.center_time-window/2, self.center_time+window/2)
self.data = self.dsobj.as_xarray_dataset(
self.dsobj.read_period(*self.edges))
def __repr__(self):
return "<{:s} {:s} centred at {:%Y-%m-%d %H:%M}>".format(
self.__class__.__name__, repr(self.dsobj),
self.center_time)